Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * snapbuild.c
4 : *
5 : * Infrastructure for building historic catalog snapshots based on contents
6 : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : * WAL.
8 : *
9 : * NOTES:
10 : *
11 : * We build snapshots which can *only* be used to read catalog contents and we
12 : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : * at the time the XLogRecord was generated.
15 : *
16 : * To build the snapshots we reuse the infrastructure built for Hot
17 : * Standby. The in-memory snapshots we build look different than HS' because
18 : * we have different needs. To successfully decode data from the WAL we only
19 : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : * tables since the data we decode is wholly contained in the WAL
21 : * records. Also, our snapshots need to be different in comparison to normal
22 : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : * pg_subtrans for information about committed transactions because they might
24 : * commit in the future from the POV of the WAL entry we're currently
25 : * decoding. This definition has the advantage that we only need to prevent
26 : * removal of catalog rows, while normal tables' rows can still be
27 : * removed. This is achieved by using the replication slot mechanism.
28 : *
29 : * As the percentage of transactions modifying the catalog normally is fairly
30 : * small in comparison to ones only manipulating user data, we keep track of
31 : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : * track of all running transactions like it's done in a normal snapshot. Note
33 : * that we're generally only looking at transactions that have acquired an
34 : * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
35 : * that we consider committed, everything else is considered aborted/in
36 : * progress. That also allows us not to care about subtransactions before they
37 : * have committed which means this module, in contrast to HS, doesn't have to
38 : * care about suboverflowed subtransactions and similar.
39 : *
40 : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : * transactions we need Snapshots that see intermediate versions of the
42 : * catalog in a transaction. During normal operation this is achieved by using
43 : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : * efficiency reasons only one value of that is stored
45 : * (cf. combocid.c). Since ComboCids are only available in memory we log
46 : * additional information which allows us to get the original (cmin, cmax)
47 : * pair during visibility checks. See the comment above
48 : * ResolveCminCmaxDuringDecoding() in reorderbuffer.c for details.
49 : *
50 : * To facilitate all this we need our own visibility routine, as the normal
51 : * ones are optimized for different use cases.
52 : *
53 : * To replace the normal catalog snapshots with decoding ones use the
54 : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55 : *
56 : *
57 : *
58 : * The snapbuild machinery starts up in several stages, as illustrated
59 : * by the following graph describing the SnapBuild->state transitions:
60 : *
61 : * +-------------------------+
62 : * +----| START |-------------+
63 : * | +-------------------------+ |
64 : * | | |
65 : * | | |
66 : * | running_xacts #1 |
67 : * | | |
68 : * | | |
69 : * | v |
70 : * | +-------------------------+ v
71 : * | | BUILDING_SNAPSHOT |------------>|
72 : * | +-------------------------+ |
73 : * | | |
74 : * | | |
75 : * | running_xacts #2, xacts from #1 finished |
76 : * | | |
77 : * | | |
78 : * | v |
79 : * | +-------------------------+ v
80 : * | | FULL_SNAPSHOT |------------>|
81 : * | +-------------------------+ |
82 : * | | |
83 : * running_xacts | saved snapshot
84 : * with zero xacts | at running_xacts's lsn
85 : * | | |
86 : * | running_xacts with xacts from #2 finished |
87 : * | | |
88 : * | v |
89 : * | +-------------------------+ |
90 : * +--->|SNAPBUILD_CONSISTENT |<------------+
91 : * +-------------------------+
92 : *
93 : * Initially the machinery is in the START stage. When an xl_running_xacts
94 : * record is read that is sufficiently new (above the safe xmin horizon),
95 : * there's a state transition. If there were no running xacts when the
96 : * running_xacts record was generated, we'll directly go into CONSISTENT
97 : * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
98 : * snapshot means that all transactions that start henceforth can be decoded
99 : * in their entirety, but transactions that started previously can't. In
100 : * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
101 : * running transactions have committed or aborted.
102 : *
103 : * Only transactions that commit after CONSISTENT state has been reached will
104 : * be replayed, even though they might have started while still in
105 : * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
106 : * changes have been exported, but all the following ones will be. That point
107 : * is a convenient point to initialize replication from, which is why we
108 : * export a snapshot at that point, which *can* be used to read normal data.
109 : *
110 : * Copyright (c) 2012-2017, PostgreSQL Global Development Group
111 : *
112 : * IDENTIFICATION
113 : * src/backend/replication/snapbuild.c
114 : *
115 : *-------------------------------------------------------------------------
116 : */
117 :
118 : #include "postgres.h"
119 :
120 : #include <sys/stat.h>
121 : #include <unistd.h>
122 :
123 : #include "miscadmin.h"
124 :
125 : #include "access/heapam_xlog.h"
126 : #include "access/transam.h"
127 : #include "access/xact.h"
128 :
129 : #include "pgstat.h"
130 :
131 : #include "replication/logical.h"
132 : #include "replication/reorderbuffer.h"
133 : #include "replication/snapbuild.h"
134 :
135 : #include "utils/builtins.h"
136 : #include "utils/memutils.h"
137 : #include "utils/snapshot.h"
138 : #include "utils/snapmgr.h"
139 : #include "utils/tqual.h"
140 :
141 : #include "storage/block.h" /* debugging output */
142 : #include "storage/fd.h"
143 : #include "storage/lmgr.h"
144 : #include "storage/proc.h"
145 : #include "storage/procarray.h"
146 : #include "storage/standby.h"
147 :
148 : /*
149 : * This struct contains the current state of the snapshot building
150 : * machinery. Besides a forward declaration in the header, it is not exposed
151 : * to the public, so other modules don't depend on its layout. Note however
 * that it is stored on disk (see the was_running comment below), so its
 * layout cannot be changed in a minor version.
152 : */
153 : struct SnapBuild
154 : {
155 : /* how far are we along building our first full snapshot */
156 : SnapBuildState state;
157 :
158 : /* private memory context used to allocate memory for this module. */
159 : MemoryContext context;
160 :
161 : /* all transactions smaller than this have committed/aborted */
162 : TransactionId xmin;
163 :
164 : /* all transactions greater than or equal to this are uncommitted */
165 : TransactionId xmax;
166 :
167 : /*
168 : * Don't replay commits from an LSN < this LSN. This can be set externally
169 : * but it will also be advanced (never retreat) from within snapbuild.c.
170 : */
171 : XLogRecPtr start_decoding_at;
172 :
173 : /*
174 : * Don't start decoding WAL until the "xl_running_xacts" information
175 : * indicates there are no running xids with an xid smaller than this.
176 : */
177 : TransactionId initial_xmin_horizon;
178 :
179 : /* Indicates if we are building a full snapshot or just a catalog one. */
180 : bool building_full_snapshot;
181 :
182 : /*
183 : * Snapshot that's valid to see the catalog state seen at this moment.
 * Built lazily (NULL until first needed); the builder itself holds one
 * reference to it, and every consumer it is handed to holds another.
184 : */
185 : Snapshot snapshot;
186 :
187 : /*
188 : * LSN of the last location we are sure a snapshot has been serialized to.
189 : */
190 : XLogRecPtr last_serialized_snapshot;
191 :
192 : /*
193 : * The reorderbuffer we need to update with usable snapshots et al.
194 : */
195 : ReorderBuffer *reorder;
196 :
197 : /*
198 : * Outdated: This struct isn't used for its original purpose anymore, but
199 : * can't be removed / changed in a minor version, because it's stored
200 : * on-disk.
201 : */
202 : struct
203 : {
204 : /*
205 : * NB: was_xmax (not was_xmin) is misused to hold the xid at which the
206 : * next phase of initial snapshot building happens, until a major
 * version can break on-disk compatibility. See SnapBuildNextPhaseAt() /
207 : * SnapBuildStartNextPhaseAt().
208 : */
209 : TransactionId was_xmin;
210 : TransactionId was_xmax;
211 :
212 : size_t was_xcnt; /* number of used xip entries */
213 : size_t was_xcnt_space; /* allocated size of xip */
214 : TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
215 : } was_running;
216 :
217 : /*
218 : * Array of transactions which could have catalog changes that committed
219 : * between xmin and xmax.
220 : */
221 : struct
222 : {
223 : /* number of committed transactions */
224 : size_t xcnt;
225 :
226 : /* available space for committed transactions */
227 : size_t xcnt_space;
228 :
229 : /*
230 : * Until we reach a CONSISTENT state, we record commits of all
231 : * transactions, not just the catalog changing ones. Record when that
232 : * changes so we know we cannot export a snapshot safely anymore.
233 : */
234 : bool includes_all_transactions;
235 :
236 : /*
237 : * Array of committed transactions that have modified the catalog (or,
 * while includes_all_transactions is true, of all committed ones).
238 : *
239 : * As this array is frequently modified we do *not* keep it in
240 : * xidComparator order. Instead we sort the array when building &
241 : * distributing a snapshot.
242 : *
243 : * TODO: It's unclear whether that reasoning has much merit. Every
244 : * time we add something here after becoming consistent will also
245 : * require distributing a snapshot. Storing them sorted would
246 : * potentially also make it easier to purge (but more complicated wrt
247 : * wraparound?). Should be improved if sorting while building the
248 : * snapshot shows up in profiles.
249 : */
250 : TransactionId *xip;
251 : } committed;
252 : };
253 :
254 : /*
255 : * Starting a transaction -- which we need to do while exporting a snapshot --
256 : * removes knowledge about the previously used resowner, so we save it here.
 *
 * ExportInProgress is true from SnapBuildExportSnapshot() until
 * SnapBuildClearExportedSnapshot(). It is kept separately from the saved
 * resource owner because that owner may itself be NULL.
257 : */
258 : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
259 : static bool ExportInProgress = false;
260 :
261 : /* ->committed array maintenance */
262 : static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
263 :
264 : /* snapshot building/manipulation/distribution functions */
265 : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
266 :
267 : static void SnapBuildFreeSnapshot(Snapshot snap);
268 :
269 : static void SnapBuildSnapIncRefcount(Snapshot snap);
270 :
271 : static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
272 :
273 : /* helpers for processing xl_running_xacts WAL records */
274 : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
275 : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
276 :
277 : /* serialization of builder state to/from disk */
278 : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
279 : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
280 :
281 : /*
282 : * Return TransactionId after which the next phase of initial snapshot
283 : * building will happen.
284 : */
285 : static inline TransactionId
286 0 : SnapBuildNextPhaseAt(SnapBuild *builder)
287 : {
288 : /*
289 : * For backward compatibility reasons this has to be stored in the wrongly
290 : * named field. Will be fixed in next major version.
291 : */
292 0 : return builder->was_running.was_xmax;
293 : }
294 :
295 : /*
296 : * Set TransactionId after which the next phase of initial snapshot building
297 : * will happen.
298 : */
299 : static inline void
300 0 : SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
301 : {
302 : /*
303 : * For backward compatibility reasons this has to be stored in the wrongly
304 : * named field. Will be fixed in next major version.
305 : */
306 0 : builder->was_running.was_xmax = at;
307 0 : }
308 :
309 : /*
310 : * Allocate a new snapshot builder.
311 : *
312 : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
313 : * removed, start_lsn is the LSN >= we want to replay commits.
314 : */
315 : SnapBuild *
316 0 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
317 : TransactionId xmin_horizon,
318 : XLogRecPtr start_lsn,
319 : bool need_full_snapshot)
320 : {
321 : MemoryContext context;
322 : MemoryContext oldcontext;
323 : SnapBuild *builder;
324 :
325 : /* allocate memory in own context, to have better accountability */
326 0 : context = AllocSetContextCreate(CurrentMemoryContext,
327 : "snapshot builder context",
328 : ALLOCSET_DEFAULT_SIZES);
329 0 : oldcontext = MemoryContextSwitchTo(context);
330 :
331 0 : builder = palloc0(sizeof(SnapBuild));
332 :
333 0 : builder->state = SNAPBUILD_START;
334 0 : builder->context = context;
335 0 : builder->reorder = reorder;
336 : /* Other struct members initialized by zeroing via palloc0 above */
337 :
338 0 : builder->committed.xcnt = 0;
339 0 : builder->committed.xcnt_space = 128; /* arbitrary number */
340 0 : builder->committed.xip =
341 0 : palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
342 0 : builder->committed.includes_all_transactions = true;
343 :
344 0 : builder->initial_xmin_horizon = xmin_horizon;
345 0 : builder->start_decoding_at = start_lsn;
346 0 : builder->building_full_snapshot = need_full_snapshot;
347 :
348 0 : MemoryContextSwitchTo(oldcontext);
349 :
350 0 : return builder;
351 : }
352 :
353 : /*
354 : * Free a snapshot builder.
355 : */
356 : void
357 0 : FreeSnapshotBuilder(SnapBuild *builder)
358 : {
359 0 : MemoryContext context = builder->context;
360 :
361 : /* free snapshot explicitly, that contains some error checking */
362 0 : if (builder->snapshot != NULL)
363 : {
364 0 : SnapBuildSnapDecRefcount(builder->snapshot);
365 0 : builder->snapshot = NULL;
366 : }
367 :
368 : /* other resources are deallocated via memory context reset */
369 0 : MemoryContextDelete(context);
370 0 : }
371 :
372 : /*
373 : * Free an unreferenced snapshot that has previously been built by us.
374 : */
375 : static void
376 0 : SnapBuildFreeSnapshot(Snapshot snap)
377 : {
378 : /* make sure we don't get passed an external snapshot */
379 0 : Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
380 :
381 : /* make sure nobody modified our snapshot */
382 0 : Assert(snap->curcid == FirstCommandId);
383 0 : Assert(!snap->suboverflowed);
384 0 : Assert(!snap->takenDuringRecovery);
385 0 : Assert(snap->regd_count == 0);
386 :
387 : /* slightly more likely, so it's checked even without c-asserts */
388 0 : if (snap->copied)
389 0 : elog(ERROR, "cannot free a copied snapshot");
390 :
391 0 : if (snap->active_count)
392 0 : elog(ERROR, "cannot free an active snapshot");
393 :
394 0 : pfree(snap);
395 0 : }
396 :
397 : /*
398 : * In which state of snapshot building are we?
399 : */
400 : SnapBuildState
401 0 : SnapBuildCurrentState(SnapBuild *builder)
402 : {
403 0 : return builder->state;
404 : }
405 :
406 : /*
407 : * Should the contents of transaction ending at 'ptr' be decoded?
408 : */
409 : bool
410 0 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
411 : {
412 0 : return ptr < builder->start_decoding_at;
413 : }
414 :
415 : /*
416 : * Increase refcount of a snapshot.
417 : *
418 : * This is used when handing out a snapshot to some external resource or when
419 : * adding a Snapshot as builder->snapshot.
420 : */
421 : static void
422 0 : SnapBuildSnapIncRefcount(Snapshot snap)
423 : {
424 0 : snap->active_count++;
425 0 : }
426 :
427 : /*
428 : * Decrease refcount of a snapshot and free if the refcount reaches zero.
429 : *
430 : * Externally visible, so that external resources that have been handed an
431 : * IncRef'ed Snapshot can adjust its refcount easily.
432 : */
433 : void
434 0 : SnapBuildSnapDecRefcount(Snapshot snap)
435 : {
436 : /* make sure we don't get passed an external snapshot */
437 0 : Assert(snap->satisfies == HeapTupleSatisfiesHistoricMVCC);
438 :
439 : /* make sure nobody modified our snapshot */
440 0 : Assert(snap->curcid == FirstCommandId);
441 0 : Assert(!snap->suboverflowed);
442 0 : Assert(!snap->takenDuringRecovery);
443 :
444 0 : Assert(snap->regd_count == 0);
445 :
446 0 : Assert(snap->active_count > 0);
447 :
448 : /* slightly more likely, so it's checked even without casserts */
449 0 : if (snap->copied)
450 0 : elog(ERROR, "cannot free a copied snapshot");
451 :
452 0 : snap->active_count--;
453 0 : if (snap->active_count == 0)
454 0 : SnapBuildFreeSnapshot(snap);
455 0 : }
456 :
457 : /*
458 : * Build a new snapshot, based on currently committed catalog-modifying
459 : * transactions.
460 : *
461 : * In-progress transactions with catalog access are *not* allowed to modify
462 : * these snapshots; they have to copy them and fill in appropriate ->curcid
463 : * and ->subxip/subxcnt values.
464 : */
465 : static Snapshot
466 0 : SnapBuildBuildSnapshot(SnapBuild *builder)
467 : {
468 : Snapshot snapshot;
469 : Size ssize;
470 :
471 0 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
472 :
473 0 : ssize = sizeof(SnapshotData)
474 0 : + sizeof(TransactionId) * builder->committed.xcnt
475 0 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
476 :
477 0 : snapshot = MemoryContextAllocZero(builder->context, ssize);
478 :
479 0 : snapshot->satisfies = HeapTupleSatisfiesHistoricMVCC;
480 :
481 : /*
482 : * We misuse the original meaning of SnapshotData's xip and subxip fields
483 : * to make the more fitting for our needs.
484 : *
485 : * In the 'xip' array we store transactions that have to be treated as
486 : * committed. Since we will only ever look at tuples from transactions
487 : * that have modified the catalog it's more efficient to store those few
488 : * that exist between xmin and xmax (frequently there are none).
489 : *
490 : * Snapshots that are used in transactions that have modified the catalog
491 : * also use the 'subxip' array to store their toplevel xid and all the
492 : * subtransaction xids so we can recognize when we need to treat rows as
493 : * visible that are not in xip but still need to be visible. Subxip only
494 : * gets filled when the transaction is copied into the context of a
495 : * catalog modifying transaction since we otherwise share a snapshot
496 : * between transactions. As long as a txn hasn't modified the catalog it
497 : * doesn't need to treat any uncommitted rows as visible, so there is no
498 : * need for those xids.
499 : *
500 : * Both arrays are qsort'ed so that we can use bsearch() on them.
501 : */
502 0 : Assert(TransactionIdIsNormal(builder->xmin));
503 0 : Assert(TransactionIdIsNormal(builder->xmax));
504 :
505 0 : snapshot->xmin = builder->xmin;
506 0 : snapshot->xmax = builder->xmax;
507 :
508 : /* store all transactions to be treated as committed by this snapshot */
509 0 : snapshot->xip =
510 0 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
511 0 : snapshot->xcnt = builder->committed.xcnt;
512 0 : memcpy(snapshot->xip,
513 0 : builder->committed.xip,
514 0 : builder->committed.xcnt * sizeof(TransactionId));
515 :
516 : /* sort so we can bsearch() */
517 0 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
518 :
519 : /*
520 : * Initially, subxip is empty, i.e. it's a snapshot to be used by
521 : * transactions that don't modify the catalog. Will be filled by
522 : * ReorderBufferCopySnap() if necessary.
523 : */
524 0 : snapshot->subxcnt = 0;
525 0 : snapshot->subxip = NULL;
526 :
527 0 : snapshot->suboverflowed = false;
528 0 : snapshot->takenDuringRecovery = false;
529 0 : snapshot->copied = false;
530 0 : snapshot->curcid = FirstCommandId;
531 0 : snapshot->active_count = 0;
532 0 : snapshot->regd_count = 0;
533 :
534 0 : return snapshot;
535 : }
536 :
537 : /*
538 : * Build the initial slot snapshot and convert it to a normal snapshot that
539 : * is understood by HeapTupleSatisfiesMVCC.
540 : *
541 : * The snapshot will be usable directly in current transaction or exported
542 : * for loading in different transaction.
 *
 * Preconditions (asserted): no snapshot has been taken yet in the current
 * transaction (!FirstSnapshotSet) and the isolation level is REPEATABLE READ.
 *
 * Raises an ERROR if the builder has not reached SNAPBUILD_CONSISTENT, if it
 * no longer records all committed transactions (includes_all_transactions
 * is false), if MyPgXact->xmin is already valid, or if the number of
 * non-committed xids between xmin and xmax exceeds GetMaxSnapshotXidCount().
 *
 * Side effect: sets MyPgXact->xmin to the snapshot's xmin.
 *
 * Memory: the SnapshotData itself comes from SnapBuildBuildSnapshot() and
 * so lives in the builder's memory context, while the converted xip array
 * is palloc'd in CurrentMemoryContext.
543 : */
544 : Snapshot
545 0 : SnapBuildInitialSnapshot(SnapBuild *builder)
546 : {
547 : Snapshot snap;
548 : TransactionId xid;
549 : TransactionId *newxip;
550 0 : int newxcnt = 0;
551 :
552 0 : Assert(!FirstSnapshotSet);
553 0 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
554 :
555 0 : if (builder->state != SNAPBUILD_CONSISTENT)
556 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
557 :
558 0 : if (!builder->committed.includes_all_transactions)
559 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
560 :
561 : /* so we don't overwrite an already set MyPgXact->xmin value */
562 0 : if (TransactionIdIsValid(MyPgXact->xmin))
563 0 : elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
564 :
565 0 : snap = SnapBuildBuildSnapshot(builder);
566 :
567 : /*
568 : * We know that snap->xmin is alive, enforced by the logical xmin
569 : * mechanism. Due to that we can set MyPgXact->xmin below without locks;
570 : * we're only changing our own value. Assert-enabled builds cross-check
 * that snap->xmin does not precede the oldest safe decoding xid.
571 : */
572 : #ifdef USE_ASSERT_CHECKING
573 : {
574 : TransactionId safeXid;
575 :
576 0 : LWLockAcquire(ProcArrayLock, LW_SHARED);
577 0 : safeXid = GetOldestSafeDecodingTransactionId(false);
578 0 : LWLockRelease(ProcArrayLock);
579 :
580 0 : Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
581 : }
582 : #endif
583 :
584 0 : MyPgXact->xmin = snap->xmin;
585 :
586 : /* allocate in transaction context, sized for the largest allowed snapshot */
587 0 : newxip = (TransactionId *)
588 0 : palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
589 :
590 : /*
591 : * snapbuild.c builds snapshots in an "inverted" manner, which means it
592 : * stores committed transactions in ->xip, not ones in progress. Build a
593 : * classical snapshot by marking all non-committed transactions as
594 : * in-progress. This can be expensive: every xid in [xmin, xmax) is visited.
595 : */
596 0 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
597 : {
598 : void *test;
599 :
600 : /*
601 : * snap->xip still has the decoding-snapshot meaning here (committed
602 : * xids); an xid not found in it is collected as in-progress.
603 : */
604 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
605 : sizeof(TransactionId), xidComparator);
606 :
607 0 : if (test == NULL)
608 : {
609 0 : if (newxcnt >= GetMaxSnapshotXidCount())
610 0 : ereport(ERROR,
611 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
612 : errmsg("initial slot snapshot too large")));
613 :
614 0 : newxip[newxcnt++] = xid;
615 : }
616 :
617 0 : TransactionIdAdvance(xid);
618 : }
619 :
/* from here on ->xip has the classical meaning: in-progress xids */
620 0 : snap->xcnt = newxcnt;
621 0 : snap->xip = newxip;
622 :
623 0 : return snap;
624 : }
625 :
626 : /*
627 : * Export a snapshot so it can be set in another session with SET TRANSACTION
628 : * SNAPSHOT.
629 : *
630 : * For that we need to start a transaction in the current backend as the
631 : * importing side checks whether the source transaction is still open to make
632 : * sure the xmin horizon hasn't advanced since then.
633 : */
634 : const char *
635 0 : SnapBuildExportSnapshot(SnapBuild *builder)
636 : {
637 : Snapshot snap;
638 : char *snapname;
639 :
640 0 : if (IsTransactionOrTransactionBlock())
641 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
642 :
643 0 : if (SavedResourceOwnerDuringExport)
644 0 : elog(ERROR, "can only export one snapshot at a time");
645 :
646 0 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
647 0 : ExportInProgress = true;
648 :
649 0 : StartTransactionCommand();
650 :
651 : /* There doesn't seem to a nice API to set these */
652 0 : XactIsoLevel = XACT_REPEATABLE_READ;
653 0 : XactReadOnly = true;
654 :
655 0 : snap = SnapBuildInitialSnapshot(builder);
656 :
657 : /*
658 : * now that we've built a plain snapshot, make it active and use the
659 : * normal mechanisms for exporting it
660 : */
661 0 : snapname = ExportSnapshot(snap);
662 :
663 0 : ereport(LOG,
664 : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
665 : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
666 : snap->xcnt,
667 : snapname, snap->xcnt)));
668 0 : return snapname;
669 : }
670 :
671 : /*
672 : * Ensure there is a snapshot and if not build one for current transaction.
673 : */
674 : Snapshot
675 0 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
676 : {
677 0 : Assert(builder->state == SNAPBUILD_CONSISTENT);
678 :
679 : /* only build a new snapshot if we don't have a prebuilt one */
680 0 : if (builder->snapshot == NULL)
681 : {
682 0 : builder->snapshot = SnapBuildBuildSnapshot(builder);
683 : /* increase refcount for the snapshot builder */
684 0 : SnapBuildSnapIncRefcount(builder->snapshot);
685 : }
686 :
687 0 : return builder->snapshot;
688 : }
689 :
690 : /*
691 : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
692 : * any. Aborts the previously started transaction and resets the resource
693 : * owner back to its original value.
694 : */
695 : void
696 0 : SnapBuildClearExportedSnapshot(void)
697 : {
698 : /* nothing exported, that is the usual case */
699 0 : if (!ExportInProgress)
700 0 : return;
701 :
702 0 : if (!IsTransactionState())
703 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
704 :
705 : /* make sure nothing could have ever happened */
706 0 : AbortCurrentTransaction();
707 :
708 0 : CurrentResourceOwner = SavedResourceOwnerDuringExport;
709 0 : SavedResourceOwnerDuringExport = NULL;
710 0 : ExportInProgress = false;
711 : }
712 :
713 : /*
714 : * Handle the effects of a single heap change, appropriate to the current state
715 : * of the snapshot builder and returns whether changes made at (xid, lsn) can
716 : * be decoded.
717 : */
718 : bool
719 0 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
720 : {
721 : /*
722 : * We can't handle data in transactions if we haven't built a snapshot
723 : * yet, so don't store them.
724 : */
725 0 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
726 0 : return false;
727 :
728 : /*
729 : * No point in keeping track of changes in transactions that we don't have
730 : * enough information about to decode. This means that they started before
731 : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
732 : */
733 0 : if (builder->state < SNAPBUILD_CONSISTENT &&
734 0 : TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
735 0 : return false;
736 :
737 : /*
738 : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
739 : * be needed to decode the change we're currently processing.
740 : */
741 0 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
742 : {
743 : /* only build a new snapshot if we don't have a prebuilt one */
744 0 : if (builder->snapshot == NULL)
745 : {
746 0 : builder->snapshot = SnapBuildBuildSnapshot(builder);
747 : /* increase refcount for the snapshot builder */
748 0 : SnapBuildSnapIncRefcount(builder->snapshot);
749 : }
750 :
751 : /*
752 : * Increase refcount for the transaction we're handing the snapshot
753 : * out to.
754 : */
755 0 : SnapBuildSnapIncRefcount(builder->snapshot);
756 0 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
757 : builder->snapshot);
758 : }
759 :
760 0 : return true;
761 : }
762 :
763 : /*
764 : * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
765 : * This implies that a transaction has done some form of write to system
766 : * catalogs.
767 : */
768 : void
769 0 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
770 : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
771 : {
772 : CommandId cid;
773 :
774 : /*
775 : * we only log new_cid's if a catalog tuple was modified, so mark the
776 : * transaction as containing catalog modifications
777 : */
778 0 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
779 :
780 0 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
781 : xlrec->target_node, xlrec->target_tid,
782 : xlrec->cmin, xlrec->cmax,
783 : xlrec->combocid);
784 :
785 : /* figure out new command id */
786 0 : if (xlrec->cmin != InvalidCommandId &&
787 0 : xlrec->cmax != InvalidCommandId)
788 0 : cid = Max(xlrec->cmin, xlrec->cmax);
789 0 : else if (xlrec->cmax != InvalidCommandId)
790 0 : cid = xlrec->cmax;
791 0 : else if (xlrec->cmin != InvalidCommandId)
792 0 : cid = xlrec->cmin;
793 : else
794 : {
795 0 : cid = InvalidCommandId; /* silence compiler */
796 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
797 : }
798 :
799 0 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
800 0 : }
801 :
802 : /*
803 : * Add a new Snapshot to all transactions we're decoding that currently are
804 : * in-progress so they can see new catalog contents made by the transaction
805 : * that just committed. This is necessary because those in-progress
806 : * transactions will use the new catalog's contents from here on (at the very
807 : * least everything they do needs to be compatible with newer catalog
808 : * contents).
809 : */
810 : static void
811 0 : SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
812 : {
813 : dlist_iter txn_i;
814 : ReorderBufferTXN *txn;
815 :
816 : /*
817 : * Iterate through all toplevel transactions. This can include
818 : * subtransactions which we just don't yet know to be that, but that's
819 : * fine, they will just get an unnecessary snapshot queued.
820 : */
821 0 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
822 : {
823 0 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
824 :
825 0 : Assert(TransactionIdIsValid(txn->xid));
826 :
827 : /*
828 : * If we don't have a base snapshot yet, there are no changes in this
829 : * transaction which in turn implies we don't yet need a snapshot at
830 : * all. We'll add a snapshot when the first change gets queued.
831 : *
832 : * NB: This works correctly even for subtransactions because
833 : * ReorderBufferCommitChild() takes care to pass the parent the base
834 : * snapshot, and while iterating the changequeue we'll get the change
835 : * from the subtxn.
836 : */
837 0 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
838 0 : continue;
839 :
840 0 : elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
841 : txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
842 :
843 : /*
844 : * increase the snapshot's refcount for the transaction we are handing
845 : * it out to
846 : */
847 0 : SnapBuildSnapIncRefcount(builder->snapshot);
848 0 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
849 : builder->snapshot);
850 : }
851 0 : }
852 :
853 : /*
854 : * Keep track of a new catalog changing transaction that has committed.
855 : */
856 : static void
857 0 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
858 : {
859 0 : Assert(TransactionIdIsValid(xid));
860 :
861 0 : if (builder->committed.xcnt == builder->committed.xcnt_space)
862 : {
863 0 : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
864 :
865 0 : elog(DEBUG1, "increasing space for committed transactions to %u",
866 : (uint32) builder->committed.xcnt_space);
867 :
868 0 : builder->committed.xip = repalloc(builder->committed.xip,
869 0 : builder->committed.xcnt_space * sizeof(TransactionId));
870 : }
871 :
872 : /*
873 : * TODO: It might make sense to keep the array sorted here instead of
874 : * doing it every time we build a new snapshot. On the other hand this
875 : * gets called repeatedly when a transaction with subtransactions commits.
876 : */
877 0 : builder->committed.xip[builder->committed.xcnt++] = xid;
878 0 : }
879 :
880 : /*
881 : * Remove knowledge about transactions we treat as committed that are smaller
882 : * than ->xmin. Those won't ever get checked via the ->committed array but via
883 : * the clog machinery, so we don't need to waste memory on them.
884 : */
885 : static void
886 0 : SnapBuildPurgeCommittedTxn(SnapBuild *builder)
887 : {
888 : int off;
889 : TransactionId *workspace;
890 0 : int surviving_xids = 0;
891 :
892 : /* not ready yet */
893 0 : if (!TransactionIdIsNormal(builder->xmin))
894 0 : return;
895 :
896 : /* TODO: Neater algorithm than just copying and iterating? */
897 0 : workspace =
898 0 : MemoryContextAlloc(builder->context,
899 0 : builder->committed.xcnt * sizeof(TransactionId));
900 :
901 : /* copy xids that still are interesting to workspace */
902 0 : for (off = 0; off < builder->committed.xcnt; off++)
903 : {
904 0 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
905 : builder->xmin))
906 : ; /* remove */
907 : else
908 0 : workspace[surviving_xids++] = builder->committed.xip[off];
909 : }
910 :
911 : /* copy workspace back to persistent state */
912 0 : memcpy(builder->committed.xip, workspace,
913 : surviving_xids * sizeof(TransactionId));
914 :
915 0 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
916 : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
917 : builder->xmin, builder->xmax);
918 0 : builder->committed.xcnt = surviving_xids;
919 :
920 0 : pfree(workspace);
921 : }
922 :
923 : /*
924 : * Handle everything that needs to be done when a transaction commits
925 : */
926 : void
927 0 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
928 : int nsubxacts, TransactionId *subxacts)
929 : {
930 : int nxact;
931 :
932 0 : bool needs_snapshot = false;
933 0 : bool needs_timetravel = false;
934 0 : bool sub_needs_timetravel = false;
935 :
936 0 : TransactionId xmax = xid;
937 :
938 : /*
939 : * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
940 : * will they be part of a snapshot. So we don't need to record anything.
941 : */
942 0 : if (builder->state == SNAPBUILD_START ||
943 0 : (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
944 0 : TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
945 : {
946 : /* ensure that only commits after this are getting replayed */
947 0 : if (builder->start_decoding_at <= lsn)
948 0 : builder->start_decoding_at = lsn + 1;
949 0 : return;
950 : }
951 :
952 0 : if (builder->state < SNAPBUILD_CONSISTENT)
953 : {
954 : /* ensure that only commits after this are getting replayed */
955 0 : if (builder->start_decoding_at <= lsn)
956 0 : builder->start_decoding_at = lsn + 1;
957 :
958 : /*
959 : * If building an exportable snapshot, force xid to be tracked, even
960 : * if the transaction didn't modify the catalog.
961 : */
962 0 : if (builder->building_full_snapshot)
963 : {
964 0 : needs_timetravel = true;
965 : }
966 : }
967 :
968 0 : for (nxact = 0; nxact < nsubxacts; nxact++)
969 : {
970 0 : TransactionId subxid = subxacts[nxact];
971 :
972 : /*
973 : * Add subtransaction to base snapshot if catalog modifying, we don't
974 : * distinguish to toplevel transactions there.
975 : */
976 0 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
977 : {
978 0 : sub_needs_timetravel = true;
979 0 : needs_snapshot = true;
980 :
981 0 : elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
982 : xid, subxid);
983 :
984 0 : SnapBuildAddCommittedTxn(builder, subxid);
985 :
986 0 : if (NormalTransactionIdFollows(subxid, xmax))
987 0 : xmax = subxid;
988 : }
989 :
990 : /*
991 : * If we're forcing timetravel we also need visibility information
992 : * about subtransaction, so keep track of subtransaction's state, even
993 : * if not catalog modifying. Don't need to distribute a snapshot in
994 : * that case.
995 : */
996 0 : else if (needs_timetravel)
997 : {
998 0 : SnapBuildAddCommittedTxn(builder, subxid);
999 0 : if (NormalTransactionIdFollows(subxid, xmax))
1000 0 : xmax = subxid;
1001 : }
1002 : }
1003 :
1004 : /* if top-level modified catalog, it'll need a snapshot */
1005 0 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1006 : {
1007 0 : elog(DEBUG2, "found top level transaction %u, with catalog changes",
1008 : xid);
1009 0 : needs_snapshot = true;
1010 0 : needs_timetravel = true;
1011 0 : SnapBuildAddCommittedTxn(builder, xid);
1012 : }
1013 0 : else if (sub_needs_timetravel)
1014 : {
1015 : /* track toplevel txn as well, subxact alone isn't meaningful */
1016 0 : SnapBuildAddCommittedTxn(builder, xid);
1017 : }
1018 0 : else if (needs_timetravel)
1019 : {
1020 0 : elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1021 :
1022 0 : SnapBuildAddCommittedTxn(builder, xid);
1023 : }
1024 :
1025 0 : if (!needs_timetravel)
1026 : {
1027 : /* record that we cannot export a general snapshot anymore */
1028 0 : builder->committed.includes_all_transactions = false;
1029 : }
1030 :
1031 0 : Assert(!needs_snapshot || needs_timetravel);
1032 :
1033 : /*
1034 : * Adjust xmax of the snapshot builder, we only do that for committed,
1035 : * catalog modifying, transactions, everything else isn't interesting for
1036 : * us since we'll never look at the respective rows.
1037 : */
1038 0 : if (needs_timetravel &&
1039 0 : (!TransactionIdIsValid(builder->xmax) ||
1040 0 : TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1041 : {
1042 0 : builder->xmax = xmax;
1043 0 : TransactionIdAdvance(builder->xmax);
1044 : }
1045 :
1046 : /* if there's any reason to build a historic snapshot, do so now */
1047 0 : if (needs_snapshot)
1048 : {
1049 : /*
1050 : * If we haven't built a complete snapshot yet there's no need to hand
1051 : * it out, it wouldn't (and couldn't) be used anyway.
1052 : */
1053 0 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1054 0 : return;
1055 :
1056 : /*
1057 : * Decrease the snapshot builder's refcount of the old snapshot, note
1058 : * that it still will be used if it has been handed out to the
1059 : * reorderbuffer earlier.
1060 : */
1061 0 : if (builder->snapshot)
1062 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1063 :
1064 0 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1065 :
1066 : /* we might need to execute invalidations, add snapshot */
1067 0 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1068 : {
1069 0 : SnapBuildSnapIncRefcount(builder->snapshot);
1070 0 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1071 : builder->snapshot);
1072 : }
1073 :
1074 : /* refcount of the snapshot builder for the new snapshot */
1075 0 : SnapBuildSnapIncRefcount(builder->snapshot);
1076 :
1077 : /* add a new Snapshot to all currently running transactions */
1078 0 : SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1079 : }
1080 : }
1081 :
1082 :
1083 : /* -----------------------------------
1084 : * Snapshot building functions dealing with xlog records
1085 : * -----------------------------------
1086 : */
1087 :
1088 : /*
1089 : * Process a running xacts record, and use its information to first build a
1090 : * historic snapshot and later to release resources that aren't needed
1091 : * anymore.
1092 : */
1093 : void
1094 0 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1095 : {
1096 : ReorderBufferTXN *txn;
1097 :
1098 : /*
1099 : * If we're not consistent yet, inspect the record to see whether it
1100 : * allows to get closer to being consistent. If we are consistent, dump
1101 : * our snapshot so others or we, after a restart, can use it.
1102 : */
1103 0 : if (builder->state < SNAPBUILD_CONSISTENT)
1104 : {
1105 : /* returns false if there's no point in performing cleanup just yet */
1106 0 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1107 0 : return;
1108 : }
1109 : else
1110 0 : SnapBuildSerialize(builder, lsn);
1111 :
1112 : /*
1113 : * Update range of interesting xids based on the running xacts
1114 : * information. We don't increase ->xmax using it, because once we are in
1115 : * a consistent state we can do that ourselves and much more efficiently
1116 : * so, because we only need to do it for catalog transactions since we
1117 : * only ever look at those.
1118 : *
1119 : * NB: We only increase xmax when a catalog modifying transaction commits
1120 : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1121 : * xmin, which looks odd but is correct and actually more efficient, since
1122 : * we hit fast paths in tqual.c.
1123 : */
1124 0 : builder->xmin = running->oldestRunningXid;
1125 :
1126 : /* Remove transactions we don't need to keep track off anymore */
1127 0 : SnapBuildPurgeCommittedTxn(builder);
1128 :
1129 0 : elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
1130 : builder->xmin, builder->xmax,
1131 : running->oldestRunningXid);
1132 :
1133 : /*
1134 : * Increase shared memory limits, so vacuum can work on tuples we
1135 : * prevented from being pruned till now.
1136 : */
1137 0 : LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
1138 :
1139 : /*
1140 : * Also tell the slot where we can restart decoding from. We don't want to
1141 : * do that after every commit because changing that implies an fsync of
1142 : * the logical slot's state file, so we only do it every time we see a
1143 : * running xacts record.
1144 : *
1145 : * Do so by looking for the oldest in progress transaction (determined by
1146 : * the first LSN of any of its relevant records). Every transaction
1147 : * remembers the last location we stored the snapshot to disk before its
1148 : * beginning. That point is where we can restart from.
1149 : */
1150 :
1151 : /*
1152 : * Can't know about a serialized snapshot's location if we're not
1153 : * consistent.
1154 : */
1155 0 : if (builder->state < SNAPBUILD_CONSISTENT)
1156 0 : return;
1157 :
1158 0 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1159 :
1160 : /*
1161 : * oldest ongoing txn might have started when we didn't yet serialize
1162 : * anything because we hadn't reached a consistent state yet.
1163 : */
1164 0 : if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1165 0 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1166 :
1167 : /*
1168 : * No in-progress transaction, can reuse the last serialized snapshot if
1169 : * we have one.
1170 : */
1171 0 : else if (txn == NULL &&
1172 0 : builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1173 0 : builder->last_serialized_snapshot != InvalidXLogRecPtr)
1174 0 : LogicalIncreaseRestartDecodingForSlot(lsn,
1175 : builder->last_serialized_snapshot);
1176 : }
1177 :
1178 :
/*
 * Build the start of a snapshot that's capable of decoding the catalog.
 *
 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
 * consistent.
 *
 * Moves builder->state from START through BUILDING_SNAPSHOT and FULL_SNAPSHOT
 * to CONSISTENT as successive xl_running_xacts records show that the relevant
 * older transactions have finished.  It jumps straight to CONSISTENT when no
 * transactions were running, or when a snapshot serialized at 'lsn' can be
 * restored (only when not building a full snapshot).
 *
 * Returns true if there is a point in performing internal maintenance/cleanup
 * using the xl_running_xacts record.
 */
static bool
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
	/* ---
	 * Build catalog decoding snapshot incrementally using information about
	 * the currently running transactions. There are several ways to do that:
	 *
	 * a) There were no running transactions when the xl_running_xacts record
	 *    was inserted, jump to CONSISTENT immediately. We might find such a
	 *    state while waiting on c)'s sub-states.
	 *
	 * b) This (in a previous run) or another decoding slot serialized a
	 *    snapshot to disk that we can use. Can't use this method for the
	 *    initial snapshot when slot is being created and needs full snapshot
	 *    for export or direct use, as that snapshot will only contain catalog
	 *    modifying transactions.
	 *
	 * c) First incrementally build a snapshot for catalog tuples
	 *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
	 *    transactions to finish.  Every transaction starting after that
	 *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
	 *    for older running transactions no viable snapshot exists yet, so
	 *    CONSISTENT will only be reached once all of those have finished.
	 * ---
	 */

	/*
	 * The xl_running_xacts record is older than what we can use; we might not
	 * have all necessary catalog rows anymore.  Wait for the listed
	 * transactions up to the initial xmin horizon to finish (which also tries
	 * to get a newer xl_running_xacts record logged), and let the caller
	 * proceed with cleanup.
	 */
	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
		NormalTransactionIdPrecedes(running->oldestRunningXid,
									builder->initial_xmin_horizon))
	{
		ereport(DEBUG1,
				(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
								 (uint32) (lsn >> 32), (uint32) lsn),
				 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
									builder->initial_xmin_horizon, running->oldestRunningXid)));


		SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);

		return true;
	}

	/*
	 * a) No transactions were running, we can jump to consistent.
	 *
	 * This is not affected by races around xl_running_xacts, because we can
	 * miss transaction commits, but currently not transactions starting.
	 *
	 * NB: We might have already started to incrementally assemble a snapshot,
	 * so we need to be careful to deal with that.
	 */
	if (running->oldestRunningXid == running->nextXid)
	{
		if (builder->start_decoding_at == InvalidXLogRecPtr ||
			builder->start_decoding_at <= lsn)
			/* can decode everything after this */
			builder->start_decoding_at = lsn + 1;

		/* As no transactions were running xmin/xmax can be trivially set. */
		builder->xmin = running->nextXid;	/* < are finished */
		builder->xmax = running->nextXid;	/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		builder->state = SNAPBUILD_CONSISTENT;
		SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						(uint32) (lsn >> 32), (uint32) lsn),
				 errdetail("There are no running transactions.")));

		return false;
	}
	/* b) valid on disk state and not building full snapshot */
	else if (!builder->building_full_snapshot &&
			 SnapBuildRestore(builder, lsn))
	{
		/* there won't be any state to cleanup */
		return false;
	}

	/*
	 * c) transition from START to BUILDING_SNAPSHOT.
	 *
	 * In START state, and a xl_running_xacts record with running xacts is
	 * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
	 * record xl_running_xacts->nextXid.  Once all running xacts have finished
	 * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
	 * might look like we could use xl_running_xact's ->xids information to
	 * get there quicker, but that is problematic because transactions marked
	 * as running might already have inserted their commit record - it's
	 * infeasible to change that with locking.
	 */
	else if (builder->state == SNAPBUILD_START)
	{
		builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
		SnapBuildStartNextPhaseAt(builder, running->nextXid);

		/*
		 * Start with an xmin/xmax that's correct for future, when all the
		 * currently running transactions have finished. We'll update both
		 * while waiting for the pending transactions to finish.
		 */
		builder->xmin = running->nextXid;	/* < are finished */
		builder->xmax = running->nextXid;	/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		ereport(LOG,
				(errmsg("logical decoding found initial starting point at %X/%X",
						(uint32) (lsn >> 32), (uint32) lsn),
				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
						   running->xcnt, running->nextXid)));

		SnapBuildWaitSnapshot(running, running->nextXid);
	}

	/*
	 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
	 *
	 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
	 * is >= the nextXid from when we switched to BUILDING_SNAPSHOT.  This
	 * means all transactions starting afterwards have enough information to
	 * be decoded.  Switch to FULL_SNAPSHOT.
	 */
	else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
			 TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
										   running->oldestRunningXid))
	{
		builder->state = SNAPBUILD_FULL_SNAPSHOT;
		SnapBuildStartNextPhaseAt(builder, running->nextXid);

		ereport(LOG,
				(errmsg("logical decoding found initial consistent point at %X/%X",
						(uint32) (lsn >> 32), (uint32) lsn),
				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
						   running->xcnt, running->nextXid)));

		SnapBuildWaitSnapshot(running, running->nextXid);
	}

	/*
	 * c) transition from FULL_SNAPSHOT to CONSISTENT.
	 *
	 * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
	 * >= the nextXid from when we switched to FULL_SNAPSHOT.  This means all
	 * transactions that are currently in progress have a catalog snapshot,
	 * and all their changes have been collected.  Switch to CONSISTENT.
	 */
	else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
			 TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
										   running->oldestRunningXid))
	{
		builder->state = SNAPBUILD_CONSISTENT;
		SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						(uint32) (lsn >> 32), (uint32) lsn),
				 errdetail("There are no old transactions anymore.")));
	}

	/*
	 * We already started to track running xacts and need to wait for all
	 * in-progress ones to finish. We fall through to the normal processing of
	 * records so incremental cleanup can be performed.
	 */
	return true;

}
1368 :
1369 : /* ---
1370 : * Iterate through xids in record, wait for all older than the cutoff to
1371 : * finish. Then, if possible, log a new xl_running_xacts record.
1372 : *
1373 : * This isn't required for the correctness of decoding, but to:
1374 : * a) allow isolationtester to notice that we're currently waiting for
1375 : * something.
1376 : * b) log a new xl_running_xacts record where it'd be helpful, without having
1377 : * to write for bgwriter or checkpointer.
1378 : * ---
1379 : */
1380 : static void
1381 0 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1382 : {
1383 : int off;
1384 :
1385 0 : for (off = 0; off < running->xcnt; off++)
1386 : {
1387 0 : TransactionId xid = running->xids[off];
1388 :
1389 : /*
1390 : * Upper layers should prevent that we ever need to wait on ourselves.
1391 : * Check anyway, since failing to do so would either result in an
1392 : * endless wait or an Assert() failure.
1393 : */
1394 0 : if (TransactionIdIsCurrentTransactionId(xid))
1395 0 : elog(ERROR, "waiting for ourselves");
1396 :
1397 0 : if (TransactionIdFollows(xid, cutoff))
1398 0 : continue;
1399 :
1400 0 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
1401 : }
1402 :
1403 : /*
1404 : * All transactions we needed to finish finished - try to ensure there is
1405 : * another xl_running_xacts record in a timely manner, without having to
1406 : * write for bgwriter or checkpointer to log one. During recovery we
1407 : * can't enforce that, so we'll have to wait.
1408 : */
1409 0 : if (!RecoveryInProgress())
1410 : {
1411 0 : LogStandbySnapshot();
1412 : }
1413 0 : }
1414 :
1415 : /* -----------------------------------
1416 : * Snapshot serialization support
1417 : * -----------------------------------
1418 : */
1419 :
/*
 * We store current state of struct SnapBuild on disk in the following manner:
 *
 * struct SnapBuildOnDisk;
 * TransactionId * was_running.was_xcnt_space;
 * TransactionId * committed.xcnt; (*not xcnt_space*)
 *
 * NOTE(review): SnapBuildSerialize() writes no was_running entries (it only
 * asserts was_xcnt == 0), while SnapBuildRestore() reads was_xcnt_space of
 * them; this presumably relies on was_xcnt_space always being zero now that
 * the was_running data is dead -- confirm.
 *
 * This struct layout is the on-disk format; changing it requires bumping
 * SNAPBUILD_VERSION.
 */
