Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "funcapi.h"
21 : #include "miscadmin.h"
22 : #include "pgstat.h"
23 :
24 : #include "access/heapam.h"
25 : #include "access/htup.h"
26 : #include "access/htup_details.h"
27 : #include "access/xact.h"
28 :
29 : #include "catalog/pg_subscription.h"
30 : #include "catalog/pg_subscription_rel.h"
31 :
32 : #include "libpq/pqsignal.h"
33 :
34 : #include "postmaster/bgworker.h"
35 : #include "postmaster/fork_process.h"
36 : #include "postmaster/postmaster.h"
37 :
38 : #include "replication/logicallauncher.h"
39 : #include "replication/logicalworker.h"
40 : #include "replication/slot.h"
41 : #include "replication/walreceiver.h"
42 : #include "replication/worker_internal.h"
43 :
44 : #include "storage/ipc.h"
45 : #include "storage/proc.h"
46 : #include "storage/procarray.h"
47 : #include "storage/procsignal.h"
48 :
49 : #include "tcop/tcopprot.h"
50 :
51 : #include "utils/memutils.h"
52 : #include "utils/pg_lsn.h"
53 : #include "utils/ps_status.h"
54 : #include "utils/timeout.h"
55 : #include "utils/snapmgr.h"
56 :
57 : /* max sleep time between cycles (3min) */
58 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
59 :
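 : /* GUC variables controlling how many logical replication workers may run. */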
60 : int max_logical_replication_workers = 4;
61 : int max_sync_workers_per_subscription = 2;
62 :
63 : LogicalRepWorker *MyLogicalRepWorker = NULL;
64 :
65 : typedef struct LogicalRepCtxStruct
66 : {
67 : /* Supervisor process. */
68 : pid_t launcher_pid;
69 :
70 : /* Background workers. */
71 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
72 : } LogicalRepCtxStruct;
73 :
74 : LogicalRepCtxStruct *LogicalRepCtx;
75 :
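 : /* Identity of a worker: subscription OID plus (possibly invalid) relation OID. */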
76 : typedef struct LogicalRepWorkerId
77 : {
78 : Oid subid;
79 : Oid relid;
80 : } LogicalRepWorkerId;
81 :
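 : /* Workers to stop at commit, filled by logicalrep_worker_stop_at_commit(). */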
82 : static List *on_commit_stop_workers = NIL;
83 :
84 : static void ApplyLauncherWakeup(void);
85 : static void logicalrep_launcher_onexit(int code, Datum arg);
86 : static void logicalrep_worker_onexit(int code, Datum arg);
87 : static void logicalrep_worker_detach(void);
88 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
89 :
90 : /* Flags set by signal handlers */
91 : static volatile sig_atomic_t got_SIGHUP = false;
92 :
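 : /* Whether the launcher should be woken up at commit of this transaction. */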
93 : static bool on_commit_launcher_wakeup = false;
94 :
95 : Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
96 :
97 :
98 : /*
99 : * Load the list of subscriptions.
100 : *
101 : * Only the fields interesting for worker start/stop functions are filled for
102 : * each subscription.
103 : */
104 : static List *
105 160 : get_subscription_list(void)
106 : {
107 160 : List *res = NIL;
108 : Relation rel;
109 : HeapScanDesc scan;
110 : HeapTuple tup;
111 : MemoryContext resultcxt;
112 :
113 : /* This is the context that we will allocate our output data in */
114 160 : resultcxt = CurrentMemoryContext;
115 :
116 : /*
117 : * Start a transaction so we can access pg_subscription, and get a snapshot.
118 : * We don't have a use for the snapshot itself, but we're interested in
119 : * the secondary effect that it sets RecentGlobalXmin. (This is critical
120 : * for anything that reads heap pages, because HOT may decide to prune
121 : * them even if the process doesn't attempt to modify any tuples.)
122 : */
123 160 : StartTransactionCommand();
124 160 : (void) GetTransactionSnapshot();
125 :
126 160 : rel = heap_open(SubscriptionRelationId, AccessShareLock);
127 160 : scan = heap_beginscan_catalog(rel, 0, NULL);
128 :
129 322 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
130 : {
131 2 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
132 : Subscription *sub;
133 : MemoryContext oldcxt;
134 :
135 : /*
136 : * Allocate our results in the caller's context, not the
137 : * transaction's. We do this inside the loop, and restore the original
138 : * context at the end, so that leaky things like heap_getnext() are
139 : * not called in a potentially long-lived context.
140 : */
141 2 : oldcxt = MemoryContextSwitchTo(resultcxt);
142 :
143 2 : sub = (Subscription *) palloc0(sizeof(Subscription));
144 2 : sub->oid = HeapTupleGetOid(tup);
145 2 : sub->dbid = subform->subdbid;
146 2 : sub->owner = subform->subowner;
147 2 : sub->enabled = subform->subenabled;
148 2 : sub->name = pstrdup(NameStr(subform->subname));
149 : /* We don't fill fields we are not interested in. */
150 :
151 2 : res = lappend(res, sub);
152 2 : MemoryContextSwitchTo(oldcxt);
153 : }
154 :
155 160 : heap_endscan(scan);
156 160 : heap_close(rel, AccessShareLock);
157 :
158 160 : CommitTransactionCommand();
159 :
160 160 : return res;
161 : }
162 :
163 : /*
164 : * Wait for a background worker to start up and attach to the shmem context.
165 : *
166 : * This is only needed for cleaning up the shared memory in case the worker
167 : * fails to attach.
168 : */
169 : static void
170 0 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
171 : BackgroundWorkerHandle *handle)
172 : {
173 : BgwHandleStatus status;
174 : int rc;
175 : uint16 generation;
176 :
177 : /* Remember generation for future identification. */
178 0 : generation = worker->generation;
179 :
180 : for (;;)
181 : {
182 : pid_t pid;
183 :
184 0 : CHECK_FOR_INTERRUPTS();
185 :
186 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
187 :
188 : /* Worker either died or has started; no need to do anything. */
189 0 : if (!worker->in_use || worker->proc)
190 : {
191 0 : LWLockRelease(LogicalRepWorkerLock);
192 0 : return;
193 : }
194 :
195 0 : LWLockRelease(LogicalRepWorkerLock);
196 :
197 : /* Check if worker has died before attaching, and clean up after it. */
198 0 : status = GetBackgroundWorkerPid(handle, &pid);
199 :
200 0 : if (status == BGWH_STOPPED)
201 : {
202 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
203 : /* Ensure that this was indeed the worker we waited for. */
204 0 : if (generation == worker->generation)
205 0 : logicalrep_worker_cleanup(worker);
206 0 : LWLockRelease(LogicalRepWorkerLock);
207 0 : return;
208 : }
209 :
210 : /*
211 : * We need a timeout because we generally don't get notified via latch
212 : * about the worker attach. But we don't expect to have to wait long.
213 : */
214 0 : rc = WaitLatch(MyLatch,
215 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
216 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
217 :
218 : /* emergency bailout if postmaster has died */
219 0 : if (rc & WL_POSTMASTER_DEATH)
220 0 : proc_exit(1);
221 :
222 0 : if (rc & WL_LATCH_SET)
223 : {
224 0 : ResetLatch(MyLatch);
225 0 : CHECK_FOR_INTERRUPTS();
226 : }
227 0 : }
228 :
229 : return;
230 : }
231 :
232 : /*
233 : * Walks the workers array and searches for one that matches the given
234 : * subscription id and relid.
235 : */
236 : LogicalRepWorker *
237 0 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
238 : {
239 : int i;
240 0 : LogicalRepWorker *res = NULL;
241 :
242 0 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
243 :
244 : /* Search for attached worker for a given subscription id. */
245 0 : for (i = 0; i < max_logical_replication_workers; i++)
246 : {
247 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
248 :
249 0 : if (w->in_use && w->subid == subid && w->relid == relid &&
250 0 : (!only_running || w->proc))
251 : {
252 0 : res = w;
253 0 : break;
254 : }
255 : }
256 :
257 0 : return res;
258 : }
259 :
260 : /*
261 : * Similar to logicalrep_worker_find(), but returns a list of all workers
262 : * for the subscription instead of just one.
263 : */
264 : List *
265 6 : logicalrep_workers_find(Oid subid, bool only_running)
266 : {
267 : int i;
268 6 : List *res = NIL;
269 :
270 6 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
271 :
272 : /* Search for attached worker for a given subscription id. */
273 30 : for (i = 0; i < max_logical_replication_workers; i++)
274 : {
275 24 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
276 :
277 24 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
278 0 : res = lappend(res, w);
279 : }
280 :
281 6 : return res;
282 : }
283 :
284 : /*
285 : * Start a new apply background worker.
286 : */
287 : void
288 0 : logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
289 : Oid relid)
290 : {
291 : BackgroundWorker bgw;
292 : BackgroundWorkerHandle *bgw_handle;
293 : int i;
294 0 : int slot = 0;
295 0 : LogicalRepWorker *worker = NULL;
296 : int nsyncworkers;
297 : TimestampTz now;
298 :
299 0 : ereport(DEBUG1,
300 : (errmsg("starting logical replication worker for subscription \"%s\"",
301 : subname)));
302 :
303 : /* Report this after the initial starting message for consistency. */
304 0 : if (max_replication_slots == 0)
305 0 : ereport(ERROR,
306 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
307 : errmsg("cannot start logical replication workers when max_replication_slots = 0")));
308 :
309 : /*
310 : * We need to modify the shared memory under the lock so that we have a
311 : * consistent view.
312 : */
313 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
314 :
315 : retry:
316 : /* Find unused worker slot. */
317 0 : for (i = 0; i < max_logical_replication_workers; i++)
318 : {
319 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
320 :
321 0 : if (!w->in_use)
322 : {
323 0 : worker = w;
324 0 : slot = i;
325 0 : break;
326 : }
327 : }
328 :
329 0 : nsyncworkers = logicalrep_sync_worker_count(subid);
330 :
331 0 : now = GetCurrentTimestamp();
332 :
333 : /*
334 : * If we didn't find a free slot, try to do garbage collection. We do
335 : * this because if some worker failed to start up and its parent crashed
336 : * while waiting, the in_use state was never cleared.
337 : */
338 0 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
339 : {
340 0 : bool did_cleanup = false;
341 :
342 0 : for (i = 0; i < max_logical_replication_workers; i++)
343 : {
344 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
345 :
346 : /*
347 : * If the worker was marked in use but didn't manage to attach in
348 : * time, clean it up.
349 : */
350 0 : if (w->in_use && !w->proc &&
351 0 : TimestampDifferenceExceeds(w->launch_time, now,
352 : wal_receiver_timeout))
353 : {
354 0 : elog(WARNING,
355 : "logical replication worker for subscription %u took too long to start; canceled",
356 : w->subid);
357 :
358 0 : logicalrep_worker_cleanup(w);
359 0 : did_cleanup = true;
360 : }
361 : }
362 :
363 0 : if (did_cleanup)
364 0 : goto retry;
365 : }
366 :
367 : /*
368 : * If we reached the sync worker limit per subscription, just exit
369 : * silently as we might get here because of an otherwise harmless race
370 : * condition.
371 : */
372 0 : if (nsyncworkers >= max_sync_workers_per_subscription)
373 : {
374 0 : LWLockRelease(LogicalRepWorkerLock);
375 0 : return;
376 : }
377 :
378 : /*
379 : * However, if there are no more free worker slots, inform the user about it
380 : * before exiting.
381 : */
382 0 : if (worker == NULL)
383 : {
384 0 : LWLockRelease(LogicalRepWorkerLock);
385 0 : ereport(WARNING,
386 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
387 : errmsg("out of logical replication worker slots"),
388 : errhint("You might need to increase max_logical_replication_workers.")));
389 0 : return;
390 : }
391 :
392 : /* Prepare the worker slot. */
393 0 : worker->launch_time = now;
394 0 : worker->in_use = true;
395 0 : worker->generation++;
396 0 : worker->proc = NULL;
397 0 : worker->dbid = dbid;
398 0 : worker->userid = userid;
399 0 : worker->subid = subid;
400 0 : worker->relid = relid;
401 0 : worker->relstate = SUBREL_STATE_UNKNOWN;
402 0 : worker->relstate_lsn = InvalidXLogRecPtr;
403 0 : worker->last_lsn = InvalidXLogRecPtr;
404 0 : TIMESTAMP_NOBEGIN(worker->last_send_time);
405 0 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
406 0 : worker->reply_lsn = InvalidXLogRecPtr;
407 0 : TIMESTAMP_NOBEGIN(worker->reply_time);
408 :
409 0 : LWLockRelease(LogicalRepWorkerLock);
410 :
411 : /* Register the new dynamic worker. */
412 0 : memset(&bgw, 0, sizeof(bgw));
413 0 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
414 : BGWORKER_BACKEND_DATABASE_CONNECTION;
415 0 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
416 0 : snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
417 0 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
418 0 : if (OidIsValid(relid))
419 0 : snprintf(bgw.bgw_name, BGW_MAXLEN,
420 : "logical replication worker for subscription %u sync %u", subid, relid);
421 : else
422 0 : snprintf(bgw.bgw_name, BGW_MAXLEN,
423 : "logical replication worker for subscription %u", subid);
424 :
425 0 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
426 0 : bgw.bgw_notify_pid = MyProcPid;
427 0 : bgw.bgw_main_arg = Int32GetDatum(slot);
428 :
429 0 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
430 : {
431 0 : ereport(WARNING,
432 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
433 : errmsg("out of background worker slots"),
434 : errhint("You might need to increase max_worker_processes.")));
435 0 : return;
436 : }
437 :
438 : /* Now wait until it attaches. */
439 0 : WaitForReplicationWorkerAttach(worker, bgw_handle);
440 : }
441 :
442 : /*
443 : * Stop the logical replication worker for subid/relid, if any, and wait until
444 : * it detaches from the slot.
445 : */
446 : void
447 0 : logicalrep_worker_stop(Oid subid, Oid relid)
448 : {
449 : LogicalRepWorker *worker;
450 : uint16 generation;
451 :
452 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
453 :
454 0 : worker = logicalrep_worker_find(subid, relid, false);
455 :
456 : /* No worker, nothing to do. */
457 0 : if (!worker)
458 : {
459 0 : LWLockRelease(LogicalRepWorkerLock);
460 0 : return;
461 : }
462 :
463 : /*
464 : * Remember the worker's generation so we can later check whether the
465 : * slot still holds the same worker.
466 : */
467 0 : generation = worker->generation;
468 :
469 : /*
470 : * If we found a worker but it does not have proc set then it is still
471 : * starting up; wait for it to finish starting and then kill it.
472 : */
473 0 : while (worker->in_use && !worker->proc)
474 : {
475 : int rc;
476 :
477 0 : LWLockRelease(LogicalRepWorkerLock);
478 :
479 : /* Wait a bit --- we don't expect to have to wait long. */
480 0 : rc = WaitLatch(MyLatch,
481 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
482 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
483 :
484 : /* emergency bailout if postmaster has died */
485 0 : if (rc & WL_POSTMASTER_DEATH)
486 0 : proc_exit(1);
487 :
488 0 : if (rc & WL_LATCH_SET)
489 : {
490 0 : ResetLatch(MyLatch);
491 0 : CHECK_FOR_INTERRUPTS();
492 : }
493 :
494 : /* Recheck worker status. */
495 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
496 :
497 : /*
498 : * Check whether the worker slot is no longer used, which would mean
499 : * that the worker has exited, or whether the worker generation is
500 : * different, meaning that a different worker has taken the slot.
501 : */
502 0 : if (!worker->in_use || worker->generation != generation)
503 : {
504 0 : LWLockRelease(LogicalRepWorkerLock);
505 0 : return;
506 : }
507 :
508 : /* Worker has assigned proc, so it has started. */
509 0 : if (worker->proc)
510 0 : break;
511 : }
512 :
513 : /* Now terminate the worker ... */
514 0 : kill(worker->proc->pid, SIGTERM);
515 :
516 : /* ... and wait for it to die. */
517 : for (;;)
518 : {
519 : int rc;
520 :
521 : /* is it gone? */
522 0 : if (!worker->proc || worker->generation != generation)
523 : break;
524 :
525 0 : LWLockRelease(LogicalRepWorkerLock);
526 :
527 : /* Wait a bit --- we don't expect to have to wait long. */
528 0 : rc = WaitLatch(MyLatch,
529 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
530 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
531 :
532 : /* emergency bailout if postmaster has died */
533 0 : if (rc & WL_POSTMASTER_DEATH)
534 0 : proc_exit(1);
535 :
536 0 : if (rc & WL_LATCH_SET)
537 : {
538 0 : ResetLatch(MyLatch);
539 0 : CHECK_FOR_INTERRUPTS();
540 : }
541 :
542 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
543 0 : }
544 :
545 0 : LWLockRelease(LogicalRepWorkerLock);
546 : }
547 :
548 : /*
549 : * Request worker for specified sub/rel to be stopped on commit.
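 : *
 : * This is typically used in situations such as ALTER SUBSCRIPTION, where the
 : * worker must only be stopped if the transaction actually commits.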
550 : */
551 : void
552 0 : logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
553 : {
554 : LogicalRepWorkerId *wid;
555 : MemoryContext oldctx;
556 :
557 : /* Make sure we store the info in a context that survives until commit. */
558 0 : oldctx = MemoryContextSwitchTo(TopTransactionContext);
559 :
560 0 : wid = palloc(sizeof(LogicalRepWorkerId));
561 0 : wid->subid = subid;
562 0 : wid->relid = relid;
563 :
564 0 : on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
565 :
566 0 : MemoryContextSwitchTo(oldctx);
567 0 : }
568 :
569 : /*
570 : * Wake up (using latch) any logical replication worker for specified sub/rel.
571 : */
572 : void
573 0 : logicalrep_worker_wakeup(Oid subid, Oid relid)
574 : {
575 : LogicalRepWorker *worker;
576 :
577 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
578 :
579 0 : worker = logicalrep_worker_find(subid, relid, true);
580 :
581 0 : if (worker)
582 0 : logicalrep_worker_wakeup_ptr(worker);
583 :
584 0 : LWLockRelease(LogicalRepWorkerLock);
585 0 : }
586 :
587 : /*
588 : * Wake up (using latch) the specified logical replication worker.
589 : *
590 : * Caller must hold lock, else worker->proc could change under us.
591 : */
592 : void
593 0 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
594 : {
595 0 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
596 :
597 0 : SetLatch(&worker->proc->procLatch);
598 0 : }
599 :
600 : /*
601 : * Attach to a slot.
602 : */
603 : void
604 0 : logicalrep_worker_attach(int slot)
605 : {
606 : /* Block concurrent access. */
607 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
608 :
609 0 : Assert(slot >= 0 && slot < max_logical_replication_workers);
610 0 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
611 :
612 0 : if (!MyLogicalRepWorker->in_use)
613 : {
614 0 : LWLockRelease(LogicalRepWorkerLock);
615 0 : ereport(ERROR,
616 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
617 : errmsg("logical replication worker slot %d is empty, cannot attach",
618 : slot)));
619 : }
620 :
621 0 : if (MyLogicalRepWorker->proc)
622 : {
623 0 : LWLockRelease(LogicalRepWorkerLock);
624 0 : ereport(ERROR,
625 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
626 : errmsg("logical replication worker slot %d is already used by "
627 : "another worker, cannot attach", slot)));
628 : }
629 :
630 0 : MyLogicalRepWorker->proc = MyProc;
631 0 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
632 :
633 0 : LWLockRelease(LogicalRepWorkerLock);
634 0 : }
635 :
636 : /*
637 : * Detach the worker (cleans up the worker info).
638 : */
639 : static void
640 0 : logicalrep_worker_detach(void)
641 : {
642 : /* Block concurrent access. */
643 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
644 :
645 0 : logicalrep_worker_cleanup(MyLogicalRepWorker);
646 :
647 0 : LWLockRelease(LogicalRepWorkerLock);
648 0 : }
649 :
650 : /*
651 : * Clean up worker info.
652 : */
653 : static void
654 0 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
655 : {
656 0 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
657 :
658 0 : worker->in_use = false;
659 0 : worker->proc = NULL;
660 0 : worker->dbid = InvalidOid;
661 0 : worker->userid = InvalidOid;
662 0 : worker->subid = InvalidOid;
663 0 : worker->relid = InvalidOid;
664 0 : }
665 :
666 : /*
667 : * Cleanup function for logical replication launcher.
668 : *
669 : * Called on logical replication launcher exit.
670 : */
671 : static void
672 2 : logicalrep_launcher_onexit(int code, Datum arg)
673 : {
674 2 : LogicalRepCtx->launcher_pid = 0;
675 2 : }
676 :
677 : /*
678 : * Cleanup function.
679 : *
680 : * Called on logical replication worker exit.
681 : */
682 : static void
683 0 : logicalrep_worker_onexit(int code, Datum arg)
684 : {
685 : /* Disconnect gracefully from the remote side. */
686 0 : if (wrconn)
687 0 : walrcv_disconnect(wrconn);
688 :
689 0 : logicalrep_worker_detach();
690 :
691 0 : ApplyLauncherWakeup();
692 0 : }
693 :
694 : /* SIGHUP: set flag to reload configuration at next convenient time */
695 : static void
696 0 : logicalrep_launcher_sighup(SIGNAL_ARGS)
697 : {
698 0 : int save_errno = errno;
699 :
700 0 : got_SIGHUP = true;
701 :
702 : /* Waken anything waiting on the process latch */
703 0 : SetLatch(MyLatch);
704 :
705 0 : errno = save_errno;
706 0 : }
707 :
708 : /*
709 : * Count the number of registered (not necessarily running) sync workers
710 : * for a subscription.
711 : */
712 : int
713 0 : logicalrep_sync_worker_count(Oid subid)
714 : {
715 : int i;
716 0 : int res = 0;
717 :
718 0 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
719 :
720 : /* Count sync workers (those with a valid relid) for the given subscription. */
721 0 : for (i = 0; i < max_logical_replication_workers; i++)
722 : {
723 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
724 :
725 0 : if (w->subid == subid && OidIsValid(w->relid))
726 0 : res++;
727 : }
728 :
729 0 : return res;
730 : }
731 :
732 : /*
733 : * ApplyLauncherShmemSize
734 : * Compute space needed for replication launcher shared memory
735 : */
736 : Size
737 30 : ApplyLauncherShmemSize(void)
738 : {
739 : Size size;
740 :
741 : /*
742 : * Need the fixed struct and the array of LogicalRepWorker.
743 : */
744 30 : size = sizeof(LogicalRepCtxStruct);
745 30 : size = MAXALIGN(size);
746 30 : size = add_size(size, mul_size(max_logical_replication_workers,
747 : sizeof(LogicalRepWorker)));
748 30 : return size;
749 : }
750 :
751 : /*
752 : * ApplyLauncherRegister
753 : * Register a background worker running the logical replication launcher.
754 : */
755 : void
756 2 : ApplyLauncherRegister(void)
757 : {
758 : BackgroundWorker bgw;
759 :
760 2 : if (max_logical_replication_workers == 0)
761 2 : return;
762 :
763 2 : memset(&bgw, 0, sizeof(bgw));
764 2 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
765 : BGWORKER_BACKEND_DATABASE_CONNECTION;
766 2 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
767 2 : snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
768 2 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
769 2 : snprintf(bgw.bgw_name, BGW_MAXLEN,
770 : "logical replication launcher");
771 2 : bgw.bgw_restart_time = 5;
772 2 : bgw.bgw_notify_pid = 0;
773 2 : bgw.bgw_main_arg = (Datum) 0;
774 :
775 2 : RegisterBackgroundWorker(&bgw);
776 : }
777 :
778 : /*
779 : * ApplyLauncherShmemInit
780 : * Allocate and initialize replication launcher shared memory
781 : */
782 : void
783 10 : ApplyLauncherShmemInit(void)
784 : {
785 : bool found;
786 :
787 10 : LogicalRepCtx = (LogicalRepCtxStruct *)
788 10 : ShmemInitStruct("Logical Replication Launcher Data",
789 : ApplyLauncherShmemSize(),
790 : &found);
791 :
792 10 : if (!found)
793 : {
794 : int slot;
795 :
796 10 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
797 :
798 : /* Initialize memory and spin locks for each worker slot. */
799 50 : for (slot = 0; slot < max_logical_replication_workers; slot++)
800 : {
801 40 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
802 :
803 40 : memset(worker, 0, sizeof(LogicalRepWorker));
804 40 : SpinLockInit(&worker->relmutex);
805 : }
806 : }
807 10 : }
808 :
809 : /*
810 : * Check whether current transaction has manipulated logical replication
811 : * workers.
812 : */
813 : bool
814 14 : XactManipulatesLogicalReplicationWorkers(void)
815 : {
816 14 : return (on_commit_stop_workers != NIL);
817 : }
818 :
819 : /*
820 : * Wake up the launcher on commit if requested.
821 : */
822 : void
823 52322 : AtEOXact_ApplyLauncher(bool isCommit)
824 : {
825 52322 : if (isCommit)
826 : {
827 : ListCell *lc;
828 :
829 45766 : foreach(lc, on_commit_stop_workers)
830 : {
831 0 : LogicalRepWorkerId *wid = lfirst(lc);
832 :
833 0 : logicalrep_worker_stop(wid->subid, wid->relid);
834 : }
835 :
836 45766 : if (on_commit_launcher_wakeup)
837 2 : ApplyLauncherWakeup();
838 : }
839 :
840 : /*
841 : * No need to pfree on_commit_stop_workers. It was allocated in
842 : * transaction memory context, which is going to be cleaned soon.
843 : */
844 52322 : on_commit_stop_workers = NIL;
845 52322 : on_commit_launcher_wakeup = false;
846 52322 : }
847 :
848 : /*
849 : * Request wakeup of the launcher on commit of the transaction.
850 : *
851 : * This is used to signal the launcher to stop sleeping and process the
852 : * subscriptions when the current transaction commits. Should be used when
853 : * a new tuple has been added to the pg_subscription catalog.
854 : */
855 : void
856 2 : ApplyLauncherWakeupAtCommit(void)
857 : {
858 2 : if (!on_commit_launcher_wakeup)
859 2 : on_commit_launcher_wakeup = true;
860 2 : }
861 :
862 : static void
863 2 : ApplyLauncherWakeup(void)
864 : {
865 2 : if (LogicalRepCtx->launcher_pid != 0)
866 2 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
867 2 : }
868 :
869 : /*
870 : * Main loop for the apply launcher process.
871 : */
872 : void
873 2 : ApplyLauncherMain(Datum main_arg)
874 : {
875 2 : TimestampTz last_start_time = 0;
876 :
877 2 : ereport(DEBUG1,
878 : (errmsg("logical replication launcher started")));
879 :
880 2 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
881 :
882 2 : Assert(LogicalRepCtx->launcher_pid == 0);
883 2 : LogicalRepCtx->launcher_pid = MyProcPid;
884 :
885 : /* Establish signal handlers. */
886 2 : pqsignal(SIGHUP, logicalrep_launcher_sighup);
887 2 : pqsignal(SIGTERM, die);
888 2 : BackgroundWorkerUnblockSignals();
889 :
890 : /*
891 : * Establish connection to nailed catalogs (we only ever access
892 : * pg_subscription).
893 : */
894 2 : BackgroundWorkerInitializeConnection(NULL, NULL);
895 :
896 : /* Enter main loop */
897 : for (;;)
898 : {
899 : int rc;
900 : List *sublist;
901 : ListCell *lc;
902 : MemoryContext subctx;
903 : MemoryContext oldctx;
904 : TimestampTz now;
905 160 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
906 :
907 160 : CHECK_FOR_INTERRUPTS();
908 :
909 160 : now = GetCurrentTimestamp();
910 :
911 : /* Limit the start retries to once per wal_retrieve_retry_interval */
912 160 : if (TimestampDifferenceExceeds(last_start_time, now,
913 : wal_retrieve_retry_interval))
914 : {
915 : /* Use a temporary context for the subscription list and worker info. */
916 160 : subctx = AllocSetContextCreate(TopMemoryContext,
917 : "Logical Replication Launcher sublist",
918 : ALLOCSET_DEFAULT_MINSIZE,
919 : ALLOCSET_DEFAULT_INITSIZE,
920 : ALLOCSET_DEFAULT_MAXSIZE);
921 160 : oldctx = MemoryContextSwitchTo(subctx);
922 :
923 : /* Search for subscriptions to start or stop. */
924 160 : sublist = get_subscription_list();
925 :
926 : /* Start the missing workers for enabled subscriptions. */
927 162 : foreach(lc, sublist)
928 : {
929 2 : Subscription *sub = (Subscription *) lfirst(lc);
930 : LogicalRepWorker *w;
931 :
932 2 : if (!sub->enabled)
933 2 : continue;
934 :
935 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
936 0 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
937 0 : LWLockRelease(LogicalRepWorkerLock);
938 :
939 0 : if (w == NULL)
940 : {
941 0 : last_start_time = now;
942 0 : wait_time = wal_retrieve_retry_interval;
943 :
944 0 : logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
945 : sub->owner, InvalidOid);
946 : }
947 : }
948 :
949 : /* Switch back to original memory context. */
950 160 : MemoryContextSwitchTo(oldctx);
951 : /* Clean the temporary memory. */
952 160 : MemoryContextDelete(subctx);
953 : }
954 : else
955 : {
956 : /*
957 : * The wait in the previous cycle was interrupted less than
958 : * wal_retrieve_retry_interval after the last worker was started.
959 : * That usually means the worker crashed, so retry after
960 : * wal_retrieve_retry_interval again.
961 : */
962 0 : wait_time = wal_retrieve_retry_interval;
963 : }
964 :
965 : /* Wait for more work. */
966 160 : rc = WaitLatch(MyLatch,
967 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
968 : wait_time,
969 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
970 :
971 : /* emergency bailout if postmaster has died */
972 160 : if (rc & WL_POSTMASTER_DEATH)
973 0 : proc_exit(1);
974 :
975 160 : if (rc & WL_LATCH_SET)
976 : {
977 160 : ResetLatch(MyLatch);
978 160 : CHECK_FOR_INTERRUPTS();
979 : }
980 :
981 158 : if (got_SIGHUP)
982 : {
983 0 : got_SIGHUP = false;
984 0 : ProcessConfigFile(PGC_SIGHUP);
985 : }
986 158 : }
987 :
988 : /* Not reachable */
989 : }
990 :
991 : /*
992 : * Is current process the logical replication launcher?
993 : */
994 : bool
995 2 : IsLogicalLauncher(void)
996 : {
997 2 : return LogicalRepCtx->launcher_pid == MyProcPid;
998 : }
999 :
1000 : /*
1001 : * Returns the state of the subscription workers.
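 : *
 : * This function typically backs the pg_stat_subscription system view; an
 : * equivalent SQL-level call would be "SELECT * FROM pg_stat_subscription;".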
1002 : */
1003 : Datum
1004 0 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1005 : {
1006 : #define PG_STAT_GET_SUBSCRIPTION_COLS 8
1007 0 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1008 : int i;
1009 0 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1010 : TupleDesc tupdesc;
1011 : Tuplestorestate *tupstore;
1012 : MemoryContext per_query_ctx;
1013 : MemoryContext oldcontext;
1014 :
1015 : /* check to see if caller supports us returning a tuplestore */
1016 0 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1017 0 : ereport(ERROR,
1018 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1019 : errmsg("set-valued function called in context that cannot accept a set")));
1020 0 : if (!(rsinfo->allowedModes & SFRM_Materialize))
1021 0 : ereport(ERROR,
1022 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1023 : errmsg("materialize mode required, but it is not " \
1024 : "allowed in this context")));
1025 :
1026 : /* Build a tuple descriptor for our result type */
1027 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1028 0 : elog(ERROR, "return type must be a row type");
1029 :
1030 0 : per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1031 0 : oldcontext = MemoryContextSwitchTo(per_query_ctx);
1032 :
1033 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
1034 0 : rsinfo->returnMode = SFRM_Materialize;
1035 0 : rsinfo->setResult = tupstore;
1036 0 : rsinfo->setDesc = tupdesc;
1037 :
1038 0 : MemoryContextSwitchTo(oldcontext);
1039 :
1040 : /* Make sure we get consistent view of the workers. */
1041 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1042 :
1043 0 : for (i = 0; i < max_logical_replication_workers; i++)
1044 : {
1045 : /* for each row */
1046 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS];
1047 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
1048 : int worker_pid;
1049 : LogicalRepWorker worker;
1050 :
1051 0 : memcpy(&worker, &LogicalRepCtx->workers[i],
1052 : sizeof(LogicalRepWorker));
1053 0 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1054 0 : continue;
1055 :
1056 0 : if (OidIsValid(subid) && worker.subid != subid)
1057 0 : continue;
1058 :
1059 0 : worker_pid = worker.proc->pid;
1060 :
1061 0 : MemSet(values, 0, sizeof(values));
1062 0 : MemSet(nulls, 0, sizeof(nulls));
1063 :
1064 0 : values[0] = ObjectIdGetDatum(worker.subid);
1065 0 : if (OidIsValid(worker.relid))
1066 0 : values[1] = ObjectIdGetDatum(worker.relid);
1067 : else
1068 0 : nulls[1] = true;
1069 0 : values[2] = Int32GetDatum(worker_pid);
1070 0 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1071 0 : nulls[3] = true;
1072 : else
1073 0 : values[3] = LSNGetDatum(worker.last_lsn);
1074 0 : if (worker.last_send_time == 0)
1075 0 : nulls[4] = true;
1076 : else
1077 0 : values[4] = TimestampTzGetDatum(worker.last_send_time);
1078 0 : if (worker.last_recv_time == 0)
1079 0 : nulls[5] = true;
1080 : else
1081 0 : values[5] = TimestampTzGetDatum(worker.last_recv_time);
1082 0 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1083 0 : nulls[6] = true;
1084 : else
1085 0 : values[6] = LSNGetDatum(worker.reply_lsn);
1086 0 : if (worker.reply_time == 0)
1087 0 : nulls[7] = true;
1088 : else
1089 0 : values[7] = TimestampTzGetDatum(worker.reply_time);
1090 :
1091 0 : tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1092 :
1093 : /*
1094 : * If only a single subscription was requested, and we found it,
1095 : * break.
1096 : */
1097 0 : if (OidIsValid(subid))
1098 0 : break;
1099 : }
1100 :
1101 0 : LWLockRelease(LogicalRepWorkerLock);
1102 :
1103 : /* clean up and return the tuplestore */
1104 : tuplestore_donestoring(tupstore);
1105 :
1106 0 : return (Datum) 0;
1107 : }