Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication
4 : *
5 : * Copyright (c) 2012-2017, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the main apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps.
30 : * - Sync finishes copy and sets worker state as SYNCWAIT and waits for
31 : * state to change in a loop.
32 : * - Apply periodically checks tables that are synchronizing for SYNCWAIT.
33 : * When the desired state appears, it will set the worker state to
34 : * CATCHUP and starts loop-waiting until either the table state is set
35 : * to SYNCDONE or the sync worker exits.
36 : * - After the sync worker has seen the state change to CATCHUP, it will
37 : * read the stream and apply changes (acting like an apply worker) until
38 : * it catches up to the specified stream position. Then it sets the
39 : * state to SYNCDONE. There might be zero changes applied between
40 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
41 : * apply worker.
42 : * - Once the state was set to SYNCDONE, the apply will continue tracking
43 : * the table until it reaches the SYNCDONE stream position, at which
44 : * point it sets state to READY and stops tracking. Again, there might
45 : * be zero changes in between.
46 : *
47 : * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
48 : * SYNCDONE -> READY.
49 : *
50 : * The catalog pg_subscription_rel is used to keep information about
51 : * subscribed tables and their state. Some transient state during data
52 : * synchronization is kept in shared memory. The states SYNCWAIT and
53 : * CATCHUP only appear in memory.
54 : *
55 : * Example flows look like this:
56 : * - Apply is in front:
57 : * sync:8
58 : * -> set in memory SYNCWAIT
59 : * apply:10
60 : * -> set in memory CATCHUP
61 : * -> enter wait-loop
62 : * sync:10
63 : * -> set in catalog SYNCDONE
64 : * -> exit
65 : * apply:10
66 : * -> exit wait-loop
67 : * -> continue rep
68 : * apply:11
69 : * -> set in catalog READY
70 : * - Sync in front:
71 : * sync:10
72 : * -> set in memory SYNCWAIT
73 : * apply:8
74 : * -> set in memory CATCHUP
75 : * -> continue per-table filtering
76 : * sync:10
77 : * -> set in catalog SYNCDONE
78 : * -> exit
79 : * apply:10
80 : * -> set in catalog READY
81 : * -> stop per-table filtering
82 : * -> continue rep
83 : *-------------------------------------------------------------------------
84 : */
85 :
86 : #include "postgres.h"
87 :
88 : #include "miscadmin.h"
89 : #include "pgstat.h"
90 :
91 : #include "access/xact.h"
92 :
93 : #include "catalog/pg_subscription_rel.h"
94 : #include "catalog/pg_type.h"
95 :
96 : #include "commands/copy.h"
97 :
98 : #include "parser/parse_relation.h"
99 :
100 : #include "replication/logicallauncher.h"
101 : #include "replication/logicalrelation.h"
102 : #include "replication/walreceiver.h"
103 : #include "replication/worker_internal.h"
104 :
105 : #include "utils/snapmgr.h"
106 : #include "storage/ipc.h"
107 :
108 : #include "utils/builtins.h"
109 : #include "utils/lsyscache.h"
110 : #include "utils/memutils.h"
111 :
112 : static bool table_states_valid = false;
113 :
114 : StringInfo copybuf = NULL;
115 :
116 : /*
117 : * Exit routine for synchronization worker.
118 : */
119 : static void
120 : pg_attribute_noreturn()
121 0 : finish_sync_worker(void)
122 : {
123 : /*
124 : * Commit any outstanding transaction. This is the usual case, unless
125 : * there was nothing to do for the table.
126 : */
127 0 : if (IsTransactionState())
128 : {
129 0 : CommitTransactionCommand();
130 0 : pgstat_report_stat(false);
131 : }
132 :
133 : /* And flush all writes. */
134 0 : XLogFlush(GetXLogWriteRecPtr());
135 :
 : /*
 : * Open a short transaction so the get_rel_name() catalog lookup in the
 : * log message below is valid.
 : */
136 0 : StartTransactionCommand();
137 0 : ereport(LOG,
138 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
139 : MySubscription->name,
140 : get_rel_name(MyLogicalRepWorker->relid))));
141 0 : CommitTransactionCommand();
142 :
143 : /* Find the main apply worker and signal it. */
144 0 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
145 :
146 : /* Stop gracefully; this call does not return. */
147 0 : proc_exit(0);
148 : }
149 :
150 : /*
151 : * Wait until the relation synchronization state is set in the catalog to the
152 : * expected one.
153 : *
154 : * Used when transitioning from CATCHUP state to SYNCDONE.
155 : *
156 : * Returns false if the synchronization worker has disappeared or the table state
157 : * has been reset.
158 : */
159 : static bool
160 0 : wait_for_relation_state_change(Oid relid, char expected_state)
161 : {
162 : int rc;
163 : char state;
164 :
165 : for (;;)
166 : {
167 : LogicalRepWorker *worker;
168 : XLogRecPtr statelsn;
169 :
170 0 : CHECK_FOR_INTERRUPTS();
171 :
172 : /* XXX use cache invalidation here to improve performance? */
173 0 : PushActiveSnapshot(GetLatestSnapshot());
174 0 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
175 : relid, &statelsn, true);
176 0 : PopActiveSnapshot();
177 :
 : /* UNKNOWN means the pg_subscription_rel entry is gone (state reset). */
178 0 : if (state == SUBREL_STATE_UNKNOWN)
179 0 : return false;
180 :
181 0 : if (state == expected_state)
182 0 : return true;
183 :
184 : /* Check if the sync worker is still running and bail if not. */
185 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
186 :
187 : /* Check if the opposite worker is still running and bail if not. */
188 0 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
189 0 : am_tablesync_worker() ? InvalidOid : relid,
190 : false);
191 0 : LWLockRelease(LogicalRepWorkerLock);
192 0 : if (!worker)
193 0 : return false;
194 :
 : /* Sleep up to 1s; the other worker's state change should set our latch. */
