Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walsender.c
4 : *
5 : * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 : * care of sending XLOG from the primary server to a single recipient.
7 : * (Note that there can be more than one walsender process concurrently.)
8 : * It is started by the postmaster when the walreceiver of a standby server
9 : * connects to the primary server and requests XLOG streaming replication.
10 : *
11 : * A walsender is similar to a regular backend, ie. there is a one-to-one
12 : * relationship between a connection and a walsender process, but instead
13 : * of processing SQL queries, it understands a small set of special
14 : * replication-mode commands. The START_REPLICATION command begins streaming
15 : * WAL to the client. While streaming, the walsender keeps reading XLOG
16 : * records from the disk and sends them to the standby server over the
17 : * COPY protocol, until either side ends the replication by exiting COPY
18 : * mode (or until the connection is closed).
19 : *
20 : * Normal termination is by SIGTERM, which instructs the walsender to
21 : * close the connection and exit(0) at the next convenient moment. Emergency
22 : * termination is by SIGQUIT; like any backend, the walsender will simply
23 : * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 : * are treated as not a crash but approximately normal termination;
25 : * the walsender will exit quickly without sending any more XLOG records.
26 : *
27 : * If the server is shut down, checkpointer sends us
28 : * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29 : * the walsender is idle or running an SQL query, this causes it to shut
30 : * down. If logical replication is in progress, all existing WAL records
31 : * are processed, followed by a shutdown. Otherwise this causes the walsender
32 : * to switch to the "stopping" state. In this state, the walsender will reject
33 : * any further replication commands. The checkpointer begins the shutdown
34 : * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35 : * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36 : * walsender to send any outstanding WAL, including the shutdown checkpoint
37 : * record, wait for it to be replicated to the standby, and then exit.
38 : *
39 : *
40 : * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
41 : *
42 : * IDENTIFICATION
43 : * src/backend/replication/walsender.c
44 : *
45 : *-------------------------------------------------------------------------
46 : */
47 : #include "postgres.h"
48 :
49 : #include <signal.h>
50 : #include <unistd.h>
51 :
52 : #include "access/printtup.h"
53 : #include "access/timeline.h"
54 : #include "access/transam.h"
55 : #include "access/xact.h"
56 : #include "access/xlog_internal.h"
57 : #include "access/xlogutils.h"
58 :
59 : #include "catalog/pg_type.h"
60 : #include "commands/dbcommands.h"
61 : #include "commands/defrem.h"
62 : #include "funcapi.h"
63 : #include "libpq/libpq.h"
64 : #include "libpq/pqformat.h"
65 : #include "miscadmin.h"
66 : #include "nodes/replnodes.h"
67 : #include "pgstat.h"
68 : #include "replication/basebackup.h"
69 : #include "replication/decode.h"
70 : #include "replication/logical.h"
71 : #include "replication/logicalfuncs.h"
72 : #include "replication/slot.h"
73 : #include "replication/snapbuild.h"
74 : #include "replication/syncrep.h"
75 : #include "replication/walreceiver.h"
76 : #include "replication/walsender.h"
77 : #include "replication/walsender_private.h"
78 : #include "storage/condition_variable.h"
79 : #include "storage/fd.h"
80 : #include "storage/ipc.h"
81 : #include "storage/pmsignal.h"
82 : #include "storage/proc.h"
83 : #include "storage/procarray.h"
84 : #include "tcop/dest.h"
85 : #include "tcop/tcopprot.h"
86 : #include "utils/builtins.h"
87 : #include "utils/guc.h"
88 : #include "utils/memutils.h"
89 : #include "utils/pg_lsn.h"
90 : #include "utils/portal.h"
91 : #include "utils/ps_status.h"
92 : #include "utils/resowner.h"
93 : #include "utils/timeout.h"
94 : #include "utils/timestamp.h"
95 :
96 : /*
97 : * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
98 : *
99 : * We don't have a good idea of what a good value would be; there's some
100 : * overhead per message in both walsender and walreceiver, but on the other
101 : * hand sending large batches makes walsender less responsive to signals
102 : * because signals are checked only between messages. 128kB (with
103 : * default 8k blocks) seems like a reasonable guess for now.
104 : */
105 : #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
106 :
107 : /* Array of WalSnds in shared memory */
108 : WalSndCtlData *WalSndCtl = NULL;
109 :
110 : /* My slot in the shared memory array */
111 : WalSnd *MyWalSnd = NULL;
112 :
113 : /* Global state */
114 : bool am_walsender = false; /* Am I a walsender process? */
115 : bool am_cascading_walsender = false; /* Am I cascading WAL to another
116 : * standby? */
117 : bool am_db_walsender = false; /* Connected to a database? */
118 :
119 : /* User-settable parameters for walsender */
120 : int max_wal_senders = 0; /* the maximum number of concurrent
121 : * walsenders */
122 : int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
123 : * data message */
124 : bool log_replication_commands = false;
125 :
126 : /*
127 : * State for WalSndWakeupRequest
128 : */
129 : bool wake_wal_senders = false;
130 :
131 : /*
132 : * These variables are used similarly to openLogFile/SegNo/Off,
133 : * but for walsender to read the XLOG.
134 : */
135 : static int sendFile = -1;
136 : static XLogSegNo sendSegNo = 0;
137 : static uint32 sendOff = 0;
138 :
139 : /* Timeline ID of the currently open file */
140 : static TimeLineID curFileTimeLine = 0;
141 :
142 : /*
143 : * These variables keep track of the state of the timeline we're currently
144 : * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
145 : * the timeline is not the latest timeline on this server, and the server's
146 : * history forked off from that timeline at sendTimeLineValidUpto.
147 : */
148 : static TimeLineID sendTimeLine = 0;
149 : static TimeLineID sendTimeLineNextTLI = 0;
150 : static bool sendTimeLineIsHistoric = false;
151 : static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
152 :
153 : /*
154 : * How far have we sent WAL already? This is also advertised in
155 : * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
156 : */
157 : static XLogRecPtr sentPtr = 0;
158 :
159 : /* Buffers for constructing outgoing messages and processing reply messages. */
160 : static StringInfoData output_message;
161 : static StringInfoData reply_message;
162 : static StringInfoData tmpbuf;
163 :
164 : /*
165 : * Timestamp of the last receipt of the reply from the standby. Set to 0 if
166 : * wal_sender_timeout doesn't need to be active.
167 : */
168 : static TimestampTz last_reply_timestamp = 0;
169 :
170 : /* Have we sent a heartbeat message asking for reply, since last reply? */
171 : static bool waiting_for_ping_response = false;
172 :
173 : /*
174 : * While streaming WAL in Copy mode, streamingDoneSending is set to true
175 : * after we have sent CopyDone. We should not send any more CopyData messages
176 : * after that. streamingDoneReceiving is set to true when we receive CopyDone
177 : * from the other end. When both become true, it's time to exit Copy mode.
178 : */
179 : static bool streamingDoneSending;
180 : static bool streamingDoneReceiving;
181 :
182 : /* Are we there yet? */
183 : static bool WalSndCaughtUp = false;
184 :
185 : /* Flags set by signal handlers for later service in main loop */
186 : static volatile sig_atomic_t got_SIGUSR2 = false;
187 : static volatile sig_atomic_t got_STOPPING = false;
188 :
189 : /*
190 : * This is set while we are streaming. When not set
191 : * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192 : * the main loop is responsible for checking got_STOPPING and terminating when
193 : * it's set (after streaming any remaining WAL).
194 : */
195 : static volatile sig_atomic_t replication_active = false;
196 :
197 : static LogicalDecodingContext *logical_decoding_ctx = NULL;
198 : static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
199 :
200 : /* A sample associating a WAL location with the time it was written. */
201 : typedef struct
202 : {
203 : XLogRecPtr lsn; /* the WAL location of this sample */
204 : TimestampTz time; /* the time associated with that location */
205 : } WalTimeSample;
206 :
207 : /* The size of our buffer of time samples. */
208 : #define LAG_TRACKER_BUFFER_SIZE 8192
209 :
210 : /* A mechanism for tracking replication lag. */
211 : static struct
212 : {
213 : XLogRecPtr last_lsn; /* NOTE(review): presumably the LSN of the most recently recorded sample -- confirm in LagTrackerWrite */
214 : WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]; /* storage for up to LAG_TRACKER_BUFFER_SIZE samples */
215 : int write_head; /* index into buffer used when writing; NOTE(review): presumably the next slot to fill -- confirm */
216 : int read_heads[NUM_SYNC_REP_WAIT_MODE]; /* one read index into buffer per sync-rep wait mode */
217 : WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; /* one saved sample per wait mode; NOTE(review): presumably the last sample consumed by that read head -- confirm in LagTrackerRead */
218 : } LagTracker;
219 :
220 : /* Signal handlers */
221 : static void WalSndLastCycleHandler(SIGNAL_ARGS);
222 :
223 : /* Prototypes for private functions */
224 : typedef void (*WalSndSendDataCallback) (void);
225 : static void WalSndLoop(WalSndSendDataCallback send_data);
226 : static void InitWalSenderSlot(void);
227 : static void WalSndKill(int code, Datum arg);
228 : static void WalSndShutdown(void) pg_attribute_noreturn();
229 : static void XLogSendPhysical(void);
230 : static void XLogSendLogical(void);
231 : static void WalSndDone(WalSndSendDataCallback send_data);
232 : static XLogRecPtr GetStandbyFlushRecPtr(void);
233 : static void IdentifySystem(void);
234 : static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
235 : static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
236 : static void StartReplication(StartReplicationCmd *cmd);
237 : static void StartLogicalReplication(StartReplicationCmd *cmd);
238 : static void ProcessStandbyMessage(void);
239 : static void ProcessStandbyReplyMessage(void);
240 : static void ProcessStandbyHSFeedbackMessage(void);
241 : static void ProcessRepliesIfAny(void);
242 : static void WalSndKeepalive(bool requestReply);
243 : static void WalSndKeepaliveIfNecessary(TimestampTz now);
244 : static void WalSndCheckTimeOut(TimestampTz now);
245 : static long WalSndComputeSleeptime(TimestampTz now);
246 : static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
247 : static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
248 : static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
249 : static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
250 : static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
251 : static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
252 : static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
253 :
254 : static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
255 :
256 :
257 : /* Initialize walsender process before entering the main command loop */
258 : void
259 0 : InitWalSender(void)
260 : {
261 0 : am_cascading_walsender = RecoveryInProgress();
262 :
263 : /* Create a per-walsender data structure in shared memory */
264 0 : InitWalSenderSlot();
265 :
266 : /* Set up resource owner */
267 0 : CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
268 :
269 : /*
270 : * Let postmaster know that we're a WAL sender. Once we've declared us as
271 : * a WAL sender process, postmaster will let us outlive the bgwriter and
272 : * kill us last in the shutdown sequence, so we get a chance to stream all
273 : * remaining WAL at shutdown, including the shutdown checkpoint. Note that
274 : * there's no going back, and we mustn't write any WAL records after this.
275 : */
276 0 : MarkPostmasterChildWalSender();
277 0 : SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
278 :
279 : /* Initialize empty timestamp buffer for lag tracking. */
280 0 : memset(&LagTracker, 0, sizeof(LagTracker));
281 0 : }
282 :
283 : /*
284 : * Clean up after an error.
285 : *
286 : * WAL sender processes don't use transactions like regular backends do.
287 : * This function does any cleanup required after an error in a WAL sender
288 : * process, similar to what transaction abort does in a regular backend.
289 : */
290 : void
291 0 : WalSndErrorCleanup(void)
292 : {
293 0 : LWLockReleaseAll();
294 0 : ConditionVariableCancelSleep();
295 0 : pgstat_report_wait_end();
296 :
297 0 : if (sendFile >= 0)
298 : {
299 0 : close(sendFile);
300 0 : sendFile = -1;
301 : }
302 :
303 0 : if (MyReplicationSlot != NULL)
304 0 : ReplicationSlotRelease();
305 :
306 0 : ReplicationSlotCleanup();
307 :
308 0 : replication_active = false;
309 :
310 0 : if (got_STOPPING || got_SIGUSR2)
311 0 : proc_exit(0);
312 :
313 : /* Revert back to startup state */
314 0 : WalSndSetState(WALSNDSTATE_STARTUP);
315 0 : }
316 :
317 : /*
318 : * Handle a client's connection abort in an orderly manner.
319 : */
320 : static void
321 0 : WalSndShutdown(void)
322 : {
323 : /*
324 : * Reset whereToSendOutput to prevent ereport from attempting to send any
325 : * more messages to the standby.
326 : */
327 0 : if (whereToSendOutput == DestRemote)
328 0 : whereToSendOutput = DestNone;
329 :
330 0 : proc_exit(0);
331 : abort(); /* keep the compiler quiet */
332 : }
333 :
334 : /*
335 : * Handle the IDENTIFY_SYSTEM command.
336 : */
337 : static void
338 0 : IdentifySystem(void)
339 : {
340 : char sysid[32];
341 : char xloc[MAXFNAMELEN];
342 : XLogRecPtr logptr;
343 0 : char *dbname = NULL;
344 : DestReceiver *dest;
345 : TupOutputState *tstate;
346 : TupleDesc tupdesc;
347 : Datum values[4];
348 : bool nulls[4];
349 :
350 : /*
351 : * Reply with a result set with one row, four columns. First col is system
352 : * ID, second is timeline ID, third is current xlog location and the
353 : * fourth contains the database name if we are connected to one.
354 : */
355 :
356 0 : snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
357 : GetSystemIdentifier());
358 :
359 0 : am_cascading_walsender = RecoveryInProgress();
360 0 : if (am_cascading_walsender)
361 : {
362 : /* this also updates ThisTimeLineID */
363 0 : logptr = GetStandbyFlushRecPtr();
364 : }
365 : else
366 0 : logptr = GetFlushRecPtr();
367 :
368 0 : snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
369 :
370 0 : if (MyDatabaseId != InvalidOid)
371 : {
372 0 : MemoryContext cur = CurrentMemoryContext;
373 :
374 : /* syscache access needs a transaction env. */
375 0 : StartTransactionCommand();
376 : /* make dbname live outside TX context */
377 0 : MemoryContextSwitchTo(cur);
378 0 : dbname = get_database_name(MyDatabaseId);
379 0 : CommitTransactionCommand();
380 : /* CommitTransactionCommand switches to TopMemoryContext */
381 0 : MemoryContextSwitchTo(cur);
382 : }
383 :
384 0 : dest = CreateDestReceiver(DestRemoteSimple);
385 0 : MemSet(nulls, false, sizeof(nulls));
386 :
387 : /* need a tuple descriptor representing four columns */
388 0 : tupdesc = CreateTemplateTupleDesc(4, false);
389 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
390 : TEXTOID, -1, 0);
391 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
392 : INT4OID, -1, 0);
393 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
394 : TEXTOID, -1, 0);
395 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
396 : TEXTOID, -1, 0);
397 :
398 : /* prepare for projection of tuples */
399 0 : tstate = begin_tup_output_tupdesc(dest, tupdesc);
400 :
401 : /* column 1: system identifier */
402 0 : values[0] = CStringGetTextDatum(sysid);
403 :
404 : /* column 2: timeline */
405 0 : values[1] = Int32GetDatum(ThisTimeLineID);
406 :
407 : /* column 3: wal location */
408 0 : values[2] = CStringGetTextDatum(xloc);
409 :
410 : /* column 4: database name, or NULL if none */
411 0 : if (dbname)
412 0 : values[3] = CStringGetTextDatum(dbname);
413 : else
414 0 : nulls[3] = true;
415 :
416 : /* send it to dest */
417 0 : do_tup_output(tstate, values, nulls);
418 :
419 0 : end_tup_output(tstate);
420 0 : }
421 :
422 :
423 : /*
424 : * Handle TIMELINE_HISTORY command.
425 : */
426 : static void
427 0 : SendTimeLineHistory(TimeLineHistoryCmd *cmd)
428 : {
429 : StringInfoData buf;
430 : char histfname[MAXFNAMELEN];
431 : char path[MAXPGPATH];
432 : int fd;
433 : off_t histfilelen;
434 : off_t bytesleft;
435 : Size len;
436 :
437 : /*
438 : * Reply with a result set with one row, and two columns. The first col is
439 : * the name of the history file, 2nd is the contents.
440 : */
441 :
442 0 : TLHistoryFileName(histfname, cmd->timeline);
443 0 : TLHistoryFilePath(path, cmd->timeline);
444 :
445 : /* Send a RowDescription message */
446 0 : pq_beginmessage(&buf, 'T');
447 0 : pq_sendint(&buf, 2, 2); /* 2 fields */
448 :
449 : /* first field */
450 0 : pq_sendstring(&buf, "filename"); /* col name */
451 0 : pq_sendint(&buf, 0, 4); /* table oid */
452 0 : pq_sendint(&buf, 0, 2); /* attnum */
453 0 : pq_sendint(&buf, TEXTOID, 4); /* type oid */
454 0 : pq_sendint(&buf, -1, 2); /* typlen */
455 0 : pq_sendint(&buf, 0, 4); /* typmod */
456 0 : pq_sendint(&buf, 0, 2); /* format code */
457 :
458 : /* second field */
459 0 : pq_sendstring(&buf, "content"); /* col name */
460 0 : pq_sendint(&buf, 0, 4); /* table oid */
461 0 : pq_sendint(&buf, 0, 2); /* attnum */
462 0 : pq_sendint(&buf, BYTEAOID, 4); /* type oid */
463 0 : pq_sendint(&buf, -1, 2); /* typlen */
464 0 : pq_sendint(&buf, 0, 4); /* typmod */
465 0 : pq_sendint(&buf, 0, 2); /* format code */
466 0 : pq_endmessage(&buf);
467 :
468 : /* Send a DataRow message */
469 0 : pq_beginmessage(&buf, 'D');
470 0 : pq_sendint(&buf, 2, 2); /* # of columns */
471 0 : len = strlen(histfname);
472 0 : pq_sendint(&buf, len, 4); /* col1 len */
473 0 : pq_sendbytes(&buf, histfname, len);
474 :
475 0 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
476 0 : if (fd < 0)
477 0 : ereport(ERROR,
478 : (errcode_for_file_access(),
479 : errmsg("could not open file \"%s\": %m", path)));
480 :
481 : /* Determine file length and send it to client */
482 0 : histfilelen = lseek(fd, 0, SEEK_END);
483 0 : if (histfilelen < 0)
484 0 : ereport(ERROR,
485 : (errcode_for_file_access(),
486 : errmsg("could not seek to end of file \"%s\": %m", path)));
487 0 : if (lseek(fd, 0, SEEK_SET) != 0)
488 0 : ereport(ERROR,
489 : (errcode_for_file_access(),
490 : errmsg("could not seek to beginning of file \"%s\": %m", path)));
491 :
492 0 : pq_sendint(&buf, histfilelen, 4); /* col2 len */
493 :
494 0 : bytesleft = histfilelen;
495 0 : while (bytesleft > 0)
496 : {
497 : char rbuf[BLCKSZ];
498 : int nread;
499 :
500 0 : pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
501 0 : nread = read(fd, rbuf, sizeof(rbuf));
502 0 : pgstat_report_wait_end();
503 0 : if (nread <= 0)
504 0 : ereport(ERROR,
505 : (errcode_for_file_access(),
506 : errmsg("could not read file \"%s\": %m",
507 : path)));
508 0 : pq_sendbytes(&buf, rbuf, nread);
509 0 : bytesleft -= nread;
510 : }
511 0 : CloseTransientFile(fd);
512 :
513 0 : pq_endmessage(&buf);
514 0 : }
515 :
516 : /*
517 : * Handle START_REPLICATION command.
518 : *
519 : * At the moment, this never returns, but an ereport(ERROR) will take us back
520 : * to the main loop.
521 : */
522 : static void
523 0 : StartReplication(StartReplicationCmd *cmd)
524 : {
525 : StringInfoData buf;
526 : XLogRecPtr FlushPtr;
527 :
528 0 : if (ThisTimeLineID == 0)
529 0 : ereport(ERROR,
530 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
531 : errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
532 :
533 : /*
534 : * We assume here that we're logging enough information in the WAL for
535 : * log-shipping, since this is checked in PostmasterMain().
536 : *
537 : * NOTE: wal_level can only change at shutdown, so in most cases it is
538 : * difficult for there to be WAL data that we can still see that was
539 : * written at wal_level='minimal'.
540 : */
541 :
542 0 : if (cmd->slotname)
543 : {
544 0 : ReplicationSlotAcquire(cmd->slotname, true);
545 0 : if (SlotIsLogical(MyReplicationSlot))
546 0 : ereport(ERROR,
547 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548 : (errmsg("cannot use a logical replication slot for physical replication"))));
549 : }
550 :
551 : /*
552 : * Select the timeline. If it was given explicitly by the client, use
553 : * that. Otherwise use the timeline of the last replayed record, which is
554 : * kept in ThisTimeLineID.
555 : */
556 0 : if (am_cascading_walsender)
557 : {
558 : /* this also updates ThisTimeLineID */
559 0 : FlushPtr = GetStandbyFlushRecPtr();
560 : }
561 : else
562 0 : FlushPtr = GetFlushRecPtr();
563 :
564 0 : if (cmd->timeline != 0)
565 : {
566 : XLogRecPtr switchpoint;
567 :
568 0 : sendTimeLine = cmd->timeline;
569 0 : if (sendTimeLine == ThisTimeLineID)
570 : {
571 0 : sendTimeLineIsHistoric = false;
572 0 : sendTimeLineValidUpto = InvalidXLogRecPtr;
573 : }
574 : else
575 : {
576 : List *timeLineHistory;
577 :
578 0 : sendTimeLineIsHistoric = true;
579 :
580 : /*
581 : * Check that the timeline the client requested exists, and the
582 : * requested start location is on that timeline.
583 : */
584 0 : timeLineHistory = readTimeLineHistory(ThisTimeLineID);
585 0 : switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
586 : &sendTimeLineNextTLI);
587 0 : list_free_deep(timeLineHistory);
588 :
589 : /*
590 : * Found the requested timeline in the history. Check that
591 : * requested startpoint is on that timeline in our history.
592 : *
593 : * This is quite loose on purpose. We only check that we didn't
594 : * fork off the requested timeline before the switchpoint. We
595 : * don't check that we switched *to* it before the requested
596 : * starting point. This is because the client can legitimately
597 : * request to start replication from the beginning of the WAL
598 : * segment that contains switchpoint, but on the new timeline, so
599 : * that it doesn't end up with a partial segment. If you ask for
600 : * too old a starting point, you'll get an error later when we
601 : * fail to find the requested WAL segment in pg_wal.
602 : *
603 : * XXX: we could be more strict here and only allow a startpoint
604 : * that's older than the switchpoint, if it's still in the same
605 : * WAL segment.
606 : */
607 0 : if (!XLogRecPtrIsInvalid(switchpoint) &&
608 0 : switchpoint < cmd->startpoint)
609 : {
610 0 : ereport(ERROR,
611 : (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
612 : (uint32) (cmd->startpoint >> 32),
613 : (uint32) (cmd->startpoint),
614 : cmd->timeline),
615 : errdetail("This server's history forked from timeline %u at %X/%X.",
616 : cmd->timeline,
617 : (uint32) (switchpoint >> 32),
618 : (uint32) (switchpoint))));
619 : }
620 0 : sendTimeLineValidUpto = switchpoint;
621 : }
622 : }
623 : else
624 : {
625 0 : sendTimeLine = ThisTimeLineID;
626 0 : sendTimeLineValidUpto = InvalidXLogRecPtr;
627 0 : sendTimeLineIsHistoric = false;
628 : }
629 :
630 0 : streamingDoneSending = streamingDoneReceiving = false;
631 :
632 : /* If there is nothing to stream, don't even enter COPY mode */
633 0 : if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
634 : {
635 : /*
636 : * When we first start replication the standby will be behind the
637 : * primary. For some applications, for example synchronous
638 : * replication, it is important to have a clear state for this initial
639 : * catchup mode, so we can trigger actions when we change streaming
640 : * state later. We may stay in this state for a long time, which is
641 : * exactly why we want to be able to monitor whether or not we are
642 : * still here.
643 : */
644 0 : WalSndSetState(WALSNDSTATE_CATCHUP);
645 :
646 : /* Send a CopyBothResponse message, and start streaming */
647 0 : pq_beginmessage(&buf, 'W');
648 0 : pq_sendbyte(&buf, 0);
649 0 : pq_sendint(&buf, 0, 2);
650 0 : pq_endmessage(&buf);
651 0 : pq_flush();
652 :
653 : /*
654 : * Don't allow a request to stream from a future point in WAL that
655 : * hasn't been flushed to disk in this server yet.
656 : */
657 0 : if (FlushPtr < cmd->startpoint)
658 : {
659 0 : ereport(ERROR,
660 : (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
661 : (uint32) (cmd->startpoint >> 32),
662 : (uint32) (cmd->startpoint),
663 : (uint32) (FlushPtr >> 32),
664 : (uint32) (FlushPtr))));
665 : }
666 :
667 : /* Start streaming from the requested point */
668 0 : sentPtr = cmd->startpoint;
669 :
670 : /* Initialize shared memory status, too */
671 0 : SpinLockAcquire(&MyWalSnd->mutex);
672 0 : MyWalSnd->sentPtr = sentPtr;
673 0 : SpinLockRelease(&MyWalSnd->mutex);
674 :
675 0 : SyncRepInitConfig();
676 :
677 : /* Main loop of walsender */
678 0 : replication_active = true;
679 :
680 0 : WalSndLoop(XLogSendPhysical);
681 :
682 0 : replication_active = false;
683 0 : if (got_STOPPING)
684 0 : proc_exit(0);
685 0 : WalSndSetState(WALSNDSTATE_STARTUP);
686 :
687 0 : Assert(streamingDoneSending && streamingDoneReceiving);
688 : }
689 :
690 0 : if (cmd->slotname)
691 0 : ReplicationSlotRelease();
692 :
693 : /*
694 : * Copy is finished now. Send a single-row result set indicating the next
695 : * timeline.
696 : */
697 0 : if (sendTimeLineIsHistoric)
698 : {
699 : char startpos_str[8 + 1 + 8 + 1];
700 : DestReceiver *dest;
701 : TupOutputState *tstate;
702 : TupleDesc tupdesc;
703 : Datum values[2];
704 : bool nulls[2];
705 :
706 0 : snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
707 0 : (uint32) (sendTimeLineValidUpto >> 32),
708 : (uint32) sendTimeLineValidUpto);
709 :
710 0 : dest = CreateDestReceiver(DestRemoteSimple);
711 0 : MemSet(nulls, false, sizeof(nulls));
712 :
713 : /*
714 : * Need a tuple descriptor representing two columns. int8 may seem
715 : * like a surprising data type for this, but in theory int4 would not
716 : * be wide enough for this, as TimeLineID is unsigned.
717 : */
718 0 : tupdesc = CreateTemplateTupleDesc(2, false);
719 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
720 : INT8OID, -1, 0);
721 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
722 : TEXTOID, -1, 0);
723 :
724 : /* prepare for projection of tuple */
725 0 : tstate = begin_tup_output_tupdesc(dest, tupdesc);
726 :
727 0 : values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
728 0 : values[1] = CStringGetTextDatum(startpos_str);
729 :
730 : /* send it to dest */
731 0 : do_tup_output(tstate, values, nulls);
732 :
733 0 : end_tup_output(tstate);
734 : }
735 :
736 : /* Send CommandComplete message */
737 0 : pq_puttextmessage('C', "START_STREAMING");
738 0 : }
739 :
740 : /*
741 : * read_page callback for logical decoding contexts, as a walsender process.
742 : *
743 : * Inside the walsender we can do better than logical_read_local_xlog_page,
744 : * which has to do a plain sleep/busy loop, because the walsender's latch gets
745 : * set every time WAL is flushed.
746 : */
747 : static int
748 0 : logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
749 : XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
750 : {
751 : XLogRecPtr flushptr;
752 : int count;
753 :
754 0 : XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
755 0 : sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
756 0 : sendTimeLine = state->currTLI;
757 0 : sendTimeLineValidUpto = state->currTLIValidUntil;
758 0 : sendTimeLineNextTLI = state->nextTLI;
759 :
760 : /* make sure we have enough WAL available */
761 0 : flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
762 :
763 : /* fail if not (implies we are going to shut down) */
764 0 : if (flushptr < targetPagePtr + reqLen)
765 0 : return -1;
766 :
767 0 : if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
768 0 : count = XLOG_BLCKSZ; /* more than one block available */
769 : else
770 0 : count = flushptr - targetPagePtr; /* part of the page available */
771 :
772 : /* now actually read the data, we know it's there */
773 0 : XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
774 :
775 0 : return count;
776 : }
777 :
778 : /*
779 : * Process extra options given to CREATE_REPLICATION_SLOT.
780 : */
781 : static void
782 0 : parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
783 : bool *reserve_wal,
784 : CRSSnapshotAction *snapshot_action)
785 : {
786 : ListCell *lc;
787 0 : bool snapshot_action_given = false;
788 0 : bool reserve_wal_given = false;
789 :
790 : /* Parse options */
791 0 : foreach(lc, cmd->options)
792 : {
793 0 : DefElem *defel = (DefElem *) lfirst(lc);
794 :
795 0 : if (strcmp(defel->defname, "export_snapshot") == 0)
796 : {
797 0 : if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
798 0 : ereport(ERROR,
799 : (errcode(ERRCODE_SYNTAX_ERROR),
800 : errmsg("conflicting or redundant options")));
801 :
802 0 : snapshot_action_given = true;
803 0 : *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
804 : CRS_NOEXPORT_SNAPSHOT;
805 : }
806 0 : else if (strcmp(defel->defname, "use_snapshot") == 0)
807 : {
808 0 : if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
809 0 : ereport(ERROR,
810 : (errcode(ERRCODE_SYNTAX_ERROR),
811 : errmsg("conflicting or redundant options")));
812 :
813 0 : snapshot_action_given = true;
814 0 : *snapshot_action = CRS_USE_SNAPSHOT;
815 : }
816 0 : else if (strcmp(defel->defname, "reserve_wal") == 0)
817 : {
818 0 : if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
819 0 : ereport(ERROR,
820 : (errcode(ERRCODE_SYNTAX_ERROR),
821 : errmsg("conflicting or redundant options")));
822 :
823 0 : reserve_wal_given = true;
824 0 : *reserve_wal = true;
825 : }
826 : else
827 0 : elog(ERROR, "unrecognized option: %s", defel->defname);
828 : }
829 0 : }
830 :
831 : /*
832 : * Create a new replication slot.
833 : */
834 : static void
835 0 : CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
836 : {
837 0 : const char *snapshot_name = NULL;
838 : char xloc[MAXFNAMELEN];
839 : char *slot_name;
840 0 : bool reserve_wal = false;
841 0 : CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
842 : DestReceiver *dest;
843 : TupOutputState *tstate;
844 : TupleDesc tupdesc;
845 : Datum values[4];
846 : bool nulls[4];
847 :
848 0 : Assert(!MyReplicationSlot);
849 :
850 0 : parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
851 :
852 : /* setup state for XLogReadPage */
853 0 : sendTimeLineIsHistoric = false;
854 0 : sendTimeLine = ThisTimeLineID;
855 :
856 0 : if (cmd->kind == REPLICATION_KIND_PHYSICAL)
857 : {
858 0 : ReplicationSlotCreate(cmd->slotname, false,
859 0 : cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
860 : }
861 : else
862 : {
863 0 : CheckLogicalDecodingRequirements();
864 :
865 : /*
866 : * Initially create persistent slot as ephemeral - that allows us to
867 : * nicely handle errors during initialization because it'll get
868 : * dropped if this transaction fails. We'll make it persistent at the
869 : * end. Temporary slots can be created as temporary from beginning as
870 : * they get dropped on error as well.
871 : */
872 0 : ReplicationSlotCreate(cmd->slotname, true,
873 0 : cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
874 : }
875 :
876 0 : if (cmd->kind == REPLICATION_KIND_LOGICAL)
877 : {
878 : LogicalDecodingContext *ctx;
879 0 : bool need_full_snapshot = false;
880 :
881 : /*
882 : * Do options check early so that we can bail before calling the
883 : * DecodingContextFindStartpoint which can take long time.
884 : */
885 0 : if (snapshot_action == CRS_EXPORT_SNAPSHOT)
886 : {
887 0 : if (IsTransactionBlock())
888 0 : ereport(ERROR,
889 : (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
890 : "must not be called inside a transaction")));
891 :
892 0 : need_full_snapshot = true;
893 : }
894 0 : else if (snapshot_action == CRS_USE_SNAPSHOT)
895 : {
896 0 : if (!IsTransactionBlock())
897 0 : ereport(ERROR,
898 : (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
899 : "must be called inside a transaction")));
900 :
901 0 : if (XactIsoLevel != XACT_REPEATABLE_READ)
902 0 : ereport(ERROR,
903 : (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
904 : "must be called in REPEATABLE READ isolation mode transaction")));
905 :
906 0 : if (FirstSnapshotSet)
907 0 : ereport(ERROR,
908 : (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
909 : "must be called before any query")));
910 :
911 0 : if (IsSubTransaction())
912 0 : ereport(ERROR,
913 : (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
914 : "must not be called in a subtransaction")));
915 :
916 0 : need_full_snapshot = true;
917 : }
918 :
919 0 : ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
920 : logical_read_xlog_page,
921 : WalSndPrepareWrite, WalSndWriteData,
922 : WalSndUpdateProgress);
923 :
924 : /*
925 : * Signal that we don't need the timeout mechanism. We're just
926 : * creating the replication slot and don't yet accept feedback
927 : * messages or send keepalives. As we possibly need to wait for
928 : * further WAL the walsender would otherwise possibly be killed too
929 : * soon.
930 : */
931 0 : last_reply_timestamp = 0;
932 :
933 : /* build initial snapshot, might take a while */
934 0 : DecodingContextFindStartpoint(ctx);
935 :
936 : /*
937 : * Export or use the snapshot if we've been asked to do so.
938 : *
939 : * NB. We will convert the snapbuild.c kind of snapshot to normal
940 : * snapshot when doing this.
941 : */
942 0 : if (snapshot_action == CRS_EXPORT_SNAPSHOT)
943 : {
944 0 : snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
945 : }
946 0 : else if (snapshot_action == CRS_USE_SNAPSHOT)
947 : {
948 : Snapshot snap;
949 :
950 0 : snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
951 0 : RestoreTransactionSnapshot(snap, MyProc);
952 : }
953 :
954 : /* don't need the decoding context anymore */
955 0 : FreeDecodingContext(ctx);
956 :
957 0 : if (!cmd->temporary)
958 0 : ReplicationSlotPersist();
959 : }
960 0 : else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
961 : {
962 0 : ReplicationSlotReserveWal();
963 :
964 0 : ReplicationSlotMarkDirty();
965 :
966 : /* Write this slot to disk if it's a permanent one. */
967 0 : if (!cmd->temporary)
968 0 : ReplicationSlotSave();
969 : }
970 :
971 0 : snprintf(xloc, sizeof(xloc), "%X/%X",
972 0 : (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
973 0 : (uint32) MyReplicationSlot->data.confirmed_flush);
974 :
975 0 : dest = CreateDestReceiver(DestRemoteSimple);
976 0 : MemSet(nulls, false, sizeof(nulls));
977 :
978 : /*----------
979 : * Need a tuple descriptor representing four columns:
980 : * - first field: the slot name
981 : * - second field: LSN at which we became consistent
982 : * - third field: exported snapshot's name
983 : * - fourth field: output plugin
984 : *----------
985 : */
986 0 : tupdesc = CreateTemplateTupleDesc(4, false);
987 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
988 : TEXTOID, -1, 0);
989 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
990 : TEXTOID, -1, 0);
991 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
992 : TEXTOID, -1, 0);
993 0 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
994 : TEXTOID, -1, 0);
995 :
996 : /* prepare for projection of tuples */
997 0 : tstate = begin_tup_output_tupdesc(dest, tupdesc);
998 :
999 : /* slot_name */
1000 0 : slot_name = NameStr(MyReplicationSlot->data.name);
1001 0 : values[0] = CStringGetTextDatum(slot_name);
1002 :
1003 : /* consistent wal location */
1004 0 : values[1] = CStringGetTextDatum(xloc);
1005 :
1006 : /* snapshot name, or NULL if none */
1007 0 : if (snapshot_name != NULL)
1008 0 : values[2] = CStringGetTextDatum(snapshot_name);
1009 : else
1010 0 : nulls[2] = true;
1011 :
1012 : /* plugin, or NULL if none */
1013 0 : if (cmd->plugin != NULL)
1014 0 : values[3] = CStringGetTextDatum(cmd->plugin);
1015 : else
1016 0 : nulls[3] = true;
1017 :
1018 : /* send it to dest */
1019 0 : do_tup_output(tstate, values, nulls);
1020 0 : end_tup_output(tstate);
1021 :
1022 0 : ReplicationSlotRelease();
1023 0 : }
1024 :
1025 : /*
1026 : * Get rid of a replication slot that is no longer wanted.
1027 : */
1028 : static void
1029 0 : DropReplicationSlot(DropReplicationSlotCmd *cmd)
1030 : {
1031 0 : ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1032 0 : EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1033 0 : }
1034 :
1035 : /*
1036 : * Load previously initiated logical slot and prepare for sending data (via
1037 : * WalSndLoop).
1038 : */
1039 : static void
1040 0 : StartLogicalReplication(StartReplicationCmd *cmd)
1041 : {
1042 : StringInfoData buf;
1043 :
1044 : /* make sure that our requirements are still fulfilled */
1045 0 : CheckLogicalDecodingRequirements();
1046 :
1047 0 : Assert(!MyReplicationSlot);
1048 :
1049 0 : ReplicationSlotAcquire(cmd->slotname, true);
1050 :
1051 : /*
1052 : * Force a disconnect, so that the decoding code doesn't need to care
1053 : * about an eventual switch from running in recovery, to running in a
1054 : * normal environment. Client code is expected to handle reconnects.
1055 : */
1056 0 : if (am_cascading_walsender && !RecoveryInProgress())
1057 : {
1058 0 : ereport(LOG,
1059 : (errmsg("terminating walsender process after promotion")));
1060 0 : got_STOPPING = true;
1061 : }
1062 :
1063 0 : WalSndSetState(WALSNDSTATE_CATCHUP);
1064 :
1065 : /* Send a CopyBothResponse message, and start streaming */
1066 0 : pq_beginmessage(&buf, 'W');
1067 0 : pq_sendbyte(&buf, 0);
1068 0 : pq_sendint(&buf, 0, 2);
1069 0 : pq_endmessage(&buf);
1070 0 : pq_flush();
1071 :
1072 : /*
1073 : * Initialize position to the last ack'ed one, then the xlog records begin
1074 : * to be shipped from that position.
1075 : */
1076 0 : logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
1077 : logical_read_xlog_page,
1078 : WalSndPrepareWrite,
1079 : WalSndWriteData,
1080 : WalSndUpdateProgress);
1081 :
1082 : /* Start reading WAL from the oldest required WAL. */
1083 0 : logical_startptr = MyReplicationSlot->data.restart_lsn;
1084 :
1085 : /*
1086 : * Report the location after which we'll send out further commits as the
1087 : * current sentPtr.
1088 : */
1089 0 : sentPtr = MyReplicationSlot->data.confirmed_flush;
1090 :
1091 : /* Also update the sent position status in shared memory */
1092 0 : SpinLockAcquire(&MyWalSnd->mutex);
1093 0 : MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1094 0 : SpinLockRelease(&MyWalSnd->mutex);
1095 :
1096 0 : replication_active = true;
1097 :
1098 0 : SyncRepInitConfig();
1099 :
1100 : /* Main loop of walsender */
1101 0 : WalSndLoop(XLogSendLogical);
1102 :
1103 0 : FreeDecodingContext(logical_decoding_ctx);
1104 0 : ReplicationSlotRelease();
1105 :
1106 0 : replication_active = false;
1107 0 : if (got_STOPPING)
1108 0 : proc_exit(0);
1109 0 : WalSndSetState(WALSNDSTATE_STARTUP);
1110 :
1111 : /* Get out of COPY mode (CommandComplete). */
1112 0 : EndCommand("COPY 0", DestRemote);
1113 0 : }
1114 :
1115 : /*
1116 : * LogicalDecodingContext 'prepare_write' callback.
1117 : *
1118 : * Prepare a write into a StringInfo.
1119 : *
1120 : * Don't do anything lasting in here, it's quite possible that nothing will be done
1121 : * with the data.
1122 : */
1123 : static void
1124 0 : WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1125 : {
1126 : /* can't have sync rep confused by sending the same LSN several times */
1127 0 : if (!last_write)
1128 0 : lsn = InvalidXLogRecPtr;
1129 :
1130 0 : resetStringInfo(ctx->out);
1131 :
1132 0 : pq_sendbyte(ctx->out, 'w');
1133 0 : pq_sendint64(ctx->out, lsn); /* dataStart */
1134 0 : pq_sendint64(ctx->out, lsn); /* walEnd */
1135 :
1136 : /*
1137 : * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1138 : * reserve space here.
1139 : */
1140 0 : pq_sendint64(ctx->out, 0); /* sendtime */
1141 0 : }
1142 :
1143 : /*
1144 : * LogicalDecodingContext 'write' callback.
1145 : *
1146 : * Actually write out data previously prepared by WalSndPrepareWrite out to
1147 : * the network. Take as long as needed, but process replies from the other
1148 : * side and check timeouts during that.
1149 : */
1150 : static void
1151 0 : WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1152 : bool last_write)
1153 : {
1154 : /* output previously gathered data in a CopyData packet */
1155 0 : pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1156 :
1157 : /*
1158 : * Fill the send timestamp last, so that it is taken as late as possible.
1159 : * This is somewhat ugly, but the protocol is set as it's already used for
1160 : * several releases by streaming physical replication.
1161 : */
1162 0 : resetStringInfo(&tmpbuf);
1163 0 : pq_sendint64(&tmpbuf, GetCurrentTimestamp());
1164 0 : memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1165 0 : tmpbuf.data, sizeof(int64));
1166 :
1167 : /* fast path */
1168 : /* Try to flush pending output to the client */
1169 0 : if (pq_flush_if_writable() != 0)
1170 0 : WalSndShutdown();
1171 :
1172 0 : if (!pq_is_send_pending())
1173 0 : return;
1174 :
1175 : for (;;)
1176 : {
1177 : int wakeEvents;
1178 : long sleeptime;
1179 : TimestampTz now;
1180 :
1181 : /*
1182 : * Emergency bailout if postmaster has died. This is to avoid the
1183 : * necessity for manual cleanup of all postmaster children.
1184 : */
1185 0 : if (!PostmasterIsAlive())
1186 0 : exit(1);
1187 :
1188 : /* Clear any already-pending wakeups */
1189 0 : ResetLatch(MyLatch);
1190 :
1191 0 : CHECK_FOR_INTERRUPTS();
1192 :
1193 : /* Process any requests or signals received recently */
1194 0 : if (ConfigReloadPending)
1195 : {
1196 0 : ConfigReloadPending = false;
1197 0 : ProcessConfigFile(PGC_SIGHUP);
1198 0 : SyncRepInitConfig();
1199 : }
1200 :
1201 : /* Check for input from the client */
1202 0 : ProcessRepliesIfAny();
1203 :
1204 : /* Try to flush pending output to the client */
1205 0 : if (pq_flush_if_writable() != 0)
1206 0 : WalSndShutdown();
1207 :
1208 : /* If we finished clearing the buffered data, we're done here. */
1209 0 : if (!pq_is_send_pending())
1210 0 : break;
1211 :
1212 0 : now = GetCurrentTimestamp();
1213 :
1214 : /* die if timeout was reached */
1215 0 : WalSndCheckTimeOut(now);
1216 :
1217 : /* Send keepalive if the time has come */
1218 0 : WalSndKeepaliveIfNecessary(now);
1219 :
1220 0 : sleeptime = WalSndComputeSleeptime(now);
1221 :
1222 0 : wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1223 : WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1224 :
1225 : /* Sleep until something happens or we time out */
1226 0 : WaitLatchOrSocket(MyLatch, wakeEvents,
1227 0 : MyProcPort->sock, sleeptime,
1228 : WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1229 0 : }
1230 :
1231 : /* reactivate latch so WalSndLoop knows to continue */
1232 0 : SetLatch(MyLatch);
1233 : }
1234 :
1235 : /*
1236 : * LogicalDecodingContext 'progress_update' callback.
1237 : *
1238 : * Write the current position to the log tracker (see XLogSendPhysical).
1239 : */
1240 : static void
1241 0 : WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
1242 : {
1243 : static TimestampTz sendTime = 0;
1244 0 : TimestampTz now = GetCurrentTimestamp();
1245 :
1246 : /*
1247 : * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1248 : * avoid flooding the lag tracker when we commit frequently.
1249 : */
1250 : #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1251 0 : if (!TimestampDifferenceExceeds(sendTime, now,
1252 : WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1253 0 : return;
1254 :
1255 0 : LagTrackerWrite(lsn, now);
1256 0 : sendTime = now;
1257 : }
1258 :
1259 : /*
1260 : * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1261 : *
1262 : * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1263 : * if we detect a shutdown request (either from postmaster or client)
1264 : * we will return early, so caller must always check.
1265 : */
1266 : static XLogRecPtr
1267 0 : WalSndWaitForWal(XLogRecPtr loc)
1268 : {
1269 : int wakeEvents;
1270 : static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1271 :
1272 :
1273 : /*
1274 : * Fast path to avoid acquiring the spinlock in case we already know we
1275 : * have enough WAL available. This is particularly interesting if we're
1276 : * far behind.
1277 : */
1278 0 : if (RecentFlushPtr != InvalidXLogRecPtr &&
1279 0 : loc <= RecentFlushPtr)
1280 0 : return RecentFlushPtr;
1281 :
1282 : /* Get a more recent flush pointer. */
1283 0 : if (!RecoveryInProgress())
1284 0 : RecentFlushPtr = GetFlushRecPtr();
1285 : else
1286 0 : RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1287 :
1288 : for (;;)
1289 : {
1290 : long sleeptime;
1291 : TimestampTz now;
1292 :
1293 : /*
1294 : * Emergency bailout if postmaster has died. This is to avoid the
1295 : * necessity for manual cleanup of all postmaster children.
1296 : */
1297 0 : if (!PostmasterIsAlive())
1298 0 : exit(1);
1299 :
1300 : /* Clear any already-pending wakeups */
1301 0 : ResetLatch(MyLatch);
1302 :
1303 0 : CHECK_FOR_INTERRUPTS();
1304 :
1305 : /* Process any requests or signals received recently */
1306 0 : if (ConfigReloadPending)
1307 : {
1308 0 : ConfigReloadPending = false;
1309 0 : ProcessConfigFile(PGC_SIGHUP);
1310 0 : SyncRepInitConfig();
1311 : }
1312 :
1313 : /* Check for input from the client */
1314 0 : ProcessRepliesIfAny();
1315 :
1316 : /*
1317 : * If we're shutting down, trigger pending WAL to be written out,
1318 : * otherwise we'd possibly end up waiting for WAL that never gets
1319 : * written, because walwriter has shut down already.
1320 : */
1321 0 : if (got_STOPPING)
1322 0 : XLogBackgroundFlush();
1323 :
1324 : /* Update our idea of the currently flushed position. */
1325 0 : if (!RecoveryInProgress())
1326 0 : RecentFlushPtr = GetFlushRecPtr();
1327 : else
1328 0 : RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1329 :
1330 : /*
1331 : * If postmaster asked us to stop, don't wait anymore.
1332 : *
1333 : * It's important to do this check after the recomputation of
1334 : * RecentFlushPtr, so we can send all remaining data before shutting
1335 : * down.
1336 : */
1337 0 : if (got_STOPPING)
1338 0 : break;
1339 :
1340 : /*
1341 : * We only send regular messages to the client for full decoded
1342 : * transactions, but a synchronous replication and walsender shutdown
1343 : * possibly are waiting for a later location. So we send pings
1344 : * containing the flush location every now and then.
1345 : */
1346 0 : if (MyWalSnd->flush < sentPtr &&
1347 0 : MyWalSnd->write < sentPtr &&
1348 0 : !waiting_for_ping_response)
1349 : {
1350 0 : WalSndKeepalive(false);
1351 0 : waiting_for_ping_response = true;
1352 : }
1353 :
1354 : /* check whether we're done */
1355 0 : if (loc <= RecentFlushPtr)
1356 0 : break;
1357 :
1358 : /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1359 0 : WalSndCaughtUp = true;
1360 :
1361 : /*
1362 : * Try to flush any pending output to the client.
1363 : */
1364 0 : if (pq_flush_if_writable() != 0)
1365 0 : WalSndShutdown();
1366 :
1367 : /*
1368 : * If we have received CopyDone from the client, sent CopyDone
1369 : * ourselves, and the output buffer is empty, it's time to exit
1370 : * streaming, so fail the current WAL fetch request.
1371 : */
1372 0 : if (streamingDoneReceiving && streamingDoneSending &&
1373 0 : !pq_is_send_pending())
1374 0 : break;
1375 :
1376 0 : now = GetCurrentTimestamp();
1377 :
1378 : /* die if timeout was reached */
1379 0 : WalSndCheckTimeOut(now);
1380 :
1381 : /* Send keepalive if the time has come */
1382 0 : WalSndKeepaliveIfNecessary(now);
1383 :
1384 : /*
1385 : * Sleep until something happens or we time out. Also wait for the
1386 : * socket becoming writable, if there's still pending output.
1387 : * Otherwise we might sit on sendable output data while waiting for
1388 : * new WAL to be generated. (But if we have nothing to send, we don't
1389 : * want to wake on socket-writable.)
1390 : */
1391 0 : sleeptime = WalSndComputeSleeptime(now);
1392 :
1393 0 : wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1394 : WL_SOCKET_READABLE | WL_TIMEOUT;
1395 :
1396 0 : if (pq_is_send_pending())
1397 0 : wakeEvents |= WL_SOCKET_WRITEABLE;
1398 :
1399 0 : WaitLatchOrSocket(MyLatch, wakeEvents,
1400 0 : MyProcPort->sock, sleeptime,
1401 : WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1402 0 : }
1403 :
1404 : /* reactivate latch so WalSndLoop knows to continue */
1405 0 : SetLatch(MyLatch);
1406 0 : return RecentFlushPtr;
1407 : }
1408 :
1409 : /*
1410 : * Execute an incoming replication command.
1411 : *
1412 : * Returns true if the cmd_string was recognized as WalSender command, false
1413 : * if not.
1414 : */
1415 : bool
1416 0 : exec_replication_command(const char *cmd_string)
1417 : {
1418 : int parse_rc;
1419 : Node *cmd_node;
1420 : MemoryContext cmd_context;
1421 : MemoryContext old_context;
1422 :
1423 : /*
1424 : * If WAL sender has been told that shutdown is getting close, switch its
1425 : * status accordingly to handle the next replication commands correctly.
1426 : */
1427 0 : if (got_STOPPING)
1428 0 : WalSndSetState(WALSNDSTATE_STOPPING);
1429 :
1430 : /*
1431 : * Throw error if in stopping mode. We need prevent commands that could
1432 : * generate WAL while the shutdown checkpoint is being written. To be
1433 : * safe, we just prohibit all new commands.
1434 : */
1435 0 : if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1436 0 : ereport(ERROR,
1437 : (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1438 :
1439 : /*
1440 : * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1441 : * command arrives. Clean up the old stuff if there's anything.
1442 : */
1443 0 : SnapBuildClearExportedSnapshot();
1444 :
1445 0 : CHECK_FOR_INTERRUPTS();
1446 :
1447 0 : cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1448 : "Replication command context",
1449 : ALLOCSET_DEFAULT_SIZES);
1450 0 : old_context = MemoryContextSwitchTo(cmd_context);
1451 :
1452 0 : replication_scanner_init(cmd_string);
1453 0 : parse_rc = replication_yyparse();
1454 0 : if (parse_rc != 0)
1455 0 : ereport(ERROR,
1456 : (errcode(ERRCODE_SYNTAX_ERROR),
1457 : (errmsg_internal("replication command parser returned %d",
1458 : parse_rc))));
1459 :
1460 0 : cmd_node = replication_parse_result;
1461 :
1462 : /*
1463 : * Log replication command if log_replication_commands is enabled. Even
1464 : * when it's disabled, log the command with DEBUG1 level for backward
1465 : * compatibility. Note that SQL commands are not logged here, and will be
1466 : * logged later if log_statement is enabled.
1467 : */
1468 0 : if (cmd_node->type != T_SQLCmd)
1469 0 : ereport(log_replication_commands ? LOG : DEBUG1,
1470 : (errmsg("received replication command: %s", cmd_string)));
1471 :
1472 : /*
1473 : * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1474 : * called outside of transaction the snapshot should be cleared here.
1475 : */
1476 0 : if (!IsTransactionBlock())
1477 0 : SnapBuildClearExportedSnapshot();
1478 :
1479 : /*
1480 : * For aborted transactions, don't allow anything except pure SQL, the
1481 : * exec_simple_query() will handle it correctly.
1482 : */
1483 0 : if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1484 0 : ereport(ERROR,
1485 : (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1486 : errmsg("current transaction is aborted, "
1487 : "commands ignored until end of transaction block")));
1488 :
1489 0 : CHECK_FOR_INTERRUPTS();
1490 :
1491 : /*
1492 : * Allocate buffers that will be used for each outgoing and incoming
1493 : * message. We do this just once per command to reduce palloc overhead.
1494 : */
1495 0 : initStringInfo(&output_message);
1496 0 : initStringInfo(&reply_message);
1497 0 : initStringInfo(&tmpbuf);
1498 :
1499 0 : switch (cmd_node->type)
1500 : {
1501 : case T_IdentifySystemCmd:
1502 0 : IdentifySystem();
1503 0 : break;
1504 :
1505 : case T_BaseBackupCmd:
1506 0 : PreventTransactionChain(true, "BASE_BACKUP");
1507 0 : SendBaseBackup((BaseBackupCmd *) cmd_node);
1508 0 : break;
1509 :
1510 : case T_CreateReplicationSlotCmd:
1511 0 : CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1512 0 : break;
1513 :
1514 : case T_DropReplicationSlotCmd:
1515 0 : DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1516 0 : break;
1517 :
1518 : case T_StartReplicationCmd:
1519 : {
1520 0 : StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1521 :
1522 0 : PreventTransactionChain(true, "START_REPLICATION");
1523 :
1524 0 : if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1525 0 : StartReplication(cmd);
1526 : else
1527 0 : StartLogicalReplication(cmd);
1528 0 : break;
1529 : }
1530 :
1531 : case T_TimeLineHistoryCmd:
1532 0 : PreventTransactionChain(true, "TIMELINE_HISTORY");
1533 0 : SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1534 0 : break;
1535 :
1536 : case T_VariableShowStmt:
1537 : {
1538 0 : DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1539 0 : VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1540 :
1541 0 : GetPGVariable(n->name, dest);
1542 : }
1543 0 : break;
1544 :
1545 : case T_SQLCmd:
1546 0 : if (MyDatabaseId == InvalidOid)
1547 0 : ereport(ERROR,
1548 : (errmsg("not connected to database")));
1549 :
1550 : /* Tell the caller that this wasn't a WalSender command. */
1551 0 : return false;
1552 :
1553 : default:
1554 0 : elog(ERROR, "unrecognized replication command node tag: %u",
1555 : cmd_node->type);
1556 : }
1557 :
1558 : /* done */
1559 0 : MemoryContextSwitchTo(old_context);
1560 0 : MemoryContextDelete(cmd_context);
1561 :
1562 : /* Send CommandComplete message */
1563 0 : EndCommand("SELECT", DestRemote);
1564 :
1565 0 : return true;
1566 : }
1567 :
1568 : /*
1569 : * Process any incoming messages while streaming. Also checks if the remote
1570 : * end has closed the connection.
1571 : */
1572 : static void
1573 0 : ProcessRepliesIfAny(void)
1574 : {
1575 : unsigned char firstchar;
1576 : int r;
1577 0 : bool received = false;
1578 :
1579 : for (;;)
1580 : {
1581 0 : pq_startmsgread();
1582 0 : r = pq_getbyte_if_available(&firstchar);
1583 0 : if (r < 0)
1584 : {
1585 : /* unexpected error or EOF */
1586 0 : ereport(COMMERROR,
1587 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1588 : errmsg("unexpected EOF on standby connection")));
1589 0 : proc_exit(0);
1590 : }
1591 0 : if (r == 0)
1592 : {
1593 : /* no data available without blocking */
1594 0 : pq_endmsgread();
1595 0 : break;
1596 : }
1597 :
1598 : /* Read the message contents */
1599 0 : resetStringInfo(&reply_message);
1600 0 : if (pq_getmessage(&reply_message, 0))
1601 : {
1602 0 : ereport(COMMERROR,
1603 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1604 : errmsg("unexpected EOF on standby connection")));
1605 0 : proc_exit(0);
1606 : }
1607 :
1608 : /*
1609 : * If we already received a CopyDone from the frontend, the frontend
1610 : * should not send us anything until we've closed our end of the COPY.
1611 : * XXX: In theory, the frontend could already send the next command
1612 : * before receiving the CopyDone, but libpq doesn't currently allow
1613 : * that.
1614 : */
1615 0 : if (streamingDoneReceiving && firstchar != 'X')
1616 0 : ereport(FATAL,
1617 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1618 : errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1619 : firstchar)));
1620 :
1621 : /* Handle the very limited subset of commands expected in this phase */
1622 0 : switch (firstchar)
1623 : {
1624 : /*
1625 : * 'd' means a standby reply wrapped in a CopyData packet.
1626 : */
1627 : case 'd':
1628 0 : ProcessStandbyMessage();
1629 0 : received = true;
1630 0 : break;
1631 :
1632 : /*
1633 : * CopyDone means the standby requested to finish streaming.
1634 : * Reply with CopyDone, if we had not sent that already.
1635 : */
1636 : case 'c':
1637 0 : if (!streamingDoneSending)
1638 : {
1639 0 : pq_putmessage_noblock('c', NULL, 0);
1640 0 : streamingDoneSending = true;
1641 : }
1642 :
1643 0 : streamingDoneReceiving = true;
1644 0 : received = true;
1645 0 : break;
1646 :
1647 : /*
1648 : * 'X' means that the standby is closing down the socket.
1649 : */
1650 : case 'X':
1651 0 : proc_exit(0);
1652 :
1653 : default:
1654 0 : ereport(FATAL,
1655 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1656 : errmsg("invalid standby message type \"%c\"",
1657 : firstchar)));
1658 : }
1659 0 : }
1660 :
1661 : /*
1662 : * Save the last reply timestamp if we've received at least one reply.
1663 : */
1664 0 : if (received)
1665 : {
1666 0 : last_reply_timestamp = GetCurrentTimestamp();
1667 0 : waiting_for_ping_response = false;
1668 : }
1669 0 : }
1670 :
1671 : /*
1672 : * Process a status update message received from standby.
1673 : */
1674 : static void
1675 0 : ProcessStandbyMessage(void)
1676 : {
1677 : char msgtype;
1678 :
1679 : /*
1680 : * Check message type from the first byte.
1681 : */
1682 0 : msgtype = pq_getmsgbyte(&reply_message);
1683 :
1684 0 : switch (msgtype)
1685 : {
1686 : case 'r':
1687 0 : ProcessStandbyReplyMessage();
1688 0 : break;
1689 :
1690 : case 'h':
1691 0 : ProcessStandbyHSFeedbackMessage();
1692 0 : break;
1693 :
1694 : default:
1695 0 : ereport(COMMERROR,
1696 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1697 : errmsg("unexpected message type \"%c\"", msgtype)));
1698 0 : proc_exit(0);
1699 : }
1700 0 : }
1701 :
1702 : /*
1703 : * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1704 : */
1705 : static void
1706 0 : PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1707 : {
1708 0 : bool changed = false;
1709 0 : ReplicationSlot *slot = MyReplicationSlot;
1710 :
1711 0 : Assert(lsn != InvalidXLogRecPtr);
1712 0 : SpinLockAcquire(&slot->mutex);
1713 0 : if (slot->data.restart_lsn != lsn)
1714 : {
1715 0 : changed = true;
1716 0 : slot->data.restart_lsn = lsn;
1717 : }
1718 0 : SpinLockRelease(&slot->mutex);
1719 :
1720 0 : if (changed)
1721 : {
1722 0 : ReplicationSlotMarkDirty();
1723 0 : ReplicationSlotsComputeRequiredLSN();
1724 : }
1725 :
1726 : /*
1727 : * One could argue that the slot should be saved to disk now, but that'd
1728 : * be energy wasted - the worst lost information can do here is give us
1729 : * wrong information in a statistics view - we'll just potentially be more
1730 : * conservative in removing files.
1731 : */
1732 0 : }
1733 :
1734 : /*
1735 : * Regular reply from standby advising of WAL locations on standby server.
1736 : */
1737 : static void
1738 0 : ProcessStandbyReplyMessage(void)
1739 : {
1740 : XLogRecPtr writePtr,
1741 : flushPtr,
1742 : applyPtr;
1743 : bool replyRequested;
1744 : TimeOffset writeLag,
1745 : flushLag,
1746 : applyLag;
1747 : bool clearLagTimes;
1748 : TimestampTz now;
1749 :
1750 : static bool fullyAppliedLastTime = false;
1751 :
1752 : /* the caller already consumed the msgtype byte */
1753 0 : writePtr = pq_getmsgint64(&reply_message);
1754 0 : flushPtr = pq_getmsgint64(&reply_message);
1755 0 : applyPtr = pq_getmsgint64(&reply_message);
1756 0 : (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1757 0 : replyRequested = pq_getmsgbyte(&reply_message);
1758 :
1759 0 : elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1760 : (uint32) (writePtr >> 32), (uint32) writePtr,
1761 : (uint32) (flushPtr >> 32), (uint32) flushPtr,
1762 : (uint32) (applyPtr >> 32), (uint32) applyPtr,
1763 : replyRequested ? " (reply requested)" : "");
1764 :
1765 : /* See if we can compute the round-trip lag for these positions. */
1766 0 : now = GetCurrentTimestamp();
1767 0 : writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1768 0 : flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1769 0 : applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1770 :
1771 : /*
1772 : * If the standby reports that it has fully replayed the WAL in two
1773 : * consecutive reply messages, then the second such message must result
1774 : * from wal_receiver_status_interval expiring on the standby. This is a
1775 : * convenient time to forget the lag times measured when it last
1776 : * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1777 : * until more WAL traffic arrives.
1778 : */
1779 0 : clearLagTimes = false;
1780 0 : if (applyPtr == sentPtr)
1781 : {
1782 0 : if (fullyAppliedLastTime)
1783 0 : clearLagTimes = true;
1784 0 : fullyAppliedLastTime = true;
1785 : }
1786 : else
1787 0 : fullyAppliedLastTime = false;
1788 :
1789 : /* Send a reply if the standby requested one. */
1790 0 : if (replyRequested)
1791 0 : WalSndKeepalive(false);
1792 :
1793 : /*
1794 : * Update shared state for this WalSender process based on reply data from
1795 : * standby.
1796 : */
1797 : {
1798 0 : WalSnd *walsnd = MyWalSnd;
1799 :
1800 0 : SpinLockAcquire(&walsnd->mutex);
1801 0 : walsnd->write = writePtr;
1802 0 : walsnd->flush = flushPtr;
1803 0 : walsnd->apply = applyPtr;
1804 0 : if (writeLag != -1 || clearLagTimes)
1805 0 : walsnd->writeLag = writeLag;
1806 0 : if (flushLag != -1 || clearLagTimes)
1807 0 : walsnd->flushLag = flushLag;
1808 0 : if (applyLag != -1 || clearLagTimes)
1809 0 : walsnd->applyLag = applyLag;
1810 0 : SpinLockRelease(&walsnd->mutex);
1811 : }
1812 :
1813 0 : if (!am_cascading_walsender)
1814 0 : SyncRepReleaseWaiters();
1815 :
1816 : /*
1817 : * Advance our local xmin horizon when the client confirmed a flush.
1818 : */
1819 0 : if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1820 : {
1821 0 : if (SlotIsLogical(MyReplicationSlot))
1822 0 : LogicalConfirmReceivedLocation(flushPtr);
1823 : else
1824 0 : PhysicalConfirmReceivedLocation(flushPtr);
1825 : }
1826 0 : }
1827 :
1828 : /* compute new replication slot xmin horizon if needed */
1829 : static void
1830 0 : PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
1831 : {
1832 0 : bool changed = false;
1833 0 : ReplicationSlot *slot = MyReplicationSlot;
1834 :
1835 0 : SpinLockAcquire(&slot->mutex);
1836 0 : MyPgXact->xmin = InvalidTransactionId;
1837 :
1838 : /*
1839 : * For physical replication we don't need the interlock provided by xmin
1840 : * and effective_xmin since the consequences of a missed increase are
1841 : * limited to query cancellations, so set both at once.
1842 : */
1843 0 : if (!TransactionIdIsNormal(slot->data.xmin) ||
1844 0 : !TransactionIdIsNormal(feedbackXmin) ||
1845 0 : TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1846 : {
1847 0 : changed = true;
1848 0 : slot->data.xmin = feedbackXmin;
1849 0 : slot->effective_xmin = feedbackXmin;
1850 : }
1851 0 : if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1852 0 : !TransactionIdIsNormal(feedbackCatalogXmin) ||
1853 0 : TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1854 : {
1855 0 : changed = true;
1856 0 : slot->data.catalog_xmin = feedbackCatalogXmin;
1857 0 : slot->effective_catalog_xmin = feedbackCatalogXmin;
1858 : }
1859 0 : SpinLockRelease(&slot->mutex);
1860 :
1861 0 : if (changed)
1862 : {
1863 0 : ReplicationSlotMarkDirty();
1864 0 : ReplicationSlotsComputeRequiredXmin(false);
1865 : }
1866 0 : }
1867 :
1868 : /*
1869 : * Check that the provided xmin/epoch are sane, that is, not in the future
1870 : * and not so far back as to be already wrapped around.
1871 : *
1872 : * Epoch of nextXid should be same as standby, or if the counter has
1873 : * wrapped, then one greater than standby.
1874 : *
1875 : * This check doesn't care about whether clog exists for these xids
1876 : * at all.
1877 : */
1878 : static bool
1879 0 : TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
1880 : {
1881 : TransactionId nextXid;
1882 : uint32 nextEpoch;
1883 :
1884 0 : GetNextXidAndEpoch(&nextXid, &nextEpoch);
1885 :
1886 0 : if (xid <= nextXid)
1887 : {
1888 0 : if (epoch != nextEpoch)
1889 0 : return false;
1890 : }
1891 : else
1892 : {
1893 0 : if (epoch + 1 != nextEpoch)
1894 0 : return false;
1895 : }
1896 :
1897 0 : if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1898 0 : return false; /* epoch OK, but it's wrapped around */
1899 :
1900 0 : return true;
1901 : }
1902 :
1903 : /*
1904 : * Hot Standby feedback
1905 : */
1906 : static void
1907 0 : ProcessStandbyHSFeedbackMessage(void)
1908 : {
1909 : TransactionId feedbackXmin;
1910 : uint32 feedbackEpoch;
1911 : TransactionId feedbackCatalogXmin;
1912 : uint32 feedbackCatalogEpoch;
1913 :
1914 : /*
1915 : * Decipher the reply message. The caller already consumed the msgtype
1916 : * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1917 : * of this message.
1918 : */
1919 0 : (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1920 0 : feedbackXmin = pq_getmsgint(&reply_message, 4);
1921 0 : feedbackEpoch = pq_getmsgint(&reply_message, 4);
1922 0 : feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1923 0 : feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1924 :
1925 0 : elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1926 : feedbackXmin,
1927 : feedbackEpoch,
1928 : feedbackCatalogXmin,
1929 : feedbackCatalogEpoch);
1930 :
1931 : /*
1932 : * Unset WalSender's xmins if the feedback message values are invalid.
1933 : * This happens when the downstream turned hot_standby_feedback off.
1934 : */
1935 0 : if (!TransactionIdIsNormal(feedbackXmin)
1936 0 : && !TransactionIdIsNormal(feedbackCatalogXmin))
1937 : {
1938 0 : MyPgXact->xmin = InvalidTransactionId;
1939 0 : if (MyReplicationSlot != NULL)
1940 0 : PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1941 0 : return;
1942 : }
1943 :
1944 : /*
1945 : * Check that the provided xmin/epoch are sane, that is, not in the future
1946 : * and not so far back as to be already wrapped around. Ignore if not.
1947 : */
1948 0 : if (TransactionIdIsNormal(feedbackXmin) &&
1949 0 : !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1950 0 : return;
1951 :
1952 0 : if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1953 0 : !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1954 0 : return;
1955 :
1956 : /*
1957 : * Set the WalSender's xmin equal to the standby's requested xmin, so that
1958 : * the xmin will be taken into account by GetOldestXmin. This will hold
1959 : * back the removal of dead rows and thereby prevent the generation of
1960 : * cleanup conflicts on the standby server.
1961 : *
1962 : * There is a small window for a race condition here: although we just
1963 : * checked that feedbackXmin precedes nextXid, the nextXid could have
1964 : * gotten advanced between our fetching it and applying the xmin below,
1965 : * perhaps far enough to make feedbackXmin wrap around. In that case the
1966 : * xmin we set here would be "in the future" and have no effect. No point
1967 : * in worrying about this since it's too late to save the desired data
1968 : * anyway. Assuming that the standby sends us an increasing sequence of
1969 : * xmins, this could only happen during the first reply cycle, else our
1970 : * own xmin would prevent nextXid from advancing so far.
1971 : *
1972 : * We don't bother taking the ProcArrayLock here. Setting the xmin field
1973 : * is assumed atomic, and there's no real need to prevent a concurrent
1974 : * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1975 : * safe, and if we're moving it backwards, well, the data is at risk
1976 : * already since a VACUUM could have just finished calling GetOldestXmin.)
1977 : *
1978 : * If we're using a replication slot we reserve the xmin via that,
1979 : * otherwise via the walsender's PGXACT entry. We can only track the
1980 : * catalog xmin separately when using a slot, so we store the least of the
1981 : * two provided when not using a slot.
1982 : *
1983 : * XXX: It might make sense to generalize the ephemeral slot concept and
1984 : * always use the slot mechanism to handle the feedback xmin.
1985 : */
1986 0 : if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1987 0 : PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1988 : else
1989 : {
1990 0 : if (TransactionIdIsNormal(feedbackCatalogXmin)
1991 0 : && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
1992 0 : MyPgXact->xmin = feedbackCatalogXmin;
1993 : else
1994 0 : MyPgXact->xmin = feedbackXmin;
1995 : }
1996 : }
1997 :
1998 : /*
1999 : * Compute how long send/receive loops should sleep.
2000 : *
2001 : * If wal_sender_timeout is enabled we want to wake up in time to send
2002 : * keepalives and to abort the connection if wal_sender_timeout has been
2003 : * reached.
2004 : */
2005 : static long
2006 0 : WalSndComputeSleeptime(TimestampTz now)
2007 : {
2008 0 : long sleeptime = 10000; /* 10 s */
2009 :
2010 0 : if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2011 : {
2012 : TimestampTz wakeup_time;
2013 : long sec_to_timeout;
2014 : int microsec_to_timeout;
2015 :
2016 : /*
2017 : * At the latest stop sleeping once wal_sender_timeout has been
2018 : * reached.
2019 : */
2020 0 : wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2021 : wal_sender_timeout);
2022 :
2023 : /*
2024 : * If no ping has been sent yet, wakeup when it's time to do so.
2025 : * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2026 : * the timeout passed without a response.
2027 : */
2028 0 : if (!waiting_for_ping_response)
2029 0 : wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2030 : wal_sender_timeout / 2);
2031 :
2032 : /* Compute relative time until wakeup. */
2033 0 : TimestampDifference(now, wakeup_time,
2034 : &sec_to_timeout, µsec_to_timeout);
2035 :
2036 0 : sleeptime = sec_to_timeout * 1000 +
2037 0 : microsec_to_timeout / 1000;
2038 : }
2039 :
2040 0 : return sleeptime;
2041 : }
2042 :
2043 : /*
2044 : * Check whether there have been responses by the client within
2045 : * wal_sender_timeout and shutdown if not.
2046 : */
2047 : static void
2048 0 : WalSndCheckTimeOut(TimestampTz now)
2049 : {
2050 : TimestampTz timeout;
2051 :
2052 : /* don't bail out if we're doing something that doesn't require timeouts */
2053 0 : if (last_reply_timestamp <= 0)
2054 0 : return;
2055 :
2056 0 : timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2057 : wal_sender_timeout);
2058 :
2059 0 : if (wal_sender_timeout > 0 && now >= timeout)
2060 : {
2061 : /*
2062 : * Since typically expiration of replication timeout means
2063 : * communication problem, we don't send the error message to the
2064 : * standby.
2065 : */
2066 0 : ereport(COMMERROR,
2067 : (errmsg("terminating walsender process due to replication timeout")));
2068 :
2069 0 : WalSndShutdown();
2070 : }
2071 : }
2072 :
2073 : /* Main loop of walsender process that streams the WAL over Copy messages. */
2074 : static void
2075 0 : WalSndLoop(WalSndSendDataCallback send_data)
2076 : {
2077 : /*
2078 : * Initialize the last reply timestamp. That enables timeout processing
2079 : * from hereon.
2080 : */
2081 0 : last_reply_timestamp = GetCurrentTimestamp();
2082 0 : waiting_for_ping_response = false;
2083 :
2084 : /* Report to pgstat that this process is running */
2085 0 : pgstat_report_activity(STATE_RUNNING, NULL);
2086 :
2087 : /*
2088 : * Loop until we reach the end of this timeline or the client requests to
2089 : * stop streaming.
2090 : */
2091 : for (;;)
2092 : {
2093 : TimestampTz now;
2094 :
2095 : /*
2096 : * Emergency bailout if postmaster has died. This is to avoid the
2097 : * necessity for manual cleanup of all postmaster children.
2098 : */
2099 0 : if (!PostmasterIsAlive())
2100 0 : exit(1);
2101 :
2102 : /* Clear any already-pending wakeups */
2103 0 : ResetLatch(MyLatch);
2104 :
2105 0 : CHECK_FOR_INTERRUPTS();
2106 :
2107 : /* Process any requests or signals received recently */
2108 0 : if (ConfigReloadPending)
2109 : {
2110 0 : ConfigReloadPending = false;
2111 0 : ProcessConfigFile(PGC_SIGHUP);
2112 0 : SyncRepInitConfig();
2113 : }
2114 :
2115 : /* Check for input from the client */
2116 0 : ProcessRepliesIfAny();
2117 :
2118 : /*
2119 : * If we have received CopyDone from the client, sent CopyDone
2120 : * ourselves, and the output buffer is empty, it's time to exit
2121 : * streaming.
2122 : */
2123 0 : if (streamingDoneReceiving && streamingDoneSending &&
2124 0 : !pq_is_send_pending())
2125 0 : break;
2126 :
2127 : /*
2128 : * If we don't have any pending data in the output buffer, try to send
2129 : * some more. If there is some, we don't bother to call send_data
2130 : * again until we've flushed it ... but we'd better assume we are not
2131 : * caught up.
2132 : */
2133 0 : if (!pq_is_send_pending())
2134 0 : send_data();
2135 : else
2136 0 : WalSndCaughtUp = false;
2137 :
2138 : /* Try to flush pending output to the client */
2139 0 : if (pq_flush_if_writable() != 0)
2140 0 : WalSndShutdown();
2141 :
2142 : /* If nothing remains to be sent right now ... */
2143 0 : if (WalSndCaughtUp && !pq_is_send_pending())
2144 : {
2145 : /*
2146 : * If we're in catchup state, move to streaming. This is an
2147 : * important state change for users to know about, since before
2148 : * this point data loss might occur if the primary dies and we
2149 : * need to failover to the standby. The state change is also
2150 : * important for synchronous replication, since commits that
2151 : * started to wait at that point might wait for some time.
2152 : */
2153 0 : if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2154 : {
2155 0 : ereport(DEBUG1,
2156 : (errmsg("standby \"%s\" has now caught up with primary",
2157 : application_name)));
2158 0 : WalSndSetState(WALSNDSTATE_STREAMING);
2159 : }
2160 :
2161 : /*
2162 : * When SIGUSR2 arrives, we send any outstanding logs up to the
2163 : * shutdown checkpoint record (i.e., the latest record), wait for
2164 : * them to be replicated to the standby, and exit. This may be a
2165 : * normal termination at shutdown, or a promotion, the walsender
2166 : * is not sure which.
2167 : */
2168 0 : if (got_SIGUSR2)
2169 0 : WalSndDone(send_data);
2170 : }
2171 :
2172 0 : now = GetCurrentTimestamp();
2173 :
2174 : /* Check for replication timeout. */
2175 0 : WalSndCheckTimeOut(now);
2176 :
2177 : /* Send keepalive if the time has come */
2178 0 : WalSndKeepaliveIfNecessary(now);
2179 :
2180 : /*
2181 : * We don't block if not caught up, unless there is unsent data
2182 : * pending in which case we'd better block until the socket is
2183 : * write-ready. This test is only needed for the case where the
2184 : * send_data callback handled a subset of the available data but then
2185 : * pq_flush_if_writable flushed it all --- we should immediately try
2186 : * to send more.
2187 : */
2188 0 : if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
2189 : {
2190 : long sleeptime;
2191 : int wakeEvents;
2192 :
2193 0 : wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2194 : WL_SOCKET_READABLE;
2195 :
2196 0 : sleeptime = WalSndComputeSleeptime(now);
2197 :
2198 0 : if (pq_is_send_pending())
2199 0 : wakeEvents |= WL_SOCKET_WRITEABLE;
2200 :
2201 : /* Sleep until something happens or we time out */
2202 0 : WaitLatchOrSocket(MyLatch, wakeEvents,
2203 0 : MyProcPort->sock, sleeptime,
2204 : WAIT_EVENT_WAL_SENDER_MAIN);
2205 : }
2206 0 : }
2207 0 : return;
2208 : }
2209 :
2210 : /* Initialize a per-walsender data structure for this walsender process */
2211 : static void
2212 0 : InitWalSenderSlot(void)
2213 : {
2214 : int i;
2215 :
2216 : /*
2217 : * WalSndCtl should be set up already (we inherit this by fork() or
2218 : * EXEC_BACKEND mechanism from the postmaster).
2219 : */
2220 0 : Assert(WalSndCtl != NULL);
2221 0 : Assert(MyWalSnd == NULL);
2222 :
2223 : /*
2224 : * Find a free walsender slot and reserve it. If this fails, we must be
2225 : * out of WalSnd structures.
2226 : */
2227 0 : for (i = 0; i < max_wal_senders; i++)
2228 : {
2229 0 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
2230 :
2231 0 : SpinLockAcquire(&walsnd->mutex);
2232 :
2233 0 : if (walsnd->pid != 0)
2234 : {
2235 0 : SpinLockRelease(&walsnd->mutex);
2236 0 : continue;
2237 : }
2238 : else
2239 : {
2240 : /*
2241 : * Found a free slot. Reserve it for us.
2242 : */
2243 0 : walsnd->pid = MyProcPid;
2244 0 : walsnd->sentPtr = InvalidXLogRecPtr;
2245 0 : walsnd->write = InvalidXLogRecPtr;
2246 0 : walsnd->flush = InvalidXLogRecPtr;
2247 0 : walsnd->apply = InvalidXLogRecPtr;
2248 0 : walsnd->writeLag = -1;
2249 0 : walsnd->flushLag = -1;
2250 0 : walsnd->applyLag = -1;
2251 0 : walsnd->state = WALSNDSTATE_STARTUP;
2252 0 : walsnd->latch = &MyProc->procLatch;
2253 0 : SpinLockRelease(&walsnd->mutex);
2254 : /* don't need the lock anymore */
2255 0 : MyWalSnd = (WalSnd *) walsnd;
2256 :
2257 0 : break;
2258 : }
2259 : }
2260 0 : if (MyWalSnd == NULL)
2261 0 : ereport(FATAL,
2262 : (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2263 : errmsg("number of requested standby connections "
2264 : "exceeds max_wal_senders (currently %d)",
2265 : max_wal_senders)));
2266 :
2267 : /* Arrange to clean up at walsender exit */
2268 0 : on_shmem_exit(WalSndKill, 0);
2269 0 : }
2270 :
2271 : /* Destroy the per-walsender data structure for this walsender process */
2272 : static void
2273 0 : WalSndKill(int code, Datum arg)
2274 : {
2275 0 : WalSnd *walsnd = MyWalSnd;
2276 :
2277 0 : Assert(walsnd != NULL);
2278 :
2279 0 : MyWalSnd = NULL;
2280 :
2281 0 : SpinLockAcquire(&walsnd->mutex);
2282 : /* clear latch while holding the spinlock, so it can safely be read */
2283 0 : walsnd->latch = NULL;
2284 : /* Mark WalSnd struct as no longer being in use. */
2285 0 : walsnd->pid = 0;
2286 0 : SpinLockRelease(&walsnd->mutex);
2287 0 : }
2288 :
2289 : /*
2290 : * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2291 : *
2292 : * XXX probably this should be improved to suck data directly from the
2293 : * WAL buffers when possible.
2294 : *
2295 : * Will open, and keep open, one WAL segment stored in the global file
2296 : * descriptor sendFile. This means if XLogRead is used once, there will
2297 : * always be one descriptor left open until the process ends, but never
2298 : * more than one.
2299 : */
2300 : static void
2301 0 : XLogRead(char *buf, XLogRecPtr startptr, Size count)
2302 : {
2303 : char *p;
2304 : XLogRecPtr recptr;
2305 : Size nbytes;
2306 : XLogSegNo segno;
2307 :
2308 : retry:
2309 0 : p = buf;
2310 0 : recptr = startptr;
2311 0 : nbytes = count;
2312 :
2313 0 : while (nbytes > 0)
2314 : {
2315 : uint32 startoff;
2316 : int segbytes;
2317 : int readbytes;
2318 :
2319 0 : startoff = recptr % XLogSegSize;
2320 :
2321 0 : if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2322 : {
2323 : char path[MAXPGPATH];
2324 :
2325 : /* Switch to another logfile segment */
2326 0 : if (sendFile >= 0)
2327 0 : close(sendFile);
2328 :
2329 0 : XLByteToSeg(recptr, sendSegNo);
2330 :
2331 : /*-------
2332 : * When reading from a historic timeline, and there is a timeline
2333 : * switch within this segment, read from the WAL segment belonging
2334 : * to the new timeline.
2335 : *
2336 : * For example, imagine that this server is currently on timeline
2337 : * 5, and we're streaming timeline 4. The switch from timeline 4
2338 : * to 5 happened at 0/13002088. In pg_wal, we have these files:
2339 : *
2340 : * ...
2341 : * 000000040000000000000012
2342 : * 000000040000000000000013
2343 : * 000000050000000000000013
2344 : * 000000050000000000000014
2345 : * ...
2346 : *
2347 : * In this situation, when requested to send the WAL from
2348 : * segment 0x13, on timeline 4, we read the WAL from file
2349 : * 000000050000000000000013. Archive recovery prefers files from
2350 : * newer timelines, so if the segment was restored from the
2351 : * archive on this server, the file belonging to the old timeline,
2352 : * 000000040000000000000013, might not exist. Their contents are
2353 : * equal up to the switchpoint, because at a timeline switch, the
2354 : * used portion of the old segment is copied to the new file.
2355 : *-------
2356 : */
2357 0 : curFileTimeLine = sendTimeLine;
2358 0 : if (sendTimeLineIsHistoric)
2359 : {
2360 : XLogSegNo endSegNo;
2361 :
2362 0 : XLByteToSeg(sendTimeLineValidUpto, endSegNo);
2363 0 : if (sendSegNo == endSegNo)
2364 0 : curFileTimeLine = sendTimeLineNextTLI;
2365 : }
2366 :
2367 0 : XLogFilePath(path, curFileTimeLine, sendSegNo);
2368 :
2369 0 : sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2370 0 : if (sendFile < 0)
2371 : {
2372 : /*
2373 : * If the file is not found, assume it's because the standby
2374 : * asked for a too old WAL segment that has already been
2375 : * removed or recycled.
2376 : */
2377 0 : if (errno == ENOENT)
2378 0 : ereport(ERROR,
2379 : (errcode_for_file_access(),
2380 : errmsg("requested WAL segment %s has already been removed",
2381 : XLogFileNameP(curFileTimeLine, sendSegNo))));
2382 : else
2383 0 : ereport(ERROR,
2384 : (errcode_for_file_access(),
2385 : errmsg("could not open file \"%s\": %m",
2386 : path)));
2387 : }
2388 0 : sendOff = 0;
2389 : }
2390 :
2391 : /* Need to seek in the file? */
2392 0 : if (sendOff != startoff)
2393 : {
2394 0 : if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2395 0 : ereport(ERROR,
2396 : (errcode_for_file_access(),
2397 : errmsg("could not seek in log segment %s to offset %u: %m",
2398 : XLogFileNameP(curFileTimeLine, sendSegNo),
2399 : startoff)));
2400 0 : sendOff = startoff;
2401 : }
2402 :
2403 : /* How many bytes are within this segment? */
2404 0 : if (nbytes > (XLogSegSize - startoff))
2405 0 : segbytes = XLogSegSize - startoff;
2406 : else
2407 0 : segbytes = nbytes;
2408 :
2409 0 : pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2410 0 : readbytes = read(sendFile, p, segbytes);
2411 0 : pgstat_report_wait_end();
2412 0 : if (readbytes <= 0)
2413 : {
2414 0 : ereport(ERROR,
2415 : (errcode_for_file_access(),
2416 : errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2417 : XLogFileNameP(curFileTimeLine, sendSegNo),
2418 : sendOff, (unsigned long) segbytes)));
2419 : }
2420 :
2421 : /* Update state for read */
2422 0 : recptr += readbytes;
2423 :
2424 0 : sendOff += readbytes;
2425 0 : nbytes -= readbytes;
2426 0 : p += readbytes;
2427 : }
2428 :
2429 : /*
2430 : * After reading into the buffer, check that what we read was valid. We do
2431 : * this after reading, because even though the segment was present when we
2432 : * opened it, it might get recycled or removed while we read it. The
2433 : * read() succeeds in that case, but the data we tried to read might
2434 : * already have been overwritten with new WAL records.
2435 : */
2436 0 : XLByteToSeg(startptr, segno);
2437 0 : CheckXLogRemoved(segno, ThisTimeLineID);
2438 :
2439 : /*
2440 : * During recovery, the currently-open WAL file might be replaced with the
2441 : * file of the same name retrieved from archive. So we always need to
2442 : * check what we read was valid after reading into the buffer. If it's
2443 : * invalid, we try to open and read the file again.
2444 : */
2445 0 : if (am_cascading_walsender)
2446 : {
2447 0 : WalSnd *walsnd = MyWalSnd;
2448 : bool reload;
2449 :
2450 0 : SpinLockAcquire(&walsnd->mutex);
2451 0 : reload = walsnd->needreload;
2452 0 : walsnd->needreload = false;
2453 0 : SpinLockRelease(&walsnd->mutex);
2454 :
2455 0 : if (reload && sendFile >= 0)
2456 : {
2457 0 : close(sendFile);
2458 0 : sendFile = -1;
2459 :
2460 0 : goto retry;
2461 : }
2462 : }
2463 0 : }
2464 :
2465 : /*
2466 : * Send out the WAL in its normal physical/stored form.
2467 : *
2468 : * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2469 : * but not yet sent to the client, and buffer it in the libpq output
2470 : * buffer.
2471 : *
2472 : * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2473 : * otherwise WalSndCaughtUp is set to false.
2474 : */
2475 : static void
2476 0 : XLogSendPhysical(void)
2477 : {
2478 : XLogRecPtr SendRqstPtr;
2479 : XLogRecPtr startptr;
2480 : XLogRecPtr endptr;
2481 : Size nbytes;
2482 :
2483 : /* If requested switch the WAL sender to the stopping state. */
2484 0 : if (got_STOPPING)
2485 0 : WalSndSetState(WALSNDSTATE_STOPPING);
2486 :
2487 0 : if (streamingDoneSending)
2488 : {
2489 0 : WalSndCaughtUp = true;
2490 0 : return;
2491 : }
2492 :
2493 : /* Figure out how far we can safely send the WAL. */
2494 0 : if (sendTimeLineIsHistoric)
2495 : {
2496 : /*
2497 : * Streaming an old timeline that's in this server's history, but is
2498 : * not the one we're currently inserting or replaying. It can be
2499 : * streamed up to the point where we switched off that timeline.
2500 : */
2501 0 : SendRqstPtr = sendTimeLineValidUpto;
2502 : }
2503 0 : else if (am_cascading_walsender)
2504 : {
2505 : /*
2506 : * Streaming the latest timeline on a standby.
2507 : *
2508 : * Attempt to send all WAL that has already been replayed, so that we
2509 : * know it's valid. If we're receiving WAL through streaming
2510 : * replication, it's also OK to send any WAL that has been received
2511 : * but not replayed.
2512 : *
2513 : * The timeline we're recovering from can change, or we can be
2514 : * promoted. In either case, the current timeline becomes historic. We
2515 : * need to detect that so that we don't try to stream past the point
2516 : * where we switched to another timeline. We check for promotion or
2517 : * timeline switch after calculating FlushPtr, to avoid a race
2518 : * condition: if the timeline becomes historic just after we checked
2519 : * that it was still current, it's still be OK to stream it up to the
2520 : * FlushPtr that was calculated before it became historic.
2521 : */
2522 0 : bool becameHistoric = false;
2523 :
2524 0 : SendRqstPtr = GetStandbyFlushRecPtr();
2525 :
2526 0 : if (!RecoveryInProgress())
2527 : {
2528 : /*
2529 : * We have been promoted. RecoveryInProgress() updated
2530 : * ThisTimeLineID to the new current timeline.
2531 : */
2532 0 : am_cascading_walsender = false;
2533 0 : becameHistoric = true;
2534 : }
2535 : else
2536 : {
2537 : /*
2538 : * Still a cascading standby. But is the timeline we're sending
2539 : * still the one recovery is recovering from? ThisTimeLineID was
2540 : * updated by the GetStandbyFlushRecPtr() call above.
2541 : */
2542 0 : if (sendTimeLine != ThisTimeLineID)
2543 0 : becameHistoric = true;
2544 : }
2545 :
2546 0 : if (becameHistoric)
2547 : {
2548 : /*
2549 : * The timeline we were sending has become historic. Read the
2550 : * timeline history file of the new timeline to see where exactly
2551 : * we forked off from the timeline we were sending.
2552 : */
2553 : List *history;
2554 :
2555 0 : history = readTimeLineHistory(ThisTimeLineID);
2556 0 : sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
2557 :
2558 0 : Assert(sendTimeLine < sendTimeLineNextTLI);
2559 0 : list_free_deep(history);
2560 :
2561 0 : sendTimeLineIsHistoric = true;
2562 :
2563 0 : SendRqstPtr = sendTimeLineValidUpto;
2564 : }
2565 : }
2566 : else
2567 : {
2568 : /*
2569 : * Streaming the current timeline on a master.
2570 : *
2571 : * Attempt to send all data that's already been written out and
2572 : * fsync'd to disk. We cannot go further than what's been written out
2573 : * given the current implementation of XLogRead(). And in any case
2574 : * it's unsafe to send WAL that is not securely down to disk on the
2575 : * master: if the master subsequently crashes and restarts, standbys
2576 : * must not have applied any WAL that got lost on the master.
2577 : */
2578 0 : SendRqstPtr = GetFlushRecPtr();
2579 : }
2580 :
2581 : /*
2582 : * Record the current system time as an approximation of the time at which
2583 : * this WAL location was written for the purposes of lag tracking.
2584 : *
2585 : * In theory we could make XLogFlush() record a time in shmem whenever WAL
2586 : * is flushed and we could get that time as well as the LSN when we call
2587 : * GetFlushRecPtr() above (and likewise for the cascading standby
2588 : * equivalent), but rather than putting any new code into the hot WAL path
2589 : * it seems good enough to capture the time here. We should reach this
2590 : * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2591 : * may take some time, we read the WAL flush pointer and take the time
2592 : * very close to together here so that we'll get a later position if it is
2593 : * still moving.
2594 : *
2595 : * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
2596 : * this gives us a cheap approximation for the WAL flush time for this
2597 : * LSN.
2598 : *
2599 : * Note that the LSN is not necessarily the LSN for the data contained in
2600 : * the present message; it's the end of the WAL, which might be further
2601 : * ahead. All the lag tracking machinery cares about is finding out when
2602 : * that arbitrary LSN is eventually reported as written, flushed and
2603 : * applied, so that it can measure the elapsed time.
2604 : */
2605 0 : LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2606 :
2607 : /*
2608 : * If this is a historic timeline and we've reached the point where we
2609 : * forked to the next timeline, stop streaming.
2610 : *
2611 : * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2612 : * startup process will normally replay all WAL that has been received
2613 : * from the master, before promoting, but if the WAL streaming is
2614 : * terminated at a WAL page boundary, the valid portion of the timeline
2615 : * might end in the middle of a WAL record. We might've already sent the
2616 : * first half of that partial WAL record to the cascading standby, so that
2617 : * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2618 : * replay the partial WAL record either, so it can still follow our
2619 : * timeline switch.
2620 : */
2621 0 : if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
2622 : {
2623 : /* close the current file. */
2624 0 : if (sendFile >= 0)
2625 0 : close(sendFile);
2626 0 : sendFile = -1;
2627 :
2628 : /* Send CopyDone */
2629 0 : pq_putmessage_noblock('c', NULL, 0);
2630 0 : streamingDoneSending = true;
2631 :
2632 0 : WalSndCaughtUp = true;
2633 :
2634 0 : elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2635 : (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
2636 : (uint32) (sentPtr >> 32), (uint32) sentPtr);
2637 0 : return;
2638 : }
2639 :
2640 : /* Do we have any work to do? */
2641 0 : Assert(sentPtr <= SendRqstPtr);
2642 0 : if (SendRqstPtr <= sentPtr)
2643 : {
2644 0 : WalSndCaughtUp = true;
2645 0 : return;
2646 : }
2647 :
2648 : /*
2649 : * Figure out how much to send in one message. If there's no more than
2650 : * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2651 : * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2652 : *
2653 : * The rounding is not only for performance reasons. Walreceiver relies on
2654 : * the fact that we never split a WAL record across two messages. Since a
2655 : * long WAL record is split at page boundary into continuation records,
2656 : * page boundary is always a safe cut-off point. We also assume that
2657 : * SendRqstPtr never points to the middle of a WAL record.
2658 : */
2659 0 : startptr = sentPtr;
2660 0 : endptr = startptr;
2661 0 : endptr += MAX_SEND_SIZE;
2662 :
2663 : /* if we went beyond SendRqstPtr, back off */
2664 0 : if (SendRqstPtr <= endptr)
2665 : {
2666 0 : endptr = SendRqstPtr;
2667 0 : if (sendTimeLineIsHistoric)
2668 0 : WalSndCaughtUp = false;
2669 : else
2670 0 : WalSndCaughtUp = true;
2671 : }
2672 : else
2673 : {
2674 : /* round down to page boundary. */
2675 0 : endptr -= (endptr % XLOG_BLCKSZ);
2676 0 : WalSndCaughtUp = false;
2677 : }
2678 :
2679 0 : nbytes = endptr - startptr;
2680 0 : Assert(nbytes <= MAX_SEND_SIZE);
2681 :
2682 : /*
2683 : * OK to read and send the slice.
2684 : */
2685 0 : resetStringInfo(&output_message);
2686 0 : pq_sendbyte(&output_message, 'w');
2687 :
2688 0 : pq_sendint64(&output_message, startptr); /* dataStart */
2689 0 : pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2690 0 : pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2691 :
2692 : /*
2693 : * Read the log directly into the output buffer to avoid extra memcpy
2694 : * calls.
2695 : */
2696 0 : enlargeStringInfo(&output_message, nbytes);
2697 0 : XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2698 0 : output_message.len += nbytes;
2699 0 : output_message.data[output_message.len] = '\0';
2700 :
2701 : /*
2702 : * Fill the send timestamp last, so that it is taken as late as possible.
2703 : */
2704 0 : resetStringInfo(&tmpbuf);
2705 0 : pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2706 0 : memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2707 0 : tmpbuf.data, sizeof(int64));
2708 :
2709 0 : pq_putmessage_noblock('d', output_message.data, output_message.len);
2710 :
2711 0 : sentPtr = endptr;
2712 :
2713 : /* Update shared memory status */
2714 : {
2715 0 : WalSnd *walsnd = MyWalSnd;
2716 :
2717 0 : SpinLockAcquire(&walsnd->mutex);
2718 0 : walsnd->sentPtr = sentPtr;
2719 0 : SpinLockRelease(&walsnd->mutex);
2720 : }
2721 :
2722 : /* Report progress of XLOG streaming in PS display */
2723 0 : if (update_process_title)
2724 : {
2725 : char activitymsg[50];
2726 :
2727 0 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2728 0 : (uint32) (sentPtr >> 32), (uint32) sentPtr);
2729 0 : set_ps_display(activitymsg, false);
2730 : }
2731 :
2732 0 : return;
2733 : }
2734 :
2735 : /*
2736 : * Stream out logically decoded data.
2737 : */
2738 : static void
2739 0 : XLogSendLogical(void)
2740 : {
2741 : XLogRecord *record;
2742 : char *errm;
2743 :
2744 : /*
2745 : * Don't know whether we've caught up yet. We'll set it to true in
2746 : * WalSndWaitForWal, if we're actually waiting. We also set to true if
2747 : * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2748 : * i.e. when we're shutting down.
2749 : */
2750 0 : WalSndCaughtUp = false;
2751 :
2752 0 : record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2753 0 : logical_startptr = InvalidXLogRecPtr;
2754 :
2755 : /* xlog record was invalid */
2756 0 : if (errm != NULL)
2757 0 : elog(ERROR, "%s", errm);
2758 :
2759 0 : if (record != NULL)
2760 : {
2761 : /*
2762 : * Note the lack of any call to LagTrackerWrite() which is handled by
2763 : * WalSndUpdateProgress which is called by output plugin through
2764 : * logical decoding write api.
2765 : */
2766 0 : LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2767 :
2768 0 : sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2769 : }
2770 : else
2771 : {
2772 : /*
2773 : * If the record we just wanted read is at or beyond the flushed
2774 : * point, then we're caught up.
2775 : */
2776 0 : if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2777 : {
2778 0 : WalSndCaughtUp = true;
2779 :
2780 : /*
2781 : * Have WalSndLoop() terminate the connection in an orderly
2782 : * manner, after writing out all the pending data.
2783 : */
2784 0 : if (got_STOPPING)
2785 0 : got_SIGUSR2 = true;
2786 : }
2787 : }
2788 :
2789 : /* Update shared memory status */
2790 : {
2791 0 : WalSnd *walsnd = MyWalSnd;
2792 :
2793 0 : SpinLockAcquire(&walsnd->mutex);
2794 0 : walsnd->sentPtr = sentPtr;
2795 0 : SpinLockRelease(&walsnd->mutex);
2796 : }
2797 0 : }
2798 :
2799 : /*
2800 : * Shutdown if the sender is caught up.
2801 : *
2802 : * NB: This should only be called when the shutdown signal has been received
2803 : * from postmaster.
2804 : *
2805 : * Note that if we determine that there's still more data to send, this
2806 : * function will return control to the caller.
2807 : */
2808 : static void
2809 0 : WalSndDone(WalSndSendDataCallback send_data)
2810 : {
2811 : XLogRecPtr replicatedPtr;
2812 :
2813 : /* ... let's just be real sure we're caught up ... */
2814 0 : send_data();
2815 :
2816 : /*
2817 : * To figure out whether all WAL has successfully been replicated, check
2818 : * flush location if valid, write otherwise. Tools like pg_receivewal will
2819 : * usually (unless in synchronous mode) return an invalid flush location.
2820 : */
2821 0 : replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2822 0 : MyWalSnd->write : MyWalSnd->flush;
2823 :
2824 0 : if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2825 0 : !pq_is_send_pending())
2826 : {
2827 : /* Inform the standby that XLOG streaming is done */
2828 0 : EndCommand("COPY 0", DestRemote);
2829 0 : pq_flush();
2830 :
2831 0 : proc_exit(0);
2832 : }
2833 0 : if (!waiting_for_ping_response)
2834 : {
2835 0 : WalSndKeepalive(true);
2836 0 : waiting_for_ping_response = true;
2837 : }
2838 0 : }
2839 :
2840 : /*
2841 : * Returns the latest point in WAL that has been safely flushed to disk, and
2842 : * can be sent to the standby. This should only be called when in recovery,
2843 : * ie. we're streaming to a cascaded standby.
2844 : *
2845 : * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2846 : * replayed WAL record.
2847 : */
2848 : static XLogRecPtr
2849 0 : GetStandbyFlushRecPtr(void)
2850 : {
2851 : XLogRecPtr replayPtr;
2852 : TimeLineID replayTLI;
2853 : XLogRecPtr receivePtr;
2854 : TimeLineID receiveTLI;
2855 : XLogRecPtr result;
2856 :
2857 : /*
2858 : * We can safely send what's already been replayed. Also, if walreceiver
2859 : * is streaming WAL from the same timeline, we can send anything that it
2860 : * has streamed, but hasn't been replayed yet.
2861 : */
2862 :
2863 0 : receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2864 0 : replayPtr = GetXLogReplayRecPtr(&replayTLI);
2865 :
2866 0 : ThisTimeLineID = replayTLI;
2867 :
2868 0 : result = replayPtr;
2869 0 : if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2870 0 : result = receivePtr;
2871 :
2872 0 : return result;
2873 : }
2874 :
2875 : /*
2876 : * Request walsenders to reload the currently-open WAL file
2877 : */
2878 : void
2879 0 : WalSndRqstFileReload(void)
2880 : {
2881 : int i;
2882 :
2883 0 : for (i = 0; i < max_wal_senders; i++)
2884 : {
2885 0 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
2886 :
2887 0 : SpinLockAcquire(&walsnd->mutex);
2888 0 : if (walsnd->pid == 0)
2889 : {
2890 0 : SpinLockRelease(&walsnd->mutex);
2891 0 : continue;
2892 : }
2893 0 : walsnd->needreload = true;
2894 0 : SpinLockRelease(&walsnd->mutex);
2895 : }
2896 0 : }
2897 :
2898 : /*
2899 : * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2900 : */
2901 : void
2902 0 : HandleWalSndInitStopping(void)
2903 : {
2904 0 : Assert(am_walsender);
2905 :
2906 : /*
2907 : * If replication has not yet started, die like with SIGTERM. If
2908 : * replication is active, only set a flag and wake up the main loop. It
2909 : * will send any outstanding WAL, wait for it to be replicated to the
2910 : * standby, and then exit gracefully.
2911 : */
2912 0 : if (!replication_active)
2913 0 : kill(MyProcPid, SIGTERM);
2914 : else
2915 0 : got_STOPPING = true;
2916 0 : }
2917 :
2918 : /*
2919 : * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2920 : * sender should already have been switched to WALSNDSTATE_STOPPING at
2921 : * this point.
2922 : */
2923 : static void
2924 0 : WalSndLastCycleHandler(SIGNAL_ARGS)
2925 : {
2926 0 : int save_errno = errno;
2927 :
2928 0 : got_SIGUSR2 = true;
2929 0 : SetLatch(MyLatch);
2930 :
2931 0 : errno = save_errno;
2932 0 : }
2933 :
2934 : /* Set up signal handlers */
2935 : void
2936 0 : WalSndSignals(void)
2937 : {
2938 : /* Set up signal handlers */
2939 0 : pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2940 : * file */
2941 0 : pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2942 0 : pqsignal(SIGTERM, die); /* request shutdown */
2943 0 : pqsignal(SIGQUIT, quickdie); /* hard crash time */
2944 0 : InitializeTimeouts(); /* establishes SIGALRM handler */
2945 0 : pqsignal(SIGPIPE, SIG_IGN);
2946 0 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
2947 0 : pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2948 : * shutdown */
2949 :
2950 : /* Reset some signals that are accepted by postmaster but not here */
2951 0 : pqsignal(SIGCHLD, SIG_DFL);
2952 0 : pqsignal(SIGTTIN, SIG_DFL);
2953 0 : pqsignal(SIGTTOU, SIG_DFL);
2954 0 : pqsignal(SIGCONT, SIG_DFL);
2955 0 : pqsignal(SIGWINCH, SIG_DFL);
2956 0 : }
2957 :
2958 : /* Report shared-memory space needed by WalSndShmemInit */
2959 : Size
2960 15 : WalSndShmemSize(void)
2961 : {
2962 15 : Size size = 0;
2963 :
2964 15 : size = offsetof(WalSndCtlData, walsnds);
2965 15 : size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2966 :
2967 15 : return size;
2968 : }
2969 :
2970 : /* Allocate and initialize walsender-related shared memory */
2971 : void
2972 5 : WalSndShmemInit(void)
2973 : {
2974 : bool found;
2975 : int i;
2976 :
2977 5 : WalSndCtl = (WalSndCtlData *)
2978 5 : ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2979 :
2980 5 : if (!found)
2981 : {
2982 : /* First time through, so initialize */
2983 5 : MemSet(WalSndCtl, 0, WalSndShmemSize());
2984 :
2985 20 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2986 15 : SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2987 :
2988 55 : for (i = 0; i < max_wal_senders; i++)
2989 : {
2990 50 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
2991 :
2992 50 : SpinLockInit(&walsnd->mutex);
2993 : }
2994 : }
2995 5 : }
2996 :
2997 : /*
2998 : * Wake up all walsenders
2999 : *
3000 : * This will be called inside critical sections, so throwing an error is not
3001 : * advisable.
3002 : */
3003 : void
3004 9609 : WalSndWakeup(void)
3005 : {
3006 : int i;
3007 :
3008 105699 : for (i = 0; i < max_wal_senders; i++)
3009 : {
3010 : Latch *latch;
3011 96090 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3012 :
3013 : /*
3014 : * Get latch pointer with spinlock held, for the unlikely case that
3015 : * pointer reads aren't atomic (as they're 8 bytes).
3016 : */
3017 96090 : SpinLockAcquire(&walsnd->mutex);
3018 96090 : latch = walsnd->latch;
3019 96090 : SpinLockRelease(&walsnd->mutex);
3020 :
3021 96090 : if (latch != NULL)
3022 0 : SetLatch(latch);
3023 : }
3024 9609 : }
3025 :
3026 : /*
3027 : * Signal all walsenders to move to stopping state.
3028 : *
3029 : * This will trigger walsenders to move to a state where no further WAL can be
3030 : * generated. See this file's header for details.
3031 : */
3032 : void
3033 3 : WalSndInitStopping(void)
3034 : {
3035 : int i;
3036 :
3037 33 : for (i = 0; i < max_wal_senders; i++)
3038 : {
3039 30 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3040 : pid_t pid;
3041 :
3042 30 : SpinLockAcquire(&walsnd->mutex);
3043 30 : pid = walsnd->pid;
3044 30 : SpinLockRelease(&walsnd->mutex);
3045 :
3046 30 : if (pid == 0)
3047 30 : continue;
3048 :
3049 0 : SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3050 : }
3051 3 : }
3052 :
3053 : /*
3054 : * Wait that all the WAL senders have quit or reached the stopping state. This
3055 : * is used by the checkpointer to control when the shutdown checkpoint can
3056 : * safely be performed.
3057 : */
3058 : void
3059 3 : WalSndWaitStopping(void)
3060 : {
3061 : for (;;)
3062 : {
3063 : int i;
3064 3 : bool all_stopped = true;
3065 :
3066 33 : for (i = 0; i < max_wal_senders; i++)
3067 : {
3068 30 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3069 :
3070 30 : SpinLockAcquire(&walsnd->mutex);
3071 :
3072 30 : if (walsnd->pid == 0)
3073 : {
3074 30 : SpinLockRelease(&walsnd->mutex);
3075 30 : continue;
3076 : }
3077 :
3078 0 : if (walsnd->state != WALSNDSTATE_STOPPING)
3079 : {
3080 0 : all_stopped = false;
3081 0 : SpinLockRelease(&walsnd->mutex);
3082 0 : break;
3083 : }
3084 0 : SpinLockRelease(&walsnd->mutex);
3085 : }
3086 :
3087 : /* safe to leave if confirmation is done for all WAL senders */
3088 3 : if (all_stopped)
3089 6 : return;
3090 :
3091 0 : pg_usleep(10000L); /* wait for 10 msec */
3092 0 : }
3093 : }
3094 :
3095 : /* Set state for current walsender (only called in walsender) */
3096 : void
3097 0 : WalSndSetState(WalSndState state)
3098 : {
3099 0 : WalSnd *walsnd = MyWalSnd;
3100 :
3101 0 : Assert(am_walsender);
3102 :
3103 0 : if (walsnd->state == state)
3104 0 : return;
3105 :
3106 0 : SpinLockAcquire(&walsnd->mutex);
3107 0 : walsnd->state = state;
3108 0 : SpinLockRelease(&walsnd->mutex);
3109 : }
3110 :
3111 : /*
3112 : * Return a string constant representing the state. This is used
3113 : * in system views, and should *not* be translated.
3114 : */
3115 : static const char *
3116 0 : WalSndGetStateString(WalSndState state)
3117 : {
3118 0 : switch (state)
3119 : {
3120 : case WALSNDSTATE_STARTUP:
3121 0 : return "startup";
3122 : case WALSNDSTATE_BACKUP:
3123 0 : return "backup";
3124 : case WALSNDSTATE_CATCHUP:
3125 0 : return "catchup";
3126 : case WALSNDSTATE_STREAMING:
3127 0 : return "streaming";
3128 : case WALSNDSTATE_STOPPING:
3129 0 : return "stopping";
3130 : }
3131 0 : return "UNKNOWN";
3132 : }
3133 :
3134 : static Interval *
3135 0 : offset_to_interval(TimeOffset offset)
3136 : {
3137 0 : Interval *result = palloc(sizeof(Interval));
3138 :
3139 0 : result->month = 0;
3140 0 : result->day = 0;
3141 0 : result->time = offset;
3142 :
3143 0 : return result;
3144 : }
3145 :
3146 : /*
3147 : * Returns activity of walsenders, including pids and xlog locations sent to
3148 : * standby servers.
3149 : */
3150 : Datum
3151 0 : pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3152 : {
3153 : #define PG_STAT_GET_WAL_SENDERS_COLS 11
3154 0 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3155 : TupleDesc tupdesc;
3156 : Tuplestorestate *tupstore;
3157 : MemoryContext per_query_ctx;
3158 : MemoryContext oldcontext;
3159 : List *sync_standbys;
3160 : int i;
3161 :
3162 : /* check to see if caller supports us returning a tuplestore */
3163 0 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3164 0 : ereport(ERROR,
3165 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3166 : errmsg("set-valued function called in context that cannot accept a set")));
3167 0 : if (!(rsinfo->allowedModes & SFRM_Materialize))
3168 0 : ereport(ERROR,
3169 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3170 : errmsg("materialize mode required, but it is not " \
3171 : "allowed in this context")));
3172 :
3173 : /* Build a tuple descriptor for our result type */
3174 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3175 0 : elog(ERROR, "return type must be a row type");
3176 :
3177 0 : per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3178 0 : oldcontext = MemoryContextSwitchTo(per_query_ctx);
3179 :
3180 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
3181 0 : rsinfo->returnMode = SFRM_Materialize;
3182 0 : rsinfo->setResult = tupstore;
3183 0 : rsinfo->setDesc = tupdesc;
3184 :
3185 0 : MemoryContextSwitchTo(oldcontext);
3186 :
3187 : /*
3188 : * Get the currently active synchronous standbys.
3189 : */
3190 0 : LWLockAcquire(SyncRepLock, LW_SHARED);
3191 0 : sync_standbys = SyncRepGetSyncStandbys(NULL);
3192 0 : LWLockRelease(SyncRepLock);
3193 :
3194 0 : for (i = 0; i < max_wal_senders; i++)
3195 : {
3196 0 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3197 : XLogRecPtr sentPtr;
3198 : XLogRecPtr write;
3199 : XLogRecPtr flush;
3200 : XLogRecPtr apply;
3201 : TimeOffset writeLag;
3202 : TimeOffset flushLag;
3203 : TimeOffset applyLag;
3204 : int priority;
3205 : int pid;
3206 : WalSndState state;
3207 : Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3208 : bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3209 :
3210 0 : SpinLockAcquire(&walsnd->mutex);
3211 0 : if (walsnd->pid == 0)
3212 : {
3213 0 : SpinLockRelease(&walsnd->mutex);
3214 0 : continue;
3215 : }
3216 0 : pid = walsnd->pid;
3217 0 : sentPtr = walsnd->sentPtr;
3218 0 : state = walsnd->state;
3219 0 : write = walsnd->write;
3220 0 : flush = walsnd->flush;
3221 0 : apply = walsnd->apply;
3222 0 : writeLag = walsnd->writeLag;
3223 0 : flushLag = walsnd->flushLag;
3224 0 : applyLag = walsnd->applyLag;
3225 0 : priority = walsnd->sync_standby_priority;
3226 0 : SpinLockRelease(&walsnd->mutex);
3227 :
3228 0 : memset(nulls, 0, sizeof(nulls));
3229 0 : values[0] = Int32GetDatum(pid);
3230 :
3231 0 : if (!superuser())
3232 : {
3233 : /*
3234 : * Only superusers can see details. Other users only get the pid
3235 : * value to know it's a walsender, but no details.
3236 : */
3237 0 : MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3238 : }
3239 : else
3240 : {
3241 0 : values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3242 :
3243 0 : if (XLogRecPtrIsInvalid(sentPtr))
3244 0 : nulls[2] = true;
3245 0 : values[2] = LSNGetDatum(sentPtr);
3246 :
3247 0 : if (XLogRecPtrIsInvalid(write))
3248 0 : nulls[3] = true;
3249 0 : values[3] = LSNGetDatum(write);
3250 :
3251 0 : if (XLogRecPtrIsInvalid(flush))
3252 0 : nulls[4] = true;
3253 0 : values[4] = LSNGetDatum(flush);
3254 :
3255 0 : if (XLogRecPtrIsInvalid(apply))
3256 0 : nulls[5] = true;
3257 0 : values[5] = LSNGetDatum(apply);
3258 :
3259 : /*
3260 : * Treat a standby such as a pg_basebackup background process
3261 : * which always returns an invalid flush location, as an
3262 : * asynchronous standby.
3263 : */
3264 0 : priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3265 :
3266 0 : if (writeLag < 0)
3267 0 : nulls[6] = true;
3268 : else
3269 0 : values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3270 :
3271 0 : if (flushLag < 0)
3272 0 : nulls[7] = true;
3273 : else
3274 0 : values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3275 :
3276 0 : if (applyLag < 0)
3277 0 : nulls[8] = true;
3278 : else
3279 0 : values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3280 :
3281 0 : values[9] = Int32GetDatum(priority);
3282 :
3283 : /*
3284 : * More easily understood version of standby state. This is purely
3285 : * informational.
3286 : *
3287 : * In quorum-based sync replication, the role of each standby
3288 : * listed in synchronous_standby_names can be changing very
3289 : * frequently. Any standbys considered as "sync" at one moment can
3290 : * be switched to "potential" ones at the next moment. So, it's
3291 : * basically useless to report "sync" or "potential" as their sync
3292 : * states. We report just "quorum" for them.
3293 : */
3294 0 : if (priority == 0)
3295 0 : values[10] = CStringGetTextDatum("async");
3296 0 : else if (list_member_int(sync_standbys, i))
3297 0 : values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3298 0 : CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3299 : else
3300 0 : values[10] = CStringGetTextDatum("potential");
3301 : }
3302 :
3303 0 : tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3304 : }
3305 :
3306 : /* clean up and return the tuplestore */
3307 : tuplestore_donestoring(tupstore);
3308 :
3309 0 : return (Datum) 0;
3310 : }
3311 :
3312 : /*
3313 : * This function is used to send a keepalive message to standby.
3314 : * If requestReply is set, sets a flag in the message requesting the standby
3315 : * to send a message back to us, for heartbeat purposes.
3316 : */
3317 : static void
3318 0 : WalSndKeepalive(bool requestReply)
3319 : {
3320 0 : elog(DEBUG2, "sending replication keepalive");
3321 :
3322 : /* construct the message... */
3323 0 : resetStringInfo(&output_message);
3324 0 : pq_sendbyte(&output_message, 'k');
3325 0 : pq_sendint64(&output_message, sentPtr);
3326 0 : pq_sendint64(&output_message, GetCurrentTimestamp());
3327 0 : pq_sendbyte(&output_message, requestReply ? 1 : 0);
3328 :
3329 : /* ... and send it wrapped in CopyData */
3330 0 : pq_putmessage_noblock('d', output_message.data, output_message.len);
3331 0 : }
3332 :
3333 : /*
3334 : * Send keepalive message if too much time has elapsed.
3335 : */
3336 : static void
3337 0 : WalSndKeepaliveIfNecessary(TimestampTz now)
3338 : {
3339 : TimestampTz ping_time;
3340 :
3341 : /*
3342 : * Don't send keepalive messages if timeouts are globally disabled or
3343 : * we're doing something not partaking in timeouts.
3344 : */
3345 0 : if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3346 0 : return;
3347 :
3348 0 : if (waiting_for_ping_response)
3349 0 : return;
3350 :
3351 : /*
3352 : * If half of wal_sender_timeout has lapsed without receiving any reply
3353 : * from the standby, send a keep-alive message to the standby requesting
3354 : * an immediate reply.
3355 : */
3356 0 : ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3357 : wal_sender_timeout / 2);
3358 0 : if (now >= ping_time)
3359 : {
3360 0 : WalSndKeepalive(true);
3361 0 : waiting_for_ping_response = true;
3362 :
3363 : /* Try to flush pending output to the client */
3364 0 : if (pq_flush_if_writable() != 0)
3365 0 : WalSndShutdown();
3366 : }
3367 : }
3368 :
3369 : /*
3370 : * Record the end of the WAL and the time it was flushed locally, so that
3371 : * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3372 : * eventually reported to have been written, flushed and applied by the
3373 : * standby in a reply message.
3374 : */
3375 : static void
3376 0 : LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3377 : {
3378 : bool buffer_full;
3379 : int new_write_head;
3380 : int i;
3381 :
3382 0 : if (!am_walsender)
3383 0 : return;
3384 :
3385 : /*
3386 : * If the lsn hasn't advanced since last time, then do nothing. This way
3387 : * we only record a new sample when new WAL has been written.
3388 : */
3389 0 : if (LagTracker.last_lsn == lsn)
3390 0 : return;
3391 0 : LagTracker.last_lsn = lsn;
3392 :
3393 : /*
3394 : * If advancing the write head of the circular buffer would crash into any
3395 : * of the read heads, then the buffer is full. In other words, the
3396 : * slowest reader (presumably apply) is the one that controls the release
3397 : * of space.
3398 : */
3399 0 : new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3400 0 : buffer_full = false;
3401 0 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3402 : {
3403 0 : if (new_write_head == LagTracker.read_heads[i])
3404 0 : buffer_full = true;
3405 : }
3406 :
3407 : /*
3408 : * If the buffer is full, for now we just rewind by one slot and overwrite
3409 : * the last sample, as a simple (if somewhat uneven) way to lower the
3410 : * sampling rate. There may be better adaptive compaction algorithms.
3411 : */
3412 0 : if (buffer_full)
3413 : {
3414 0 : new_write_head = LagTracker.write_head;
3415 0 : if (LagTracker.write_head > 0)
3416 0 : LagTracker.write_head--;
3417 : else
3418 0 : LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3419 : }
3420 :
3421 : /* Store a sample at the current write head position. */
3422 0 : LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3423 0 : LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3424 0 : LagTracker.write_head = new_write_head;
3425 : }
3426 :
3427 : /*
3428 : * Find out how much time has elapsed between the moment WAL location 'lsn'
3429 : * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3430 : * We have a separate read head for each of the reported LSN locations we
3431 : * receive in replies from standby; 'head' controls which read head is
3432 : * used. Whenever a read head crosses an LSN which was written into the
3433 : * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3434 : * find out the time this LSN (or an earlier one) was flushed locally, and
3435 : * therefore compute the lag.
3436 : *
3437 : * Return -1 if no new sample data is available, and otherwise the elapsed
3438 : * time in microseconds.
3439 : */
3440 : static TimeOffset
3441 0 : LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
3442 : {
3443 0 : TimestampTz time = 0;
3444 :
3445 : /* Read all unread samples up to this LSN or end of buffer. */
3446 0 : while (LagTracker.read_heads[head] != LagTracker.write_head &&
3447 0 : LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3448 : {
3449 0 : time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3450 0 : LagTracker.last_read[head] =
3451 0 : LagTracker.buffer[LagTracker.read_heads[head]];
3452 0 : LagTracker.read_heads[head] =
3453 0 : (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3454 : }
3455 :
3456 : /*
3457 : * If the lag tracker is empty, that means the standby has processed
3458 : * everything we've ever sent so we should now clear 'last_read'. If we
3459 : * didn't do that, we'd risk using a stale and irrelevant sample for
3460 : * interpolation at the beginning of the next burst of WAL after a period
3461 : * of idleness.
3462 : */
3463 0 : if (LagTracker.read_heads[head] == LagTracker.write_head)
3464 0 : LagTracker.last_read[head].time = 0;
3465 :
3466 0 : if (time > now)
3467 : {
3468 : /* If the clock somehow went backwards, treat as not found. */
3469 0 : return -1;
3470 : }
3471 0 : else if (time == 0)
3472 : {
3473 : /*
3474 : * We didn't cross a time. If there is a future sample that we
3475 : * haven't reached yet, and we've already reached at least one sample,
3476 : * let's interpolate the local flushed time. This is mainly useful
3477 : * for reporting a completely stuck apply position as having
3478 : * increasing lag, since otherwise we'd have to wait for it to
3479 : * eventually start moving again and cross one of our samples before
3480 : * we can show the lag increasing.
3481 : */
3482 0 : if (LagTracker.read_heads[head] == LagTracker.write_head)
3483 : {
3484 : /* There are no future samples, so we can't interpolate. */
3485 0 : return -1;
3486 : }
3487 0 : else if (LagTracker.last_read[head].time != 0)
3488 : {
3489 : /* We can interpolate between last_read and the next sample. */
3490 : double fraction;
3491 0 : WalTimeSample prev = LagTracker.last_read[head];
3492 0 : WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3493 :
3494 0 : if (lsn < prev.lsn)
3495 : {
3496 : /*
3497 : * Reported LSNs shouldn't normally go backwards, but it's
3498 : * possible when there is a timeline change. Treat as not
3499 : * found.
3500 : */
3501 0 : return -1;
3502 : }
3503 :
3504 0 : Assert(prev.lsn < next.lsn);
3505 :
3506 0 : if (prev.time > next.time)
3507 : {
3508 : /* If the clock somehow went backwards, treat as not found. */
3509 0 : return -1;
3510 : }
3511 :
3512 : /* See how far we are between the previous and next samples. */
3513 0 : fraction =
3514 0 : (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3515 :
3516 : /* Scale the local flush time proportionally. */
3517 0 : time = (TimestampTz)
3518 0 : ((double) prev.time + (next.time - prev.time) * fraction);
3519 : }
3520 : else
3521 : {
3522 : /*
3523 : * We have only a future sample, implying that we were entirely
3524 : * caught up but and now there is a new burst of WAL and the
3525 : * standby hasn't processed the first sample yet. Until the
3526 : * standby reaches the future sample the best we can do is report
3527 : * the hypothetical lag if that sample were to be replayed now.
3528 : */
3529 0 : time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3530 : }
3531 : }
3532 :
3533 : /* Return the elapsed time since local flush time in microseconds. */
3534 0 : Assert(time != 0);
3535 0 : return now - time;
3536 : }
|