typedef struct SnapBuildOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;
	pg_crc32c	checksum;

	/* data covered by checksum */

	/* version, in case we want to support pg_upgrade */
	uint32		version;

	/*
	 * how large is the on disk data, excluding the constant sized part
	 *
	 * NOTE(review): SnapBuildSerialize() currently stores the total size,
	 * i.e. sizeof(SnapBuildOnDisk) plus the committed xid array, which
	 * contradicts the description above; confirm which is intended.
	 */
	uint32		length;

	/* version dependent part */
	SnapBuild	builder;

	/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;

/* size of the version-independent header preceding 'builder' */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)
/* leading bytes (magic, checksum) that the checksum does not cover */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 2
1456 :
1457 : /*
1458 : * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1459 : *
1460 : * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1461 : * a record that's a potential location for a serialized snapshot.
1462 : */
1463 : void
1464 0 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1465 : {
1466 0 : if (builder->state < SNAPBUILD_CONSISTENT)
1467 0 : SnapBuildRestore(builder, lsn);
1468 : else
1469 0 : SnapBuildSerialize(builder, lsn);
1470 0 : }
1471 :
1472 : /*
1473 : * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1474 : * been done by another decoding process.
1475 : */
1476 : static void
1477 0 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1478 : {
1479 : Size needed_length;
1480 : SnapBuildOnDisk *ondisk;
1481 : char *ondisk_c;
1482 : int fd;
1483 : char tmppath[MAXPGPATH];
1484 : char path[MAXPGPATH];
1485 : int ret;
1486 : struct stat stat_buf;
1487 : Size sz;
1488 :
1489 0 : Assert(lsn != InvalidXLogRecPtr);
1490 0 : Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1491 : builder->last_serialized_snapshot <= lsn);
1492 :
1493 : /*
1494 : * no point in serializing if we cannot continue to work immediately after
1495 : * restoring the snapshot
1496 : */
1497 0 : if (builder->state < SNAPBUILD_CONSISTENT)
1498 0 : return;
1499 :
1500 : /*
1501 : * We identify snapshots by the LSN they are valid for. We don't need to
1502 : * include timelines in the name as each LSN maps to exactly one timeline
1503 : * unless the user used pg_resetwal or similar. If a user did so, there's
1504 : * no hope continuing to decode anyway.
1505 : */
1506 0 : sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1507 0 : (uint32) (lsn >> 32), (uint32) lsn);
1508 :
1509 : /*
1510 : * first check whether some other backend already has written the snapshot
1511 : * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1512 : * as a valid state. Everything else is an unexpected error.
1513 : */
1514 0 : ret = stat(path, &stat_buf);
1515 :
1516 0 : if (ret != 0 && errno != ENOENT)
1517 0 : ereport(ERROR,
1518 : (errmsg("could not stat file \"%s\": %m", path)));
1519 :
1520 0 : else if (ret == 0)
1521 : {
1522 : /*
1523 : * somebody else has already serialized to this point, don't overwrite
1524 : * but remember location, so we don't need to read old data again.
1525 : *
1526 : * To be sure it has been synced to disk after the rename() from the
1527 : * tempfile filename to the real filename, we just repeat the fsync.
1528 : * That ought to be cheap because in most scenarios it should already
1529 : * be safely on disk.
1530 : */
1531 0 : fsync_fname(path, false);
1532 0 : fsync_fname("pg_logical/snapshots", true);
1533 :
1534 0 : builder->last_serialized_snapshot = lsn;
1535 0 : goto out;
1536 : }
1537 :
1538 : /*
1539 : * there is an obvious race condition here between the time we stat(2) the
1540 : * file and us writing the file. But we rename the file into place
1541 : * atomically and all files created need to contain the same data anyway,
1542 : * so this is perfectly fine, although a bit of a resource waste. Locking
1543 : * seems like pointless complication.
1544 : */
1545 0 : elog(DEBUG1, "serializing snapshot to %s", path);
1546 :
1547 : /* to make sure only we will write to this tempfile, include pid */
1548 0 : sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1549 0 : (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1550 :
1551 : /*
1552 : * Unlink temporary file if it already exists, needs to have been before a
1553 : * crash/error since we won't enter this function twice from within a
1554 : * single decoding slot/backend and the temporary file contains the pid of
1555 : * the current process.
1556 : */
1557 0 : if (unlink(tmppath) != 0 && errno != ENOENT)
1558 0 : ereport(ERROR,
1559 : (errcode_for_file_access(),
1560 : errmsg("could not remove file \"%s\": %m", path)));
1561 :
1562 0 : needed_length = sizeof(SnapBuildOnDisk) +
1563 0 : sizeof(TransactionId) * builder->committed.xcnt;
1564 :
1565 0 : ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1566 0 : ondisk = (SnapBuildOnDisk *) ondisk_c;
1567 0 : ondisk->magic = SNAPBUILD_MAGIC;
1568 0 : ondisk->version = SNAPBUILD_VERSION;
1569 0 : ondisk->length = needed_length;
1570 0 : INIT_CRC32C(ondisk->checksum);
1571 0 : COMP_CRC32C(ondisk->checksum,
1572 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1573 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1574 0 : ondisk_c += sizeof(SnapBuildOnDisk);
1575 :
1576 0 : memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1577 : /* NULL-ify memory-only data */
1578 0 : ondisk->builder.context = NULL;
1579 0 : ondisk->builder.snapshot = NULL;
1580 0 : ondisk->builder.reorder = NULL;
1581 0 : ondisk->builder.committed.xip = NULL;
1582 :
1583 0 : COMP_CRC32C(ondisk->checksum,
1584 : &ondisk->builder,
1585 : sizeof(SnapBuild));
1586 :
1587 : /* there shouldn't be any running xacts */
1588 0 : Assert(builder->was_running.was_xcnt == 0);
1589 :
1590 : /* copy committed xacts */
1591 0 : sz = sizeof(TransactionId) * builder->committed.xcnt;
1592 0 : memcpy(ondisk_c, builder->committed.xip, sz);
1593 0 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1594 0 : ondisk_c += sz;
1595 :
1596 0 : FIN_CRC32C(ondisk->checksum);
1597 :
1598 : /* we have valid data now, open tempfile and write it there */
1599 0 : fd = OpenTransientFile(tmppath,
1600 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1601 : S_IRUSR | S_IWUSR);
1602 0 : if (fd < 0)
1603 0 : ereport(ERROR,
1604 : (errmsg("could not open file \"%s\": %m", path)));
1605 :
1606 0 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1607 0 : if ((write(fd, ondisk, needed_length)) != needed_length)
1608 : {
1609 0 : CloseTransientFile(fd);
1610 0 : ereport(ERROR,
1611 : (errcode_for_file_access(),
1612 : errmsg("could not write to file \"%s\": %m", tmppath)));
1613 : }
1614 0 : pgstat_report_wait_end();
1615 :
1616 : /*
1617 : * fsync the file before renaming so that even if we crash after this we
1618 : * have either a fully valid file or nothing.
1619 : *
1620 : * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1621 : * some noticeable overhead since it's performed synchronously during
1622 : * decoding?
1623 : */
1624 0 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1625 0 : if (pg_fsync(fd) != 0)
1626 : {
1627 0 : CloseTransientFile(fd);
1628 0 : ereport(ERROR,
1629 : (errcode_for_file_access(),
1630 : errmsg("could not fsync file \"%s\": %m", tmppath)));
1631 : }
1632 0 : pgstat_report_wait_end();
1633 0 : CloseTransientFile(fd);
1634 :
1635 0 : fsync_fname("pg_logical/snapshots", true);
1636 :
1637 : /*
1638 : * We may overwrite the work from some other backend, but that's ok, our
1639 : * snapshot is valid as well, we'll just have done some superfluous work.
1640 : */
1641 0 : if (rename(tmppath, path) != 0)
1642 : {
1643 0 : ereport(ERROR,
1644 : (errcode_for_file_access(),
1645 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1646 : tmppath, path)));
1647 : }
1648 :
1649 : /* make sure we persist */
1650 0 : fsync_fname(path, false);
1651 0 : fsync_fname("pg_logical/snapshots", true);
1652 :
1653 : /*
1654 : * Now there's no way we can loose the dumped state anymore, remember this
1655 : * as a serialization point.
1656 : */
1657 0 : builder->last_serialized_snapshot = lsn;
1658 :
1659 : out:
1660 0 : ReorderBufferSetRestartPoint(builder->reorder,
1661 : builder->last_serialized_snapshot);
1662 : }
1663 :
1664 : /*
1665 : * Restore a snapshot into 'builder' if previously one has been stored at the
1666 : * location indicated by 'lsn'. Returns true if successful, false otherwise.
1667 : */
1668 : static bool
1669 0 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1670 : {
1671 : SnapBuildOnDisk ondisk;
1672 : int fd;
1673 : char path[MAXPGPATH];
1674 : Size sz;
1675 : int readBytes;
1676 : pg_crc32c checksum;
1677 :
1678 : /* no point in loading a snapshot if we're already there */
1679 0 : if (builder->state == SNAPBUILD_CONSISTENT)
1680 0 : return false;
1681 :
1682 0 : sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1683 0 : (uint32) (lsn >> 32), (uint32) lsn);
1684 :
1685 0 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1686 :
1687 0 : if (fd < 0 && errno == ENOENT)
1688 0 : return false;
1689 0 : else if (fd < 0)
1690 0 : ereport(ERROR,
1691 : (errcode_for_file_access(),
1692 : errmsg("could not open file \"%s\": %m", path)));
1693 :
1694 : /* ----
1695 : * Make sure the snapshot had been stored safely to disk, that's normally
1696 : * cheap.
1697 : * Note that we do not need PANIC here, nobody will be able to use the
1698 : * slot without fsyncing, and saving it won't succeed without an fsync()
1699 : * either...
1700 : * ----
1701 : */
1702 0 : fsync_fname(path, false);
1703 0 : fsync_fname("pg_logical/snapshots", true);
1704 :
1705 :
1706 : /* read statically sized portion of snapshot */
1707 0 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1708 0 : readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1709 0 : pgstat_report_wait_end();
1710 0 : if (readBytes != SnapBuildOnDiskConstantSize)
1711 : {
1712 0 : CloseTransientFile(fd);
1713 0 : ereport(ERROR,
1714 : (errcode_for_file_access(),
1715 : errmsg("could not read file \"%s\", read %d of %d: %m",
1716 : path, readBytes, (int) SnapBuildOnDiskConstantSize)));
1717 : }
1718 :
1719 0 : if (ondisk.magic != SNAPBUILD_MAGIC)
1720 0 : ereport(ERROR,
1721 : (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1722 : path, ondisk.magic, SNAPBUILD_MAGIC)));
1723 :
1724 0 : if (ondisk.version != SNAPBUILD_VERSION)
1725 0 : ereport(ERROR,
1726 : (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1727 : path, ondisk.version, SNAPBUILD_VERSION)));
1728 :
1729 0 : INIT_CRC32C(checksum);
1730 0 : COMP_CRC32C(checksum,
1731 : ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1732 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1733 :
1734 : /* read SnapBuild */
1735 0 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1736 0 : readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1737 0 : pgstat_report_wait_end();
1738 0 : if (readBytes != sizeof(SnapBuild))
1739 : {
1740 0 : CloseTransientFile(fd);
1741 0 : ereport(ERROR,
1742 : (errcode_for_file_access(),
1743 : errmsg("could not read file \"%s\", read %d of %d: %m",
1744 : path, readBytes, (int) sizeof(SnapBuild))));
1745 : }
1746 0 : COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1747 :
1748 : /* restore running xacts (dead, but kept for backward compat) */
1749 0 : sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
1750 0 : ondisk.builder.was_running.was_xip =
1751 0 : MemoryContextAllocZero(builder->context, sz);
1752 0 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1753 0 : readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
1754 0 : pgstat_report_wait_end();
1755 0 : if (readBytes != sz)
1756 : {
1757 0 : CloseTransientFile(fd);
1758 0 : ereport(ERROR,
1759 : (errcode_for_file_access(),
1760 : errmsg("could not read file \"%s\", read %d of %d: %m",
1761 : path, readBytes, (int) sz)));
1762 : }
1763 0 : COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
1764 :
1765 : /* restore committed xacts information */
1766 0 : sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1767 0 : ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1768 0 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1769 0 : readBytes = read(fd, ondisk.builder.committed.xip, sz);
1770 0 : pgstat_report_wait_end();
1771 0 : if (readBytes != sz)
1772 : {
1773 0 : CloseTransientFile(fd);
1774 0 : ereport(ERROR,
1775 : (errcode_for_file_access(),
1776 : errmsg("could not read file \"%s\", read %d of %d: %m",
1777 : path, readBytes, (int) sz)));
1778 : }
1779 0 : COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1780 :
1781 0 : CloseTransientFile(fd);
1782 :
1783 0 : FIN_CRC32C(checksum);
1784 :
1785 : /* verify checksum of what we've read */
1786 0 : if (!EQ_CRC32C(checksum, ondisk.checksum))
1787 0 : ereport(ERROR,
1788 : (errcode_for_file_access(),
1789 : errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1790 : path, checksum, ondisk.checksum)));
1791 :
1792 : /*
1793 : * ok, we now have a sensible snapshot here, figure out if it has more
1794 : * information than we have.
1795 : */
1796 :
1797 : /*
1798 : * We are only interested in consistent snapshots for now, comparing
1799 : * whether one incomplete snapshot is more "advanced" seems to be
1800 : * unnecessarily complex.
1801 : */
1802 0 : if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1803 0 : goto snapshot_not_interesting;
1804 :
1805 : /*
1806 : * Don't use a snapshot that requires an xmin that we cannot guarantee to
1807 : * be available.
1808 : */
1809 0 : if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1810 0 : goto snapshot_not_interesting;
1811 :
1812 :
1813 : /* ok, we think the snapshot is sensible, copy over everything important */
1814 0 : builder->xmin = ondisk.builder.xmin;
1815 0 : builder->xmax = ondisk.builder.xmax;
1816 0 : builder->state = ondisk.builder.state;
1817 :
1818 0 : builder->committed.xcnt = ondisk.builder.committed.xcnt;
1819 : /* We only allocated/stored xcnt, not xcnt_space xids ! */
1820 : /* don't overwrite preallocated xip, if we don't have anything here */
1821 0 : if (builder->committed.xcnt > 0)
1822 : {
1823 0 : pfree(builder->committed.xip);
1824 0 : builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1825 0 : builder->committed.xip = ondisk.builder.committed.xip;
1826 : }
1827 0 : ondisk.builder.committed.xip = NULL;
1828 :
1829 : /* our snapshot is not interesting anymore, build a new one */
1830 0 : if (builder->snapshot != NULL)
1831 : {
1832 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1833 : }
1834 0 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1835 0 : SnapBuildSnapIncRefcount(builder->snapshot);
1836 :
1837 0 : ReorderBufferSetRestartPoint(builder->reorder, lsn);
1838 :
1839 0 : Assert(builder->state == SNAPBUILD_CONSISTENT);
1840 :
1841 0 : ereport(LOG,
1842 : (errmsg("logical decoding found consistent point at %X/%X",
1843 : (uint32) (lsn >> 32), (uint32) lsn),
1844 : errdetail("Logical decoding will begin using saved snapshot.")));
1845 0 : return true;
1846 :
1847 : snapshot_not_interesting:
1848 0 : if (ondisk.builder.committed.xip != NULL)
1849 0 : pfree(ondisk.builder.committed.xip);
1850 0 : return false;
1851 : }
1852 :
1853 : /*
1854 : * Remove all serialized snapshots that are not required anymore because no
1855 : * slot can need them. This doesn't actually have to run during a checkpoint,
1856 : * but it's a convenient point to schedule this.
1857 : *
1858 : * NB: We run this during checkpoints even if logical decoding is disabled so
1859 : * we cleanup old slots at some point after it got disabled.
1860 : */
1861 : void
1862 22 : CheckPointSnapBuild(void)
1863 : {
1864 : XLogRecPtr cutoff;
1865 : XLogRecPtr redo;
1866 : DIR *snap_dir;
1867 : struct dirent *snap_de;
1868 : char path[MAXPGPATH + 21];
1869 :
1870 : /*
1871 : * We start off with a minimum of the last redo pointer. No new
1872 : * replication slot will start before that, so that's a safe upper bound
1873 : * for removal.
1874 : */
1875 22 : redo = GetRedoRecPtr();
1876 :
1877 : /* now check for the restart ptrs from existing slots */
1878 22 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1879 :
1880 : /* don't start earlier than the restart lsn */
1881 22 : if (redo < cutoff)
1882 0 : cutoff = redo;
1883 :
1884 22 : snap_dir = AllocateDir("pg_logical/snapshots");
1885 88 : while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1886 : {
1887 : uint32 hi;
1888 : uint32 lo;
1889 : XLogRecPtr lsn;
1890 : struct stat statbuf;
1891 :
1892 66 : if (strcmp(snap_de->d_name, ".") == 0 ||
1893 22 : strcmp(snap_de->d_name, "..") == 0)
1894 88 : continue;
1895 :
1896 0 : snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
1897 :
1898 0 : if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1899 : {
1900 0 : elog(DEBUG1, "only regular files expected: %s", path);
1901 0 : continue;
1902 : }
1903 :
1904 : /*
1905 : * temporary filenames from SnapBuildSerialize() include the LSN and
1906 : * everything but are postfixed by .$pid.tmp. We can just remove them
1907 : * the same as other files because there can be none that are
1908 : * currently being written that are older than cutoff.
1909 : *
1910 : * We just log a message if a file doesn't fit the pattern, it's
1911 : * probably some editors lock/state file or similar...
1912 : */
1913 0 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1914 : {
1915 0 : ereport(LOG,
1916 : (errmsg("could not parse file name \"%s\"", path)));
1917 0 : continue;
1918 : }
1919 :
1920 0 : lsn = ((uint64) hi) << 32 | lo;
1921 :
1922 : /* check whether we still need it */
1923 0 : if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1924 : {
1925 0 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
1926 :
1927 : /*
1928 : * It's not particularly harmful, though strange, if we can't
1929 : * remove the file here. Don't prevent the checkpoint from
1930 : * completing, that'd be a cure worse than the disease.
1931 : */
1932 0 : if (unlink(path) < 0)
1933 : {
1934 0 : ereport(LOG,
1935 : (errcode_for_file_access(),
1936 : errmsg("could not remove file \"%s\": %m",
1937 : path)));
1938 0 : continue;
1939 : }
1940 : }
1941 : }
1942 22 : FreeDir(snap_dir);
1943 22 : }
|