Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * txid.c
3 : *
4 : * Export internal transaction IDs to user level.
5 : *
6 : * Note that only top-level transaction IDs are ever converted to TXID.
7 : * This is important because TXIDs frequently persist beyond the global
8 : * xmin horizon, or may even be shipped to other machines, so we cannot
9 : * rely on being able to correlate subtransaction IDs with their parents
10 : * via functions such as SubTransGetTopmostTransaction().
11 : *
12 : *
13 : * Copyright (c) 2003-2017, PostgreSQL Global Development Group
14 : * Author: Jan Wieck, Afilias USA INC.
15 : * 64-bit txids: Marko Kreen, Skype Technologies
16 : *
17 : * src/backend/utils/adt/txid.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 :
22 : #include "postgres.h"
23 :
24 : #include "access/clog.h"
25 : #include "access/transam.h"
26 : #include "access/xact.h"
27 : #include "access/xlog.h"
28 : #include "funcapi.h"
29 : #include "miscadmin.h"
30 : #include "libpq/pqformat.h"
31 : #include "postmaster/postmaster.h"
32 : #include "storage/lwlock.h"
33 : #include "utils/builtins.h"
34 : #include "utils/memutils.h"
35 : #include "utils/snapmgr.h"
36 :
37 :
38 : /* txid will be signed int8 in database, so must limit to 63 bits */
39 : #define MAX_TXID ((uint64) PG_INT64_MAX)
40 :
41 : /* Use unsigned variant internally */
42 : typedef uint64 txid;
43 :
44 : /* sprintf format code for uint64 */
45 : #define TXID_FMT UINT64_FORMAT
46 :
47 : /*
48 : * If defined, use bsearch() function for searching for txids in snapshots
49 : * that have more than the specified number of values.
50 : */
51 : #define USE_BSEARCH_IF_NXIP_GREATER 30
52 :
53 :
54 : /*
55 : * Snapshot containing 8byte txids.
56 : */
57 : typedef struct
58 : {
59 : /*
60 : * 4-byte length hdr, should not be touched directly.
61 : *
62 : * Explicit embedding is ok as we want always correct alignment anyway.
63 : */
64 : int32 __varsz;
65 :
66 : uint32 nxip; /* number of txids in xip array */
67 : txid xmin;
68 : txid xmax;
69 : /* in-progress txids, xmin <= xip[i] < xmax: */
70 : txid xip[FLEXIBLE_ARRAY_MEMBER];
71 : } TxidSnapshot;
72 :
73 : #define TXID_SNAPSHOT_SIZE(nxip) \
74 : (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
75 : #define TXID_SNAPSHOT_MAX_NXIP \
76 : ((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(txid))
77 :
78 : /*
79 : * Epoch values from xact.c
80 : */
81 : typedef struct
82 : {
83 : TransactionId last_xid;
84 : uint32 epoch;
85 : } TxidEpoch;
86 :
87 :
88 : /*
89 : * Fetch epoch data from xact.c.
90 : */
91 : static void
92 370 : load_xid_epoch(TxidEpoch *state)
93 : {
94 370 : GetNextXidAndEpoch(&state->last_xid, &state->epoch);
95 370 : }
96 :
97 : /*
98 : * Helper to get a TransactionId from a 64-bit xid with wraparound detection.
99 : *
100 : * It is an ERROR if the xid is in the future. Otherwise, returns true if
101 : * the transaction is still new enough that we can determine whether it
102 : * committed and false otherwise. If *extracted_xid is not NULL, it is set
103 : * to the low 32 bits of the transaction ID (i.e. the actual XID, without the
104 : * epoch).
105 : *
106 : * The caller must hold CLogTruncationLock since it's dealing with arbitrary
107 : * XIDs, and must continue to hold it until it's done with any clog lookups
108 : * relating to those XIDs.
109 : */
110 : static bool
111 7 : TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
112 : {
113 7 : uint32 xid_epoch = (uint32) (xid_with_epoch >> 32);
114 7 : TransactionId xid = (TransactionId) xid_with_epoch;
115 : uint32 now_epoch;
116 : TransactionId now_epoch_last_xid;
117 :
118 7 : GetNextXidAndEpoch(&now_epoch_last_xid, &now_epoch);
119 :
120 7 : if (extracted_xid != NULL)
121 7 : *extracted_xid = xid;
122 :
123 7 : if (!TransactionIdIsValid(xid))
124 0 : return false;
125 :
126 : /* For non-normal transaction IDs, we can ignore the epoch. */
127 7 : if (!TransactionIdIsNormal(xid))
128 2 : return true;
129 :
130 : /* If the transaction ID is in the future, throw an error. */
131 5 : if (xid_epoch > now_epoch
132 5 : || (xid_epoch == now_epoch && xid > now_epoch_last_xid))
133 1 : ereport(ERROR,
134 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
135 : errmsg("transaction ID " UINT64_FORMAT " is in the future",
136 : xid_with_epoch)));
137 :
138 : /*
139 : * ShmemVariableCache->oldestClogXid is protected by CLogTruncationLock,
140 : * but we don't acquire that lock here. Instead, we require the caller to
141 : * acquire it, because the caller is presumably going to look up the
142 : * returned XID. If we took and released the lock within this function, a
143 : * CLOG truncation could occur before the caller finished with the XID.
144 : */
145 4 : Assert(LWLockHeldByMe(CLogTruncationLock));
146 :
147 : /*
148 : * If the transaction ID has wrapped around, it's definitely too old to
149 : * determine the commit status. Otherwise, we can compare it to
150 : * ShmemVariableCache->oldestClogXid to determine whether the relevant
151 : * CLOG entry is guaranteed to still exist.
152 : */
153 4 : if (xid_epoch + 1 < now_epoch
154 4 : || (xid_epoch + 1 == now_epoch && xid < now_epoch_last_xid)
155 4 : || TransactionIdPrecedes(xid, ShmemVariableCache->oldestClogXid))
156 1 : return false;
157 :
158 3 : return true;
159 : }
160 :
161 : /*
162 : * do a TransactionId -> txid conversion for an XID near the given epoch
163 : */
164 : static txid
165 372 : convert_xid(TransactionId xid, const TxidEpoch *state)
166 : {
167 : uint64 epoch;
168 :
169 : /* return special xid's as-is */
170 372 : if (!TransactionIdIsNormal(xid))
171 0 : return (txid) xid;
172 :
173 : /* xid can be on either side when near wrap-around */
174 372 : epoch = (uint64) state->epoch;
175 372 : if (xid > state->last_xid &&
176 0 : TransactionIdPrecedes(xid, state->last_xid))
177 0 : epoch--;
178 737 : else if (xid < state->last_xid &&
179 365 : TransactionIdFollows(xid, state->last_xid))
180 0 : epoch++;
181 :
182 372 : return (epoch << 32) | xid;
183 : }
184 :
185 : /*
186 : * txid comparator for qsort/bsearch
187 : */
188 : static int
189 224 : cmp_txid(const void *aa, const void *bb)
190 : {
191 224 : txid a = *(const txid *) aa;
192 224 : txid b = *(const txid *) bb;
193 :
194 224 : if (a < b)
195 54 : return -1;
196 170 : if (a > b)
197 139 : return 1;
198 31 : return 0;
199 : }
200 :
201 : /*
202 : * Sort a snapshot's txids, so we can use bsearch() later. Also remove
203 : * any duplicates.
204 : *
205 : * For consistency of on-disk representation, we always sort even if bsearch
206 : * will not be used.
207 : */
208 : static void
209 2 : sort_snapshot(TxidSnapshot *snap)
210 : {
211 2 : txid last = 0;
212 : int nxip,
213 : idx1,
214 : idx2;
215 :
216 2 : if (snap->nxip > 1)
217 : {
218 0 : qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
219 :
220 : /* remove duplicates */
221 0 : nxip = snap->nxip;
222 0 : idx1 = idx2 = 0;
223 0 : while (idx1 < nxip)
224 : {
225 0 : if (snap->xip[idx1] != last)
226 0 : last = snap->xip[idx2++] = snap->xip[idx1];
227 : else
228 0 : snap->nxip--;
229 0 : idx1++;
230 : }
231 : }
232 2 : }
233 :
234 : /*
235 : * check txid visibility.
236 : */
237 : static bool
238 85 : is_visible_txid(txid value, const TxidSnapshot *snap)
239 : {
240 85 : if (value < snap->xmin)
241 11 : return true;
242 74 : else if (value >= snap->xmax)
243 14 : return false;
244 : #ifdef USE_BSEARCH_IF_NXIP_GREATER
245 60 : else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
246 : {
247 : void *res;
248 :
249 50 : res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
250 : /* if found, transaction is still in progress */
251 50 : return (res) ? false : true;
252 : }
253 : #endif
254 : else
255 : {
256 : uint32 i;
257 :
258 30 : for (i = 0; i < snap->nxip; i++)
259 : {
260 24 : if (value == snap->xip[i])
261 4 : return false;
262 : }
263 6 : return true;
264 : }
265 : }
266 :
267 : /*
268 : * helper functions to use StringInfo for TxidSnapshot creation.
269 : */
270 :
271 : static StringInfo
272 13 : buf_init(txid xmin, txid xmax)
273 : {
274 : TxidSnapshot snap;
275 : StringInfo buf;
276 :
277 13 : snap.xmin = xmin;
278 13 : snap.xmax = xmax;
279 13 : snap.nxip = 0;
280 :
281 13 : buf = makeStringInfo();
282 13 : appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
283 13 : return buf;
284 : }
285 :
286 : static void
287 48 : buf_add_txid(StringInfo buf, txid xid)
288 : {
289 48 : TxidSnapshot *snap = (TxidSnapshot *) buf->data;
290 :
291 : /* do this before possible realloc */
292 48 : snap->nxip++;
293 :
294 48 : appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
295 48 : }
296 :
297 : static TxidSnapshot *
298 11 : buf_finalize(StringInfo buf)
299 : {
300 11 : TxidSnapshot *snap = (TxidSnapshot *) buf->data;
301 :
302 11 : SET_VARSIZE(snap, buf->len);
303 :
304 : /* buf is not needed anymore */
305 11 : buf->data = NULL;
306 11 : pfree(buf);
307 :
308 11 : return snap;
309 : }
310 :
311 : /*
312 : * simple number parser.
313 : *
314 : * We return 0 on error, which is invalid value for txid.
315 : */
316 : static txid
317 83 : str2txid(const char *s, const char **endp)
318 : {
319 83 : txid val = 0;
320 83 : txid cutoff = MAX_TXID / 10;
321 83 : txid cutlim = MAX_TXID % 10;
322 :
323 497 : for (; *s; s++)
324 : {
325 : unsigned d;
326 :
327 486 : if (*s < '0' || *s > '9')
328 : break;
329 415 : d = *s - '0';
330 :
331 : /*
332 : * check for overflow
333 : */
334 415 : if (val > cutoff || (val == cutoff && d > cutlim))
335 : {
336 1 : val = 0;
337 1 : break;
338 : }
339 :
340 414 : val = val * 10 + d;
341 : }
342 83 : if (endp)
343 83 : *endp = s;
344 83 : return val;
345 : }
346 :
347 : /*
348 : * parse snapshot from cstring
349 : */
350 : static TxidSnapshot *
351 16 : parse_snapshot(const char *str)
352 : {
353 : txid xmin;
354 : txid xmax;
355 16 : txid last_val = 0,
356 : val;
357 16 : const char *str_start = str;
358 : const char *endp;
359 : StringInfo buf;
360 :
361 16 : xmin = str2txid(str, &endp);
362 16 : if (*endp != ':')
363 0 : goto bad_format;
364 16 : str = endp + 1;
365 :
366 16 : xmax = str2txid(str, &endp);
367 16 : if (*endp != ':')
368 1 : goto bad_format;
369 15 : str = endp + 1;
370 :
371 : /* it should look sane */
372 15 : if (xmin == 0 || xmax == 0 || xmin > xmax)
373 : goto bad_format;
374 :
375 : /* allocate buffer */
376 13 : buf = buf_init(xmin, xmax);
377 :
378 : /* loop over values */
379 13 : while (*str != '\0')
380 : {
381 : /* read next value */
382 51 : val = str2txid(str, &endp);
383 51 : str = endp;
384 :
385 : /* require the input to be in order */
386 51 : if (val < xmin || val >= xmax || val < last_val)
387 : goto bad_format;
388 :
389 : /* skip duplicates */
390 49 : if (val != last_val)
391 48 : buf_add_txid(buf, val);
392 49 : last_val = val;
393 :
394 49 : if (*str == ',')
395 40 : str++;
396 9 : else if (*str != '\0')
397 0 : goto bad_format;
398 : }
399 :
400 11 : return buf_finalize(buf);
401 :
402 : bad_format:
403 5 : ereport(ERROR,
404 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
405 : errmsg("invalid input syntax for type %s: \"%s\"",
406 : "txid_snapshot", str_start)));
407 : return NULL; /* keep compiler quiet */
408 : }
409 :
410 : /*
411 : * Public functions.
412 : *
413 : * txid_current() and txid_current_snapshot() are the only ones that
414 : * communicate with core xid machinery. All the others work on data
415 : * returned by them.
416 : */
417 :
418 : /*
419 : * txid_current() returns int8
420 : *
421 : * Return the current toplevel transaction ID as TXID
422 : * If the current transaction does not have one, one is assigned.
423 : *
424 : * This value has the epoch as the high 32 bits and the 32-bit xid
425 : * as the low 32 bits.
426 : */
427 : Datum
428 367 : txid_current(PG_FUNCTION_ARGS)
429 : {
430 : txid val;
431 : TxidEpoch state;
432 :
433 : /*
434 : * Must prevent during recovery because if an xid is not assigned we try
435 : * to assign one, which would fail. Programs already rely on this function
436 : * to always return a valid current xid, so we should not change this to
437 : * return NULL or similar invalid xid.
438 : */
439 367 : PreventCommandDuringRecovery("txid_current()");
440 :
441 367 : load_xid_epoch(&state);
442 :
443 367 : val = convert_xid(GetTopTransactionId(), &state);
444 :
445 367 : PG_RETURN_INT64(val);
446 : }
447 :
448 : /*
449 : * Same as txid_current() but doesn't assign a new xid if there isn't one
450 : * yet.
451 : */
452 : Datum
453 2 : txid_current_if_assigned(PG_FUNCTION_ARGS)
454 : {
455 : txid val;
456 : TxidEpoch state;
457 2 : TransactionId topxid = GetTopTransactionIdIfAny();
458 :
459 2 : if (topxid == InvalidTransactionId)
460 1 : PG_RETURN_NULL();
461 :
462 1 : load_xid_epoch(&state);
463 :
464 1 : val = convert_xid(topxid, &state);
465 :
466 1 : PG_RETURN_INT64(val);
467 : }
468 :
469 : /*
470 : * txid_current_snapshot() returns txid_snapshot
471 : *
472 : * Return current snapshot in TXID format
473 : *
474 : * Note that only top-transaction XIDs are included in the snapshot.
475 : */
476 : Datum
477 2 : txid_current_snapshot(PG_FUNCTION_ARGS)
478 : {
479 : TxidSnapshot *snap;
480 : uint32 nxip,
481 : i;
482 : TxidEpoch state;
483 : Snapshot cur;
484 :
485 2 : cur = GetActiveSnapshot();
486 2 : if (cur == NULL)
487 0 : elog(ERROR, "no active snapshot set");
488 :
489 2 : load_xid_epoch(&state);
490 :
491 : /*
492 : * Compile-time limits on the procarray (MAX_BACKENDS processes plus
493 : * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
494 : */
495 : StaticAssertStmt(MAX_BACKENDS * 2 <= TXID_SNAPSHOT_MAX_NXIP,
496 : "possible overflow in txid_current_snapshot()");
497 :
498 : /* allocate */
499 2 : nxip = cur->xcnt;
500 2 : snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
501 :
502 : /* fill */
503 2 : snap->xmin = convert_xid(cur->xmin, &state);
504 2 : snap->xmax = convert_xid(cur->xmax, &state);
505 2 : snap->nxip = nxip;
506 2 : for (i = 0; i < nxip; i++)
507 0 : snap->xip[i] = convert_xid(cur->xip[i], &state);
508 :
509 : /*
510 : * We want them guaranteed to be in ascending order. This also removes
511 : * any duplicate xids. Normally, an XID can only be assigned to one
512 : * backend, but when preparing a transaction for two-phase commit, there
513 : * is a transient state when both the original backend and the dummy
514 : * PGPROC entry reserved for the prepared transaction hold the same XID.
515 : */
516 2 : sort_snapshot(snap);
517 :
518 : /* set size after sorting, because it may have removed duplicate xips */
519 2 : SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(snap->nxip));
520 :
521 2 : PG_RETURN_POINTER(snap);
522 : }
523 :
524 : /*
525 : * txid_snapshot_in(cstring) returns txid_snapshot
526 : *
527 : * input function for type txid_snapshot
528 : */
529 : Datum
530 16 : txid_snapshot_in(PG_FUNCTION_ARGS)
531 : {
532 16 : char *str = PG_GETARG_CSTRING(0);
533 : TxidSnapshot *snap;
534 :
535 16 : snap = parse_snapshot(str);
536 :
537 11 : PG_RETURN_POINTER(snap);
538 : }
539 :
540 : /*
541 : * txid_snapshot_out(txid_snapshot) returns cstring
542 : *
543 : * output function for type txid_snapshot
544 : */
545 : Datum
546 9 : txid_snapshot_out(PG_FUNCTION_ARGS)
547 : {
548 9 : TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
549 : StringInfoData str;
550 : uint32 i;
551 :
552 9 : initStringInfo(&str);
553 :
554 9 : appendStringInfo(&str, TXID_FMT ":", snap->xmin);
555 9 : appendStringInfo(&str, TXID_FMT ":", snap->xmax);
556 :
557 52 : for (i = 0; i < snap->nxip; i++)
558 : {
559 43 : if (i > 0)
560 36 : appendStringInfoChar(&str, ',');
561 43 : appendStringInfo(&str, TXID_FMT, snap->xip[i]);
562 : }
563 :
564 9 : PG_RETURN_CSTRING(str.data);
565 : }
566 :
567 : /*
568 : * txid_snapshot_recv(internal) returns txid_snapshot
569 : *
570 : * binary input function for type txid_snapshot
571 : *
572 : * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
573 : */
574 : Datum
575 0 : txid_snapshot_recv(PG_FUNCTION_ARGS)
576 : {
577 0 : StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
578 : TxidSnapshot *snap;
579 0 : txid last = 0;
580 : int nxip;
581 : int i;
582 : txid xmin,
583 : xmax;
584 :
585 : /* load and validate nxip */
586 0 : nxip = pq_getmsgint(buf, 4);
587 0 : if (nxip < 0 || nxip > TXID_SNAPSHOT_MAX_NXIP)
588 : goto bad_format;
589 :
590 0 : xmin = pq_getmsgint64(buf);
591 0 : xmax = pq_getmsgint64(buf);
592 0 : if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
593 : goto bad_format;
594 :
595 0 : snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
596 0 : snap->xmin = xmin;
597 0 : snap->xmax = xmax;
598 :
599 0 : for (i = 0; i < nxip; i++)
600 : {
601 0 : txid cur = pq_getmsgint64(buf);
602 :
603 0 : if (cur < last || cur < xmin || cur >= xmax)
604 : goto bad_format;
605 :
606 : /* skip duplicate xips */
607 0 : if (cur == last)
608 : {
609 0 : i--;
610 0 : nxip--;
611 0 : continue;
612 : }
613 :
614 0 : snap->xip[i] = cur;
615 0 : last = cur;
616 : }
617 0 : snap->nxip = nxip;
618 0 : SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
619 0 : PG_RETURN_POINTER(snap);
620 :
621 : bad_format:
622 0 : ereport(ERROR,
623 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
624 : errmsg("invalid external txid_snapshot data")));
625 : PG_RETURN_POINTER(NULL); /* keep compiler quiet */
626 : }
627 :
628 : /*
629 : * txid_snapshot_send(txid_snapshot) returns bytea
630 : *
631 : * binary output function for type txid_snapshot
632 : *
633 : * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
634 : */
635 : Datum
636 0 : txid_snapshot_send(PG_FUNCTION_ARGS)
637 : {
638 0 : TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
639 : StringInfoData buf;
640 : uint32 i;
641 :
642 0 : pq_begintypsend(&buf);
643 0 : pq_sendint(&buf, snap->nxip, 4);
644 0 : pq_sendint64(&buf, snap->xmin);
645 0 : pq_sendint64(&buf, snap->xmax);
646 0 : for (i = 0; i < snap->nxip; i++)
647 0 : pq_sendint64(&buf, snap->xip[i]);
648 0 : PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
649 : }
650 :
651 : /*
652 : * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
653 : *
654 : * is txid visible in snapshot ?
655 : */
656 : Datum
657 85 : txid_visible_in_snapshot(PG_FUNCTION_ARGS)
658 : {
659 85 : txid value = PG_GETARG_INT64(0);
660 85 : TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
661 :
662 85 : PG_RETURN_BOOL(is_visible_txid(value, snap));
663 : }
664 :
665 : /*
666 : * txid_snapshot_xmin(txid_snapshot) returns int8
667 : *
668 : * return snapshot's xmin
669 : */
670 : Datum
671 5 : txid_snapshot_xmin(PG_FUNCTION_ARGS)
672 : {
673 5 : TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
674 :
675 5 : PG_RETURN_INT64(snap->xmin);
676 : }
677 :
678 : /*
679 : * txid_snapshot_xmax(txid_snapshot) returns int8
680 : *
681 : * return snapshot's xmax
682 : */
683 : Datum
684 4 : txid_snapshot_xmax(PG_FUNCTION_ARGS)
685 : {
686 4 : TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
687 :
688 4 : PG_RETURN_INT64(snap->xmax);
689 : }
690 :
691 : /*
692 : * txid_snapshot_xip(txid_snapshot) returns setof int8
693 : *
694 : * return in-progress TXIDs in snapshot.
695 : */
696 : Datum
697 41 : txid_snapshot_xip(PG_FUNCTION_ARGS)
698 : {
699 : FuncCallContext *fctx;
700 : TxidSnapshot *snap;
701 : txid value;
702 :
703 : /* on first call initialize snap_state and get copy of snapshot */
704 41 : if (SRF_IS_FIRSTCALL())
705 : {
706 4 : TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
707 :
708 4 : fctx = SRF_FIRSTCALL_INIT();
709 :
710 : /* make a copy of user snapshot */
711 4 : snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
712 4 : memcpy(snap, arg, VARSIZE(arg));
713 :
714 4 : fctx->user_fctx = snap;
715 : }
716 :
717 : /* return values one-by-one */
718 41 : fctx = SRF_PERCALL_SETUP();
719 41 : snap = fctx->user_fctx;
720 41 : if (fctx->call_cntr < snap->nxip)
721 : {
722 37 : value = snap->xip[fctx->call_cntr];
723 37 : SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
724 : }
725 : else
726 : {
727 4 : SRF_RETURN_DONE(fctx);
728 : }
729 : }
730 :
731 : /*
732 : * Report the status of a recent transaction ID, or null for wrapped,
733 : * truncated away or otherwise too old XIDs.
734 : *
735 : * The passed epoch-qualified xid is treated as a normal xid, not a
736 : * multixact id.
737 : *
738 : * If it points to a committed subxact the result is the subxact status even
739 : * though the parent xact may still be in progress or may have aborted.
740 : */
741 : Datum
742 7 : txid_status(PG_FUNCTION_ARGS)
743 : {
744 : const char *status;
745 7 : uint64 xid_with_epoch = PG_GETARG_INT64(0);
746 : TransactionId xid;
747 :
748 : /*
749 : * We must protect against concurrent truncation of clog entries to avoid
750 : * an I/O error on SLRU lookup.
751 : */
752 7 : LWLockAcquire(CLogTruncationLock, LW_SHARED);
753 7 : if (TransactionIdInRecentPast(xid_with_epoch, &xid))
754 : {
755 5 : Assert(TransactionIdIsValid(xid));
756 :
757 5 : if (TransactionIdIsCurrentTransactionId(xid))
758 1 : status = gettext_noop("in progress");
759 4 : else if (TransactionIdDidCommit(xid))
760 3 : status = gettext_noop("committed");
761 1 : else if (TransactionIdDidAbort(xid))
762 1 : status = gettext_noop("aborted");
763 : else
764 : {
765 : /*
766 : * The xact is not marked as either committed or aborted in clog.
767 : *
768 : * It could be a transaction that ended without updating clog or
769 : * writing an abort record due to a crash. We can safely assume
770 : * it's aborted if it isn't committed and is older than our
771 : * snapshot xmin.
772 : *
773 : * Otherwise it must be in-progress (or have been at the time we
774 : * checked commit/abort status).
775 : */
776 0 : if (TransactionIdPrecedes(xid, GetActiveSnapshot()->xmin))
777 0 : status = gettext_noop("aborted");
778 : else
779 0 : status = gettext_noop("in progress");
780 : }
781 : }
782 : else
783 : {
784 1 : status = NULL;
785 : }
786 6 : LWLockRelease(CLogTruncationLock);
787 :
788 6 : if (status == NULL)
789 1 : PG_RETURN_NULL();
790 : else
791 5 : PG_RETURN_TEXT_P(cstring_to_text(status));
792 : }
|