LCOV - code coverage report
Current view: top level - src/backend/replication - walsender.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 43 1118 3.8 %
Date: 2017-09-29 15:12:54 Functions: 5 50 10.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walsender.c
       4             :  *
       5             :  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
       6             :  * care of sending XLOG from the primary server to a single recipient.
       7             :  * (Note that there can be more than one walsender process concurrently.)
       8             :  * It is started by the postmaster when the walreceiver of a standby server
       9             :  * connects to the primary server and requests XLOG streaming replication.
      10             :  *
      11             :  * A walsender is similar to a regular backend, i.e. there is a one-to-one
      12             :  * relationship between a connection and a walsender process, but instead
      13             :  * of processing SQL queries, it understands a small set of special
      14             :  * replication-mode commands. The START_REPLICATION command begins streaming
      15             :  * WAL to the client. While streaming, the walsender keeps reading XLOG
      16             :  * records from the disk and sends them to the standby server over the
      17             :  * COPY protocol, until either side ends the replication by exiting COPY
      18             :  * mode (or until the connection is closed).
      19             :  *
      20             :  * Normal termination is by SIGTERM, which instructs the walsender to
      21             :  * close the connection and exit(0) at the next convenient moment. Emergency
      22             :  * termination is by SIGQUIT; like any backend, the walsender will simply
      23             :  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
      24             :  * are treated not as a crash but as approximately normal termination;
      25             :  * the walsender will exit quickly without sending any more XLOG records.
      26             :  *
      27             :  * If the server is shut down, checkpointer sends us
      28             :  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited.  If
      29             :  * the backend is idle or runs an SQL query, this causes the backend to
      30             :  * shut down; if logical replication is in progress, all existing WAL records
      31             :  * are processed, followed by a shutdown.  Otherwise this causes the walsender
      32             :  * to switch to the "stopping" state. In this state, the walsender will reject
      33             :  * any further replication commands. The checkpointer begins the shutdown
      34             :  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
      35             :  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
      36             :  * walsender to send any outstanding WAL, including the shutdown checkpoint
      37             :  * record, wait for it to be replicated to the standby, and then exit.
      38             :  *
      39             :  *
      40             :  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
      41             :  *
      42             :  * IDENTIFICATION
      43             :  *    src/backend/replication/walsender.c
      44             :  *
      45             :  *-------------------------------------------------------------------------
      46             :  */
      47             : #include "postgres.h"
      48             : 
      49             : #include <signal.h>
      50             : #include <unistd.h>
      51             : 
      52             : #include "access/printtup.h"
      53             : #include "access/timeline.h"
      54             : #include "access/transam.h"
      55             : #include "access/xact.h"
      56             : #include "access/xlog_internal.h"
      57             : #include "access/xlogutils.h"
      58             : 
      59             : #include "catalog/pg_type.h"
      60             : #include "commands/dbcommands.h"
      61             : #include "commands/defrem.h"
      62             : #include "funcapi.h"
      63             : #include "libpq/libpq.h"
      64             : #include "libpq/pqformat.h"
      65             : #include "miscadmin.h"
      66             : #include "nodes/replnodes.h"
      67             : #include "pgstat.h"
      68             : #include "replication/basebackup.h"
      69             : #include "replication/decode.h"
      70             : #include "replication/logical.h"
      71             : #include "replication/logicalfuncs.h"
      72             : #include "replication/slot.h"
      73             : #include "replication/snapbuild.h"
      74             : #include "replication/syncrep.h"
      75             : #include "replication/walreceiver.h"
      76             : #include "replication/walsender.h"
      77             : #include "replication/walsender_private.h"
      78             : #include "storage/condition_variable.h"
      79             : #include "storage/fd.h"
      80             : #include "storage/ipc.h"
      81             : #include "storage/pmsignal.h"
      82             : #include "storage/proc.h"
      83             : #include "storage/procarray.h"
      84             : #include "tcop/dest.h"
      85             : #include "tcop/tcopprot.h"
      86             : #include "utils/builtins.h"
      87             : #include "utils/guc.h"
      88             : #include "utils/memutils.h"
      89             : #include "utils/pg_lsn.h"
      90             : #include "utils/portal.h"
      91             : #include "utils/ps_status.h"
      92             : #include "utils/resowner.h"
      93             : #include "utils/timeout.h"
      94             : #include "utils/timestamp.h"
      95             : 
      96             : /*
      97             :  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
      98             :  *
      99             :  * We don't have a good idea of what a good value would be; there's some
     100             :  * overhead per message in both walsender and walreceiver, but on the other
     101             :  * hand sending large batches makes walsender less responsive to signals
     102             :  * because signals are checked only between messages.  128kB (with
     103             :  * default 8k blocks) seems like a reasonable guess for now.
     104             :  */
     105             : #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
     106             : 
     107             : /* Array of WalSnds in shared memory */
     108             : WalSndCtlData *WalSndCtl = NULL;
     109             : 
     110             : /* My slot in the shared memory array */
     111             : WalSnd     *MyWalSnd = NULL;
     112             : 
     113             : /* Global state */
     114             : bool        am_walsender = false;   /* Am I a walsender process? */
     115             : bool        am_cascading_walsender = false; /* Am I cascading WAL to another
     116             :                                              * standby? */
     117             : bool        am_db_walsender = false;    /* Connected to a database? */
     118             : 
     119             : /* User-settable parameters for walsender */
     120             : int         max_wal_senders = 0;    /* the maximum number of concurrent
     121             :                                      * walsenders */
     122             : int         wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
     123             :                                              * data message */
     124             : bool        log_replication_commands = false;
     125             : 
     126             : /*
     127             :  * State for WalSndWakeupRequest
     128             :  */
     129             : bool        wake_wal_senders = false;
     130             : 
     131             : /*
     132             :  * These variables are used similarly to openLogFile/SegNo/Off,
     133             :  * but for walsender to read the XLOG.
     134             :  */
     135             : static int  sendFile = -1;
     136             : static XLogSegNo sendSegNo = 0;
     137             : static uint32 sendOff = 0;
     138             : 
     139             : /* Timeline ID of the currently open file */
     140             : static TimeLineID curFileTimeLine = 0;
     141             : 
     142             : /*
     143             :  * These variables keep track of the state of the timeline we're currently
     144             :  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
     145             :  * the timeline is not the latest timeline on this server, and the server's
     146             :  * history forked off from that timeline at sendTimeLineValidUpto.
     147             :  */
     148             : static TimeLineID sendTimeLine = 0;
     149             : static TimeLineID sendTimeLineNextTLI = 0;
     150             : static bool sendTimeLineIsHistoric = false;
     151             : static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
     152             : 
     153             : /*
     154             :  * How far have we sent WAL already? This is also advertised in
     155             :  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
     156             :  */
     157             : static XLogRecPtr sentPtr = 0;
     158             : 
     159             : /* Buffers for constructing outgoing messages and processing reply messages. */
     160             : static StringInfoData output_message;
     161             : static StringInfoData reply_message;
     162             : static StringInfoData tmpbuf;
     163             : 
     164             : /*
     165             :  * Timestamp of the last receipt of the reply from the standby. Set to 0 if
     166             :  * wal_sender_timeout doesn't need to be active.
     167             :  */
     168             : static TimestampTz last_reply_timestamp = 0;
     169             : 
     170             : /* Have we sent a heartbeat message asking for reply, since last reply? */
     171             : static bool waiting_for_ping_response = false;
     172             : 
     173             : /*
     174             :  * While streaming WAL in Copy mode, streamingDoneSending is set to true
     175             :  * after we have sent CopyDone. We should not send any more CopyData messages
     176             :  * after that. streamingDoneReceiving is set to true when we receive CopyDone
     177             :  * from the other end. When both become true, it's time to exit Copy mode.
     178             :  */
     179             : static bool streamingDoneSending;
     180             : static bool streamingDoneReceiving;
     181             : 
     182             : /* Are we there yet? */
     183             : static bool WalSndCaughtUp = false;
     184             : 
     185             : /* Flags set by signal handlers for later service in main loop */
     186             : static volatile sig_atomic_t got_SIGUSR2 = false;
     187             : static volatile sig_atomic_t got_STOPPING = false;
     188             : 
     189             : /*
     190             :  * This is set while we are streaming. When not set
     191             :  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
     192             :  * the main loop is responsible for checking got_STOPPING and terminating when
     193             :  * it's set (after streaming any remaining WAL).
     194             :  */
     195             : static volatile sig_atomic_t replication_active = false;
     196             : 
     197             : static LogicalDecodingContext *logical_decoding_ctx = NULL;
     198             : static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
     199             : 
     200             : /* A sample associating a WAL location with the time it was written. */
     201             : typedef struct
     202             : {
     203             :     XLogRecPtr  lsn;
     204             :     TimestampTz time;
     205             : } WalTimeSample;
     206             : 
     207             : /* The size of our buffer of time samples. */
     208             : #define LAG_TRACKER_BUFFER_SIZE 8192
     209             : 
     210             : /* A mechanism for tracking replication lag. */
     211             : static struct
     212             : {
     213             :     XLogRecPtr  last_lsn;
     214             :     WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
     215             :     int         write_head;
     216             :     int         read_heads[NUM_SYNC_REP_WAIT_MODE];
     217             :     WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
     218             : }           LagTracker;
     219             : 
     220             : /* Signal handlers */
     221             : static void WalSndLastCycleHandler(SIGNAL_ARGS);
     222             : 
     223             : /* Prototypes for private functions */
     224             : typedef void (*WalSndSendDataCallback) (void);
     225             : static void WalSndLoop(WalSndSendDataCallback send_data);
     226             : static void InitWalSenderSlot(void);
     227             : static void WalSndKill(int code, Datum arg);
     228             : static void WalSndShutdown(void) pg_attribute_noreturn();
     229             : static void XLogSendPhysical(void);
     230             : static void XLogSendLogical(void);
     231             : static void WalSndDone(WalSndSendDataCallback send_data);
     232             : static XLogRecPtr GetStandbyFlushRecPtr(void);
     233             : static void IdentifySystem(void);
     234             : static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
     235             : static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
     236             : static void StartReplication(StartReplicationCmd *cmd);
     237             : static void StartLogicalReplication(StartReplicationCmd *cmd);
     238             : static void ProcessStandbyMessage(void);
     239             : static void ProcessStandbyReplyMessage(void);
     240             : static void ProcessStandbyHSFeedbackMessage(void);
     241             : static void ProcessRepliesIfAny(void);
     242             : static void WalSndKeepalive(bool requestReply);
     243             : static void WalSndKeepaliveIfNecessary(TimestampTz now);
     244             : static void WalSndCheckTimeOut(TimestampTz now);
     245             : static long WalSndComputeSleeptime(TimestampTz now);
     246             : static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
     247             : static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
     248             : static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
     249             : static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
     250             : static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
     251             : static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
     252             : static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
     253             : 
     254             : static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
     255             : 
     256             : 
     257             : /* Initialize walsender process before entering the main command loop */
     258             : void
     259           0 : InitWalSender(void)
     260             : {
     261           0 :     am_cascading_walsender = RecoveryInProgress();
     262             : 
     263             :     /* Create a per-walsender data structure in shared memory */
     264           0 :     InitWalSenderSlot();
     265             : 
     266             :     /* Set up resource owner */
     267           0 :     CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
     268             : 
     269             :     /*
     270             :      * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
     271             :      * a WAL sender process, postmaster will let us outlive the bgwriter and
     272             :      * kill us last in the shutdown sequence, so we get a chance to stream all
     273             :      * remaining WAL at shutdown, including the shutdown checkpoint. Note that
     274             :      * there's no going back, and we mustn't write any WAL records after this.
     275             :      */
     276           0 :     MarkPostmasterChildWalSender();
     277           0 :     SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
     278             : 
     279             :     /* Initialize empty timestamp buffer for lag tracking. */
     280           0 :     memset(&LagTracker, 0, sizeof(LagTracker));
     281           0 : }
     282             : 
     283             : /*
     284             :  * Clean up after an error.
     285             :  *
     286             :  * WAL sender processes don't use transactions like regular backends do.
     287             :  * This function does any cleanup required after an error in a WAL sender
     288             :  * process, similar to what transaction abort does in a regular backend.
     289             :  */
     290             : void
     291           0 : WalSndErrorCleanup(void)
     292             : {
     293           0 :     LWLockReleaseAll();
     294           0 :     ConditionVariableCancelSleep();
     295           0 :     pgstat_report_wait_end();
     296             : 
     297           0 :     if (sendFile >= 0)
     298             :     {
     299           0 :         close(sendFile);
     300           0 :         sendFile = -1;
     301             :     }
     302             : 
     303           0 :     if (MyReplicationSlot != NULL)
     304           0 :         ReplicationSlotRelease();
     305             : 
     306           0 :     ReplicationSlotCleanup();
     307             : 
     308           0 :     replication_active = false;
     309             : 
     310           0 :     if (got_STOPPING || got_SIGUSR2)
     311           0 :         proc_exit(0);
     312             : 
     313             :     /* Revert back to startup state */
     314           0 :     WalSndSetState(WALSNDSTATE_STARTUP);
     315           0 : }
     316             : 
     317             : /*
     318             :  * Handle a client's connection abort in an orderly manner.
     319             :  */
     320             : static void
     321           0 : WalSndShutdown(void)
     322             : {
     323             :     /*
     324             :      * Reset whereToSendOutput to prevent ereport from attempting to send any
     325             :      * more messages to the standby.
     326             :      */
     327           0 :     if (whereToSendOutput == DestRemote)
     328           0 :         whereToSendOutput = DestNone;
     329             : 
     330           0 :     proc_exit(0);
     331             :     abort();                    /* keep the compiler quiet */
     332             : }
     333             : 
     334             : /*
     335             :  * Handle the IDENTIFY_SYSTEM command.
     336             :  */
     337             : static void
     338           0 : IdentifySystem(void)
     339             : {
     340             :     char        sysid[32];
     341             :     char        xloc[MAXFNAMELEN];
     342             :     XLogRecPtr  logptr;
     343           0 :     char       *dbname = NULL;
     344             :     DestReceiver *dest;
     345             :     TupOutputState *tstate;
     346             :     TupleDesc   tupdesc;
     347             :     Datum       values[4];
     348             :     bool        nulls[4];
     349             : 
     350             :     /*
     351             :      * Reply with a result set with one row, four columns. First col is system
     352             :      * ID, second is timeline ID, third is current xlog location and the
     353             :      * fourth contains the database name if we are connected to one.
     354             :      */
     355             : 
     356           0 :     snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
     357             :              GetSystemIdentifier());
     358             : 
     359           0 :     am_cascading_walsender = RecoveryInProgress();
     360           0 :     if (am_cascading_walsender)
     361             :     {
     362             :         /* this also updates ThisTimeLineID */
     363           0 :         logptr = GetStandbyFlushRecPtr();
     364             :     }
     365             :     else
     366           0 :         logptr = GetFlushRecPtr();
     367             : 
     368           0 :     snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
     369             : 
     370           0 :     if (MyDatabaseId != InvalidOid)
     371             :     {
     372           0 :         MemoryContext cur = CurrentMemoryContext;
     373             : 
     374             :         /* syscache access needs a transaction env. */
     375           0 :         StartTransactionCommand();
     376             :         /* make dbname live outside TX context */
     377           0 :         MemoryContextSwitchTo(cur);
     378           0 :         dbname = get_database_name(MyDatabaseId);
     379           0 :         CommitTransactionCommand();
     380             :         /* CommitTransactionCommand switches to TopMemoryContext */
     381           0 :         MemoryContextSwitchTo(cur);
     382             :     }
     383             : 
     384           0 :     dest = CreateDestReceiver(DestRemoteSimple);
     385           0 :     MemSet(nulls, false, sizeof(nulls));
     386             : 
     387             :     /* need a tuple descriptor representing four columns */
     388           0 :     tupdesc = CreateTemplateTupleDesc(4, false);
     389           0 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
     390             :                               TEXTOID, -1, 0);
     391           0 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
     392             :                               INT4OID, -1, 0);
     393           0 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
     394             :                               TEXTOID, -1, 0);
     395           0 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
     396             :                               TEXTOID, -1, 0);
     397             : 
     398             :     /* prepare for projection of tuples */
     399           0 :     tstate = begin_tup_output_tupdesc(dest, tupdesc);
     400             : 
     401             :     /* column 1: system identifier */
     402           0 :     values[0] = CStringGetTextDatum(sysid);
     403             : 
     404             :     /* column 2: timeline */
     405           0 :     values[1] = Int32GetDatum(ThisTimeLineID);
     406             : 
     407             :     /* column 3: wal location */
     408           0 :     values[2] = CStringGetTextDatum(xloc);
     409             : 
     410             :     /* column 4: database name, or NULL if none */
     411           0 :     if (dbname)
     412           0 :         values[3] = CStringGetTextDatum(dbname);
     413             :     else
     414           0 :         nulls[3] = true;
     415             : 
     416             :     /* send it to dest */
     417           0 :     do_tup_output(tstate, values, nulls);
     418             : 
     419           0 :     end_tup_output(tstate);
     420           0 : }
     421             : 
     422             : 
     423             : /*
     424             :  * Handle TIMELINE_HISTORY command.
     425             :  */
     426             : static void
     427           0 : SendTimeLineHistory(TimeLineHistoryCmd *cmd)
     428             : {
     429             :     StringInfoData buf;
     430             :     char        histfname[MAXFNAMELEN];
     431             :     char        path[MAXPGPATH];
     432             :     int         fd;
     433             :     off_t       histfilelen;
     434             :     off_t       bytesleft;
     435             :     Size        len;
     436             : 
     437             :     /*
     438             :      * Reply with a result set with one row, and two columns. The first col is
     439             :      * the name of the history file, 2nd is the contents.
     440             :      */
     441             : 
     442           0 :     TLHistoryFileName(histfname, cmd->timeline);
     443           0 :     TLHistoryFilePath(path, cmd->timeline);
     444             : 
     445             :     /* Send a RowDescription message */
     446           0 :     pq_beginmessage(&buf, 'T');
     447           0 :     pq_sendint(&buf, 2, 2);     /* 2 fields */
     448             : 
     449             :     /* first field */
     450           0 :     pq_sendstring(&buf, "filename");  /* col name */
     451           0 :     pq_sendint(&buf, 0, 4);     /* table oid */
     452           0 :     pq_sendint(&buf, 0, 2);     /* attnum */
     453           0 :     pq_sendint(&buf, TEXTOID, 4);   /* type oid */
     454           0 :     pq_sendint(&buf, -1, 2);    /* typlen */
     455           0 :     pq_sendint(&buf, 0, 4);     /* typmod */
     456           0 :     pq_sendint(&buf, 0, 2);     /* format code */
     457             : 
     458             :     /* second field */
     459           0 :     pq_sendstring(&buf, "content"); /* col name */
     460           0 :     pq_sendint(&buf, 0, 4);     /* table oid */
     461           0 :     pq_sendint(&buf, 0, 2);     /* attnum */
     462           0 :     pq_sendint(&buf, BYTEAOID, 4);  /* type oid */
     463           0 :     pq_sendint(&buf, -1, 2);    /* typlen */
     464           0 :     pq_sendint(&buf, 0, 4);     /* typmod */
     465           0 :     pq_sendint(&buf, 0, 2);     /* format code */
     466           0 :     pq_endmessage(&buf);
     467             : 
     468             :     /* Send a DataRow message */
     469           0 :     pq_beginmessage(&buf, 'D');
     470           0 :     pq_sendint(&buf, 2, 2);     /* # of columns */
     471           0 :     len = strlen(histfname);
     472           0 :     pq_sendint(&buf, len, 4);   /* col1 len */
     473           0 :     pq_sendbytes(&buf, histfname, len);
     474             : 
     475           0 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
     476           0 :     if (fd < 0)
     477           0 :         ereport(ERROR,
     478             :                 (errcode_for_file_access(),
     479             :                  errmsg("could not open file \"%s\": %m", path)));
     480             : 
     481             :     /* Determine file length and send it to client */
     482           0 :     histfilelen = lseek(fd, 0, SEEK_END);
     483           0 :     if (histfilelen < 0)
     484           0 :         ereport(ERROR,
     485             :                 (errcode_for_file_access(),
     486             :                  errmsg("could not seek to end of file \"%s\": %m", path)));
     487           0 :     if (lseek(fd, 0, SEEK_SET) != 0)
     488           0 :         ereport(ERROR,
     489             :                 (errcode_for_file_access(),
     490             :                  errmsg("could not seek to beginning of file \"%s\": %m", path)));
     491             : 
     492           0 :     pq_sendint(&buf, histfilelen, 4);   /* col2 len */
     493             : 
     494           0 :     bytesleft = histfilelen;
     495           0 :     while (bytesleft > 0)
     496             :     {
     497             :         char        rbuf[BLCKSZ];
     498             :         int         nread;
     499             : 
     500           0 :         pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
     501           0 :         nread = read(fd, rbuf, sizeof(rbuf));
     502           0 :         pgstat_report_wait_end();
     503           0 :         if (nread <= 0)
     504           0 :             ereport(ERROR,
     505             :                     (errcode_for_file_access(),
     506             :                      errmsg("could not read file \"%s\": %m",
     507             :                             path)));
     508           0 :         pq_sendbytes(&buf, rbuf, nread);
     509           0 :         bytesleft -= nread;
     510             :     }
     511           0 :     CloseTransientFile(fd);
     512             : 
     513           0 :     pq_endmessage(&buf);
     514           0 : }
     515             : 
     516             : /*
     517             :  * Handle START_REPLICATION command.
     518             :  *
     519             :  * At the moment, this never returns, but an ereport(ERROR) will take us back
     520             :  * to the main loop.
     521             :  */
     522             : static void
     523           0 : StartReplication(StartReplicationCmd *cmd)
     524             : {
     525             :     StringInfoData buf;
     526             :     XLogRecPtr  FlushPtr;
     527             : 
     528           0 :     if (ThisTimeLineID == 0)
     529           0 :         ereport(ERROR,
     530             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     531             :                  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
     532             : 
     533             :     /*
     534             :      * We assume here that we're logging enough information in the WAL for
     535             :      * log-shipping, since this is checked in PostmasterMain().
     536             :      *
     537             :      * NOTE: wal_level can only change at shutdown, so in most cases it is
     538             :      * difficult for there to be WAL data that we can still see that was
     539             :      * written at wal_level='minimal'.
     540             :      */
     541             : 
     542           0 :     if (cmd->slotname)  /* a slot is optional here; released again near the end */
     543             :     {
     544           0 :         ReplicationSlotAcquire(cmd->slotname, true);
     545           0 :         if (SlotIsLogical(MyReplicationSlot))
     546           0 :             ereport(ERROR,
     547             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     548             :                      (errmsg("cannot use a logical replication slot for physical replication"))));
     549             :     }
     550             : 
     551             :     /*
     552             :      * Fetch the current flush position, then select the timeline. If it was
     553             :      * given explicitly by the client, use that. Otherwise use the timeline of
     554             :      * the last replayed record, which is kept in ThisTimeLineID.
     555             :      */
     556           0 :     if (am_cascading_walsender)
     557             :     {
     558             :         /* this also updates ThisTimeLineID */
     559           0 :         FlushPtr = GetStandbyFlushRecPtr();
     560             :     }
     561             :     else
     562           0 :         FlushPtr = GetFlushRecPtr();
     563             : 
     564           0 :     if (cmd->timeline != 0) /* 0 means the client did not name a timeline */
     565             :     {
     566             :         XLogRecPtr  switchpoint;
     567             : 
     568           0 :         sendTimeLine = cmd->timeline;
     569           0 :         if (sendTimeLine == ThisTimeLineID)
     570             :         {
     571           0 :             sendTimeLineIsHistoric = false;
     572           0 :             sendTimeLineValidUpto = InvalidXLogRecPtr;
     573             :         }
     574             :         else
     575             :         {
     576             :             List       *timeLineHistory;
     577             : 
     578           0 :             sendTimeLineIsHistoric = true;
     579             : 
     580             :             /*
     581             :              * Check that the timeline the client requested exists, and the
     582             :              * requested start location is on that timeline.
     583             :              */
     584           0 :             timeLineHistory = readTimeLineHistory(ThisTimeLineID);
     585           0 :             switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
     586             :                                          &sendTimeLineNextTLI);
     587           0 :             list_free_deep(timeLineHistory);
     588             : 
     589             :             /*
     590             :              * Found the requested timeline in the history. Check that
     591             :              * requested startpoint is on that timeline in our history.
     592             :              *
     593             :              * This is quite loose on purpose. We only check that we didn't
     594             :              * fork off the requested timeline before the switchpoint. We
     595             :              * don't check that we switched *to* it before the requested
     596             :              * starting point. This is because the client can legitimately
     597             :              * request to start replication from the beginning of the WAL
     598             :              * segment that contains switchpoint, but on the new timeline, so
     599             :              * that it doesn't end up with a partial segment. If you ask for
     600             :              * too old a starting point, you'll get an error later when we
     601             :              * fail to find the requested WAL segment in pg_wal.
     602             :              *
     603             :              * XXX: we could be more strict here and only allow a startpoint
     604             :              * that's older than the switchpoint, if it's still in the same
     605             :              * WAL segment.
     606             :              */
     607           0 :             if (!XLogRecPtrIsInvalid(switchpoint) &&
     608           0 :                 switchpoint < cmd->startpoint)
     609             :             {
     610           0 :                 ereport(ERROR,
     611             :                         (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
     612             :                                 (uint32) (cmd->startpoint >> 32),
     613             :                                 (uint32) (cmd->startpoint),
     614             :                                 cmd->timeline),
     615             :                          errdetail("This server's history forked from timeline %u at %X/%X.",
     616             :                                    cmd->timeline,
     617             :                                    (uint32) (switchpoint >> 32),
     618             :                                    (uint32) (switchpoint))));
     619             :             }
     620           0 :             sendTimeLineValidUpto = switchpoint;
     621             :         }
     622             :     }
     623             :     else
     624             :     {
     625           0 :         sendTimeLine = ThisTimeLineID;
     626           0 :         sendTimeLineValidUpto = InvalidXLogRecPtr;
     627           0 :         sendTimeLineIsHistoric = false;
     628             :     }
     629             : 
     630           0 :     streamingDoneSending = streamingDoneReceiving = false;
     631             : 
     632             :     /* If there is nothing to stream, don't even enter COPY mode */
     633           0 :     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
     634             :     {
     635             :         /*
     636             :          * When we first start replication the standby will be behind the
     637             :          * primary. For some applications, for example synchronous
     638             :          * replication, it is important to have a clear state for this initial
     639             :          * catchup mode, so we can trigger actions when we change streaming
     640             :          * state later. We may stay in this state for a long time, which is
     641             :          * exactly why we want to be able to monitor whether or not we are
     642             :          * still here.
     643             :          */
     644           0 :         WalSndSetState(WALSNDSTATE_CATCHUP);
     645             : 
     646             :         /* Send a CopyBothResponse message, and start streaming */
     647           0 :         pq_beginmessage(&buf, 'W');
     648           0 :         pq_sendbyte(&buf, 0);   /* presumably the overall copy format -- confirm in protocol docs */
     649           0 :         pq_sendint(&buf, 0, 2); /* presumably the 2-byte column count -- confirm in protocol docs */
     650           0 :         pq_endmessage(&buf);
     651           0 :         pq_flush();
     652             : 
     653             :         /*
     654             :          * Don't allow streaming from a future point in WAL that this server
     655             :          * hasn't flushed to disk yet. Note CopyBothResponse is already sent.
     656             :          */
     657           0 :         if (FlushPtr < cmd->startpoint)
     658             :         {
     659           0 :             ereport(ERROR,
     660             :                     (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
     661             :                             (uint32) (cmd->startpoint >> 32),
     662             :                             (uint32) (cmd->startpoint),
     663             :                             (uint32) (FlushPtr >> 32),
     664             :                             (uint32) (FlushPtr))));
     665             :         }
     666             : 
     667             :         /* Start streaming from the requested point */
     668           0 :         sentPtr = cmd->startpoint;
     669             : 
     670             :         /* Initialize shared memory status, too */
     671           0 :         SpinLockAcquire(&MyWalSnd->mutex);
     672           0 :         MyWalSnd->sentPtr = sentPtr;
     673           0 :         SpinLockRelease(&MyWalSnd->mutex);
     674             : 
     675           0 :         SyncRepInitConfig();
     676             : 
     677             :         /* Main loop of walsender */
     678           0 :         replication_active = true;
     679             : 
     680           0 :         WalSndLoop(XLogSendPhysical);
     681             : 
     682           0 :         replication_active = false;
     683           0 :         if (got_STOPPING)   /* flag set: exit the process instead of returning */
     684           0 :             proc_exit(0);
     685           0 :         WalSndSetState(WALSNDSTATE_STARTUP);
     686             : 
     687           0 :         Assert(streamingDoneSending && streamingDoneReceiving);
     688             :     }
     689             : 
     690           0 :     if (cmd->slotname)  /* pairs with the ReplicationSlotAcquire above */
     691           0 :         ReplicationSlotRelease();
     692             : 
     693             :     /*
     694             :      * Copy is finished now. Send a single-row result set indicating the next
     695             :      * timeline.
     696             :      */
     697           0 :     if (sendTimeLineIsHistoric)
     698             :     {
     699             :         char        startpos_str[8 + 1 + 8 + 1];    /* "%X/%X" of two uint32 values, plus NUL */
     700             :         DestReceiver *dest;
     701             :         TupOutputState *tstate;
     702             :         TupleDesc   tupdesc;
     703             :         Datum       values[2];
     704             :         bool        nulls[2];
     705             : 
     706           0 :         snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
     707           0 :                  (uint32) (sendTimeLineValidUpto >> 32),
     708             :                  (uint32) sendTimeLineValidUpto);
     709             : 
     710           0 :         dest = CreateDestReceiver(DestRemoteSimple);
     711           0 :         MemSet(nulls, false, sizeof(nulls));
     712             : 
     713             :         /*
     714             :          * Need a tuple descriptor representing two columns. int8 may seem
     715             :          * like a surprising data type for this, but in theory int4 would not
     716             :          * be wide enough for this, as TimeLineID is unsigned.
     717             :          */
     718           0 :         tupdesc = CreateTemplateTupleDesc(2, false);
     719           0 :         TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
     720             :                                   INT8OID, -1, 0);
     721           0 :         TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
     722             :                                   TEXTOID, -1, 0);
     723             : 
     724             :         /* prepare for projection of tuple */
     725           0 :         tstate = begin_tup_output_tupdesc(dest, tupdesc);
     726             : 
     727           0 :         values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
     728           0 :         values[1] = CStringGetTextDatum(startpos_str);
     729             : 
     730             :         /* send it to dest */
     731           0 :         do_tup_output(tstate, values, nulls);
     732             : 
     733           0 :         end_tup_output(tstate);
     734             :     }
     735             : 
     736             :     /* Send CommandComplete message */
     737           0 :     pq_puttextmessage('C', "START_STREAMING");
     738           0 : }
     739             : 
     740             : /*
     741             :  * read_page callback for logical decoding contexts, as a walsender process.
     742             :  * Returns the number of bytes of the page at targetPagePtr that are covered
     743             :  * by flushed WAL (at most XLOG_BLCKSZ), or -1 if less than reqLen is there.
     744             :  * Inside the walsender we can do better than logical_read_local_xlog_page,
     745             :  * which has to do a plain sleep/busy loop: our latch is set on each WAL flush.
     746             :  */
     747             : static int
     748           0 : logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
     749             :                        XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
     750             : {
     751             :     XLogRecPtr  flushptr;
     752             :     int         count;
     753             : 
     754           0 :     XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
     755           0 :     sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);    /* copy reader's timeline state */
     756           0 :     sendTimeLine = state->currTLI;
     757           0 :     sendTimeLineValidUpto = state->currTLIValidUntil;
     758           0 :     sendTimeLineNextTLI = state->nextTLI;
     759             : 
     760             :     /* make sure we have enough WAL available */
     761           0 :     flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
     762             : 
     763             :     /* fail if not (implies we are going to shut down) */
     764           0 :     if (flushptr < targetPagePtr + reqLen)
     765           0 :         return -1;
     766             : 
     767           0 :     if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
     768           0 :         count = XLOG_BLCKSZ;    /* at least one full block available */
     769             :     else
     770           0 :         count = flushptr - targetPagePtr;   /* part of the page available */
     771             : 
     772             :     /* read a whole block regardless of count; only count is returned */
     773           0 :     XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
     774             : 
     775           0 :     return count;
     776             : }
     777             : 
     778             : /*
     779             :  * Process extra options given to CREATE_REPLICATION_SLOT, checking each against cmd->kind.
     780             :  */
     781             : static void
     782           0 : parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
     783             :                            bool *reserve_wal,   /* out: set to true only if "reserve_wal" is given */
     784             :                            CRSSnapshotAction *snapshot_action)  /* out: written only if a snapshot option is given */
     785             : {
     786             :     ListCell   *lc;
     787           0 :     bool        snapshot_action_given = false;
     788           0 :     bool        reserve_wal_given = false;
     789             : 
     790             :     /* Parse options; a repeated option or one not matching cmd->kind is an error */
     791           0 :     foreach(lc, cmd->options)
     792             :     {
     793           0 :         DefElem    *defel = (DefElem *) lfirst(lc);
     794             : 
     795           0 :         if (strcmp(defel->defname, "export_snapshot") == 0)
     796             :         {
     797           0 :             if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
     798           0 :                 ereport(ERROR,
     799             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     800             :                          errmsg("conflicting or redundant options")));
     801             : 
     802           0 :             snapshot_action_given = true;
     803           0 :             *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
     804             :                 CRS_NOEXPORT_SNAPSHOT;
     805             :         }
     806           0 :         else if (strcmp(defel->defname, "use_snapshot") == 0)
     807             :         {
     808           0 :             if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) /* shares its flag with export_snapshot */
     809           0 :                 ereport(ERROR,
     810             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     811             :                          errmsg("conflicting or redundant options")));
     812             : 
     813           0 :             snapshot_action_given = true;
     814           0 :             *snapshot_action = CRS_USE_SNAPSHOT;
     815             :         }
     816           0 :         else if (strcmp(defel->defname, "reserve_wal") == 0)
     817             :         {
     818           0 :             if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
     819           0 :                 ereport(ERROR,
     820             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     821             :                          errmsg("conflicting or redundant options")));
     822             : 
     823           0 :             reserve_wal_given = true;
     824           0 :             *reserve_wal = true;
     825             :         }
     826             :         else
     827           0 :             elog(ERROR, "unrecognized option: %s", defel->defname);
     828             :     }
     829           0 : }
     830             : 
     831             : /*
     832             :  * Create a new replication slot of the kind in cmd->kind and send back one row describing it.
     833             :  */
     834             : static void
     835           0 : CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
     836             : {
     837           0 :     const char *snapshot_name = NULL;   /* stays NULL unless a snapshot gets exported */
     838             :     char        xloc[MAXFNAMELEN];
     839             :     char       *slot_name;
     840           0 :     bool        reserve_wal = false;    /* default; may be overridden by the options */
     841           0 :     CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;    /* default; may be overridden by the options */
     842             :     DestReceiver *dest;
     843             :     TupOutputState *tstate;
     844             :     TupleDesc   tupdesc;
     845             :     Datum       values[4];
     846             :     bool        nulls[4];
     847             : 
     848           0 :     Assert(!MyReplicationSlot);
     849             : 
     850           0 :     parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
     851             : 
     852             :     /* setup state for XLogReadPage */
     853           0 :     sendTimeLineIsHistoric = false;
     854           0 :     sendTimeLine = ThisTimeLineID;
     855             : 
     856           0 :     if (cmd->kind == REPLICATION_KIND_PHYSICAL)
     857             :     {
     858           0 :         ReplicationSlotCreate(cmd->slotname, false,
     859           0 :                               cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
     860             :     }
     861             :     else
     862             :     {
     863           0 :         CheckLogicalDecodingRequirements();
     864             : 
     865             :         /*
     866             :          * Initially create a persistent slot as ephemeral - that allows us to
     867             :          * nicely handle errors during initialization because it'll get
     868             :          * dropped if this transaction fails. We'll make it persistent at the
     869             :          * end. Temporary slots can be created as temporary from the start, as
     870             :          * they get dropped on error as well.
     871             :          */
     872           0 :         ReplicationSlotCreate(cmd->slotname, true,
     873           0 :                               cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
     874             :     }
     875             : 
     876           0 :     if (cmd->kind == REPLICATION_KIND_LOGICAL)
     877             :     {
     878             :         LogicalDecodingContext *ctx;
     879           0 :         bool        need_full_snapshot = false;
     880             : 
     881             :         /*
     882             :          * Check the options early so that we can bail out before calling
     883             :          * DecodingContextFindStartpoint, which can take a long time.
     884             :          */
     885           0 :         if (snapshot_action == CRS_EXPORT_SNAPSHOT)
     886             :         {
     887           0 :             if (IsTransactionBlock())
     888           0 :                 ereport(ERROR,
     889             :                         (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
     890             :                                 "must not be called inside a transaction")));
     891             : 
     892           0 :             need_full_snapshot = true;
     893             :         }
     894           0 :         else if (snapshot_action == CRS_USE_SNAPSHOT)
     895             :         {
     896           0 :             if (!IsTransactionBlock())
     897           0 :                 ereport(ERROR,
     898             :                         (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
     899             :                                 "must be called inside a transaction")));
     900             : 
     901           0 :             if (XactIsoLevel != XACT_REPEATABLE_READ)
     902           0 :                 ereport(ERROR,
     903             :                         (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
     904             :                                 "must be called in REPEATABLE READ isolation mode transaction")));
     905             : 
     906           0 :             if (FirstSnapshotSet)
     907           0 :                 ereport(ERROR,
     908             :                         (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
     909             :                                 "must be called before any query")));
     910             : 
     911           0 :             if (IsSubTransaction())
     912           0 :                 ereport(ERROR,
     913             :                         (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
     914             :                                 "must not be called in a subtransaction")));
     915             : 
     916           0 :             need_full_snapshot = true;
     917             :         }
     918             : 
     919           0 :         ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
     920             :                                         logical_read_xlog_page,
     921             :                                         WalSndPrepareWrite, WalSndWriteData,
     922             :                                         WalSndUpdateProgress);
     923             : 
     924             :         /*
     925             :          * Signal that we don't need the timeout mechanism. We're just
     926             :          * creating the replication slot and don't yet accept feedback
     927             :          * messages or send keepalives. As we possibly need to wait for
     928             :          * further WAL the walsender would otherwise possibly be killed too
     929             :          * soon.
     930             :          */
     931           0 :         last_reply_timestamp = 0;
     932             : 
     933             :         /* build initial snapshot, might take a while */
     934           0 :         DecodingContextFindStartpoint(ctx);
     935             : 
     936             :         /*
     937             :          * Export or use the snapshot if we've been asked to do so.
     938             :          *
     939             :          * NB. We will convert the snapbuild.c kind of snapshot to normal
     940             :          * snapshot when doing this.
     941             :          */
     942           0 :         if (snapshot_action == CRS_EXPORT_SNAPSHOT)
     943             :         {
     944           0 :             snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
     945             :         }
     946           0 :         else if (snapshot_action == CRS_USE_SNAPSHOT)
     947             :         {
     948             :             Snapshot    snap;
     949             : 
     950           0 :             snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
     951           0 :             RestoreTransactionSnapshot(snap, MyProc);
     952             :         }
     953             : 
     954             :         /* don't need the decoding context anymore */
     955           0 :         FreeDecodingContext(ctx);
     956             : 
     957           0 :         if (!cmd->temporary)    /* the slot was created as RS_EPHEMERAL above */
     958           0 :             ReplicationSlotPersist();
     959             :     }
     960           0 :     else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
     961             :     {
     962           0 :         ReplicationSlotReserveWal();
     963             : 
     964           0 :         ReplicationSlotMarkDirty();
     965             : 
     966             :         /* Write this slot to disk if it's a permanent one. */
     967           0 :         if (!cmd->temporary)
     968           0 :             ReplicationSlotSave();
     969             :     }
     970             : 
     971           0 :     snprintf(xloc, sizeof(xloc), "%X/%X",
     972           0 :              (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
     973           0 :              (uint32) MyReplicationSlot->data.confirmed_flush);
     974             : 
     975           0 :     dest = CreateDestReceiver(DestRemoteSimple);
     976           0 :     MemSet(nulls, false, sizeof(nulls));
     977             : 
     978             :     /*----------
     979             :      * Need a tuple descriptor representing four columns:
     980             :      * - first field: the slot name
     981             :      * - second field: LSN at which we became consistent
     982             :      * - third field: exported snapshot's name
     983             :      * - fourth field: output plugin
     984             :      *----------
     985             :      */
     986           0 :     tupdesc = CreateTemplateTupleDesc(4, false);
     987           0 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
     988             :                               TEXTOID, -1, 0);
     989           0 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
     990             :                               TEXTOID, -1, 0);
     991           0 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
     992             :                               TEXTOID, -1, 0);
     993           0 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
     994             :                               TEXTOID, -1, 0);
     995             : 
     996             :     /* prepare for projection of tuples */
     997           0 :     tstate = begin_tup_output_tupdesc(dest, tupdesc);
     998             : 
     999             :     /* slot_name */
    1000           0 :     slot_name = NameStr(MyReplicationSlot->data.name);
    1001           0 :     values[0] = CStringGetTextDatum(slot_name);
    1002             : 
    1003             :     /* consistent wal location */
    1004           0 :     values[1] = CStringGetTextDatum(xloc);
    1005             : 
    1006             :     /* snapshot name, or NULL if none */
    1007           0 :     if (snapshot_name != NULL)
    1008           0 :         values[2] = CStringGetTextDatum(snapshot_name);
    1009             :     else
    1010           0 :         nulls[2] = true;
    1011             : 
    1012             :     /* plugin, or NULL if none */
    1013           0 :     if (cmd->plugin != NULL)
    1014           0 :         values[3] = CStringGetTextDatum(cmd->plugin);
    1015             :     else
    1016           0 :         nulls[3] = true;
    1017             : 
    1018             :     /* send it to dest */
    1019           0 :     do_tup_output(tstate, values, nulls);
    1020           0 :     end_tup_output(tstate);
    1021             : 
    1022           0 :     ReplicationSlotRelease();   /* done with MyReplicationSlot now that the row is sent */
    1023           0 : }
    1024             : 
    1025             : /*
    1026             :  * Get rid of a replication slot that is no longer wanted, then report command completion.
    1027             :  */
    1028             : static void
    1029           0 : DropReplicationSlot(DropReplicationSlotCmd *cmd)
    1030             : {
    1031           0 :     ReplicationSlotDrop(cmd->slotname, !cmd->wait); /* 2nd arg is the inverse of cmd->wait; presumably "nowait" -- confirm */
    1032           0 :     EndCommand("DROP_REPLICATION_SLOT", DestRemote);
    1033           0 : }
    1034             : 
    1035             : /*
    1036             :  * Acquire a previously created logical slot, send CopyBothResponse, and run
    1037             :  * WalSndLoop(XLogSendLogical) with a freshly created decoding context.
    1038             :  */
    1039             : static void
    1040           0 : StartLogicalReplication(StartReplicationCmd *cmd)
    1041             : {
    1042             :     StringInfoData buf;
    1043             : 
    1044             :     /* make sure that our requirements are still fulfilled */
    1045           0 :     CheckLogicalDecodingRequirements();
    1046             : 
    1047           0 :     Assert(!MyReplicationSlot);
    1048             : 
    1049           0 :     ReplicationSlotAcquire(cmd->slotname, true);
    1050             : 
    1051             :     /*
    1052             :      * Force a disconnect, so that the decoding code doesn't need to care
    1053             :      * about an eventual switch from running in recovery, to running in a
    1054             :      * normal environment. Client code is expected to handle reconnects.
    1055             :      */
    1056           0 :     if (am_cascading_walsender && !RecoveryInProgress())
    1057             :     {
    1058           0 :         ereport(LOG,
    1059             :                 (errmsg("terminating walsender process after promotion")));
    1060           0 :         got_STOPPING = true;    /* see the proc_exit(0) check after WalSndLoop below */
    1061             :     }
    1062             : 
    1063           0 :     WalSndSetState(WALSNDSTATE_CATCHUP);
    1064             : 
    1065             :     /* Send a CopyBothResponse message, and start streaming */
    1066           0 :     pq_beginmessage(&buf, 'W');
    1067           0 :     pq_sendbyte(&buf, 0);   /* presumably the overall copy format -- confirm in protocol docs */
    1068           0 :     pq_sendint(&buf, 0, 2); /* presumably the 2-byte column count -- confirm in protocol docs */
    1069           0 :     pq_endmessage(&buf);
    1070           0 :     pq_flush();
    1071             : 
    1072             :     /*
    1073             :      * Initialize position to the last ack'ed one, then the xlog records begin
    1074             :      * to be shipped from that position.
    1075             :      */
    1076           0 :     logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
    1077             :                                                  logical_read_xlog_page,
    1078             :                                                  WalSndPrepareWrite,
    1079             :                                                  WalSndWriteData,
    1080             :                                                  WalSndUpdateProgress);
    1081             : 
    1082             :     /* Start reading WAL from the oldest required WAL. */
    1083           0 :     logical_startptr = MyReplicationSlot->data.restart_lsn;
    1084             : 
    1085             :     /*
    1086             :      * Report the location after which we'll send out further commits as the
    1087             :      * current sentPtr.
    1088             :      */
    1089           0 :     sentPtr = MyReplicationSlot->data.confirmed_flush;
    1090             : 
    1091             :     /* Also update the sent position status in shared memory */
    1092           0 :     SpinLockAcquire(&MyWalSnd->mutex);
    1093           0 :     MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;    /* NOTE(review): local sentPtr above uses confirmed_flush -- confirm intended */
    1094           0 :     SpinLockRelease(&MyWalSnd->mutex);
    1095             : 
    1096           0 :     replication_active = true;
    1097             : 
    1098           0 :     SyncRepInitConfig();
    1099             : 
    1100             :     /* Main loop of walsender */
    1101           0 :     WalSndLoop(XLogSendLogical);
    1102             : 
    1103           0 :     FreeDecodingContext(logical_decoding_ctx);
    1104           0 :     ReplicationSlotRelease();
    1105             : 
    1106           0 :     replication_active = false;
    1107           0 :     if (got_STOPPING)   /* flag set: exit the process instead of returning */
    1108           0 :         proc_exit(0);
    1109           0 :     WalSndSetState(WALSNDSTATE_STARTUP);
    1110             : 
    1111             :     /* Get out of COPY mode (CommandComplete). */
    1112           0 :     EndCommand("COPY 0", DestRemote);
    1113           0 : }
    1114             : 
    1115             : /*
    1116             :  * LogicalDecodingContext 'prepare_write' callback.
    1117             :  *
    1118             :  * Reset ctx->out and append a message header: the byte 'w', lsn twice (dataStart, walEnd), and a zero sendtime placeholder.
    1119             :  * When last_write is false, InvalidXLogRecPtr is written in place of lsn.  xid is not used here.
    1120             :  * Don't do anything lasting in here, it's quite possible that nothing will be done
    1121             :  * with the data.
    1122             :  */
    1123             : static void
    1124           0 : WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
    1125             : {
    1126             :     /* can't have sync rep confused by sending the same LSN several times, so only the last write carries it */
    1127           0 :     if (!last_write)
    1128           0 :         lsn = InvalidXLogRecPtr;
    1129             : 
    1130           0 :     resetStringInfo(ctx->out);
    1131             : 
    1132           0 :     pq_sendbyte(ctx->out, 'w');
    1133           0 :     pq_sendint64(ctx->out, lsn); /* dataStart */
    1134           0 :     pq_sendint64(ctx->out, lsn); /* walEnd */
    1135             : 
    1136             :     /*
    1137             :      * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
    1138             :      * reserve space here (WalSndWriteData overwrites these eight bytes).
    1139             :      */
    1140           0 :     pq_sendint64(ctx->out, 0);   /* sendtime placeholder */
    1141           0 : }
    1142             : 
    1143             : /*
    1144             :  * LogicalDecodingContext 'write' callback.
    1145             :  *
    1146             :  * Stamp the send time into the header prepared by WalSndPrepareWrite, queue ctx->out as a CopyData ('d') message and push it out to
    1147             :  * the network. Take as long as needed, but process replies from the other
    1148             :  * side and check timeouts during that.  lsn, xid and last_write are not used here.
    1149             :  */
    1150             : static void
    1151           0 : WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
    1152             :                 bool last_write)
    1153             : {
    1154             :     /*
    1155             :      * Fill the send timestamp last, so that it is taken as late as possible, but
    1156             :      * do it before queueing the message: pq_putmessage_noblock() copies the bytes,
    1157             :      * so a later patch of ctx->out would never reach the client.  The slot follows the 'w' byte and two int64 LSNs.
    1158             :      */
    1159           0 :     resetStringInfo(&tmpbuf);
    1160           0 :     pq_sendint64(&tmpbuf, GetCurrentTimestamp());
    1161           0 :     memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
    1162           0 :            tmpbuf.data, sizeof(int64));
    1163             : 
    1164             :     /* output previously gathered data in a CopyData packet */
    1165           0 :     pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
    1166             : 
    1167             :     /* fast path: if a single flush attempt drains the buffer, skip the wait loop */
    1168             :     /* Try to flush pending output to the client; a non-zero result shuts us down */
    1169           0 :     if (pq_flush_if_writable() != 0)
    1170           0 :         WalSndShutdown();
    1171             : 
    1172           0 :     if (!pq_is_send_pending())
    1173           0 :         return;
    1174             : 
    1175             :     for (;;)
    1176             :     {
    1177             :         int         wakeEvents;
    1178             :         long        sleeptime;
    1179             :         TimestampTz now;
    1180             : 
    1181             :         /*
    1182             :          * Emergency bailout if postmaster has died.  This is to avoid the
    1183             :          * necessity for manual cleanup of all postmaster children.
    1184             :          */
    1185           0 :         if (!PostmasterIsAlive())
    1186           0 :             exit(1);
    1187             : 
    1188             :         /* Clear any already-pending wakeups */
    1189           0 :         ResetLatch(MyLatch);
    1190             : 
    1191           0 :         CHECK_FOR_INTERRUPTS();
    1192             : 
    1193             :         /* Process any requests or signals received recently */
    1194           0 :         if (ConfigReloadPending)
    1195             :         {
    1196           0 :             ConfigReloadPending = false;
    1197           0 :             ProcessConfigFile(PGC_SIGHUP);
    1198           0 :             SyncRepInitConfig();
    1199             :         }
    1200             : 
    1201             :         /* Check for input from the client */
    1202           0 :         ProcessRepliesIfAny();
    1203             : 
    1204             :         /* Try to flush pending output to the client */
    1205           0 :         if (pq_flush_if_writable() != 0)
    1206           0 :             WalSndShutdown();
    1207             : 
    1208             :         /* If we finished clearing the buffered data, we're done here. */
    1209           0 :         if (!pq_is_send_pending())
    1210           0 :             break;
    1211             : 
    1212           0 :         now = GetCurrentTimestamp();
    1213             : 
    1214             :         /* die if timeout was reached */
    1215           0 :         WalSndCheckTimeOut(now);
    1216             : 
    1217             :         /* Send keepalive if the time has come */
    1218           0 :         WalSndKeepaliveIfNecessary(now);
    1219             : 
    1220           0 :         sleeptime = WalSndComputeSleeptime(now);
    1221             : 
    1222           0 :         wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
    1223             :             WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
    1224             : 
    1225             :         /* Sleep until something happens or we time out */
    1226           0 :         WaitLatchOrSocket(MyLatch, wakeEvents,
    1227           0 :                           MyProcPort->sock, sleeptime,
    1228             :                           WAIT_EVENT_WAL_SENDER_WRITE_DATA);
    1229           0 :     }
    1230             : 
    1231             :     /* reactivate latch so WalSndLoop knows to continue */
    1232           0 :     SetLatch(MyLatch);
    1233             : }
    1234             : 
    1235             : /*
    1236             :  * LogicalDecodingContext 'progress_update' callback.
    1237             :  *
    1238             :  * Write the current position (lsn, current time) to the lag tracker (see XLogSendPhysical), rate-limited as described below.  ctx and xid are not used here.
    1239             :  */
    1240             : static void
    1241           0 : WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
    1242             : {
    1243             :     static TimestampTz sendTime = 0; /* time of the last LagTrackerWrite; persists across calls */
    1244           0 :     TimestampTz now = GetCurrentTimestamp();
    1245             : 
    1246             :     /*
    1247             :      * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
    1248             :      * avoid flooding the lag tracker when we commit frequently.
    1249             :      */
    1250             : #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS    1000
    1251           0 :     if (!TimestampDifferenceExceeds(sendTime, now,
    1252             :                                     WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
    1253           0 :         return;
    1254             : 
    1255           0 :     LagTrackerWrite(lsn, now);
    1256           0 :     sendTime = now;
    1257             : }
    1258             : 
    1259             : /*
    1260             :  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
    1261             :  * (While RecoveryInProgress(), the replay position is used in place of the flush position.)
    1262             :  * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
    1263             :  * if we detect a shutdown request (either from postmaster or client)
    1264             :  * we will return early, so caller must always check.
    1265             :  */
    1266             : static XLogRecPtr
    1267           0 : WalSndWaitForWal(XLogRecPtr loc)
    1268             : {
    1269             :     int         wakeEvents;
    1270             :     static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; /* cached across calls for the fast path below */
    1271             : 
    1272             : 
    1273             :     /*
    1274             :      * Fast path to avoid acquiring the spinlock in case we already know we
    1275             :      * have enough WAL available. This is particularly interesting if we're
    1276             :      * far behind.
    1277             :      */
    1278           0 :     if (RecentFlushPtr != InvalidXLogRecPtr &&
    1279           0 :         loc <= RecentFlushPtr)
    1280           0 :         return RecentFlushPtr;
    1281             : 
    1282             :     /* Get a more recent flush pointer. */
    1283           0 :     if (!RecoveryInProgress())
    1284           0 :         RecentFlushPtr = GetFlushRecPtr();
    1285             :     else
    1286           0 :         RecentFlushPtr = GetXLogReplayRecPtr(NULL);
    1287             : 
    1288             :     for (;;)
    1289             :     {
    1290             :         long        sleeptime;
    1291             :         TimestampTz now;
    1292             : 
    1293             :         /*
    1294             :          * Emergency bailout if postmaster has died.  This is to avoid the
    1295             :          * necessity for manual cleanup of all postmaster children.
    1296             :          */
    1297           0 :         if (!PostmasterIsAlive())
    1298           0 :             exit(1);
    1299             : 
    1300             :         /* Clear any already-pending wakeups */
    1301           0 :         ResetLatch(MyLatch);
    1302             : 
    1303           0 :         CHECK_FOR_INTERRUPTS();
    1304             : 
    1305             :         /* Process any requests or signals received recently */
    1306           0 :         if (ConfigReloadPending)
    1307             :         {
    1308           0 :             ConfigReloadPending = false;
    1309           0 :             ProcessConfigFile(PGC_SIGHUP);
    1310           0 :             SyncRepInitConfig();
    1311             :         }
    1312             : 
    1313             :         /* Check for input from the client */
    1314           0 :         ProcessRepliesIfAny();
    1315             : 
    1316             :         /*
    1317             :          * If we're shutting down, trigger pending WAL to be written out,
    1318             :          * otherwise we'd possibly end up waiting for WAL that never gets
    1319             :          * written, because walwriter has shut down already.
    1320             :          */
    1321           0 :         if (got_STOPPING)
    1322           0 :             XLogBackgroundFlush();
    1323             : 
    1324             :         /* Update our idea of the currently flushed position. */
    1325           0 :         if (!RecoveryInProgress())
    1326           0 :             RecentFlushPtr = GetFlushRecPtr();
    1327             :         else
    1328           0 :             RecentFlushPtr = GetXLogReplayRecPtr(NULL);
    1329             : 
    1330             :         /*
    1331             :          * If postmaster asked us to stop, don't wait anymore.
    1332             :          *
    1333             :          * It's important to do this check after the recomputation of
    1334             :          * RecentFlushPtr, so we can send all remaining data before shutting
    1335             :          * down.
    1336             :          */
    1337           0 :         if (got_STOPPING)
    1338           0 :             break;
    1339             : 
    1340             :         /*
    1341             :          * We only send regular messages to the client for fully decoded
    1342             :          * transactions, but synchronous replication and walsender shutdown
    1343             :          * may be waiting for a later location. So we send pings
    1344             :          * containing the flush location every now and then.
    1345             :          */
    1346           0 :         if (MyWalSnd->flush < sentPtr &&
    1347           0 :             MyWalSnd->write < sentPtr &&
    1348           0 :             !waiting_for_ping_response)
    1349             :         {
    1350           0 :             WalSndKeepalive(false);
    1351           0 :             waiting_for_ping_response = true;
    1352             :         }
    1353             : 
    1354             :         /* check whether we're done: enough WAL is available for the caller */
    1355           0 :         if (loc <= RecentFlushPtr)
    1356           0 :             break;
    1357             : 
    1358             :         /* Waiting for new WAL. Since we need to wait, we're now caught up. */
    1359           0 :         WalSndCaughtUp = true;
    1360             : 
    1361             :         /*
    1362             :          * Try to flush any pending output to the client; a non-zero result shuts the walsender down.
    1363             :          */
    1364           0 :         if (pq_flush_if_writable() != 0)
    1365           0 :             WalSndShutdown();
    1366             : 
    1367             :         /*
    1368             :          * If we have received CopyDone from the client, sent CopyDone
    1369             :          * ourselves, and the output buffer is empty, it's time to exit
    1370             :          * streaming, so fail the current WAL fetch request.
    1371             :          */
    1372           0 :         if (streamingDoneReceiving && streamingDoneSending &&
    1373           0 :             !pq_is_send_pending())
    1374           0 :             break;
    1375             : 
    1376           0 :         now = GetCurrentTimestamp();
    1377             : 
    1378             :         /* die if timeout was reached */
    1379           0 :         WalSndCheckTimeOut(now);
    1380             : 
    1381             :         /* Send keepalive if the time has come */
    1382           0 :         WalSndKeepaliveIfNecessary(now);
    1383             : 
    1384             :         /*
    1385             :          * Sleep until something happens or we time out.  Also wait for the
    1386             :          * socket becoming writable, if there's still pending output.
    1387             :          * Otherwise we might sit on sendable output data while waiting for
    1388             :          * new WAL to be generated.  (But if we have nothing to send, we don't
    1389             :          * want to wake on socket-writable.)
    1390             :          */
    1391           0 :         sleeptime = WalSndComputeSleeptime(now);
    1392             : 
    1393           0 :         wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
    1394             :             WL_SOCKET_READABLE | WL_TIMEOUT;
    1395             : 
    1396           0 :         if (pq_is_send_pending())
    1397           0 :             wakeEvents |= WL_SOCKET_WRITEABLE;
    1398             : 
    1399           0 :         WaitLatchOrSocket(MyLatch, wakeEvents,
    1400           0 :                           MyProcPort->sock, sleeptime,
    1401             :                           WAIT_EVENT_WAL_SENDER_WAIT_WAL);
    1402           0 :     }
    1403             : 
    1404             :     /* reactivate latch so WalSndLoop knows to continue */
    1405           0 :     SetLatch(MyLatch);
    1406           0 :     return RecentFlushPtr;
    1407             : }
    1408             : 
    1409             : /*
    1410             :  * Execute an incoming replication command.
    1411             :  * cmd_string is parsed in a per-command memory context and dispatched on the resulting node type; failures are raised via ereport/elog.
    1412             :  * Returns true if the cmd_string was recognized as WalSender command (CommandComplete has then been sent), false
    1413             :  * if not (the parse result is a T_SQLCmd node).
    1414             :  */
    1415             : bool
    1416           0 : exec_replication_command(const char *cmd_string)
    1417             : {
    1418             :     int         parse_rc;
    1419             :     Node       *cmd_node;
    1420             :     MemoryContext cmd_context;
    1421             :     MemoryContext old_context;
    1422             : 
    1423             :     /*
    1424             :      * If WAL sender has been told that shutdown is getting close, switch its
    1425             :      * status accordingly to handle the next replication commands correctly.
    1426             :      */
    1427           0 :     if (got_STOPPING)
    1428           0 :         WalSndSetState(WALSNDSTATE_STOPPING);
    1429             : 
    1430             :     /*
    1431             :      * Throw error if in stopping mode.  We need to prevent commands that could
    1432             :      * generate WAL while the shutdown checkpoint is being written.  To be
    1433             :      * safe, we just prohibit all new commands.
    1434             :      */
    1435           0 :     if (MyWalSnd->state == WALSNDSTATE_STOPPING)
    1436           0 :         ereport(ERROR,
    1437             :                 (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
    1438             : 
    1439             :     /*
    1440             :      * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
    1441             :      * command arrives. Clean up the old stuff if there's anything.
    1442             :      */
    1443           0 :     SnapBuildClearExportedSnapshot();
    1444             : 
    1445           0 :     CHECK_FOR_INTERRUPTS();
    1446             : 
    1447           0 :     cmd_context = AllocSetContextCreate(CurrentMemoryContext,
    1448             :                                         "Replication command context",
    1449             :                                         ALLOCSET_DEFAULT_SIZES);
    1450           0 :     old_context = MemoryContextSwitchTo(cmd_context);
    1451             : 
    1452           0 :     replication_scanner_init(cmd_string);
    1453           0 :     parse_rc = replication_yyparse();
    1454           0 :     if (parse_rc != 0)
    1455           0 :         ereport(ERROR,
    1456             :                 (errcode(ERRCODE_SYNTAX_ERROR),
    1457             :                  (errmsg_internal("replication command parser returned %d",
    1458             :                                   parse_rc))));
    1459             : 
    1460           0 :     cmd_node = replication_parse_result;
    1461             : 
    1462             :     /*
    1463             :      * Log replication command if log_replication_commands is enabled. Even
    1464             :      * when it's disabled, log the command with DEBUG1 level for backward
    1465             :      * compatibility. Note that SQL commands are not logged here, and will be
    1466             :      * logged later if log_statement is enabled.
    1467             :      */
    1468           0 :     if (cmd_node->type != T_SQLCmd)
    1469           0 :         ereport(log_replication_commands ? LOG : DEBUG1,
    1470             :                 (errmsg("received replication command: %s", cmd_string)));
    1471             : 
    1472             :     /*
    1473             :      * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
    1474             :      * called outside of transaction the snapshot should be cleared here.  NOTE(review): an unconditional SnapBuildClearExportedSnapshot() already ran earlier in this function, so one of the two calls looks redundant -- confirm which is intended.
    1475             :      */
    1476           0 :     if (!IsTransactionBlock())
    1477           0 :         SnapBuildClearExportedSnapshot();
    1478             : 
    1479             :     /*
    1480             :      * For aborted transactions, don't allow anything except pure SQL, the
    1481             :      * exec_simple_query() will handle it correctly.
    1482             :      */
    1483           0 :     if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
    1484           0 :         ereport(ERROR,
    1485             :                 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
    1486             :                  errmsg("current transaction is aborted, "
    1487             :                         "commands ignored until end of transaction block")));
    1488             : 
    1489           0 :     CHECK_FOR_INTERRUPTS();
    1490             : 
    1491             :     /*
    1492             :      * Allocate buffers that will be used for each outgoing and incoming
    1493             :      * message.  We do this just once per command to reduce palloc overhead.
    1494             :      */
    1495           0 :     initStringInfo(&output_message);
    1496           0 :     initStringInfo(&reply_message);
    1497           0 :     initStringInfo(&tmpbuf);
    1498             : 
    1499           0 :     switch (cmd_node->type)
    1500             :     {
    1501             :         case T_IdentifySystemCmd:
    1502           0 :             IdentifySystem();
    1503           0 :             break;
    1504             : 
    1505             :         case T_BaseBackupCmd:
    1506           0 :             PreventTransactionChain(true, "BASE_BACKUP");
    1507           0 :             SendBaseBackup((BaseBackupCmd *) cmd_node);
    1508           0 :             break;
    1509             : 
    1510             :         case T_CreateReplicationSlotCmd:
    1511           0 :             CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
    1512           0 :             break;
    1513             : 
    1514             :         case T_DropReplicationSlotCmd:
    1515           0 :             DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
    1516           0 :             break;
    1517             : 
    1518             :         case T_StartReplicationCmd:
    1519             :             {
    1520           0 :                 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
    1521             : 
    1522           0 :                 PreventTransactionChain(true, "START_REPLICATION");
    1523             : 
    1524           0 :                 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
    1525           0 :                     StartReplication(cmd);
    1526             :                 else
    1527           0 :                     StartLogicalReplication(cmd);
    1528           0 :                 break;
    1529             :             }
    1530             : 
    1531             :         case T_TimeLineHistoryCmd:
    1532           0 :             PreventTransactionChain(true, "TIMELINE_HISTORY");
    1533           0 :             SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
    1534           0 :             break;
    1535             : 
    1536             :         case T_VariableShowStmt:
    1537             :             {
    1538           0 :                 DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
    1539           0 :                 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
    1540             : 
    1541           0 :                 GetPGVariable(n->name, dest);
    1542             :             }
    1543           0 :             break;
    1544             : 
    1545             :         case T_SQLCmd:
    1546           0 :             if (MyDatabaseId == InvalidOid)
    1547           0 :                 ereport(ERROR,
    1548             :                         (errmsg("not connected to database")));
    1549             : 
    1550             :             /* Tell the caller that this wasn't a WalSender command.  NOTE(review): this path returns with cmd_context still current and never deleted -- confirm the caller cleans up. */
    1551           0 :             return false;
    1552             : 
    1553             :         default:
    1554           0 :             elog(ERROR, "unrecognized replication command node tag: %u",
    1555             :                  cmd_node->type);
    1556             :     }
    1557             : 
    1558             :     /* done: switch back to the caller's context and free the per-command one */
    1559           0 :     MemoryContextSwitchTo(old_context);
    1560           0 :     MemoryContextDelete(cmd_context);
    1561             : 
    1562             :     /* Send CommandComplete message (the tag is "SELECT" for every command handled here) */
    1563           0 :     EndCommand("SELECT", DestRemote);
    1564             : 
    1565           0 :     return true;
    1566             : }
    1567             : 
    1568             : /*
    1569             :  * Process any incoming messages while streaming: every message that can be read without blocking is handled ('d', 'c' or 'X'). Also checks if the remote
    1570             :  * end has closed the connection (the process then exits), and records the reply time if at least one 'd' or 'c' message arrived.
    1571             :  */
    1572             : static void
    1573           0 : ProcessRepliesIfAny(void)
    1574             : {
    1575             :     unsigned char firstchar;
    1576             :     int         r;
    1577           0 :     bool        received = false; /* set once a 'd' or 'c' message has been handled */
    1578             : 
    1579             :     for (;;)
    1580             :     {
    1581           0 :         pq_startmsgread();
    1582           0 :         r = pq_getbyte_if_available(&firstchar);
    1583           0 :         if (r < 0)
    1584             :         {
    1585             :             /* unexpected error or EOF */
    1586           0 :             ereport(COMMERROR,
    1587             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1588             :                      errmsg("unexpected EOF on standby connection")));
    1589           0 :             proc_exit(0);
    1590             :         }
    1591           0 :         if (r == 0)
    1592             :         {
    1593             :             /* no data available without blocking */
    1594           0 :             pq_endmsgread();
    1595           0 :             break;
    1596             :         }
    1597             : 
    1598             :         /* Read the message contents into reply_message; a non-zero result is treated as EOF */
    1599           0 :         resetStringInfo(&reply_message);
    1600           0 :         if (pq_getmessage(&reply_message, 0))
    1601             :         {
    1602           0 :             ereport(COMMERROR,
    1603             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1604             :                      errmsg("unexpected EOF on standby connection")));
    1605           0 :             proc_exit(0);
    1606             :         }
    1607             : 
    1608             :         /*
    1609             :          * If we already received a CopyDone from the frontend, the frontend
    1610             :          * should not send us anything until we've closed our end of the COPY.
    1611             :          * XXX: In theory, the frontend could already send the next command
    1612             :          * before receiving the CopyDone, but libpq doesn't currently allow
    1613             :          * that.
    1614             :          */
    1615           0 :         if (streamingDoneReceiving && firstchar != 'X')
    1616           0 :             ereport(FATAL,
    1617             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1618             :                      errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
    1619             :                             firstchar)));
    1620             : 
    1621             :         /* Handle the very limited subset of commands expected in this phase */
    1622           0 :         switch (firstchar)
    1623             :         {
    1624             :                 /*
    1625             :                  * 'd' means a standby reply wrapped in a CopyData packet.
    1626             :                  */
    1627             :             case 'd':
    1628           0 :                 ProcessStandbyMessage();
    1629           0 :                 received = true;
    1630           0 :                 break;
    1631             : 
    1632             :                 /*
    1633             :                  * CopyDone means the standby requested to finish streaming.
    1634             :                  * Reply with CopyDone, if we had not sent that already.
    1635             :                  */
    1636             :             case 'c':
    1637           0 :                 if (!streamingDoneSending)
    1638             :                 {
    1639           0 :                     pq_putmessage_noblock('c', NULL, 0);
    1640           0 :                     streamingDoneSending = true;
    1641             :                 }
    1642             : 
    1643           0 :                 streamingDoneReceiving = true;
    1644           0 :                 received = true;
    1645           0 :                 break;
    1646             : 
    1647             :                 /*
    1648             :                  * 'X' means that the standby is closing down the socket.  No break follows; proc_exit() is presumably non-returning.
    1649             :                  */
    1650             :             case 'X':
    1651           0 :                 proc_exit(0);
    1652             : 
    1653             :             default:
    1654           0 :                 ereport(FATAL,
    1655             :                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1656             :                          errmsg("invalid standby message type \"%c\"",
    1657             :                                 firstchar)));
    1658             :         }
    1659           0 :     }
    1660             : 
    1661             :     /*
    1662             :      * Save the last reply timestamp if we've received at least one reply, and stop waiting for a ping response.
    1663             :      */
    1664           0 :     if (received)
    1665             :     {
    1666           0 :         last_reply_timestamp = GetCurrentTimestamp();
    1667           0 :         waiting_for_ping_response = false;
    1668             :     }
    1669           0 : }
    1670             : 
    1671             : /*
    1672             :  * Process a status update message received from standby: dispatch on the first byte of reply_message ('r' = reply, 'h' = hot standby feedback); any other type is logged and the process exits.
    1673             :  */
    1674             : static void
    1675           0 : ProcessStandbyMessage(void)
    1676             : {
    1677             :     char        msgtype;
    1678             : 
    1679             :     /*
    1680             :      * Check message type from the first byte (this consumes it from reply_message).
    1681             :      */
    1682           0 :     msgtype = pq_getmsgbyte(&reply_message);
    1683             : 
    1684           0 :     switch (msgtype)
    1685             :     {
    1686             :         case 'r':
    1687           0 :             ProcessStandbyReplyMessage();
    1688           0 :             break;
    1689             : 
    1690             :         case 'h':
    1691           0 :             ProcessStandbyHSFeedbackMessage();
    1692           0 :             break;
    1693             : 
    1694             :         default:
    1695           0 :             ereport(COMMERROR,
    1696             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1697             :                      errmsg("unexpected message type \"%c\"", msgtype)));
    1698           0 :             proc_exit(0);
    1699             :     }
    1700           0 : }
    1701             : 
    1702             : /*
    1703             :  * Remember that a walreceiver just confirmed receipt of lsn `lsn`: store it as MyReplicationSlot's restart_lsn under the slot mutex and, only if the value changed, mark the slot dirty and recompute the required LSN.
    1704             :  */
    1705             : static void
    1706           0 : PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
    1707             : {
    1708           0 :     bool        changed = false;
    1709           0 :     ReplicationSlot *slot = MyReplicationSlot;
    1710             : 
    1711           0 :     Assert(lsn != InvalidXLogRecPtr);
    1712           0 :     SpinLockAcquire(&slot->mutex);
    1713           0 :     if (slot->data.restart_lsn != lsn)
    1714             :     {
    1715           0 :         changed = true;
    1716           0 :         slot->data.restart_lsn = lsn;
    1717             :     }
    1718           0 :     SpinLockRelease(&slot->mutex);
    1719             : 
    1720           0 :     if (changed)
    1721             :     {
    1722           0 :         ReplicationSlotMarkDirty();
    1723           0 :         ReplicationSlotsComputeRequiredLSN();
    1724             :     }
    1725             : 
    1726             :     /*
    1727             :      * One could argue that the slot should be saved to disk now, but that'd
    1728             :      * be energy wasted - the worst that lost information can do here is give us
    1729             :      * wrong information in a statistics view - we'll just potentially be more
    1730             :      * conservative in removing files.
    1731             :      */
    1732           0 : }
    1733             : 
    1734             : /*
    1735             :  * Regular reply from standby advising of WAL locations on standby server.
    1736             :  */
    1737             : static void
    1738           0 : ProcessStandbyReplyMessage(void)
    1739             : {
    1740             :     XLogRecPtr  writePtr,
    1741             :                 flushPtr,
    1742             :                 applyPtr;
    1743             :     bool        replyRequested;
    1744             :     TimeOffset  writeLag,
    1745             :                 flushLag,
    1746             :                 applyLag;
    1747             :     bool        clearLagTimes;
    1748             :     TimestampTz now;
    1749             : 
    1750             :     static bool fullyAppliedLastTime = false;
    1751             : 
    1752             :     /* the caller already consumed the msgtype byte */
    1753           0 :     writePtr = pq_getmsgint64(&reply_message);
    1754           0 :     flushPtr = pq_getmsgint64(&reply_message);
    1755           0 :     applyPtr = pq_getmsgint64(&reply_message);
    1756           0 :     (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
    1757           0 :     replyRequested = pq_getmsgbyte(&reply_message);
    1758             : 
    1759           0 :     elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
    1760             :          (uint32) (writePtr >> 32), (uint32) writePtr,
    1761             :          (uint32) (flushPtr >> 32), (uint32) flushPtr,
    1762             :          (uint32) (applyPtr >> 32), (uint32) applyPtr,
    1763             :          replyRequested ? " (reply requested)" : "");
    1764             : 
    1765             :     /* See if we can compute the round-trip lag for these positions. */
    1766           0 :     now = GetCurrentTimestamp();
    1767           0 :     writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
    1768           0 :     flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
    1769           0 :     applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
    1770             : 
    1771             :     /*
    1772             :      * If the standby reports that it has fully replayed the WAL in two
    1773             :      * consecutive reply messages, then the second such message must result
    1774             :      * from wal_receiver_status_interval expiring on the standby.  This is a
    1775             :      * convenient time to forget the lag times measured when it last
    1776             :      * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
    1777             :      * until more WAL traffic arrives.
    1778             :      */
    1779           0 :     clearLagTimes = false;
    1780           0 :     if (applyPtr == sentPtr)
    1781             :     {
    1782           0 :         if (fullyAppliedLastTime)
    1783           0 :             clearLagTimes = true;
    1784           0 :         fullyAppliedLastTime = true;
    1785             :     }
    1786             :     else
    1787           0 :         fullyAppliedLastTime = false;
    1788             : 
    1789             :     /* Send a reply if the standby requested one. */
    1790           0 :     if (replyRequested)
    1791           0 :         WalSndKeepalive(false);
    1792             : 
    1793             :     /*
    1794             :      * Update shared state for this WalSender process based on reply data from
    1795             :      * standby.
    1796             :      */
    1797             :     {
    1798           0 :         WalSnd     *walsnd = MyWalSnd;
    1799             : 
    1800           0 :         SpinLockAcquire(&walsnd->mutex);
    1801           0 :         walsnd->write = writePtr;
    1802           0 :         walsnd->flush = flushPtr;
    1803           0 :         walsnd->apply = applyPtr;
    1804           0 :         if (writeLag != -1 || clearLagTimes)
    1805           0 :             walsnd->writeLag = writeLag;
    1806           0 :         if (flushLag != -1 || clearLagTimes)
    1807           0 :             walsnd->flushLag = flushLag;
    1808           0 :         if (applyLag != -1 || clearLagTimes)
    1809           0 :             walsnd->applyLag = applyLag;
    1810           0 :         SpinLockRelease(&walsnd->mutex);
    1811             :     }
    1812             : 
    1813           0 :     if (!am_cascading_walsender)
    1814           0 :         SyncRepReleaseWaiters();
    1815             : 
    1816             :     /*
    1817             :      * Pass the flushed location on to our replication slot, if we have one,
                     :      * when the client confirmed a flush.
    1818             :      */
    1819           0 :     if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
    1820             :     {
    1821           0 :         if (SlotIsLogical(MyReplicationSlot))
    1822           0 :             LogicalConfirmReceivedLocation(flushPtr);
    1823             :         else
    1824           0 :             PhysicalConfirmReceivedLocation(flushPtr);
    1825             :     }
    1826           0 : }
    1827             : 
    1828             : /* compute new replication slot xmin horizon if needed */
    1829             : static void
    1830           0 : PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
    1831             : {
    1832           0 :     bool        changed = false;
    1833           0 :     ReplicationSlot *slot = MyReplicationSlot;
    1834             : 
    1835           0 :     SpinLockAcquire(&slot->mutex);
    1836           0 :     MyPgXact->xmin = InvalidTransactionId;
    1837             : 
    1838             :     /*
    1839             :      * For physical replication we don't need the interlock provided by xmin
    1840             :      * and effective_xmin since the consequences of a missed increase are
    1841             :      * limited to query cancellations, so set both at once.
    1842             :      */
    1843           0 :     if (!TransactionIdIsNormal(slot->data.xmin) ||
    1844           0 :         !TransactionIdIsNormal(feedbackXmin) ||
    1845           0 :         TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
    1846             :     {
    1847           0 :         changed = true;
    1848           0 :         slot->data.xmin = feedbackXmin;
    1849           0 :         slot->effective_xmin = feedbackXmin;
    1850             :     }
    1851           0 :     if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
    1852           0 :         !TransactionIdIsNormal(feedbackCatalogXmin) ||
    1853           0 :         TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
    1854             :     {
    1855           0 :         changed = true;
    1856           0 :         slot->data.catalog_xmin = feedbackCatalogXmin;
    1857           0 :         slot->effective_catalog_xmin = feedbackCatalogXmin;
    1858             :     }
    1859           0 :     SpinLockRelease(&slot->mutex);
    1860             : 
    1861           0 :     if (changed)
    1862             :     {
    1863           0 :         ReplicationSlotMarkDirty();
    1864           0 :         ReplicationSlotsComputeRequiredXmin(false);
    1865             :     }
    1866           0 : }
    1867             : 
    1868             : /*
    1869             :  * Check that the provided xmin/epoch are sane, that is, not in the future
    1870             :  * and not so far back as to be already wrapped around.
    1871             :  *
    1872             :  * Epoch of nextXid should be the same as the standby's, or if the counter
    1873             :  * has wrapped, then one greater than the standby's.
    1874             :  *
    1875             :  * This check doesn't care about whether clog exists for these xids
    1876             :  * at all.
    1877             :  */
    1878             : static bool
    1879           0 : TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
    1880             : {
    1881             :     TransactionId nextXid;
    1882             :     uint32      nextEpoch;
    1883             : 
    1884           0 :     GetNextXidAndEpoch(&nextXid, &nextEpoch);
    1885             : 
    1886           0 :     if (xid <= nextXid)
    1887             :     {
    1888           0 :         if (epoch != nextEpoch)
    1889           0 :             return false;
    1890             :     }
    1891             :     else
    1892             :     {
    1893           0 :         if (epoch + 1 != nextEpoch)
    1894           0 :             return false;
    1895             :     }
    1896             : 
    1897           0 :     if (!TransactionIdPrecedesOrEquals(xid, nextXid))
    1898           0 :         return false;           /* epoch OK, but it's wrapped around */
    1899             : 
    1900           0 :     return true;
    1901             : }
    1902             : 
    1903             : /*
    1904             :  * Hot Standby feedback
    1905             :  */
    1906             : static void
    1907           0 : ProcessStandbyHSFeedbackMessage(void)
    1908             : {
    1909             :     TransactionId feedbackXmin;
    1910             :     uint32      feedbackEpoch;
    1911             :     TransactionId feedbackCatalogXmin;
    1912             :     uint32      feedbackCatalogEpoch;
    1913             : 
    1914             :     /*
    1915             :      * Decipher the reply message. The caller already consumed the msgtype
    1916             :      * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
    1917             :      * of this message.
    1918             :      */
    1919           0 :     (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
    1920           0 :     feedbackXmin = pq_getmsgint(&reply_message, 4);
    1921           0 :     feedbackEpoch = pq_getmsgint(&reply_message, 4);
    1922           0 :     feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
    1923           0 :     feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
    1924             : 
    1925           0 :     elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
    1926             :          feedbackXmin,
    1927             :          feedbackEpoch,
    1928             :          feedbackCatalogXmin,
    1929             :          feedbackCatalogEpoch);
    1930             : 
    1931             :     /*
    1932             :      * Unset WalSender's xmins if the feedback message values are invalid.
    1933             :      * This happens when the downstream turned hot_standby_feedback off.
    1934             :      */
    1935           0 :     if (!TransactionIdIsNormal(feedbackXmin)
    1936           0 :         && !TransactionIdIsNormal(feedbackCatalogXmin))
    1937             :     {
    1938           0 :         MyPgXact->xmin = InvalidTransactionId;
    1939           0 :         if (MyReplicationSlot != NULL)
    1940           0 :             PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
    1941           0 :         return;
    1942             :     }
    1943             : 
    1944             :     /*
    1945             :      * Check that the provided xmin/epoch are sane, that is, not in the future
    1946             :      * and not so far back as to be already wrapped around.  Ignore if not.
    1947             :      */
    1948           0 :     if (TransactionIdIsNormal(feedbackXmin) &&
    1949           0 :         !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
    1950           0 :         return;
    1951             : 
    1952           0 :     if (TransactionIdIsNormal(feedbackCatalogXmin) &&
    1953           0 :         !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
    1954           0 :         return;
    1955             : 
    1956             :     /*
    1957             :      * Set the WalSender's xmin equal to the standby's requested xmin, so that
    1958             :      * the xmin will be taken into account by GetOldestXmin.  This will hold
    1959             :      * back the removal of dead rows and thereby prevent the generation of
    1960             :      * cleanup conflicts on the standby server.
    1961             :      *
    1962             :      * There is a small window for a race condition here: although we just
    1963             :      * checked that feedbackXmin precedes nextXid, the nextXid could have
    1964             :      * gotten advanced between our fetching it and applying the xmin below,
    1965             :      * perhaps far enough to make feedbackXmin wrap around.  In that case the
    1966             :      * xmin we set here would be "in the future" and have no effect.  No point
    1967             :      * in worrying about this since it's too late to save the desired data
    1968             :      * anyway.  Assuming that the standby sends us an increasing sequence of
    1969             :      * xmins, this could only happen during the first reply cycle, else our
    1970             :      * own xmin would prevent nextXid from advancing so far.
    1971             :      *
    1972             :      * We don't bother taking the ProcArrayLock here.  Setting the xmin field
    1973             :      * is assumed atomic, and there's no real need to prevent a concurrent
    1974             :      * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
    1975             :      * safe, and if we're moving it backwards, well, the data is at risk
    1976             :      * already since a VACUUM could have just finished calling GetOldestXmin.)
    1977             :      *
    1978             :      * If we're using a replication slot we reserve the xmin via that,
    1979             :      * otherwise via the walsender's PGXACT entry. We can only track the
    1980             :      * catalog xmin separately when using a slot, so we store the least of the
    1981             :      * two provided when not using a slot.
    1982             :      *
    1983             :      * XXX: It might make sense to generalize the ephemeral slot concept and
    1984             :      * always use the slot mechanism to handle the feedback xmin.
    1985             :      */
    1986           0 :     if (MyReplicationSlot != NULL)  /* XXX: persistency configurable? */
    1987           0 :         PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
    1988             :     else
    1989             :     {
    1990           0 :         if (TransactionIdIsNormal(feedbackCatalogXmin)
    1991           0 :             && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
    1992           0 :             MyPgXact->xmin = feedbackCatalogXmin;
    1993             :         else
    1994           0 :             MyPgXact->xmin = feedbackXmin;
    1995             :     }
    1996             : }
    1997             : 
    1998             : /*
    1999             :  * Compute how long send/receive loops should sleep.
    2000             :  *
    2001             :  * If wal_sender_timeout is enabled we want to wake up in time to send
    2002             :  * keepalives and to abort the connection if wal_sender_timeout has been
    2003             :  * reached.
    2004             :  */
    2005             : static long
    2006           0 : WalSndComputeSleeptime(TimestampTz now)
    2007             : {
    2008           0 :     long        sleeptime = 10000;  /* 10 s */
    2009             : 
    2010           0 :     if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
    2011             :     {
    2012             :         TimestampTz wakeup_time;
    2013             :         long        sec_to_timeout;
    2014             :         int         microsec_to_timeout;
    2015             : 
    2016             :         /*
    2017             :          * At the latest stop sleeping once wal_sender_timeout has been
    2018             :          * reached.
    2019             :          */
    2020           0 :         wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
    2021             :                                                   wal_sender_timeout);
    2022             : 
    2023             :         /*
    2024             :          * If no ping has been sent yet, wake up when it's time to do so.
    2025             :          * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
    2026             :          * the timeout passed without a response.
    2027             :          */
    2028           0 :         if (!waiting_for_ping_response)
    2029           0 :             wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
    2030             :                                                       wal_sender_timeout / 2);
    2031             : 
    2032             :         /* Compute relative time until wakeup. */
    2033           0 :         TimestampDifference(now, wakeup_time,
    2034             :                             &sec_to_timeout, &microsec_to_timeout);
    2035             : 
    2036           0 :         sleeptime = sec_to_timeout * 1000 +
    2037           0 :             microsec_to_timeout / 1000;
    2038             :     }
    2039             : 
    2040           0 :     return sleeptime;
    2041             : }
    2042             : 
    2043             : /*
    2044             :  * Check whether there have been responses by the client within
    2045             :  * wal_sender_timeout and shut down if not.
    2046             :  */
    2047             : static void
    2048           0 : WalSndCheckTimeOut(TimestampTz now)
    2049             : {
    2050             :     TimestampTz timeout;
    2051             : 
    2052             :     /* don't bail out if we're doing something that doesn't require timeouts */
    2053           0 :     if (last_reply_timestamp <= 0)
    2054           0 :         return;
    2055             : 
    2056           0 :     timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
    2057             :                                           wal_sender_timeout);
    2058             : 
    2059           0 :     if (wal_sender_timeout > 0 && now >= timeout)
    2060             :     {
    2061             :         /*
    2062             :          * Since typically expiration of replication timeout means
    2063             :          * communication problem, we don't send the error message to the
    2064             :          * standby.
    2065             :          */
    2066           0 :         ereport(COMMERROR,
    2067             :                 (errmsg("terminating walsender process due to replication timeout")));
    2068             : 
    2069           0 :         WalSndShutdown();
    2070             :     }
    2071             : }
    2072             : 
    2073             : /* Main loop of walsender process that streams the WAL over Copy messages. */
    2074             : static void
    2075           0 : WalSndLoop(WalSndSendDataCallback send_data)
    2076             : {
    2077             :     /*
    2078             :      * Initialize the last reply timestamp. That enables timeout processing
    2079             :      * from here on.
    2080             :      */
    2081           0 :     last_reply_timestamp = GetCurrentTimestamp();
    2082           0 :     waiting_for_ping_response = false;
    2083             : 
    2084             :     /* Report to pgstat that this process is running */
    2085           0 :     pgstat_report_activity(STATE_RUNNING, NULL);
    2086             : 
    2087             :     /*
    2088             :      * Loop until we reach the end of this timeline or the client requests to
    2089             :      * stop streaming.
    2090             :      */
    2091             :     for (;;)
    2092             :     {
    2093             :         TimestampTz now;
    2094             : 
    2095             :         /*
    2096             :          * Emergency bailout if postmaster has died.  This is to avoid the
    2097             :          * necessity for manual cleanup of all postmaster children.
    2098             :          */
    2099           0 :         if (!PostmasterIsAlive())
    2100           0 :             exit(1);
    2101             : 
    2102             :         /* Clear any already-pending wakeups */
    2103           0 :         ResetLatch(MyLatch);
    2104             : 
    2105           0 :         CHECK_FOR_INTERRUPTS();
    2106             : 
    2107             :         /* Process any requests or signals received recently */
    2108           0 :         if (ConfigReloadPending)
    2109             :         {
    2110           0 :             ConfigReloadPending = false;
    2111           0 :             ProcessConfigFile(PGC_SIGHUP);
    2112           0 :             SyncRepInitConfig();
    2113             :         }
    2114             : 
    2115             :         /* Check for input from the client */
    2116           0 :         ProcessRepliesIfAny();
    2117             : 
    2118             :         /*
    2119             :          * If we have received CopyDone from the client, sent CopyDone
    2120             :          * ourselves, and the output buffer is empty, it's time to exit
    2121             :          * streaming.
    2122             :          */
    2123           0 :         if (streamingDoneReceiving && streamingDoneSending &&
    2124           0 :             !pq_is_send_pending())
    2125           0 :             break;
    2126             : 
    2127             :         /*
    2128             :          * If we don't have any pending data in the output buffer, try to send
    2129             :          * some more.  If there is some, we don't bother to call send_data
    2130             :          * again until we've flushed it ... but we'd better assume we are not
    2131             :          * caught up.
    2132             :          */
    2133           0 :         if (!pq_is_send_pending())
    2134           0 :             send_data();
    2135             :         else
    2136           0 :             WalSndCaughtUp = false;
    2137             : 
    2138             :         /* Try to flush pending output to the client */
    2139           0 :         if (pq_flush_if_writable() != 0)
    2140           0 :             WalSndShutdown();
    2141             : 
    2142             :         /* If nothing remains to be sent right now ... */
    2143           0 :         if (WalSndCaughtUp && !pq_is_send_pending())
    2144             :         {
    2145             :             /*
    2146             :              * If we're in catchup state, move to streaming.  This is an
    2147             :              * important state change for users to know about, since before
    2148             :              * this point data loss might occur if the primary dies and we
    2149             :              * need to fail over to the standby. The state change is also
    2150             :              * important for synchronous replication, since commits that
    2151             :              * started to wait at that point might wait for some time.
    2152             :              */
    2153           0 :             if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
    2154             :             {
    2155           0 :                 ereport(DEBUG1,
    2156             :                         (errmsg("standby \"%s\" has now caught up with primary",
    2157             :                                 application_name)));
    2158           0 :                 WalSndSetState(WALSNDSTATE_STREAMING);
    2159             :             }
    2160             : 
    2161             :             /*
    2162             :              * When SIGUSR2 arrives, we send any outstanding logs up to the
    2163             :              * shutdown checkpoint record (i.e., the latest record), wait for
    2164             :              * them to be replicated to the standby, and exit. This may be a
    2165             :              * normal termination at shutdown, or a promotion, the walsender
    2166             :              * is not sure which.
    2167             :              */
    2168           0 :             if (got_SIGUSR2)
    2169           0 :                 WalSndDone(send_data);
    2170             :         }
    2171             : 
    2172           0 :         now = GetCurrentTimestamp();
    2173             : 
    2174             :         /* Check for replication timeout. */
    2175           0 :         WalSndCheckTimeOut(now);
    2176             : 
    2177             :         /* Send keepalive if the time has come */
    2178           0 :         WalSndKeepaliveIfNecessary(now);
    2179             : 
    2180             :         /*
    2181             :          * We don't block if not caught up, unless there is unsent data
    2182             :          * pending in which case we'd better block until the socket is
    2183             :          * write-ready.  This test is only needed for the case where the
    2184             :          * send_data callback handled a subset of the available data but then
    2185             :          * pq_flush_if_writable flushed it all --- we should immediately try
    2186             :          * to send more.
    2187             :          */
    2188           0 :         if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
    2189             :         {
    2190             :             long        sleeptime;
    2191             :             int         wakeEvents;
    2192             : 
    2193           0 :             wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
    2194             :                 WL_SOCKET_READABLE;
    2195             : 
    2196           0 :             sleeptime = WalSndComputeSleeptime(now);
    2197             : 
    2198           0 :             if (pq_is_send_pending())
    2199           0 :                 wakeEvents |= WL_SOCKET_WRITEABLE;
    2200             : 
    2201             :             /* Sleep until something happens or we time out */
    2202           0 :             WaitLatchOrSocket(MyLatch, wakeEvents,
    2203           0 :                               MyProcPort->sock, sleeptime,
    2204             :                               WAIT_EVENT_WAL_SENDER_MAIN);
    2205             :         }
    2206           0 :     }
    2207           0 :     return;
    2208             : }
    2209             : 
    2210             : /* Initialize a per-walsender data structure for this walsender process */
    2211             : static void
    2212           0 : InitWalSenderSlot(void)
    2213             : {
    2214             :     int         i;
    2215             : 
    2216             :     /*
    2217             :      * WalSndCtl should be set up already (we inherit this by fork() or
    2218             :      * EXEC_BACKEND mechanism from the postmaster).
    2219             :      */
    2220           0 :     Assert(WalSndCtl != NULL);
    2221           0 :     Assert(MyWalSnd == NULL);
    2222             : 
    2223             :     /*
    2224             :      * Find a free walsender slot and reserve it. If this fails, we must be
    2225             :      * out of WalSnd structures.
    2226             :      */
    2227           0 :     for (i = 0; i < max_wal_senders; i++)
    2228             :     {
    2229           0 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    2230             : 
    2231           0 :         SpinLockAcquire(&walsnd->mutex);
    2232             : 
    2233           0 :         if (walsnd->pid != 0)
    2234             :         {
    2235           0 :             SpinLockRelease(&walsnd->mutex);
    2236           0 :             continue;
    2237             :         }
    2238             :         else
    2239             :         {
    2240             :             /*
    2241             :              * Found a free slot. Reserve it for us.
    2242             :              */
    2243           0 :             walsnd->pid = MyProcPid;
    2244           0 :             walsnd->sentPtr = InvalidXLogRecPtr;
    2245           0 :             walsnd->write = InvalidXLogRecPtr;
    2246           0 :             walsnd->flush = InvalidXLogRecPtr;
    2247           0 :             walsnd->apply = InvalidXLogRecPtr;
    2248           0 :             walsnd->writeLag = -1;
    2249           0 :             walsnd->flushLag = -1;
    2250           0 :             walsnd->applyLag = -1;
    2251           0 :             walsnd->state = WALSNDSTATE_STARTUP;
    2252           0 :             walsnd->latch = &MyProc->procLatch;
    2253           0 :             SpinLockRelease(&walsnd->mutex);
    2254             :             /* don't need the lock anymore */
    2255           0 :             MyWalSnd = (WalSnd *) walsnd;
    2256             : 
    2257           0 :             break;
    2258             :         }
    2259             :     }
    2260           0 :     if (MyWalSnd == NULL)
    2261           0 :         ereport(FATAL,
    2262             :                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
    2263             :                  errmsg("number of requested standby connections "
    2264             :                         "exceeds max_wal_senders (currently %d)",
    2265             :                         max_wal_senders)));
    2266             : 
    2267             :     /* Arrange to clean up at walsender exit */
    2268           0 :     on_shmem_exit(WalSndKill, 0);
    2269           0 : }
    2270             : 
    2271             : /* Destroy the per-walsender data structure for this walsender process */
    2272             : static void
    2273           0 : WalSndKill(int code, Datum arg)
    2274             : {
    2275           0 :     WalSnd     *walsnd = MyWalSnd;
    2276             : 
    2277           0 :     Assert(walsnd != NULL);
    2278             : 
    2279           0 :     MyWalSnd = NULL;
    2280             : 
    2281           0 :     SpinLockAcquire(&walsnd->mutex);
    2282             :     /* clear latch while holding the spinlock, so it can safely be read */
    2283           0 :     walsnd->latch = NULL;
    2284             :     /* Mark WalSnd struct as no longer being in use. */
    2285           0 :     walsnd->pid = 0;
    2286           0 :     SpinLockRelease(&walsnd->mutex);
    2287           0 : }
    2288             : 
    2289             : /*
    2290             :  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
    2291             :  *
    2292             :  * XXX probably this should be improved to suck data directly from the
    2293             :  * WAL buffers when possible.
    2294             :  *
    2295             :  * Will open, and keep open, one WAL segment stored in the global file
    2296             :  * descriptor sendFile. This means if XLogRead is used once, there will
    2297             :  * always be one descriptor left open until the process ends, but never
    2298             :  * more than one.
    2299             :  */
    2300             : static void
    2301           0 : XLogRead(char *buf, XLogRecPtr startptr, Size count)
    2302             : {
    2303             :     char       *p;
    2304             :     XLogRecPtr  recptr;
    2305             :     Size        nbytes;
    2306             :     XLogSegNo   segno;
    2307             : 
    2308             : retry:
    2309           0 :     p = buf;
    2310           0 :     recptr = startptr;
    2311           0 :     nbytes = count;
    2312             : 
    2313           0 :     while (nbytes > 0)
    2314             :     {
    2315             :         uint32      startoff;
    2316             :         int         segbytes;
    2317             :         int         readbytes;
    2318             : 
    2319           0 :         startoff = recptr % XLogSegSize;
    2320             : 
    2321           0 :         if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
    2322             :         {
    2323             :             char        path[MAXPGPATH];
    2324             : 
    2325             :             /* Switch to another logfile segment */
    2326           0 :             if (sendFile >= 0)
    2327           0 :                 close(sendFile);
    2328             : 
    2329           0 :             XLByteToSeg(recptr, sendSegNo);
    2330             : 
    2331             :             /*-------
    2332             :              * When reading from a historic timeline, and there is a timeline
    2333             :              * switch within this segment, read from the WAL segment belonging
    2334             :              * to the new timeline.
    2335             :              *
    2336             :              * For example, imagine that this server is currently on timeline
    2337             :              * 5, and we're streaming timeline 4. The switch from timeline 4
    2338             :              * to 5 happened at 0/13002088. In pg_wal, we have these files:
    2339             :              *
    2340             :              * ...
    2341             :              * 000000040000000000000012
    2342             :              * 000000040000000000000013
    2343             :              * 000000050000000000000013
    2344             :              * 000000050000000000000014
    2345             :              * ...
    2346             :              *
    2347             :              * In this situation, when requested to send the WAL from
    2348             :              * segment 0x13, on timeline 4, we read the WAL from file
    2349             :              * 000000050000000000000013. Archive recovery prefers files from
    2350             :              * newer timelines, so if the segment was restored from the
    2351             :              * archive on this server, the file belonging to the old timeline,
    2352             :              * 000000040000000000000013, might not exist. Their contents are
    2353             :              * equal up to the switchpoint, because at a timeline switch, the
    2354             :              * used portion of the old segment is copied to the new file.
    2355             :              *-------
    2356             :              */
    2357           0 :             curFileTimeLine = sendTimeLine;
    2358           0 :             if (sendTimeLineIsHistoric)
    2359             :             {
    2360             :                 XLogSegNo   endSegNo;
    2361             : 
    2362           0 :                 XLByteToSeg(sendTimeLineValidUpto, endSegNo);
    2363           0 :                 if (sendSegNo == endSegNo)
    2364           0 :                     curFileTimeLine = sendTimeLineNextTLI;
    2365             :             }
    2366             : 
    2367           0 :             XLogFilePath(path, curFileTimeLine, sendSegNo);
    2368             : 
    2369           0 :             sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
    2370           0 :             if (sendFile < 0)
    2371             :             {
    2372             :                 /*
    2373             :                  * If the file is not found, assume it's because the standby
    2374             :                  * asked for a too old WAL segment that has already been
    2375             :                  * removed or recycled.
    2376             :                  */
    2377           0 :                 if (errno == ENOENT)
    2378           0 :                     ereport(ERROR,
    2379             :                             (errcode_for_file_access(),
    2380             :                              errmsg("requested WAL segment %s has already been removed",
    2381             :                                     XLogFileNameP(curFileTimeLine, sendSegNo))));
    2382             :                 else
    2383           0 :                     ereport(ERROR,
    2384             :                             (errcode_for_file_access(),
    2385             :                              errmsg("could not open file \"%s\": %m",
    2386             :                                     path)));
    2387             :             }
    2388           0 :             sendOff = 0;
    2389             :         }
    2390             : 
    2391             :         /* Need to seek in the file? */
    2392           0 :         if (sendOff != startoff)
    2393             :         {
    2394           0 :             if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
    2395           0 :                 ereport(ERROR,
    2396             :                         (errcode_for_file_access(),
    2397             :                          errmsg("could not seek in log segment %s to offset %u: %m",
    2398             :                                 XLogFileNameP(curFileTimeLine, sendSegNo),
    2399             :                                 startoff)));
    2400           0 :             sendOff = startoff;
    2401             :         }
    2402             : 
    2403             :         /* How many bytes are within this segment? */
    2404           0 :         if (nbytes > (XLogSegSize - startoff))
    2405           0 :             segbytes = XLogSegSize - startoff;
    2406             :         else
    2407           0 :             segbytes = nbytes;
    2408             : 
    2409           0 :         pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
    2410           0 :         readbytes = read(sendFile, p, segbytes);
    2411           0 :         pgstat_report_wait_end();
    2412           0 :         if (readbytes <= 0)
    2413             :         {
    2414           0 :             ereport(ERROR,
    2415             :                     (errcode_for_file_access(),
    2416             :                      errmsg("could not read from log segment %s, offset %u, length %lu: %m",
    2417             :                             XLogFileNameP(curFileTimeLine, sendSegNo),
    2418             :                             sendOff, (unsigned long) segbytes)));
    2419             :         }
    2420             : 
    2421             :         /* Update state for read */
    2422           0 :         recptr += readbytes;
    2423             : 
    2424           0 :         sendOff += readbytes;
    2425           0 :         nbytes -= readbytes;
    2426           0 :         p += readbytes;
    2427             :     }
    2428             : 
    2429             :     /*
    2430             :      * After reading into the buffer, check that what we read was valid. We do
    2431             :      * this after reading, because even though the segment was present when we
    2432             :      * opened it, it might get recycled or removed while we read it. The
    2433             :      * read() succeeds in that case, but the data we tried to read might
    2434             :      * already have been overwritten with new WAL records.
    2435             :      */
    2436           0 :     XLByteToSeg(startptr, segno);
    2437           0 :     CheckXLogRemoved(segno, ThisTimeLineID);
    2438             : 
    2439             :     /*
    2440             :      * During recovery, the currently-open WAL file might be replaced with the
    2441             :      * file of the same name retrieved from archive. So we always need to
    2442             :      * check what we read was valid after reading into the buffer. If it's
    2443             :      * invalid, we try to open and read the file again.
    2444             :      */
    2445           0 :     if (am_cascading_walsender)
    2446             :     {
    2447           0 :         WalSnd     *walsnd = MyWalSnd;
    2448             :         bool        reload;
    2449             : 
    2450           0 :         SpinLockAcquire(&walsnd->mutex);
    2451           0 :         reload = walsnd->needreload;
    2452           0 :         walsnd->needreload = false;
    2453           0 :         SpinLockRelease(&walsnd->mutex);
    2454             : 
    2455           0 :         if (reload && sendFile >= 0)
    2456             :         {
    2457           0 :             close(sendFile);
    2458           0 :             sendFile = -1;
    2459             : 
    2460           0 :             goto retry;
    2461             :         }
    2462             :     }
    2463           0 : }
    2464             : 
    2465             : /*
    2466             :  * Send out the WAL in its normal physical/stored form.
    2467             :  *
    2468             :  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
    2469             :  * but not yet sent to the client, and buffer it in the libpq output
    2470             :  * buffer.
    2471             :  *
    2472             :  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
    2473             :  * otherwise WalSndCaughtUp is set to false.
    2474             :  */
    2475             : static void
    2476           0 : XLogSendPhysical(void)
    2477             : {
    2478             :     XLogRecPtr  SendRqstPtr;
    2479             :     XLogRecPtr  startptr;
    2480             :     XLogRecPtr  endptr;
    2481             :     Size        nbytes;
    2482             : 
    2483             :     /* If requested switch the WAL sender to the stopping state. */
    2484           0 :     if (got_STOPPING)
    2485           0 :         WalSndSetState(WALSNDSTATE_STOPPING);
    2486             : 
    2487           0 :     if (streamingDoneSending)
    2488             :     {
    2489           0 :         WalSndCaughtUp = true;
    2490           0 :         return;
    2491             :     }
    2492             : 
    2493             :     /* Figure out how far we can safely send the WAL. */
    2494           0 :     if (sendTimeLineIsHistoric)
    2495             :     {
    2496             :         /*
    2497             :          * Streaming an old timeline that's in this server's history, but is
    2498             :          * not the one we're currently inserting or replaying. It can be
    2499             :          * streamed up to the point where we switched off that timeline.
    2500             :          */
    2501           0 :         SendRqstPtr = sendTimeLineValidUpto;
    2502             :     }
    2503           0 :     else if (am_cascading_walsender)
    2504             :     {
    2505             :         /*
    2506             :          * Streaming the latest timeline on a standby.
    2507             :          *
    2508             :          * Attempt to send all WAL that has already been replayed, so that we
    2509             :          * know it's valid. If we're receiving WAL through streaming
    2510             :          * replication, it's also OK to send any WAL that has been received
    2511             :          * but not replayed.
    2512             :          *
    2513             :          * The timeline we're recovering from can change, or we can be
    2514             :          * promoted. In either case, the current timeline becomes historic. We
    2515             :          * need to detect that so that we don't try to stream past the point
    2516             :          * where we switched to another timeline. We check for promotion or
    2517             :          * timeline switch after calculating FlushPtr, to avoid a race
    2518             :          * condition: if the timeline becomes historic just after we checked
    2519             :          * that it was still current, it's still OK to stream it up to the
    2520             :          * FlushPtr that was calculated before it became historic.
    2521             :          */
    2522           0 :         bool        becameHistoric = false;
    2523             : 
    2524           0 :         SendRqstPtr = GetStandbyFlushRecPtr();
    2525             : 
    2526           0 :         if (!RecoveryInProgress())
    2527             :         {
    2528             :             /*
    2529             :              * We have been promoted. RecoveryInProgress() updated
    2530             :              * ThisTimeLineID to the new current timeline.
    2531             :              */
    2532           0 :             am_cascading_walsender = false;
    2533           0 :             becameHistoric = true;
    2534             :         }
    2535             :         else
    2536             :         {
    2537             :             /*
    2538             :              * Still a cascading standby. But is the timeline we're sending
    2539             :              * still the one recovery is recovering from? ThisTimeLineID was
    2540             :              * updated by the GetStandbyFlushRecPtr() call above.
    2541             :              */
    2542           0 :             if (sendTimeLine != ThisTimeLineID)
    2543           0 :                 becameHistoric = true;
    2544             :         }
    2545             : 
    2546           0 :         if (becameHistoric)
    2547             :         {
    2548             :             /*
    2549             :              * The timeline we were sending has become historic. Read the
    2550             :              * timeline history file of the new timeline to see where exactly
    2551             :              * we forked off from the timeline we were sending.
    2552             :              */
    2553             :             List       *history;
    2554             : 
    2555           0 :             history = readTimeLineHistory(ThisTimeLineID);
    2556           0 :             sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
    2557             : 
    2558           0 :             Assert(sendTimeLine < sendTimeLineNextTLI);
    2559           0 :             list_free_deep(history);
    2560             : 
    2561           0 :             sendTimeLineIsHistoric = true;
    2562             : 
    2563           0 :             SendRqstPtr = sendTimeLineValidUpto;
    2564             :         }
    2565             :     }
    2566             :     else
    2567             :     {
    2568             :         /*
    2569             :          * Streaming the current timeline on a master.
    2570             :          *
    2571             :          * Attempt to send all data that's already been written out and
    2572             :          * fsync'd to disk.  We cannot go further than what's been written out
    2573             :          * given the current implementation of XLogRead().  And in any case
    2574             :          * it's unsafe to send WAL that is not securely down to disk on the
    2575             :          * master: if the master subsequently crashes and restarts, standbys
    2576             :          * must not have applied any WAL that got lost on the master.
    2577             :          */
    2578           0 :         SendRqstPtr = GetFlushRecPtr();
    2579             :     }
    2580             : 
    2581             :     /*
    2582             :      * Record the current system time as an approximation of the time at which
    2583             :      * this WAL location was written for the purposes of lag tracking.
    2584             :      *
    2585             :      * In theory we could make XLogFlush() record a time in shmem whenever WAL
    2586             :      * is flushed and we could get that time as well as the LSN when we call
    2587             :      * GetFlushRecPtr() above (and likewise for the cascading standby
    2588             :      * equivalent), but rather than putting any new code into the hot WAL path
    2589             :      * it seems good enough to capture the time here.  We should reach this
    2590             :      * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
    2591             :      * may take some time, we read the WAL flush pointer and take the time
    2592             :      * very close together here so that we'll get a later position if it is
    2593             :      * still moving.
    2594             :      *
    2595             :      * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
    2596             :      * this gives us a cheap approximation for the WAL flush time for this
    2597             :      * LSN.
    2598             :      *
    2599             :      * Note that the LSN is not necessarily the LSN for the data contained in
    2600             :      * the present message; it's the end of the WAL, which might be further
    2601             :      * ahead.  All the lag tracking machinery cares about is finding out when
    2602             :      * that arbitrary LSN is eventually reported as written, flushed and
    2603             :      * applied, so that it can measure the elapsed time.
    2604             :      */
    2605           0 :     LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
    2606             : 
    2607             :     /*
    2608             :      * If this is a historic timeline and we've reached the point where we
    2609             :      * forked to the next timeline, stop streaming.
    2610             :      *
    2611             :      * Note: We might already have sent WAL > sendTimeLineValidUpto. The
    2612             :      * startup process will normally replay all WAL that has been received
    2613             :      * from the master, before promoting, but if the WAL streaming is
    2614             :      * terminated at a WAL page boundary, the valid portion of the timeline
    2615             :      * might end in the middle of a WAL record. We might've already sent the
    2616             :      * first half of that partial WAL record to the cascading standby, so that
    2617             :      * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
    2618             :      * replay the partial WAL record either, so it can still follow our
    2619             :      * timeline switch.
    2620             :      */
    2621           0 :     if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
    2622             :     {
    2623             :         /* close the current file. */
    2624           0 :         if (sendFile >= 0)
    2625           0 :             close(sendFile);
    2626           0 :         sendFile = -1;
    2627             : 
    2628             :         /* Send CopyDone */
    2629           0 :         pq_putmessage_noblock('c', NULL, 0);
    2630           0 :         streamingDoneSending = true;
    2631             : 
    2632           0 :         WalSndCaughtUp = true;
    2633             : 
    2634           0 :         elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
    2635             :              (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
    2636             :              (uint32) (sentPtr >> 32), (uint32) sentPtr);
    2637           0 :         return;
    2638             :     }
    2639             : 
    2640             :     /* Do we have any work to do? */
    2641           0 :     Assert(sentPtr <= SendRqstPtr);
    2642           0 :     if (SendRqstPtr <= sentPtr)
    2643             :     {
    2644           0 :         WalSndCaughtUp = true;
    2645           0 :         return;
    2646             :     }
    2647             : 
    2648             :     /*
    2649             :      * Figure out how much to send in one message. If there's no more than
    2650             :      * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
    2651             :      * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
    2652             :      *
    2653             :      * The rounding is not only for performance reasons. Walreceiver relies on
    2654             :      * the fact that we never split a WAL record across two messages. Since a
    2655             :      * long WAL record is split at page boundary into continuation records,
    2656             :      * page boundary is always a safe cut-off point. We also assume that
    2657             :      * SendRqstPtr never points to the middle of a WAL record.
    2658             :      */
    2659           0 :     startptr = sentPtr;
    2660           0 :     endptr = startptr;
    2661           0 :     endptr += MAX_SEND_SIZE;
    2662             : 
    2663             :     /* if we went beyond SendRqstPtr, back off */
    2664           0 :     if (SendRqstPtr <= endptr)
    2665             :     {
    2666           0 :         endptr = SendRqstPtr;
    2667           0 :         if (sendTimeLineIsHistoric)
    2668           0 :             WalSndCaughtUp = false;
    2669             :         else
    2670           0 :             WalSndCaughtUp = true;
    2671             :     }
    2672             :     else
    2673             :     {
    2674             :         /* round down to page boundary. */
    2675           0 :         endptr -= (endptr % XLOG_BLCKSZ);
    2676           0 :         WalSndCaughtUp = false;
    2677             :     }
    2678             : 
    2679           0 :     nbytes = endptr - startptr;
    2680           0 :     Assert(nbytes <= MAX_SEND_SIZE);
    2681             : 
    2682             :     /*
    2683             :      * OK to read and send the slice.
    2684             :      */
    2685           0 :     resetStringInfo(&output_message);
    2686           0 :     pq_sendbyte(&output_message, 'w');
    2687             : 
    2688           0 :     pq_sendint64(&output_message, startptr);    /* dataStart */
    2689           0 :     pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
    2690           0 :     pq_sendint64(&output_message, 0);   /* sendtime, filled in last */
    2691             : 
    2692             :     /*
    2693             :      * Read the log directly into the output buffer to avoid extra memcpy
    2694             :      * calls.
    2695             :      */
    2696           0 :     enlargeStringInfo(&output_message, nbytes);
    2697           0 :     XLogRead(&output_message.data[output_message.len], startptr, nbytes);
    2698           0 :     output_message.len += nbytes;
    2699           0 :     output_message.data[output_message.len] = '\0';
    2700             : 
    2701             :     /*
    2702             :      * Fill the send timestamp last, so that it is taken as late as possible.
    2703             :      */
    2704           0 :     resetStringInfo(&tmpbuf);
    2705           0 :     pq_sendint64(&tmpbuf, GetCurrentTimestamp());
    2706           0 :     memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
    2707           0 :            tmpbuf.data, sizeof(int64));
    2708             : 
    2709           0 :     pq_putmessage_noblock('d', output_message.data, output_message.len);
    2710             : 
    2711           0 :     sentPtr = endptr;
    2712             : 
    2713             :     /* Update shared memory status */
    2714             :     {
    2715           0 :         WalSnd     *walsnd = MyWalSnd;
    2716             : 
    2717           0 :         SpinLockAcquire(&walsnd->mutex);
    2718           0 :         walsnd->sentPtr = sentPtr;
    2719           0 :         SpinLockRelease(&walsnd->mutex);
    2720             :     }
    2721             : 
    2722             :     /* Report progress of XLOG streaming in PS display */
    2723           0 :     if (update_process_title)
    2724             :     {
    2725             :         char        activitymsg[50];
    2726             : 
    2727           0 :         snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
    2728           0 :                  (uint32) (sentPtr >> 32), (uint32) sentPtr);
    2729           0 :         set_ps_display(activitymsg, false);
    2730             :     }
    2731             : 
    2732           0 :     return;
    2733             : }
    2734             : 
    2735             : /*
    2736             :  * Stream out logically decoded data.
    2737             :  */
    2738             : static void
    2739           0 : XLogSendLogical(void)
    2740             : {
    2741             :     XLogRecord *record;
    2742             :     char       *errm;
    2743             : 
    2744             :     /*
    2745             :      * Don't know whether we've caught up yet. We'll set it to true in
    2746             :      * WalSndWaitForWal, if we're actually waiting. We also set it to true if
    2747             :      * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
    2748             :      * i.e. when we're shutting down.
    2749             :      */
    2750           0 :     WalSndCaughtUp = false;
    2751             : 
    2752           0 :     record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
    2753           0 :     logical_startptr = InvalidXLogRecPtr;
    2754             : 
    2755             :     /* xlog record was invalid */
    2756           0 :     if (errm != NULL)
    2757           0 :         elog(ERROR, "%s", errm);
    2758             : 
    2759           0 :     if (record != NULL)
    2760             :     {
    2761             :         /*
    2762             :          * Note the lack of any call to LagTrackerWrite() here; that is handled
    2763             :          * by WalSndUpdateProgress(), which the output plugin calls through the
    2764             :          * logical decoding write API.
    2765             :          */
    2766           0 :         LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
    2767             : 
    2768           0 :         sentPtr = logical_decoding_ctx->reader->EndRecPtr;
    2769             :     }
    2770             :     else
    2771             :     {
    2772             :         /*
    2773             :          * If the record we just wanted to read is at or beyond the flushed
    2774             :          * point, then we're caught up.
    2775             :          */
    2776           0 :         if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
    2777             :         {
    2778           0 :             WalSndCaughtUp = true;
    2779             : 
    2780             :             /*
    2781             :              * Have WalSndLoop() terminate the connection in an orderly
    2782             :              * manner, after writing out all the pending data.
    2783             :              */
    2784           0 :             if (got_STOPPING)
    2785           0 :                 got_SIGUSR2 = true;
    2786             :         }
    2787             :     }
    2788             : 
    2789             :     /* Update shared memory status */
    2790             :     {
    2791           0 :         WalSnd     *walsnd = MyWalSnd;
    2792             : 
    2793           0 :         SpinLockAcquire(&walsnd->mutex);
    2794           0 :         walsnd->sentPtr = sentPtr;
    2795           0 :         SpinLockRelease(&walsnd->mutex);
    2796             :     }
    2797           0 : }
    2798             : 
    2799             : /*
    2800             :  * Shutdown if the sender is caught up.
    2801             :  *
    2802             :  * NB: This should only be called when the shutdown signal has been received
    2803             :  * from postmaster.
    2804             :  *
    2805             :  * Note that if we determine that there's still more data to send, this
    2806             :  * function will return control to the caller.
    2807             :  */
    2808             : static void
    2809           0 : WalSndDone(WalSndSendDataCallback send_data)
    2810             : {
    2811             :     XLogRecPtr  replicatedPtr;
    2812             : 
    2813             :     /* ... let's just be real sure we're caught up ... */
    2814           0 :     send_data();
    2815             : 
    2816             :     /*
    2817             :      * To figure out whether all WAL has successfully been replicated, check
    2818             :      * flush location if valid, write otherwise. Tools like pg_receivewal will
    2819             :      * usually (unless in synchronous mode) return an invalid flush location.
    2820             :      */
    2821           0 :     replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
    2822           0 :         MyWalSnd->write : MyWalSnd->flush;
    2823             : 
    2824           0 :     if (WalSndCaughtUp && sentPtr == replicatedPtr &&
    2825           0 :         !pq_is_send_pending())
    2826             :     {
    2827             :         /* Inform the standby that XLOG streaming is done */
    2828           0 :         EndCommand("COPY 0", DestRemote);
    2829           0 :         pq_flush();
    2830             : 
    2831           0 :         proc_exit(0);
    2832             :     }
    2833           0 :     if (!waiting_for_ping_response)
    2834             :     {
    2835           0 :         WalSndKeepalive(true);
    2836           0 :         waiting_for_ping_response = true;
    2837             :     }
    2838           0 : }
    2839             : 
    2840             : /*
    2841             :  * Returns the latest point in WAL that has been safely flushed to disk, and
    2842             :  * can be sent to the standby. This should only be called when in recovery,
    2843             :  * ie. we're streaming to a cascaded standby.
    2844             :  *
    2845             :  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
    2846             :  * replayed WAL record.
    2847             :  */
    2848             : static XLogRecPtr
    2849           0 : GetStandbyFlushRecPtr(void)
    2850             : {
    2851             :     XLogRecPtr  replayPtr;
    2852             :     TimeLineID  replayTLI;
    2853             :     XLogRecPtr  receivePtr;
    2854             :     TimeLineID  receiveTLI;
    2855             :     XLogRecPtr  result;
    2856             : 
    2857             :     /*
    2858             :      * We can safely send what's already been replayed. Also, if walreceiver
    2859             :      * is streaming WAL from the same timeline, we can send anything that it
    2860             :      * has streamed, but hasn't been replayed yet.
    2861             :      */
    2862             : 
    2863           0 :     receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
    2864           0 :     replayPtr = GetXLogReplayRecPtr(&replayTLI);
    2865             : 
    2866           0 :     ThisTimeLineID = replayTLI;
    2867             : 
    2868           0 :     result = replayPtr;
    2869           0 :     if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
    2870           0 :         result = receivePtr;
    2871             : 
    2872           0 :     return result;
    2873             : }
    2874             : 
    2875             : /*
    2876             :  * Request walsenders to reload the currently-open WAL file
    2877             :  */
    2878             : void
    2879           0 : WalSndRqstFileReload(void)
    2880             : {
    2881             :     int         i;
    2882             : 
    2883           0 :     for (i = 0; i < max_wal_senders; i++)
    2884             :     {
    2885           0 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    2886             : 
    2887           0 :         SpinLockAcquire(&walsnd->mutex);
    2888           0 :         if (walsnd->pid == 0)
    2889             :         {
    2890           0 :             SpinLockRelease(&walsnd->mutex);
    2891           0 :             continue;
    2892             :         }
    2893           0 :         walsnd->needreload = true;
    2894           0 :         SpinLockRelease(&walsnd->mutex);
    2895             :     }
    2896           0 : }
    2897             : 
    2898             : /*
    2899             :  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
    2900             :  */
    2901             : void
    2902           0 : HandleWalSndInitStopping(void)
    2903             : {
    2904           0 :     Assert(am_walsender);
    2905             : 
    2906             :     /*
    2907             :      * If replication has not yet started, die like with SIGTERM. If
    2908             :      * replication is active, only set a flag and wake up the main loop. It
    2909             :      * will send any outstanding WAL, wait for it to be replicated to the
    2910             :      * standby, and then exit gracefully.
    2911             :      */
    2912           0 :     if (!replication_active)
    2913           0 :         kill(MyProcPid, SIGTERM);
    2914             :     else
    2915           0 :         got_STOPPING = true;
    2916           0 : }
    2917             : 
    2918             : /*
    2919             :  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
    2920             :  * sender should already have been switched to WALSNDSTATE_STOPPING at
    2921             :  * this point.
    2922             :  */
    2923             : static void
    2924           0 : WalSndLastCycleHandler(SIGNAL_ARGS)
    2925             : {
    2926           0 :     int         save_errno = errno;
    2927             : 
    2928           0 :     got_SIGUSR2 = true;
    2929           0 :     SetLatch(MyLatch);
    2930             : 
    2931           0 :     errno = save_errno;
    2932           0 : }
    2933             : 
    2934             : /* Set up signal handlers */
    2935             : void
    2936           0 : WalSndSignals(void)
    2937             : {
    2938             :     /* Set up signal handlers */
    2939           0 :     pqsignal(SIGHUP, PostgresSigHupHandler);    /* set flag to read config
    2940             :                                                  * file */
    2941           0 :     pqsignal(SIGINT, StatementCancelHandler);   /* query cancel */
    2942           0 :     pqsignal(SIGTERM, die);     /* request shutdown */
    2943           0 :     pqsignal(SIGQUIT, quickdie);    /* hard crash time */
    2944           0 :     InitializeTimeouts();       /* establishes SIGALRM handler */
    2945           0 :     pqsignal(SIGPIPE, SIG_IGN);
    2946           0 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    2947           0 :     pqsignal(SIGUSR2, WalSndLastCycleHandler);  /* request a last cycle and
    2948             :                                                  * shutdown */
    2949             : 
    2950             :     /* Reset some signals that are accepted by postmaster but not here */
    2951           0 :     pqsignal(SIGCHLD, SIG_DFL);
    2952           0 :     pqsignal(SIGTTIN, SIG_DFL);
    2953           0 :     pqsignal(SIGTTOU, SIG_DFL);
    2954           0 :     pqsignal(SIGCONT, SIG_DFL);
    2955           0 :     pqsignal(SIGWINCH, SIG_DFL);
    2956           0 : }
    2957             : 
    2958             : /* Report shared-memory space needed by WalSndShmemInit */
    2959             : Size
    2960          15 : WalSndShmemSize(void)
    2961             : {
    2962          15 :     Size        size = 0;
    2963             : 
    2964          15 :     size = offsetof(WalSndCtlData, walsnds);
    2965          15 :     size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
    2966             : 
    2967          15 :     return size;
    2968             : }
    2969             : 
    2970             : /* Allocate and initialize walsender-related shared memory */
    2971             : void
    2972           5 : WalSndShmemInit(void)
    2973             : {
    2974             :     bool        found;
    2975             :     int         i;
    2976             : 
    2977           5 :     WalSndCtl = (WalSndCtlData *)
    2978           5 :         ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
    2979             : 
    2980           5 :     if (!found)
    2981             :     {
    2982             :         /* First time through, so initialize */
    2983           5 :         MemSet(WalSndCtl, 0, WalSndShmemSize());
    2984             : 
    2985          20 :         for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
    2986          15 :             SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
    2987             : 
    2988          55 :         for (i = 0; i < max_wal_senders; i++)
    2989             :         {
    2990          50 :             WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    2991             : 
    2992          50 :             SpinLockInit(&walsnd->mutex);
    2993             :         }
    2994             :     }
    2995           5 : }
    2996             : 
    2997             : /*
    2998             :  * Wake up all walsenders
    2999             :  *
    3000             :  * This will be called inside critical sections, so throwing an error is not
    3001             :  * advisable.
    3002             :  */
    3003             : void
    3004        9609 : WalSndWakeup(void)
    3005             : {
    3006             :     int         i;
    3007             : 
    3008      105699 :     for (i = 0; i < max_wal_senders; i++)
    3009             :     {
    3010             :         Latch      *latch;
    3011       96090 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3012             : 
    3013             :         /*
    3014             :          * Get latch pointer with spinlock held, for the unlikely case that
    3015             :          * pointer reads aren't atomic (as they're 8 bytes).
    3016             :          */
    3017       96090 :         SpinLockAcquire(&walsnd->mutex);
    3018       96090 :         latch = walsnd->latch;
    3019       96090 :         SpinLockRelease(&walsnd->mutex);
    3020             : 
    3021       96090 :         if (latch != NULL)
    3022           0 :             SetLatch(latch);
    3023             :     }
    3024        9609 : }
    3025             : 
    3026             : /*
    3027             :  * Signal all walsenders to move to stopping state.
    3028             :  *
    3029             :  * This will trigger walsenders to move to a state where no further WAL can be
    3030             :  * generated. See this file's header for details.
    3031             :  */
    3032             : void
    3033           3 : WalSndInitStopping(void)
    3034             : {
    3035             :     int         i;
    3036             : 
    3037          33 :     for (i = 0; i < max_wal_senders; i++)
    3038             :     {
    3039          30 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3040             :         pid_t       pid;
    3041             : 
    3042          30 :         SpinLockAcquire(&walsnd->mutex);
    3043          30 :         pid = walsnd->pid;
    3044          30 :         SpinLockRelease(&walsnd->mutex);
    3045             : 
    3046          30 :         if (pid == 0)
    3047          30 :             continue;
    3048             : 
    3049           0 :         SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
    3050             :     }
    3051           3 : }
    3052             : 
    3053             : /*
    3054             :  * Wait that all the WAL senders have quit or reached the stopping state. This
    3055             :  * is used by the checkpointer to control when the shutdown checkpoint can
    3056             :  * safely be performed.
    3057             :  */
    3058             : void
    3059           3 : WalSndWaitStopping(void)
    3060             : {
    3061             :     for (;;)
    3062             :     {
    3063             :         int         i;
    3064           3 :         bool        all_stopped = true;
    3065             : 
    3066          33 :         for (i = 0; i < max_wal_senders; i++)
    3067             :         {
    3068          30 :             WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3069             : 
    3070          30 :             SpinLockAcquire(&walsnd->mutex);
    3071             : 
    3072          30 :             if (walsnd->pid == 0)
    3073             :             {
    3074          30 :                 SpinLockRelease(&walsnd->mutex);
    3075          30 :                 continue;
    3076             :             }
    3077             : 
    3078           0 :             if (walsnd->state != WALSNDSTATE_STOPPING)
    3079             :             {
    3080           0 :                 all_stopped = false;
    3081           0 :                 SpinLockRelease(&walsnd->mutex);
    3082           0 :                 break;
    3083             :             }
    3084           0 :             SpinLockRelease(&walsnd->mutex);
    3085             :         }
    3086             : 
    3087             :         /* safe to leave if confirmation is done for all WAL senders */
    3088           3 :         if (all_stopped)
    3089           6 :             return;
    3090             : 
    3091           0 :         pg_usleep(10000L);      /* wait for 10 msec */
    3092           0 :     }
    3093             : }
    3094             : 
    3095             : /* Set state for current walsender (only called in walsender) */
    3096             : void
    3097           0 : WalSndSetState(WalSndState state)
    3098             : {
    3099           0 :     WalSnd     *walsnd = MyWalSnd;
    3100             : 
    3101           0 :     Assert(am_walsender);
    3102             : 
    3103           0 :     if (walsnd->state == state)
    3104           0 :         return;
    3105             : 
    3106           0 :     SpinLockAcquire(&walsnd->mutex);
    3107           0 :     walsnd->state = state;
    3108           0 :     SpinLockRelease(&walsnd->mutex);
    3109             : }
    3110             : 
    3111             : /*
    3112             :  * Return a string constant representing the state. This is used
    3113             :  * in system views, and should *not* be translated.
    3114             :  */
    3115             : static const char *
    3116           0 : WalSndGetStateString(WalSndState state)
    3117             : {
    3118           0 :     switch (state)
    3119             :     {
    3120             :         case WALSNDSTATE_STARTUP:
    3121           0 :             return "startup";
    3122             :         case WALSNDSTATE_BACKUP:
    3123           0 :             return "backup";
    3124             :         case WALSNDSTATE_CATCHUP:
    3125           0 :             return "catchup";
    3126             :         case WALSNDSTATE_STREAMING:
    3127           0 :             return "streaming";
    3128             :         case WALSNDSTATE_STOPPING:
    3129           0 :             return "stopping";
    3130             :     }
    3131           0 :     return "UNKNOWN";
    3132             : }
    3133             : 
    3134             : static Interval *
    3135           0 : offset_to_interval(TimeOffset offset)
    3136             : {
    3137           0 :     Interval   *result = palloc(sizeof(Interval));
    3138             : 
    3139           0 :     result->month = 0;
    3140           0 :     result->day = 0;
    3141           0 :     result->time = offset;
    3142             : 
    3143           0 :     return result;
    3144             : }
    3145             : 
    3146             : /*
    3147             :  * Returns activity of walsenders, including pids and xlog locations sent to
    3148             :  * standby servers.
    3149             :  */
    3150             : Datum
    3151           0 : pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
    3152             : {
    3153             : #define PG_STAT_GET_WAL_SENDERS_COLS    11
    3154           0 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    3155             :     TupleDesc   tupdesc;
    3156             :     Tuplestorestate *tupstore;
    3157             :     MemoryContext per_query_ctx;
    3158             :     MemoryContext oldcontext;
    3159             :     List       *sync_standbys;
    3160             :     int         i;
    3161             : 
    3162             :     /* check to see if caller supports us returning a tuplestore */
    3163           0 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
    3164           0 :         ereport(ERROR,
    3165             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3166             :                  errmsg("set-valued function called in context that cannot accept a set")));
    3167           0 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
    3168           0 :         ereport(ERROR,
    3169             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3170             :                  errmsg("materialize mode required, but it is not " \
    3171             :                         "allowed in this context")));
    3172             : 
    3173             :     /* Build a tuple descriptor for our result type */
    3174           0 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    3175           0 :         elog(ERROR, "return type must be a row type");
    3176             : 
    3177           0 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    3178           0 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    3179             : 
    3180           0 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    3181           0 :     rsinfo->returnMode = SFRM_Materialize;
    3182           0 :     rsinfo->setResult = tupstore;
    3183           0 :     rsinfo->setDesc = tupdesc;
    3184             : 
    3185           0 :     MemoryContextSwitchTo(oldcontext);
    3186             : 
    3187             :     /*
    3188             :      * Get the currently active synchronous standbys.
    3189             :      */
    3190           0 :     LWLockAcquire(SyncRepLock, LW_SHARED);
    3191           0 :     sync_standbys = SyncRepGetSyncStandbys(NULL);
    3192           0 :     LWLockRelease(SyncRepLock);
    3193             : 
    3194           0 :     for (i = 0; i < max_wal_senders; i++)
    3195             :     {
    3196           0 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3197             :         XLogRecPtr  sentPtr;
    3198             :         XLogRecPtr  write;
    3199             :         XLogRecPtr  flush;
    3200             :         XLogRecPtr  apply;
    3201             :         TimeOffset  writeLag;
    3202             :         TimeOffset  flushLag;
    3203             :         TimeOffset  applyLag;
    3204             :         int         priority;
    3205             :         int         pid;
    3206             :         WalSndState state;
    3207             :         Datum       values[PG_STAT_GET_WAL_SENDERS_COLS];
    3208             :         bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
    3209             : 
    3210           0 :         SpinLockAcquire(&walsnd->mutex);
    3211           0 :         if (walsnd->pid == 0)
    3212             :         {
    3213           0 :             SpinLockRelease(&walsnd->mutex);
    3214           0 :             continue;
    3215             :         }
    3216           0 :         pid = walsnd->pid;
    3217           0 :         sentPtr = walsnd->sentPtr;
    3218           0 :         state = walsnd->state;
    3219           0 :         write = walsnd->write;
    3220           0 :         flush = walsnd->flush;
    3221           0 :         apply = walsnd->apply;
    3222           0 :         writeLag = walsnd->writeLag;
    3223           0 :         flushLag = walsnd->flushLag;
    3224           0 :         applyLag = walsnd->applyLag;
    3225           0 :         priority = walsnd->sync_standby_priority;
    3226           0 :         SpinLockRelease(&walsnd->mutex);
    3227             : 
    3228           0 :         memset(nulls, 0, sizeof(nulls));
    3229           0 :         values[0] = Int32GetDatum(pid);
    3230             : 
    3231           0 :         if (!superuser())
    3232             :         {
    3233             :             /*
    3234             :              * Only superusers can see details. Other users only get the pid
    3235             :              * value to know it's a walsender, but no details.
    3236             :              */
    3237           0 :             MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
    3238             :         }
    3239             :         else
    3240             :         {
    3241           0 :             values[1] = CStringGetTextDatum(WalSndGetStateString(state));
    3242             : 
    3243           0 :             if (XLogRecPtrIsInvalid(sentPtr))
    3244           0 :                 nulls[2] = true;
    3245           0 :             values[2] = LSNGetDatum(sentPtr);
    3246             : 
    3247           0 :             if (XLogRecPtrIsInvalid(write))
    3248           0 :                 nulls[3] = true;
    3249           0 :             values[3] = LSNGetDatum(write);
    3250             : 
    3251           0 :             if (XLogRecPtrIsInvalid(flush))
    3252           0 :                 nulls[4] = true;
    3253           0 :             values[4] = LSNGetDatum(flush);
    3254             : 
    3255           0 :             if (XLogRecPtrIsInvalid(apply))
    3256           0 :                 nulls[5] = true;
    3257           0 :             values[5] = LSNGetDatum(apply);
    3258             : 
    3259             :             /*
    3260             :              * Treat a standby such as a pg_basebackup background process
    3261             :              * which always returns an invalid flush location, as an
    3262             :              * asynchronous standby.
    3263             :              */
    3264           0 :             priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
    3265             : 
    3266           0 :             if (writeLag < 0)
    3267           0 :                 nulls[6] = true;
    3268             :             else
    3269           0 :                 values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
    3270             : 
    3271           0 :             if (flushLag < 0)
    3272           0 :                 nulls[7] = true;
    3273             :             else
    3274           0 :                 values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
    3275             : 
    3276           0 :             if (applyLag < 0)
    3277           0 :                 nulls[8] = true;
    3278             :             else
    3279           0 :                 values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
    3280             : 
    3281           0 :             values[9] = Int32GetDatum(priority);
    3282             : 
    3283             :             /*
    3284             :              * More easily understood version of standby state. This is purely
    3285             :              * informational.
    3286             :              *
    3287             :              * In quorum-based sync replication, the role of each standby
    3288             :              * listed in synchronous_standby_names can be changing very
    3289             :              * frequently. Any standbys considered as "sync" at one moment can
    3290             :              * be switched to "potential" ones at the next moment. So, it's
    3291             :              * basically useless to report "sync" or "potential" as their sync
    3292             :              * states. We report just "quorum" for them.
    3293             :              */
    3294           0 :             if (priority == 0)
    3295           0 :                 values[10] = CStringGetTextDatum("async");
    3296           0 :             else if (list_member_int(sync_standbys, i))
    3297           0 :                 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
    3298           0 :                     CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
    3299             :             else
    3300           0 :                 values[10] = CStringGetTextDatum("potential");
    3301             :         }
    3302             : 
    3303           0 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    3304             :     }
    3305             : 
    3306             :     /* clean up and return the tuplestore */
    3307             :     tuplestore_donestoring(tupstore);
    3308             : 
    3309           0 :     return (Datum) 0;
    3310             : }
    3311             : 
    3312             : /*
    3313             :   * This function is used to send a keepalive message to standby.
    3314             :   * If requestReply is set, sets a flag in the message requesting the standby
    3315             :   * to send a message back to us, for heartbeat purposes.
    3316             :   */
    3317             : static void
    3318           0 : WalSndKeepalive(bool requestReply)
    3319             : {
    3320           0 :     elog(DEBUG2, "sending replication keepalive");
    3321             : 
    3322             :     /* construct the message... */
    3323           0 :     resetStringInfo(&output_message);
    3324           0 :     pq_sendbyte(&output_message, 'k');
    3325           0 :     pq_sendint64(&output_message, sentPtr);
    3326           0 :     pq_sendint64(&output_message, GetCurrentTimestamp());
    3327           0 :     pq_sendbyte(&output_message, requestReply ? 1 : 0);
    3328             : 
    3329             :     /* ... and send it wrapped in CopyData */
    3330           0 :     pq_putmessage_noblock('d', output_message.data, output_message.len);
    3331           0 : }
    3332             : 
    3333             : /*
    3334             :  * Send keepalive message if too much time has elapsed.
    3335             :  */
    3336             : static void
    3337           0 : WalSndKeepaliveIfNecessary(TimestampTz now)
    3338             : {
    3339             :     TimestampTz ping_time;
    3340             : 
    3341             :     /*
    3342             :      * Don't send keepalive messages if timeouts are globally disabled or
    3343             :      * we're doing something not partaking in timeouts.
    3344             :      */
    3345           0 :     if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
    3346           0 :         return;
    3347             : 
    3348           0 :     if (waiting_for_ping_response)
    3349           0 :         return;
    3350             : 
    3351             :     /*
    3352             :      * If half of wal_sender_timeout has lapsed without receiving any reply
    3353             :      * from the standby, send a keep-alive message to the standby requesting
    3354             :      * an immediate reply.
    3355             :      */
    3356           0 :     ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
    3357             :                                             wal_sender_timeout / 2);
    3358           0 :     if (now >= ping_time)
    3359             :     {
    3360           0 :         WalSndKeepalive(true);
    3361           0 :         waiting_for_ping_response = true;
    3362             : 
    3363             :         /* Try to flush pending output to the client */
    3364           0 :         if (pq_flush_if_writable() != 0)
    3365           0 :             WalSndShutdown();
    3366             :     }
    3367             : }
    3368             : 
    3369             : /*
    3370             :  * Record the end of the WAL and the time it was flushed locally, so that
    3371             :  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
    3372             :  * eventually reported to have been written, flushed and applied by the
    3373             :  * standby in a reply message.
    3374             :  */
    3375             : static void
    3376           0 : LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
    3377             : {
    3378             :     bool        buffer_full;
    3379             :     int         new_write_head;
    3380             :     int         i;
    3381             : 
    3382           0 :     if (!am_walsender)
    3383           0 :         return;
    3384             : 
    3385             :     /*
    3386             :      * If the lsn hasn't advanced since last time, then do nothing.  This way
    3387             :      * we only record a new sample when new WAL has been written.
    3388             :      */
    3389           0 :     if (LagTracker.last_lsn == lsn)
    3390           0 :         return;
    3391           0 :     LagTracker.last_lsn = lsn;
    3392             : 
    3393             :     /*
    3394             :      * If advancing the write head of the circular buffer would crash into any
    3395             :      * of the read heads, then the buffer is full.  In other words, the
    3396             :      * slowest reader (presumably apply) is the one that controls the release
    3397             :      * of space.
    3398             :      */
    3399           0 :     new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
    3400           0 :     buffer_full = false;
    3401           0 :     for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
    3402             :     {
    3403           0 :         if (new_write_head == LagTracker.read_heads[i])
    3404           0 :             buffer_full = true;
    3405             :     }
    3406             : 
    3407             :     /*
    3408             :      * If the buffer is full, for now we just rewind by one slot and overwrite
    3409             :      * the last sample, as a simple (if somewhat uneven) way to lower the
    3410             :      * sampling rate.  There may be better adaptive compaction algorithms.
    3411             :      */
    3412           0 :     if (buffer_full)
    3413             :     {
    3414           0 :         new_write_head = LagTracker.write_head;
    3415           0 :         if (LagTracker.write_head > 0)
    3416           0 :             LagTracker.write_head--;
    3417             :         else
    3418           0 :             LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
    3419             :     }
    3420             : 
    3421             :     /* Store a sample at the current write head position. */
    3422           0 :     LagTracker.buffer[LagTracker.write_head].lsn = lsn;
    3423           0 :     LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
    3424           0 :     LagTracker.write_head = new_write_head;
    3425             : }
    3426             : 
    3427             : /*
    3428             :  * Compute how much time has elapsed between the moment WAL location 'lsn'
    3429             :  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
    3430             :  * 'head' selects which per-reported-LSN read head to use.  The call advances
    3431             :  * that read head past every buffered sample whose LSN is <= 'lsn', saving
    3432             :  * the last sample consumed in last_read[head].  If a sample was crossed, its
    3433             :  * timestamp is the local flush time; otherwise the flush time is interpolated
    3434             :  * between last_read[head] and the next unread sample, or taken from the next
    3435             :  * unread sample alone when last_read[head].time is 0.
    3436             :  *
    3437             :  * Return the elapsed time in microseconds, or -1 if there is nothing to report
    3438             :  * (no sample crossed and none unread, clock or 'lsn' went backwards).
    3439             :  */
    3440             : static TimeOffset
    3441           0 : LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
    3442             : {
    3443           0 :     TimestampTz time = 0;
    3444             : 
    3445             :     /* Read all unread samples up to this LSN or end of buffer. */
    3446           0 :     while (LagTracker.read_heads[head] != LagTracker.write_head &&
    3447           0 :            LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
    3448             :     {
    3449           0 :         time = LagTracker.buffer[LagTracker.read_heads[head]].time;
    3450           0 :         LagTracker.last_read[head] =
    3451           0 :             LagTracker.buffer[LagTracker.read_heads[head]];
    3452           0 :         LagTracker.read_heads[head] =
    3453           0 :             (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
    3454             :     }
    3455             : 
    3456             :     /*
    3457             :      * If the lag tracker is empty, that means the standby has processed
    3458             :      * everything we've ever sent so we should now clear 'last_read'.  If we
    3459             :      * didn't do that, we'd risk using a stale and irrelevant sample for
    3460             :      * interpolation at the beginning of the next burst of WAL after a period
    3461             :      * of idleness.
    3462             :      */
    3463           0 :     if (LagTracker.read_heads[head] == LagTracker.write_head)
    3464           0 :         LagTracker.last_read[head].time = 0;
    3465             : 
    3466           0 :     if (time > now)
    3467             :     {
    3468             :         /* If the clock somehow went backwards, treat as not found. */
    3469           0 :         return -1;
    3470             :     }
    3471           0 :     else if (time == 0)
    3472             :     {
    3473             :         /*
    3474             :          * We didn't cross a sample.  If there is a future sample that we
    3475             :          * haven't reached yet, and we've already reached at least one sample,
    3476             :          * let's interpolate the local flushed time.  This is mainly useful
    3477             :          * for reporting a completely stuck apply position as having
    3478             :          * increasing lag, since otherwise we'd have to wait for it to
    3479             :          * eventually start moving again and cross one of our samples before
    3480             :          * we can show the lag increasing.
    3481             :          */
    3482           0 :         if (LagTracker.read_heads[head] == LagTracker.write_head)
    3483             :         {
    3484             :             /* There are no future samples, so we can't interpolate. */
    3485           0 :             return -1;
    3486             :         }
    3487           0 :         else if (LagTracker.last_read[head].time != 0)
    3488             :         {
    3489             :             /* We can interpolate between last_read and the next sample. */
    3490             :             double      fraction;
    3491           0 :             WalTimeSample prev = LagTracker.last_read[head];
    3492           0 :             WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
    3493             : 
    3494           0 :             if (lsn < prev.lsn)
    3495             :             {
    3496             :                 /*
    3497             :                  * Reported LSNs shouldn't normally go backwards, but it's
    3498             :                  * possible when there is a timeline change.  Treat as not
    3499             :                  * found.
    3500             :                  */
    3501           0 :                 return -1;
    3502             :             }
    3503             : 
    3504           0 :             Assert(prev.lsn < next.lsn);
    3505             : 
    3506           0 :             if (prev.time > next.time)
    3507             :             {
    3508             :                 /* If the clock somehow went backwards, treat as not found. */
    3509           0 :                 return -1;
    3510             :             }
    3511             : 
    3512             :             /* See how far we are between the previous and next samples. */
    3513           0 :             fraction =
    3514           0 :                 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
    3515             : 
    3516             :             /* Scale the local flush time proportionally. */
    3517           0 :             time = (TimestampTz)
    3518           0 :                 ((double) prev.time + (next.time - prev.time) * fraction);
    3519             :         }
    3520             :         else
    3521             :         {
    3522             :             /*
    3523             :              * We have only a future sample, implying that we were entirely
    3524             :              * caught up, and now there is a new burst of WAL and the
    3525             :              * standby hasn't processed the first sample yet.  Until the
    3526             :              * standby reaches the future sample the best we can do is report
    3527             :              * the hypothetical lag if that sample were to be replayed now.
    3528             :              */
    3529           0 :             time = LagTracker.buffer[LagTracker.read_heads[head]].time;
    3530             :         }
    3531             :     }
    3532             : 
    3533             :     /* Return the elapsed time since local flush time in microseconds. */
    3534           0 :     Assert(time != 0);
    3535           0 :     return now - time;
    3536             : }

Generated by: LCOV version 1.11