Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * snapmgr.c
4 : * PostgreSQL snapshot manager
5 : *
6 : * We keep track of snapshots in two ways: those "registered" by resowner.c,
7 : * and the "active snapshot" stack. All snapshots in either of them live in
8 : * persistent memory. When a snapshot is no longer in any of these lists
9 : * (tracked by separate refcounts on each snapshot), its memory can be freed.
10 : *
11 : * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
12 : * regd_count and list it in RegisteredSnapshots, but this reference is not
13 : * tracked by a resource owner. We used to use the TopTransactionResourceOwner
14 : * to track this snapshot reference, but that introduces logical circularity
15 : * and thus makes it impossible to clean up in a sane fashion. It's better to
16 : * handle this reference as an internally-tracked registration, so that this
17 : * module is entirely lower-level than ResourceOwners.
18 : *
19 : * Likewise, any snapshots that have been exported by pg_export_snapshot
20 : * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
21 : * tracked by any resource owner.
22 : *
23 : * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
24 : * is valid, but is not tracked by any resource owner.
25 : *
 * The same is true for historic snapshots used during logical decoding;
 * their lifetime is managed separately (as they live longer than one xact.c
 * transaction).
29 : *
 * These arrangements let us reset MyPgXact->xmin when there are no snapshots
 * referenced by this transaction, and advance it when the one with the oldest
 * Xmin is no longer referenced. For simplicity, however, only registered
 * snapshots (not active snapshots) participate in tracking which one is
 * oldest; we don't try to change MyPgXact->xmin except when the
 * active-snapshot stack is empty.
36 : *
37 : *
38 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
39 : * Portions Copyright (c) 1994, Regents of the University of California
40 : *
41 : * IDENTIFICATION
42 : * src/backend/utils/time/snapmgr.c
43 : *
44 : *-------------------------------------------------------------------------
45 : */
46 : #include "postgres.h"
47 :
48 : #include <sys/stat.h>
49 : #include <unistd.h>
50 :
51 : #include "access/transam.h"
52 : #include "access/xact.h"
53 : #include "access/xlog.h"
54 : #include "catalog/catalog.h"
55 : #include "lib/pairingheap.h"
56 : #include "miscadmin.h"
57 : #include "storage/predicate.h"
58 : #include "storage/proc.h"
59 : #include "storage/procarray.h"
60 : #include "storage/sinval.h"
61 : #include "storage/sinvaladt.h"
62 : #include "storage/spin.h"
63 : #include "utils/builtins.h"
64 : #include "utils/memutils.h"
65 : #include "utils/rel.h"
66 : #include "utils/resowner_private.h"
67 : #include "utils/snapmgr.h"
68 : #include "utils/syscache.h"
69 : #include "utils/tqual.h"
70 :
71 :
/*
 * GUC parameters
 */
int			old_snapshot_threshold; /* number of minutes, -1 disables */

/*
 * Structure for dealing with old_snapshot_threshold implementation.
 *
 * A single instance lives in shared memory: SnapMgrInit creates or attaches
 * to it through ShmemInitStruct, sized by SnapMgrShmemSize.  The trailing
 * xid_by_minute array only gets storage (OLD_SNAPSHOT_TIME_MAP_ENTRIES
 * slots) when old_snapshot_threshold > 0.
 */
typedef struct OldSnapshotControlData
{
	/*
	 * Variables for old snapshot handling are shared among processes and are
	 * only allowed to move forward.  Each group of fields is guarded by the
	 * spinlock declared immediately before it.
	 */
	slock_t		mutex_current;	/* protect current_timestamp */
	TimestampTz current_timestamp;	/* latest snapshot timestamp */
	slock_t		mutex_latest_xmin;	/* protect latest_xmin and next_map_update */
	TransactionId latest_xmin;	/* latest snapshot xmin */
	TimestampTz next_map_update;	/* latest snapshot valid up to */
	slock_t		mutex_threshold;	/* protect threshold fields */
	TimestampTz threshold_timestamp;	/* earlier snapshot is old */
	TransactionId threshold_xid;	/* earlier xid may be gone */

	/*
	 * Keep one xid per minute for old snapshot error handling.
	 *
	 * Use a circular buffer with a head offset, a count of entries currently
	 * used, and a timestamp corresponding to the xid at the head offset. A
	 * count_used value of zero means that there are no times stored; a
	 * count_used value of OLD_SNAPSHOT_TIME_MAP_ENTRIES means that the buffer
	 * is full and the head must be advanced to add new entries. Use
	 * timestamps aligned to minute boundaries, since that seems less
	 * surprising than aligning based on the first usage timestamp. The
	 * latest bucket is effectively stored within latest_xmin. The circular
	 * buffer is updated when we get a new xmin value that doesn't fall into
	 * the same interval.
	 *
	 * It is OK if the xid for a given time slot is from earlier than
	 * calculated by adding the number of minutes corresponding to the
	 * (possibly wrapped) distance from the head offset to the time of the
	 * head entry, since that just results in the vacuuming of old tuples
	 * being slightly less aggressive. It would not be OK for it to be off in
	 * the other direction, since it might result in vacuuming tuples that are
	 * still expected to be there.
	 *
	 * Use of an SLRU was considered but not chosen because it is more
	 * heavyweight than is needed for this, and would probably not be any less
	 * code to implement.
	 *
	 * Persistence is not needed.
	 */
	int			head_offset;	/* subscript of oldest tracked time */
	TimestampTz head_timestamp; /* time corresponding to head xid */
	int			count_used;		/* how many slots are in use */
	/* circular buffer; capacity OLD_SNAPSHOT_TIME_MAP_ENTRIES when allocated */
	TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
} OldSnapshotControlData;

/*
 * Pointer to the shared-memory control structure; assigned in SnapMgrInit.
 * NOTE(review): presumably volatile so that accesses are not reordered across
 * the spinlock calls that guard its fields -- confirm.
 */
static volatile OldSnapshotControlData *oldSnapshotControl;
130 :
131 :
/*
 * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
 * mode, and to the latest one taken in a read-committed transaction.
 * SecondarySnapshot is a snapshot that's always up-to-date as of the current
 * instant, even in transaction-snapshot mode. It should only be used for
 * special-purpose code (say, RI checking.) CatalogSnapshot points to an
 * MVCC snapshot intended to be used for catalog scans; we must invalidate it
 * whenever a system catalog change occurs.
 *
 * These SnapshotData structs are static to simplify memory allocation
 * (see the hack in GetSnapshotData to avoid repeated malloc/free).
 * NOTE(review): CatalogSnapshotData is declared without "static", unlike its
 * siblings; presumably it is referenced from outside this file -- confirm.
 */
static SnapshotData CurrentSnapshotData = {HeapTupleSatisfiesMVCC};
static SnapshotData SecondarySnapshotData = {HeapTupleSatisfiesMVCC};
SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC};

/*
 * Pointers to valid snapshots.  NULL means "not currently set".  While
 * CatalogSnapshot is non-NULL it is also present in RegisteredSnapshots
 * (added in GetNonHistoricCatalogSnapshot, removed in
 * InvalidateCatalogSnapshot).  HistoricSnapshot is handed out instead of the
 * others while HistoricSnapshotActive() is true.
 */
static Snapshot CurrentSnapshot = NULL;
static Snapshot SecondarySnapshot = NULL;
static Snapshot CatalogSnapshot = NULL;
static Snapshot HistoricSnapshot = NULL;

/*
 * These are updated by GetSnapshotData. We initialize them this way
 * for the convenience of TransactionIdIsInProgress: even in bootstrap
 * mode, we don't want it to say that BootstrapTransactionId is in progress.
 *
 * RecentGlobalXmin and RecentGlobalDataXmin are initialized to
 * InvalidTransactionId, to ensure that no one tries to use a stale
 * value. Readers should ensure that the value has been set to something
 * else before using it.
 */
TransactionId TransactionXmin = FirstNormalTransactionId;
TransactionId RecentXmin = FirstNormalTransactionId;
TransactionId RecentGlobalXmin = InvalidTransactionId;
TransactionId RecentGlobalDataXmin = InvalidTransactionId;

/* (table, ctid) => (cmin, cmax) mapping during timetravel */
static HTAB *tuplecid_data = NULL;
171 :
/*
 * Elements of the active snapshot stack.
 *
 * Each element here accounts for exactly one active_count on SnapshotData.
 * as_level records GetCurrentTransactionNestLevel() at the time the element
 * was pushed (see PushActiveSnapshot).
 *
 * NB: the code assumes that elements in this list are in non-increasing
 * order of as_level; also, the list must be NULL-terminated.
 */
typedef struct ActiveSnapshotElt
{
	Snapshot	as_snap;
	int			as_level;
	struct ActiveSnapshotElt *as_next;
} ActiveSnapshotElt;

/* Top of the stack of active snapshots */
static ActiveSnapshotElt *ActiveSnapshot = NULL;

/*
 * Bottom of the stack of active snapshots.  Set when pushing onto an empty
 * stack and reset to NULL when the stack becomes empty again.
 */
static ActiveSnapshotElt *OldestActiveSnapshot = NULL;

/*
 * Currently registered Snapshots. Ordered in a heap by xmin, so that we can
 * quickly find the one with lowest xmin, to advance our MyPgXact->xmin.
 * Besides resource-owner registrations, the heap also holds the
 * internally-tracked FirstXactSnapshot and CatalogSnapshot entries.
 */
static int	xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
					 void *arg);

static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};

/* first GetTransactionSnapshot call in a transaction? */
bool		FirstSnapshotSet = false;

/*
 * Remember the serializable transaction snapshot, if any. We cannot trust
 * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
 * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
 */
static Snapshot FirstXactSnapshot = NULL;
211 :
/* Define pathname of exported-snapshot files */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"

/*
 * Structure holding info about exported snapshot.
 * NOTE(review): snapfile is presumably the path of the file written under
 * SNAPSHOT_EXPORT_DIR for this snapshot -- confirm against the export code.
 */
typedef struct ExportedSnapshot
{
	char	   *snapfile;
	Snapshot	snapshot;
} ExportedSnapshot;

/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
static List *exportedSnapshots = NIL;

