Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.c
4 : *
5 : * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 : * is the process in the standby server that takes charge of receiving
7 : * XLOG records from a primary server during streaming replication.
8 : *
9 : * When the startup process determines that it's time to start streaming,
10 : * it instructs postmaster to start walreceiver. Walreceiver first connects
11 : * to the primary server (it will be served by a walsender process
12 : * in the primary server), and then keeps receiving XLOG records and
13 : * writing them to the disk as long as the connection is alive. As XLOG
14 : * records are received and flushed to disk, it updates the
15 : * WalRcv->receivedUpto variable in shared memory, to inform the startup
16 : * process of how far it can proceed with XLOG replay.
17 : *
18 : * If the primary server ends streaming, but doesn't disconnect, walreceiver
19 : * goes into "waiting" mode, and waits for the startup process to give new
20 : * instructions. The startup process will treat that the same as
21 : * disconnection, and will rescan the archive/pg_wal directory. But when the
22 : * startup process wants to try streaming replication again, it will just
23 : * nudge the existing walreceiver process that's waiting, instead of launching
24 : * a new one.
25 : *
26 : * Normal termination is by SIGTERM, which instructs the walreceiver to
27 : * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
28 : * process, the walreceiver will simply abort and exit on SIGQUIT. A close
29 : * of the connection and a FATAL error are treated not as a crash but as
30 : * normal operation.
31 : *
32 : * This file contains the server-facing parts of walreceiver. The libpq-
33 : * specific parts are in the libpqwalreceiver module. It's loaded
34 : * dynamically to avoid linking the server with libpq.
35 : *
36 : * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
37 : *
38 : *
39 : * IDENTIFICATION
40 : * src/backend/replication/walreceiver.c
41 : *
42 : *-------------------------------------------------------------------------
43 : */
44 : #include "postgres.h"
45 :
46 : #include <signal.h>
47 : #include <unistd.h>
48 :
49 : #include "access/htup_details.h"
50 : #include "access/timeline.h"
51 : #include "access/transam.h"
52 : #include "access/xlog_internal.h"
53 : #include "catalog/pg_authid.h"
54 : #include "catalog/pg_type.h"
55 : #include "funcapi.h"
56 : #include "libpq/pqformat.h"
57 : #include "libpq/pqsignal.h"
58 : #include "miscadmin.h"
59 : #include "pgstat.h"
60 : #include "replication/walreceiver.h"
61 : #include "replication/walsender.h"
62 : #include "storage/ipc.h"
63 : #include "storage/pmsignal.h"
64 : #include "storage/procarray.h"
65 : #include "utils/builtins.h"
66 : #include "utils/guc.h"
67 : #include "utils/pg_lsn.h"
68 : #include "utils/ps_status.h"
69 : #include "utils/resowner.h"
70 : #include "utils/timestamp.h"
71 :
72 :
/*
 * GUC variables.
 *
 * wal_receiver_timeout is treated as milliseconds by the main loop, and the
 * timeout check is skipped entirely when it is not greater than zero.
 * NOTE(review): wal_receiver_status_interval and hot_standby_feedback are
 * presumably consumed by the reply/feedback senders further down the file;
 * their units are not visible from the code in this part of the file.
 */
int			wal_receiver_status_interval;
int			wal_receiver_timeout;
bool		hot_standby_feedback;

/*
 * libpqwalreceiver connection.
 *
 * wrconn is assigned from walrcv_connect() in WalReceiverMain() and handed to
 * walrcv_disconnect() in WalRcvDie().  WalReceiverFunctions must be non-NULL
 * after load_file("libpqwalreceiver") or WalReceiverMain() raises an ERROR.
 */
static WalReceiverConn *wrconn = NULL;
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */

/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
 * corresponding to the filename of recvFile.
 */
static int	recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;
static uint32 recvOff = 0;

/*
 * Flags set by interrupt handlers of walreceiver for later service in the
 * main loop.
 */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * Message buffers.  Both are (re)initialized with initStringInfo() each time
 * streaming starts; incoming_message is reset at the top of
 * XLogWalRcvProcessMsg() and used to decode message headers.
 */
static StringInfoData reply_message;
static StringInfoData incoming_message;

/*
 * About SIGTERM handling:
 *
 * We can't just exit(1) within SIGTERM signal handler, because the signal
 * might arrive in the middle of some critical operation, like while we're
 * holding a spinlock. We also can't just set a flag in signal handler and
 * check it in the main loop, because we perform some blocking operations
 * like libpqrcv_PQexec(), which can take a long time to finish.
 *
 * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
 * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
 * sets got_SIGTERM flag, which is checked in the main loop when convenient.
 *
 * This is very much like what regular backends do with ImmediateInterruptOK,
 * ProcessInterrupts() etc.
 */
static volatile bool WalRcvImmediateInterruptOK = false;

/* Prototypes for private functions */
static void ProcessWalRcvInterrupts(void);
static void EnableWalRcvImmediateExit(void);
static void DisableWalRcvImmediateExit(void);
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);

