LCOV - code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 0 313 0.0 %
Date: 2017-09-29 13:40:31 Functions: 0 12 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * tablesync.c
       3             :  *    PostgreSQL logical replication
       4             :  *
       5             :  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/tablesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for initial table data synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  *    The initial data synchronization is done separately for each table,
      15             :  *    in a separate apply worker that only fetches the initial snapshot data
      16             :  *    from the publisher and then synchronizes the position in the stream with
      17             :  *    the main apply worker.
      18             :  *
      19             :  *    There are several reasons for doing the synchronization this way:
      20             :  *     - It allows us to parallelize the initial data synchronization
      21             :  *       which lowers the time needed for it to happen.
      22             :  *     - The initial synchronization does not have to hold the xid and LSN
      23             :  *       for the time it takes to copy data of all tables, causing less
      24             :  *       bloat and lower disk consumption compared to doing the
      25             :  *       synchronization in a single process for the whole database.
      26             :  *     - It allows us to synchronize any tables added after the initial
      27             :  *       synchronization has finished.
      28             :  *
      29             :  *    The stream position synchronization works in multiple steps.
      30             :  *     - Sync finishes copy and sets worker state as SYNCWAIT and waits for
      31             :  *       state to change in a loop.
      32             :  *     - Apply periodically checks tables that are synchronizing for SYNCWAIT.
      33             :  *       When the desired state appears, it will set the worker state to
      34             :  *       CATCHUP and start loop-waiting until either the table state is set
      35             :  *       to SYNCDONE or the sync worker exits.
      36             :  *     - After the sync worker has seen the state change to CATCHUP, it will
      37             :  *       read the stream and apply changes (acting like an apply worker) until
      38             :  *       it catches up to the specified stream position.  Then it sets the
      39             :  *       state to SYNCDONE.  There might be zero changes applied between
      40             :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
      41             :  *       apply worker.
      42             :  *     - Once the state was set to SYNCDONE, the apply will continue tracking
      43             :  *       the table until it reaches the SYNCDONE stream position, at which
      44             :  *       point it sets state to READY and stops tracking.  Again, there might
      45             :  *       be zero changes in between.
      46             :  *
      47             :  *    So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
      48             :  *    SYNCDONE -> READY.
      49             :  *
      50             :  *    The catalog pg_subscription_rel is used to keep information about
      51             :  *    subscribed tables and their state.  Some transient state during data
      52             :  *    synchronization is kept in shared memory.  The states SYNCWAIT and
      53             :  *    CATCHUP only appear in memory.
      54             :  *
      55             :  *    Example flows look like this:
      56             :  *     - Apply is in front:
      57             :  *        sync:8
      58             :  *          -> set in memory SYNCWAIT
      59             :  *        apply:10
      60             :  *          -> set in memory CATCHUP
      61             :  *          -> enter wait-loop
      62             :  *        sync:10
      63             :  *          -> set in catalog SYNCDONE
      64             :  *          -> exit
      65             :  *        apply:10
      66             :  *          -> exit wait-loop
      67             :  *          -> continue rep
      68             :  *        apply:11
      69             :  *          -> set in catalog READY
      70             :  *     - Sync in front:
      71             :  *        sync:10
      72             :  *          -> set in memory SYNCWAIT
      73             :  *        apply:8
      74             :  *          -> set in memory CATCHUP
      75             :  *          -> continue per-table filtering
      76             :  *        sync:10
      77             :  *          -> set in catalog SYNCDONE
      78             :  *          -> exit
      79             :  *        apply:10
      80             :  *          -> set in catalog READY
      81             :  *          -> stop per-table filtering
      82             :  *          -> continue rep
      83             :  *-------------------------------------------------------------------------
      84             :  */
      85             : 
      86             : #include "postgres.h"
      87             : 
      88             : #include "miscadmin.h"
      89             : #include "pgstat.h"
      90             : 
      91             : #include "access/xact.h"
      92             : 
      93             : #include "catalog/pg_subscription_rel.h"
      94             : #include "catalog/pg_type.h"
      95             : 
      96             : #include "commands/copy.h"
      97             : 
      98             : #include "parser/parse_relation.h"
      99             : 
     100             : #include "replication/logicallauncher.h"
     101             : #include "replication/logicalrelation.h"
     102             : #include "replication/walreceiver.h"
     103             : #include "replication/worker_internal.h"
     104             : 
     105             : #include "utils/snapmgr.h"
     106             : #include "storage/ipc.h"
     107             : 
     108             : #include "utils/builtins.h"
     109             : #include "utils/lsyscache.h"
     110             : #include "utils/memutils.h"
     111             : 
     112             : static bool table_states_valid = false;
     113             : 
     114             : StringInfo  copybuf = NULL;
     115             : 
     116             : /*
     117             :  * Exit routine for synchronization worker.
     118             :  */
     119             : static void
     120             : pg_attribute_noreturn()
     121           0 : finish_sync_worker(void)
     122             : {
     123             :     /*
     124             :      * Commit any outstanding transaction. This is the usual case, unless
     125             :      * there was nothing to do for the table.
     126             :      */
     127           0 :     if (IsTransactionState())
     128             :     {
     129           0 :         CommitTransactionCommand();
     130           0 :         pgstat_report_stat(false);
     131             :     }
     132             : 
     133             :     /* And flush all writes. */
     134           0 :     XLogFlush(GetXLogWriteRecPtr());
     135             : 
     136           0 :     StartTransactionCommand();
     137           0 :     ereport(LOG,
     138             :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
     139             :                     MySubscription->name,
     140             :                     get_rel_name(MyLogicalRepWorker->relid))));
     141           0 :     CommitTransactionCommand();
     142             : 
     143             :     /* Find the main apply worker and signal it. */
     144           0 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
     145             : 
     146             :     /* Stop gracefully */
     147           0 :     proc_exit(0);
     148             : }
     149             : 
     150             : /*
     151             :  * Wait until the relation synchronization state is set in the catalog to the
     152             :  * expected one.
     153             :  *
     154             :  * Used when transitioning from CATCHUP state to SYNCDONE.
     155             :  *
     156             :  * Returns false if the synchronization worker has disappeared or the table state
     157             :  * has been reset.
     158             :  */
     159             : static bool
     160           0 : wait_for_relation_state_change(Oid relid, char expected_state)
     161             : {
     162             :     int         rc;
     163             :     char        state;
     164             : 
     165             :     for (;;)
     166             :     {
     167             :         LogicalRepWorker *worker;
     168             :         XLogRecPtr  statelsn;
     169             : 
     170           0 :         CHECK_FOR_INTERRUPTS();
     171             : 
     172             :         /* XXX use cache invalidation here to improve performance? */
     173           0 :         PushActiveSnapshot(GetLatestSnapshot());
     174           0 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     175             :                                         relid, &statelsn, true);
     176           0 :         PopActiveSnapshot();
     177             : 
     178           0 :         if (state == SUBREL_STATE_UNKNOWN)
     179           0 :             return false;
     180             : 
     181           0 :         if (state == expected_state)
     182           0 :             return true;
     183             : 
     184             :         /* Check if the sync worker is still running and bail if not. */
     185           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     186             : 
     187             :         /* Check if the opposite worker is still running and bail if not. */
     188           0 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     189           0 :                                         am_tablesync_worker() ? InvalidOid : relid,
     190             :                                         false);
     191           0 :         LWLockRelease(LogicalRepWorkerLock);
     192           0 :         if (!worker)
     193           0 :             return false;
     194             : 
     195           0 :         rc = WaitLatch(MyLatch,
     196             :                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
     197             :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     198             : 
     199             :         /* emergency bailout if postmaster has died */
     200           0 :         if (rc & WL_POSTMASTER_DEATH)
     201           0 :             proc_exit(1);
     202             : 
     203           0 :         ResetLatch(MyLatch);
     204           0 :     }
     205             : 
     206             :     return false;
     207             : }
     208             : 
     209             : /*
     210             :  * Wait until the apply worker changes the state of our synchronization
     211             :  * worker to the expected one.
     212             :  *
     213             :  * Used when transitioning from SYNCWAIT state to CATCHUP.
     214             :  *
     215             :  * Returns false if the apply worker has disappeared.
     216             :  */
     217             : static bool
     218           0 : wait_for_worker_state_change(char expected_state)
     219             : {
     220             :     int         rc;
     221             : 
     222             :     for (;;)
     223             :     {
     224             :         LogicalRepWorker *worker;
     225             : 
     226           0 :         CHECK_FOR_INTERRUPTS();
     227             : 
     228             :         /*
     229             :          * Done if already in correct state.  (We assume this fetch is atomic
     230             :          * enough to not give a misleading answer if we do it with no lock.)
     231             :          */
     232           0 :         if (MyLogicalRepWorker->relstate == expected_state)
     233           0 :             return true;
     234             : 
     235             :         /*
     236             :          * Bail out if the apply worker has died, else signal it we're
     237             :          * waiting.
     238             :          */
     239           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     240           0 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     241             :                                         InvalidOid, false);
     242           0 :         if (worker && worker->proc)
     243           0 :             logicalrep_worker_wakeup_ptr(worker);
     244           0 :         LWLockRelease(LogicalRepWorkerLock);
     245           0 :         if (!worker)
     246           0 :             break;
     247             : 
     248             :         /*
     249             :          * Wait.  We expect to get a latch signal back from the apply worker,
     250             :          * but use a timeout in case it dies without sending one.
     251             :          */
     252           0 :         rc = WaitLatch(MyLatch,
     253             :                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
     254             :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
     255             : 
     256             :         /* emergency bailout if postmaster has died */
     257           0 :         if (rc & WL_POSTMASTER_DEATH)
     258           0 :             proc_exit(1);
     259             : 
     260           0 :         if (rc & WL_LATCH_SET)
     261           0 :             ResetLatch(MyLatch);
     262           0 :     }
     263             : 
     264           0 :     return false;
     265             : }
     266             : 
     267             : /*
     268             :  * Callback from syscache invalidation.
     269             :  */
     270             : void
     271           0 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
     272             : {
     273           0 :     table_states_valid = false;
     274           0 : }
     275             : 
     276             : /*
     277             :  * Handle table synchronization cooperation from the synchronization
     278             :  * worker.
     279             :  *
     280             :  * If the sync worker is in CATCHUP state and reached (or passed) the
     281             :  * predetermined synchronization point in the WAL stream, mark the table as
     282             :  * SYNCDONE and finish.
     283             :  */
     284             : static void
     285           0 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
     286             : {
     287           0 :     Assert(IsTransactionState());
     288             : 
     289           0 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     290             : 
     291           0 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
     292           0 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
     293             :     {
     294             :         TimeLineID  tli;
     295             : 
     296           0 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
     297           0 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
     298             : 
     299           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     300             : 
     301           0 :         SetSubscriptionRelState(MyLogicalRepWorker->subid,
     302           0 :                                 MyLogicalRepWorker->relid,
     303           0 :                                 MyLogicalRepWorker->relstate,
     304           0 :                                 MyLogicalRepWorker->relstate_lsn,
     305             :                                 true);
     306             : 
     307           0 :         walrcv_endstreaming(wrconn, &tli);
     308           0 :         finish_sync_worker();
     309             :     }
     310             :     else
     311           0 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
     312           0 : }
     313             : 
     314             : /*
     315             :  * Handle table synchronization cooperation from the apply worker.
     316             :  *
     317             :  * Walk over all subscription tables that are individually tracked by the
     318             :  * apply process (currently, all that have state other than
     319             :  * SUBREL_STATE_READY) and manage synchronization for them.
     320             :  *
     321             :  * If there are tables that need synchronizing and are not being synchronized
     322             :  * yet, start sync workers for them (if there are free slots for sync
     323             :  * workers).  To prevent starting the sync worker for the same relation at a
     324             :  * high frequency after a failure, we store its last start time with each sync
     325             :  * state info.  We start the sync worker for the same relation after waiting
     326             :  * at least wal_retrieve_retry_interval.
     327             :  *
     328             :  * For tables that are being synchronized already, check if sync workers
     329             :  * either need action from the apply worker or have finished.  This is the
     330             :  * SYNCWAIT to CATCHUP transition.
     331             :  *
     332             :  * If the synchronization position is reached (SYNCDONE), then the table can
     333             :  * be marked as READY and is no longer tracked.
     334             :  */
     335             : static void
     336           0 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
     337             : {
     338             :     struct tablesync_start_time_mapping
     339             :     {
     340             :         Oid         relid;
     341             :         TimestampTz last_start_time;
     342             :     };
     343             :     static List *table_states = NIL;
     344             :     static HTAB *last_start_times = NULL;
     345             :     ListCell   *lc;
     346           0 :     bool        started_tx = false;
     347             : 
     348           0 :     Assert(!IsTransactionState());
     349             : 
     350             :     /* We need up-to-date sync state info for subscription tables here. */
     351           0 :     if (!table_states_valid)
     352             :     {
     353             :         MemoryContext oldctx;
     354             :         List       *rstates;
     355             :         ListCell   *lc;
     356             :         SubscriptionRelState *rstate;
     357             : 
     358             :         /* Clean the old list. */
     359           0 :         list_free_deep(table_states);
     360           0 :         table_states = NIL;
     361             : 
     362           0 :         StartTransactionCommand();
     363           0 :         started_tx = true;
     364             : 
     365             :         /* Fetch all non-ready tables. */
     366           0 :         rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
     367             : 
     368             :         /* Allocate the tracking info in a permanent memory context. */
     369           0 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     370           0 :         foreach(lc, rstates)
     371             :         {
     372           0 :             rstate = palloc(sizeof(SubscriptionRelState));
     373           0 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
     374           0 :             table_states = lappend(table_states, rstate);
     375             :         }
     376           0 :         MemoryContextSwitchTo(oldctx);
     377             : 
     378           0 :         table_states_valid = true;
     379             :     }
     380             : 
     381             :     /*
     382             :      * Prepare a hash table for tracking last start times of workers, to avoid
     383             :      * immediate restarts.  We don't need it if there are no tables that need
     384             :      * syncing.
     385             :      */
     386           0 :     if (table_states && !last_start_times)
     387           0 :     {
     388             :         HASHCTL     ctl;
     389             : 
     390           0 :         memset(&ctl, 0, sizeof(ctl));
     391           0 :         ctl.keysize = sizeof(Oid);
     392           0 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
     393           0 :         last_start_times = hash_create("Logical replication table sync worker start times",
     394             :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
     395             :     }
     396             : 
     397             :     /*
     398             :      * Clean up the hash table when we're done with all tables (just to
     399             :      * release the bit of memory).
     400             :      */
     401           0 :     else if (!table_states && last_start_times)
     402             :     {
     403           0 :         hash_destroy(last_start_times);
     404           0 :         last_start_times = NULL;
     405             :     }
     406             : 
     407             :     /*
     408             :      * Process all tables that are being synchronized.
     409             :      */
     410           0 :     foreach(lc, table_states)
     411             :     {
     412           0 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
     413             : 
     414           0 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
     415             :         {
     416             :             /*
     417             :              * Apply has caught up to the position where the table sync has
     418             :              * finished.  Mark the table as ready so that the apply will just
     419             :              * continue to replicate it normally.
     420             :              */
     421           0 :             if (current_lsn >= rstate->lsn)
     422             :             {
     423           0 :                 rstate->state = SUBREL_STATE_READY;
     424           0 :                 rstate->lsn = current_lsn;
     425           0 :                 if (!started_tx)
     426             :                 {
     427           0 :                     StartTransactionCommand();
     428           0 :                     started_tx = true;
     429             :                 }
     430           0 :                 SetSubscriptionRelState(MyLogicalRepWorker->subid,
     431           0 :                                         rstate->relid, rstate->state,
     432             :                                         rstate->lsn, true);
     433             :             }
     434             :         }
     435             :         else
     436             :         {
     437             :             LogicalRepWorker *syncworker;
     438             : 
     439             :             /*
     440             :              * Look for a sync worker for this relation.
     441             :              */
     442           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     443             : 
     444           0 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
     445             :                                                 rstate->relid, false);
     446             : 
     447           0 :             if (syncworker)
     448             :             {
     449             :                 /* Found one, update our copy of its state */
     450           0 :                 SpinLockAcquire(&syncworker->relmutex);
     451           0 :                 rstate->state = syncworker->relstate;
     452           0 :                 rstate->lsn = syncworker->relstate_lsn;
     453           0 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     454             :                 {
     455             :                     /*
     456             :                      * Sync worker is waiting for apply.  Tell sync worker it
     457             :                      * can catchup now.
     458             :                      */
     459           0 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
     460           0 :                     syncworker->relstate_lsn =
     461           0 :                         Max(syncworker->relstate_lsn, current_lsn);
     462             :                 }
     463           0 :                 SpinLockRelease(&syncworker->relmutex);
     464             : 
     465             :                 /* If we told worker to catch up, wait for it. */
     466           0 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
     467             :                 {
     468             :                     /* Signal the sync worker, as it may be waiting for us. */
     469           0 :                     if (syncworker->proc)
     470           0 :                         logicalrep_worker_wakeup_ptr(syncworker);
     471             : 
     472             :                     /* Now safe to release the LWLock */
     473           0 :                     LWLockRelease(LogicalRepWorkerLock);
     474             : 
     475             :                     /*
     476             :                      * Enter busy loop and wait for synchronization worker to
     477             :                      * reach expected state (or die trying).
     478             :                      */
     479           0 :                     if (!started_tx)
     480             :                     {
     481           0 :                         StartTransactionCommand();
     482           0 :                         started_tx = true;
     483             :                     }
     484             : 
     485           0 :                     wait_for_relation_state_change(rstate->relid,
     486             :                                                    SUBREL_STATE_SYNCDONE);
     487             :                 }
     488             :                 else
     489           0 :                     LWLockRelease(LogicalRepWorkerLock);
     490             :             }
     491             :             else
     492             :             {
     493             :                 /*
     494             :                  * If there is no sync worker for this table yet, count
     495             :                  * running sync workers for this subscription, while we have
     496             :                  * the lock.
     497             :                  */
     498           0 :                 int         nsyncworkers =
     499           0 :                 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     500             : 
     501             :                 /* Now safe to release the LWLock */
     502           0 :                 LWLockRelease(LogicalRepWorkerLock);
     503             : 
     504             :                 /*
     505             :                  * If there are free sync worker slot(s), start a new sync
     506             :                  * worker for the table.
     507             :                  */
     508           0 :                 if (nsyncworkers < max_sync_workers_per_subscription)
     509             :                 {
     510           0 :                     TimestampTz now = GetCurrentTimestamp();
     511             :                     struct tablesync_start_time_mapping *hentry;
     512             :                     bool        found;
     513             : 
     514           0 :                     hentry = hash_search(last_start_times, &rstate->relid,
     515             :                                          HASH_ENTER, &found);
     516             : 
     517           0 :                     if (!found ||
     518           0 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
     519             :                                                    wal_retrieve_retry_interval))
     520             :                     {
     521           0 :                         logicalrep_worker_launch(MyLogicalRepWorker->dbid,
     522           0 :                                                  MySubscription->oid,
     523           0 :                                                  MySubscription->name,
     524           0 :                                                  MyLogicalRepWorker->userid,
     525             :                                                  rstate->relid);
     526           0 :                         hentry->last_start_time = now;
     527             :                     }
     528             :                 }
     529             :             }
     530             :         }
     531             :     }
     532             : 
     533           0 :     if (started_tx)
     534             :     {
     535           0 :         CommitTransactionCommand();
     536           0 :         pgstat_report_stat(false);
     537             :     }
     538           0 : }
     539             : 
     540             : /*
     541             :  * Process possible state change(s) of tables that are being synchronized.
     542             :  */
     543             : void
     544           0 : process_syncing_tables(XLogRecPtr current_lsn)
     545             : {
     546           0 :     if (am_tablesync_worker())
     547           0 :         process_syncing_tables_for_sync(current_lsn);
     548             :     else
     549           0 :         process_syncing_tables_for_apply(current_lsn);
     550           0 : }
     551             : 
     552             : /*
     553             :  * Create list of columns for COPY based on logical relation mapping.
     554             :  */
     555             : static List *
     556           0 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
     557             : {
     558           0 :     List       *attnamelist = NIL;
     559             :     int         i;
     560             : 
     561           0 :     for (i = 0; i < rel->remoterel.natts; i++)
     562             :     {
     563           0 :         attnamelist = lappend(attnamelist,
     564           0 :                               makeString(rel->remoterel.attnames[i]));
     565             :     }
     566             : 
     567             : 
     568           0 :     return attnamelist;
     569             : }
     570             : 
     571             : /*
     572             :  * Data source callback for the COPY FROM, which reads from the remote
     573             :  * connection and passes the data back to our local COPY.
     574             :  */
     575             : static int
     576           0 : copy_read_data(void *outbuf, int minread, int maxread)
     577             : {
     578           0 :     int         bytesread = 0;
     579             :     int         avail;
     580             : 
     581             :     /* If there are some leftover data from previous read, use it. */
     582           0 :     avail = copybuf->len - copybuf->cursor;
     583           0 :     if (avail)
     584             :     {
     585           0 :         if (avail > maxread)
     586           0 :             avail = maxread;
     587           0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     588           0 :         copybuf->cursor += avail;
     589           0 :         maxread -= avail;
     590           0 :         bytesread += avail;
     591             :     }
     592             : 
     593           0 :     while (maxread > 0 && bytesread < minread)
     594             :     {
     595           0 :         pgsocket    fd = PGINVALID_SOCKET;
     596             :         int         rc;
     597             :         int         len;
     598           0 :         char       *buf = NULL;
     599             : 
     600             :         for (;;)
     601             :         {
     602             :             /* Try read the data. */
     603           0 :             len = walrcv_receive(wrconn, &buf, &fd);
     604             : 
     605           0 :             CHECK_FOR_INTERRUPTS();
     606             : 
     607           0 :             if (len == 0)
     608           0 :                 break;
     609           0 :             else if (len < 0)
     610           0 :                 return bytesread;
     611             :             else
     612             :             {
     613             :                 /* Process the data */
     614           0 :                 copybuf->data = buf;
     615           0 :                 copybuf->len = len;
     616           0 :                 copybuf->cursor = 0;
     617             : 
     618           0 :                 avail = copybuf->len - copybuf->cursor;
     619           0 :                 if (avail > maxread)
     620           0 :                     avail = maxread;
     621           0 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
     622           0 :                 outbuf = (void *) ((char *) outbuf + avail);
     623           0 :                 copybuf->cursor += avail;
     624           0 :                 maxread -= avail;
     625           0 :                 bytesread += avail;
     626             :             }
     627             : 
     628           0 :             if (maxread <= 0 || bytesread >= minread)
     629           0 :                 return bytesread;
     630           0 :         }
     631             : 
     632             :         /*
     633             :          * Wait for more data or latch.
     634             :          */
     635           0 :         rc = WaitLatchOrSocket(MyLatch,
     636             :                                WL_SOCKET_READABLE | WL_LATCH_SET |
     637             :                                WL_TIMEOUT | WL_POSTMASTER_DEATH,
     638             :                                fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
     639             : 
     640             :         /* Emergency bailout if postmaster has died */
     641           0 :         if (rc & WL_POSTMASTER_DEATH)
     642           0 :             proc_exit(1);
     643             : 
     644           0 :         ResetLatch(MyLatch);
     645             :     }
     646             : 
     647           0 :     return bytesread;
     648             : }
     649             : 
     650             : 
     651             : /*
     652             :  * Get information about remote relation in similar fashion the RELATION
     653             :  * message provides during replication.
     654             :  */
     655             : static void
     656           0 : fetch_remote_table_info(char *nspname, char *relname,
     657             :                         LogicalRepRelation *lrel)
     658             : {
     659             :     WalRcvExecResult *res;
     660             :     StringInfoData cmd;
     661             :     TupleTableSlot *slot;
     662           0 :     Oid         tableRow[2] = {OIDOID, CHAROID};
     663           0 :     Oid         attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
     664             :     bool        isnull;
     665             :     int         natt;
     666             : 
     667           0 :     lrel->nspname = nspname;
     668           0 :     lrel->relname = relname;
     669             : 
     670             :     /* First fetch Oid and replica identity. */
     671           0 :     initStringInfo(&cmd);
     672           0 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
     673             :                      "  FROM pg_catalog.pg_class c"
     674             :                      "  INNER JOIN pg_catalog.pg_namespace n"
     675             :                      "        ON (c.relnamespace = n.oid)"
     676             :                      " WHERE n.nspname = %s"
     677             :                      "   AND c.relname = %s"
     678             :                      "   AND c.relkind = 'r'",
     679             :                      quote_literal_cstr(nspname),
     680             :                      quote_literal_cstr(relname));
     681           0 :     res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
     682             : 
     683           0 :     if (res->status != WALRCV_OK_TUPLES)
     684           0 :         ereport(ERROR,
     685             :                 (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
     686             :                         nspname, relname, res->err)));
     687             : 
     688           0 :     slot = MakeSingleTupleTableSlot(res->tupledesc);
     689           0 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     690           0 :         ereport(ERROR,
     691             :                 (errmsg("table \"%s.%s\" not found on publisher",
     692             :                         nspname, relname)));
     693             : 
     694           0 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
     695           0 :     Assert(!isnull);
     696           0 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
     697           0 :     Assert(!isnull);
     698             : 
     699           0 :     ExecDropSingleTupleTableSlot(slot);
     700           0 :     walrcv_clear_result(res);
     701             : 
     702             :     /* Now fetch columns. */
     703           0 :     resetStringInfo(&cmd);
     704           0 :     appendStringInfo(&cmd,
     705             :                      "SELECT a.attname,"
     706             :                      "       a.atttypid,"
     707             :                      "       a.atttypmod,"
     708             :                      "       a.attnum = ANY(i.indkey)"
     709             :                      "  FROM pg_catalog.pg_attribute a"
     710             :                      "  LEFT JOIN pg_catalog.pg_index i"
     711             :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
     712             :                      " WHERE a.attnum > 0::pg_catalog.int2"
     713             :                      "   AND NOT a.attisdropped"
     714             :                      "   AND a.attrelid = %u"
     715             :                      " ORDER BY a.attnum",
     716             :                      lrel->remoteid, lrel->remoteid);
     717           0 :     res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
     718             : 
     719           0 :     if (res->status != WALRCV_OK_TUPLES)
     720           0 :         ereport(ERROR,
     721             :                 (errmsg("could not fetch table info for table \"%s.%s\": %s",
     722             :                         nspname, relname, res->err)));
     723             : 
     724             :     /* We don't know the number of rows coming, so allocate enough space. */
     725           0 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
     726           0 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
     727           0 :     lrel->attkeys = NULL;
     728             : 
     729           0 :     natt = 0;
     730           0 :     slot = MakeSingleTupleTableSlot(res->tupledesc);
     731           0 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     732             :     {
     733           0 :         lrel->attnames[natt] =
     734           0 :             TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     735           0 :         Assert(!isnull);
     736           0 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
     737           0 :         Assert(!isnull);
     738           0 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
     739           0 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
     740             : 
     741             :         /* Should never happen. */
     742           0 :         if (++natt >= MaxTupleAttributeNumber)
     743           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
     744             :                  nspname, relname);
     745             : 
     746           0 :         ExecClearTuple(slot);
     747             :     }
     748           0 :     ExecDropSingleTupleTableSlot(slot);
     749             : 
     750           0 :     lrel->natts = natt;
     751             : 
     752           0 :     walrcv_clear_result(res);
     753           0 :     pfree(cmd.data);
     754           0 : }
     755             : 
     756             : /*
     757             :  * Copy existing data of a table from publisher.
     758             :  *
     759             :  * Caller is responsible for locking the local relation.
     760             :  */
     761             : static void
     762           0 : copy_table(Relation rel)
     763             : {
     764             :     LogicalRepRelMapEntry *relmapentry;
     765             :     LogicalRepRelation lrel;
     766             :     WalRcvExecResult *res;
     767             :     StringInfoData cmd;
     768             :     CopyState   cstate;
     769             :     List       *attnamelist;
     770             :     ParseState *pstate;
     771             : 
     772             :     /* Get the publisher relation info. */
     773           0 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
     774           0 :                             RelationGetRelationName(rel), &lrel);
     775             : 
     776             :     /* Put the relation into relmap. */
     777           0 :     logicalrep_relmap_update(&lrel);
     778             : 
     779             :     /* Map the publisher relation to local one. */
     780           0 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
     781           0 :     Assert(rel == relmapentry->localrel);
     782             : 
     783             :     /* Start copy on the publisher. */
     784           0 :     initStringInfo(&cmd);
     785           0 :     appendStringInfo(&cmd, "COPY %s TO STDOUT",
     786           0 :                      quote_qualified_identifier(lrel.nspname, lrel.relname));
     787           0 :     res = walrcv_exec(wrconn, cmd.data, 0, NULL);
     788           0 :     pfree(cmd.data);
     789           0 :     if (res->status != WALRCV_OK_COPY_OUT)
     790           0 :         ereport(ERROR,
     791             :                 (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
     792             :                         lrel.nspname, lrel.relname, res->err)));
     793           0 :     walrcv_clear_result(res);
     794             : 
     795           0 :     copybuf = makeStringInfo();
     796             : 
     797           0 :     pstate = make_parsestate(NULL);
     798           0 :     addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
     799             : 
     800           0 :     attnamelist = make_copy_attnamelist(relmapentry);
     801           0 :     cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
     802             : 
     803             :     /* Do the copy */
     804           0 :     (void) CopyFrom(cstate);
     805             : 
     806           0 :     logicalrep_rel_close(relmapentry, NoLock);
     807           0 : }
     808             : 
     809             : /*
     810             :  * Start syncing the table in the sync worker.
     811             :  *
     812             :  * The returned slot name is palloc'ed in current memory context.
     813             :  */
     814             : char *
     815           0 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
     816             : {
     817             :     char       *slotname;
     818             :     char       *err;
     819             :     char        relstate;
     820             :     XLogRecPtr  relstate_lsn;
     821             : 
     822             :     /* Check the state of the table synchronization. */
     823           0 :     StartTransactionCommand();
     824           0 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
     825           0 :                                        MyLogicalRepWorker->relid,
     826             :                                        &relstate_lsn, true);
     827           0 :     CommitTransactionCommand();
     828             : 
     829           0 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     830           0 :     MyLogicalRepWorker->relstate = relstate;
     831           0 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
     832           0 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
     833             : 
     834             :     /*
     835             :      * To build a slot name for the sync work, we are limited to NAMEDATALEN -
     836             :      * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
     837             :      * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
     838             :      * NAMEDATALEN on the remote that matters, but this scheme will also work
     839             :      * reasonably if that is different.)
     840             :      */
     841             :     StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");  /* for sanity */
     842           0 :     slotname = psprintf("%.*s_%u_sync_%u",
     843             :                         NAMEDATALEN - 28,
     844           0 :                         MySubscription->slotname,
     845           0 :                         MySubscription->oid,
     846           0 :                         MyLogicalRepWorker->relid);
     847             : 
     848             :     /*
     849             :      * Here we use the slot name instead of the subscription name as the
     850             :      * application_name, so that it is different from the main apply worker,
     851             :      * so that synchronous replication can distinguish them.
     852             :      */
     853           0 :     wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
     854           0 :     if (wrconn == NULL)
     855           0 :         ereport(ERROR,
     856             :                 (errmsg("could not connect to the publisher: %s", err)));
     857             : 
     858           0 :     switch (MyLogicalRepWorker->relstate)
     859             :     {
     860             :         case SUBREL_STATE_INIT:
     861             :         case SUBREL_STATE_DATASYNC:
     862             :             {
     863             :                 Relation    rel;
     864             :                 WalRcvExecResult *res;
     865             : 
     866           0 :                 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     867           0 :                 MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
     868           0 :                 MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
     869           0 :                 SpinLockRelease(&MyLogicalRepWorker->relmutex);
     870             : 
     871             :                 /* Update the state and make it visible to others. */
     872           0 :                 StartTransactionCommand();
     873           0 :                 SetSubscriptionRelState(MyLogicalRepWorker->subid,
     874           0 :                                         MyLogicalRepWorker->relid,
     875           0 :                                         MyLogicalRepWorker->relstate,
     876           0 :                                         MyLogicalRepWorker->relstate_lsn,
     877             :                                         true);
     878           0 :                 CommitTransactionCommand();
     879           0 :                 pgstat_report_stat(false);
     880             : 
     881             :                 /*
     882             :                  * We want to do the table data sync in a single transaction.
     883             :                  */
     884           0 :                 StartTransactionCommand();
     885             : 
     886             :                 /*
     887             :                  * Use a standard write lock here. It might be better to
     888             :                  * disallow access to the table while it's being synchronized.
     889             :                  * But we don't want to block the main apply process from
     890             :                  * working and it has to open the relation in RowExclusiveLock
     891             :                  * when remapping remote relation id to local one.
     892             :                  */
     893           0 :                 rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);
     894             : 
     895             :                 /*
     896             :                  * Create a temporary slot for the sync process. We do this
     897             :                  * inside the transaction so that we can use the snapshot made
     898             :                  * by the slot to get existing data.
     899             :                  */
     900           0 :                 res = walrcv_exec(wrconn,
     901             :                                   "BEGIN READ ONLY ISOLATION LEVEL "
     902             :                                   "REPEATABLE READ", 0, NULL);
     903           0 :                 if (res->status != WALRCV_OK_COMMAND)
     904           0 :                     ereport(ERROR,
     905             :                             (errmsg("table copy could not start transaction on publisher"),
     906             :                              errdetail("The error was: %s", res->err)));
     907           0 :                 walrcv_clear_result(res);
     908             : 
     909             :                 /*
     910             :                  * Create new temporary logical decoding slot.
     911             :                  *
     912             :                  * We'll use slot for data copy so make sure the snapshot is
     913             :                  * used for the transaction; that way the COPY will get data
     914             :                  * that is consistent with the lsn used by the slot to start
     915             :                  * decoding.
     916             :                  */
     917           0 :                 walrcv_create_slot(wrconn, slotname, true,
     918             :                                    CRS_USE_SNAPSHOT, origin_startpos);
     919             : 
     920           0 :                 PushActiveSnapshot(GetTransactionSnapshot());
     921           0 :                 copy_table(rel);
     922           0 :                 PopActiveSnapshot();
     923             : 
     924           0 :                 res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
     925           0 :                 if (res->status != WALRCV_OK_COMMAND)
     926           0 :                     ereport(ERROR,
     927             :                             (errmsg("table copy could not finish transaction on publisher"),
     928             :                              errdetail("The error was: %s", res->err)));
     929           0 :                 walrcv_clear_result(res);
     930             : 
     931           0 :                 heap_close(rel, NoLock);
     932             : 
     933             :                 /* Make the copy visible. */
     934           0 :                 CommandCounterIncrement();
     935             : 
     936             :                 /*
     937             :                  * We are done with the initial data synchronization, update
     938             :                  * the state.
     939             :                  */
     940           0 :                 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
     941           0 :                 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
     942           0 :                 MyLogicalRepWorker->relstate_lsn = *origin_startpos;
     943           0 :                 SpinLockRelease(&MyLogicalRepWorker->relmutex);
     944             : 
     945             :                 /* Wait for main apply worker to tell us to catchup. */
     946           0 :                 wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
     947             : 
     948             :                 /*----------
     949             :                  * There are now two possible states here:
     950             :                  * a) Sync is behind the apply.  If that's the case we need to
     951             :                  *    catch up with it by consuming the logical replication
     952             :                  *    stream up to the relstate_lsn.  For that, we exit this
     953             :                  *    function and continue in ApplyWorkerMain().
     954             :                  * b) Sync is caught up with the apply.  So it can just set
     955             :                  *    the state to SYNCDONE and finish.
     956             :                  *----------
     957             :                  */
     958           0 :                 if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
     959             :                 {
     960             :                     /*
     961             :                      * Update the new state in catalog.  No need to bother
     962             :                      * with the shmem state as we are exiting for good.
     963             :                      */
     964           0 :                     SetSubscriptionRelState(MyLogicalRepWorker->subid,
     965           0 :                                             MyLogicalRepWorker->relid,
     966             :                                             SUBREL_STATE_SYNCDONE,
     967             :                                             *origin_startpos,
     968             :                                             true);
     969           0 :                     finish_sync_worker();
     970             :                 }
     971           0 :                 break;
     972             :             }
     973             :         case SUBREL_STATE_SYNCDONE:
     974             :         case SUBREL_STATE_READY:
     975             :         case SUBREL_STATE_UNKNOWN:
     976             : 
     977             :             /*
     978             :              * Nothing to do here but finish.  (UNKNOWN means the relation was
     979             :              * removed from pg_subscription_rel before the sync worker could
     980             :              * start.)
     981             :              */
     982           0 :             finish_sync_worker();
     983             :             break;
     984             :         default:
     985           0 :             elog(ERROR, "unknown relation state \"%c\"",
     986             :                  MyLogicalRepWorker->relstate);
     987             :     }
     988             : 
     989           0 :     return slotname;
     990             : }

Generated by: LCOV version 1.11