/* Prototypes for local functions (all defined later in this file) */
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
230 :
/*
 * Snapshot fields to be serialized.
 *
 * Only these fields need to be sent to the cooperating backend; the
 * remaining ones can (and must) be set by the receiver upon restore.
 *
 * The xip/subxip arrays themselves are not members of this struct.
 * NOTE(review): presumably they are written directly after this header,
 * with lengths xcnt and subxcnt -- confirm against the serialization code.
 */
typedef struct SerializedSnapshotData
{
	TransactionId xmin;
	TransactionId xmax;
	uint32		xcnt;
	int32		subxcnt;
	bool		suboverflowed;
	bool		takenDuringRecovery;
	CommandId	curcid;
	TimestampTz whenTaken;
	XLogRecPtr	lsn;
} SerializedSnapshotData;
249 :
250 : Size
251 10 : SnapMgrShmemSize(void)
252 : {
253 : Size size;
254 :
255 10 : size = offsetof(OldSnapshotControlData, xid_by_minute);
256 10 : if (old_snapshot_threshold > 0)
257 0 : size = add_size(size, mul_size(sizeof(TransactionId),
258 0 : OLD_SNAPSHOT_TIME_MAP_ENTRIES));
259 :
260 10 : return size;
261 : }
262 :
263 : /*
264 : * Initialize for managing old snapshot detection.
265 : */
266 : void
267 5 : SnapMgrInit(void)
268 : {
269 : bool found;
270 :
271 : /*
272 : * Create or attach to the OldSnapshotControlData structure.
273 : */
274 5 : oldSnapshotControl = (volatile OldSnapshotControlData *)
275 5 : ShmemInitStruct("OldSnapshotControlData",
276 : SnapMgrShmemSize(), &found);
277 :
278 5 : if (!found)
279 : {
280 5 : SpinLockInit(&oldSnapshotControl->mutex_current);
281 5 : oldSnapshotControl->current_timestamp = 0;
282 5 : SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
283 5 : oldSnapshotControl->latest_xmin = InvalidTransactionId;
284 5 : oldSnapshotControl->next_map_update = 0;
285 5 : SpinLockInit(&oldSnapshotControl->mutex_threshold);
286 5 : oldSnapshotControl->threshold_timestamp = 0;
287 5 : oldSnapshotControl->threshold_xid = InvalidTransactionId;
288 5 : oldSnapshotControl->head_offset = 0;
289 5 : oldSnapshotControl->head_timestamp = 0;
290 5 : oldSnapshotControl->count_used = 0;
291 : }
292 5 : }
293 :
294 : /*
295 : * GetTransactionSnapshot
296 : * Get the appropriate snapshot for a new query in a transaction.
297 : *
298 : * Note that the return value may point at static storage that will be modified
299 : * by future calls and by CommandCounterIncrement(). Callers should call
300 : * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
301 : * used very long.
302 : */
303 : Snapshot
304 65419 : GetTransactionSnapshot(void)
305 : {
306 : /*
307 : * Return historic snapshot if doing logical decoding. We'll never need a
308 : * non-historic transaction snapshot in this (sub-)transaction, so there's
309 : * no need to be careful to set one up for later calls to
310 : * GetTransactionSnapshot().
311 : */
312 65419 : if (HistoricSnapshotActive())
313 : {
314 0 : Assert(!FirstSnapshotSet);
315 0 : return HistoricSnapshot;
316 : }
317 :
318 : /* First call in transaction? */
319 65419 : if (!FirstSnapshotSet)
320 : {
321 : /*
322 : * Don't allow catalog snapshot to be older than xact snapshot. Must
323 : * do this first to allow the empty-heap Assert to succeed.
324 : */
325 24649 : InvalidateCatalogSnapshot();
326 :
327 24649 : Assert(pairingheap_is_empty(&RegisteredSnapshots));
328 24649 : Assert(FirstXactSnapshot == NULL);
329 :
330 24649 : if (IsInParallelMode())
331 0 : elog(ERROR,
332 : "cannot take query snapshot during a parallel operation");
333 :
334 : /*
335 : * In transaction-snapshot mode, the first snapshot must live until
336 : * end of xact regardless of what the caller does with it, so we must
337 : * make a copy of it rather than returning CurrentSnapshotData
338 : * directly. Furthermore, if we're running in serializable mode,
339 : * predicate.c needs to wrap the snapshot fetch in its own processing.
340 : */
341 24649 : if (IsolationUsesXactSnapshot())
342 : {
343 : /* First, create the snapshot in CurrentSnapshotData */
344 16 : if (IsolationIsSerializable())
345 13 : CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
346 : else
347 3 : CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
348 : /* Make a saved copy */
349 16 : CurrentSnapshot = CopySnapshot(CurrentSnapshot);
350 16 : FirstXactSnapshot = CurrentSnapshot;
351 : /* Mark it as "registered" in FirstXactSnapshot */
352 16 : FirstXactSnapshot->regd_count++;
353 16 : pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
354 : }
355 : else
356 24633 : CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
357 :
358 24649 : FirstSnapshotSet = true;
359 24649 : return CurrentSnapshot;
360 : }
361 :
362 40770 : if (IsolationUsesXactSnapshot())
363 147 : return CurrentSnapshot;
364 :
365 : /* Don't allow catalog snapshot to be older than xact snapshot. */
366 40623 : InvalidateCatalogSnapshot();
367 :
368 40623 : CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
369 :
370 40623 : return CurrentSnapshot;
371 : }
372 :
373 : /*
374 : * GetLatestSnapshot
375 : * Get a snapshot that is up-to-date as of the current instant,
376 : * even if we are executing in transaction-snapshot mode.
377 : */
378 : Snapshot
379 359 : GetLatestSnapshot(void)
380 : {
381 : /*
382 : * We might be able to relax this, but nothing that could otherwise work
383 : * needs it.
384 : */
385 359 : if (IsInParallelMode())
386 0 : elog(ERROR,
387 : "cannot update SecondarySnapshot during a parallel operation");
388 :
389 : /*
390 : * So far there are no cases requiring support for GetLatestSnapshot()
391 : * during logical decoding, but it wouldn't be hard to add if required.
392 : */
393 359 : Assert(!HistoricSnapshotActive());
394 :
395 : /* If first call in transaction, go ahead and set the xact snapshot */
396 359 : if (!FirstSnapshotSet)
397 3 : return GetTransactionSnapshot();
398 :
399 356 : SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
400 :
401 356 : return SecondarySnapshot;
402 : }
403 :
404 : /*
405 : * GetOldestSnapshot
406 : *
407 : * Get the transaction's oldest known snapshot, as judged by the LSN.
408 : * Will return NULL if there are no active or registered snapshots.
409 : */
410 : Snapshot
411 384 : GetOldestSnapshot(void)
412 : {
413 384 : Snapshot OldestRegisteredSnapshot = NULL;
414 384 : XLogRecPtr RegisteredLSN = InvalidXLogRecPtr;
415 :
416 384 : if (!pairingheap_is_empty(&RegisteredSnapshots))
417 : {
418 384 : OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
419 : pairingheap_first(&RegisteredSnapshots));
420 384 : RegisteredLSN = OldestRegisteredSnapshot->lsn;
421 : }
422 :
423 384 : if (OldestActiveSnapshot != NULL)
424 : {
425 384 : XLogRecPtr ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
426 :
427 384 : if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
428 384 : return OldestActiveSnapshot->as_snap;
429 : }
430 :
431 0 : return OldestRegisteredSnapshot;
432 : }
433 :
434 : /*
435 : * GetCatalogSnapshot
436 : * Get a snapshot that is sufficiently up-to-date for scan of the
437 : * system catalog with the specified OID.
438 : */
439 : Snapshot
440 356387 : GetCatalogSnapshot(Oid relid)
441 : {
442 : /*
443 : * Return historic snapshot while we're doing logical decoding, so we can
444 : * see the appropriate state of the catalog.
445 : *
446 : * This is the primary reason for needing to reset the system caches after
447 : * finishing decoding.
448 : */
449 356387 : if (HistoricSnapshotActive())
450 0 : return HistoricSnapshot;
451 :
452 356387 : return GetNonHistoricCatalogSnapshot(relid);
453 : }
454 :
455 : /*
456 : * GetNonHistoricCatalogSnapshot
457 : * Get a snapshot that is sufficiently up-to-date for scan of the system
458 : * catalog with the specified OID, even while historic snapshots are set
459 : * up.
460 : */
461 : Snapshot
462 356387 : GetNonHistoricCatalogSnapshot(Oid relid)
463 : {
464 : /*
465 : * If the caller is trying to scan a relation that has no syscache, no
466 : * catcache invalidations will be sent when it is updated. For a few key
467 : * relations, snapshot invalidations are sent instead. If we're trying to
468 : * scan a relation for which neither catcache nor snapshot invalidations
469 : * are sent, we must refresh the snapshot every time.
470 : */
471 671303 : if (CatalogSnapshot &&
472 505712 : !RelationInvalidatesSnapshotsOnly(relid) &&
473 190796 : !RelationHasSysCache(relid))
474 16748 : InvalidateCatalogSnapshot();
475 :
476 356387 : if (CatalogSnapshot == NULL)
477 : {
478 : /* Get new snapshot. */
479 58219 : CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
480 :
481 : /*
482 : * Make sure the catalog snapshot will be accounted for in decisions
483 : * about advancing PGXACT->xmin. We could apply RegisterSnapshot, but
484 : * that would result in making a physical copy, which is overkill; and
485 : * it would also create a dependency on some resource owner, which we
486 : * do not want for reasons explained at the head of this file. Instead
487 : * just shove the CatalogSnapshot into the pairing heap manually. This
488 : * has to be reversed in InvalidateCatalogSnapshot, of course.
489 : *
490 : * NB: it had better be impossible for this to throw error, since the
491 : * CatalogSnapshot pointer is already valid.
492 : */
493 58219 : pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
494 : }
495 :
496 356387 : return CatalogSnapshot;
497 : }
498 :
499 : /*
500 : * InvalidateCatalogSnapshot
501 : * Mark the current catalog snapshot, if any, as invalid
502 : *
503 : * We could change this API to allow the caller to provide more fine-grained
504 : * invalidation details, so that a change to relation A wouldn't prevent us
505 : * from using our cached snapshot to scan relation B, but so far there's no
506 : * evidence that the CPU cycles we spent tracking such fine details would be
507 : * well-spent.
508 : */
509 : void
510 876480 : InvalidateCatalogSnapshot(void)
511 : {
512 876480 : if (CatalogSnapshot)
513 : {
514 58219 : pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
515 58219 : CatalogSnapshot = NULL;
516 58219 : SnapshotResetXmin();
517 : }
518 876480 : }
519 :
520 : /*
521 : * InvalidateCatalogSnapshotConditionally
522 : * Drop catalog snapshot if it's the only one we have
523 : *
524 : * This is called when we are about to wait for client input, so we don't
525 : * want to continue holding the catalog snapshot if it might mean that the
526 : * global xmin horizon can't advance. However, if there are other snapshots
527 : * still active or registered, the catalog snapshot isn't likely to be the
528 : * oldest one, so we might as well keep it.
529 : */
530 : void
531 27567 : InvalidateCatalogSnapshotConditionally(void)
532 : {
533 28169 : if (CatalogSnapshot &&
534 1197 : ActiveSnapshot == NULL &&
535 1190 : pairingheap_is_singular(&RegisteredSnapshots))
536 452 : InvalidateCatalogSnapshot();
537 27567 : }
538 :
539 : /*
540 : * SnapshotSetCommandId
541 : * Propagate CommandCounterIncrement into the static snapshots, if set
542 : */
543 : void
544 22090 : SnapshotSetCommandId(CommandId curcid)
545 : {
546 22090 : if (!FirstSnapshotSet)
547 23261 : return;
548 :
549 20919 : if (CurrentSnapshot)
550 20919 : CurrentSnapshot->curcid = curcid;
551 20919 : if (SecondarySnapshot)
552 563 : SecondarySnapshot->curcid = curcid;
553 : /* Should we do the same with CatalogSnapshot? */
554 : }
555 :
/*
 * SetTransactionSnapshot
 *		Set the transaction's snapshot from an imported MVCC snapshot.
 *
 * Note that this is very closely tied to GetTransactionSnapshot --- it
 * must take care of all the same considerations as the first-snapshot case
 * in GetTransactionSnapshot.
 *
 * sourcesnap: snapshot whose xmin, xmax, xip/subxip arrays, suboverflowed and
 *		takenDuringRecovery are copied into CurrentSnapshotData (curcid is not).
 * sourcevxid, sourcepid: identify the exporting transaction/process.  When
 *		sourceproc is NULL they are used to install the imported xmin and in
 *		the error report.  In serializable mode they are also handed to
 *		predicate.c.
 * sourceproc: if non-NULL, the source's PGPROC.  The xmin is then installed
 *		via ProcArrayInstallRestoredXmin instead.
 *
 * Must be called before any snapshot was taken in this transaction (asserts
 * !FirstSnapshotSet).  Sets FirstSnapshotSet on return.  Raises ERROR if the
 * source transaction/process is no longer running.  Outside
 * transaction-snapshot mode, CurrentSnapshot is left pointing at the static
 * CurrentSnapshotData rather than at a copy.
 */
