LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 110 371 29.6 %
Date: 2017-09-29 13:40:31 Functions: 12 26 46.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "funcapi.h"
      21             : #include "miscadmin.h"
      22             : #include "pgstat.h"
      23             : 
      24             : #include "access/heapam.h"
      25             : #include "access/htup.h"
      26             : #include "access/htup_details.h"
      27             : #include "access/xact.h"
      28             : 
      29             : #include "catalog/pg_subscription.h"
      30             : #include "catalog/pg_subscription_rel.h"
      31             : 
      32             : #include "libpq/pqsignal.h"
      33             : 
      34             : #include "postmaster/bgworker.h"
      35             : #include "postmaster/fork_process.h"
      36             : #include "postmaster/postmaster.h"
      37             : 
      38             : #include "replication/logicallauncher.h"
      39             : #include "replication/logicalworker.h"
      40             : #include "replication/slot.h"
      41             : #include "replication/walreceiver.h"
      42             : #include "replication/worker_internal.h"
      43             : 
      44             : #include "storage/ipc.h"
      45             : #include "storage/proc.h"
      46             : #include "storage/procarray.h"
      47             : #include "storage/procsignal.h"
      48             : 
      49             : #include "tcop/tcopprot.h"
      50             : 
      51             : #include "utils/memutils.h"
      52             : #include "utils/pg_lsn.h"
      53             : #include "utils/ps_status.h"
      54             : #include "utils/timeout.h"
      55             : #include "utils/snapmgr.h"
      56             : 
      57             : /* max sleep time between cycles (3min) */
      58             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      59             : 
      60             : int         max_logical_replication_workers = 4;
      61             : int         max_sync_workers_per_subscription = 2;
      62             : 
      63             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      64             : 
      65             : typedef struct LogicalRepCtxStruct
      66             : {
      67             :     /* Supervisor process. */
      68             :     pid_t       launcher_pid;
      69             : 
      70             :     /* Background workers. */
      71             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      72             : } LogicalRepCtxStruct;
      73             : 
      74             : LogicalRepCtxStruct *LogicalRepCtx;
      75             : 
      76             : typedef struct LogicalRepWorkerId
      77             : {
      78             :     Oid         subid;
      79             :     Oid         relid;
      80             : } LogicalRepWorkerId;
      81             : 
      82             : static List *on_commit_stop_workers = NIL;
      83             : 
      84             : static void ApplyLauncherWakeup(void);
      85             : static void logicalrep_launcher_onexit(int code, Datum arg);
      86             : static void logicalrep_worker_onexit(int code, Datum arg);
      87             : static void logicalrep_worker_detach(void);
      88             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      89             : 
      90             : /* Flags set by signal handlers */
      91             : static volatile sig_atomic_t got_SIGHUP = false;
      92             : 
      93             : static bool on_commit_launcher_wakeup = false;
      94             : 
      95             : Datum       pg_stat_get_subscription(PG_FUNCTION_ARGS);
      96             : 
      97             : 
      98             : /*
      99             :  * Load the list of subscriptions.
     100             :  *
     101             :  * Only the fields interesting for worker start/stop functions are filled for
     102             :  * each subscription.
     103             :  */
     104             : static List *
     105         160 : get_subscription_list(void)
     106             : {
     107         160 :     List       *res = NIL;
     108             :     Relation    rel;
     109             :     HeapScanDesc scan;
     110             :     HeapTuple   tup;
     111             :     MemoryContext resultcxt;
     112             : 
     113             :     /* This is the context that we will allocate our output data in */
     114         160 :     resultcxt = CurrentMemoryContext;
     115             : 
     116             :     /*
     117             :      * Start a transaction so we can access pg_database, and get a snapshot.
     118             :      * We don't have a use for the snapshot itself, but we're interested in
     119             :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
     120             :      * for anything that reads heap pages, because HOT may decide to prune
     121             :      * them even if the process doesn't attempt to modify any tuples.)
     122             :      */
     123         160 :     StartTransactionCommand();
     124         160 :     (void) GetTransactionSnapshot();
     125             : 
     126         160 :     rel = heap_open(SubscriptionRelationId, AccessShareLock);
     127         160 :     scan = heap_beginscan_catalog(rel, 0, NULL);
     128             : 
     129         322 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     130             :     {
     131           2 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     132             :         Subscription *sub;
     133             :         MemoryContext oldcxt;
     134             : 
     135             :         /*
     136             :          * Allocate our results in the caller's context, not the
     137             :          * transaction's. We do this inside the loop, and restore the original
     138             :          * context at the end, so that leaky things like heap_getnext() are
     139             :          * not called in a potentially long-lived context.
     140             :          */
     141           2 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     142             : 
     143           2 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     144           2 :         sub->oid = HeapTupleGetOid(tup);
     145           2 :         sub->dbid = subform->subdbid;
     146           2 :         sub->owner = subform->subowner;
     147           2 :         sub->enabled = subform->subenabled;
     148           2 :         sub->name = pstrdup(NameStr(subform->subname));
     149             :         /* We don't fill fields we are not interested in. */
     150             : 
     151           2 :         res = lappend(res, sub);
     152           2 :         MemoryContextSwitchTo(oldcxt);
     153             :     }
     154             : 
     155         160 :     heap_endscan(scan);
     156         160 :     heap_close(rel, AccessShareLock);
     157             : 
     158         160 :     CommitTransactionCommand();
     159             : 
     160         160 :     return res;
     161             : }
     162             : 
     163             : /*
     164             :  * Wait for a background worker to start up and attach to the shmem context.
     165             :  *
     166             :  * This is only needed for cleaning up the shared memory in case the worker
     167             :  * fails to attach.
     168             :  */
     169             : static void
     170           0 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     171             :                                BackgroundWorkerHandle *handle)
     172             : {
     173             :     BgwHandleStatus status;
     174             :     int         rc;
     175             :     uint16      generation;
     176             : 
     177             :     /* Remember generation for future identification. */
     178           0 :     generation = worker->generation;
     179             : 
     180             :     for (;;)
     181             :     {
     182             :         pid_t       pid;
     183             : 
     184           0 :         CHECK_FOR_INTERRUPTS();
     185             : 
     186           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     187             : 
     188             :         /* Worker either died or has started; no need to do anything. */
     189           0 :         if (!worker->in_use || worker->proc)
     190             :         {
     191           0 :             LWLockRelease(LogicalRepWorkerLock);
     192           0 :             return;
     193             :         }
     194             : 
     195           0 :         LWLockRelease(LogicalRepWorkerLock);
     196             : 
     197             :         /* Check if worker has died before attaching, and clean up after it. */
     198           0 :         status = GetBackgroundWorkerPid(handle, &pid);
     199             : 
     200           0 :         if (status == BGWH_STOPPED)
     201             :         {
     202           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     203             :             /* Ensure that this was indeed the worker we waited for. */
     204           0 :             if (generation == worker->generation)
     205           0 :                 logicalrep_worker_cleanup(worker);
     206           0 :             LWLockRelease(LogicalRepWorkerLock);
     207           0 :             return;
     208             :         }
     209             : 
     210             :         /*
     211             :          * We need timeout because we generally don't get notified via latch
     212             :          * about the worker attach.  But we don't expect to have to wait long.
     213             :          */
     214           0 :         rc = WaitLatch(MyLatch,
     215             :                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
     216             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     217             : 
     218             :         /* emergency bailout if postmaster has died */
     219           0 :         if (rc & WL_POSTMASTER_DEATH)
     220           0 :             proc_exit(1);
     221             : 
     222           0 :         if (rc & WL_LATCH_SET)
     223             :         {
     224           0 :             ResetLatch(MyLatch);
     225           0 :             CHECK_FOR_INTERRUPTS();
     226             :         }
     227           0 :     }
     228             : 
     229             :     return;
     230             : }
     231             : 
     232             : /*
     233             :  * Walks the workers array and searches for one that matches given
     234             :  * subscription id and relid.
     235             :  */
     236             : LogicalRepWorker *
     237           0 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     238             : {
     239             :     int         i;
     240           0 :     LogicalRepWorker *res = NULL;
     241             : 
     242           0 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     243             : 
     244             :     /* Search for attached worker for a given subscription id. */
     245           0 :     for (i = 0; i < max_logical_replication_workers; i++)
     246             :     {
     247           0 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     248             : 
     249           0 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     250           0 :             (!only_running || w->proc))
     251             :         {
     252           0 :             res = w;
     253           0 :             break;
     254             :         }
     255             :     }
     256             : 
     257           0 :     return res;
     258             : }
     259             : 
     260             : /*
     261             :  * Similar to logicalrep_worker_find(), but returns list of all workers for
     262             :  * the subscription, instead just one.
     263             :  */
     264             : List *
     265           6 : logicalrep_workers_find(Oid subid, bool only_running)
     266             : {
     267             :     int         i;
     268           6 :     List       *res = NIL;
     269             : 
     270           6 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     271             : 
     272             :     /* Search for attached worker for a given subscription id. */
     273          30 :     for (i = 0; i < max_logical_replication_workers; i++)
     274             :     {
     275          24 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     276             : 
     277          24 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     278           0 :             res = lappend(res, w);
     279             :     }
     280             : 
     281           6 :     return res;
     282             : }
     283             : 
     284             : /*
     285             :  * Start new apply background worker.
     286             :  */
     287             : void
     288           0 : logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
     289             :                          Oid relid)
     290             : {
     291             :     BackgroundWorker bgw;
     292             :     BackgroundWorkerHandle *bgw_handle;
     293             :     int         i;
     294           0 :     int         slot = 0;
     295           0 :     LogicalRepWorker *worker = NULL;
     296             :     int         nsyncworkers;
     297             :     TimestampTz now;
     298             : 
     299           0 :     ereport(DEBUG1,
     300             :             (errmsg("starting logical replication worker for subscription \"%s\"",
     301             :                     subname)));
     302             : 
     303             :     /* Report this after the initial starting message for consistency. */
     304           0 :     if (max_replication_slots == 0)
     305           0 :         ereport(ERROR,
     306             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     307             :                  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
     308             : 
     309             :     /*
     310             :      * We need to do the modification of the shared memory under lock so that
     311             :      * we have consistent view.
     312             :      */
     313           0 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     314             : 
     315             : retry:
     316             :     /* Find unused worker slot. */
     317           0 :     for (i = 0; i < max_logical_replication_workers; i++)
     318             :     {
     319           0 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     320             : 
     321           0 :         if (!w->in_use)
     322             :         {
     323           0 :             worker = w;
     324           0 :             slot = i;
     325           0 :             break;
     326             :         }
     327             :     }
     328             : 
     329           0 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     330             : 
     331           0 :     now = GetCurrentTimestamp();
     332             : 
     333             :     /*
     334             :      * If we didn't find a free slot, try to do garbage collection.  The
     335             :      * reason we do this is because if some worker failed to start up and its
     336             :      * parent has crashed while waiting, the in_use state was never cleared.
     337             :      */
     338           0 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     339             :     {
     340           0 :         bool        did_cleanup = false;
     341             : 
     342           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     343             :         {
     344           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     345             : 
     346             :             /*
     347             :              * If the worker was marked in use but didn't manage to attach in
     348             :              * time, clean it up.
     349             :              */
     350           0 :             if (w->in_use && !w->proc &&
     351           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     352             :                                            wal_receiver_timeout))
     353             :             {
     354           0 :                 elog(WARNING,
     355             :                      "logical replication worker for subscription %u took too long to start; canceled",
     356             :                      w->subid);
     357             : 
     358           0 :                 logicalrep_worker_cleanup(w);
     359           0 :                 did_cleanup = true;
     360             :             }
     361             :         }
     362             : 
     363           0 :         if (did_cleanup)
     364           0 :             goto retry;
     365             :     }
     366             : 
     367             :     /*
     368             :      * If we reached the sync worker limit per subscription, just exit
     369             :      * silently as we might get here because of an otherwise harmless race
     370             :      * condition.
     371             :      */
     372           0 :     if (nsyncworkers >= max_sync_workers_per_subscription)
     373             :     {
     374           0 :         LWLockRelease(LogicalRepWorkerLock);
     375           0 :         return;
     376             :     }
     377             : 
     378             :     /*
     379             :      * However if there are no more free worker slots, inform user about it
     380             :      * before exiting.
     381             :      */
     382           0 :     if (worker == NULL)
     383             :     {
     384           0 :         LWLockRelease(LogicalRepWorkerLock);
     385           0 :         ereport(WARNING,
     386             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     387             :                  errmsg("out of logical replication worker slots"),
     388             :                  errhint("You might need to increase max_logical_replication_workers.")));
     389           0 :         return;
     390             :     }
     391             : 
     392             :     /* Prepare the worker slot. */
     393           0 :     worker->launch_time = now;
     394           0 :     worker->in_use = true;
     395           0 :     worker->generation++;
     396           0 :     worker->proc = NULL;
     397           0 :     worker->dbid = dbid;
     398           0 :     worker->userid = userid;
     399           0 :     worker->subid = subid;
     400           0 :     worker->relid = relid;
     401           0 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     402           0 :     worker->relstate_lsn = InvalidXLogRecPtr;
     403           0 :     worker->last_lsn = InvalidXLogRecPtr;
     404           0 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     405           0 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     406           0 :     worker->reply_lsn = InvalidXLogRecPtr;
     407           0 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     408             : 
     409           0 :     LWLockRelease(LogicalRepWorkerLock);
     410             : 
     411             :     /* Register the new dynamic worker. */
     412           0 :     memset(&bgw, 0, sizeof(bgw));
     413           0 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     414             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     415           0 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     416           0 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     417           0 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     418           0 :     if (OidIsValid(relid))
     419           0 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     420             :                  "logical replication worker for subscription %u sync %u", subid, relid);
     421             :     else
     422           0 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     423             :                  "logical replication worker for subscription %u", subid);
     424             : 
     425           0 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     426           0 :     bgw.bgw_notify_pid = MyProcPid;
     427           0 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     428             : 
     429           0 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     430             :     {
     431           0 :         ereport(WARNING,
     432             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     433             :                  errmsg("out of background worker slots"),
     434             :                  errhint("You might need to increase max_worker_processes.")));
     435           0 :         return;
     436             :     }
     437             : 
     438             :     /* Now wait until it attaches. */
     439           0 :     WaitForReplicationWorkerAttach(worker, bgw_handle);
     440             : }
     441             : 
     442             : /*
     443             :  * Stop the logical replication worker for subid/relid, if any, and wait until
     444             :  * it detaches from the slot.
     445             :  */
     446             : void
     447           0 : logicalrep_worker_stop(Oid subid, Oid relid)
     448             : {
     449             :     LogicalRepWorker *worker;
     450             :     uint16      generation;
     451             : 
     452           0 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     453             : 
     454           0 :     worker = logicalrep_worker_find(subid, relid, false);
     455             : 
     456             :     /* No worker, nothing to do. */
     457           0 :     if (!worker)
     458             :     {
     459           0 :         LWLockRelease(LogicalRepWorkerLock);
     460           0 :         return;
     461             :     }
     462             : 
     463             :     /*
     464             :      * Remember which generation was our worker so we can check if what we see
     465             :      * is still the same one.
     466             :      */
     467           0 :     generation = worker->generation;
     468             : 
     469             :     /*
     470             :      * If we found a worker but it does not have proc set then it is still
     471             :      * starting up; wait for it to finish starting and then kill it.
     472             :      */
     473           0 :     while (worker->in_use && !worker->proc)
     474             :     {
     475             :         int         rc;
     476             : 
     477           0 :         LWLockRelease(LogicalRepWorkerLock);
     478             : 
     479             :         /* Wait a bit --- we don't expect to have to wait long. */
     480           0 :         rc = WaitLatch(MyLatch,
     481             :                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
     482             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     483             : 
     484             :         /* emergency bailout if postmaster has died */
     485           0 :         if (rc & WL_POSTMASTER_DEATH)
     486           0 :             proc_exit(1);
     487             : 
     488           0 :         if (rc & WL_LATCH_SET)
     489             :         {
     490           0 :             ResetLatch(MyLatch);
     491           0 :             CHECK_FOR_INTERRUPTS();
     492             :         }
     493             : 
     494             :         /* Recheck worker status. */
     495           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     496             : 
     497             :         /*
     498             :          * Check whether the worker slot is no longer used, which would mean
     499             :          * that the worker has exited, or whether the worker generation is
     500             :          * different, meaning that a different worker has taken the slot.
     501             :          */
     502           0 :         if (!worker->in_use || worker->generation != generation)
     503             :         {
     504           0 :             LWLockRelease(LogicalRepWorkerLock);
     505           0 :             return;
     506             :         }
     507             : 
     508             :         /* Worker has assigned proc, so it has started. */
     509           0 :         if (worker->proc)
     510           0 :             break;
     511             :     }
     512             : 
     513             :     /* Now terminate the worker ... */
     514           0 :     kill(worker->proc->pid, SIGTERM);
     515             : 
     516             :     /* ... and wait for it to die. */
     517             :     for (;;)
     518             :     {
     519             :         int         rc;
     520             : 
     521             :         /* is it gone? */
     522           0 :         if (!worker->proc || worker->generation != generation)
     523             :             break;
     524             : 
     525           0 :         LWLockRelease(LogicalRepWorkerLock);
     526             : 
     527             :         /* Wait a bit --- we don't expect to have to wait long. */
     528           0 :         rc = WaitLatch(MyLatch,
     529             :                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
     530             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     531             : 
     532             :         /* emergency bailout if postmaster has died */
     533           0 :         if (rc & WL_POSTMASTER_DEATH)
     534           0 :             proc_exit(1);
     535             : 
     536           0 :         if (rc & WL_LATCH_SET)
     537             :         {
     538           0 :             ResetLatch(MyLatch);
     539           0 :             CHECK_FOR_INTERRUPTS();
     540             :         }
     541             : 
     542           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     543           0 :     }
     544             : 
     545           0 :     LWLockRelease(LogicalRepWorkerLock);
     546             : }
     547             : 
     548             : /*
     549             :  * Request worker for specified sub/rel to be stopped on commit.
     550             :  */
     551             : void
     552           0 : logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
     553             : {
     554             :     LogicalRepWorkerId *wid;
     555             :     MemoryContext oldctx;
     556             : 
     557             :     /* Make sure we store the info in context that survives until commit. */
     558           0 :     oldctx = MemoryContextSwitchTo(TopTransactionContext);
     559             : 
     560           0 :     wid = palloc(sizeof(LogicalRepWorkerId));
     561           0 :     wid->subid = subid;
     562           0 :     wid->relid = relid;
     563             : 
     564           0 :     on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
     565             : 
     566           0 :     MemoryContextSwitchTo(oldctx);
     567           0 : }
     568             : 
     569             : /*
     570             :  * Wake up (using latch) any logical replication worker for specified sub/rel.
     571             :  */
     572             : void
     573           0 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     574             : {
     575             :     LogicalRepWorker *worker;
     576             : 
     577           0 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     578             : 
     579           0 :     worker = logicalrep_worker_find(subid, relid, true);
     580             : 
     581           0 :     if (worker)
     582           0 :         logicalrep_worker_wakeup_ptr(worker);
     583             : 
     584           0 :     LWLockRelease(LogicalRepWorkerLock);
     585           0 : }
     586             : 
     587             : /*
     588             :  * Wake up (using latch) the specified logical replication worker.
     589             :  *
     590             :  * Caller must hold lock, else worker->proc could change under us.
     591             :  */
     592             : void
     593           0 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     594             : {
     595           0 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     596             : 
     597           0 :     SetLatch(&worker->proc->procLatch);
     598           0 : }
     599             : 
     600             : /*
     601             :  * Attach to a slot.
     602             :  */
     603             : void
     604           0 : logicalrep_worker_attach(int slot)
     605             : {
     606             :     /* Block concurrent access. */
     607           0 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     608             : 
     609           0 :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     610           0 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     611             : 
     612           0 :     if (!MyLogicalRepWorker->in_use)
     613             :     {
     614           0 :         LWLockRelease(LogicalRepWorkerLock);
     615           0 :         ereport(ERROR,
     616             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     617             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     618             :                         slot)));
     619             :     }
     620             : 
     621           0 :     if (MyLogicalRepWorker->proc)
     622             :     {
     623           0 :         LWLockRelease(LogicalRepWorkerLock);
     624           0 :         ereport(ERROR,
     625             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     626             :                  errmsg("logical replication worker slot %d is already used by "
     627             :                         "another worker, cannot attach", slot)));
     628             :     }
     629             : 
     630           0 :     MyLogicalRepWorker->proc = MyProc;
     631           0 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     632             : 
     633           0 :     LWLockRelease(LogicalRepWorkerLock);
     634           0 : }
     635             : 
     636             : /*
     637             :  * Detach the worker (cleans up the worker info).
     638             :  */
     639             : static void
     640           0 : logicalrep_worker_detach(void)
     641             : {
     642             :     /* Block concurrent access. */
     643           0 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     644             : 
     645           0 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     646             : 
     647           0 :     LWLockRelease(LogicalRepWorkerLock);
     648           0 : }
     649             : 
     650             : /*
     651             :  * Clean up worker info.
     652             :  */
     653             : static void
     654           0 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     655             : {
     656           0 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     657             : 
     658           0 :     worker->in_use = false;
     659           0 :     worker->proc = NULL;
     660           0 :     worker->dbid = InvalidOid;
     661           0 :     worker->userid = InvalidOid;
     662           0 :     worker->subid = InvalidOid;
     663           0 :     worker->relid = InvalidOid;
     664           0 : }
     665             : 
     666             : /*
     667             :  * Cleanup function for logical replication launcher.
     668             :  *
     669             :  * Called on logical replication launcher exit.
     670             :  */
     671             : static void
     672           2 : logicalrep_launcher_onexit(int code, Datum arg)
     673             : {
     674           2 :     LogicalRepCtx->launcher_pid = 0;
     675           2 : }
     676             : 
     677             : /*
     678             :  * Cleanup function.
     679             :  *
     680             :  * Called on logical replication worker exit.
     681             :  */
     682             : static void
     683           0 : logicalrep_worker_onexit(int code, Datum arg)
     684             : {
     685             :     /* Disconnect gracefully from the remote side. */
     686           0 :     if (wrconn)
     687           0 :         walrcv_disconnect(wrconn);
     688             : 
     689           0 :     logicalrep_worker_detach();
     690             : 
     691           0 :     ApplyLauncherWakeup();
     692           0 : }
     693             : 
     694             : /* SIGHUP: set flag to reload configuration at next convenient time */
     695             : static void
     696           0 : logicalrep_launcher_sighup(SIGNAL_ARGS)
     697             : {
     698           0 :     int         save_errno = errno;
     699             : 
     700           0 :     got_SIGHUP = true;
     701             : 
     702             :     /* Waken anything waiting on the process latch */
     703           0 :     SetLatch(MyLatch);
     704             : 
     705           0 :     errno = save_errno;
     706           0 : }
     707             : 
     708             : /*
     709             :  * Count the number of registered (not necessarily running) sync workers
     710             :  * for a subscription.
     711             :  */
     712             : int
     713           0 : logicalrep_sync_worker_count(Oid subid)
     714             : {
     715             :     int         i;
     716           0 :     int         res = 0;
     717             : 
     718           0 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     719             : 
     720             :     /* Search for attached worker for a given subscription id. */
     721           0 :     for (i = 0; i < max_logical_replication_workers; i++)
     722             :     {
     723           0 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     724             : 
     725           0 :         if (w->subid == subid && OidIsValid(w->relid))
     726           0 :             res++;
     727             :     }
     728             : 
     729           0 :     return res;
     730             : }
     731             : 
     732             : /*
     733             :  * ApplyLauncherShmemSize
     734             :  *      Compute space needed for replication launcher shared memory
     735             :  */
     736             : Size
     737          30 : ApplyLauncherShmemSize(void)
     738             : {
     739             :     Size        size;
     740             : 
     741             :     /*
     742             :      * Need the fixed struct and the array of LogicalRepWorker.
     743             :      */
     744          30 :     size = sizeof(LogicalRepCtxStruct);
     745          30 :     size = MAXALIGN(size);
     746          30 :     size = add_size(size, mul_size(max_logical_replication_workers,
     747             :                                    sizeof(LogicalRepWorker)));
     748          30 :     return size;
     749             : }
     750             : 
     751             : /*
     752             :  * ApplyLauncherRegister
     753             :  *      Register a background worker running the logical replication launcher.
     754             :  */
     755             : void
     756           2 : ApplyLauncherRegister(void)
     757             : {
     758             :     BackgroundWorker bgw;
     759             : 
     760           2 :     if (max_logical_replication_workers == 0)
     761           2 :         return;
     762             : 
     763           2 :     memset(&bgw, 0, sizeof(bgw));
     764           2 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     765             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     766           2 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     767           2 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     768           2 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     769           2 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     770             :              "logical replication launcher");
     771           2 :     bgw.bgw_restart_time = 5;
     772           2 :     bgw.bgw_notify_pid = 0;
     773           2 :     bgw.bgw_main_arg = (Datum) 0;
     774             : 
     775           2 :     RegisterBackgroundWorker(&bgw);
     776             : }
     777             : 
     778             : /*
     779             :  * ApplyLauncherShmemInit
     780             :  *      Allocate and initialize replication launcher shared memory
     781             :  */
     782             : void
     783          10 : ApplyLauncherShmemInit(void)
     784             : {
     785             :     bool        found;
     786             : 
     787          10 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     788          10 :         ShmemInitStruct("Logical Replication Launcher Data",
     789             :                         ApplyLauncherShmemSize(),
     790             :                         &found);
     791             : 
     792          10 :     if (!found)
     793             :     {
     794             :         int         slot;
     795             : 
     796          10 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     797             : 
     798             :         /* Initialize memory and spin locks for each worker slot. */
     799          50 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     800             :         {
     801          40 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     802             : 
     803          40 :             memset(worker, 0, sizeof(LogicalRepWorker));
     804          40 :             SpinLockInit(&worker->relmutex);
     805             :         }
     806             :     }
     807          10 : }
     808             : 
     809             : /*
     810             :  * Check whether current transaction has manipulated logical replication
     811             :  * workers.
     812             :  */
     813             : bool
     814          14 : XactManipulatesLogicalReplicationWorkers(void)
     815             : {
     816          14 :     return (on_commit_stop_workers != NIL);
     817             : }
     818             : 
     819             : /*
     820             :  * Wakeup the launcher on commit if requested.
     821             :  */
     822             : void
     823       52322 : AtEOXact_ApplyLauncher(bool isCommit)
     824             : {
     825       52322 :     if (isCommit)
     826             :     {
     827             :         ListCell   *lc;
     828             : 
     829       45766 :         foreach(lc, on_commit_stop_workers)
     830             :         {
     831           0 :             LogicalRepWorkerId *wid = lfirst(lc);
     832             : 
     833           0 :             logicalrep_worker_stop(wid->subid, wid->relid);
     834             :         }
     835             : 
     836       45766 :         if (on_commit_launcher_wakeup)
     837           2 :             ApplyLauncherWakeup();
     838             :     }
     839             : 
     840             :     /*
     841             :      * No need to pfree on_commit_stop_workers.  It was allocated in
     842             :      * transaction memory context, which is going to be cleaned soon.
     843             :      */
     844       52322 :     on_commit_stop_workers = NIL;
     845       52322 :     on_commit_launcher_wakeup = false;
     846       52322 : }
     847             : 
     848             : /*
     849             :  * Request wakeup of the launcher on commit of the transaction.
     850             :  *
     851             :  * This is used to send launcher signal to stop sleeping and process the
     852             :  * subscriptions when current transaction commits. Should be used when new
     853             :  * tuple was added to the pg_subscription catalog.
     854             : */
     855             : void
     856           2 : ApplyLauncherWakeupAtCommit(void)
     857             : {
     858           2 :     if (!on_commit_launcher_wakeup)
     859           2 :         on_commit_launcher_wakeup = true;
     860           2 : }
     861             : 
     862             : static void
     863           2 : ApplyLauncherWakeup(void)
     864             : {
     865           2 :     if (LogicalRepCtx->launcher_pid != 0)
     866           2 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
     867           2 : }
     868             : 
     869             : /*
     870             :  * Main loop for the apply launcher process.
     871             :  */
     872             : void
     873           2 : ApplyLauncherMain(Datum main_arg)
     874             : {
     875           2 :     TimestampTz last_start_time = 0;
     876             : 
     877           2 :     ereport(DEBUG1,
     878             :             (errmsg("logical replication launcher started")));
     879             : 
     880           2 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
     881             : 
     882           2 :     Assert(LogicalRepCtx->launcher_pid == 0);
     883           2 :     LogicalRepCtx->launcher_pid = MyProcPid;
     884             : 
     885             :     /* Establish signal handlers. */
     886           2 :     pqsignal(SIGHUP, logicalrep_launcher_sighup);
     887           2 :     pqsignal(SIGTERM, die);
     888           2 :     BackgroundWorkerUnblockSignals();
     889             : 
     890             :     /*
     891             :      * Establish connection to nailed catalogs (we only ever access
     892             :      * pg_subscription).
     893             :      */
     894           2 :     BackgroundWorkerInitializeConnection(NULL, NULL);
     895             : 
     896             :     /* Enter main loop */
     897             :     for (;;)
     898             :     {
     899             :         int         rc;
     900             :         List       *sublist;
     901             :         ListCell   *lc;
     902             :         MemoryContext subctx;
     903             :         MemoryContext oldctx;
     904             :         TimestampTz now;
     905         160 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
     906             : 
     907         160 :         CHECK_FOR_INTERRUPTS();
     908             : 
     909         160 :         now = GetCurrentTimestamp();
     910             : 
     911             :         /* Limit the start retry to once a wal_retrieve_retry_interval */
     912         160 :         if (TimestampDifferenceExceeds(last_start_time, now,
     913             :                                        wal_retrieve_retry_interval))
     914             :         {
     915             :             /* Use temporary context for the database list and worker info. */
     916         160 :             subctx = AllocSetContextCreate(TopMemoryContext,
     917             :                                            "Logical Replication Launcher sublist",
     918             :                                            ALLOCSET_DEFAULT_MINSIZE,
     919             :                                            ALLOCSET_DEFAULT_INITSIZE,
     920             :                                            ALLOCSET_DEFAULT_MAXSIZE);
     921         160 :             oldctx = MemoryContextSwitchTo(subctx);
     922             : 
     923             :             /* search for subscriptions to start or stop. */
     924         160 :             sublist = get_subscription_list();
     925             : 
     926             :             /* Start the missing workers for enabled subscriptions. */
     927         162 :             foreach(lc, sublist)
     928             :             {
     929           2 :                 Subscription *sub = (Subscription *) lfirst(lc);
     930             :                 LogicalRepWorker *w;
     931             : 
     932           2 :                 if (!sub->enabled)
     933           2 :                     continue;
     934             : 
     935           0 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     936           0 :                 w = logicalrep_worker_find(sub->oid, InvalidOid, false);
     937           0 :                 LWLockRelease(LogicalRepWorkerLock);
     938             : 
     939           0 :                 if (w == NULL)
     940             :                 {
     941           0 :                     last_start_time = now;
     942           0 :                     wait_time = wal_retrieve_retry_interval;
     943             : 
     944           0 :                     logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
     945             :                                              sub->owner, InvalidOid);
     946             :                 }
     947             :             }
     948             : 
     949             :             /* Switch back to original memory context. */
     950         160 :             MemoryContextSwitchTo(oldctx);
     951             :             /* Clean the temporary memory. */
     952         160 :             MemoryContextDelete(subctx);
     953             :         }
     954             :         else
     955             :         {
     956             :             /*
     957             :              * The wait in previous cycle was interrupted in less than
     958             :              * wal_retrieve_retry_interval since last worker was started, this
     959             :              * usually means crash of the worker, so we should retry in
     960             :              * wal_retrieve_retry_interval again.
     961             :              */
     962           0 :             wait_time = wal_retrieve_retry_interval;
     963             :         }
     964             : 
     965             :         /* Wait for more work. */
     966         160 :         rc = WaitLatch(MyLatch,
     967             :                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
     968             :                        wait_time,
     969             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
     970             : 
     971             :         /* emergency bailout if postmaster has died */
     972         160 :         if (rc & WL_POSTMASTER_DEATH)
     973           0 :             proc_exit(1);
     974             : 
     975         160 :         if (rc & WL_LATCH_SET)
     976             :         {
     977         160 :             ResetLatch(MyLatch);
     978         160 :             CHECK_FOR_INTERRUPTS();
     979             :         }
     980             : 
     981         158 :         if (got_SIGHUP)
     982             :         {
     983           0 :             got_SIGHUP = false;
     984           0 :             ProcessConfigFile(PGC_SIGHUP);
     985             :         }
     986         158 :     }
     987             : 
     988             :     /* Not reachable */
     989             : }
     990             : 
     991             : /*
     992             :  * Is current process the logical replication launcher?
     993             :  */
     994             : bool
     995           2 : IsLogicalLauncher(void)
     996             : {
     997           2 :     return LogicalRepCtx->launcher_pid == MyProcPid;
     998             : }
     999             : 
    1000             : /*
    1001             :  * Returns state of the subscriptions.
    1002             :  */
    1003             : Datum
    1004           0 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1005             : {
    1006             : #define PG_STAT_GET_SUBSCRIPTION_COLS   8
    1007           0 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1008             :     int         i;
    1009           0 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1010             :     TupleDesc   tupdesc;
    1011             :     Tuplestorestate *tupstore;
    1012             :     MemoryContext per_query_ctx;
    1013             :     MemoryContext oldcontext;
    1014             : 
    1015             :     /* check to see if caller supports us returning a tuplestore */
    1016           0 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
    1017           0 :         ereport(ERROR,
    1018             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1019             :                  errmsg("set-valued function called in context that cannot accept a set")));
    1020           0 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
    1021           0 :         ereport(ERROR,
    1022             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1023             :                  errmsg("materialize mode required, but it is not " \
    1024             :                         "allowed in this context")));
    1025             : 
    1026             :     /* Build a tuple descriptor for our result type */
    1027           0 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    1028           0 :         elog(ERROR, "return type must be a row type");
    1029             : 
    1030           0 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    1031           0 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    1032             : 
    1033           0 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    1034           0 :     rsinfo->returnMode = SFRM_Materialize;
    1035           0 :     rsinfo->setResult = tupstore;
    1036           0 :     rsinfo->setDesc = tupdesc;
    1037             : 
    1038           0 :     MemoryContextSwitchTo(oldcontext);
    1039             : 
    1040             :     /* Make sure we get consistent view of the workers. */
    1041           0 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1042             : 
    1043           0 :     for (i = 0; i <= max_logical_replication_workers; i++)
    1044             :     {
    1045             :         /* for each row */
    1046             :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS];
    1047             :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
    1048             :         int         worker_pid;
    1049             :         LogicalRepWorker worker;
    1050             : 
    1051           0 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1052             :                sizeof(LogicalRepWorker));
    1053           0 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1054           0 :             continue;
    1055             : 
    1056           0 :         if (OidIsValid(subid) && worker.subid != subid)
    1057           0 :             continue;
    1058             : 
    1059           0 :         worker_pid = worker.proc->pid;
    1060             : 
    1061           0 :         MemSet(values, 0, sizeof(values));
    1062           0 :         MemSet(nulls, 0, sizeof(nulls));
    1063             : 
    1064           0 :         values[0] = ObjectIdGetDatum(worker.subid);
    1065           0 :         if (OidIsValid(worker.relid))
    1066           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1067             :         else
    1068           0 :             nulls[1] = true;
    1069           0 :         values[2] = Int32GetDatum(worker_pid);
    1070           0 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1071           0 :             nulls[3] = true;
    1072             :         else
    1073           0 :             values[3] = LSNGetDatum(worker.last_lsn);
    1074           0 :         if (worker.last_send_time == 0)
    1075           0 :             nulls[4] = true;
    1076             :         else
    1077           0 :             values[4] = TimestampTzGetDatum(worker.last_send_time);
    1078           0 :         if (worker.last_recv_time == 0)
    1079           0 :             nulls[5] = true;
    1080             :         else
    1081           0 :             values[5] = TimestampTzGetDatum(worker.last_recv_time);
    1082           0 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1083           0 :             nulls[6] = true;
    1084             :         else
    1085           0 :             values[6] = LSNGetDatum(worker.reply_lsn);
    1086           0 :         if (worker.reply_time == 0)
    1087           0 :             nulls[7] = true;
    1088             :         else
    1089           0 :             values[7] = TimestampTzGetDatum(worker.reply_time);
    1090             : 
    1091           0 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1092             : 
    1093             :         /*
    1094             :          * If only a single subscription was requested, and we found it,
    1095             :          * break.
    1096             :          */
    1097           0 :         if (OidIsValid(subid))
    1098           0 :             break;
    1099             :     }
    1100             : 
    1101           0 :     LWLockRelease(LogicalRepWorkerLock);
    1102             : 
    1103             :     /* clean up and return the tuplestore */
    1104             :     tuplestore_donestoring(tupstore);
    1105             : 
    1106           0 :     return (Datum) 0;
    1107             : }

Generated by: LCOV version 1.11