195 0 : rc = WaitLatch(MyLatch,
196 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
197 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
198 :
199 : /* emergency bailout if postmaster has died */
200 0 : if (rc & WL_POSTMASTER_DEATH)
201 0 : proc_exit(1);
202 :
203 0 : ResetLatch(MyLatch);
204 0 : }
205 :
 : /* not reached; the loop above exits only via return or proc_exit() */
206 : return false;
207 : }
208 :
209 : /*
210 : * Wait until the apply worker changes the state of our synchronization
211 : * worker to the expected one.
212 : *
213 : * Used when transitioning from SYNCWAIT state to CATCHUP.
214 : *
215 : * Returns false if the apply worker has disappeared.
216 : */
217 : static bool
218 0 : wait_for_worker_state_change(char expected_state)
219 : {
220 : int rc;
221 :
222 : for (;;)
223 : {
224 : LogicalRepWorker *worker;
225 :
226 0 : CHECK_FOR_INTERRUPTS();
227 :
228 : /*
229 : * Done if already in correct state. (We assume this fetch is atomic
230 : * enough to not give a misleading answer if we do it with no lock.)
231 : */
232 0 : if (MyLogicalRepWorker->relstate == expected_state)
233 0 : return true;
234 :
235 : /*
236 : * Bail out if the apply worker has died, else signal it we're
237 : * waiting.
238 : */
239 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 : /* relid InvalidOid selects the main apply worker of this subscription. */
240 0 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
241 : InvalidOid, false);
242 0 : if (worker && worker->proc)
243 0 : logicalrep_worker_wakeup_ptr(worker);
244 0 : LWLockRelease(LogicalRepWorkerLock);
245 0 : if (!worker)
246 0 : break;
247 :
248 : /*
249 : * Wait. We expect to get a latch signal back from the apply worker,
250 : * but use a timeout in case it dies without sending one.
251 : */
252 0 : rc = WaitLatch(MyLatch,
253 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
254 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
255 :
256 : /* emergency bailout if postmaster has died */
257 0 : if (rc & WL_POSTMASTER_DEATH)
258 0 : proc_exit(1);
259 :
260 0 : if (rc & WL_LATCH_SET)
261 0 : ResetLatch(MyLatch);
262 0 : }
263 :
 : /* reached only via break above, i.e. the apply worker is gone */
264 0 : return false;
265 : }
266 :
267 : /*
268 : * Callback from syscache invalidation.
269 : */
270 : void
271 0 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
272 : {
 : /*
 : * Just mark the cached per-table state list stale; it is rebuilt lazily
 : * by process_syncing_tables_for_apply(). Arguments follow the syscache
 : * callback signature and are unused here.
 : */
273 0 : table_states_valid = false;
274 0 : }
275 :
276 : /*
277 : * Handle table synchronization cooperation from the synchronization
278 : * worker.
279 : *
280 : * If the sync worker is in CATCHUP state and reached (or passed) the
281 : * predetermined synchronization point in the WAL stream, mark the table as
282 : * SYNCDONE and finish.
283 : */
284 : static void
285 0 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
286 : {
287 0 : Assert(IsTransactionState());
288 :
289 0 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
290 :
291 0 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
292 0 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
293 : {
294 : TimeLineID tli;
295 :
296 0 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
297 0 : MyLogicalRepWorker->relstate_lsn = current_lsn;
298 :
 : /* Drop the spinlock before the catalog update below. */
299 0 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
300 :
301 0 : SetSubscriptionRelState(MyLogicalRepWorker->subid,
302 0 : MyLogicalRepWorker->relid,
303 0 : MyLogicalRepWorker->relstate,
304 0 : MyLogicalRepWorker->relstate_lsn,
305 : true);
306 :
 : /* End streaming and exit; finish_sync_worker() does not return. */
307 0 : walrcv_endstreaming(wrconn, &tli);
308 0 : finish_sync_worker();
309 : }
310 : else
311 0 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
312 0 : }
313 :
314 : /*
315 : * Handle table synchronization cooperation from the apply worker.
316 : *
317 : * Walk over all subscription tables that are individually tracked by the
318 : * apply process (currently, all that have state other than
319 : * SUBREL_STATE_READY) and manage synchronization for them.
320 : *
321 : * If there are tables that need synchronizing and are not being synchronized
322 : * yet, start sync workers for them (if there are free slots for sync
323 : * workers). To prevent starting the sync worker for the same relation at a
324 : * high frequency after a failure, we store its last start time with each sync
325 : * state info. We start the sync worker for the same relation after waiting
326 : * at least wal_retrieve_retry_interval.
327 : *
328 : * For tables that are being synchronized already, check if sync workers
329 : * either need action from the apply worker or have finished. This is the
330 : * SYNCWAIT to CATCHUP transition.
331 : *
332 : * If the synchronization position is reached (SYNCDONE), then the table can
333 : * be marked as READY and is no longer tracked.
334 : */
335 : static void
336 0 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
337 : {
338 : struct tablesync_start_time_mapping
339 : {
340 : Oid relid;
341 : TimestampTz last_start_time;
342 : };
 : /* Cached non-ready table list; invalidated via invalidate_syncing_table_states(). */
343 : static List *table_states = NIL;
344 : static HTAB *last_start_times = NULL;
345 : ListCell *lc;
346 0 : bool started_tx = false;
347 :
348 0 : Assert(!IsTransactionState());
349 :
350 : /* We need up-to-date sync state info for subscription tables here. */
351 0 : if (!table_states_valid)
352 : {
353 : MemoryContext oldctx;
354 : List *rstates;
355 : ListCell *lc;
356 : SubscriptionRelState *rstate;
357 :
358 : /* Clean the old list. */
359 0 : list_free_deep(table_states);
360 0 : table_states = NIL;
361 :
362 0 : StartTransactionCommand();
363 0 : started_tx = true;
364 :
365 : /* Fetch all non-ready tables. */
366 0 : rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
367 :
368 : /* Allocate the tracking info in a permanent memory context. */
369 0 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
370 0 : foreach(lc, rstates)
371 : {
 : /* Copy each entry so it survives the end of this transaction. */
372 0 : rstate = palloc(sizeof(SubscriptionRelState));
373 0 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
374 0 : table_states = lappend(table_states, rstate);
375 : }
376 0 : MemoryContextSwitchTo(oldctx);
377 :
378 0 : table_states_valid = true;
379 : }
380 :
381 : /*
382 : * Prepare a hash table for tracking last start times of workers, to avoid
383 : * immediate restarts. We don't need it if there are no tables that need
384 : * syncing.
385 : */
386 0 : if (table_states && !last_start_times)
387 0 : {
388 : HASHCTL ctl;
389 :
390 0 : memset(&ctl, 0, sizeof(ctl));
391 0 : ctl.keysize = sizeof(Oid);
392 0 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
393 0 : last_start_times = hash_create("Logical replication table sync worker start times",
394 : 256, &ctl, HASH_ELEM | HASH_BLOBS);
395 : }
396 :
397 : /*
398 : * Clean up the hash table when we're done with all tables (just to
399 : * release the bit of memory).
400 : */
401 : else if (!table_states && last_start_times)
402 : {
403 0 : hash_destroy(last_start_times);
404 0 : last_start_times = NULL;
405 : }
406 :
407 : /*
408 : * Process all tables that are being synchronized.
409 : */
410 0 : foreach(lc, table_states)
411 : {
412 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
413 :
414 0 : if (rstate->state == SUBREL_STATE_SYNCDONE)
415 : {
416 : /*
417 : * Apply has caught up to the position where the table sync has
418 : * finished. Mark the table as ready so that the apply will just
419 : * continue to replicate it normally.
420 : */
421 0 : if (current_lsn >= rstate->lsn)
422 : {
423 0 : rstate->state = SUBREL_STATE_READY;
424 0 : rstate->lsn = current_lsn;
425 0 : if (!started_tx)
426 : {
427 0 : StartTransactionCommand();
428 0 : started_tx = true;
429 : }
430 0 : SetSubscriptionRelState(MyLogicalRepWorker->subid,
431 0 : rstate->relid, rstate->state,
432 : rstate->lsn, true);
433 : }
434 : }
435 : else
436 : {
437 : LogicalRepWorker *syncworker;
438 :
439 : /*
440 : * Look for a sync worker for this relation.
441 : */
442 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
443 :
444 0 : syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
445 : rstate->relid, false);
446 :
447 0 : if (syncworker)
448 : {
449 : /* Found one, update our copy of its state */
450 0 : SpinLockAcquire(&syncworker->relmutex);
451 0 : rstate->state = syncworker->relstate;
452 0 : rstate->lsn = syncworker->relstate_lsn;
453 0 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
454 : {
455 : /*
456 : * Sync worker is waiting for apply. Tell sync worker it
457 : * can catchup now.
458 : */
459 0 : syncworker->relstate = SUBREL_STATE_CATCHUP;
460 0 : syncworker->relstate_lsn =
461 0 : Max(syncworker->relstate_lsn, current_lsn);
462 : }
463 0 : SpinLockRelease(&syncworker->relmutex);
464 :
465 : /* If we told worker to catch up, wait for it. */
466 0 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
467 : {
468 : /* Signal the sync worker, as it may be waiting for us. */
469 0 : if (syncworker->proc)
470 0 : logicalrep_worker_wakeup_ptr(syncworker);
471 :
472 : /* Now safe to release the LWLock */
473 0 : LWLockRelease(LogicalRepWorkerLock);
474 :
475 : /*
476 : * Enter busy loop and wait for synchronization worker to
477 : * reach expected state (or die trying).
478 : */
479 0 : if (!started_tx)
480 : {
481 0 : StartTransactionCommand();
482 0 : started_tx = true;
483 : }
484 :
485 0 : wait_for_relation_state_change(rstate->relid,
486 : SUBREL_STATE_SYNCDONE);
487 : }
488 : else
489 0 : LWLockRelease(LogicalRepWorkerLock);
490 : }
491 : else
492 : {
493 : /*
494 : * If there is no sync worker for this table yet, count
495 : * running sync workers for this subscription, while we have
496 : * the lock.
497 : */
498 0 : int nsyncworkers =
499 0 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
500 :
501 : /* Now safe to release the LWLock */
502 0 : LWLockRelease(LogicalRepWorkerLock);
503 :
504 : /*
505 : * If there are free sync worker slot(s), start a new sync
506 : * worker for the table.
507 : */
508 0 : if (nsyncworkers < max_sync_workers_per_subscription)
509 : {
510 0 : TimestampTz now = GetCurrentTimestamp();
511 : struct tablesync_start_time_mapping *hentry;
512 : bool found;
513 :
514 0 : hentry = hash_search(last_start_times, &rstate->relid,
515 : HASH_ENTER, &found);
516 :
 : /* Throttle relaunches to at most once per wal_retrieve_retry_interval. */
517 0 : if (!found ||
518 0 : TimestampDifferenceExceeds(hentry->last_start_time, now,
519 : wal_retrieve_retry_interval))
520 : {
521 0 : logicalrep_worker_launch(MyLogicalRepWorker->dbid,
522 0 : MySubscription->oid,
523 0 : MySubscription->name,
524 0 : MyLogicalRepWorker->userid,
525 : rstate->relid);
526 0 : hentry->last_start_time = now;
527 : }
528 : }
529 : }
530 : }
531 : }
532 :
533 0 : if (started_tx)
534 : {
535 0 : CommitTransactionCommand();
536 0 : pgstat_report_stat(false);
537 : }
538 0 : }
539 :
540 : /*
541 : * Process possible state change(s) of tables that are being synchronized.
542 : */
543 : void
544 0 : process_syncing_tables(XLogRecPtr current_lsn)
545 : {
546 0 : if (am_tablesync_worker())
547 0 : process_syncing_tables_for_sync(current_lsn);
548 : else
549 0 : process_syncing_tables_for_apply(current_lsn);
550 0 : }
551 :
552 : /*
553 : * Create list of columns for COPY based on logical relation mapping.
554 : */
555 : static List *
556 0 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
557 : {
558 0 : List *attnamelist = NIL;
559 : int i;
560 :
561 0 : for (i = 0; i < rel->remoterel.natts; i++)
562 : {
563 0 : attnamelist = lappend(attnamelist,
564 0 : makeString(rel->remoterel.attnames[i]));
565 : }
566 :
567 :
568 0 : return attnamelist;
569 : }
570 :
571 : /*
572 : * Data source callback for the COPY FROM, which reads from the remote
573 : * connection and passes the data back to our local COPY.
574 : */
575 : static int
576 0 : copy_read_data(void *outbuf, int minread, int maxread)
577 : {
578 0 : int bytesread = 0;
579 : int avail;
580 :
581 : /* If there are some leftover data from previous read, use it. */
582 0 : avail = copybuf->len - copybuf->cursor;
583 0 : if (avail)
584 : {
585 0 : if (avail > maxread)
586 0 : avail = maxread;
587 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
588 0 : copybuf->cursor += avail;
589 0 : maxread -= avail;
590 0 : bytesread += avail;
591 : }
592 :
593 0 : while (maxread > 0 && bytesread < minread)
594 : {
595 0 : pgsocket fd = PGINVALID_SOCKET;
596 : int rc;
597 : int len;
598 0 : char *buf = NULL;
599 :
600 : for (;;)
601 : {
602 : /* Try read the data. */
603 0 : len = walrcv_receive(wrconn, &buf, &fd);
604 :
605 0 : CHECK_FOR_INTERRUPTS();
606 :
607 0 : if (len == 0)
608 0 : break;
609 0 : else if (len < 0)
610 0 : return bytesread;
611 : else
612 : {
613 : /* Process the data */
614 0 : copybuf->data = buf;
615 0 : copybuf->len = len;
616 0 : copybuf->cursor = 0;
617 :
618 0 : avail = copybuf->len - copybuf->cursor;
619 0 : if (avail > maxread)
620 0 : avail = maxread;
621 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
622 0 : outbuf = (void *) ((char *) outbuf + avail);
623 0 : copybuf->cursor += avail;
624 0 : maxread -= avail;
625 0 : bytesread += avail;
626 : }
627 :
628 0 : if (maxread <= 0 || bytesread >= minread)
629 0 : return bytesread;
630 0 : }
631 :
632 : /*
633 : * Wait for more data or latch.
634 : */
635 0 : rc = WaitLatchOrSocket(MyLatch,
636 : WL_SOCKET_READABLE | WL_LATCH_SET |
637 : WL_TIMEOUT | WL_POSTMASTER_DEATH,
638 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
639 :
640 : /* Emergency bailout if postmaster has died */
641 0 : if (rc & WL_POSTMASTER_DEATH)
642 0 : proc_exit(1);
643 :
644 0 : ResetLatch(MyLatch);
645 : }
646 :
647 0 : return bytesread;
648 : }
649 :
650 :
651 : /*
652 : * Get information about remote relation in similar fashion the RELATION
653 : * message provides during replication.
654 : */
655 : static void
656 0 : fetch_remote_table_info(char *nspname, char *relname,
657 : LogicalRepRelation *lrel)
658 : {
659 : WalRcvExecResult *res;
660 : StringInfoData cmd;
661 : TupleTableSlot *slot;
 : /* Expected result-column types for the two remote queries below. */
662 0 : Oid tableRow[2] = {OIDOID, CHAROID};
663 0 : Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
664 : bool isnull;
665 : int natt;
666 :
667 0 : lrel->nspname = nspname;
668 0 : lrel->relname = relname;
669 :
670 : /* First fetch Oid and replica identity. */
671 0 : initStringInfo(&cmd);
672 0 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
673 : " FROM pg_catalog.pg_class c"
674 : " INNER JOIN pg_catalog.pg_namespace n"
675 : " ON (c.relnamespace = n.oid)"
676 : " WHERE n.nspname = %s"
677 : " AND c.relname = %s"
678 : " AND c.relkind = 'r'",
679 : quote_literal_cstr(nspname),
680 : quote_literal_cstr(relname));
681 0 : res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
682 :
683 0 : if (res->status != WALRCV_OK_TUPLES)
684 0 : ereport(ERROR,
685 : (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
686 : nspname, relname, res->err)));
687 :
688 0 : slot = MakeSingleTupleTableSlot(res->tupledesc)
689 0 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
690 0 : ereport(ERROR,
691 : (errmsg("table \"%s.%s\" not found on publisher",
692 : nspname, relname)));
693 :
694 0 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
695 0 : Assert(!isnull);
696 0 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
697 0 : Assert(!isnull);
698 :
699 0 : ExecDropSingleTupleTableSlot(slot);
700 0 : walrcv_clear_result(res);
701 :
702 : /* Now fetch columns. */
703 0 : resetStringInfo(&cmd);
704 0 : appendStringInfo(&cmd,
705 : "SELECT a.attname,"
706 : " a.atttypid,"
707 : " a.atttypmod,"
708 : " a.attnum = ANY(i.indkey)"
709 : " FROM pg_catalog.pg_attribute a"
710 : " LEFT JOIN pg_catalog.pg_index i"
711 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
712 : " WHERE a.attnum > 0::pg_catalog.int2"
713 : " AND NOT a.attisdropped"
714 : " AND a.attrelid = %u"
715 : " ORDER BY a.attnum",
716 : lrel->remoteid, lrel->remoteid);
717 0 : res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
718 :
719 0 : if (res->status != WALRCV_OK_TUPLES)
720 0 : ereport(ERROR,
721 : (errmsg("could not fetch table info for table \"%s.%s\": %s",
722 : nspname, relname, res->err)));
723 :
724 : /* We don't know the number of rows coming, so allocate enough space. */
725 0 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
726 0 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
 : /* attkeys becomes a bitmap of replica-identity key columns (0-based). */