static void
SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
					   int sourcepid, PGPROC *sourceproc)
{
	/* Caller should have checked this already */
	Assert(!FirstSnapshotSet);

	/* Better do this to ensure following Assert succeeds. */
	InvalidateCatalogSnapshot();

	Assert(pairingheap_is_empty(&RegisteredSnapshots));
	Assert(FirstXactSnapshot == NULL);
	Assert(!HistoricSnapshotActive());

	/*
	 * Even though we are not going to use the snapshot it computes, we must
	 * call GetSnapshotData, for two reasons: (1) to be sure that
	 * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
	 * RecentXmin and RecentGlobalXmin. (We could alternatively include those
	 * two variables in exported snapshot files, but it seems better to have
	 * snapshot importers compute reasonably up-to-date values for them.)
	 */
	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

	/*
	 * Now copy appropriate fields from the source snapshot.  The asserts
	 * check that the source arrays fit in the arrays GetSnapshotData just
	 * made sure exist.
	 */
	CurrentSnapshot->xmin = sourcesnap->xmin;
	CurrentSnapshot->xmax = sourcesnap->xmax;
	CurrentSnapshot->xcnt = sourcesnap->xcnt;
	Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
	memcpy(CurrentSnapshot->xip, sourcesnap->xip,
		   sourcesnap->xcnt * sizeof(TransactionId));
	CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
	Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
	memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
		   sourcesnap->subxcnt * sizeof(TransactionId));
	CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
	CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
	/* NB: curcid should NOT be copied, it's a local matter */

	/*
	 * Now we have to fix what GetSnapshotData did with MyPgXact->xmin and
	 * TransactionXmin. There is a race condition: to make sure we are not
	 * causing the global xmin to go backwards, we have to test that the
	 * source transaction is still running, and that has to be done
	 * atomically. So let procarray.c do it.
	 *
	 * Note: in serializable mode, predicate.c will do this a second time. It
	 * doesn't seem worth contorting the logic here to avoid two calls,
	 * especially since it's not clear that predicate.c *must* do this.
	 */
	if (sourceproc != NULL)
	{
		if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("could not import the requested snapshot"),
					 errdetail("The source transaction is not running anymore.")));
	}
	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not import the requested snapshot"),
				 errdetail("The source process with pid %d is not running anymore.",
						   sourcepid)));

	/*
	 * In transaction-snapshot mode, the first snapshot must live until end of
	 * xact, so we must make a copy of it. Furthermore, if we're running in
	 * serializable mode, predicate.c needs to do its own processing.
	 */
	if (IsolationUsesXactSnapshot())
	{
		if (IsolationIsSerializable())
			SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
											   sourcepid);
		/* Make a saved copy */
		CurrentSnapshot = CopySnapshot(CurrentSnapshot);
		FirstXactSnapshot = CurrentSnapshot;
		/* Mark it as "registered" in FirstXactSnapshot */
		FirstXactSnapshot->regd_count++;
		pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
	}

	FirstSnapshotSet = true;
}
651 :
652 : /*
653 : * CopySnapshot
654 : * Copy the given snapshot.
655 : *
656 : * The copy is palloc'd in TopTransactionContext and has initial refcounts set
657 : * to 0. The returned snapshot has the copied flag set.
658 : */
659 : static Snapshot
660 385345 : CopySnapshot(Snapshot snapshot)
661 : {
662 : Snapshot newsnap;
663 : Size subxipoff;
664 : Size size;
665 :
666 385345 : Assert(snapshot != InvalidSnapshot);
667 :
668 : /* We allocate any XID arrays needed in the same palloc block. */
669 385345 : size = subxipoff = sizeof(SnapshotData) +
670 385345 : snapshot->xcnt * sizeof(TransactionId);
671 385345 : if (snapshot->subxcnt > 0)
672 9187 : size += snapshot->subxcnt * sizeof(TransactionId);
673 :
674 385345 : newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
675 385345 : memcpy(newsnap, snapshot, sizeof(SnapshotData));
676 :
677 385345 : newsnap->regd_count = 0;
678 385345 : newsnap->active_count = 0;
679 385345 : newsnap->copied = true;
680 :
681 : /* setup XID array */
682 385345 : if (snapshot->xcnt > 0)
683 : {
684 180104 : newsnap->xip = (TransactionId *) (newsnap + 1);
685 180104 : memcpy(newsnap->xip, snapshot->xip,
686 180104 : snapshot->xcnt * sizeof(TransactionId));
687 : }
688 : else
689 205241 : newsnap->xip = NULL;
690 :
691 : /*
692 : * Setup subXID array. Don't bother to copy it if it had overflowed,
693 : * though, because it's not used anywhere in that case. Except if it's a
694 : * snapshot taken during recovery; all the top-level XIDs are in subxip as
695 : * well in that case, so we mustn't lose them.
696 : */
697 394532 : if (snapshot->subxcnt > 0 &&
698 9187 : (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
699 : {
700 9187 : newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
701 9187 : memcpy(newsnap->subxip, snapshot->subxip,
702 9187 : snapshot->subxcnt * sizeof(TransactionId));
703 : }
704 : else
705 376158 : newsnap->subxip = NULL;
706 :
707 385345 : return newsnap;
708 : }
709 :
710 : /*
711 : * FreeSnapshot
712 : * Free the memory associated with a snapshot.
713 : */
714 : static void
715 380085 : FreeSnapshot(Snapshot snapshot)
716 : {
717 380085 : Assert(snapshot->regd_count == 0);
718 380085 : Assert(snapshot->active_count == 0);
719 380085 : Assert(snapshot->copied);
720 :
721 380085 : pfree(snapshot);
722 380085 : }
723 :
724 : /*
725 : * PushActiveSnapshot
726 : * Set the given snapshot as the current active snapshot
727 : *
728 : * If the passed snapshot is a statically-allocated one, or it is possibly
729 : * subject to a future command counter update, create a new long-lived copy
730 : * with active refcount=1. Otherwise, only increment the refcount.
731 : */
732 : void
733 77320 : PushActiveSnapshot(Snapshot snap)
734 : {
735 : ActiveSnapshotElt *newactive;
736 :
737 77320 : Assert(snap != InvalidSnapshot);
738 :
739 77320 : newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
740 :
741 : /*
742 : * Checking SecondarySnapshot is probably useless here, but it seems
743 : * better to be sure.
744 : */
745 77320 : if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied)
746 59090 : newactive->as_snap = CopySnapshot(snap);
747 : else
748 18230 : newactive->as_snap = snap;
749 :
750 77320 : newactive->as_next = ActiveSnapshot;
751 77320 : newactive->as_level = GetCurrentTransactionNestLevel();
752 :
753 77320 : newactive->as_snap->active_count++;
754 :
755 77320 : ActiveSnapshot = newactive;
756 77320 : if (OldestActiveSnapshot == NULL)
757 52267 : OldestActiveSnapshot = ActiveSnapshot;
758 77320 : }
759 :
760 : /*
761 : * PushCopiedSnapshot
762 : * As above, except forcibly copy the presented snapshot.
763 : *
764 : * This should be used when the ActiveSnapshot has to be modifiable, for
765 : * example if the caller intends to call UpdateActiveSnapshotCommandId.
766 : * The new snapshot will be released when popped from the stack.
767 : */
768 : void
769 5178 : PushCopiedSnapshot(Snapshot snapshot)
770 : {
771 5178 : PushActiveSnapshot(CopySnapshot(snapshot));
772 5178 : }
773 :
774 : /*
775 : * UpdateActiveSnapshotCommandId
776 : *
777 : * Update the current CID of the active snapshot. This can only be applied
778 : * to a snapshot that is not referenced elsewhere.
779 : */
780 : void
781 4729 : UpdateActiveSnapshotCommandId(void)
782 : {
783 : CommandId save_curcid,
784 : curcid;
785 :
786 4729 : Assert(ActiveSnapshot != NULL);
787 4729 : Assert(ActiveSnapshot->as_snap->active_count == 1);
788 4729 : Assert(ActiveSnapshot->as_snap->regd_count == 0);
789 :
790 : /*
791 : * Don't allow modification of the active snapshot during parallel
792 : * operation. We share the snapshot to worker backends at the beginning
793 : * of parallel operation, so any change to the snapshot can lead to
794 : * inconsistencies. We have other defenses against
795 : * CommandCounterIncrement, but there are a few places that call this
796 : * directly, so we put an additional guard here.
797 : */
798 4729 : save_curcid = ActiveSnapshot->as_snap->curcid;
799 4729 : curcid = GetCurrentCommandId(false);
800 4729 : if (IsInParallelMode() && save_curcid != curcid)
801 0 : elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
802 4729 : ActiveSnapshot->as_snap->curcid = curcid;
803 4729 : }
804 :
805 : /*
806 : * PopActiveSnapshot
807 : *
808 : * Remove the topmost snapshot from the active snapshot stack, decrementing the
809 : * reference count, and free it if this was the last reference.
810 : */
811 : void
812 71679 : PopActiveSnapshot(void)
813 : {
814 : ActiveSnapshotElt *newstack;
815 :
816 71679 : newstack = ActiveSnapshot->as_next;
817 :
818 71679 : Assert(ActiveSnapshot->as_snap->active_count > 0);
819 :
820 71679 : ActiveSnapshot->as_snap->active_count--;
821 :
822 142932 : if (ActiveSnapshot->as_snap->active_count == 0 &&
823 71253 : ActiveSnapshot->as_snap->regd_count == 0)
824 48362 : FreeSnapshot(ActiveSnapshot->as_snap);
825 :
826 71679 : pfree(ActiveSnapshot);
827 71679 : ActiveSnapshot = newstack;
828 71679 : if (ActiveSnapshot == NULL)
829 49187 : OldestActiveSnapshot = NULL;
830 :
831 71679 : SnapshotResetXmin();
832 71679 : }
833 :
834 : /*
835 : * GetActiveSnapshot
836 : * Return the topmost snapshot in the Active stack.
837 : */
838 : Snapshot
839 45837 : GetActiveSnapshot(void)
840 : {
841 45837 : Assert(ActiveSnapshot != NULL);
842 :
843 45837 : return ActiveSnapshot->as_snap;
844 : }
845 :
846 : /*
847 : * ActiveSnapshotSet
848 : * Return whether there is at least one snapshot in the Active stack
849 : */
850 : bool
851 44801 : ActiveSnapshotSet(void)
852 : {
853 44801 : return ActiveSnapshot != NULL;
854 : }
855 :
856 : /*
857 : * RegisterSnapshot
858 : * Register a snapshot as being in use by the current resource owner
859 : *
860 : * If InvalidSnapshot is passed, it is not registered.
861 : */
862 : Snapshot
863 419781 : RegisterSnapshot(Snapshot snapshot)
864 : {
865 419781 : if (snapshot == InvalidSnapshot)
866 49340 : return InvalidSnapshot;
867 :
868 370441 : return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner);
869 : }
870 :
871 : /*
872 : * RegisterSnapshotOnOwner
873 : * As above, but use the specified resource owner
874 : */
875 : Snapshot
876 370464 : RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
877 : {
878 : Snapshot snap;
879 :
880 370464 : if (snapshot == InvalidSnapshot)
881 0 : return InvalidSnapshot;
882 :
883 : /* Static snapshot? Create a persistent copy */
884 370464 : snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
885 :
886 : /* and tell resowner.c about it */
887 370464 : ResourceOwnerEnlargeSnapshots(owner);
888 370464 : snap->regd_count++;
889 370464 : ResourceOwnerRememberSnapshot(owner, snap);
890 :
891 370464 : if (snap->regd_count == 1)
892 343206 : pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
893 :
894 370464 : return snap;
895 : }
896 :
897 : /*
898 : * UnregisterSnapshot
899 : *
900 : * Decrement the reference count of a snapshot, remove the corresponding
901 : * reference from CurrentResourceOwner, and free the snapshot if no more
902 : * references remain.
903 : */
904 : void
905 411715 : UnregisterSnapshot(Snapshot snapshot)
906 : {
907 411715 : if (snapshot == NULL)
908 454362 : return;
909 :
910 369068 : UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner);
911 : }
912 :
913 : /*
914 : * UnregisterSnapshotFromOwner
915 : * As above, but use the specified resource owner
916 : */
917 : void
918 370483 : UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
919 : {
920 370483 : if (snapshot == NULL)
921 370502 : return;
922 :
923 370464 : Assert(snapshot->regd_count > 0);
924 370464 : Assert(!pairingheap_is_empty(&RegisteredSnapshots));
925 :
926 370464 : ResourceOwnerForgetSnapshot(owner, snapshot);
927 :
928 370464 : snapshot->regd_count--;
929 370464 : if (snapshot->regd_count == 0)
930 343206 : pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
931 :
932 370464 : if (snapshot->regd_count == 0 && snapshot->active_count == 0)
933 : {
934 331462 : FreeSnapshot(snapshot);
935 331462 : SnapshotResetXmin();
936 : }
937 : }
938 :
939 : /*
940 : * Comparison function for RegisteredSnapshots heap. Snapshots are ordered
941 : * by xmin, so that the snapshot with smallest xmin is at the top.
942 : */
943 : static int
944 336222 : xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
945 : {
946 336222 : const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
947 336222 : const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
948 :
949 336222 : if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
950 4956 : return 1;
951 331266 : else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
952 7 : return -1;
953 : else
954 331259 : return 0;
955 : }
956 :
957 : /*
958 : * SnapshotResetXmin
959 : *
960 : * If there are no more snapshots, we can reset our PGXACT->xmin to InvalidXid.
961 : * Note we can do this without locking because we assume that storing an Xid
962 : * is atomic.
963 : *
964 : * Even if there are some remaining snapshots, we may be able to advance our
965 : * PGXACT->xmin to some degree. This typically happens when a portal is
966 : * dropped. For efficiency, we only consider recomputing PGXACT->xmin when
967 : * the active snapshot stack is empty; this allows us not to need to track
968 : * which active snapshot is oldest.
969 : *
970 : * Note: it's tempting to use GetOldestSnapshot() here so that we can include
971 : * active snapshots in the calculation. However, that compares by LSN not
972 : * xmin so it's not entirely clear that it's the same thing. Also, we'd be
973 : * critically dependent on the assumption that the bottommost active snapshot
974 : * stack entry has the oldest xmin. (Current uses of GetOldestSnapshot() are
975 : * not actually critical, but this would be.)
976 : */
977 : static void
978 464995 : SnapshotResetXmin(void)
979 : {
980 : Snapshot minSnapshot;
981 :
982 464995 : if (ActiveSnapshot != NULL)
983 364959 : return;
984 :
985 100036 : if (pairingheap_is_empty(&RegisteredSnapshots))
986 : {
987 45293 : MyPgXact->xmin = InvalidTransactionId;
988 45293 : return;
989 : }
990 :
991 54743 : minSnapshot = pairingheap_container(SnapshotData, ph_node,
992 : pairingheap_first(&RegisteredSnapshots));
993 :
994 54743 : if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
995 431 : MyPgXact->xmin = minSnapshot->xmin;
996 : }
997 :
998 : /*
999 : * AtSubCommit_Snapshot
1000 : */
1001 : void
1002 49 : AtSubCommit_Snapshot(int level)
1003 : {
1004 : ActiveSnapshotElt *active;
1005 :
1006 : /*
1007 : * Relabel the active snapshots set in this subtransaction as though they
1008 : * are owned by the parent subxact.
1009 : */
1010 49 : for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1011 : {
1012 9 : if (active->as_level < level)
1013 9 : break;
1014 0 : active->as_level = level - 1;
1015 : }
1016 49 : }
1017 :
1018 : /*
1019 : * AtSubAbort_Snapshot
1020 : * Clean up snapshots after a subtransaction abort
1021 : */
1022 : void
1023 323 : AtSubAbort_Snapshot(int level)
1024 : {
1025 : /* Forget the active snapshots set by this subtransaction */
1026 907 : while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
1027 : {
1028 : ActiveSnapshotElt *next;
1029 :
1030 261 : next = ActiveSnapshot->as_next;
1031 :
1032 : /*
1033 : * Decrement the snapshot's active count. If it's still registered or
1034 : * marked as active by an outer subtransaction, we can't free it yet.
1035 : */
1036 261 : Assert(ActiveSnapshot->as_snap->active_count >= 1);
1037 261 : ActiveSnapshot->as_snap->active_count -= 1;
1038 :
1039 522 : if (ActiveSnapshot->as_snap->active_count == 0 &&
1040 261 : ActiveSnapshot->as_snap->regd_count == 0)
1041 261 : FreeSnapshot(ActiveSnapshot->as_snap);
1042 :
1043 : /* and free the stack element */
1044 261 : pfree(ActiveSnapshot);
1045 :
1046 261 : ActiveSnapshot = next;
1047 261 : if (ActiveSnapshot == NULL)
1048 23 : OldestActiveSnapshot = NULL;
1049 : }
1050 :
1051 323 : SnapshotResetXmin();
1052 323 : }
1053 :
/*
 * AtEOXact_Snapshot
 *		Snapshot manager's cleanup function for end of transaction
 *
 * isCommit: true at commit.  Only then do we emit WARNINGs about registered
 * or active snapshots that were left behind.
 *
 * resetXmin: if true, call SnapshotResetXmin() after clearing all state.  At
 * that point the active stack and RegisteredSnapshots are both empty, so this
 * resets MyPgXact->xmin to InvalidTransactionId.  If false, the caller must
 * already have cleared xmin (asserted at the end).
 */
