LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 28 380 7.4 %
Date: 2017-09-29 15:12:54 Functions: 6 20 30.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * syncrep.c
       4             :  *
       5             :  * Synchronous replication is new as of PostgreSQL 9.1.
       6             :  *
       7             :  * If requested, transaction commits wait until their commit LSN are
       8             :  * acknowledged by the synchronous standbys.
       9             :  *
      10             :  * This module contains the code for waiting and release of backends.
      11             :  * All code in this module executes on the primary. The core streaming
      12             :  * replication transport remains within WALreceiver/WALsender modules.
      13             :  *
      14             :  * The essence of this design is that it isolates all logic about
      15             :  * waiting/releasing onto the primary. The primary defines which standbys
      16             :  * it wishes to wait for. The standbys are completely unaware of the
      17             :  * durability requirements of transactions on the primary, reducing the
      18             :  * complexity of the code and streamlining both standby operations and
      19             :  * network bandwidth because there is no requirement to ship
      20             :  * per-transaction state information.
      21             :  *
      22             :  * Replication is either synchronous or not synchronous (async). If it is
      23             :  * async, we just fastpath out of here. If it is sync, then we wait for
      24             :  * the write, flush or apply location on the standby before releasing
      25             :  * the waiting backend. Further complexity in that interaction is
      26             :  * expected in later releases.
      27             :  *
      28             :  * The best performing way to manage the waiting backends is to have a
      29             :  * single ordered queue of waiting backends, so that we can avoid
      30             :  * searching the through all waiters each time we receive a reply.
      31             :  *
      32             :  * In 9.5 or before only a single standby could be considered as
      33             :  * synchronous. In 9.6 we support a priority-based multiple synchronous
      34             :  * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
      35             :  * supported. The number of synchronous standbys that transactions
      36             :  * must wait for replies from is specified in synchronous_standby_names.
      37             :  * This parameter also specifies a list of standby names and the method
      38             :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39             :  *
      40             :  * The method FIRST specifies a priority-based synchronous replication
      41             :  * and makes transaction commits wait until their WAL records are
      42             :  * replicated to the requested number of synchronous standbys chosen based
      43             :  * on their priorities. The standbys whose names appear earlier in the list
      44             :  * are given higher priority and will be considered as synchronous.
      45             :  * Other standby servers appearing later in this list represent potential
      46             :  * synchronous standbys. If any of the current synchronous standbys
      47             :  * disconnects for whatever reason, it will be replaced immediately with
      48             :  * the next-highest-priority standby.
      49             :  *
      50             :  * The method ANY specifies a quorum-based synchronous replication
      51             :  * and makes transaction commits wait until their WAL records are
      52             :  * replicated to at least the requested number of synchronous standbys
      53             :  * in the list. All the standbys appearing in the list are considered as
      54             :  * candidates for quorum synchronous standbys.
      55             :  *
      56             :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57             :  * This is for backward compatibility with 9.6 or before where only a
      58             :  * priority-based sync replication was supported.
      59             :  *
      60             :  * Before the standbys chosen from synchronous_standby_names can
      61             :  * become the synchronous standbys they must have caught up with
      62             :  * the primary; that may take some time. Once caught up,
      63             :  * the standbys which are considered as synchronous at that moment
      64             :  * will release waiters from the queue.
      65             :  *
      66             :  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
      67             :  *
      68             :  * IDENTIFICATION
      69             :  *    src/backend/replication/syncrep.c
      70             :  *
      71             :  *-------------------------------------------------------------------------
      72             :  */
      73             : #include "postgres.h"
      74             : 
      75             : #include <unistd.h>
      76             : 
      77             : #include "access/xact.h"
      78             : #include "miscadmin.h"
      79             : #include "pgstat.h"
      80             : #include "replication/syncrep.h"
      81             : #include "replication/walsender.h"
      82             : #include "replication/walsender_private.h"
      83             : #include "storage/pmsignal.h"
      84             : #include "storage/proc.h"
      85             : #include "tcop/tcopprot.h"
      86             : #include "utils/builtins.h"
      87             : #include "utils/ps_status.h"
      88             : 
      89             : /* User-settable parameters for sync rep */
      90             : char       *SyncRepStandbyNames;
      91             : 
      92             : #define SyncStandbysDefined() \
      93             :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      94             : 
      95             : static bool announce_next_takeover = true;
      96             : 
      97             : SyncRepConfigData *SyncRepConfig = NULL;
      98             : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
      99             : 
     100             : static void SyncRepQueueInsert(int mode);
     101             : static void SyncRepCancelWait(void);
     102             : static int  SyncRepWakeQueue(bool all, int mode);
     103             : 
     104             : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     105             :                      XLogRecPtr *flushPtr,
     106             :                      XLogRecPtr *applyPtr,
     107             :                      bool *am_sync);
     108             : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     109             :                            XLogRecPtr *flushPtr,
     110             :                            XLogRecPtr *applyPtr,
     111             :                            List *sync_standbys);
     112             : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     113             :                               XLogRecPtr *flushPtr,
     114             :                               XLogRecPtr *applyPtr,
     115             :                               List *sync_standbys, uint8 nth);
     116             : static int  SyncRepGetStandbyPriority(void);
     117             : static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
     118             : static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
     119             : static int  cmp_lsn(const void *a, const void *b);
     120             : 
     121             : #ifdef USE_ASSERT_CHECKING
     122             : static bool SyncRepQueueIsOrderedByLSN(int mode);
     123             : #endif
     124             : 
     125             : /*
     126             :  * ===========================================================
     127             :  * Synchronous Replication functions for normal user backends
     128             :  * ===========================================================
     129             :  */
     130             : 
     131             : /*
     132             :  * Wait for synchronous replication, if requested by user.
     133             :  *
     134             :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     135             :  * change that state to SYNC_REP_WAITING before adding ourselves
     136             :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     137             :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     138             :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     139             :  *
     140             :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     141             :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     142             :  * to be flushed if synchronous_commit is set to the higher level of
     143             :  * remote_apply, because only commit records provide apply feedback.
     144             :  */
     145             : void
     146        9511 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     147             : {
     148        9511 :     char       *new_status = NULL;
     149             :     const char *old_status;
     150             :     int         mode;
     151             : 
     152             :     /* Cap the level for anything other than commit to remote flush only. */
     153        9511 :     if (commit)
     154        9502 :         mode = SyncRepWaitMode;
     155             :     else
     156           9 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     157             : 
     158             :     /*
     159             :      * Fast exit if user has not requested sync replication, or there are no
     160             :      * sync replication standby names defined. Note that those standbys don't
     161             :      * need to be connected.
     162             :      */
     163        9511 :     if (!SyncRepRequested() || !SyncStandbysDefined())
     164        9511 :         return;
     165             : 
     166           0 :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     167           0 :     Assert(WalSndCtl != NULL);
     168             : 
     169           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     170           0 :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     171             : 
     172             :     /*
     173             :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     174             :      * set.  See SyncRepUpdateSyncStandbysDefined.
     175             :      *
     176             :      * Also check that the standby hasn't already replied. Unlikely race
     177             :      * condition but we'll be fetching that cache line anyway so it's likely
     178             :      * to be a low cost check.
     179             :      */
     180           0 :     if (!WalSndCtl->sync_standbys_defined ||
     181           0 :         lsn <= WalSndCtl->lsn[mode])
     182             :     {
     183           0 :         LWLockRelease(SyncRepLock);
     184           0 :         return;
     185             :     }
     186             : 
     187             :     /*
     188             :      * Set our waitLSN so WALSender will know when to wake us, and add
     189             :      * ourselves to the queue.
     190             :      */
     191           0 :     MyProc->waitLSN = lsn;
     192           0 :     MyProc->syncRepState = SYNC_REP_WAITING;
     193           0 :     SyncRepQueueInsert(mode);
     194           0 :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     195           0 :     LWLockRelease(SyncRepLock);
     196             : 
     197             :     /* Alter ps display to show waiting for sync rep. */
     198           0 :     if (update_process_title)
     199             :     {
     200             :         int         len;
     201             : 
     202           0 :         old_status = get_ps_display(&len);
     203           0 :         new_status = (char *) palloc(len + 32 + 1);
     204           0 :         memcpy(new_status, old_status, len);
     205           0 :         sprintf(new_status + len, " waiting for %X/%X",
     206           0 :                 (uint32) (lsn >> 32), (uint32) lsn);
     207           0 :         set_ps_display(new_status, false);
     208           0 :         new_status[len] = '\0'; /* truncate off " waiting ..." */
     209             :     }
     210             : 
     211             :     /*
     212             :      * Wait for specified LSN to be confirmed.
     213             :      *
     214             :      * Each proc has its own wait latch, so we perform a normal latch
     215             :      * check/wait loop here.
     216             :      */
     217             :     for (;;)
     218             :     {
     219             :         /* Must reset the latch before testing state. */
     220           0 :         ResetLatch(MyLatch);
     221             : 
     222             :         /*
     223             :          * Acquiring the lock is not needed, the latch ensures proper
     224             :          * barriers. If it looks like we're done, we must really be done,
     225             :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     226             :          * it will never update it again, so we can't be seeing a stale value
     227             :          * in that case.
     228             :          */
     229           0 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     230           0 :             break;
     231             : 
     232             :         /*
     233             :          * If a wait for synchronous replication is pending, we can neither
     234             :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     235             :          * lead the client to believe that the transaction aborted, which is
     236             :          * not true: it's already committed locally. The former is no good
     237             :          * either: the client has requested synchronous replication, and is
     238             :          * entitled to assume that an acknowledged commit is also replicated,
     239             :          * which might not be true. So in this case we issue a WARNING (which
     240             :          * some clients may be able to interpret) and shut off further output.
     241             :          * We do NOT reset ProcDiePending, so that the process will die after
     242             :          * the commit is cleaned up.
     243             :          */
     244           0 :         if (ProcDiePending)
     245             :         {
     246           0 :             ereport(WARNING,
     247             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     248             :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     249             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     250           0 :             whereToSendOutput = DestNone;
     251           0 :             SyncRepCancelWait();
     252           0 :             break;
     253             :         }
     254             : 
     255             :         /*
     256             :          * It's unclear what to do if a query cancel interrupt arrives.  We
     257             :          * can't actually abort at this point, but ignoring the interrupt
     258             :          * altogether is not helpful, so we just terminate the wait with a
     259             :          * suitable warning.
     260             :          */
     261           0 :         if (QueryCancelPending)
     262             :         {
     263           0 :             QueryCancelPending = false;
     264           0 :             ereport(WARNING,
     265             :                     (errmsg("canceling wait for synchronous replication due to user request"),
     266             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     267           0 :             SyncRepCancelWait();
     268           0 :             break;
     269             :         }
     270             : 
     271             :         /*
     272             :          * If the postmaster dies, we'll probably never get an
     273             :          * acknowledgement, because all the wal sender processes will exit. So
     274             :          * just bail out.
     275             :          */
     276           0 :         if (!PostmasterIsAlive())
     277             :         {
     278           0 :             ProcDiePending = true;
     279           0 :             whereToSendOutput = DestNone;
     280           0 :             SyncRepCancelWait();
     281           0 :             break;
     282             :         }
     283             : 
     284             :         /*
     285             :          * Wait on latch.  Any condition that should wake us up will set the
     286             :          * latch, so no need for timeout.
     287             :          */
     288           0 :         WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     289             :                   WAIT_EVENT_SYNC_REP);
     290           0 :     }
     291             : 
     292             :     /*
     293             :      * WalSender has checked our LSN and has removed us from queue. Clean up
     294             :      * state and leave.  It's OK to reset these shared memory fields without
     295             :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     296             :      * we're not on the queue.  We need a read barrier to make sure we see the
     297             :      * changes to the queue link (this might be unnecessary without
     298             :      * assertions, but better safe than sorry).
     299             :      */
     300           0 :     pg_read_barrier();
     301           0 :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     302           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     303           0 :     MyProc->waitLSN = 0;
     304             : 
     305           0 :     if (new_status)
     306             :     {
     307             :         /* Reset ps display */
     308           0 :         set_ps_display(new_status, false);
     309           0 :         pfree(new_status);
     310             :     }
     311             : }
     312             : 
     313             : /*
     314             :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     315             :  *
     316             :  * Usually we will go at tail of queue, though it's possible that we arrive
     317             :  * here out of order, so start at tail and work back to insertion point.
     318             :  */
     319             : static void
     320           0 : SyncRepQueueInsert(int mode)
     321             : {
     322             :     PGPROC     *proc;
     323             : 
     324           0 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     325           0 :     proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     326           0 :                                    &(WalSndCtl->SyncRepQueue[mode]),
     327             :                                    offsetof(PGPROC, syncRepLinks));
     328             : 
     329           0 :     while (proc)
     330             :     {
     331             :         /*
     332             :          * Stop at the queue element that we should after to ensure the queue
     333             :          * is ordered by LSN.
     334             :          */
     335           0 :         if (proc->waitLSN < MyProc->waitLSN)
     336           0 :             break;
     337             : 
     338           0 :         proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     339           0 :                                        &(proc->syncRepLinks),
     340             :                                        offsetof(PGPROC, syncRepLinks));
     341             :     }
     342             : 
     343           0 :     if (proc)
     344           0 :         SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
     345             :     else
     346           0 :         SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
     347           0 : }
     348             : 
     349             : /*
     350             :  * Acquire SyncRepLock and cancel any wait currently in progress.
     351             :  */
     352             : static void
     353           0 : SyncRepCancelWait(void)
     354             : {
     355           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     356           0 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     357           0 :         SHMQueueDelete(&(MyProc->syncRepLinks));
     358           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     359           0 :     LWLockRelease(SyncRepLock);
     360           0 : }
     361             : 
     362             : void
     363         338 : SyncRepCleanupAtProcExit(void)
     364             : {
     365         338 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     366             :     {
     367           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     368           0 :         SHMQueueDelete(&(MyProc->syncRepLinks));
     369           0 :         LWLockRelease(SyncRepLock);
     370             :     }
     371         338 : }
     372             : 
     373             : /*
     374             :  * ===========================================================
     375             :  * Synchronous Replication functions for wal sender processes
     376             :  * ===========================================================
     377             :  */
     378             : 
     379             : /*
     380             :  * Take any action required to initialise sync rep state from config
     381             :  * data. Called at WALSender startup and after each SIGHUP.
     382             :  */
     383             : void
     384           0 : SyncRepInitConfig(void)
     385             : {
     386             :     int         priority;
     387             : 
     388             :     /*
     389             :      * Determine if we are a potential sync standby and remember the result
     390             :      * for handling replies from standby.
     391             :      */
     392           0 :     priority = SyncRepGetStandbyPriority();
     393           0 :     if (MyWalSnd->sync_standby_priority != priority)
     394             :     {
     395           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     396           0 :         MyWalSnd->sync_standby_priority = priority;
     397           0 :         LWLockRelease(SyncRepLock);
     398           0 :         ereport(DEBUG1,
     399             :                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
     400             :                         application_name, priority)));
     401             :     }
     402           0 : }
     403             : 
     404             : /*
     405             :  * Update the LSNs on each queue based upon our latest state. This
     406             :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     407             :  *
     408             :  * Other policies are possible, which would change what we do here and
     409             :  * perhaps also which information we store as well.
     410             :  */
     411             : void
     412           0 : SyncRepReleaseWaiters(void)
     413             : {
     414           0 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     415             :     XLogRecPtr  writePtr;
     416             :     XLogRecPtr  flushPtr;
     417             :     XLogRecPtr  applyPtr;
     418             :     bool        got_recptr;
     419             :     bool        am_sync;
     420           0 :     int         numwrite = 0;
     421           0 :     int         numflush = 0;
     422           0 :     int         numapply = 0;
     423             : 
     424             :     /*
     425             :      * If this WALSender is serving a standby that is not on the list of
     426             :      * potential sync standbys then we have nothing to do. If we are still
     427             :      * starting up, still running base backup or the current flush position is
     428             :      * still invalid, then leave quickly also.
     429             :      */
     430           0 :     if (MyWalSnd->sync_standby_priority == 0 ||
     431           0 :         MyWalSnd->state < WALSNDSTATE_STREAMING ||
     432           0 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     433             :     {
     434           0 :         announce_next_takeover = true;
     435           0 :         return;
     436             :     }
     437             : 
     438             :     /*
     439             :      * We're a potential sync standby. Release waiters if there are enough
     440             :      * sync standbys and we are considered as sync.
     441             :      */
     442           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     443             : 
     444             :     /*
     445             :      * Check whether we are a sync standby or not, and calculate the synced
     446             :      * positions among all sync standbys.
     447             :      */
     448           0 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     449             : 
     450             :     /*
     451             :      * If we are managing a sync standby, though we weren't prior to this,
     452             :      * then announce we are now a sync standby.
     453             :      */
     454           0 :     if (announce_next_takeover && am_sync)
     455             :     {
     456           0 :         announce_next_takeover = false;
     457             : 
     458           0 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     459           0 :             ereport(LOG,
     460             :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
     461             :                             application_name, MyWalSnd->sync_standby_priority)));
     462             :         else
     463           0 :             ereport(LOG,
     464             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     465             :                             application_name)));
     466             :     }
     467             : 
     468             :     /*
     469             :      * If the number of sync standbys is less than requested or we aren't
     470             :      * managing a sync standby then just leave.
     471             :      */
     472           0 :     if (!got_recptr || !am_sync)
     473             :     {
     474           0 :         LWLockRelease(SyncRepLock);
     475           0 :         announce_next_takeover = !am_sync;
     476           0 :         return;
     477             :     }
     478             : 
     479             :     /*
     480             :      * Set the lsn first so that when we wake backends they will release up to
     481             :      * this location.
     482             :      */
     483           0 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     484             :     {
     485           0 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     486           0 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     487             :     }
     488           0 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     489             :     {
     490           0 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     491           0 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     492             :     }
     493           0 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     494             :     {
     495           0 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     496           0 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     497             :     }
     498             : 
     499           0 :     LWLockRelease(SyncRepLock);
     500             : 
     501           0 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     502             :          numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
     503             :          numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
     504             :          numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
     505             : }
     506             : 
     507             : /*
     508             :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     509             :  *
     510             :  * Return false if the number of sync standbys is less than
     511             :  * synchronous_standby_names specifies. Otherwise return true and
     512             :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     513             :  *
     514             :  * On return, *am_sync is set to true if this walsender is connecting to
     515             :  * sync standby. Otherwise it's set to false.
     516             :  */
     517             : static bool
     518           0 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     519             :                      XLogRecPtr *applyPtr, bool *am_sync)
     520             : {
     521             :     List       *sync_standbys;
     522             : 
     523           0 :     *writePtr = InvalidXLogRecPtr;
     524           0 :     *flushPtr = InvalidXLogRecPtr;
     525           0 :     *applyPtr = InvalidXLogRecPtr;
     526           0 :     *am_sync = false;
     527             : 
     528             :     /* Get standbys that are considered as synchronous at this moment */
     529           0 :     sync_standbys = SyncRepGetSyncStandbys(am_sync);
     530             : 
     531             :     /*
     532             :      * Quick exit if we are not managing a sync standby or there are not
     533             :      * enough synchronous standbys.
     534             :      */
     535           0 :     if (!(*am_sync) ||
     536           0 :         SyncRepConfig == NULL ||
     537           0 :         list_length(sync_standbys) < SyncRepConfig->num_sync)
     538             :     {
     539           0 :         list_free(sync_standbys);
     540           0 :         return false;
     541             :     }
     542             : 
     543             :     /*
     544             :      * In a priority-based sync replication, the synced positions are the
     545             :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     546             :      * latest ones.
     547             :      *
     548             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     549             :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     550             :      * because it's a bit more efficient.
     551             :      *
     552             :      * XXX If the numbers of current and requested sync standbys are the same,
     553             :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     554             :      * positions even in a quorum-based sync replication.
     555             :      */
     556           0 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     557             :     {
     558           0 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     559             :                                    sync_standbys);
     560             :     }
     561             :     else
     562             :     {
     563           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     564           0 :                                       sync_standbys, SyncRepConfig->num_sync);
     565             :     }
     566             : 
     567           0 :     list_free(sync_standbys);
     568           0 :     return true;
     569             : }
     570             : 
     571             : /*
     572             :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     573             :  */
     574             : static void
     575           0 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     576             :                            XLogRecPtr *applyPtr, List *sync_standbys)
     577             : {
     578             :     ListCell   *cell;
     579             : 
     580             :     /*
     581             :      * Scan through all sync standbys and calculate the oldest Write, Flush
     582             :      * and Apply positions.
     583             :      */
     584           0 :     foreach(cell, sync_standbys)
     585             :     {
     586           0 :         WalSnd     *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
     587             :         XLogRecPtr  write;
     588             :         XLogRecPtr  flush;
     589             :         XLogRecPtr  apply;
     590             : 
     591           0 :         SpinLockAcquire(&walsnd->mutex);
     592           0 :         write = walsnd->write;
     593           0 :         flush = walsnd->flush;
     594           0 :         apply = walsnd->apply;
     595           0 :         SpinLockRelease(&walsnd->mutex);
     596             : 
     597           0 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     598           0 :             *writePtr = write;
     599           0 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     600           0 :             *flushPtr = flush;
     601           0 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     602           0 :             *applyPtr = apply;
     603             :     }
     604           0 : }
     605             : 
     606             : /*
     607             :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     608             :  * standbys.
     609             :  */
     610             : static void
     611           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     612             :                               XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
     613             : {
     614             :     ListCell   *cell;
     615             :     XLogRecPtr *write_array;
     616             :     XLogRecPtr *flush_array;
     617             :     XLogRecPtr *apply_array;
     618             :     int         len;
     619           0 :     int         i = 0;
     620             : 
     621           0 :     len = list_length(sync_standbys);
     622           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     623           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     624           0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     625             : 
     626           0 :     foreach(cell, sync_standbys)
     627             :     {
     628           0 :         WalSnd     *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
     629             : 
     630           0 :         SpinLockAcquire(&walsnd->mutex);
     631           0 :         write_array[i] = walsnd->write;
     632           0 :         flush_array[i] = walsnd->flush;
     633           0 :         apply_array[i] = walsnd->apply;
     634           0 :         SpinLockRelease(&walsnd->mutex);
     635             : 
     636           0 :         i++;
     637             :     }
     638             : 
     639             :     /* Sort each array in descending order */
     640           0 :     qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
     641           0 :     qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
     642           0 :     qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
     643             : 
     644             :     /* Get Nth latest Write, Flush, Apply positions */
     645           0 :     *writePtr = write_array[nth - 1];
     646           0 :     *flushPtr = flush_array[nth - 1];
     647           0 :     *applyPtr = apply_array[nth - 1];
     648             : 
     649           0 :     pfree(write_array);
     650           0 :     pfree(flush_array);
     651           0 :     pfree(apply_array);
     652           0 : }
     653             : 
     654             : /*
     655             :  * Compare lsn in order to sort array in descending order.
     656             :  */
     657             : static int
     658           0 : cmp_lsn(const void *a, const void *b)
     659             : {
     660           0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     661           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     662             : 
     663           0 :     if (lsn1 > lsn2)
     664           0 :         return -1;
     665           0 :     else if (lsn1 == lsn2)
     666           0 :         return 0;
     667             :     else
     668           0 :         return 1;
     669             : }
     670             : 
     671             : /*
     672             :  * Return the list of sync standbys, or NIL if no sync standby is connected.
     673             :  *
     674             :  * The caller must hold SyncRepLock.
     675             :  *
     676             :  * On return, *am_sync is set to true if this walsender is connecting to
     677             :  * sync standby. Otherwise it's set to false.
     678             :  */
     679             : List *
     680           0 : SyncRepGetSyncStandbys(bool *am_sync)
     681             : {
     682             :     /* Set default result */
     683           0 :     if (am_sync != NULL)
     684           0 :         *am_sync = false;
     685             : 
     686             :     /* Quick exit if sync replication is not requested */
     687           0 :     if (SyncRepConfig == NULL)
     688           0 :         return NIL;
     689             : 
     690           0 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
     691           0 :         SyncRepGetSyncStandbysPriority(am_sync) :
     692             :         SyncRepGetSyncStandbysQuorum(am_sync);
     693             : }
     694             : 
     695             : /*
     696             :  * Return the list of all the candidates for quorum sync standbys,
     697             :  * or NIL if no such standby is connected.
     698             :  *
     699             :  * The caller must hold SyncRepLock. This function must be called only in
     700             :  * a quorum-based sync replication.
     701             :  *
     702             :  * On return, *am_sync is set to true if this walsender is connecting to
     703             :  * sync standby. Otherwise it's set to false.
     704             :  */
     705             : static List *
     706           0 : SyncRepGetSyncStandbysQuorum(bool *am_sync)
     707             : {
     708           0 :     List       *result = NIL;
     709             :     int         i;
     710             :     volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     711             :                                  * rearrangement */
     712             : 
     713           0 :     Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
     714             : 
     715           0 :     for (i = 0; i < max_wal_senders; i++)
     716             :     {
     717             :         XLogRecPtr  flush;
     718             :         WalSndState state;
     719             :         int         pid;
     720             : 
     721           0 :         walsnd = &WalSndCtl->walsnds[i];
     722             : 
     723           0 :         SpinLockAcquire(&walsnd->mutex);
     724           0 :         pid = walsnd->pid;
     725           0 :         flush = walsnd->flush;
     726           0 :         state = walsnd->state;
     727           0 :         SpinLockRelease(&walsnd->mutex);
     728             : 
     729             :         /* Must be active */
     730           0 :         if (pid == 0)
     731           0 :             continue;
     732             : 
     733             :         /* Must be streaming */
     734           0 :         if (state != WALSNDSTATE_STREAMING)
     735           0 :             continue;
     736             : 
     737             :         /* Must be synchronous */
     738           0 :         if (walsnd->sync_standby_priority == 0)
     739           0 :             continue;
     740             : 
     741             :         /* Must have a valid flush position */
     742           0 :         if (XLogRecPtrIsInvalid(flush))
     743           0 :             continue;
     744             : 
     745             :         /*
     746             :          * Consider this standby as a candidate for quorum sync standbys and
     747             :          * append it to the result.
     748             :          */
     749           0 :         result = lappend_int(result, i);
     750           0 :         if (am_sync != NULL && walsnd == MyWalSnd)
     751           0 :             *am_sync = true;
     752             :     }
     753             : 
     754           0 :     return result;
     755             : }
     756             : 
     757             : /*
     758             :  * Return the list of sync standbys chosen based on their priorities,
     759             :  * or NIL if no sync standby is connected.
     760             :  *
     761             :  * If there are multiple standbys with the same priority,
     762             :  * the first one found is selected preferentially.
     763             :  *
     764             :  * The caller must hold SyncRepLock. This function must be called only in
     765             :  * a priority-based sync replication.
     766             :  *
     767             :  * On return, *am_sync is set to true if this walsender is connecting to
     768             :  * sync standby. Otherwise it's set to false.
     769             :  */
     770             : static List *
     771           0 : SyncRepGetSyncStandbysPriority(bool *am_sync)
     772             : {
     773           0 :     List       *result = NIL;
     774           0 :     List       *pending = NIL;
     775             :     int         lowest_priority;
     776             :     int         next_highest_priority;
     777             :     int         this_priority;
     778             :     int         priority;
     779             :     int         i;
     780           0 :     bool        am_in_pending = false;
     781             :     volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     782             :                                  * rearrangement */
     783             : 
     784           0 :     Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
     785             : 
     786           0 :     lowest_priority = SyncRepConfig->nmembers;
     787           0 :     next_highest_priority = lowest_priority + 1;
     788             : 
     789             :     /*
     790             :      * Find the sync standbys which have the highest priority (i.e, 1). Also
     791             :      * store all the other potential sync standbys into the pending list, in
     792             :      * order to scan it later and find other sync standbys from it quickly.
     793             :      */
     794           0 :     for (i = 0; i < max_wal_senders; i++)
     795             :     {
     796             :         XLogRecPtr  flush;
     797             :         WalSndState state;
     798             :         int         pid;
     799             : 
     800           0 :         walsnd = &WalSndCtl->walsnds[i];
     801             : 
     802           0 :         SpinLockAcquire(&walsnd->mutex);
     803           0 :         pid = walsnd->pid;
     804           0 :         flush = walsnd->flush;
     805           0 :         state = walsnd->state;
     806           0 :         SpinLockRelease(&walsnd->mutex);
     807             : 
     808             :         /* Must be active */
     809           0 :         if (pid == 0)
     810           0 :             continue;
     811             : 
     812             :         /* Must be streaming */
     813           0 :         if (state != WALSNDSTATE_STREAMING)
     814           0 :             continue;
     815             : 
     816             :         /* Must be synchronous */
     817           0 :         this_priority = walsnd->sync_standby_priority;
     818           0 :         if (this_priority == 0)
     819           0 :             continue;
     820             : 
     821             :         /* Must have a valid flush position */
     822           0 :         if (XLogRecPtrIsInvalid(flush))
     823           0 :             continue;
     824             : 
     825             :         /*
     826             :          * If the priority is equal to 1, consider this standby as sync and
     827             :          * append it to the result. Otherwise append this standby to the
     828             :          * pending list to check if it's actually sync or not later.
     829             :          */
     830           0 :         if (this_priority == 1)
     831             :         {
     832           0 :             result = lappend_int(result, i);
     833           0 :             if (am_sync != NULL && walsnd == MyWalSnd)
     834           0 :                 *am_sync = true;
     835           0 :             if (list_length(result) == SyncRepConfig->num_sync)
     836             :             {
     837           0 :                 list_free(pending);
     838           0 :                 return result;  /* Exit if got enough sync standbys */
     839             :             }
     840             :         }
     841             :         else
     842             :         {
     843           0 :             pending = lappend_int(pending, i);
     844           0 :             if (am_sync != NULL && walsnd == MyWalSnd)
     845           0 :                 am_in_pending = true;
     846             : 
     847             :             /*
     848             :              * Track the highest priority among the standbys in the pending
     849             :              * list, in order to use it as the starting priority for later
     850             :              * scan of the list. This is useful to find quickly the sync
     851             :              * standbys from the pending list later because we can skip
     852             :              * unnecessary scans for the unused priorities.
     853             :              */
     854           0 :             if (this_priority < next_highest_priority)
     855           0 :                 next_highest_priority = this_priority;
     856             :         }
     857             :     }
     858             : 
     859             :     /*
     860             :      * Consider all pending standbys as sync if the number of them plus
     861             :      * already-found sync ones is lower than the configuration requests.
     862             :      */
     863           0 :     if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
     864             :     {
     865           0 :         bool        needfree = (result != NIL && pending != NIL);
     866             : 
     867             :         /*
     868             :          * Set *am_sync to true if this walsender is in the pending list
     869             :          * because all pending standbys are considered as sync.
     870             :          */
     871           0 :         if (am_sync != NULL && !(*am_sync))
     872           0 :             *am_sync = am_in_pending;
     873             : 
     874           0 :         result = list_concat(result, pending);
     875           0 :         if (needfree)
     876           0 :             pfree(pending);
     877           0 :         return result;
     878             :     }
     879             : 
     880             :     /*
     881             :      * Find the sync standbys from the pending list.
     882             :      */
     883           0 :     priority = next_highest_priority;
     884           0 :     while (priority <= lowest_priority)
     885             :     {
     886             :         ListCell   *cell;
     887           0 :         ListCell   *prev = NULL;
     888             :         ListCell   *next;
     889             : 
     890           0 :         next_highest_priority = lowest_priority + 1;
     891             : 
     892           0 :         for (cell = list_head(pending); cell != NULL; cell = next)
     893             :         {
     894           0 :             i = lfirst_int(cell);
     895           0 :             walsnd = &WalSndCtl->walsnds[i];
     896             : 
     897           0 :             next = lnext(cell);
     898             : 
     899           0 :             this_priority = walsnd->sync_standby_priority;
     900           0 :             if (this_priority == priority)
     901             :             {
     902           0 :                 result = lappend_int(result, i);
     903           0 :                 if (am_sync != NULL && walsnd == MyWalSnd)
     904           0 :                     *am_sync = true;
     905             : 
     906             :                 /*
     907             :                  * We should always exit here after the scan of pending list
     908             :                  * starts because we know that the list has enough elements to
     909             :                  * reach SyncRepConfig->num_sync.
     910             :                  */
     911           0 :                 if (list_length(result) == SyncRepConfig->num_sync)
     912             :                 {
     913           0 :                     list_free(pending);
     914           0 :                     return result;  /* Exit if got enough sync standbys */
     915             :                 }
     916             : 
     917             :                 /*
     918             :                  * Remove the entry for this sync standby from the list to
     919             :                  * prevent us from looking at the same entry again.
     920             :                  */
     921           0 :                 pending = list_delete_cell(pending, cell, prev);
     922             : 
     923           0 :                 continue;
     924             :             }
     925             : 
     926           0 :             if (this_priority < next_highest_priority)
     927           0 :                 next_highest_priority = this_priority;
     928             : 
     929           0 :             prev = cell;
     930             :         }
     931             : 
     932           0 :         priority = next_highest_priority;
     933             :     }
     934             : 
     935             :     /* never reached, but keep compiler quiet */
     936           0 :     Assert(false);
     937             :     return result;
     938             : }
     939             : 
     940             : /*
     941             :  * Check if we are in the list of sync standbys, and if so, determine
     942             :  * priority sequence. Return priority if set, or zero to indicate that
     943             :  * we are not a potential sync standby.
     944             :  *
     945             :  * Compare the parameter SyncRepStandbyNames against the application_name
     946             :  * for this WALSender, or allow any name if we find a wildcard "*".
     947             :  */
     948             : static int
     949           0 : SyncRepGetStandbyPriority(void)
     950             : {
     951             :     const char *standby_name;
     952             :     int         priority;
     953           0 :     bool        found = false;
     954             : 
     955             :     /*
     956             :      * Since synchronous cascade replication is not allowed, we always set the
     957             :      * priority of cascading walsender to zero.
     958             :      */
     959           0 :     if (am_cascading_walsender)
     960           0 :         return 0;
     961             : 
     962           0 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     963           0 :         return 0;
     964             : 
     965           0 :     standby_name = SyncRepConfig->member_names;
     966           0 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     967             :     {
     968           0 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     969           0 :             strcmp(standby_name, "*") == 0)
     970             :         {
     971           0 :             found = true;
     972           0 :             break;
     973             :         }
     974           0 :         standby_name += strlen(standby_name) + 1;
     975             :     }
     976             : 
     977           0 :     if (!found)
     978           0 :         return 0;
     979             : 
     980             :     /*
     981             :      * In quorum-based sync replication, all the standbys in the list have the
     982             :      * same priority, one.
     983             :      */
     984           0 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     985             : }
     986             : 
     987             : /*
     988             :  * Walk the specified queue from head.  Set the state of any backends that
     989             :  * need to be woken, remove them from the queue, and then wake them.
     990             :  * Pass all = true to wake whole queue; otherwise, just wake up to
     991             :  * the walsender's LSN.
     992             :  *
     993             :  * Must hold SyncRepLock.
     994             :  */
     995             : static int
     996           0 : SyncRepWakeQueue(bool all, int mode)
     997             : {
     998           0 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     999           0 :     PGPROC     *proc = NULL;
    1000           0 :     PGPROC     *thisproc = NULL;
    1001           0 :     int         numprocs = 0;
    1002             : 
    1003           0 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    1004           0 :     Assert(SyncRepQueueIsOrderedByLSN(mode));
    1005             : 
    1006           0 :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1007           0 :                                    &(WalSndCtl->SyncRepQueue[mode]),
    1008             :                                    offsetof(PGPROC, syncRepLinks));
    1009             : 
    1010           0 :     while (proc)
    1011             :     {
    1012             :         /*
    1013             :          * Assume the queue is ordered by LSN
    1014             :          */
    1015           0 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
    1016           0 :             return numprocs;
    1017             : 
    1018             :         /*
    1019             :          * Move to next proc, so we can delete thisproc from the queue.
    1020             :          * thisproc is valid, proc may be NULL after this.
    1021             :          */
    1022           0 :         thisproc = proc;
    1023           0 :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1024           0 :                                        &(proc->syncRepLinks),
    1025             :                                        offsetof(PGPROC, syncRepLinks));
    1026             : 
    1027             :         /*
    1028             :          * Remove thisproc from queue.
    1029             :          */
    1030           0 :         SHMQueueDelete(&(thisproc->syncRepLinks));
    1031             : 
    1032             :         /*
    1033             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
    1034             :          * make sure that it sees the queue link being removed before the
    1035             :          * syncRepState change.
    1036             :          */
    1037           0 :         pg_write_barrier();
    1038             : 
    1039             :         /*
    1040             :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
    1041             :          * the various states.
    1042             :          */
    1043           0 :         thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
    1044             : 
    1045             :         /*
    1046             :          * Wake only when we have set state and removed from queue.
    1047             :          */
    1048           0 :         SetLatch(&(thisproc->procLatch));
    1049             : 
    1050           0 :         numprocs++;
    1051             :     }
    1052             : 
    1053           0 :     return numprocs;
    1054             : }
    1055             : 
    1056             : /*
    1057             :  * The checkpointer calls this as needed to update the shared
    1058             :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
    1059             :  * if synchronous_standby_names is unset.  It's safe to check the current value
    1060             :  * without the lock, because it's only ever updated by one process.  But we
    1061             :  * must take the lock to change it.
    1062             :  */
    1063             : void
    1064           1 : SyncRepUpdateSyncStandbysDefined(void)
    1065             : {
    1066           1 :     bool        sync_standbys_defined = SyncStandbysDefined();
    1067             : 
    1068           1 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
    1069             :     {
    1070           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    1071             : 
    1072             :         /*
    1073             :          * If synchronous_standby_names has been reset to empty, it's futile
    1074             :          * for backends to continue to waiting.  Since the user no longer
    1075             :          * wants synchronous replication, we'd better wake them up.
    1076             :          */
    1077           0 :         if (!sync_standbys_defined)
    1078             :         {
    1079             :             int         i;
    1080             : 
    1081           0 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
    1082           0 :                 SyncRepWakeQueue(true, i);
    1083             :         }
    1084             : 
    1085             :         /*
    1086             :          * Only allow people to join the queue when there are synchronous
    1087             :          * standbys defined.  Without this interlock, there's a race
    1088             :          * condition: we might wake up all the current waiters; then, some
    1089             :          * backend that hasn't yet reloaded its config might go to sleep on
    1090             :          * the queue (and never wake up).  This prevents that.
    1091             :          */
    1092           0 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
    1093             : 
    1094           0 :         LWLockRelease(SyncRepLock);
    1095             :     }
    1096           1 : }
    1097             : 
    1098             : #ifdef USE_ASSERT_CHECKING
    1099             : static bool
    1100           0 : SyncRepQueueIsOrderedByLSN(int mode)
    1101             : {
    1102           0 :     PGPROC     *proc = NULL;
    1103             :     XLogRecPtr  lastLSN;
    1104             : 
    1105           0 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    1106             : 
    1107           0 :     lastLSN = 0;
    1108             : 
    1109           0 :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1110           0 :                                    &(WalSndCtl->SyncRepQueue[mode]),
    1111             :                                    offsetof(PGPROC, syncRepLinks));
    1112             : 
    1113           0 :     while (proc)
    1114             :     {
    1115             :         /*
    1116             :          * Check the queue is ordered by LSN and that multiple procs don't
    1117             :          * have matching LSNs
    1118             :          */
    1119           0 :         if (proc->waitLSN <= lastLSN)
    1120           0 :             return false;
    1121             : 
    1122           0 :         lastLSN = proc->waitLSN;
    1123             : 
    1124           0 :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1125           0 :                                        &(proc->syncRepLinks),
    1126             :                                        offsetof(PGPROC, syncRepLinks));
    1127             :     }
    1128             : 
    1129           0 :     return true;
    1130             : }
    1131             : #endif
    1132             : 
    1133             : /*
    1134             :  * ===========================================================
    1135             :  * Synchronous Replication functions executed by any process
    1136             :  * ===========================================================
    1137             :  */
    1138             : 
    1139             : bool
    1140           5 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
    1141             : {
    1142           5 :     if (*newval != NULL && (*newval)[0] != '\0')
    1143           0 :     {
    1144             :         int         parse_rc;
    1145             :         SyncRepConfigData *pconf;
    1146             : 
    1147             :         /* Reset communication variables to ensure a fresh start */
    1148           0 :         syncrep_parse_result = NULL;
    1149           0 :         syncrep_parse_error_msg = NULL;
    1150             : 
    1151             :         /* Parse the synchronous_standby_names string */
    1152           0 :         syncrep_scanner_init(*newval);
    1153           0 :         parse_rc = syncrep_yyparse();
    1154           0 :         syncrep_scanner_finish();
    1155             : 
    1156           0 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1157             :         {
    1158           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1159           0 :             if (syncrep_parse_error_msg)
    1160           0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1161             :             else
    1162           0 :                 GUC_check_errdetail("synchronous_standby_names parser failed");
    1163           0 :             return false;
    1164             :         }
    1165             : 
    1166           0 :         if (syncrep_parse_result->num_sync <= 0)
    1167             :         {
    1168           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1169           0 :                              syncrep_parse_result->num_sync);
    1170           0 :             return false;
    1171             :         }
    1172             : 
    1173             :         /* GUC extra value must be malloc'd, not palloc'd */
    1174           0 :         pconf = (SyncRepConfigData *)
    1175           0 :             malloc(syncrep_parse_result->config_size);
    1176           0 :         if (pconf == NULL)
    1177           0 :             return false;
    1178           0 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1179             : 
    1180           0 :         *extra = (void *) pconf;
    1181             : 
    1182             :         /*
    1183             :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1184             :          * other cruft generated during parsing, will be freed when the
    1185             :          * current memory context is deleted.  (This code is generally run in
    1186             :          * a short-lived context used for config file processing, so that will
    1187             :          * not be very long.)
    1188             :          */
    1189             :     }
    1190             :     else
    1191           5 :         *extra = NULL;
    1192             : 
    1193           5 :     return true;
    1194             : }
    1195             : 
    1196             : void
    1197           5 : assign_synchronous_standby_names(const char *newval, void *extra)
    1198             : {
    1199           5 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1200           5 : }
    1201             : 
    1202             : void
    1203           8 : assign_synchronous_commit(int newval, void *extra)
    1204             : {
    1205           8 :     switch (newval)
    1206             :     {
    1207             :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1208           0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1209           0 :             break;
    1210             :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1211           5 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1212           5 :             break;
    1213             :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1214           0 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1215           0 :             break;
    1216             :         default:
    1217           3 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1218           3 :             break;
    1219             :     }
    1220           8 : }

Generated by: LCOV version 1.11