/* Signal handlers (installed with pqsignal() in WalReceiverMain) */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
151 :
152 :
153 : static void
154 0 : ProcessWalRcvInterrupts(void)
155 : {
156 : /*
157 : * Although walreceiver interrupt handling doesn't use the same scheme as
158 : * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
159 : * any incoming signals on Win32.
160 : */
161 0 : CHECK_FOR_INTERRUPTS();
162 :
163 0 : if (got_SIGTERM)
164 : {
165 0 : WalRcvImmediateInterruptOK = false;
166 0 : ereport(FATAL,
167 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
168 : errmsg("terminating walreceiver process due to administrator command")));
169 : }
170 0 : }
171 :
172 : static void
173 0 : EnableWalRcvImmediateExit(void)
174 : {
175 0 : WalRcvImmediateInterruptOK = true;
176 0 : ProcessWalRcvInterrupts();
177 0 : }
178 :
179 : static void
180 0 : DisableWalRcvImmediateExit(void)
181 : {
182 0 : WalRcvImmediateInterruptOK = false;
183 0 : ProcessWalRcvInterrupts();
184 0 : }
185 :
186 : /* Main entry point for walreceiver process */
187 : void
188 0 : WalReceiverMain(void)
189 : {
190 : char conninfo[MAXCONNINFO];
191 : char *tmp_conninfo;
192 : char slotname[NAMEDATALEN];
193 : XLogRecPtr startpoint;
194 : TimeLineID startpointTLI;
195 : TimeLineID primaryTLI;
196 : bool first_stream;
197 0 : WalRcvData *walrcv = WalRcv;
198 : TimestampTz last_recv_timestamp;
199 : bool ping_sent;
200 : char *err;
201 :
202 : /*
203 : * WalRcv should be set up already (if we are a backend, we inherit this
204 : * by fork() or EXEC_BACKEND mechanism from the postmaster).
205 : */
206 0 : Assert(walrcv != NULL);
207 :
208 : /*
209 : * Mark walreceiver as running in shared memory.
210 : *
211 : * Do this as early as possible, so that if we fail later on, we'll set
212 : * state to STOPPED. If we die before this, the startup process will keep
213 : * waiting for us to start up, until it times out.
214 : */
215 0 : SpinLockAcquire(&walrcv->mutex);
216 0 : Assert(walrcv->pid == 0);
217 0 : switch (walrcv->walRcvState)
218 : {
219 : case WALRCV_STOPPING:
220 : /* If we've already been requested to stop, don't start up. */
221 0 : walrcv->walRcvState = WALRCV_STOPPED;
222 : /* fall through */
223 :
224 : case WALRCV_STOPPED:
225 0 : SpinLockRelease(&walrcv->mutex);
226 0 : proc_exit(1);
227 : break;
228 :
229 : case WALRCV_STARTING:
230 : /* The usual case */
231 0 : break;
232 :
233 : case WALRCV_WAITING:
234 : case WALRCV_STREAMING:
235 : case WALRCV_RESTARTING:
236 : default:
237 : /* Shouldn't happen */
238 0 : elog(PANIC, "walreceiver still running according to shared memory state");
239 : }
240 : /* Advertise our PID so that the startup process can kill us */
241 0 : walrcv->pid = MyProcPid;
242 0 : walrcv->walRcvState = WALRCV_STREAMING;
243 :
244 : /* Fetch information required to start streaming */
245 0 : walrcv->ready_to_display = false;
246 0 : strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
247 0 : strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
248 0 : startpoint = walrcv->receiveStart;
249 0 : startpointTLI = walrcv->receiveStartTLI;
250 :
251 : /* Initialise to a sanish value */
252 0 : walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();
253 :
254 0 : SpinLockRelease(&walrcv->mutex);
255 :
256 : /* Arrange to clean up at walreceiver exit */
257 0 : on_shmem_exit(WalRcvDie, 0);
258 :
259 0 : walrcv->latch = &MyProc->procLatch;
260 :
261 : /* Properly accept or ignore signals the postmaster might send us */
262 0 : pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
263 0 : pqsignal(SIGINT, SIG_IGN);
264 0 : pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
265 0 : pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
266 0 : pqsignal(SIGALRM, SIG_IGN);
267 0 : pqsignal(SIGPIPE, SIG_IGN);
268 0 : pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
269 0 : pqsignal(SIGUSR2, SIG_IGN);
270 :
271 : /* Reset some signals that are accepted by postmaster but not here */
272 0 : pqsignal(SIGCHLD, SIG_DFL);
273 0 : pqsignal(SIGTTIN, SIG_DFL);
274 0 : pqsignal(SIGTTOU, SIG_DFL);
275 0 : pqsignal(SIGCONT, SIG_DFL);
276 0 : pqsignal(SIGWINCH, SIG_DFL);
277 :
278 : /* We allow SIGQUIT (quickdie) at all times */
279 0 : sigdelset(&BlockSig, SIGQUIT);
280 :
281 : /* Load the libpq-specific functions */
282 0 : load_file("libpqwalreceiver", false);
283 0 : if (WalReceiverFunctions == NULL)
284 0 : elog(ERROR, "libpqwalreceiver didn't initialize correctly");
285 :
286 : /*
287 : * Create a resource owner to keep track of our resources (not clear that
288 : * we need this, but may as well have one).
289 : */
290 0 : CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
291 :
292 : /* Unblock signals (they were blocked when the postmaster forked us) */
293 0 : PG_SETMASK(&UnBlockSig);
294 :
295 : /* Establish the connection to the primary for XLOG streaming */
296 0 : EnableWalRcvImmediateExit();
297 0 : wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
298 0 : if (!wrconn)
299 0 : ereport(ERROR,
300 : (errmsg("could not connect to the primary server: %s", err)));
301 0 : DisableWalRcvImmediateExit();
302 :
303 : /*
304 : * Save user-visible connection string. This clobbers the original
305 : * conninfo, for security.
306 : */
307 0 : tmp_conninfo = walrcv_get_conninfo(wrconn);
308 0 : SpinLockAcquire(&walrcv->mutex);
309 0 : memset(walrcv->conninfo, 0, MAXCONNINFO);
310 0 : if (tmp_conninfo)
311 : {
312 0 : strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
313 0 : pfree(tmp_conninfo);
314 : }
315 0 : walrcv->ready_to_display = true;
316 0 : SpinLockRelease(&walrcv->mutex);
317 :
318 0 : first_stream = true;
319 : for (;;)
320 : {
321 : char *primary_sysid;
322 : char standby_sysid[32];
323 : int server_version;
324 : WalRcvStreamOptions options;
325 :
326 : /*
327 : * Check that we're connected to a valid server using the
328 : * IDENTIFY_SYSTEM replication command.
329 : */
330 0 : EnableWalRcvImmediateExit();
331 0 : primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
332 : &server_version);
333 :
334 0 : snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
335 : GetSystemIdentifier());
336 0 : if (strcmp(primary_sysid, standby_sysid) != 0)
337 : {
338 0 : ereport(ERROR,
339 : (errmsg("database system identifier differs between the primary and standby"),
340 : errdetail("The primary's identifier is %s, the standby's identifier is %s.",
341 : primary_sysid, standby_sysid)));
342 : }
343 0 : DisableWalRcvImmediateExit();
344 :
345 : /*
346 : * Confirm that the current timeline of the primary is the same or
347 : * ahead of ours.
348 : */
349 0 : if (primaryTLI < startpointTLI)
350 0 : ereport(ERROR,
351 : (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
352 : primaryTLI, startpointTLI)));
353 :
354 : /*
355 : * Get any missing history files. We do this always, even when we're
356 : * not interested in that timeline, so that if we're promoted to
357 : * become the master later on, we don't select the same timeline that
358 : * was already used in the current master. This isn't bullet-proof -
359 : * you'll need some external software to manage your cluster if you
360 : * need to ensure that a unique timeline id is chosen in every case,
361 : * but let's avoid the confusion of timeline id collisions where we
362 : * can.
363 : */
364 0 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
365 :
366 : /*
367 : * Start streaming.
368 : *
369 : * We'll try to start at the requested starting point and timeline,
370 : * even if it's different from the server's latest timeline. In case
371 : * we've already reached the end of the old timeline, the server will
372 : * finish the streaming immediately, and we will go back to await
373 : * orders from the startup process. If recovery_target_timeline is
374 : * 'latest', the startup process will scan pg_wal and find the new
375 : * history file, bump recovery target timeline, and ask us to restart
376 : * on the new timeline.
377 : */
378 0 : options.logical = false;
379 0 : options.startpoint = startpoint;
380 0 : options.slotname = slotname[0] != '\0' ? slotname : NULL;
381 0 : options.proto.physical.startpointTLI = startpointTLI;
382 0 : ThisTimeLineID = startpointTLI;
383 0 : if (walrcv_startstreaming(wrconn, &options))
384 : {
385 0 : if (first_stream)
386 0 : ereport(LOG,
387 : (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
388 : (uint32) (startpoint >> 32), (uint32) startpoint,
389 : startpointTLI)));
390 : else
391 0 : ereport(LOG,
392 : (errmsg("restarted WAL streaming at %X/%X on timeline %u",
393 : (uint32) (startpoint >> 32), (uint32) startpoint,
394 : startpointTLI)));
395 0 : first_stream = false;
396 :
397 : /* Initialize LogstreamResult and buffers for processing messages */
398 0 : LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
399 0 : initStringInfo(&reply_message);
400 0 : initStringInfo(&incoming_message);
401 :
402 : /* Initialize the last recv timestamp */
403 0 : last_recv_timestamp = GetCurrentTimestamp();
404 0 : ping_sent = false;
405 :
406 : /* Loop until end-of-streaming or error */
407 : for (;;)
408 : {
409 : char *buf;
410 : int len;
411 0 : bool endofwal = false;
412 0 : pgsocket wait_fd = PGINVALID_SOCKET;
413 : int rc;
414 :
415 : /*
416 : * Exit walreceiver if we're not in recovery. This should not
417 : * happen, but cross-check the status here.
418 : */
419 0 : if (!RecoveryInProgress())
420 0 : ereport(FATAL,
421 : (errmsg("cannot continue WAL streaming, recovery has already ended")));
422 :
423 : /* Process any requests or signals received recently */
424 0 : ProcessWalRcvInterrupts();
425 :
426 0 : if (got_SIGHUP)
427 : {
428 0 : got_SIGHUP = false;
429 0 : ProcessConfigFile(PGC_SIGHUP);
430 0 : XLogWalRcvSendHSFeedback(true);
431 : }
432 :
433 : /* See if we can read data immediately */
434 0 : len = walrcv_receive(wrconn, &buf, &wait_fd);
435 0 : if (len != 0)
436 : {
437 : /*
438 : * Process the received data, and any subsequent data we
439 : * can read without blocking.
440 : */
441 : for (;;)
442 : {
443 0 : if (len > 0)
444 : {
445 : /*
446 : * Something was received from master, so reset
447 : * timeout
448 : */
449 0 : last_recv_timestamp = GetCurrentTimestamp();
450 0 : ping_sent = false;
451 0 : XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
452 : }
453 0 : else if (len == 0)
454 0 : break;
455 0 : else if (len < 0)
456 : {
457 0 : ereport(LOG,
458 : (errmsg("replication terminated by primary server"),
459 : errdetail("End of WAL reached on timeline %u at %X/%X.",
460 : startpointTLI,
461 : (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
462 0 : endofwal = true;
463 0 : break;
464 : }
465 0 : len = walrcv_receive(wrconn, &buf, &wait_fd);
466 0 : }
467 :
468 : /* Let the master know that we received some data. */
469 0 : XLogWalRcvSendReply(false, false);
470 :
471 : /*
472 : * If we've written some records, flush them to disk and
473 : * let the startup process and primary server know about
474 : * them.
475 : */
476 0 : XLogWalRcvFlush(false);
477 : }
478 :
479 : /* Check if we need to exit the streaming loop. */
480 0 : if (endofwal)
481 0 : break;
482 :
483 : /*
484 : * Ideally we would reuse a WaitEventSet object repeatedly
485 : * here to avoid the overheads of WaitLatchOrSocket on epoll
486 : * systems, but we can't be sure that libpq (or any other
487 : * walreceiver implementation) has the same socket (even if
488 : * the fd is the same number, it may have been closed and
489 : * reopened since the last time). In future, if there is a
490 : * function for removing sockets from WaitEventSet, then we
491 : * could add and remove just the socket each time, potentially
492 : * avoiding some system calls.
493 : */
494 0 : Assert(wait_fd != PGINVALID_SOCKET);
495 0 : rc = WaitLatchOrSocket(walrcv->latch,
496 : WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
497 : WL_TIMEOUT | WL_LATCH_SET,
498 : wait_fd,
499 : NAPTIME_PER_CYCLE,
500 : WAIT_EVENT_WAL_RECEIVER_MAIN);
501 0 : if (rc & WL_LATCH_SET)
502 : {
503 0 : ResetLatch(walrcv->latch);
504 0 : if (walrcv->force_reply)
505 : {
506 : /*
507 : * The recovery process has asked us to send apply
508 : * feedback now. Make sure the flag is really set to
509 : * false in shared memory before sending the reply, so
510 : * we don't miss a new request for a reply.
511 : */
512 0 : walrcv->force_reply = false;
513 0 : pg_memory_barrier();
514 0 : XLogWalRcvSendReply(true, false);
515 : }
516 : }
517 0 : if (rc & WL_POSTMASTER_DEATH)
518 : {
519 : /*
520 : * Emergency bailout if postmaster has died. This is to
521 : * avoid the necessity for manual cleanup of all
522 : * postmaster children.
523 : */
524 0 : exit(1);
525 : }
526 0 : if (rc & WL_TIMEOUT)
527 : {
528 : /*
529 : * We didn't receive anything new. If we haven't heard
530 : * anything from the server for more than
531 : * wal_receiver_timeout / 2, ping the server. Also, if
532 : * it's been longer than wal_receiver_status_interval
533 : * since the last update we sent, send a status update to
534 : * the master anyway, to report any progress in applying
535 : * WAL.
536 : */
537 0 : bool requestReply = false;
538 :
539 : /*
540 : * Check if time since last receive from standby has
541 : * reached the configured limit.
542 : */
543 0 : if (wal_receiver_timeout > 0)
544 : {
545 0 : TimestampTz now = GetCurrentTimestamp();
546 : TimestampTz timeout;
547 :
548 0 : timeout =
549 0 : TimestampTzPlusMilliseconds(last_recv_timestamp,
550 : wal_receiver_timeout);
551 :
552 0 : if (now >= timeout)
553 0 : ereport(ERROR,
554 : (errmsg("terminating walreceiver due to timeout")));
555 :
556 : /*
557 : * We didn't receive anything new, for half of
558 : * receiver replication timeout. Ping the server.
559 : */
560 0 : if (!ping_sent)
561 : {
562 0 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
563 : (wal_receiver_timeout / 2));
564 0 : if (now >= timeout)
565 : {
566 0 : requestReply = true;
567 0 : ping_sent = true;
568 : }
569 : }
570 : }
571 :
572 0 : XLogWalRcvSendReply(requestReply, requestReply);
573 0 : XLogWalRcvSendHSFeedback(false);
574 : }
575 0 : }
576 :
577 : /*
578 : * The backend finished streaming. Exit streaming COPY-mode from
579 : * our side, too.
580 : */
581 0 : EnableWalRcvImmediateExit();
582 0 : walrcv_endstreaming(wrconn, &primaryTLI);
583 0 : DisableWalRcvImmediateExit();
584 :
585 : /*
586 : * If the server had switched to a new timeline that we didn't
587 : * know about when we began streaming, fetch its timeline history
588 : * file now.
589 : */
590 0 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
591 : }
592 : else
593 0 : ereport(LOG,
594 : (errmsg("primary server contains no more WAL on requested timeline %u",
595 : startpointTLI)));
596 :
597 : /*
598 : * End of WAL reached on the requested timeline. Close the last
599 : * segment, and await for new orders from the startup process.
600 : */
601 0 : if (recvFile >= 0)
602 : {
603 : char xlogfname[MAXFNAMELEN];
604 :
605 0 : XLogWalRcvFlush(false);
606 0 : if (close(recvFile) != 0)
607 0 : ereport(PANIC,
608 : (errcode_for_file_access(),
609 : errmsg("could not close log segment %s: %m",
610 : XLogFileNameP(recvFileTLI, recvSegNo))));
611 :
612 : /*
613 : * Create .done file forcibly to prevent the streamed segment from
614 : * being archived later.
615 : */
616 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo);
617 0 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
618 0 : XLogArchiveForceDone(xlogfname);
619 : else
620 0 : XLogArchiveNotify(xlogfname);
621 : }
622 0 : recvFile = -1;
623 :
624 0 : elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
625 0 : WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
626 0 : }
627 : /* not reached */
628 : }
629 :
630 : /*
631 : * Wait for startup process to set receiveStart and receiveStartTLI.
632 : */
633 : static void
634 0 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
635 : {
636 0 : WalRcvData *walrcv = WalRcv;
637 : int state;
638 :
639 0 : SpinLockAcquire(&walrcv->mutex);
640 0 : state = walrcv->walRcvState;
641 0 : if (state != WALRCV_STREAMING)
642 : {
643 0 : SpinLockRelease(&walrcv->mutex);
644 0 : if (state == WALRCV_STOPPING)
645 0 : proc_exit(0);
646 : else
647 0 : elog(FATAL, "unexpected walreceiver state");
648 : }
649 0 : walrcv->walRcvState = WALRCV_WAITING;
650 0 : walrcv->receiveStart = InvalidXLogRecPtr;
651 0 : walrcv->receiveStartTLI = 0;
652 0 : SpinLockRelease(&walrcv->mutex);
653 :
654 0 : if (update_process_title)
655 0 : set_ps_display("idle", false);
656 :
657 : /*
658 : * nudge startup process to notice that we've stopped streaming and are
659 : * now waiting for instructions.
660 : */
661 0 : WakeupRecovery();
662 : for (;;)
663 : {
664 0 : ResetLatch(walrcv->latch);
665 :
666 : /*
667 : * Emergency bailout if postmaster has died. This is to avoid the
668 : * necessity for manual cleanup of all postmaster children.
669 : */
670 0 : if (!PostmasterIsAlive())
671 0 : exit(1);
672 :
673 0 : ProcessWalRcvInterrupts();
674 :
675 0 : SpinLockAcquire(&walrcv->mutex);
676 0 : Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
677 : walrcv->walRcvState == WALRCV_WAITING ||
678 : walrcv->walRcvState == WALRCV_STOPPING);
679 0 : if (walrcv->walRcvState == WALRCV_RESTARTING)
680 : {
681 : /* we don't expect primary_conninfo to change */
682 0 : *startpoint = walrcv->receiveStart;
683 0 : *startpointTLI = walrcv->receiveStartTLI;
684 0 : walrcv->walRcvState = WALRCV_STREAMING;
685 0 : SpinLockRelease(&walrcv->mutex);
686 0 : break;
687 : }
688 0 : if (walrcv->walRcvState == WALRCV_STOPPING)
689 : {
690 : /*
691 : * We should've received SIGTERM if the startup process wants us
692 : * to die, but might as well check it here too.
693 : */
694 0 : SpinLockRelease(&walrcv->mutex);
695 0 : exit(1);
696 : }
697 0 : SpinLockRelease(&walrcv->mutex);
698 :
699 0 : WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
700 : WAIT_EVENT_WAL_RECEIVER_WAIT_START);
701 0 : }
702 :
703 0 : if (update_process_title)
704 : {
705 : char activitymsg[50];
706 :
707 0 : snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
708 0 : (uint32) (*startpoint >> 32),
709 0 : (uint32) *startpoint);
710 0 : set_ps_display(activitymsg, false);
711 : }
712 0 : }
713 :
714 : /*
715 : * Fetch any missing timeline history files between 'first' and 'last'
716 : * (inclusive) from the server.
717 : */
718 : static void
719 0 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
720 : {
721 : TimeLineID tli;
722 :
723 0 : for (tli = first; tli <= last; tli++)
724 : {
725 : /* there's no history file for timeline 1 */
726 0 : if (tli != 1 && !existsTimeLineHistory(tli))
727 : {
728 : char *fname;
729 : char *content;
730 : int len;
731 : char expectedfname[MAXFNAMELEN];
732 :
733 0 : ereport(LOG,
734 : (errmsg("fetching timeline history file for timeline %u from primary server",
735 : tli)));
736 :
737 0 : EnableWalRcvImmediateExit();
738 0 : walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
739 0 : DisableWalRcvImmediateExit();
740 :
741 : /*
742 : * Check that the filename on the master matches what we
743 : * calculated ourselves. This is just a sanity check, it should
744 : * always match.
745 : */
746 0 : TLHistoryFileName(expectedfname, tli);
747 0 : if (strcmp(fname, expectedfname) != 0)
748 0 : ereport(ERROR,
749 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
750 : errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
751 : tli)));
752 :
753 : /*
754 : * Write the file to pg_wal.
755 : */
756 0 : writeTimeLineHistoryFile(tli, content, len);
757 :
758 0 : pfree(fname);
759 0 : pfree(content);
760 : }
761 : }
762 0 : }
763 :
764 : /*
765 : * Mark us as STOPPED in shared memory at exit.
766 : */
767 : static void
768 0 : WalRcvDie(int code, Datum arg)
769 : {
770 0 : WalRcvData *walrcv = WalRcv;
771 :
772 : /* Ensure that all WAL records received are flushed to disk */
773 0 : XLogWalRcvFlush(true);
774 :
775 0 : walrcv->latch = NULL;
776 :
777 0 : SpinLockAcquire(&walrcv->mutex);
778 0 : Assert(walrcv->walRcvState == WALRCV_STREAMING ||
779 : walrcv->walRcvState == WALRCV_RESTARTING ||
780 : walrcv->walRcvState == WALRCV_STARTING ||
781 : walrcv->walRcvState == WALRCV_WAITING ||
782 : walrcv->walRcvState == WALRCV_STOPPING);
783 0 : Assert(walrcv->pid == MyProcPid);
784 0 : walrcv->walRcvState = WALRCV_STOPPED;
785 0 : walrcv->pid = 0;
786 0 : walrcv->ready_to_display = false;
787 0 : SpinLockRelease(&walrcv->mutex);
788 :
789 : /* Terminate the connection gracefully. */
790 0 : if (wrconn != NULL)
791 0 : walrcv_disconnect(wrconn);
792 :
793 : /* Wake up the startup process to notice promptly that we're gone */
794 0 : WakeupRecovery();
795 0 : }
796 :
797 : /* SIGHUP: set flag to re-read config file at next convenient time */
798 : static void
799 0 : WalRcvSigHupHandler(SIGNAL_ARGS)
800 : {
801 0 : got_SIGHUP = true;
802 0 : }
803 :
804 :
/*
 * SIGUSR1 handler: forwards the signal to the latch machinery.
 *
 * errno is saved on entry and restored on exit so the interrupted code sees
 * it unchanged.
 */