void
AtEOXact_Snapshot(bool isCommit, bool resetXmin)
{
	/*
	 * In transaction-snapshot mode we must release our privately-managed
	 * reference to the transaction snapshot.  We must remove it from
	 * RegisteredSnapshots to keep the check below happy.  But we don't bother
	 * to do FreeSnapshot, for two reasons: the memory will go away with
	 * TopTransactionContext anyway, and if someone has left the snapshot
	 * stacked as active, we don't want the code below to be chasing through a
	 * dangling pointer.
	 */
	if (FirstXactSnapshot != NULL)
	{
		Assert(FirstXactSnapshot->regd_count > 0);
		Assert(!pairingheap_is_empty(&RegisteredSnapshots));
		pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
	}
	FirstXactSnapshot = NULL;

	/*
	 * If we exported any snapshots, clean them up.
	 */
	if (exportedSnapshots != NIL)
	{
		ListCell   *lc;

		/*
		 * Get rid of the files.  Unlink failure is only a WARNING because (1)
		 * it's too late to abort the transaction, and (2) leaving a leaked
		 * file around has little real consequence anyway.
		 *
		 * We also need to remove the snapshots from RegisteredSnapshots to
		 * prevent a warning below.
		 *
		 * As with the FirstXactSnapshot, we don't need to free resources of
		 * the snapshot itself as it will go away with the memory context.
		 */
		foreach(lc, exportedSnapshots)
		{
			ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);

			if (unlink(esnap->snapfile))
				elog(WARNING, "could not unlink file \"%s\": %m",
					 esnap->snapfile);

			pairingheap_remove(&RegisteredSnapshots,
							   &esnap->snapshot->ph_node);
		}

		exportedSnapshots = NIL;
	}

	/* Drop catalog snapshot if any */
	InvalidateCatalogSnapshot();

	/* On commit, complain about leftover snapshots */
	if (isCommit)
	{
		ActiveSnapshotElt *active;

		if (!pairingheap_is_empty(&RegisteredSnapshots))
			elog(WARNING, "registered snapshots seem to remain after cleanup");

		/* complain about unpopped active snapshots */
		for (active = ActiveSnapshot; active != NULL; active = active->as_next)
			elog(WARNING, "snapshot %p still active", active);
	}

	/*
	 * And reset our state.  We don't need to free the memory explicitly --
	 * it'll go away with TopTransactionContext.
	 */
	ActiveSnapshot = NULL;
	OldestActiveSnapshot = NULL;
	pairingheap_reset(&RegisteredSnapshots);

	CurrentSnapshot = NULL;
	SecondarySnapshot = NULL;

	FirstSnapshotSet = false;

	/*
	 * During normal commit processing, we call ProcArrayEndTransaction() to
	 * reset the PgXact->xmin.  That call happens prior to the call to
	 * AtEOXact_Snapshot(), so we need not touch xmin here at all.
	 */
	if (resetXmin)
		SnapshotResetXmin();

	Assert(resetXmin || MyPgXact->xmin == 0);
}
1150 :
1151 :
/*
 * ExportSnapshot
 *		Export the snapshot to a file so that other backends can import it.
 *		Returns the token (the file name) that can be used to import this
 *		snapshot.
 *
 * snapshot: the snapshot to export.  A copy is made, added to the
 * exportedSnapshots list (allocated in TopTransactionContext), and
 * pseudo-registered in RegisteredSnapshots so its xmin stays honored until
 * AtEOXact_Snapshot() removes it.
 *
 * Returns a palloc'd string holding the file's basename within
 * SNAPSHOT_EXPORT_DIR.
 *
 * Raises ERROR when called inside a subtransaction, or when the snapshot
 * file cannot be created, written or renamed into place.
 *
 * The sequence of appendStringInfo calls below defines the on-disk format
 * that ImportSnapshot parses line by line, so their order matters.
 */