727 0 : lrel->attkeys = NULL;
728 :
729 0 : natt = 0;
730 0 : slot = MakeSingleTupleTableSlot(res->tupledesc);
731 0 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
732 : {
733 0 : lrel->attnames[natt] =
734 0 : TextDatumGetCString(slot_getattr(slot, 1, &isnull));
735 0 : Assert(!isnull);
736 0 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
737 0 : Assert(!isnull);
738 0 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
739 0 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
740 :
741 : /* Should never happen. */
742 0 : if (++natt >= MaxTupleAttributeNumber)
743 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
744 : nspname, relname);
745 :
746 0 : ExecClearTuple(slot);
747 : }
748 0 : ExecDropSingleTupleTableSlot(slot);
749 :
750 0 : lrel->natts = natt;
751 :
752 0 : walrcv_clear_result(res);
753 0 : pfree(cmd.data);
754 0 : }
755 :
756 : /*
757 : * Copy existing data of a table from publisher.
758 : *
759 : * Caller is responsible for locking the local relation.
760 : */
761 : static void
762 0 : copy_table(Relation rel)
763 : {
764 : LogicalRepRelMapEntry *relmapentry;
765 : LogicalRepRelation lrel;
766 : WalRcvExecResult *res;
767 : StringInfoData cmd;
768 : CopyState cstate;
769 : List *attnamelist;
770 : ParseState *pstate;
771 :
772 : /* Get the publisher relation info. */
773 0 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
774 0 : RelationGetRelationName(rel), &lrel);
775 :
776 : /* Put the relation into relmap. */
777 0 : logicalrep_relmap_update(&lrel);
778 :
779 : /* Map the publisher relation to local one. */
780 0 : logicalrep_rel_open below expects the remote id; relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
781 0 : Assert(rel == relmapentry->localrel);
782 :
783 : /* Start copy on the publisher. */
784 0 : initStringInfo(&cmd);
785 0 : appendStringInfo(&cmd, "COPY %s TO STDOUT",
786 0 : quote_qualified_identifier(lrel.nspname, lrel.relname));
787 0 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
788 0 : pfree(cmd.data);
789 0 : if (res->status != WALRCV_OK_COPY_OUT)
790 0 : ereport(ERROR,
791 : (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
792 : lrel.nspname, lrel.relname, res->err)));
793 0 : walrcv_clear_result(res);
794 :
 : /* Module-global buffer consumed by the copy_read_data() callback. */
