LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 116 464 25.0 %
Date: 2017-09-29 13:40:31 Functions: 7 29 24.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2017, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to efficiently store and persist replication progress in an
      16             :  *   efficient and durable manner.
      17             :  *
      18             :  * Replication origins consist of a descriptive, user-defined, external
      19             :  * name and a short, thus space-efficient, internal 2-byte one. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory, and long descriptors would be inefficient.  For now we only use 2
      22             :  * bytes for the internal id of a replication origin, as it seems unlikely that
      23             :  * there will soon be more than 65k nodes in one replication setup; using only
      24             :  * two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows us to advance replication progress during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That allows us to recover the
      32             :  * precise replayed state after crash recovery, without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single-threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin, which from then on is
      38             :  * used as the source of changes produced by the backend until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create and drop replication origins an exclusive lock on
      48             :  *   pg_replication_origin is required for the duration. That allows us to
      49             :  *   safely and conflict-free assign new origins using a dirty snapshot.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held, the same when advancing the
      54             :  *   replication progress of an individual origin that has not been set up as
      55             :  *   the session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "funcapi.h"
      74             : #include "miscadmin.h"
      75             : 
      76             : #include "access/genam.h"
      77             : #include "access/heapam.h"
      78             : #include "access/htup_details.h"
      79             : #include "access/xact.h"
      80             : 
      81             : #include "catalog/indexing.h"
      82             : #include "nodes/execnodes.h"
      83             : 
      84             : #include "replication/origin.h"
      85             : #include "replication/logical.h"
      86             : #include "pgstat.h"
      87             : #include "storage/fd.h"
      88             : #include "storage/ipc.h"
      89             : #include "storage/lmgr.h"
      90             : #include "storage/condition_variable.h"
      91             : #include "storage/copydir.h"
      92             : 
      93             : #include "utils/builtins.h"
      94             : #include "utils/fmgroids.h"
      95             : #include "utils/pg_lsn.h"
      96             : #include "utils/rel.h"
      97             : #include "utils/syscache.h"
      98             : #include "utils/tqual.h"
      99             : 
/*
 * Replay progress of a single remote node, kept in shared memory.
 *
 * The shared array of these entries has max_replication_slots elements; an
 * entry whose roident is InvalidRepOriginId is unused.
 */
typedef struct ReplicationState
{
    /*
     * Local (2 byte) identifier for the remote node.
     */
    RepOriginId roident;

    /*
     * Location of the latest commit from the remote side.
     */
    XLogRecPtr  remote_lsn;

    /*
     * Remember the local lsn of the commit record so we can XLogFlush() to it
     * during a checkpoint so we know the commit record actually is safe on
     * disk.
     */
    XLogRecPtr  local_lsn;

    /*
     * PID of backend that's acquired slot, or 0 if none.  An origin cannot be
     * dropped while this is non-zero.
     */
    int         acquired_by;

    /*
     * Condition variable that's signalled when acquired_by changes; waited on
     * by replorigin_drop() while the slot is acquired.
     */
    ConditionVariable origin_cv;

    /*
     * Lock protecting remote_lsn and local_lsn.
     */
    LWLock      lock;
} ReplicationState;
     137             : 
/*
 * On disk version of ReplicationState, as written to the replication origin
 * checkpoint file.
 *
 * Only the origin id and the remote LSN are persisted; the checkpointer
 * XLogFlush()es local_lsn before writing an entry instead of storing it.
 * Writers zero the struct first so padding bytes are deterministic (they are
 * covered by the file's CRC).  The layout of this struct is the on-disk
 * format, so changing it affects compatibility with existing files.
 */
typedef struct ReplicationStateOnDisk
{
    RepOriginId roident;
    XLogRecPtr  remote_lsn;
} ReplicationStateOnDisk;
     146             : 
     147             : 
/*
 * Header of the shared memory area for replication progress: the LWLock
 * tranche id used for every entry's lock, followed by the array of
 * max_replication_slots ReplicationState entries.
 */