char *
ExportSnapshot(Snapshot snapshot)
{
	TransactionId topXid;
	TransactionId *children;
	ExportedSnapshot *esnap;
	int			nchildren;
	int			addTopXid;
	StringInfoData buf;
	FILE	   *f;
	int			i;
	MemoryContext oldcxt;
	char		path[MAXPGPATH];
	char		pathtmp[MAXPGPATH];

	/*
	 * It's tempting to call RequireTransactionChain here, since it's not very
	 * useful to export a snapshot that will disappear immediately afterwards.
	 * However, we haven't got enough information to do that, since we don't
	 * know if we're at top level or not.  For example, we could be inside a
	 * plpgsql function that is going to fire off other transactions via
	 * dblink.  Rather than disallow perfectly legitimate usages, don't make a
	 * check.
	 *
	 * Also note that we don't make any restriction on the transaction's
	 * isolation level; however, importers must check the level if they are
	 * serializable.
	 */

	/*
	 * Get our transaction ID if there is one, to include in the snapshot.
	 */
	topXid = GetTopTransactionIdIfAny();

	/*
	 * We cannot export a snapshot from a subtransaction because there's no
	 * easy way for importers to verify that the same subtransaction is still
	 * running.
	 */
	if (IsSubTransaction())
		ereport(ERROR,
				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
				 errmsg("cannot export a snapshot from a subtransaction")));

	/*
	 * We do however allow previous committed subtransactions to exist.
	 * Importers of the snapshot must see them as still running, so get their
	 * XIDs to add them to the snapshot.
	 */
	nchildren = xactGetCommittedChildren(&children);

	/*
	 * Generate file path for the snapshot.  We start numbering of snapshots
	 * inside the transaction from 1.  The name uses only hex digits, hyphens
	 * and decimal digits, which is what ImportSnapshot's identifier check
	 * accepts.
	 */
	snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
			 MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);

	/*
	 * Copy the snapshot into TopTransactionContext, add it to the
	 * exportedSnapshots list, and mark it pseudo-registered.  We do this to
	 * ensure that the snapshot's xmin is honored for the rest of the
	 * transaction.
	 */
	snapshot = CopySnapshot(snapshot);

	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
	esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
	esnap->snapfile = pstrdup(path);
	esnap->snapshot = snapshot;
	exportedSnapshots = lappend(exportedSnapshots, esnap);
	MemoryContextSwitchTo(oldcxt);

	snapshot->regd_count++;
	pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);

	/*
	 * Fill buf with a text serialization of the snapshot, plus identification
	 * data about this transaction.  The format expected by ImportSnapshot is
	 * pretty rigid: each line must be fieldname:value.
	 */
	initStringInfo(&buf);

	appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
	appendStringInfo(&buf, "pid:%d\n", MyProcPid);
	appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
	appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
	appendStringInfo(&buf, "ro:%d\n", XactReadOnly);

	appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
	appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);

	/*
	 * We must include our own top transaction ID in the top-xid data, since
	 * by definition we will still be running when the importing transaction
	 * adopts the snapshot, but GetSnapshotData never includes our own XID in
	 * the snapshot.  (There must, therefore, be enough room to add it.)
	 *
	 * However, it could be that our topXid is after the xmax, in which case
	 * we shouldn't include it because xip[] members are expected to be before
	 * xmax.  (We need not make the same check for subxip[] members, see
	 * snapshot.h.)
	 */
	addTopXid = (TransactionIdIsValid(topXid) &&
				 TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
	appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
	for (i = 0; i < snapshot->xcnt; i++)
		appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
	if (addTopXid)
		appendStringInfo(&buf, "xip:%u\n", topXid);

	/*
	 * Similarly, we add our subcommitted child XIDs to the subxid data.  Here,
	 * we have to cope with possible overflow.
	 */
	if (snapshot->suboverflowed ||
		snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
		appendStringInfoString(&buf, "sof:1\n");
	else
	{
		appendStringInfoString(&buf, "sof:0\n");
		appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
		for (i = 0; i < snapshot->subxcnt; i++)
			appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
		for (i = 0; i < nchildren; i++)
			appendStringInfo(&buf, "sxp:%u\n", children[i]);
	}
	appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);

	/*
	 * Now write the text representation into a file.  We first write to a
	 * ".tmp" filename, and rename to final filename if no error.  This
	 * ensures that no other backend can read an incomplete file
	 * (ImportSnapshot won't allow it because of its valid-characters check).
	 *
	 * NOTE(review): if any step below fails, the entry already appended to
	 * exportedSnapshots names a file that does not exist, so the unlink in
	 * AtEOXact_Snapshot will presumably emit a WARNING for it.
	 */
	snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
	if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m", pathtmp)));

	if (fwrite(buf.data, buf.len, 1, f) != 1)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", pathtmp)));

	/* no fsync() since file need not survive a system crash */

	if (FreeFile(f))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", pathtmp)));

	/*
	 * Now that we have written everything into a .tmp file, rename the file
	 * to remove the .tmp suffix.
	 */
	if (rename(pathtmp, path) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						pathtmp, path)));

	/*
	 * The basename of the file is what we return from pg_export_snapshot().
	 * It's already in path in a textual format and we know that the path
	 * starts with SNAPSHOT_EXPORT_DIR.  Skip over the prefix and the slash
	 * and pstrdup it so as not to return the address of a local variable.
	 */
	return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
}
1329 :
1330 : /*
1331 : * pg_export_snapshot
1332 : * SQL-callable wrapper for ExportSnapshot.
1333 : */
1334 : Datum
1335 0 : pg_export_snapshot(PG_FUNCTION_ARGS)
1336 : {
1337 : char *snapshotName;
1338 :
1339 0 : snapshotName = ExportSnapshot(GetActiveSnapshot());
1340 0 : PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
1341 : }
1342 :
1343 :
1344 : /*
1345 : * Parsing subroutines for ImportSnapshot: parse a line with the given
1346 : * prefix followed by a value, and advance *s to the next line. The
1347 : * filename is provided for use in error messages.
1348 : */
1349 : static int
1350 0 : parseIntFromText(const char *prefix, char **s, const char *filename)
1351 : {
1352 0 : char *ptr = *s;
1353 0 : int prefixlen = strlen(prefix);
1354 : int val;
1355 :
1356 0 : if (strncmp(ptr, prefix, prefixlen) != 0)
1357 0 : ereport(ERROR,
1358 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1359 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1360 0 : ptr += prefixlen;
1361 0 : if (sscanf(ptr, "%d", &val) != 1)
1362 0 : ereport(ERROR,
1363 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1364 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1365 0 : ptr = strchr(ptr, '\n');
1366 0 : if (!ptr)
1367 0 : ereport(ERROR,
1368 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1369 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1370 0 : *s = ptr + 1;
1371 0 : return val;
1372 : }
1373 :
1374 : static TransactionId
1375 0 : parseXidFromText(const char *prefix, char **s, const char *filename)
1376 : {
1377 0 : char *ptr = *s;
1378 0 : int prefixlen = strlen(prefix);
1379 : TransactionId val;
1380 :
1381 0 : if (strncmp(ptr, prefix, prefixlen) != 0)
1382 0 : ereport(ERROR,
1383 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1384 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1385 0 : ptr += prefixlen;
1386 0 : if (sscanf(ptr, "%u", &val) != 1)
1387 0 : ereport(ERROR,
1388 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1389 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1390 0 : ptr = strchr(ptr, '\n');
1391 0 : if (!ptr)
1392 0 : ereport(ERROR,
1393 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1394 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1395 0 : *s = ptr + 1;
1396 0 : return val;
1397 : }
1398 :
1399 : static void
1400 0 : parseVxidFromText(const char *prefix, char **s, const char *filename,
1401 : VirtualTransactionId *vxid)
1402 : {
1403 0 : char *ptr = *s;
1404 0 : int prefixlen = strlen(prefix);
1405 :
1406 0 : if (strncmp(ptr, prefix, prefixlen) != 0)
1407 0 : ereport(ERROR,
1408 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1409 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1410 0 : ptr += prefixlen;
1411 0 : if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
1412 0 : ereport(ERROR,
1413 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1414 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1415 0 : ptr = strchr(ptr, '\n');
1416 0 : if (!ptr)
1417 0 : ereport(ERROR,
1418 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1419 : errmsg("invalid snapshot data in file \"%s\"", filename)));
1420 0 : *s = ptr + 1;
1421 0 : }
1422 :
/*
 * ImportSnapshot
 *		Import a previously exported snapshot.  The argument should be a
 *		filename in SNAPSHOT_EXPORT_DIR.  Load the snapshot from that file.
 *		This is called by "SET TRANSACTION SNAPSHOT 'foo'".
 *
 * idstr: the identifier returned by pg_export_snapshot().  It must consist
 * only of 0-9, A-F and '-', so it cannot contain '/' or '.' and thus cannot
 * name a file outside SNAPSHOT_EXPORT_DIR.
 *
 * Raises ERROR in any of these cases:
 *   - we are not at the start of a fresh top-level transaction;
 *   - our isolation level does not use a transaction snapshot;
 *   - the identifier or the file contents are invalid;
 *   - the source transaction is incompatible (isolation level or read-only
 *     mismatch when we are SERIALIZABLE, or a different database).
 * On success the parsed snapshot is installed with SetTransactionSnapshot().
 */
void
ImportSnapshot(const char *idstr)
{
	char		path[MAXPGPATH];
	FILE	   *f;
	struct stat stat_buf;
	char	   *filebuf;
	int			xcnt;
	int			i;
	VirtualTransactionId src_vxid;
	int			src_pid;
	Oid			src_dbid;
	int			src_isolevel;
	bool		src_readonly;
	SnapshotData snapshot;

	/*
	 * Must be at top level of a fresh transaction.  Note in particular that
	 * we check we haven't acquired an XID --- if we have, it's conceivable
	 * that the snapshot would show it as not running, making for very screwy
	 * behavior.
	 */
	if (FirstSnapshotSet ||
		GetTopTransactionIdIfAny() != InvalidTransactionId ||
		IsSubTransaction())
		ereport(ERROR,
				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
		errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));

	/*
	 * If we are in read committed mode then the next query would execute with
	 * a new snapshot thus making this function call quite useless.
	 */
	if (!IsolationUsesXactSnapshot())
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));

	/*
	 * Verify the identifier: only 0-9, A-F and hyphens are allowed.  We do
	 * this mainly to prevent reading arbitrary files.
	 */
	if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid snapshot identifier: \"%s\"", idstr)));

	/* OK, read the file */
	snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);

	f = AllocateFile(path, PG_BINARY_R);
	if (!f)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid snapshot identifier: \"%s\"", idstr)));

	/* get the size of the file so that we know how much memory we need */
	if (fstat(fileno(f), &stat_buf))
		elog(ERROR, "could not stat file \"%s\": %m", path);

	/*
	 * and read the file into a palloc'd string
	 *
	 * NOTE(review): a short read (e.g. an empty file) need not set errno, so
	 * the %m in this message may be misleading in that case.
	 */
	filebuf = (char *) palloc(stat_buf.st_size + 1);
	if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
		elog(ERROR, "could not read file \"%s\": %m", path);

	/* NUL-terminate so the parse helpers can use string functions */
	filebuf[stat_buf.st_size] = '\0';

	FreeFile(f);

	/*
	 * Construct a snapshot struct by parsing the file content.
	 *
	 * Each parse helper advances filebuf past the line it consumed, so from
	 * here on filebuf no longer points at the start of the palloc'd buffer.
	 * The fields must be read in exactly the order ExportSnapshot writes them.
	 */
	memset(&snapshot, 0, sizeof(snapshot));

	parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
	src_pid = parseIntFromText("pid:", &filebuf, path);
	/* we abuse parseXidFromText a bit here ... */
	src_dbid = parseXidFromText("dbid:", &filebuf, path);
	src_isolevel = parseIntFromText("iso:", &filebuf, path);
	src_readonly = parseIntFromText("ro:", &filebuf, path);

	snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
	snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);

	snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);

	/* sanity-check the xid count before palloc */
	if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
				 errmsg("invalid snapshot data in file \"%s\"", path)));

	snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
	for (i = 0; i < xcnt; i++)
		snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);

	snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);

	/* subxid list is present in the file only when it did not overflow */
	if (!snapshot.suboverflowed)
	{
		snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);

		/* sanity-check the xid count before palloc */
		if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
					 errmsg("invalid snapshot data in file \"%s\"", path)));

		snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
		for (i = 0; i < xcnt; i++)
			snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
	}
	else
	{
		snapshot.subxcnt = 0;
		snapshot.subxip = NULL;
	}

	snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);

	/*
	 * Do some additional sanity checking, just to protect ourselves.  We
	 * don't trouble to check the array elements, just the most critical
	 * fields.
	 */
	if (!VirtualTransactionIdIsValid(src_vxid) ||
		!OidIsValid(src_dbid) ||
		!TransactionIdIsNormal(snapshot.xmin) ||
		!TransactionIdIsNormal(snapshot.xmax))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
				 errmsg("invalid snapshot data in file \"%s\"", path)));

	/*
	 * If we're serializable, the source transaction must be too, otherwise
	 * predicate.c has problems (SxactGlobalXmin could go backwards).  Also, a
	 * non-read-only transaction can't adopt a snapshot from a read-only
	 * transaction, as predicate.c handles the cases very differently.
	 */
	if (IsolationIsSerializable())
	{
		if (src_isolevel != XACT_SERIALIZABLE)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
		if (src_readonly && !XactReadOnly)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
	}

	/*
	 * We cannot import a snapshot that was taken in a different database,
	 * because vacuum calculates OldestXmin on a per-database basis; so the
	 * source transaction's xmin doesn't protect us from data loss.  This
	 * restriction could be removed if the source transaction were to mark its
	 * xmin as being globally applicable.  But that would require some
	 * additional syntax, since that has to be known when the snapshot is
	 * initially taken.  (See pgsql-hackers discussion of 2011-10-21.)
	 */
	if (src_dbid != MyDatabaseId)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			  errmsg("cannot import a snapshot from a different database")));

	/* OK, install the snapshot */
	SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
}
1597 :
1598 : /*
1599 : * XactHasExportedSnapshots
1600 : * Test whether current transaction has exported any snapshots.
1601 : */
1602 : bool
1603 7 : XactHasExportedSnapshots(void)
1604 : {
1605 7 : return (exportedSnapshots != NIL);
1606 : }
1607 :
1608 : /*
1609 : * DeleteAllExportedSnapshotFiles
1610 : * Clean up any files that have been left behind by a crashed backend
1611 : * that had exported snapshots before it died.
1612 : *
1613 : * This should be called during database startup or crash recovery.
1614 : */
1615 : void
1616 0 : DeleteAllExportedSnapshotFiles(void)
1617 : {
1618 : char buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)];
1619 : DIR *s_dir;
1620 : struct dirent *s_de;
1621 :
1622 0 : if (!(s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR)))
1623 : {
1624 : /*
1625 : * We really should have that directory in a sane cluster setup. But
1626 : * then again if we don't, it's not fatal enough to make it FATAL.
1627 : * Since we're running in the postmaster, LOG is our best bet.
1628 : */
1629 0 : elog(LOG, "could not open directory \"%s\": %m", SNAPSHOT_EXPORT_DIR);
1630 0 : return;
1631 : }
1632 :
1633 0 : while ((s_de = ReadDir(s_dir, SNAPSHOT_EXPORT_DIR)) != NULL)
1634 : {
1635 0 : if (strcmp(s_de->d_name, ".") == 0 ||
1636 0 : strcmp(s_de->d_name, "..") == 0)
1637 0 : continue;
1638 :
1639 0 : snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
1640 : /* Again, unlink failure is not worthy of FATAL */
1641 0 : if (unlink(buf))
1642 0 : elog(LOG, "could not unlink file \"%s\": %m", buf);
1643 : }
1644 :
1645 0 : FreeDir(s_dir);
1646 : }
1647 :
1648 : bool
1649 7 : ThereAreNoPriorRegisteredSnapshots(void)
1650 : {
1651 14 : if (pairingheap_is_empty(&RegisteredSnapshots) ||
1652 14 : pairingheap_is_singular(&RegisteredSnapshots))
1653 7 : return true;
1654 :
1655 0 : return false;
1656 : }
1657 :
1658 :
1659 : /*
1660 : * Return a timestamp that is exactly on a minute boundary.
1661 : *
1662 : * If the argument is already aligned, return that value, otherwise move to
1663 : * the next minute boundary following the given time.
1664 : */
1665 : static TimestampTz
1666 0 : AlignTimestampToMinuteBoundary(TimestampTz ts)
1667 : {
1668 0 : TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
1669 :
1670 0 : return retval - (retval % USECS_PER_MINUTE);
1671 : }
1672 :
1673 : /*
1674 : * Get current timestamp for snapshots
1675 : *
1676 : * This is basically GetCurrentTimestamp(), but with a guarantee that
1677 : * the result never moves backward.
1678 : */
1679 : TimestampTz
1680 0 : GetSnapshotCurrentTimestamp(void)
1681 : {
1682 0 : TimestampTz now = GetCurrentTimestamp();
1683 :
1684 : /*
1685 : * Don't let time move backward; if it hasn't advanced, use the old value.
1686 : */
1687 0 : SpinLockAcquire(&oldSnapshotControl->mutex_current);
1688 0 : if (now <= oldSnapshotControl->current_timestamp)
1689 0 : now = oldSnapshotControl->current_timestamp;
1690 : else
1691 0 : oldSnapshotControl->current_timestamp = now;
1692 0 : SpinLockRelease(&oldSnapshotControl->mutex_current);
1693 :
1694 0 : return now;
1695 : }
1696 :
1697 : /*
1698 : * Get timestamp through which vacuum may have processed based on last stored
1699 : * value for threshold_timestamp.
1700 : *
1701 : * XXX: So far, we never trust that a 64-bit value can be read atomically; if
1702 : * that ever changes, we could get rid of the spinlock here.
1703 : */
1704 : TimestampTz
1705 0 : GetOldSnapshotThresholdTimestamp(void)
1706 : {
1707 : TimestampTz threshold_timestamp;
1708 :
1709 0 : SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1710 0 : threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1711 0 : SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1712 :
1713 0 : return threshold_timestamp;
1714 : }
1715 :
1716 : static void
1717 0 : SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
1718 : {
1719 0 : SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1720 0 : oldSnapshotControl->threshold_timestamp = ts;
1721 0 : oldSnapshotControl->threshold_xid = xlimit;
1722 0 : SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1723 0 : }
1724 :
1725 : /*
1726 : * TransactionIdLimitedForOldSnapshots
1727 : *
1728 : * Apply old snapshot limit, if any. This is intended to be called for page
1729 : * pruning and table vacuuming, to allow old_snapshot_threshold to override
1730 : * the normal global xmin value. Actual testing for snapshot too old will be
1731 : * based on whether a snapshot timestamp is prior to the threshold timestamp
1732 : * set in this function.
1733 : */
1734 : TransactionId
1735 167065 : TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
1736 : Relation relation)
1737 : {
1738 167065 : if (TransactionIdIsNormal(recentXmin)
1739 167065 : && old_snapshot_threshold >= 0
1740 0 : && RelationAllowsEarlyPruning(relation))
1741 : {
1742 0 : TimestampTz ts = GetSnapshotCurrentTimestamp();
1743 0 : TransactionId xlimit = recentXmin;
1744 : TransactionId latest_xmin;
1745 : TimestampTz update_ts;
1746 0 : bool same_ts_as_threshold = false;
1747 :
1748 0 : SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1749 0 : latest_xmin = oldSnapshotControl->latest_xmin;
1750 0 : update_ts = oldSnapshotControl->next_map_update;
1751 0 : SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1752 :
1753 : /*
1754 : * Zero threshold always overrides to latest xmin, if valid. Without
1755 : * some heuristic it will find its own snapshot too old on, for
1756 : * example, a simple UPDATE -- which would make it useless for most
1757 : * testing, but there is no principled way to ensure that it doesn't
1758 : * fail in this way. Use a five-second delay to try to get useful
1759 : * testing behavior, but this may need adjustment.
1760 : */
1761 0 : if (old_snapshot_threshold == 0)
1762 : {
1763 0 : if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
1764 0 : && TransactionIdFollows(latest_xmin, xlimit))
1765 0 : xlimit = latest_xmin;
1766 :
1767 0 : ts -= 5 * USECS_PER_SEC;
1768 0 : SetOldSnapshotThresholdTimestamp(ts, xlimit);
1769 :
1770 0 : return xlimit;
1771 : }
1772 :
1773 0 : ts = AlignTimestampToMinuteBoundary(ts)
1774 0 : - (old_snapshot_threshold * USECS_PER_MINUTE);
1775 :
1776 : /* Check for fast exit without LW locking. */
1777 0 : SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1778 0 : if (ts == oldSnapshotControl->threshold_timestamp)
1779 : {
1780 0 : xlimit = oldSnapshotControl->threshold_xid;
1781 0 : same_ts_as_threshold = true;
1782 : }
1783 0 : SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1784 :
1785 0 : if (!same_ts_as_threshold)
1786 : {
1787 0 : if (ts == update_ts)
1788 : {
1789 0 : xlimit = latest_xmin;
1790 0 : if (NormalTransactionIdFollows(xlimit, recentXmin))
1791 0 : SetOldSnapshotThresholdTimestamp(ts, xlimit);
1792 : }
1793 : else
1794 : {
1795 0 : LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
1796 :
1797 0 : if (oldSnapshotControl->count_used > 0
1798 0 : && ts >= oldSnapshotControl->head_timestamp)
1799 : {
1800 : int offset;
1801 :
1802 0 : offset = ((ts - oldSnapshotControl->head_timestamp)
1803 0 : / USECS_PER_MINUTE);
1804 0 : if (offset > oldSnapshotControl->count_used - 1)
1805 0 : offset = oldSnapshotControl->count_used - 1;
1806 0 : offset = (oldSnapshotControl->head_offset + offset)
1807 0 : % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1808 0 : xlimit = oldSnapshotControl->xid_by_minute[offset];
1809 :
1810 0 : if (NormalTransactionIdFollows(xlimit, recentXmin))
1811 0 : SetOldSnapshotThresholdTimestamp(ts, xlimit);
1812 : }
1813 :
1814 0 : LWLockRelease(OldSnapshotTimeMapLock);
1815 : }
1816 : }
1817 :
1818 : /*
1819 : * Failsafe protection against vacuuming work of active transaction.
1820 : *
1821 : * This is not an assertion because we avoid the spinlock for
1822 : * performance, leaving open the possibility that xlimit could advance
1823 : * and be more current; but it seems prudent to apply this limit. It
1824 : * might make pruning a tiny bit less aggressive than it could be, but
1825 : * protects against data loss bugs.
1826 : */
1827 0 : if (TransactionIdIsNormal(latest_xmin)
1828 0 : && TransactionIdPrecedes(latest_xmin, xlimit))
1829 0 : xlimit = latest_xmin;
1830 :
1831 0 : if (NormalTransactionIdFollows(xlimit, recentXmin))
1832 0 : return xlimit;
1833 : }
1834 :
1835 167065 : return recentXmin;
1836 : }
1837 :
1838 : /*
1839 : * Take care of the circular buffer that maps time to xid.
1840 : */
1841 : void
1842 0 : MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
1843 : {
1844 : TimestampTz ts;
1845 : TransactionId latest_xmin;
1846 : TimestampTz update_ts;
1847 0 : bool map_update_required = false;
1848 :
1849 : /* Never call this function when old snapshot checking is disabled. */
1850 0 : Assert(old_snapshot_threshold >= 0);
1851 :
1852 0 : ts = AlignTimestampToMinuteBoundary(whenTaken);
1853 :
1854 : /*
1855 : * Keep track of the latest xmin seen by any process. Update mapping with
1856 : * a new value when we have crossed a bucket boundary.
1857 : */
1858 0 : SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1859 0 : latest_xmin = oldSnapshotControl->latest_xmin;
1860 0 : update_ts = oldSnapshotControl->next_map_update;
1861 0 : if (ts > update_ts)
1862 : {
1863 0 : oldSnapshotControl->next_map_update = ts;
1864 0 : map_update_required = true;
1865 : }
1866 0 : if (TransactionIdFollows(xmin, latest_xmin))
1867 0 : oldSnapshotControl->latest_xmin = xmin;
1868 0 : SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1869 :
1870 : /* We only needed to update the most recent xmin value. */
1871 0 : if (!map_update_required)
1872 0 : return;
1873 :
1874 : /* No further tracking needed for 0 (used for testing). */
1875 0 : if (old_snapshot_threshold == 0)
1876 0 : return;
1877 :
1878 : /*
1879 : * We don't want to do something stupid with unusual values, but we don't
1880 : * want to litter the log with warnings or break otherwise normal
1881 : * processing for this feature; so if something seems unreasonable, just
1882 : * log at DEBUG level and return without doing anything.
1883 : */
1884 0 : if (whenTaken < 0)
1885 : {
1886 0 : elog(DEBUG1,
1887 : "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1888 : (long) whenTaken);
1889 0 : return;
1890 : }
1891 0 : if (!TransactionIdIsNormal(xmin))
1892 : {
1893 0 : elog(DEBUG1,
1894 : "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1895 : (unsigned long) xmin);
1896 0 : return;
1897 : }
1898 :
1899 0 : LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
1900 :
1901 0 : Assert(oldSnapshotControl->head_offset >= 0);
1902 0 : Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1903 0 : Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
1904 0 : Assert(oldSnapshotControl->count_used >= 0);
1905 0 : Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1906 :
1907 0 : if (oldSnapshotControl->count_used == 0)
1908 : {
1909 : /* set up first entry for empty mapping */
1910 0 : oldSnapshotControl->head_offset = 0;
1911 0 : oldSnapshotControl->head_timestamp = ts;
1912 0 : oldSnapshotControl->count_used = 1;
1913 0 : oldSnapshotControl->xid_by_minute[0] = xmin;
1914 : }
1915 0 : else if (ts < oldSnapshotControl->head_timestamp)
1916 : {
1917 : /* old ts; log it at DEBUG */
1918 0 : LWLockRelease(OldSnapshotTimeMapLock);
1919 0 : elog(DEBUG1,
1920 : "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1921 : (long) whenTaken);
1922 0 : return;
1923 : }
1924 0 : else if (ts <= (oldSnapshotControl->head_timestamp +
1925 0 : ((oldSnapshotControl->count_used - 1)
1926 0 : * USECS_PER_MINUTE)))
1927 : {
1928 : /* existing mapping; advance xid if possible */
1929 0 : int bucket = (oldSnapshotControl->head_offset
1930 0 : + ((ts - oldSnapshotControl->head_timestamp)
1931 0 : / USECS_PER_MINUTE))
1932 0 : % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1933 :
1934 0 : if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
1935 0 : oldSnapshotControl->xid_by_minute[bucket] = xmin;
1936 : }
1937 : else
1938 : {
1939 : /* We need a new bucket, but it might not be the very next one. */
1940 0 : int advance = ((ts - oldSnapshotControl->head_timestamp)
1941 0 : / USECS_PER_MINUTE);
1942 :
1943 0 : oldSnapshotControl->head_timestamp = ts;
1944 :
1945 0 : if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1946 : {
1947 : /* Advance is so far that all old data is junk; start over. */
1948 0 : oldSnapshotControl->head_offset = 0;
1949 0 : oldSnapshotControl->count_used = 1;
1950 0 : oldSnapshotControl->xid_by_minute[0] = xmin;
1951 : }
1952 : else
1953 : {
1954 : /* Store the new value in one or more buckets. */
1955 : int i;
1956 :
1957 0 : for (i = 0; i < advance; i++)
1958 : {
1959 0 : if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1960 : {
1961 : /* Map full and new value replaces old head. */
1962 0 : int old_head = oldSnapshotControl->head_offset;
1963 :
1964 0 : if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
1965 0 : oldSnapshotControl->head_offset = 0;
1966 : else
1967 0 : oldSnapshotControl->head_offset = old_head + 1;
1968 0 : oldSnapshotControl->xid_by_minute[old_head] = xmin;
1969 : }
1970 : else
1971 : {
1972 : /* Extend map to unused entry. */
1973 0 : int new_tail = (oldSnapshotControl->head_offset
1974 0 : + oldSnapshotControl->count_used)
1975 0 : % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1976 :
1977 0 : oldSnapshotControl->count_used++;
1978 0 : oldSnapshotControl->xid_by_minute[new_tail] = xmin;
1979 : }
1980 : }
1981 : }
1982 : }
1983 :
1984 0 : LWLockRelease(OldSnapshotTimeMapLock);
1985 : }
1986 :
1987 :
1988 : /*
1989 : * Setup a snapshot that replaces normal catalog snapshots that allows catalog
1990 : * access to behave just like it did at a certain point in the past.
1991 : *
1992 : * Needed for logical decoding.
1993 : */
1994 : void
1995 0 : SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
1996 : {
1997 0 : Assert(historic_snapshot != NULL);
1998 :
1999 : /* setup the timetravel snapshot */
2000 0 : HistoricSnapshot = historic_snapshot;
2001 :
2002 : /* setup (cmin, cmax) lookup hash */
2003 0 : tuplecid_data = tuplecids;
2004 0 : }
2005 :
2006 :
2007 : /*
2008 : * Make catalog snapshots behave normally again.
2009 : */
2010 : void
2011 0 : TeardownHistoricSnapshot(bool is_error)
2012 : {
2013 0 : HistoricSnapshot = NULL;
2014 0 : tuplecid_data = NULL;
2015 0 : }
2016 :
2017 : bool
2018 482783 : HistoricSnapshotActive(void)
2019 : {
2020 482783 : return HistoricSnapshot != NULL;
2021 : }
2022 :
2023 : HTAB *
2024 0 : HistoricSnapshotGetTupleCids(void)
2025 : {
2026 0 : Assert(HistoricSnapshotActive());
2027 0 : return tuplecid_data;
2028 : }
2029 :
2030 : /*
2031 : * EstimateSnapshotSpace
2032 : * Returns the size needed to store the given snapshot.
2033 : *
2034 : * We are exporting only required fields from the Snapshot, stored in
2035 : * SerializedSnapshotData.
2036 : */
2037 : Size
2038 59 : EstimateSnapshotSpace(Snapshot snap)
2039 : {
2040 : Size size;
2041 :
2042 59 : Assert(snap != InvalidSnapshot);
2043 59 : Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
2044 :
2045 : /* We allocate any XID arrays needed in the same palloc block. */
2046 59 : size = add_size(sizeof(SerializedSnapshotData),
2047 : mul_size(snap->xcnt, sizeof(TransactionId)));
2048 59 : if (snap->subxcnt > 0 &&
2049 0 : (!snap->suboverflowed || snap->takenDuringRecovery))
2050 0 : size = add_size(size,
2051 0 : mul_size(snap->subxcnt, sizeof(TransactionId)));
2052 :
2053 59 : return size;
2054 : }
2055 :
2056 : /*
2057 : * SerializeSnapshot
2058 : * Dumps the serialized snapshot (extracted from given snapshot) onto the
2059 : * memory location at start_address.
2060 : */
2061 : void
2062 54 : SerializeSnapshot(Snapshot snapshot, char *start_address)
2063 : {
2064 : SerializedSnapshotData serialized_snapshot;
2065 :
2066 54 : Assert(snapshot->subxcnt >= 0);
2067 :
2068 : /* Copy all required fields */
2069 54 : serialized_snapshot.xmin = snapshot->xmin;
2070 54 : serialized_snapshot.xmax = snapshot->xmax;
2071 54 : serialized_snapshot.xcnt = snapshot->xcnt;
2072 54 : serialized_snapshot.subxcnt = snapshot->subxcnt;
2073 54 : serialized_snapshot.suboverflowed = snapshot->suboverflowed;
2074 54 : serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
2075 54 : serialized_snapshot.curcid = snapshot->curcid;
2076 54 : serialized_snapshot.whenTaken = snapshot->whenTaken;
2077 54 : serialized_snapshot.lsn = snapshot->lsn;
2078 :
2079 : /*
2080 : * Ignore the SubXID array if it has overflowed, unless the snapshot was
2081 : * taken during recovery - in that case, top-level XIDs are in subxip as
2082 : * well, and we mustn't lose them.
2083 : */
2084 54 : if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
2085 0 : serialized_snapshot.subxcnt = 0;
2086 :
2087 : /* Copy struct to possibly-unaligned buffer */
2088 54 : memcpy(start_address,
2089 : &serialized_snapshot, sizeof(SerializedSnapshotData));
2090 :
2091 : /* Copy XID array */
2092 54 : if (snapshot->xcnt > 0)
2093 0 : memcpy((TransactionId *) (start_address +
2094 : sizeof(SerializedSnapshotData)),
2095 0 : snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
2096 :
2097 : /*
2098 : * Copy SubXID array. Don't bother to copy it if it had overflowed,
2099 : * though, because it's not used anywhere in that case. Except if it's a
2100 : * snapshot taken during recovery; all the top-level XIDs are in subxip as
2101 : * well in that case, so we mustn't lose them.
2102 : */
2103 54 : if (serialized_snapshot.subxcnt > 0)
2104 : {
2105 0 : Size subxipoff = sizeof(SerializedSnapshotData) +
2106 0 : snapshot->xcnt * sizeof(TransactionId);
2107 :
2108 0 : memcpy((TransactionId *) (start_address + subxipoff),
2109 0 : snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
2110 : }
2111 54 : }
2112 :
2113 : /*
2114 : * RestoreSnapshot
2115 : * Restore a serialized snapshot from the specified address.
2116 : *
2117 : * The copy is palloc'd in TopTransactionContext and has initial refcounts set
2118 : * to 0. The returned snapshot has the copied flag set.
2119 : */
2120 : Snapshot
2121 366 : RestoreSnapshot(char *start_address)
2122 : {
2123 : SerializedSnapshotData serialized_snapshot;
2124 : Size size;
2125 : Snapshot snapshot;
2126 : TransactionId *serialized_xids;
2127 :
2128 366 : memcpy(&serialized_snapshot, start_address,
2129 : sizeof(SerializedSnapshotData));
2130 366 : serialized_xids = (TransactionId *)
2131 : (start_address + sizeof(SerializedSnapshotData));
2132 :
2133 : /* We allocate any XID arrays needed in the same palloc block. */
2134 366 : size = sizeof(SnapshotData)
2135 366 : + serialized_snapshot.xcnt * sizeof(TransactionId)
2136 366 : + serialized_snapshot.subxcnt * sizeof(TransactionId);
2137 :
2138 : /* Copy all required fields */
2139 366 : snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
2140 366 : snapshot->satisfies = HeapTupleSatisfiesMVCC;
2141 366 : snapshot->xmin = serialized_snapshot.xmin;
2142 366 : snapshot->xmax = serialized_snapshot.xmax;
2143 366 : snapshot->xip = NULL;
2144 366 : snapshot->xcnt = serialized_snapshot.xcnt;
2145 366 : snapshot->subxip = NULL;
2146 366 : snapshot->subxcnt = serialized_snapshot.subxcnt;
2147 366 : snapshot->suboverflowed = serialized_snapshot.suboverflowed;
2148 366 : snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
2149 366 : snapshot->curcid = serialized_snapshot.curcid;
2150 366 : snapshot->whenTaken = serialized_snapshot.whenTaken;
2151 366 : snapshot->lsn = serialized_snapshot.lsn;
2152 :
2153 : /* Copy XIDs, if present. */
2154 366 : if (serialized_snapshot.xcnt > 0)
2155 : {
2156 0 : snapshot->xip = (TransactionId *) (snapshot + 1);
2157 0 : memcpy(snapshot->xip, serialized_xids,
2158 0 : serialized_snapshot.xcnt * sizeof(TransactionId));
2159 : }
2160 :
2161 : /* Copy SubXIDs, if present. */
2162 366 : if (serialized_snapshot.subxcnt > 0)
2163 : {
2164 0 : snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
2165 0 : serialized_snapshot.xcnt;
2166 0 : memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
2167 0 : serialized_snapshot.subxcnt * sizeof(TransactionId));
2168 : }
2169 :
2170 : /* Set the copied flag so that the caller will set refcounts correctly. */
2171 366 : snapshot->regd_count = 0;
2172 366 : snapshot->active_count = 0;
2173 366 : snapshot->copied = true;
2174 :
2175 366 : return snapshot;
2176 : }
2177 :
2178 : /*
2179 : * Install a restored snapshot as the transaction snapshot.
2180 : *
2181 : * The second argument is of type void * so that snapmgr.h need not include
2182 : * the declaration for PGPROC.
2183 : */
2184 : void
2185 115 : RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
2186 : {
2187 115 : SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
2188 115 : }
|