static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)
{
	const int	saved_errno = errno;

	latch_sigusr1_handler();

	errno = saved_errno;
}
815 :
816 : /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
817 : static void
818 0 : WalRcvShutdownHandler(SIGNAL_ARGS)
819 : {
820 0 : int save_errno = errno;
821 :
822 0 : got_SIGTERM = true;
823 :
824 0 : if (WalRcv->latch)
825 0 : SetLatch(WalRcv->latch);
826 :
827 : /* Don't joggle the elbow of proc_exit */
828 0 : if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
829 0 : ProcessWalRcvInterrupts();
830 :
831 0 : errno = save_errno;
832 0 : }
833 :
834 : /*
835 : * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
836 : *
837 : * Some backend has bought the farm, so we need to stop what we're doing and
838 : * exit.
839 : */
840 : static void
841 0 : WalRcvQuickDieHandler(SIGNAL_ARGS)
842 : {
843 0 : PG_SETMASK(&BlockSig);
844 :
845 : /*
846 : * We DO NOT want to run proc_exit() callbacks -- we're here because
847 : * shared memory may be corrupted, so we don't want to try to clean up our
848 : * transaction. Just nail the windows shut and get out of town. Now that
849 : * there's an atexit callback to prevent third-party code from breaking
850 : * things by calling exit() directly, we have to reset the callbacks
851 : * explicitly to make this work as intended.
852 : */
853 0 : on_exit_reset();
854 :
855 : /*
856 : * Note we do exit(2) not exit(0). This is to force the postmaster into a
857 : * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
858 : * backend. This is necessary precisely because we don't clean up our
859 : * shared memory state. (The "dead man switch" mechanism in pmsignal.c
860 : * should ensure the postmaster sees this as a crash, too, but no harm in
861 : * being doubly sure.)
862 : */
863 0 : exit(2);
864 : }
865 :
866 : /*
867 : * Accept the message from XLOG stream, and process it.
868 : */
869 : static void
870 0 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
871 : {
872 : int hdrlen;
873 : XLogRecPtr dataStart;
874 : XLogRecPtr walEnd;
875 : TimestampTz sendTime;
876 : bool replyRequested;
877 :
878 0 : resetStringInfo(&incoming_message);
879 :
880 0 : switch (type)
881 : {
882 : case 'w': /* WAL records */
883 : {
884 : /* copy message to StringInfo */
885 0 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
886 0 : if (len < hdrlen)
887 0 : ereport(ERROR,
888 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
889 : errmsg_internal("invalid WAL message received from primary")));
890 0 : appendBinaryStringInfo(&incoming_message, buf, hdrlen);
891 :
892 : /* read the fields */
893 0 : dataStart = pq_getmsgint64(&incoming_message);
894 0 : walEnd = pq_getmsgint64(&incoming_message);
895 0 : sendTime = pq_getmsgint64(&incoming_message);
896 0 : ProcessWalSndrMessage(walEnd, sendTime);
897 :
898 0 : buf += hdrlen;
899 0 : len -= hdrlen;
900 0 : XLogWalRcvWrite(buf, len, dataStart);
901 0 : break;
902 : }
903 : case 'k': /* Keepalive */
904 : {
905 : /* copy message to StringInfo */
906 0 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
907 0 : if (len != hdrlen)
908 0 : ereport(ERROR,
909 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
910 : errmsg_internal("invalid keepalive message received from primary")));
911 0 : appendBinaryStringInfo(&incoming_message, buf, hdrlen);
912 :
913 : /* read the fields */
914 0 : walEnd = pq_getmsgint64(&incoming_message);
915 0 : sendTime = pq_getmsgint64(&incoming_message);
916 0 : replyRequested = pq_getmsgbyte(&incoming_message);
917 :
918 0 : ProcessWalSndrMessage(walEnd, sendTime);
919 :
920 : /* If the primary requested a reply, send one immediately */
921 0 : if (replyRequested)
922 0 : XLogWalRcvSendReply(true, false);
923 0 : break;
924 : }
925 : default:
926 0 : ereport(ERROR,
927 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
928 : errmsg_internal("invalid replication message type %d",
929 : type)));
930 : }
931 0 : }
932 :
933 : /*
934 : * Write XLOG data to disk.
935 : */
936 : static void
937 0 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
938 : {
939 : int startoff;
940 : int byteswritten;
941 :
942 0 : while (nbytes > 0)
943 : {
944 : int segbytes;
945 :
946 0 : if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
947 : {
948 : bool use_existent;
949 :
950 : /*
951 : * fsync() and close current file before we switch to next one. We
952 : * would otherwise have to reopen this file to fsync it later
953 : */
954 0 : if (recvFile >= 0)
955 : {
956 : char xlogfname[MAXFNAMELEN];
957 :
958 0 : XLogWalRcvFlush(false);
959 :
960 : /*
961 : * XLOG segment files will be re-read by recovery in startup
962 : * process soon, so we don't advise the OS to release cache
963 : * pages associated with the file like XLogFileClose() does.
964 : */
965 0 : if (close(recvFile) != 0)
966 0 : ereport(PANIC,
967 : (errcode_for_file_access(),
968 : errmsg("could not close log segment %s: %m",
969 : XLogFileNameP(recvFileTLI, recvSegNo))));
970 :
971 : /*
972 : * Create .done file forcibly to prevent the streamed segment
973 : * from being archived later.
974 : */
975 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo);
976 0 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
977 0 : XLogArchiveForceDone(xlogfname);
978 : else
979 0 : XLogArchiveNotify(xlogfname);
980 : }
981 0 : recvFile = -1;
982 :
983 : /* Create/use new log file */
984 0 : XLByteToSeg(recptr, recvSegNo);
985 0 : use_existent = true;
986 0 : recvFile = XLogFileInit(recvSegNo, &use_existent, true);
987 0 : recvFileTLI = ThisTimeLineID;
988 0 : recvOff = 0;
989 : }
990 :
991 : /* Calculate the start offset of the received logs */
992 0 : startoff = recptr % XLogSegSize;
993 :
994 0 : if (startoff + nbytes > XLogSegSize)
995 0 : segbytes = XLogSegSize - startoff;
996 : else
997 0 : segbytes = nbytes;
998 :
999 : /* Need to seek in the file? */
1000 0 : if (recvOff != startoff)
1001 : {
1002 0 : if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
1003 0 : ereport(PANIC,
1004 : (errcode_for_file_access(),
1005 : errmsg("could not seek in log segment %s to offset %u: %m",
1006 : XLogFileNameP(recvFileTLI, recvSegNo),
1007 : startoff)));
1008 0 : recvOff = startoff;
1009 : }
1010 :
1011 : /* OK to write the logs */
1012 0 : errno = 0;
1013 :
1014 0 : byteswritten = write(recvFile, buf, segbytes);
1015 0 : if (byteswritten <= 0)
1016 : {
1017 : /* if write didn't set errno, assume no disk space */
1018 0 : if (errno == 0)
1019 0 : errno = ENOSPC;
1020 0 : ereport(PANIC,
1021 : (errcode_for_file_access(),
1022 : errmsg("could not write to log segment %s "
1023 : "at offset %u, length %lu: %m",
1024 : XLogFileNameP(recvFileTLI, recvSegNo),
1025 : recvOff, (unsigned long) segbytes)));
1026 : }
1027 :
1028 : /* Update state for write */
1029 0 : recptr += byteswritten;
1030 :
1031 0 : recvOff += byteswritten;
1032 0 : nbytes -= byteswritten;
1033 0 : buf += byteswritten;
1034 :
1035 0 : LogstreamResult.Write = recptr;
1036 : }
1037 0 : }
1038 :
1039 : /*
1040 : * Flush the log to disk.
1041 : *
1042 : * If we're in the midst of dying, it's unwise to do anything that might throw
1043 : * an error, so we skip sending a reply in that case.
1044 : */
1045 : static void
1046 0 : XLogWalRcvFlush(bool dying)
1047 : {
1048 0 : if (LogstreamResult.Flush < LogstreamResult.Write)
1049 : {
1050 0 : WalRcvData *walrcv = WalRcv;
1051 :
1052 0 : issue_xlog_fsync(recvFile, recvSegNo);
1053 :
1054 0 : LogstreamResult.Flush = LogstreamResult.Write;
1055 :
1056 : /* Update shared-memory status */
1057 0 : SpinLockAcquire(&walrcv->mutex);
1058 0 : if (walrcv->receivedUpto < LogstreamResult.Flush)
1059 : {
1060 0 : walrcv->latestChunkStart = walrcv->receivedUpto;
1061 0 : walrcv->receivedUpto = LogstreamResult.Flush;
1062 0 : walrcv->receivedTLI = ThisTimeLineID;
1063 : }
1064 0 : SpinLockRelease(&walrcv->mutex);
1065 :
1066 : /* Signal the startup process and walsender that new WAL has arrived */
1067 0 : WakeupRecovery();
1068 0 : if (AllowCascadeReplication())
1069 0 : WalSndWakeup();
1070 :
1071 : /* Report XLOG streaming progress in PS display */
1072 0 : if (update_process_title)
1073 : {
1074 : char activitymsg[50];
1075 :
1076 0 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1077 0 : (uint32) (LogstreamResult.Write >> 32),
1078 0 : (uint32) LogstreamResult.Write);
1079 0 : set_ps_display(activitymsg, false);
1080 : }
1081 :
1082 : /* Also let the master know that we made some progress */
1083 0 : if (!dying)
1084 : {
1085 0 : XLogWalRcvSendReply(false, false);
1086 0 : XLogWalRcvSendHSFeedback(false);
1087 : }
1088 : }
1089 0 : }
1090 :
1091 : /*
1092 : * Send reply message to primary, indicating our current WAL locations, oldest
1093 : * xmin and the current time.
1094 : *
1095 : * If 'force' is not set, the message is only sent if enough time has
1096 : * passed since last status update to reach wal_receiver_status_interval.
1097 : * If wal_receiver_status_interval is disabled altogether and 'force' is
1098 : * false, this is a no-op.
1099 : *
1100 : * If 'requestReply' is true, requests the server to reply immediately upon
1101 : * receiving this message. This is used for heartbearts, when approaching
1102 : * wal_receiver_timeout.
1103 : */
1104 : static void
1105 0 : XLogWalRcvSendReply(bool force, bool requestReply)
1106 : {
1107 : static XLogRecPtr writePtr = 0;
1108 : static XLogRecPtr flushPtr = 0;
1109 : XLogRecPtr applyPtr;
1110 : static TimestampTz sendTime = 0;
1111 : TimestampTz now;
1112 :
1113 : /*
1114 : * If the user doesn't want status to be reported to the master, be sure
1115 : * to exit before doing anything at all.
1116 : */
1117 0 : if (!force && wal_receiver_status_interval <= 0)
1118 0 : return;
1119 :
1120 : /* Get current timestamp. */
1121 0 : now = GetCurrentTimestamp();
1122 :
1123 : /*
1124 : * We can compare the write and flush positions to the last message we
1125 : * sent without taking any lock, but the apply position requires a spin
1126 : * lock, so we don't check that unless something else has changed or 10
1127 : * seconds have passed. This means that the apply WAL location will
1128 : * appear, from the master's point of view, to lag slightly, but since
1129 : * this is only for reporting purposes and only on idle systems, that's
1130 : * probably OK.
1131 : */
1132 0 : if (!force
1133 0 : && writePtr == LogstreamResult.Write
1134 0 : && flushPtr == LogstreamResult.Flush
1135 0 : && !TimestampDifferenceExceeds(sendTime, now,
1136 : wal_receiver_status_interval * 1000))
1137 0 : return;
1138 0 : sendTime = now;
1139 :
1140 : /* Construct a new message */
1141 0 : writePtr = LogstreamResult.Write;
1142 0 : flushPtr = LogstreamResult.Flush;
1143 0 : applyPtr = GetXLogReplayRecPtr(NULL);
1144 :
1145 0 : resetStringInfo(&reply_message);
1146 0 : pq_sendbyte(&reply_message, 'r');
1147 0 : pq_sendint64(&reply_message, writePtr);
1148 0 : pq_sendint64(&reply_message, flushPtr);
1149 0 : pq_sendint64(&reply_message, applyPtr);
1150 0 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1151 0 : pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1152 :
1153 : /* Send it */
1154 0 : elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1155 : (uint32) (writePtr >> 32), (uint32) writePtr,
1156 : (uint32) (flushPtr >> 32), (uint32) flushPtr,
1157 : (uint32) (applyPtr >> 32), (uint32) applyPtr,
1158 : requestReply ? " (reply requested)" : "");
1159 :
1160 0 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1161 : }
1162 :
1163 : /*
1164 : * Send hot standby feedback message to primary, plus the current time,
1165 : * in case they don't have a watch.
1166 : *
1167 : * If the user disables feedback, send one final message to tell sender
1168 : * to forget about the xmin on this standby. We also send this message
1169 : * on first connect because a previous connection might have set xmin
1170 : * on a replication slot. (If we're not using a slot it's harmless to
1171 : * send a feedback message explicitly setting InvalidTransactionId).
1172 : */
1173 : static void
1174 0 : XLogWalRcvSendHSFeedback(bool immed)
1175 : {
1176 : TimestampTz now;
1177 : TransactionId nextXid;
1178 : uint32 xmin_epoch,
1179 : catalog_xmin_epoch;
1180 : TransactionId xmin,
1181 : catalog_xmin;
1182 : static TimestampTz sendTime = 0;
1183 :
1184 : /* initially true so we always send at least one feedback message */
1185 : static bool master_has_standby_xmin = true;
1186 :
1187 : /*
1188 : * If the user doesn't want status to be reported to the master, be sure
1189 : * to exit before doing anything at all.
1190 : */
1191 0 : if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1192 0 : !master_has_standby_xmin)
1193 0 : return;
1194 :
1195 : /* Get current timestamp. */
1196 0 : now = GetCurrentTimestamp();
1197 :
1198 0 : if (!immed)
1199 : {
1200 : /*
1201 : * Send feedback at most once per wal_receiver_status_interval.
1202 : */
1203 0 : if (!TimestampDifferenceExceeds(sendTime, now,
1204 : wal_receiver_status_interval * 1000))
1205 0 : return;
1206 0 : sendTime = now;
1207 : }
1208 :
1209 : /*
1210 : * If Hot Standby is not yet accepting connections there is nothing to
1211 : * send. Check this after the interval has expired to reduce number of
1212 : * calls.
1213 : *
1214 : * Bailing out here also ensures that we don't send feedback until we've
1215 : * read our own replication slot state, so we don't tell the master to
1216 : * discard needed xmin or catalog_xmin from any slots that may exist on
1217 : * this replica.
1218 : */
1219 0 : if (!HotStandbyActive())
1220 0 : return;
1221 :
1222 : /*
1223 : * Make the expensive call to get the oldest xmin once we are certain
1224 : * everything else has been checked.
1225 : */
1226 0 : if (hot_standby_feedback)
1227 : {
1228 : TransactionId slot_xmin;
1229 :
1230 : /*
1231 : * Usually GetOldestXmin() would include both global replication slot
1232 : * xmin and catalog_xmin in its calculations, but we want to derive
1233 : * separate values for each of those. So we ask for an xmin that
1234 : * excludes the catalog_xmin.
1235 : */
1236 0 : xmin = GetOldestXmin(NULL,
1237 : PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN);
1238 :
1239 0 : ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1240 :
1241 0 : if (TransactionIdIsValid(slot_xmin) &&
1242 0 : TransactionIdPrecedes(slot_xmin, xmin))
1243 0 : xmin = slot_xmin;
1244 : }
1245 : else
1246 : {
1247 0 : xmin = InvalidTransactionId;
1248 0 : catalog_xmin = InvalidTransactionId;
1249 : }
1250 :
1251 : /*
1252 : * Get epoch and adjust if nextXid and oldestXmin are different sides of
1253 : * the epoch boundary.
1254 : */
1255 0 : GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1256 0 : catalog_xmin_epoch = xmin_epoch;
1257 0 : if (nextXid < xmin)
1258 0 : xmin_epoch--;
1259 0 : if (nextXid < catalog_xmin)
1260 0 : catalog_xmin_epoch--;
1261 :
1262 0 : elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1263 : xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1264 :
1265 : /* Construct the message and send it. */
1266 0 : resetStringInfo(&reply_message);
1267 0 : pq_sendbyte(&reply_message, 'h');
1268 0 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1269 0 : pq_sendint(&reply_message, xmin, 4);
1270 0 : pq_sendint(&reply_message, xmin_epoch, 4);
1271 0 : pq_sendint(&reply_message, catalog_xmin, 4);
1272 0 : pq_sendint(&reply_message, catalog_xmin_epoch, 4);
1273 0 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1274 0 : if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1275 0 : master_has_standby_xmin = true;
1276 : else
1277 0 : master_has_standby_xmin = false;
1278 : }
1279 :
1280 : /*
1281 : * Update shared memory status upon receiving a message from primary.
1282 : *
1283 : * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1284 : * message, reported by primary.
1285 : */
1286 : static void
1287 0 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1288 : {
1289 0 : WalRcvData *walrcv = WalRcv;
1290 :
1291 0 : TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1292 :
1293 : /* Update shared-memory status */
1294 0 : SpinLockAcquire(&walrcv->mutex);
1295 0 : if (walrcv->latestWalEnd < walEnd)
1296 0 : walrcv->latestWalEndTime = sendTime;
1297 0 : walrcv->latestWalEnd = walEnd;
1298 0 : walrcv->lastMsgSendTime = sendTime;
1299 0 : walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1300 0 : SpinLockRelease(&walrcv->mutex);
1301 :
1302 0 : if (log_min_messages <= DEBUG2)
1303 : {
1304 : char *sendtime;
1305 : char *receipttime;
1306 : int applyDelay;
1307 :
1308 : /* Copy because timestamptz_to_str returns a static buffer */
1309 0 : sendtime = pstrdup(timestamptz_to_str(sendTime));
1310 0 : receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1311 0 : applyDelay = GetReplicationApplyDelay();
1312 :
1313 : /* apply delay is not available */
1314 0 : if (applyDelay == -1)
1315 0 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1316 : sendtime,
1317 : receipttime,
1318 : GetReplicationTransferLatency());
1319 : else
1320 0 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1321 : sendtime,
1322 : receipttime,
1323 : applyDelay,
1324 : GetReplicationTransferLatency());
1325 :
1326 0 : pfree(sendtime);
1327 0 : pfree(receipttime);
1328 : }
1329 0 : }
1330 :
1331 : /*
1332 : * Wake up the walreceiver main loop.
1333 : *
1334 : * This is called by the startup process whenever interesting xlog records
1335 : * are applied, so that walreceiver can check if it needs to send an apply
1336 : * notification back to the master which may be waiting in a COMMIT with
1337 : * synchronous_commit = remote_apply.
1338 : */
1339 : void
1340 0 : WalRcvForceReply(void)
1341 : {
1342 0 : WalRcv->force_reply = true;
1343 0 : if (WalRcv->latch)
1344 0 : SetLatch(WalRcv->latch);
1345 0 : }
1346 :
1347 : /*
1348 : * Return a string constant representing the state. This is used
1349 : * in system functions and views, and should *not* be translated.
1350 : */
1351 : static const char *
1352 0 : WalRcvGetStateString(WalRcvState state)
1353 : {
1354 0 : switch (state)
1355 : {
1356 : case WALRCV_STOPPED:
1357 0 : return "stopped";
1358 : case WALRCV_STARTING:
1359 0 : return "starting";
1360 : case WALRCV_STREAMING:
1361 0 : return "streaming";
1362 : case WALRCV_WAITING:
1363 0 : return "waiting";
1364 : case WALRCV_RESTARTING:
1365 0 : return "restarting";
1366 : case WALRCV_STOPPING:
1367 0 : return "stopping";
1368 : }
1369 0 : return "UNKNOWN";
1370 : }
1371 :
1372 : /*
1373 : * Returns activity of WAL receiver, including pid, state and xlog locations
1374 : * received from the WAL sender of another server.
1375 : */
1376 : Datum
1377 0 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1378 : {
1379 : TupleDesc tupdesc;
1380 : Datum *values;
1381 : bool *nulls;
1382 : int pid;
1383 : bool ready_to_display;
1384 : WalRcvState state;
1385 : XLogRecPtr receive_start_lsn;
1386 : TimeLineID receive_start_tli;
1387 : XLogRecPtr received_lsn;
1388 : TimeLineID received_tli;
1389 : TimestampTz last_send_time;
1390 : TimestampTz last_receipt_time;
1391 : XLogRecPtr latest_end_lsn;
1392 : TimestampTz latest_end_time;
1393 : char *slotname;
1394 : char *conninfo;
1395 :
1396 : /* Take a lock to ensure value consistency */
1397 0 : SpinLockAcquire(&WalRcv->mutex);
1398 0 : pid = (int) WalRcv->pid;
1399 0 : ready_to_display = WalRcv->ready_to_display;
1400 0 : state = WalRcv->walRcvState;
1401 0 : receive_start_lsn = WalRcv->receiveStart;
1402 0 : receive_start_tli = WalRcv->receiveStartTLI;
1403 0 : received_lsn = WalRcv->receivedUpto;
1404 0 : received_tli = WalRcv->receivedTLI;
1405 0 : last_send_time = WalRcv->lastMsgSendTime;
1406 0 : last_receipt_time = WalRcv->lastMsgReceiptTime;
1407 0 : latest_end_lsn = WalRcv->latestWalEnd;
1408 0 : latest_end_time = WalRcv->latestWalEndTime;
1409 0 : slotname = pstrdup(WalRcv->slotname);
1410 0 : conninfo = pstrdup(WalRcv->conninfo);
1411 0 : SpinLockRelease(&WalRcv->mutex);
1412 :
1413 : /*
1414 : * No WAL receiver (or not ready yet), just return a tuple with NULL
1415 : * values
1416 : */
1417 0 : if (pid == 0 || !ready_to_display)
1418 0 : PG_RETURN_NULL();
1419 :
1420 : /* determine result type */
1421 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1422 0 : elog(ERROR, "return type must be a row type");
1423 :
1424 0 : values = palloc0(sizeof(Datum) * tupdesc->natts);
1425 0 : nulls = palloc0(sizeof(bool) * tupdesc->natts);
1426 :
1427 : /* Fetch values */
1428 0 : values[0] = Int32GetDatum(pid);
1429 :
1430 0 : if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1431 : {
1432 : /*
1433 : * Only superusers can see details. Other users only get the pid value
1434 : * to know whether it is a WAL receiver, but no details.
1435 : */
1436 0 : MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1437 : }
1438 : else
1439 : {
1440 0 : values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1441 :
1442 0 : if (XLogRecPtrIsInvalid(receive_start_lsn))
1443 0 : nulls[2] = true;
1444 : else
1445 0 : values[2] = LSNGetDatum(receive_start_lsn);
1446 0 : values[3] = Int32GetDatum(receive_start_tli);
1447 0 : if (XLogRecPtrIsInvalid(received_lsn))
1448 0 : nulls[4] = true;
1449 : else
1450 0 : values[4] = LSNGetDatum(received_lsn);
1451 0 : values[5] = Int32GetDatum(received_tli);
1452 0 : if (last_send_time == 0)
1453 0 : nulls[6] = true;
1454 : else
1455 0 : values[6] = TimestampTzGetDatum(last_send_time);
1456 0 : if (last_receipt_time == 0)
1457 0 : nulls[7] = true;
1458 : else
1459 0 : values[7] = TimestampTzGetDatum(last_receipt_time);
1460 0 : if (XLogRecPtrIsInvalid(latest_end_lsn))
1461 0 : nulls[8] = true;
1462 : else
1463 0 : values[8] = LSNGetDatum(latest_end_lsn);
1464 0 : if (latest_end_time == 0)
1465 0 : nulls[9] = true;
1466 : else
1467 0 : values[9] = TimestampTzGetDatum(latest_end_time);
1468 0 : if (*slotname == '\0')
1469 0 : nulls[10] = true;
1470 : else
1471 0 : values[10] = CStringGetTextDatum(slotname);
1472 0 : if (*conninfo == '\0')
1473 0 : nulls[11] = true;
1474 : else
1475 0 : values[11] = CStringGetTextDatum(conninfo);
1476 : }
1477 :
1478 : /* Returns the record as Datum */
1479 0 : PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1480 : }
|