typedef struct ReplicationStateCtl
{
    int         tranche_id;
    ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
     153             : 
/* external variables */

/*
 * Per-session origin identity.  replorigin_session_origin is the origin the
 * session's changes are attributed to (InvalidRepOriginId when none).
 * NOTE(review): the readers of these three variables are outside this chunk;
 * presumably the origin LSN/timestamp end up in commit records, as the file
 * header describes -- confirm against callers.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;

/*
 * Shared control struct; replication_states points at its states[] member.
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backends current RepOriginId.
 */
static ReplicationState *session_replication_state = NULL;

/* Magic for on disk files; written first in the replorigin checkpoint file. */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     178             : 
     179             : static void
     180           0 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
     181             : {
     182           0 :     if (!superuser())
     183           0 :         ereport(ERROR,
     184             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     185             :                  errmsg("only superusers can query or manipulate replication origins")));
     186             : 
     187           0 :     if (check_slots && max_replication_slots == 0)
     188           0 :         ereport(ERROR,
     189             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     190             :                  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
     191             : 
     192           0 :     if (!recoveryOK && RecoveryInProgress())
     193           0 :         ereport(ERROR,
     194             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     195             :                  errmsg("cannot manipulate replication origins during recovery")));
     196             : 
     197           0 : }
     198             : 
     199             : 
     200             : /* ---------------------------------------------------------------------------
     201             :  * Functions for working with replication origins themselves.
     202             :  * ---------------------------------------------------------------------------
     203             :  */
     204             : 
     205             : /*
     206             :  * Check for a persistent replication origin identified by name.
     207             :  *
     208             :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     209             :  */
     210             : RepOriginId
     211           6 : replorigin_by_name(char *roname, bool missing_ok)
     212             : {
     213             :     Form_pg_replication_origin ident;
     214           6 :     Oid         roident = InvalidOid;
     215             :     HeapTuple   tuple;
     216             :     Datum       roname_d;
     217             : 
     218           6 :     roname_d = CStringGetTextDatum(roname);
     219             : 
     220           6 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     221           6 :     if (HeapTupleIsValid(tuple))
     222             :     {
     223           6 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     224           6 :         roident = ident->roident;
     225           6 :         ReleaseSysCache(tuple);
     226             :     }
     227           0 :     else if (!missing_ok)
     228           0 :         elog(ERROR, "cache lookup failed for replication origin '%s'",
     229             :              roname);
     230             : 
     231           6 :     return roident;
     232             : }
     233             : 
     234             : /*
     235             :  * Create a replication origin.
     236             :  *
     237             :  * Needs to be called in a transaction.
     238             :  */
     239             : RepOriginId
     240           6 : replorigin_create(char *roname)
     241             : {
     242             :     Oid         roident;
     243           6 :     HeapTuple   tuple = NULL;
     244             :     Relation    rel;
     245             :     Datum       roname_d;
     246             :     SnapshotData SnapshotDirty;
     247             :     SysScanDesc scan;
     248             :     ScanKeyData key;
     249             : 
     250           6 :     roname_d = CStringGetTextDatum(roname);
     251             : 
     252           6 :     Assert(IsTransactionState());
     253             : 
     254             :     /*
     255             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     256             :      * rely on the normal oid allocation. Instead we simply scan
     257             :      * pg_replication_origin for the first unused id. That's not particularly
     258             :      * efficient, but this should be a fairly infrequent operation - we can
     259             :      * easily spend a bit more code on this when it turns out it needs to be
     260             :      * faster.
     261             :      *
     262             :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     263             :      * over the table for the duration of the search. Because we use a "dirty
     264             :      * snapshot" we can read rows that other in-progress sessions have
     265             :      * written, even though they would be invisible with normal snapshots. Due
     266             :      * to the exclusive lock there's no danger that new rows can appear while
     267             :      * we're checking.
     268             :      */
     269           6 :     InitDirtySnapshot(SnapshotDirty);
     270             : 
     271           6 :     rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
     272             : 
     273           8 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     274             :     {
     275             :         bool        nulls[Natts_pg_replication_origin];
     276             :         Datum       values[Natts_pg_replication_origin];
     277             :         bool        collides;
     278             : 
     279           8 :         CHECK_FOR_INTERRUPTS();
     280             : 
     281           8 :         ScanKeyInit(&key,
     282             :                     Anum_pg_replication_origin_roident,
     283             :                     BTEqualStrategyNumber, F_OIDEQ,
     284             :                     ObjectIdGetDatum(roident));
     285             : 
     286           8 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     287             :                                   true /* indexOK */ ,
     288             :                                   &SnapshotDirty,
     289             :                                   1, &key);
     290             : 
     291           8 :         collides = HeapTupleIsValid(systable_getnext(scan));
     292             : 
     293           8 :         systable_endscan(scan);
     294             : 
     295           8 :         if (!collides)
     296             :         {
     297             :             /*
     298             :              * Ok, found an unused roident, insert the new row and do a CCI,
     299             :              * so our callers can look it up if they want to.
     300             :              */
     301           6 :             memset(&nulls, 0, sizeof(nulls));
     302             : 
     303           6 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     304           6 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     305             : 
     306           6 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     307           6 :             CatalogTupleInsert(rel, tuple);
     308           6 :             CommandCounterIncrement();
     309           6 :             break;
     310             :         }
     311             :     }
     312             : 
     313             :     /* now release lock again,  */
     314           6 :     heap_close(rel, ExclusiveLock);
     315             : 
     316           6 :     if (tuple == NULL)
     317           0 :         ereport(ERROR,
     318             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     319             :                  errmsg("could not find free replication origin OID")));
     320             : 
     321           6 :     heap_freetuple(tuple);
     322           6 :     return roident;
     323             : }
     324             : 
     325             : 
     326             : /*
     327             :  * Drop replication origin.
     328             :  *
     329             :  * Needs to be called in a transaction.
     330             :  */
     331             : void
     332           6 : replorigin_drop(RepOriginId roident, bool nowait)
     333             : {
     334             :     HeapTuple   tuple;
     335             :     Relation    rel;
     336             :     int         i;
     337             : 
     338           6 :     Assert(IsTransactionState());
     339             : 
     340           6 :     rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
     341             : 
     342             : restart:
     343           6 :     tuple = NULL;
     344             :     /* cleanup the slot state info */
     345           6 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     346             : 
     347          66 :     for (i = 0; i < max_replication_slots; i++)
     348             :     {
     349          60 :         ReplicationState *state = &replication_states[i];
     350             : 
     351             :         /* found our slot */
     352          60 :         if (state->roident == roident)
     353             :         {
     354           0 :             if (state->acquired_by != 0)
     355             :             {
     356             :                 ConditionVariable *cv;
     357             : 
     358           0 :                 if (nowait)
     359           0 :                     ereport(ERROR,
     360             :                             (errcode(ERRCODE_OBJECT_IN_USE),
     361             :                              errmsg("could not drop replication origin with OID %d, in use by PID %d",
     362             :                                     state->roident,
     363             :                                     state->acquired_by)));
     364           0 :                 cv = &state->origin_cv;
     365             : 
     366           0 :                 LWLockRelease(ReplicationOriginLock);
     367           0 :                 ConditionVariablePrepareToSleep(cv);
     368           0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     369           0 :                 ConditionVariableCancelSleep();
     370           0 :                 goto restart;
     371             :             }
     372             : 
     373             :             /* first WAL log */
     374             :             {
     375             :                 xl_replorigin_drop xlrec;
     376             : 
     377           0 :                 xlrec.node_id = roident;
     378           0 :                 XLogBeginInsert();
     379           0 :                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     380           0 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     381             :             }
     382             : 
     383             :             /* then reset the in-memory entry */
     384           0 :             state->roident = InvalidRepOriginId;
     385           0 :             state->remote_lsn = InvalidXLogRecPtr;
     386           0 :             state->local_lsn = InvalidXLogRecPtr;
     387           0 :             break;
     388             :         }
     389             :     }
     390           6 :     LWLockRelease(ReplicationOriginLock);
     391             : 
     392           6 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     393           6 :     if (!HeapTupleIsValid(tuple))
     394           0 :         elog(ERROR, "cache lookup failed for replication origin with oid %u",
     395             :              roident);
     396             : 
     397           6 :     CatalogTupleDelete(rel, &tuple->t_self);
     398           6 :     ReleaseSysCache(tuple);
     399             : 
     400           6 :     CommandCounterIncrement();
     401             : 
     402             :     /* now release lock again */
     403           6 :     heap_close(rel, ExclusiveLock);
     404           6 : }
     405             : 
     406             : 
     407             : /*
     408             :  * Lookup replication origin via it's oid and return the name.
     409             :  *
     410             :  * The external name is palloc'd in the calling context.
     411             :  *
     412             :  * Returns true if the origin is known, false otherwise.
     413             :  */
     414             : bool
     415           0 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     416             : {
     417             :     HeapTuple   tuple;
     418             :     Form_pg_replication_origin ric;
     419             : 
     420           0 :     Assert(OidIsValid((Oid) roident));
     421           0 :     Assert(roident != InvalidRepOriginId);
     422           0 :     Assert(roident != DoNotReplicateId);
     423             : 
     424           0 :     tuple = SearchSysCache1(REPLORIGIDENT,
     425             :                             ObjectIdGetDatum((Oid) roident));
     426             : 
     427           0 :     if (HeapTupleIsValid(tuple))
     428             :     {
     429           0 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     430           0 :         *roname = text_to_cstring(&ric->roname);
     431           0 :         ReleaseSysCache(tuple);
     432             : 
     433           0 :         return true;
     434             :     }
     435             :     else
     436             :     {
     437           0 :         *roname = NULL;
     438             : 
     439           0 :         if (!missing_ok)
     440           0 :             elog(ERROR, "cache lookup failed for replication origin with oid %u",
     441             :                  roident);
     442             : 
     443           0 :         return false;
     444             :     }
     445             : }
     446             : 
     447             : 
     448             : /* ---------------------------------------------------------------------------
     449             :  * Functions for handling replication progress.
     450             :  * ---------------------------------------------------------------------------
     451             :  */
     452             : 
     453             : Size
     454          30 : ReplicationOriginShmemSize(void)
     455             : {
     456          30 :     Size        size = 0;
     457             : 
     458             :     /*
     459             :      * XXX: max_replication_slots is arguably the wrong thing to use, as here
     460             :      * we keep the replay state of *remote* transactions. But for now it seems
     461             :      * sufficient to reuse it, lest we introduce a separate GUC.
     462             :      */
     463          30 :     if (max_replication_slots == 0)
     464           0 :         return size;
     465             : 
     466          30 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     467             : 
     468          30 :     size = add_size(size,
     469             :                     mul_size(max_replication_slots, sizeof(ReplicationState)));
     470          30 :     return size;
     471             : }
     472             : 
     473             : void
     474          10 : ReplicationOriginShmemInit(void)
     475             : {
     476             :     bool        found;
     477             : 
     478          10 :     if (max_replication_slots == 0)
     479          10 :         return;
     480             : 
     481          10 :     replication_states_ctl = (ReplicationStateCtl *)
     482          10 :         ShmemInitStruct("ReplicationOriginState",
     483             :                         ReplicationOriginShmemSize(),
     484             :                         &found);
     485          10 :     replication_states = replication_states_ctl->states;
     486             : 
     487          10 :     if (!found)
     488             :     {
     489             :         int         i;
     490             : 
     491          10 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
     492             : 
     493          10 :         MemSet(replication_states, 0, ReplicationOriginShmemSize());
     494             : 
     495         110 :         for (i = 0; i < max_replication_slots; i++)
     496             :         {
     497         100 :             LWLockInitialize(&replication_states[i].lock,
     498         100 :                              replication_states_ctl->tranche_id);
     499         100 :             ConditionVariableInit(&replication_states[i].origin_cv);
     500             :         }
     501             :     }
     502             : 
     503          10 :     LWLockRegisterTranche(replication_states_ctl->tranche_id,
     504             :                           "replication_origin");
     505             : }
     506             : 
     507             : /* ---------------------------------------------------------------------------
     508             :  * Perform a checkpoint of each replication origin's progress with respect to
     509             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     510             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     511             :  * if the transactions were originally committed asynchronously.
     512             :  *
     513             :  * We store checkpoints in the following format:
     514             :  * +-------+------------------------+------------------+-----+--------+
     515             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     516             :  * +-------+------------------------+------------------+-----+--------+
     517             :  *
     518             :  * So it's just the magic, followed by the statically sized
     519             :  * ReplicationStateOnDisk structs and the CRC32C. Note that the maximum
     520             :  * number of ReplicationState entries is determined by max_replication_slots.
     521             :  * ---------------------------------------------------------------------------
     522             :  */
     523             : void
     524          22 : CheckPointReplicationOrigin(void)
     525             : {
     526          22 :     const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
     527          22 :     const char *path = "pg_logical/replorigin_checkpoint";
     528             :     int         tmpfd;
     529             :     int         i;
     530          22 :     uint32      magic = REPLICATION_STATE_MAGIC;
     531             :     pg_crc32c   crc;
     532             : 
     533          22 :     if (max_replication_slots == 0)
     534          22 :         return;
     535             : 
     536          22 :     INIT_CRC32C(crc);
     537             : 
     538             :     /* make sure no old temp file is remaining */
     539          22 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     540           0 :         ereport(PANIC,
     541             :                 (errcode_for_file_access(),
     542             :                  errmsg("could not remove file \"%s\": %m",
     543             :                         tmppath)));
     544             : 
     545             :     /*
     546             :      * no other backend can perform this at the same time, we're protected by
     547             :      * CheckpointLock.
     548             :      */
     549          22 :     tmpfd = OpenTransientFile((char *) tmppath,
     550             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
     551             :                               S_IRUSR | S_IWUSR);
     552          22 :     if (tmpfd < 0)
     553           0 :         ereport(PANIC,
     554             :                 (errcode_for_file_access(),
     555             :                  errmsg("could not create file \"%s\": %m",
     556             :                         tmppath)));
     557             : 
     558             :     /* write magic */
     559          22 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     560             :     {
     561           0 :         CloseTransientFile(tmpfd);
     562           0 :         ereport(PANIC,
     563             :                 (errcode_for_file_access(),
     564             :                  errmsg("could not write to file \"%s\": %m",
     565             :                         tmppath)));
     566             :     }
     567          22 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     568             : 
     569             :     /* prevent concurrent creations/drops */
     570          22 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     571             : 
     572             :     /* write actual data */
     573         242 :     for (i = 0; i < max_replication_slots; i++)
     574             :     {
     575             :         ReplicationStateOnDisk disk_state;
     576         220 :         ReplicationState *curstate = &replication_states[i];
     577             :         XLogRecPtr  local_lsn;
     578             : 
     579         220 :         if (curstate->roident == InvalidRepOriginId)
     580         220 :             continue;
     581             : 
     582             :         /* zero, to avoid uninitialized padding bytes */
     583           0 :         memset(&disk_state, 0, sizeof(disk_state));
     584             : 
     585           0 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     586             : 
     587           0 :         disk_state.roident = curstate->roident;
     588             : 
     589           0 :         disk_state.remote_lsn = curstate->remote_lsn;
     590           0 :         local_lsn = curstate->local_lsn;
     591             : 
     592           0 :         LWLockRelease(&curstate->lock);
     593             : 
     594             :         /* make sure we only write out a commit that's persistent */
     595           0 :         XLogFlush(local_lsn);
     596             : 
     597           0 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     598             :             sizeof(disk_state))
     599             :         {
     600           0 :             CloseTransientFile(tmpfd);
     601           0 :             ereport(PANIC,
     602             :                     (errcode_for_file_access(),
     603             :                      errmsg("could not write to file \"%s\": %m",
     604             :                             tmppath)));
     605             :         }
     606             : 
     607           0 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     608             :     }
     609             : 
     610          22 :     LWLockRelease(ReplicationOriginLock);
     611             : 
     612             :     /* write out the CRC */
     613          22 :     FIN_CRC32C(crc);
     614          22 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     615             :     {
     616           0 :         CloseTransientFile(tmpfd);
     617           0 :         ereport(PANIC,
     618             :                 (errcode_for_file_access(),
     619             :                  errmsg("could not write to file \"%s\": %m",
     620             :                         tmppath)));
     621             :     }
     622             : 
     623          22 :     CloseTransientFile(tmpfd);
     624             : 
     625             :     /* fsync, rename to permanent file, fsync file and directory */
     626          22 :     durable_rename(tmppath, path, PANIC);
     627             : }
     628             : 
     629             : /*
     630             :  * Recover replication replay status from checkpoint data saved earlier by
     631             :  * CheckPointReplicationOrigin.
     632             :  *
     633             :  * This only needs to be called at startup and *not* during every checkpoint
     634             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     635             :  * state thereafter can be recovered by looking at commit records.
     636             :  */
     637             : void
     638           6 : StartupReplicationOrigin(void)
     639             : {
     640           6 :     const char *path = "pg_logical/replorigin_checkpoint";
     641             :     int         fd;
     642             :     int         readBytes;
     643           6 :     uint32      magic = REPLICATION_STATE_MAGIC;
     644           6 :     int         last_state = 0;
     645             :     pg_crc32c   file_crc;
     646             :     pg_crc32c   crc;
     647             : 
     648             :     /* don't want to overwrite already existing state */
     649             : #ifdef USE_ASSERT_CHECKING
     650             :     static bool already_started = false;
     651             : 
     652           6 :     Assert(!already_started);
     653           6 :     already_started = true;
     654             : #endif
     655             : 
     656           6 :     if (max_replication_slots == 0)
     657           2 :         return;
     658             : 
     659           6 :     INIT_CRC32C(crc);
     660             : 
     661           6 :     elog(DEBUG2, "starting up replication origin progress state");
     662             : 
     663           6 :     fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
     664             : 
     665             :     /*
     666             :      * might have had max_replication_slots == 0 last run, or we just brought
     667             :      * up a standby.
     668             :      */
     669           6 :     if (fd < 0 && errno == ENOENT)
     670           2 :         return;
     671           4 :     else if (fd < 0)
     672           0 :         ereport(PANIC,
     673             :                 (errcode_for_file_access(),
     674             :                  errmsg("could not open file \"%s\": %m",
     675             :                         path)));
     676             : 
     677             :     /* verify magic, that is written even if nothing was active */
     678           4 :     readBytes = read(fd, &magic, sizeof(magic));
     679           4 :     if (readBytes != sizeof(magic))
     680           0 :         ereport(PANIC,
     681             :                 (errmsg("could not read file \"%s\": %m",
     682             :                         path)));
     683           4 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     684             : 
     685           4 :     if (magic != REPLICATION_STATE_MAGIC)
     686           0 :         ereport(PANIC,
     687             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     688             :                         magic, REPLICATION_STATE_MAGIC)));
     689             : 
     690             :     /* we can skip locking here, no other access is possible */
     691             : 
     692             :     /* recover individual states, until there are no more to be found */
     693             :     while (true)
     694             :     {
     695             :         ReplicationStateOnDisk disk_state;
     696             : 
     697           4 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     698             : 
     699             :         /* no further data */
     700           4 :         if (readBytes == sizeof(crc))
     701             :         {
     702             :             /* not pretty, but simple ... */
     703           4 :             file_crc = *(pg_crc32c *) &disk_state;
     704           4 :             break;
     705             :         }
     706             : 
     707           0 :         if (readBytes < 0)
     708             :         {
     709           0 :             ereport(PANIC,
     710             :                     (errcode_for_file_access(),
     711             :                      errmsg("could not read file \"%s\": %m",
     712             :                             path)));
     713             :         }
     714             : 
     715           0 :         if (readBytes != sizeof(disk_state))
     716             :         {
     717           0 :             ereport(PANIC,
     718             :                     (errcode_for_file_access(),
     719             :                      errmsg("could not read file \"%s\": read %d of %zu",
     720             :                             path, readBytes, sizeof(disk_state))));
     721             :         }
     722             : 
     723           0 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     724             : 
     725           0 :         if (last_state == max_replication_slots)
     726           0 :             ereport(PANIC,
     727             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     728             :                      errmsg("could not find free replication state, increase max_replication_slots")));
     729             : 
     730             :         /* copy data to shared memory */
     731           0 :         replication_states[last_state].roident = disk_state.roident;
     732           0 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     733           0 :         last_state++;
     734             : 
     735           0 :         elog(LOG, "recovered replication state of node %u to %X/%X",
     736             :              disk_state.roident,
     737             :              (uint32) (disk_state.remote_lsn >> 32),
     738             :              (uint32) disk_state.remote_lsn);
     739           0 :     }
     740             : 
     741             :     /* now check checksum */
     742           4 :     FIN_CRC32C(crc);
     743           4 :     if (file_crc != crc)
     744           0 :         ereport(PANIC,
     745             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     746             :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     747             :                         crc, file_crc)));
     748             : 
     749           4 :     CloseTransientFile(fd);
     750             : }
     751             : 
     752             : void
     753           0 : replorigin_redo(XLogReaderState *record)
     754             : {
     755           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     756             : 
     757           0 :     switch (info)
     758             :     {
     759             :         case XLOG_REPLORIGIN_SET:
     760             :             {
     761           0 :                 xl_replorigin_set *xlrec =
     762             :                 (xl_replorigin_set *) XLogRecGetData(record);
     763             : 
     764           0 :                 replorigin_advance(xlrec->node_id,
     765             :                                    xlrec->remote_lsn, record->EndRecPtr,
     766           0 :                                    xlrec->force /* backward */ ,
     767             :                                    false /* WAL log */ );
     768           0 :                 break;
     769             :             }
     770             :         case XLOG_REPLORIGIN_DROP:
     771             :             {
     772             :                 xl_replorigin_drop *xlrec;
     773             :                 int         i;
     774             : 
     775           0 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     776             : 
     777           0 :                 for (i = 0; i < max_replication_slots; i++)
     778             :                 {
     779           0 :                     ReplicationState *state = &replication_states[i];
     780             : 
     781             :                     /* found our slot */
     782           0 :                     if (state->roident == xlrec->node_id)
     783             :                     {
     784             :                         /* reset entry */
     785           0 :                         state->roident = InvalidRepOriginId;
     786           0 :                         state->remote_lsn = InvalidXLogRecPtr;
     787           0 :                         state->local_lsn = InvalidXLogRecPtr;
     788           0 :                         break;
     789             :                     }
     790             :                 }
     791           0 :                 break;
     792             :             }
     793             :         default:
     794           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     795             :     }
     796           0 : }
     797             : 
     798             : 
     799             : /*
     800             :  * Tell the replication origin progress machinery that a commit from 'node'
     801             :  * that originated at the LSN remote_commit on the remote node was replayed
     802             :  * successfully and that we don't need to do so again. In combination with
     803             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     804             :  * that ensures we won't loose knowledge about that after a crash if the
     805             :  * transaction had a persistent effect (think of asynchronous commits).
     806             :  *
     807             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     808             :  * upon a checkpoint that enough WAL has been persisted to disk.
     809             :  *
     810             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     811             :  * unless running in recovery.
     812             :  */
     813             : void
     814           0 : replorigin_advance(RepOriginId node,
     815             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     816             :                    bool go_backward, bool wal_log)
     817             : {
     818             :     int         i;
     819           0 :     ReplicationState *replication_state = NULL;
     820           0 :     ReplicationState *free_state = NULL;
     821             : 
     822           0 :     Assert(node != InvalidRepOriginId);
     823             : 
     824             :     /* we don't track DoNotReplicateId */
     825           0 :     if (node == DoNotReplicateId)
     826           0 :         return;
     827             : 
     828             :     /*
     829             :      * XXX: For the case where this is called by WAL replay, it'd be more
     830             :      * efficient to restore into a backend local hashtable and only dump into
     831             :      * shmem after recovery is finished. Let's wait with implementing that
     832             :      * till it's shown to be a measurable expense
     833             :      */
     834             : 
     835             :     /* Lock exclusively, as we may have to create a new table entry. */
     836           0 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     837             : 
     838             :     /*
     839             :      * Search for either an existing slot for the origin, or a free one we can
     840             :      * use.
     841             :      */
     842           0 :     for (i = 0; i < max_replication_slots; i++)
     843             :     {
     844           0 :         ReplicationState *curstate = &replication_states[i];
     845             : 
     846             :         /* remember where to insert if necessary */
     847           0 :         if (curstate->roident == InvalidRepOriginId &&
     848             :             free_state == NULL)
     849             :         {
     850           0 :             free_state = curstate;
     851           0 :             continue;
     852             :         }
     853             : 
     854             :         /* not our slot */
     855           0 :         if (curstate->roident != node)
     856             :         {
     857           0 :             continue;
     858             :         }
     859             : 
     860             :         /* ok, found slot */
     861           0 :         replication_state = curstate;
     862             : 
     863           0 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     864             : 
     865             :         /* Make sure it's not used by somebody else */
     866           0 :         if (replication_state->acquired_by != 0)
     867             :         {
     868           0 :             ereport(ERROR,
     869             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     870             :                      errmsg("replication origin with OID %d is already active for PID %d",
     871             :                             replication_state->roident,
     872             :                             replication_state->acquired_by)));
     873             :         }
     874             : 
     875           0 :         break;
     876             :     }
     877             : 
     878           0 :     if (replication_state == NULL && free_state == NULL)
     879           0 :         ereport(ERROR,
     880             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     881             :                  errmsg("could not find free replication state slot for replication origin with OID %u",
     882             :                         node),
     883             :                  errhint("Increase max_replication_slots and try again.")));
     884             : 
     885           0 :     if (replication_state == NULL)
     886             :     {
     887             :         /* initialize new slot */
     888           0 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     889           0 :         replication_state = free_state;
     890           0 :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
     891           0 :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
     892           0 :         replication_state->roident = node;
     893             :     }
     894             : 
     895           0 :     Assert(replication_state->roident != InvalidRepOriginId);
     896             : 
     897             :     /*
     898             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     899             :      * and the standby gets the message. Primarily this will be called during
     900             :      * WAL replay (of commit records) where no WAL logging is necessary.
     901             :      */
     902           0 :     if (wal_log)
     903             :     {
     904             :         xl_replorigin_set xlrec;
     905             : 
     906           0 :         xlrec.remote_lsn = remote_commit;
     907           0 :         xlrec.node_id = node;
     908           0 :         xlrec.force = go_backward;
     909             : 
     910           0 :         XLogBeginInsert();
     911           0 :         XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     912             : 
     913           0 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
     914             :     }
     915             : 
     916             :     /*
     917             :      * Due to - harmless - race conditions during a checkpoint we could see
     918             :      * values here that are older than the ones we already have in memory.
     919             :      * Don't overwrite those.
     920             :      */
     921           0 :     if (go_backward || replication_state->remote_lsn < remote_commit)
     922           0 :         replication_state->remote_lsn = remote_commit;
     923           0 :     if (local_commit != InvalidXLogRecPtr &&
     924           0 :         (go_backward || replication_state->local_lsn < local_commit))
     925           0 :         replication_state->local_lsn = local_commit;
     926           0 :     LWLockRelease(&replication_state->lock);
     927             : 
     928             :     /*
     929             :      * Release *after* changing the LSNs, slot isn't acquired and thus could
     930             :      * otherwise be dropped anytime.
     931             :      */
     932           0 :     LWLockRelease(ReplicationOriginLock);
     933             : }
     934             : 
     935             : 
     936             : XLogRecPtr
     937           0 : replorigin_get_progress(RepOriginId node, bool flush)
     938             : {
     939             :     int         i;
     940           0 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
     941           0 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
     942             : 
     943             :     /* prevent slots from being concurrently dropped */
     944           0 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     945             : 
     946           0 :     for (i = 0; i < max_replication_slots; i++)
     947             :     {
     948             :         ReplicationState *state;
     949             : 
     950           0 :         state = &replication_states[i];
     951             : 
     952           0 :         if (state->roident == node)
     953             :         {
     954           0 :             LWLockAcquire(&state->lock, LW_SHARED);
     955             : 
     956           0 :             remote_lsn = state->remote_lsn;
     957           0 :             local_lsn = state->local_lsn;
     958             : 
     959           0 :             LWLockRelease(&state->lock);
     960             : 
     961           0 :             break;
     962             :         }
     963             :     }
     964             : 
     965           0 :     LWLockRelease(ReplicationOriginLock);
     966             : 
     967           0 :     if (flush && local_lsn != InvalidXLogRecPtr)
     968           0 :         XLogFlush(local_lsn);
     969             : 
     970           0 :     return remote_lsn;
     971             : }
     972             : 
     973             : /*
     974             :  * Tear down a (possibly) configured session replication origin during process
     975             :  * exit.
     976             :  */
     977             : static void
     978           0 : ReplicationOriginExitCleanup(int code, Datum arg)
     979             : {
     980           0 :     ConditionVariable *cv = NULL;
     981             : 
     982           0 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     983             : 
     984           0 :     if (session_replication_state != NULL &&
     985           0 :         session_replication_state->acquired_by == MyProcPid)
     986             :     {
     987           0 :         cv = &session_replication_state->origin_cv;
     988             : 
     989           0 :         session_replication_state->acquired_by = 0;
     990           0 :         session_replication_state = NULL;
     991             :     }
     992             : 
     993           0 :     LWLockRelease(ReplicationOriginLock);
     994             : 
     995           0 :     if (cv)
     996           0 :         ConditionVariableBroadcast(cv);
     997           0 : }
     998             : 
     999             : /*
    1000             :  * Setup a replication origin in the shared memory struct if it doesn't
    1001             :  * already exists and cache access to the specific ReplicationSlot so the
    1002             :  * array doesn't have to be searched when calling
    1003             :  * replorigin_session_advance().
    1004             :  *
    1005             :  * Obviously only one such cached origin can exist per process and the current
    1006             :  * cached value can only be set again after the previous value is torn down
    1007             :  * with replorigin_session_reset().
    1008             :  */
    1009             : void
    1010           0 : replorigin_session_setup(RepOriginId node)
    1011             : {
    1012             :     static bool registered_cleanup;
    1013             :     int         i;
    1014           0 :     int         free_slot = -1;
    1015             : 
    1016           0 :     if (!registered_cleanup)
    1017             :     {
    1018           0 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1019           0 :         registered_cleanup = true;
    1020             :     }
    1021             : 
    1022           0 :     Assert(max_replication_slots > 0);
    1023             : 
    1024           0 :     if (session_replication_state != NULL)
    1025           0 :         ereport(ERROR,
    1026             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1027             :                  errmsg("cannot setup replication origin when one is already setup")));
    1028             : 
    1029             :     /* Lock exclusively, as we may have to create a new table entry. */
    1030           0 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1031             : 
    1032             :     /*
    1033             :      * Search for either an existing slot for the origin, or a free one we can
    1034             :      * use.
    1035             :      */
    1036           0 :     for (i = 0; i < max_replication_slots; i++)
    1037             :     {
    1038           0 :         ReplicationState *curstate = &replication_states[i];
    1039             : 
    1040             :         /* remember where to insert if necessary */
    1041           0 :         if (curstate->roident == InvalidRepOriginId &&
    1042             :             free_slot == -1)
    1043             :         {
    1044           0 :             free_slot = i;
    1045           0 :             continue;
    1046             :         }
    1047             : 
    1048             :         /* not our slot */
    1049           0 :         if (curstate->roident != node)
    1050           0 :             continue;
    1051             : 
    1052           0 :         else if (curstate->acquired_by != 0)
    1053             :         {
    1054           0 :             ereport(ERROR,
    1055             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1056             :                      errmsg("replication identifier %d is already active for PID %d",
    1057             :                             curstate->roident, curstate->acquired_by)));
    1058             :         }
    1059             : 
    1060             :         /* ok, found slot */
    1061           0 :         session_replication_state = curstate;
    1062             :     }
    1063             : 
    1064             : 
    1065           0 :     if (session_replication_state == NULL && free_slot == -1)
    1066           0 :         ereport(ERROR,
    1067             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1068             :                  errmsg("could not find free replication state slot for replication origin with OID %u",
    1069             :                         node),
    1070             :                  errhint("Increase max_replication_slots and try again.")));
    1071           0 :     else if (session_replication_state == NULL)
    1072             :     {
    1073             :         /* initialize new slot */
    1074           0 :         session_replication_state = &replication_states[free_slot];
    1075           0 :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
    1076           0 :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
    1077           0 :         session_replication_state->roident = node;
    1078             :     }
    1079             : 
    1080             : 
    1081           0 :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1082             : 
    1083           0 :     session_replication_state->acquired_by = MyProcPid;
    1084             : 
    1085           0 :     LWLockRelease(ReplicationOriginLock);
    1086             : 
    1087             :     /* probably this one is pointless */
    1088           0 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1089           0 : }
    1090             : 
    1091             : /*
    1092             :  * Reset replay state previously setup in this session.
    1093             :  *
    1094             :  * This function may only be called if an origin was setup with
    1095             :  * replorigin_session_setup().
    1096             :  */
    1097             : void
    1098           0 : replorigin_session_reset(void)
    1099             : {
    1100             :     ConditionVariable *cv;
    1101             : 
    1102           0 :     Assert(max_replication_slots != 0);
    1103             : 
    1104           0 :     if (session_replication_state == NULL)
    1105           0 :         ereport(ERROR,
    1106             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1107             :                  errmsg("no replication origin is configured")));
    1108             : 
    1109           0 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1110             : 
    1111           0 :     session_replication_state->acquired_by = 0;
    1112           0 :     cv = &session_replication_state->origin_cv;
    1113           0 :     session_replication_state = NULL;
    1114             : 
    1115           0 :     LWLockRelease(ReplicationOriginLock);
    1116             : 
    1117           0 :     ConditionVariableBroadcast(cv);
    1118           0 : }
    1119             : 
    1120             : /*
    1121             :  * Do the same work replorigin_advance() does, just on the session's
    1122             :  * configured origin.
    1123             :  *
    1124             :  * This is noticeably cheaper than using replorigin_advance().
    1125             :  */
    1126             : void
    1127           0 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1128             : {
    1129           0 :     Assert(session_replication_state != NULL);
    1130           0 :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1131             : 
    1132           0 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1133           0 :     if (session_replication_state->local_lsn < local_commit)
    1134           0 :         session_replication_state->local_lsn = local_commit;
    1135           0 :     if (session_replication_state->remote_lsn < remote_commit)
    1136           0 :         session_replication_state->remote_lsn = remote_commit;
    1137           0 :     LWLockRelease(&session_replication_state->lock);
    1138           0 : }
    1139             : 
    1140             : /*
    1141             :  * Ask the machinery about the point up to which we successfully replayed
    1142             :  * changes from an already setup replication origin.
    1143             :  */
    1144             : XLogRecPtr
    1145           0 : replorigin_session_get_progress(bool flush)
    1146             : {
    1147             :     XLogRecPtr  remote_lsn;
    1148             :     XLogRecPtr  local_lsn;
    1149             : 
    1150           0 :     Assert(session_replication_state != NULL);
    1151             : 
    1152           0 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1153           0 :     remote_lsn = session_replication_state->remote_lsn;
    1154           0 :     local_lsn = session_replication_state->local_lsn;
    1155           0 :     LWLockRelease(&session_replication_state->lock);
    1156             : 
    1157           0 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1158           0 :         XLogFlush(local_lsn);
    1159             : 
    1160           0 :     return remote_lsn;
    1161             : }
    1162             : 
    1163             : 
    1164             : 
    1165             : /* ---------------------------------------------------------------------------
    1166             :  * SQL functions for working with replication origin.
    1167             :  *
    1168             :  * These mostly should be fairly short wrappers around more generic functions.
    1169             :  * ---------------------------------------------------------------------------
    1170             :  */
    1171             : 
    1172             : /*
    1173             :  * Create replication origin for the passed in name, and return the assigned
    1174             :  * oid.
    1175             :  */
    1176             : Datum
    1177           0 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1178             : {
    1179             :     char       *name;
    1180             :     RepOriginId roident;
    1181             : 
    1182           0 :     replorigin_check_prerequisites(false, false);
    1183             : 
    1184           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1185           0 :     roident = replorigin_create(name);
    1186             : 
    1187           0 :     pfree(name);
    1188             : 
    1189           0 :     PG_RETURN_OID(roident);
    1190             : }
    1191             : 
    1192             : /*
    1193             :  * Drop replication origin.
    1194             :  */
    1195             : Datum
    1196           0 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1197             : {
    1198             :     char       *name;
    1199             :     RepOriginId roident;
    1200             : 
    1201           0 :     replorigin_check_prerequisites(false, false);
    1202             : 
    1203           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1204             : 
    1205           0 :     roident = replorigin_by_name(name, false);
    1206           0 :     Assert(OidIsValid(roident));
    1207             : 
    1208           0 :     replorigin_drop(roident, true);
    1209             : 
    1210           0 :     pfree(name);
    1211             : 
    1212           0 :     PG_RETURN_VOID();
    1213             : }
    1214             : 
    1215             : /*
    1216             :  * Return oid of a replication origin.
    1217             :  */
    1218             : Datum
    1219           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1220             : {
    1221             :     char       *name;
    1222             :     RepOriginId roident;
    1223             : 
    1224           0 :     replorigin_check_prerequisites(false, false);
    1225             : 
    1226           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1227           0 :     roident = replorigin_by_name(name, true);
    1228             : 
    1229           0 :     pfree(name);
    1230             : 
    1231           0 :     if (OidIsValid(roident))
    1232           0 :         PG_RETURN_OID(roident);
    1233           0 :     PG_RETURN_NULL();
    1234             : }
    1235             : 
    1236             : /*
    1237             :  * Setup a replication origin for this session.
    1238             :  */
    1239             : Datum
    1240           0 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1241             : {
    1242             :     char       *name;
    1243             :     RepOriginId origin;
    1244             : 
    1245           0 :     replorigin_check_prerequisites(true, false);
    1246             : 
    1247           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1248           0 :     origin = replorigin_by_name(name, false);
    1249           0 :     replorigin_session_setup(origin);
    1250             : 
    1251           0 :     replorigin_session_origin = origin;
    1252             : 
    1253           0 :     pfree(name);
    1254             : 
    1255           0 :     PG_RETURN_VOID();
    1256             : }
    1257             : 
    1258             : /*
    1259             :  * Reset previously setup origin in this session
    1260             :  */
    1261             : Datum
    1262           0 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1263             : {
    1264           0 :     replorigin_check_prerequisites(true, false);
    1265             : 
    1266           0 :     replorigin_session_reset();
    1267             : 
    1268           0 :     replorigin_session_origin = InvalidRepOriginId;
    1269           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1270           0 :     replorigin_session_origin_timestamp = 0;
    1271             : 
    1272           0 :     PG_RETURN_VOID();
    1273             : }
    1274             : 
    1275             : /*
    1276             :  * Has a replication origin been setup for this session.
    1277             :  */
    1278             : Datum
    1279           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1280             : {
    1281           0 :     replorigin_check_prerequisites(false, false);
    1282             : 
    1283           0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1284             : }
    1285             : 
    1286             : 
    1287             : /*
    1288             :  * Return the replication progress for origin setup in the current session.
    1289             :  *
    1290             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1291             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1292             :  * commits are used when replaying replicated transactions.
    1293             :  */
    1294             : Datum
    1295           0 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1296             : {
    1297           0 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1298           0 :     bool        flush = PG_GETARG_BOOL(0);
    1299             : 
    1300           0 :     replorigin_check_prerequisites(true, false);
    1301             : 
    1302           0 :     if (session_replication_state == NULL)
    1303           0 :         ereport(ERROR,
    1304             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1305             :                  errmsg("no replication origin is configured")));
    1306             : 
    1307           0 :     remote_lsn = replorigin_session_get_progress(flush);
    1308             : 
    1309           0 :     if (remote_lsn == InvalidXLogRecPtr)
    1310           0 :         PG_RETURN_NULL();
    1311             : 
    1312           0 :     PG_RETURN_LSN(remote_lsn);
    1313             : }
    1314             : 
    1315             : Datum
    1316           0 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1317             : {
    1318           0 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1319             : 
    1320           0 :     replorigin_check_prerequisites(true, false);
    1321             : 
    1322           0 :     if (session_replication_state == NULL)
    1323           0 :         ereport(ERROR,
    1324             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1325             :                  errmsg("no replication origin is configured")));
    1326             : 
    1327           0 :     replorigin_session_origin_lsn = location;
    1328           0 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1329             : 
    1330           0 :     PG_RETURN_VOID();
    1331             : }
    1332             : 
    1333             : Datum
    1334           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1335             : {
    1336           0 :     replorigin_check_prerequisites(true, false);
    1337             : 
    1338           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1339           0 :     replorigin_session_origin_timestamp = 0;
    1340             : 
    1341           0 :     PG_RETURN_VOID();
    1342             : }
    1343             : 
    1344             : 
    1345             : Datum
    1346           0 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1347             : {
    1348           0 :     text       *name = PG_GETARG_TEXT_PP(0);
    1349           0 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1350             :     RepOriginId node;
    1351             : 
    1352           0 :     replorigin_check_prerequisites(true, false);
    1353             : 
    1354             :     /* lock to prevent the replication origin from vanishing */
    1355           0 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1356             : 
    1357           0 :     node = replorigin_by_name(text_to_cstring(name), false);
    1358             : 
    1359             :     /*
    1360             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1361             :      * xact hasn't committed yet. This is why this function should be used to
    1362             :      * set up the initial replication state, but not for replay.
    1363             :      */
    1364           0 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1365             :                        true /* go backward */ , true /* WAL log */ );
    1366             : 
    1367           0 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1368             : 
    1369           0 :     PG_RETURN_VOID();
    1370             : }
    1371             : 
    1372             : 
    1373             : /*
    1374             :  * Return the replication progress for an individual replication origin.
    1375             :  *
    1376             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1377             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1378             :  * commits are used when replaying replicated transactions.
    1379             :  */
    1380             : Datum
    1381           0 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1382             : {
    1383             :     char       *name;
    1384             :     bool        flush;
    1385             :     RepOriginId roident;
    1386           0 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1387             : 
    1388           0 :     replorigin_check_prerequisites(true, true);
    1389             : 
    1390           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1391           0 :     flush = PG_GETARG_BOOL(1);
    1392             : 
    1393           0 :     roident = replorigin_by_name(name, false);
    1394           0 :     Assert(OidIsValid(roident));
    1395             : 
    1396           0 :     remote_lsn = replorigin_get_progress(roident, flush);
    1397             : 
    1398           0 :     if (remote_lsn == InvalidXLogRecPtr)
    1399           0 :         PG_RETURN_NULL();
    1400             : 
    1401           0 :     PG_RETURN_LSN(remote_lsn);
    1402             : }
    1403             : 
    1404             : 
    1405             : Datum
    1406           0 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1407             : {
    1408           0 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1409             :     TupleDesc   tupdesc;
    1410             :     Tuplestorestate *tupstore;
    1411             :     MemoryContext per_query_ctx;
    1412             :     MemoryContext oldcontext;
    1413             :     int         i;
    1414             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1415             : 
    1416             :     /* we we want to return 0 rows if slot is set to zero */
    1417           0 :     replorigin_check_prerequisites(false, true);
    1418             : 
    1419           0 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
    1420           0 :         ereport(ERROR,
    1421             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1422             :                  errmsg("set-valued function called in context that cannot accept a set")));
    1423           0 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
    1424           0 :         ereport(ERROR,
    1425             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1426             :                  errmsg("materialize mode required, but it is not allowed in this context")));
    1427           0 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    1428           0 :         elog(ERROR, "return type must be a row type");
    1429             : 
    1430           0 :     if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
    1431           0 :         elog(ERROR, "wrong function definition");
    1432             : 
    1433           0 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    1434           0 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    1435             : 
    1436           0 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    1437           0 :     rsinfo->returnMode = SFRM_Materialize;
    1438           0 :     rsinfo->setResult = tupstore;
    1439           0 :     rsinfo->setDesc = tupdesc;
    1440             : 
    1441           0 :     MemoryContextSwitchTo(oldcontext);
    1442             : 
    1443             : 
    1444             :     /* prevent slots from being concurrently dropped */
    1445           0 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1446             : 
    1447             :     /*
    1448             :      * Iterate through all possible replication_states, display if they are
    1449             :      * filled. Note that we do not take any locks, so slightly corrupted/out
    1450             :      * of date values are a possibility.
    1451             :      */
    1452           0 :     for (i = 0; i < max_replication_slots; i++)
    1453             :     {
    1454             :         ReplicationState *state;
    1455             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1456             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1457             :         char       *roname;
    1458             : 
    1459           0 :         state = &replication_states[i];
    1460             : 
    1461             :         /* unused slot, nothing to display */
    1462           0 :         if (state->roident == InvalidRepOriginId)
    1463           0 :             continue;
    1464             : 
    1465           0 :         memset(values, 0, sizeof(values));
    1466           0 :         memset(nulls, 1, sizeof(nulls));
    1467             : 
    1468           0 :         values[0] = ObjectIdGetDatum(state->roident);
    1469           0 :         nulls[0] = false;
    1470             : 
    1471             :         /*
    1472             :          * We're not preventing the origin to be dropped concurrently, so
    1473             :          * silently accept that it might be gone.
    1474             :          */
    1475           0 :         if (replorigin_by_oid(state->roident, true,
    1476             :                               &roname))
    1477             :         {
    1478           0 :             values[1] = CStringGetTextDatum(roname);
    1479           0 :             nulls[1] = false;
    1480             :         }
    1481             : 
    1482           0 :         LWLockAcquire(&state->lock, LW_SHARED);
    1483             : 
    1484           0 :         values[2] = LSNGetDatum(state->remote_lsn);
    1485           0 :         nulls[2] = false;
    1486             : 
    1487           0 :         values[3] = LSNGetDatum(state->local_lsn);
    1488           0 :         nulls[3] = false;
    1489             : 
    1490           0 :         LWLockRelease(&state->lock);
    1491             : 
    1492           0 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1493             :     }
    1494             : 
    1495             :     tuplestore_donestoring(tupstore);
    1496             : 
    1497           0 :     LWLockRelease(ReplicationOriginLock);
    1498             : 
    1499             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1500             : 
    1501           0 :     return (Datum) 0;
    1502             : }

Generated by: LCOV version 1.11