795 0 : copybuf = makeStringInfo();
796 :
797 0 : pstate = make_parsestate(NULL);
798 0 : addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
799 :
800 0 : attnamelist = make_copy_attnamelist(relmapentry);
801 0 : cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
802 :
803 : /* Do the copy */
804 0 : (void) CopyFrom(cstate);
805 :
 : /* NOTE(review): no explicit EndCopyFrom here — presumably the copy state
 : * is reclaimed at transaction end; confirm against CopyFrom's contract. */
806 0 : logicalrep_rel_close(relmapentry, NoLock);
807 0 : }
808 :
809 : /*
810 : * Start syncing the table in the sync worker.
811 : *
812 : * The returned slot name is palloc'ed in current memory context.
813 : */
814 : char *
815 0 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
816 : {
817 : char *slotname;
818 : char *err;
819 : char relstate;
820 : XLogRecPtr relstate_lsn;
821 :
822 : /* Check the state of the table synchronization. */
823 0 : StartTransactionCommand();
824 0 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
825 0 : MyLogicalRepWorker->relid,
826 : &relstate_lsn, true);
827 0 : CommitTransactionCommand();
828 :
 : /* Publish the catalog state into our shared-memory worker slot. */
829 0 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
830 0 : MyLogicalRepWorker->relstate = relstate;
831 0 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
832 0 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
833 :
834 : /*
835 : * To build a slot name for the sync work, we are limited to NAMEDATALEN -
836 : * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
837 : * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
838 : * NAMEDATALEN on the remote that matters, but this scheme will also work
839 : * reasonably if that is different.)
840 : */
841 : StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
842 0 : slotname = psprintf("%.*s_%u_sync_%u",
843 : NAMEDATALEN - 28,
844 0 : MySubscription->slotname,
845 0 : MySubscription->oid,
846 0 : MyLogicalRepWorker->relid);
847 :
848 : /*
849 : * Here we use the slot name instead of the subscription name as the
850 : * application_name, so that it is different from the main apply worker,
851 : * so that synchronous replication can distinguish them.
852 : */
853 0 : wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
854 0 : if (wrconn == NULL)
855 0 : ereport(ERROR,
856 : (errmsg("could not connect to the publisher: %s", err)));
857 :
858 0 : switch (MyLogicalRepWorker->relstate)
859 : {
860 : case SUBREL_STATE_INIT:
861 : case SUBREL_STATE_DATASYNC:
862 : {
863 : Relation rel;
864 : WalRcvExecResult *res;
865 :
866 0 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
867 0 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
868 0 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
869 0 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
870 :
871 : /* Update the state and make it visible to others. */
872 0 : StartTransactionCommand();
873 0 : SetSubscriptionRelState(MyLogicalRepWorker->subid,
874 0 : MyLogicalRepWorker->relid,
875 0 : MyLogicalRepWorker->relstate,
876 0 : MyLogicalRepWorker->relstate_lsn,
877 : true);
878 0 : CommitTransactionCommand();
879 0 : pgstat_report_stat(false);
880 :
881 : /*
882 : * We want to do the table data sync in a single transaction.
883 : */
884 0 : StartTransactionCommand();
885 :
886 : /*
887 : * Use a standard write lock here. It might be better to
888 : * disallow access to the table while it's being synchronized.
889 : * But we don't want to block the main apply process from
890 : * working and it has to open the relation in RowExclusiveLock
891 : * when remapping remote relation id to local one.
892 : */
893 0 : rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);
894 :
895 : /*
896 : * Create a temporary slot for the sync process. We do this
897 : * inside the transaction so that we can use the snapshot made
898 : * by the slot to get existing data.
899 : */
900 0 : res = walrcv_exec(wrconn,
901 : "BEGIN READ ONLY ISOLATION LEVEL "
902 : "REPEATABLE READ", 0, NULL);
903 0 : if (res->status != WALRCV_OK_COMMAND)
904 0 : ereport(ERROR,
905 : (errmsg("table copy could not start transaction on publisher"),
906 : errdetail("The error was: %s", res->err)));
907 0 : walrcv_clear_result(res);
908 :
909 : /*
910 : * Create new temporary logical decoding slot.
911 : *
912 : * We'll use slot for data copy so make sure the snapshot is
913 : * used for the transaction; that way the COPY will get data
914 : * that is consistent with the lsn used by the slot to start
915 : * decoding.
916 : */
 : /* origin_startpos receives the slot's start point from slot creation. */
917 0 : walrcv_create_slot(wrconn, slotname, true,
918 : CRS_USE_SNAPSHOT, origin_startpos);
919 :
920 0 : PushActiveSnapshot(GetTransactionSnapshot());
921 0 : copy_table(rel);
922 0 : PopActiveSnapshot();
923 :
924 0 : res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
925 0 : if (res->status != WALRCV_OK_COMMAND)
926 0 : ereport(ERROR,
927 : (errmsg("table copy could not finish transaction on publisher"),
928 : errdetail("The error was: %s", res->err)));
929 0 : walrcv_clear_result(res);
930 :
931 0 : heap_close(rel, NoLock);
932 :
933 : /* Make the copy visible. */
934 0 : CommandCounterIncrement();
935 :
936 : /*
937 : * We are done with the initial data synchronization, update
938 : * the state.
939 : */
940 0 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
941 0 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
942 0 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
943 0 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
944 :
945 : /* Wait for main apply worker to tell us to catchup. */
946 0 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
947 :
948 : /*----------
949 : * There are now two possible states here:
950 : * a) Sync is behind the apply. If that's the case we need to
951 : * catch up with it by consuming the logical replication
952 : * stream up to the relstate_lsn. For that, we exit this
953 : * function and continue in ApplyWorkerMain().
954 : * b) Sync is caught up with the apply. So it can just set
955 : * the state to SYNCDONE and finish.
956 : *----------
957 : */
958 0 : if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
959 : {
960 : /*
961 : * Update the new state in catalog. No need to bother
962 : * with the shmem state as we are exiting for good.
963 : */
964 0 : SetSubscriptionRelState(MyLogicalRepWorker->subid,
965 0 : MyLogicalRepWorker->relid,
966 : SUBREL_STATE_SYNCDONE,
967 : *origin_startpos,
968 : true);
 : /* does not return */
969 0 : finish_sync_worker();
970 : }
971 0 : break;
972 : }
973 : case SUBREL_STATE_SYNCDONE:
974 : case SUBREL_STATE_READY:
975 : case SUBREL_STATE_UNKNOWN:
976 :
977 : /*
978 : * Nothing to do here but finish. (UNKNOWN means the relation was
979 : * removed from pg_subscription_rel before the sync worker could
980 : * start.)
981 : */
982 0 : finish_sync_worker();
983 : break;
984 : default:
985 0 : elog(ERROR, "unknown relation state \"%c\"",
986 : MyLogicalRepWorker->relstate);
987 : }
988 :
 : /* Reached only in case (a): caller continues streaming to catch up. */
989 0 : return slotname;
990 : }
|