Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2017, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to efficiently store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
 * Replication origins consist of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte id. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory, where long descriptors would be inefficient. For now we only use
 * 2 bytes for the internal id of a replication origin, as it seems unlikely
 * that there will soon be more than 65k nodes in one replication setup, and
 * using only two bytes allows us to be more space efficient.
25 : *
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That's the case
 * because it allows us to advance replication progress during crash
 * recovery. To allow doing so we store the original LSN (from the originating
 * system) of a transaction in the commit record. That allows us to recover
 * the precise replayed state after crash recovery, without requiring
 * synchronous commits. Allowing logical replication to use asynchronous
 * commit is generally good for performance, but especially important as it
 * allows a single-threaded replay process to keep up with a source that has
 * multiple backends generating changes concurrently. For efficiency and
 * simplicity reasons a backend can set up one replication origin that is
 * from then on used as the source of changes produced by the backend, until
 * it is reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
 * * To create and drop replication origins an exclusive lock on
 * pg_replication_origin is required for the duration. That allows us to
 * safely and conflict-free assign new origins using a dirty snapshot.
50 : *
 * * When creating an in-memory replication progress slot the ReplicationOrigin
 * LWLock has to be held exclusively; when iterating over the replication
 * progress a shared lock has to be held, the same when advancing the
 * replication progress of an individual origin that has not been set up as
 * the session's replication origin.
56 : *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 * replication progress slot that slot's lwlock has to be held. That's
 * primarily because we do not assume 8-byte writes (the LSN) are atomic on
 * all our platforms, but it also simplifies memory ordering concerns
 * between the remote and local lsn. We use a lwlock instead of a spinlock
 * so it's less harmful to hold the lock over a WAL write
 * (cf. replorigin_advance).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "funcapi.h"
74 : #include "miscadmin.h"
75 :
76 : #include "access/genam.h"
77 : #include "access/heapam.h"
78 : #include "access/htup_details.h"
79 : #include "access/xact.h"
80 :
81 : #include "catalog/indexing.h"
82 : #include "nodes/execnodes.h"
83 :
84 : #include "replication/origin.h"
85 : #include "replication/logical.h"
86 : #include "pgstat.h"
87 : #include "storage/fd.h"
88 : #include "storage/ipc.h"
89 : #include "storage/lmgr.h"
90 : #include "storage/condition_variable.h"
91 : #include "storage/copydir.h"
92 :
93 : #include "utils/builtins.h"
94 : #include "utils/fmgroids.h"
95 : #include "utils/pg_lsn.h"
96 : #include "utils/rel.h"
97 : #include "utils/syscache.h"
98 : #include "utils/tqual.h"
99 :
/*
 * Replay progress of a single remote node.
 *
 * An array of max_replication_slots of these lives in shared memory; an
 * entry ("slot") is in use when roident is not InvalidRepOriginId.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId marks an
	 * unused slot.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signalled when acquired_by changes; waited
	 * on by backends that need the slot to be released (e.g. to drop it).
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
137 :
/*
 * On disk version of ReplicationState, as written to the replication origin
 * checkpoint file.
 *
 * Only the origin id and the replayed remote LSN are persisted; the local LSN
 * is flushed (not stored) before an entry is written.  Writers zero each
 * instance first so that padding bytes are deterministic, since the whole
 * struct is fed into the file's CRC.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
146 :
147 :
/*
 * Header of the shared memory area for replication progress: the LWLock
 * tranche id used for the per-slot locks, followed by max_replication_slots
 * ReplicationState entries.
 */
typedef struct ReplicationStateCtl
{
	int			tranche_id;
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
153 :
/* external variables */

/*
 * Session-level origin information.  Callers set up
 * replorigin_session_origin and replorigin_session_origin_lsn so that the
 * origin of replayed commits survives a crash.  NOTE(review): these are read
 * outside the code shown here; the timestamp is presumably the remote commit
 * time -- confirm.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;
/* Shared memory header whose states[] member replication_states points to. */
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backends current RepOriginId.
 */
static ReplicationState *session_replication_state = NULL;

/* Magic for on disk files: first 4 bytes of the origin checkpoint file. */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
178 :
179 : static void
180 0 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
181 : {
182 0 : if (!superuser())
183 0 : ereport(ERROR,
184 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
185 : errmsg("only superusers can query or manipulate replication origins")));
186 :
187 0 : if (check_slots && max_replication_slots == 0)
188 0 : ereport(ERROR,
189 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190 : errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 :
192 0 : if (!recoveryOK && RecoveryInProgress())
193 0 : ereport(ERROR,
194 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195 : errmsg("cannot manipulate replication origins during recovery")));
196 :
197 0 : }
198 :
199 :
200 : /* ---------------------------------------------------------------------------
201 : * Functions for working with replication origins themselves.
202 : * ---------------------------------------------------------------------------
203 : */
204 :
205 : /*
206 : * Check for a persistent replication origin identified by name.
207 : *
208 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
209 : */
210 : RepOriginId
211 6 : replorigin_by_name(char *roname, bool missing_ok)
212 : {
213 : Form_pg_replication_origin ident;
214 6 : Oid roident = InvalidOid;
215 : HeapTuple tuple;
216 : Datum roname_d;
217 :
218 6 : roname_d = CStringGetTextDatum(roname);
219 :
220 6 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
221 6 : if (HeapTupleIsValid(tuple))
222 : {
223 6 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
224 6 : roident = ident->roident;
225 6 : ReleaseSysCache(tuple);
226 : }
227 0 : else if (!missing_ok)
228 0 : elog(ERROR, "cache lookup failed for replication origin '%s'",
229 : roname);
230 :
231 6 : return roident;
232 : }
233 :
234 : /*
235 : * Create a replication origin.
236 : *
237 : * Needs to be called in a transaction.
238 : */
239 : RepOriginId
240 6 : replorigin_create(char *roname)
241 : {
242 : Oid roident;
243 6 : HeapTuple tuple = NULL;
244 : Relation rel;
245 : Datum roname_d;
246 : SnapshotData SnapshotDirty;
247 : SysScanDesc scan;
248 : ScanKeyData key;
249 :
250 6 : roname_d = CStringGetTextDatum(roname);
251 :
252 6 : Assert(IsTransactionState());
253 :
254 : /*
255 : * We need the numeric replication origin to be 16bit wide, so we cannot
256 : * rely on the normal oid allocation. Instead we simply scan
257 : * pg_replication_origin for the first unused id. That's not particularly
258 : * efficient, but this should be a fairly infrequent operation - we can
259 : * easily spend a bit more code on this when it turns out it needs to be
260 : * faster.
261 : *
262 : * We handle concurrency by taking an exclusive lock (allowing reads!)
263 : * over the table for the duration of the search. Because we use a "dirty
264 : * snapshot" we can read rows that other in-progress sessions have
265 : * written, even though they would be invisible with normal snapshots. Due
266 : * to the exclusive lock there's no danger that new rows can appear while
267 : * we're checking.
268 : */
269 6 : InitDirtySnapshot(SnapshotDirty);
270 :
271 6 : rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
272 :
273 8 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
274 : {
275 : bool nulls[Natts_pg_replication_origin];
276 : Datum values[Natts_pg_replication_origin];
277 : bool collides;
278 :
279 8 : CHECK_FOR_INTERRUPTS();
280 :
281 8 : ScanKeyInit(&key,
282 : Anum_pg_replication_origin_roident,
283 : BTEqualStrategyNumber, F_OIDEQ,
284 : ObjectIdGetDatum(roident));
285 :
286 8 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
287 : true /* indexOK */ ,
288 : &SnapshotDirty,
289 : 1, &key);
290 :
291 8 : collides = HeapTupleIsValid(systable_getnext(scan));
292 :
293 8 : systable_endscan(scan);
294 :
295 8 : if (!collides)
296 : {
297 : /*
298 : * Ok, found an unused roident, insert the new row and do a CCI,
299 : * so our callers can look it up if they want to.
300 : */
301 6 : memset(&nulls, 0, sizeof(nulls));
302 :
303 6 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
304 6 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
305 :
306 6 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
307 6 : CatalogTupleInsert(rel, tuple);
308 6 : CommandCounterIncrement();
309 6 : break;
310 : }
311 : }
312 :
313 : /* now release lock again, */
314 6 : heap_close(rel, ExclusiveLock);
315 :
316 6 : if (tuple == NULL)
317 0 : ereport(ERROR,
318 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
319 : errmsg("could not find free replication origin OID")));
320 :
321 6 : heap_freetuple(tuple);
322 6 : return roident;
323 : }
324 :
325 :
326 : /*
327 : * Drop replication origin.
328 : *
329 : * Needs to be called in a transaction.
330 : */
331 : void
332 6 : replorigin_drop(RepOriginId roident, bool nowait)
333 : {
334 : HeapTuple tuple;
335 : Relation rel;
336 : int i;
337 :
338 6 : Assert(IsTransactionState());
339 :
340 6 : rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
341 :
342 : restart:
343 6 : tuple = NULL;
344 : /* cleanup the slot state info */
345 6 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
346 :
347 66 : for (i = 0; i < max_replication_slots; i++)
348 : {
349 60 : ReplicationState *state = &replication_states[i];
350 :
351 : /* found our slot */
352 60 : if (state->roident == roident)
353 : {
354 0 : if (state->acquired_by != 0)
355 : {
356 : ConditionVariable *cv;
357 :
358 0 : if (nowait)
359 0 : ereport(ERROR,
360 : (errcode(ERRCODE_OBJECT_IN_USE),
361 : errmsg("could not drop replication origin with OID %d, in use by PID %d",
362 : state->roident,
363 : state->acquired_by)));
364 0 : cv = &state->origin_cv;
365 :
366 0 : LWLockRelease(ReplicationOriginLock);
367 0 : ConditionVariablePrepareToSleep(cv);
368 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
369 0 : ConditionVariableCancelSleep();
370 0 : goto restart;
371 : }
372 :
373 : /* first WAL log */
374 : {
375 : xl_replorigin_drop xlrec;
376 :
377 0 : xlrec.node_id = roident;
378 0 : XLogBeginInsert();
379 0 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
380 0 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
381 : }
382 :
383 : /* then reset the in-memory entry */
384 0 : state->roident = InvalidRepOriginId;
385 0 : state->remote_lsn = InvalidXLogRecPtr;
386 0 : state->local_lsn = InvalidXLogRecPtr;
387 0 : break;
388 : }
389 : }
390 6 : LWLockRelease(ReplicationOriginLock);
391 :
392 6 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
393 6 : if (!HeapTupleIsValid(tuple))
394 0 : elog(ERROR, "cache lookup failed for replication origin with oid %u",
395 : roident);
396 :
397 6 : CatalogTupleDelete(rel, &tuple->t_self);
398 6 : ReleaseSysCache(tuple);
399 :
400 6 : CommandCounterIncrement();
401 :
402 : /* now release lock again */
403 6 : heap_close(rel, ExclusiveLock);
404 6 : }
405 :
406 :
407 : /*
408 : * Lookup replication origin via it's oid and return the name.
409 : *
410 : * The external name is palloc'd in the calling context.
411 : *
412 : * Returns true if the origin is known, false otherwise.
413 : */
414 : bool
415 0 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
416 : {
417 : HeapTuple tuple;
418 : Form_pg_replication_origin ric;
419 :
420 0 : Assert(OidIsValid((Oid) roident));
421 0 : Assert(roident != InvalidRepOriginId);
422 0 : Assert(roident != DoNotReplicateId);
423 :
424 0 : tuple = SearchSysCache1(REPLORIGIDENT,
425 : ObjectIdGetDatum((Oid) roident));
426 :
427 0 : if (HeapTupleIsValid(tuple))
428 : {
429 0 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
430 0 : *roname = text_to_cstring(&ric->roname);
431 0 : ReleaseSysCache(tuple);
432 :
433 0 : return true;
434 : }
435 : else
436 : {
437 0 : *roname = NULL;
438 :
439 0 : if (!missing_ok)
440 0 : elog(ERROR, "cache lookup failed for replication origin with oid %u",
441 : roident);
442 :
443 0 : return false;
444 : }
445 : }
446 :
447 :
448 : /* ---------------------------------------------------------------------------
449 : * Functions for handling replication progress.
450 : * ---------------------------------------------------------------------------
451 : */
452 :
453 : Size
454 30 : ReplicationOriginShmemSize(void)
455 : {
456 30 : Size size = 0;
457 :
458 : /*
459 : * XXX: max_replication_slots is arguably the wrong thing to use, as here
460 : * we keep the replay state of *remote* transactions. But for now it seems
461 : * sufficient to reuse it, lest we introduce a separate GUC.
462 : */
463 30 : if (max_replication_slots == 0)
464 0 : return size;
465 :
466 30 : size = add_size(size, offsetof(ReplicationStateCtl, states));
467 :
468 30 : size = add_size(size,
469 : mul_size(max_replication_slots, sizeof(ReplicationState)));
470 30 : return size;
471 : }
472 :
473 : void
474 10 : ReplicationOriginShmemInit(void)
475 : {
476 : bool found;
477 :
478 10 : if (max_replication_slots == 0)
479 10 : return;
480 :
481 10 : replication_states_ctl = (ReplicationStateCtl *)
482 10 : ShmemInitStruct("ReplicationOriginState",
483 : ReplicationOriginShmemSize(),
484 : &found);
485 10 : replication_states = replication_states_ctl->states;
486 :
487 10 : if (!found)
488 : {
489 : int i;
490 :
491 10 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
492 :
493 10 : MemSet(replication_states, 0, ReplicationOriginShmemSize());
494 :
495 110 : for (i = 0; i < max_replication_slots; i++)
496 : {
497 100 : LWLockInitialize(&replication_states[i].lock,
498 100 : replication_states_ctl->tranche_id);
499 100 : ConditionVariableInit(&replication_states[i].origin_cv);
500 : }
501 : }
502 :
503 10 : LWLockRegisterTranche(replication_states_ctl->tranche_id,
504 : "replication_origin");
505 : }
506 :
507 : /* ---------------------------------------------------------------------------
508 : * Perform a checkpoint of each replication origin's progress with respect to
509 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
510 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
511 : * if the transactions were originally committed asynchronously.
512 : *
513 : * We store checkpoints in the following format:
514 : * +-------+------------------------+------------------+-----+--------+
515 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
516 : * +-------+------------------------+------------------+-----+--------+
517 : *
518 : * So its just the magic, followed by the statically sized
519 : * ReplicationStateOnDisk structs. Note that the maximum number of
520 : * ReplicationState is determined by max_replication_slots.
521 : * ---------------------------------------------------------------------------
522 : */
523 : void
524 22 : CheckPointReplicationOrigin(void)
525 : {
526 22 : const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
527 22 : const char *path = "pg_logical/replorigin_checkpoint";
528 : int tmpfd;
529 : int i;
530 22 : uint32 magic = REPLICATION_STATE_MAGIC;
531 : pg_crc32c crc;
532 :
533 22 : if (max_replication_slots == 0)
534 22 : return;
535 :
536 22 : INIT_CRC32C(crc);
537 :
538 : /* make sure no old temp file is remaining */
539 22 : if (unlink(tmppath) < 0 && errno != ENOENT)
540 0 : ereport(PANIC,
541 : (errcode_for_file_access(),
542 : errmsg("could not remove file \"%s\": %m",
543 : tmppath)));
544 :
545 : /*
546 : * no other backend can perform this at the same time, we're protected by
547 : * CheckpointLock.
548 : */
549 22 : tmpfd = OpenTransientFile((char *) tmppath,
550 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
551 : S_IRUSR | S_IWUSR);
552 22 : if (tmpfd < 0)
553 0 : ereport(PANIC,
554 : (errcode_for_file_access(),
555 : errmsg("could not create file \"%s\": %m",
556 : tmppath)));
557 :
558 : /* write magic */
559 22 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
560 : {
561 0 : CloseTransientFile(tmpfd);
562 0 : ereport(PANIC,
563 : (errcode_for_file_access(),
564 : errmsg("could not write to file \"%s\": %m",
565 : tmppath)));
566 : }
567 22 : COMP_CRC32C(crc, &magic, sizeof(magic));
568 :
569 : /* prevent concurrent creations/drops */
570 22 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
571 :
572 : /* write actual data */
573 242 : for (i = 0; i < max_replication_slots; i++)
574 : {
575 : ReplicationStateOnDisk disk_state;
576 220 : ReplicationState *curstate = &replication_states[i];
577 : XLogRecPtr local_lsn;
578 :
579 220 : if (curstate->roident == InvalidRepOriginId)
580 220 : continue;
581 :
582 : /* zero, to avoid uninitialized padding bytes */
583 0 : memset(&disk_state, 0, sizeof(disk_state));
584 :
585 0 : LWLockAcquire(&curstate->lock, LW_SHARED);
586 :
587 0 : disk_state.roident = curstate->roident;
588 :
589 0 : disk_state.remote_lsn = curstate->remote_lsn;
590 0 : local_lsn = curstate->local_lsn;
591 :
592 0 : LWLockRelease(&curstate->lock);
593 :
594 : /* make sure we only write out a commit that's persistent */
595 0 : XLogFlush(local_lsn);
596 :
597 0 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
598 : sizeof(disk_state))
599 : {
600 0 : CloseTransientFile(tmpfd);
601 0 : ereport(PANIC,
602 : (errcode_for_file_access(),
603 : errmsg("could not write to file \"%s\": %m",
604 : tmppath)));
605 : }
606 :
607 0 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
608 : }
609 :
610 22 : LWLockRelease(ReplicationOriginLock);
611 :
612 : /* write out the CRC */
613 22 : FIN_CRC32C(crc);
614 22 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
615 : {
616 0 : CloseTransientFile(tmpfd);
617 0 : ereport(PANIC,
618 : (errcode_for_file_access(),
619 : errmsg("could not write to file \"%s\": %m",
620 : tmppath)));
621 : }
622 :
623 22 : CloseTransientFile(tmpfd);
624 :
625 : /* fsync, rename to permanent file, fsync file and directory */
626 22 : durable_rename(tmppath, path, PANIC);
627 : }
628 :
629 : /*
630 : * Recover replication replay status from checkpoint data saved earlier by
631 : * CheckPointReplicationOrigin.
632 : *
633 : * This only needs to be called at startup and *not* during every checkpoint
634 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
635 : * state thereafter can be recovered by looking at commit records.
636 : */
637 : void
638 6 : StartupReplicationOrigin(void)
639 : {
640 6 : const char *path = "pg_logical/replorigin_checkpoint";
641 : int fd;
642 : int readBytes;
643 6 : uint32 magic = REPLICATION_STATE_MAGIC;
644 6 : int last_state = 0;
645 : pg_crc32c file_crc;
646 : pg_crc32c crc;
647 :
648 : /* don't want to overwrite already existing state */
649 : #ifdef USE_ASSERT_CHECKING
650 : static bool already_started = false;
651 :
652 6 : Assert(!already_started);
653 6 : already_started = true;
654 : #endif
655 :
656 6 : if (max_replication_slots == 0)
657 2 : return;
658 :
659 6 : INIT_CRC32C(crc);
660 :
661 6 : elog(DEBUG2, "starting up replication origin progress state");
662 :
663 6 : fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
664 :
665 : /*
666 : * might have had max_replication_slots == 0 last run, or we just brought
667 : * up a standby.
668 : */
669 6 : if (fd < 0 && errno == ENOENT)
670 2 : return;
671 4 : else if (fd < 0)
672 0 : ereport(PANIC,
673 : (errcode_for_file_access(),
674 : errmsg("could not open file \"%s\": %m",
675 : path)));
676 :
677 : /* verify magic, that is written even if nothing was active */
678 4 : readBytes = read(fd, &magic, sizeof(magic));
679 4 : if (readBytes != sizeof(magic))
680 0 : ereport(PANIC,
681 : (errmsg("could not read file \"%s\": %m",
682 : path)));
683 4 : COMP_CRC32C(crc, &magic, sizeof(magic));
684 :
685 4 : if (magic != REPLICATION_STATE_MAGIC)
686 0 : ereport(PANIC,
687 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
688 : magic, REPLICATION_STATE_MAGIC)));
689 :
690 : /* we can skip locking here, no other access is possible */
691 :
692 : /* recover individual states, until there are no more to be found */
693 : while (true)
694 : {
695 : ReplicationStateOnDisk disk_state;
696 :
697 4 : readBytes = read(fd, &disk_state, sizeof(disk_state));
698 :
699 : /* no further data */
700 4 : if (readBytes == sizeof(crc))
701 : {
702 : /* not pretty, but simple ... */
703 4 : file_crc = *(pg_crc32c *) &disk_state;
704 4 : break;
705 : }
706 :
707 0 : if (readBytes < 0)
708 : {
709 0 : ereport(PANIC,
710 : (errcode_for_file_access(),
711 : errmsg("could not read file \"%s\": %m",
712 : path)));
713 : }
714 :
715 0 : if (readBytes != sizeof(disk_state))
716 : {
717 0 : ereport(PANIC,
718 : (errcode_for_file_access(),
719 : errmsg("could not read file \"%s\": read %d of %zu",
720 : path, readBytes, sizeof(disk_state))));
721 : }
722 :
723 0 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
724 :
725 0 : if (last_state == max_replication_slots)
726 0 : ereport(PANIC,
727 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
728 : errmsg("could not find free replication state, increase max_replication_slots")));
729 :
730 : /* copy data to shared memory */
731 0 : replication_states[last_state].roident = disk_state.roident;
732 0 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
733 0 : last_state++;
734 :
735 0 : elog(LOG, "recovered replication state of node %u to %X/%X",
736 : disk_state.roident,
737 : (uint32) (disk_state.remote_lsn >> 32),
738 : (uint32) disk_state.remote_lsn);
739 0 : }
740 :
741 : /* now check checksum */
742 4 : FIN_CRC32C(crc);
743 4 : if (file_crc != crc)
744 0 : ereport(PANIC,
745 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
746 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
747 : crc, file_crc)));
748 :
749 4 : CloseTransientFile(fd);
750 : }
751 :
752 : void
753 0 : replorigin_redo(XLogReaderState *record)
754 : {
755 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
756 :
757 0 : switch (info)
758 : {
759 : case XLOG_REPLORIGIN_SET:
760 : {
761 0 : xl_replorigin_set *xlrec =
762 : (xl_replorigin_set *) XLogRecGetData(record);
763 :
764 0 : replorigin_advance(xlrec->node_id,
765 : xlrec->remote_lsn, record->EndRecPtr,
766 0 : xlrec->force /* backward */ ,
767 : false /* WAL log */ );
768 0 : break;
769 : }
770 : case XLOG_REPLORIGIN_DROP:
771 : {
772 : xl_replorigin_drop *xlrec;
773 : int i;
774 :
775 0 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
776 :
777 0 : for (i = 0; i < max_replication_slots; i++)
778 : {
779 0 : ReplicationState *state = &replication_states[i];
780 :
781 : /* found our slot */
782 0 : if (state->roident == xlrec->node_id)
783 : {
784 : /* reset entry */
785 0 : state->roident = InvalidRepOriginId;
786 0 : state->remote_lsn = InvalidXLogRecPtr;
787 0 : state->local_lsn = InvalidXLogRecPtr;
788 0 : break;
789 : }
790 : }
791 0 : break;
792 : }
793 : default:
794 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
795 : }
796 0 : }
797 :
798 :
799 : /*
800 : * Tell the replication origin progress machinery that a commit from 'node'
801 : * that originated at the LSN remote_commit on the remote node was replayed
802 : * successfully and that we don't need to do so again. In combination with
803 : * setting up replorigin_session_origin_lsn and replorigin_session_origin
804 : * that ensures we won't loose knowledge about that after a crash if the
805 : * transaction had a persistent effect (think of asynchronous commits).
806 : *
807 : * local_commit needs to be a local LSN of the commit so that we can make sure
808 : * upon a checkpoint that enough WAL has been persisted to disk.
809 : *
810 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
811 : * unless running in recovery.
812 : */
813 : void
814 0 : replorigin_advance(RepOriginId node,
815 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
816 : bool go_backward, bool wal_log)
817 : {
818 : int i;
819 0 : ReplicationState *replication_state = NULL;
820 0 : ReplicationState *free_state = NULL;
821 :
822 0 : Assert(node != InvalidRepOriginId);
823 :
824 : /* we don't track DoNotReplicateId */
825 0 : if (node == DoNotReplicateId)
826 0 : return;
827 :
828 : /*
829 : * XXX: For the case where this is called by WAL replay, it'd be more
830 : * efficient to restore into a backend local hashtable and only dump into
831 : * shmem after recovery is finished. Let's wait with implementing that
832 : * till it's shown to be a measurable expense
833 : */
834 :
835 : /* Lock exclusively, as we may have to create a new table entry. */
836 0 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
837 :
838 : /*
839 : * Search for either an existing slot for the origin, or a free one we can
840 : * use.
841 : */
842 0 : for (i = 0; i < max_replication_slots; i++)
843 : {
844 0 : ReplicationState *curstate = &replication_states[i];
845 :
846 : /* remember where to insert if necessary */
847 0 : if (curstate->roident == InvalidRepOriginId &&
848 : free_state == NULL)
849 : {
850 0 : free_state = curstate;
851 0 : continue;
852 : }
853 :
854 : /* not our slot */
855 0 : if (curstate->roident != node)
856 : {
857 0 : continue;
858 : }
859 :
860 : /* ok, found slot */
861 0 : replication_state = curstate;
862 :
863 0 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
864 :
865 : /* Make sure it's not used by somebody else */
866 0 : if (replication_state->acquired_by != 0)
867 : {
868 0 : ereport(ERROR,
869 : (errcode(ERRCODE_OBJECT_IN_USE),
870 : errmsg("replication origin with OID %d is already active for PID %d",
871 : replication_state->roident,
872 : replication_state->acquired_by)));
873 : }
874 :
875 0 : break;
876 : }
877 :
878 0 : if (replication_state == NULL && free_state == NULL)
879 0 : ereport(ERROR,
880 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
881 : errmsg("could not find free replication state slot for replication origin with OID %u",
882 : node),
883 : errhint("Increase max_replication_slots and try again.")));
884 :
885 0 : if (replication_state == NULL)
886 : {
887 : /* initialize new slot */
888 0 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
889 0 : replication_state = free_state;
890 0 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
891 0 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
892 0 : replication_state->roident = node;
893 : }
894 :
895 0 : Assert(replication_state->roident != InvalidRepOriginId);
896 :
897 : /*
898 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
899 : * and the standby gets the message. Primarily this will be called during
900 : * WAL replay (of commit records) where no WAL logging is necessary.
901 : */
902 0 : if (wal_log)
903 : {
904 : xl_replorigin_set xlrec;
905 :
906 0 : xlrec.remote_lsn = remote_commit;
907 0 : xlrec.node_id = node;
908 0 : xlrec.force = go_backward;
909 :
910 0 : XLogBeginInsert();
911 0 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
912 :
913 0 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
914 : }
915 :
916 : /*
917 : * Due to - harmless - race conditions during a checkpoint we could see
918 : * values here that are older than the ones we already have in memory.
919 : * Don't overwrite those.
920 : */
921 0 : if (go_backward || replication_state->remote_lsn < remote_commit)
922 0 : replication_state->remote_lsn = remote_commit;
923 0 : if (local_commit != InvalidXLogRecPtr &&
924 0 : (go_backward || replication_state->local_lsn < local_commit))
925 0 : replication_state->local_lsn = local_commit;
926 0 : LWLockRelease(&replication_state->lock);
927 :
928 : /*
929 : * Release *after* changing the LSNs, slot isn't acquired and thus could
930 : * otherwise be dropped anytime.
931 : */
932 0 : LWLockRelease(ReplicationOriginLock);
933 : }
934 :
935 :
936 : XLogRecPtr
937 0 : replorigin_get_progress(RepOriginId node, bool flush)
938 : {
939 : int i;
940 0 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
941 0 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
942 :
943 : /* prevent slots from being concurrently dropped */
944 0 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
945 :
946 0 : for (i = 0; i < max_replication_slots; i++)
947 : {
948 : ReplicationState *state;
949 :
950 0 : state = &replication_states[i];
951 :
952 0 : if (state->roident == node)
953 : {
954 0 : LWLockAcquire(&state->lock, LW_SHARED);
955 :
956 0 : remote_lsn = state->remote_lsn;
957 0 : local_lsn = state->local_lsn;
958 :
959 0 : LWLockRelease(&state->lock);
960 :
961 0 : break;
962 : }
963 : }
964 :
965 0 : LWLockRelease(ReplicationOriginLock);
966 :
967 0 : if (flush && local_lsn != InvalidXLogRecPtr)
968 0 : XLogFlush(local_lsn);
969 :
970 0 : return remote_lsn;
971 : }
972 :
973 : /*
974 : * Tear down a (possibly) configured session replication origin during process
975 : * exit.
976 : */
977 : static void
978 0 : ReplicationOriginExitCleanup(int code, Datum arg)
979 : {
980 0 : ConditionVariable *cv = NULL;
981 :
982 0 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
983 :
984 0 : if (session_replication_state != NULL &&
985 0 : session_replication_state->acquired_by == MyProcPid)
986 : {
987 0 : cv = &session_replication_state->origin_cv;
988 :
989 0 : session_replication_state->acquired_by = 0;
990 0 : session_replication_state = NULL;
991 : }
992 :
993 0 : LWLockRelease(ReplicationOriginLock);
994 :
995 0 : if (cv)
996 0 : ConditionVariableBroadcast(cv);
997 0 : }
998 :
999 : /*
1000 : * Setup a replication origin in the shared memory struct if it doesn't
1001 : * already exists and cache access to the specific ReplicationSlot so the
1002 : * array doesn't have to be searched when calling
1003 : * replorigin_session_advance().
1004 : *
1005 : * Obviously only one such cached origin can exist per process and the current
1006 : * cached value can only be set again after the previous value is torn down
1007 : * with replorigin_session_reset().
1008 : */
1009 : void
1010 0 : replorigin_session_setup(RepOriginId node)
1011 : {
1012 : static bool registered_cleanup;
1013 : int i;
1014 0 : int free_slot = -1;
1015 :
1016 0 : if (!registered_cleanup)
1017 : {
1018 0 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1019 0 : registered_cleanup = true;
1020 : }
1021 :
1022 0 : Assert(max_replication_slots > 0);
1023 :
1024 0 : if (session_replication_state != NULL)
1025 0 : ereport(ERROR,
1026 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1027 : errmsg("cannot setup replication origin when one is already setup")));
1028 :
1029 : /* Lock exclusively, as we may have to create a new table entry. */
1030 0 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1031 :
1032 : /*
1033 : * Search for either an existing slot for the origin, or a free one we can
1034 : * use.
1035 : */
1036 0 : for (i = 0; i < max_replication_slots; i++)
1037 : {
1038 0 : ReplicationState *curstate = &replication_states[i];
1039 :
1040 : /* remember where to insert if necessary */
1041 0 : if (curstate->roident == InvalidRepOriginId &&
1042 : free_slot == -1)
1043 : {
1044 0 : free_slot = i;
1045 0 : continue;
1046 : }
1047 :
1048 : /* not our slot */
1049 0 : if (curstate->roident != node)
1050 0 : continue;
1051 :
1052 0 : else if (curstate->acquired_by != 0)
1053 : {
1054 0 : ereport(ERROR,
1055 : (errcode(ERRCODE_OBJECT_IN_USE),
1056 : errmsg("replication identifier %d is already active for PID %d",
1057 : curstate->roident, curstate->acquired_by)));
1058 : }
1059 :
1060 : /* ok, found slot */
1061 0 : session_replication_state = curstate;
1062 : }
1063 :
1064 :
1065 0 : if (session_replication_state == NULL && free_slot == -1)
1066 0 : ereport(ERROR,
1067 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1068 : errmsg("could not find free replication state slot for replication origin with OID %u",
1069 : node),
1070 : errhint("Increase max_replication_slots and try again.")));
1071 0 : else if (session_replication_state == NULL)
1072 : {
1073 : /* initialize new slot */
1074 0 : session_replication_state = &replication_states[free_slot];
1075 0 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1076 0 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1077 0 : session_replication_state->roident = node;
1078 : }
1079 :
1080 :
1081 0 : Assert(session_replication_state->roident != InvalidRepOriginId);
1082 :
1083 0 : session_replication_state->acquired_by = MyProcPid;
1084 :
1085 0 : LWLockRelease(ReplicationOriginLock);
1086 :
1087 : /* probably this one is pointless */
1088 0 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1089 0 : }
1090 :
1091 : /*
1092 : * Reset replay state previously setup in this session.
1093 : *
1094 : * This function may only be called if an origin was setup with
1095 : * replorigin_session_setup().
1096 : */
1097 : void
1098 0 : replorigin_session_reset(void)
1099 : {
1100 : ConditionVariable *cv;
1101 :
1102 0 : Assert(max_replication_slots != 0);
1103 :
1104 0 : if (session_replication_state == NULL)
1105 0 : ereport(ERROR,
1106 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1107 : errmsg("no replication origin is configured")));
1108 :
1109 0 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1110 :
1111 0 : session_replication_state->acquired_by = 0;
1112 0 : cv = &session_replication_state->origin_cv;
1113 0 : session_replication_state = NULL;
1114 :
1115 0 : LWLockRelease(ReplicationOriginLock);
1116 :
1117 0 : ConditionVariableBroadcast(cv);
1118 0 : }
1119 :
1120 : /*
1121 : * Do the same work replorigin_advance() does, just on the session's
1122 : * configured origin.
1123 : *
1124 : * This is noticeably cheaper than using replorigin_advance().
1125 : */
1126 : void
1127 0 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1128 : {
1129 0 : Assert(session_replication_state != NULL);
1130 0 : Assert(session_replication_state->roident != InvalidRepOriginId);
1131 :
1132 0 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1133 0 : if (session_replication_state->local_lsn < local_commit)
1134 0 : session_replication_state->local_lsn = local_commit;
1135 0 : if (session_replication_state->remote_lsn < remote_commit)
1136 0 : session_replication_state->remote_lsn = remote_commit;
1137 0 : LWLockRelease(&session_replication_state->lock);
1138 0 : }
1139 :
1140 : /*
1141 : * Ask the machinery about the point up to which we successfully replayed
1142 : * changes from an already setup replication origin.
1143 : */
1144 : XLogRecPtr
1145 0 : replorigin_session_get_progress(bool flush)
1146 : {
1147 : XLogRecPtr remote_lsn;
1148 : XLogRecPtr local_lsn;
1149 :
1150 0 : Assert(session_replication_state != NULL);
1151 :
1152 0 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1153 0 : remote_lsn = session_replication_state->remote_lsn;
1154 0 : local_lsn = session_replication_state->local_lsn;
1155 0 : LWLockRelease(&session_replication_state->lock);
1156 :
1157 0 : if (flush && local_lsn != InvalidXLogRecPtr)
1158 0 : XLogFlush(local_lsn);
1159 :
1160 0 : return remote_lsn;
1161 : }
1162 :
1163 :
1164 :
1165 : /* ---------------------------------------------------------------------------
1166 : * SQL functions for working with replication origin.
1167 : *
1168 : * These mostly should be fairly short wrappers around more generic functions.
1169 : * ---------------------------------------------------------------------------
1170 : */
1171 :
1172 : /*
1173 : * Create replication origin for the passed in name, and return the assigned
1174 : * oid.
1175 : */
1176 : Datum
1177 0 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1178 : {
1179 : char *name;
1180 : RepOriginId roident;
1181 :
1182 0 : replorigin_check_prerequisites(false, false);
1183 :
1184 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1185 0 : roident = replorigin_create(name);
1186 :
1187 0 : pfree(name);
1188 :
1189 0 : PG_RETURN_OID(roident);
1190 : }
1191 :
1192 : /*
1193 : * Drop replication origin.
1194 : */
1195 : Datum
1196 0 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1197 : {
1198 : char *name;
1199 : RepOriginId roident;
1200 :
1201 0 : replorigin_check_prerequisites(false, false);
1202 :
1203 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1204 :
1205 0 : roident = replorigin_by_name(name, false);
1206 0 : Assert(OidIsValid(roident));
1207 :
1208 0 : replorigin_drop(roident, true);
1209 :
1210 0 : pfree(name);
1211 :
1212 0 : PG_RETURN_VOID();
1213 : }
1214 :
1215 : /*
1216 : * Return oid of a replication origin.
1217 : */
1218 : Datum
1219 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1220 : {
1221 : char *name;
1222 : RepOriginId roident;
1223 :
1224 0 : replorigin_check_prerequisites(false, false);
1225 :
1226 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1227 0 : roident = replorigin_by_name(name, true);
1228 :
1229 0 : pfree(name);
1230 :
1231 0 : if (OidIsValid(roident))
1232 0 : PG_RETURN_OID(roident);
1233 0 : PG_RETURN_NULL();
1234 : }
1235 :
1236 : /*
1237 : * Setup a replication origin for this session.
1238 : */
1239 : Datum
1240 0 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1241 : {
1242 : char *name;
1243 : RepOriginId origin;
1244 :
1245 0 : replorigin_check_prerequisites(true, false);
1246 :
1247 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1248 0 : origin = replorigin_by_name(name, false);
1249 0 : replorigin_session_setup(origin);
1250 :
1251 0 : replorigin_session_origin = origin;
1252 :
1253 0 : pfree(name);
1254 :
1255 0 : PG_RETURN_VOID();
1256 : }
1257 :
1258 : /*
1259 : * Reset previously setup origin in this session
1260 : */
1261 : Datum
1262 0 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1263 : {
1264 0 : replorigin_check_prerequisites(true, false);
1265 :
1266 0 : replorigin_session_reset();
1267 :
1268 0 : replorigin_session_origin = InvalidRepOriginId;
1269 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1270 0 : replorigin_session_origin_timestamp = 0;
1271 :
1272 0 : PG_RETURN_VOID();
1273 : }
1274 :
1275 : /*
1276 : * Has a replication origin been setup for this session.
1277 : */
1278 : Datum
1279 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1280 : {
1281 0 : replorigin_check_prerequisites(false, false);
1282 :
1283 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1284 : }
1285 :
1286 :
1287 : /*
1288 : * Return the replication progress for origin setup in the current session.
1289 : *
1290 : * If 'flush' is set to true it is ensured that the returned value corresponds
1291 : * to a local transaction that has been flushed. This is useful if asynchronous
1292 : * commits are used when replaying replicated transactions.
1293 : */
1294 : Datum
1295 0 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1296 : {
1297 0 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1298 0 : bool flush = PG_GETARG_BOOL(0);
1299 :
1300 0 : replorigin_check_prerequisites(true, false);
1301 :
1302 0 : if (session_replication_state == NULL)
1303 0 : ereport(ERROR,
1304 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1305 : errmsg("no replication origin is configured")));
1306 :
1307 0 : remote_lsn = replorigin_session_get_progress(flush);
1308 :
1309 0 : if (remote_lsn == InvalidXLogRecPtr)
1310 0 : PG_RETURN_NULL();
1311 :
1312 0 : PG_RETURN_LSN(remote_lsn);
1313 : }
1314 :
1315 : Datum
1316 0 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1317 : {
1318 0 : XLogRecPtr location = PG_GETARG_LSN(0);
1319 :
1320 0 : replorigin_check_prerequisites(true, false);
1321 :
1322 0 : if (session_replication_state == NULL)
1323 0 : ereport(ERROR,
1324 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1325 : errmsg("no replication origin is configured")));
1326 :
1327 0 : replorigin_session_origin_lsn = location;
1328 0 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1329 :
1330 0 : PG_RETURN_VOID();
1331 : }
1332 :
1333 : Datum
1334 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1335 : {
1336 0 : replorigin_check_prerequisites(true, false);
1337 :
1338 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1339 0 : replorigin_session_origin_timestamp = 0;
1340 :
1341 0 : PG_RETURN_VOID();
1342 : }
1343 :
1344 :
1345 : Datum
1346 0 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1347 : {
1348 0 : text *name = PG_GETARG_TEXT_PP(0);
1349 0 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1350 : RepOriginId node;
1351 :
1352 0 : replorigin_check_prerequisites(true, false);
1353 :
1354 : /* lock to prevent the replication origin from vanishing */
1355 0 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1356 :
1357 0 : node = replorigin_by_name(text_to_cstring(name), false);
1358 :
1359 : /*
1360 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1361 : * xact hasn't committed yet. This is why this function should be used to
1362 : * set up the initial replication state, but not for replay.
1363 : */
1364 0 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1365 : true /* go backward */ , true /* WAL log */ );
1366 :
1367 0 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1368 :
1369 0 : PG_RETURN_VOID();
1370 : }
1371 :
1372 :
1373 : /*
1374 : * Return the replication progress for an individual replication origin.
1375 : *
1376 : * If 'flush' is set to true it is ensured that the returned value corresponds
1377 : * to a local transaction that has been flushed. This is useful if asynchronous
1378 : * commits are used when replaying replicated transactions.
1379 : */
1380 : Datum
1381 0 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1382 : {
1383 : char *name;
1384 : bool flush;
1385 : RepOriginId roident;
1386 0 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1387 :
1388 0 : replorigin_check_prerequisites(true, true);
1389 :
1390 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1391 0 : flush = PG_GETARG_BOOL(1);
1392 :
1393 0 : roident = replorigin_by_name(name, false);
1394 0 : Assert(OidIsValid(roident));
1395 :
1396 0 : remote_lsn = replorigin_get_progress(roident, flush);
1397 :
1398 0 : if (remote_lsn == InvalidXLogRecPtr)
1399 0 : PG_RETURN_NULL();
1400 :
1401 0 : PG_RETURN_LSN(remote_lsn);
1402 : }
1403 :
1404 :
1405 : Datum
1406 0 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1407 : {
1408 0 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1409 : TupleDesc tupdesc;
1410 : Tuplestorestate *tupstore;
1411 : MemoryContext per_query_ctx;
1412 : MemoryContext oldcontext;
1413 : int i;
1414 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1415 :
1416 : /* we we want to return 0 rows if slot is set to zero */
1417 0 : replorigin_check_prerequisites(false, true);
1418 :
1419 0 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1420 0 : ereport(ERROR,
1421 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1422 : errmsg("set-valued function called in context that cannot accept a set")));
1423 0 : if (!(rsinfo->allowedModes & SFRM_Materialize))
1424 0 : ereport(ERROR,
1425 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1426 : errmsg("materialize mode required, but it is not allowed in this context")));
1427 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1428 0 : elog(ERROR, "return type must be a row type");
1429 :
1430 0 : if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1431 0 : elog(ERROR, "wrong function definition");
1432 :
1433 0 : per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1434 0 : oldcontext = MemoryContextSwitchTo(per_query_ctx);
1435 :
1436 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
1437 0 : rsinfo->returnMode = SFRM_Materialize;
1438 0 : rsinfo->setResult = tupstore;
1439 0 : rsinfo->setDesc = tupdesc;
1440 :
1441 0 : MemoryContextSwitchTo(oldcontext);
1442 :
1443 :
1444 : /* prevent slots from being concurrently dropped */
1445 0 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1446 :
1447 : /*
1448 : * Iterate through all possible replication_states, display if they are
1449 : * filled. Note that we do not take any locks, so slightly corrupted/out
1450 : * of date values are a possibility.
1451 : */
1452 0 : for (i = 0; i < max_replication_slots; i++)
1453 : {
1454 : ReplicationState *state;
1455 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1456 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1457 : char *roname;
1458 :
1459 0 : state = &replication_states[i];
1460 :
1461 : /* unused slot, nothing to display */
1462 0 : if (state->roident == InvalidRepOriginId)
1463 0 : continue;
1464 :
1465 0 : memset(values, 0, sizeof(values));
1466 0 : memset(nulls, 1, sizeof(nulls));
1467 :
1468 0 : values[0] = ObjectIdGetDatum(state->roident);
1469 0 : nulls[0] = false;
1470 :
1471 : /*
1472 : * We're not preventing the origin to be dropped concurrently, so
1473 : * silently accept that it might be gone.
1474 : */
1475 0 : if (replorigin_by_oid(state->roident, true,
1476 : &roname))
1477 : {
1478 0 : values[1] = CStringGetTextDatum(roname);
1479 0 : nulls[1] = false;
1480 : }
1481 :
1482 0 : LWLockAcquire(&state->lock, LW_SHARED);
1483 :
1484 0 : values[2] = LSNGetDatum(state->remote_lsn);
1485 0 : nulls[2] = false;
1486 :
1487 0 : values[3] = LSNGetDatum(state->local_lsn);
1488 0 : nulls[3] = false;
1489 :
1490 0 : LWLockRelease(&state->lock);
1491 :
1492 0 : tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1493 : }
1494 :
1495 : tuplestore_donestoring(tupstore);
1496 :
1497 0 : LWLockRelease(ReplicationOriginLock);
1498 :
1499 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1500 :
1501 0 : return (Datum) 0;
1502 : }
|