Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *-------------------------------------------------------------------------
22 : */
23 :
24 : #include "postgres.h"
25 :
26 : #include "miscadmin.h"
27 : #include "pgstat.h"
28 : #include "funcapi.h"
29 :
30 : #include "access/xact.h"
31 : #include "access/xlog_internal.h"
32 :
33 : #include "catalog/namespace.h"
34 : #include "catalog/pg_subscription.h"
35 : #include "catalog/pg_subscription_rel.h"
36 :
37 : #include "commands/trigger.h"
38 :
39 : #include "executor/executor.h"
40 : #include "executor/nodeModifyTable.h"
41 :
42 : #include "libpq/pqformat.h"
43 : #include "libpq/pqsignal.h"
44 :
45 : #include "mb/pg_wchar.h"
46 :
47 : #include "nodes/makefuncs.h"
48 :
49 : #include "optimizer/planner.h"
50 :
51 : #include "parser/parse_relation.h"
52 :
53 : #include "postmaster/bgworker.h"
54 : #include "postmaster/postmaster.h"
55 : #include "postmaster/walwriter.h"
56 :
57 : #include "replication/decode.h"
58 : #include "replication/logical.h"
59 : #include "replication/logicalproto.h"
60 : #include "replication/logicalrelation.h"
61 : #include "replication/logicalworker.h"
62 : #include "replication/reorderbuffer.h"
63 : #include "replication/origin.h"
64 : #include "replication/snapbuild.h"
65 : #include "replication/walreceiver.h"
66 : #include "replication/worker_internal.h"
67 :
68 : #include "rewrite/rewriteHandler.h"
69 :
70 : #include "storage/bufmgr.h"
71 : #include "storage/ipc.h"
72 : #include "storage/lmgr.h"
73 : #include "storage/proc.h"
74 : #include "storage/procarray.h"
75 :
76 : #include "tcop/tcopprot.h"
77 :
78 : #include "utils/builtins.h"
79 : #include "utils/catcache.h"
80 : #include "utils/datum.h"
81 : #include "utils/fmgroids.h"
82 : #include "utils/guc.h"
83 : #include "utils/inval.h"
84 : #include "utils/lsyscache.h"
85 : #include "utils/memutils.h"
86 : #include "utils/timeout.h"
87 : #include "utils/tqual.h"
88 : #include "utils/syscache.h"
89 :
90 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
91 :
92 : typedef struct FlushPosition /* one committed transaction: local commit end paired with the remote end LSN */
93 : {
94 : dlist_node node; /* link in the lsn_mapping list */
95 : XLogRecPtr local_end; /* XactLastCommitEnd at the time the entry was stored */
96 : XLogRecPtr remote_end; /* remote LSN passed to store_flush_position() */
97 : } FlushPosition;
98 :
99 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); /* FlushPosition entries, appended by store_flush_position() and consumed by get_flush_position() */
100 :
101 : typedef struct SlotErrCallbackArg /* argument handed to slot_store_error_callback() */
102 : {
103 : LogicalRepRelation *rel; /* remote relation description (callers pass &rel->remoterel) */
104 : int attnum; /* remote attribute index being converted; negative means "none", callback stays silent */
105 : } SlotErrCallbackArg;
106 :
107 : static MemoryContext ApplyMessageContext = NULL; /* created as a child of ApplyContext in LogicalRepApplyLoop(); reset after each protocol message */
108 : MemoryContext ApplyContext = NULL; /* used for allocations that must outlive a single message (see store_flush_position()) */
109 :
110 : WalReceiverConn *wrconn = NULL; /* connection handle passed to walrcv_receive() */
111 :
112 : Subscription *MySubscription = NULL; /* NOTE(review): presumably the subscription this worker serves; it is set outside the code visible here */
113 : bool MySubscriptionValid = false; /* NOTE(review): presumably tracks whether MySubscription is still current; confirm against maybe_reread_subscription() */
114 :
115 : bool in_remote_transaction = false; /* set by apply_handle_begin(), cleared by apply_handle_commit() */
116 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; /* final_lsn taken from the most recent BEGIN message */
117 :
118 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
119 :
120 : static void store_flush_position(XLogRecPtr remote_lsn);
121 :
122 : static void maybe_reread_subscription(void);
123 :
124 : /* Flags set by signal handlers */
125 : static volatile sig_atomic_t got_SIGHUP = false;
126 :
127 : /*
128 : * Should this worker apply changes for given relation.
129 : *
130 : * This is mainly needed for initial relation data sync as that runs in
131 : * separate worker process running in parallel and we need some way to skip
132 : * changes coming to the main apply worker during the sync of a table.
133 : *
134 : * Note we need to do smaller or equals comparison for SYNCDONE state because
135 : * it might hold position of end of initial slot consistent point WAL
136 : * record + 1 (ie start of next record) and next record can be COMMIT of
137 : * transaction we are now processing (which is what we set remote_final_lsn
138 : * to in apply_handle_begin).
139 : */
140 : static bool
141 0 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
142 : {
143 0 : if (am_tablesync_worker())
144 0 : return MyLogicalRepWorker->relid == rel->localreloid;
145 : else
146 0 : return (rel->state == SUBREL_STATE_READY ||
147 0 : (rel->state == SUBREL_STATE_SYNCDONE &&
148 0 : rel->statelsn <= remote_final_lsn));
149 : }
150 :
151 : /*
152 : * Make sure that we started local transaction.
153 : *
154 : * Also switches to ApplyMessageContext as necessary.
155 : */
156 : static bool
157 0 : ensure_transaction(void)
158 : {
159 0 : if (IsTransactionState())
160 : {
161 0 : SetCurrentStatementStartTimestamp();
162 :
163 0 : if (CurrentMemoryContext != ApplyMessageContext)
164 0 : MemoryContextSwitchTo(ApplyMessageContext);
165 :
166 0 : return false;
167 : }
168 :
169 0 : SetCurrentStatementStartTimestamp();
170 0 : StartTransactionCommand();
171 :
172 0 : maybe_reread_subscription();
173 :
174 0 : MemoryContextSwitchTo(ApplyMessageContext);
175 0 : return true;
176 : }
177 :
178 :
179 : /*
180 : * Executor state preparation for evaluation of constraint expressions,
181 : * indexes and triggers.
182 : *
183 : * This is based on similar code in copy.c
184 : */
185 : static EState *
186 0 : create_estate_for_relation(LogicalRepRelMapEntry *rel)
187 : {
188 : EState *estate;
189 : ResultRelInfo *resultRelInfo;
190 : RangeTblEntry *rte;
191 :
192 0 : estate = CreateExecutorState();
193 :
194 0 : rte = makeNode(RangeTblEntry);
195 0 : rte->rtekind = RTE_RELATION;
196 0 : rte->relid = RelationGetRelid(rel->localrel);
197 0 : rte->relkind = rel->localrel->rd_rel->relkind;
198 0 : estate->es_range_table = list_make1(rte);
199 :
200 0 : resultRelInfo = makeNode(ResultRelInfo);
201 0 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
202 :
203 0 : estate->es_result_relations = resultRelInfo;
204 0 : estate->es_num_result_relations = 1;
205 0 : estate->es_result_relation_info = resultRelInfo;
206 :
207 : /* Triggers might need a slot */
208 0 : if (resultRelInfo->ri_TrigDesc)
209 0 : estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
210 :
211 : /* Prepare to catch AFTER triggers. */
212 0 : AfterTriggerBeginQuery();
213 :
214 0 : return estate;
215 : }
216 :
217 : /*
218 : * Executes default values for columns for which we can't map to remote
219 : * relation columns.
220 : *
221 : * This allows us to support tables which have more columns on the downstream
222 : * than on the upstream.
223 : */
224 : static void
225 0 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
226 : TupleTableSlot *slot)
227 : {
228 0 : TupleDesc desc = RelationGetDescr(rel->localrel);
229 0 : int num_phys_attrs = desc->natts;
230 : int i;
231 : int attnum,
232 0 : num_defaults = 0;
233 : int *defmap;
234 : ExprState **defexprs;
235 : ExprContext *econtext;
236 :
237 0 : econtext = GetPerTupleExprContext(estate);
238 :
239 : /* We got all the data via replication, no need to evaluate anything. */
240 0 : if (num_phys_attrs == rel->remoterel.natts)
241 0 : return;
242 :
243 0 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
244 0 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
245 :
246 0 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
247 : {
248 : Expr *defexpr;
249 :
250 0 : if (TupleDescAttr(desc, attnum)->attisdropped)
251 0 : continue;
252 :
253 0 : if (rel->attrmap[attnum] >= 0)
254 0 : continue;
255 :
256 0 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
257 :
258 0 : if (defexpr != NULL)
259 : {
260 : /* Run the expression through planner */
261 0 : defexpr = expression_planner(defexpr);
262 :
263 : /* Initialize executable expression in copycontext */
264 0 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
265 0 : defmap[num_defaults] = attnum;
266 0 : num_defaults++;
267 : }
268 :
269 : }
270 :
271 0 : for (i = 0; i < num_defaults; i++)
272 0 : slot->tts_values[defmap[i]] =
273 0 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
274 : }
275 :
276 : /*
277 : * Error callback to give more context info about type conversion failure.
278 : */
279 : static void
280 0 : slot_store_error_callback(void *arg)
281 : {
282 0 : SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
283 : Oid remotetypoid,
284 : localtypoid;
285 :
286 0 : if (errarg->attnum < 0)
287 0 : return;
288 :
289 0 : remotetypoid = errarg->rel->atttyps[errarg->attnum];
290 0 : localtypoid = logicalrep_typmap_getid(remotetypoid);
291 0 : errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
292 : "remote type %s, local type %s",
293 0 : errarg->rel->nspname, errarg->rel->relname,
294 0 : errarg->rel->attnames[errarg->attnum],
295 : format_type_be(remotetypoid),
296 : format_type_be(localtypoid));
297 : }
298 :
299 : /*
300 : * Store data in C string form into slot.
301 : * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
302 : * use better.
303 : */
304 : static void
305 0 : slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
306 : char **values)
307 : {
308 0 : int natts = slot->tts_tupleDescriptor->natts;
309 : int i;
310 : SlotErrCallbackArg errarg;
311 : ErrorContextCallback errcallback;
312 :
313 0 : ExecClearTuple(slot);
314 :
315 : /* Push callback + info on the error context stack */
316 0 : errarg.rel = &rel->remoterel;
317 0 : errarg.attnum = -1;
318 0 : errcallback.callback = slot_store_error_callback;
319 0 : errcallback.arg = (void *) &errarg;
320 0 : errcallback.previous = error_context_stack;
321 0 : error_context_stack = &errcallback;
322 :
323 : /* Call the "in" function for each non-dropped attribute */
324 0 : for (i = 0; i < natts; i++)
325 : {
326 0 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
327 0 : int remoteattnum = rel->attrmap[i];
328 :
329 0 : if (!att->attisdropped && remoteattnum >= 0 &&
330 0 : values[remoteattnum] != NULL)
331 0 : {
332 : Oid typinput;
333 : Oid typioparam;
334 :
335 0 : errarg.attnum = remoteattnum;
336 :
337 0 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
338 0 : slot->tts_values[i] = OidInputFunctionCall(typinput,
339 0 : values[remoteattnum],
340 : typioparam,
341 : att->atttypmod);
342 0 : slot->tts_isnull[i] = false;
343 : }
344 : else
345 : {
346 : /*
347 : * We assign NULL to dropped attributes, NULL values, and missing
348 : * values (missing values should be later filled using
349 : * slot_fill_defaults).
350 : */
351 0 : slot->tts_values[i] = (Datum) 0;
352 0 : slot->tts_isnull[i] = true;
353 : }
354 : }
355 :
356 : /* Pop the error context stack */
357 0 : error_context_stack = errcallback.previous;
358 :
359 0 : ExecStoreVirtualTuple(slot);
360 0 : }
361 :
362 : /*
363 : * Modify slot with user data provided as C strings.
364 : * This is somewhat similar to heap_modify_tuple but also calls the type
365 : * input function on the user data as the input is the text representation
366 : * of the types.
367 : */
368 : static void
369 0 : slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
370 : char **values, bool *replaces)
371 : {
372 0 : int natts = slot->tts_tupleDescriptor->natts;
373 : int i;
374 : SlotErrCallbackArg errarg;
375 : ErrorContextCallback errcallback;
376 :
377 0 : slot_getallattrs(slot);
378 0 : ExecClearTuple(slot);
379 :
380 : /* Push callback + info on the error context stack */
381 0 : errarg.rel = &rel->remoterel;
382 0 : errarg.attnum = -1;
383 0 : errcallback.callback = slot_store_error_callback;
384 0 : errcallback.arg = (void *) &errarg;
385 0 : errcallback.previous = error_context_stack;
386 0 : error_context_stack = &errcallback;
387 :
388 : /* Call the "in" function for each replaced attribute */
389 0 : for (i = 0; i < natts; i++)
390 : {
391 0 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
392 0 : int remoteattnum = rel->attrmap[i];
393 :
394 0 : if (remoteattnum >= 0 && !replaces[remoteattnum])
395 0 : continue;
396 :
397 0 : if (remoteattnum >= 0 && values[remoteattnum] != NULL)
398 0 : {
399 : Oid typinput;
400 : Oid typioparam;
401 :
402 0 : errarg.attnum = remoteattnum;
403 :
404 0 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
405 0 : slot->tts_values[i] = OidInputFunctionCall(typinput,
406 0 : values[remoteattnum],
407 : typioparam,
408 : att->atttypmod);
409 0 : slot->tts_isnull[i] = false;
410 : }
411 : else
412 : {
413 0 : slot->tts_values[i] = (Datum) 0;
414 0 : slot->tts_isnull[i] = true;
415 : }
416 : }
417 :
418 : /* Pop the error context stack */
419 0 : error_context_stack = errcallback.previous;
420 :
421 0 : ExecStoreVirtualTuple(slot);
422 0 : }
423 :
424 : /*
425 : * Handle BEGIN message.
426 : */
427 : static void
428 0 : apply_handle_begin(StringInfo s)
429 : {
430 : LogicalRepBeginData begin_data;
431 :
432 0 : logicalrep_read_begin(s, &begin_data);
433 :
434 0 : remote_final_lsn = begin_data.final_lsn;
435 :
436 0 : in_remote_transaction = true;
437 :
438 0 : pgstat_report_activity(STATE_RUNNING, NULL);
439 0 : }
440 :
441 : /*
442 : * Handle COMMIT message.
443 : *
444 : * TODO, support tracking of multiple origins
445 : */
446 : static void
447 0 : apply_handle_commit(StringInfo s)
448 : {
449 : LogicalRepCommitData commit_data;
450 :
451 0 : logicalrep_read_commit(s, &commit_data);
452 :
453 0 : Assert(commit_data.commit_lsn == remote_final_lsn);
454 :
455 : /* The synchronization worker runs in single transaction. */
456 0 : if (IsTransactionState() && !am_tablesync_worker())
457 : {
458 : /*
459 : * Update origin state so we can restart streaming from correct
460 : * position in case of crash.
461 : */
462 0 : replorigin_session_origin_lsn = commit_data.end_lsn;
463 0 : replorigin_session_origin_timestamp = commit_data.committime;
464 :
465 0 : CommitTransactionCommand();
466 0 : pgstat_report_stat(false);
467 :
468 0 : store_flush_position(commit_data.end_lsn);
469 : }
470 : else
471 : {
472 : /* Process any invalidation messages that might have accumulated. */
473 0 : AcceptInvalidationMessages();
474 0 : maybe_reread_subscription();
475 : }
476 :
477 0 : in_remote_transaction = false;
478 :
479 : /* Process any tables that are being synchronized in parallel. */
480 0 : process_syncing_tables(commit_data.end_lsn);
481 :
482 0 : pgstat_report_activity(STATE_IDLE, NULL);
483 0 : }
484 :
485 : /*
486 : * Handle ORIGIN message.
487 : *
488 : * TODO, support tracking of multiple origins
489 : */
490 : static void
491 0 : apply_handle_origin(StringInfo s)
492 : {
493 : /*
494 : * ORIGIN message can only come inside remote transaction and before any
495 : * actual writes.
496 : */
497 0 : if (!in_remote_transaction ||
498 0 : (IsTransactionState() && !am_tablesync_worker()))
499 0 : ereport(ERROR,
500 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
501 : errmsg("ORIGIN message sent out of order")));
502 0 : }
503 :
504 : /*
505 : * Handle RELATION message.
506 : *
507 : * Note we don't do validation against local schema here. The validation
508 : * against local schema is postponed until first change for given relation
509 : * comes as we only care about it when applying changes for it anyway and we
510 : * do less locking this way.
511 : */
512 : static void
513 0 : apply_handle_relation(StringInfo s)
514 : {
515 : LogicalRepRelation *rel;
516 :
517 0 : rel = logicalrep_read_rel(s);
518 0 : logicalrep_relmap_update(rel);
519 0 : }
520 :
521 : /*
522 : * Handle TYPE message.
523 : *
524 : * Note we don't do local mapping here, that's done when the type is
525 : * actually used.
526 : */
527 : static void
528 0 : apply_handle_type(StringInfo s)
529 : {
530 : LogicalRepTyp typ;
531 :
532 0 : logicalrep_read_typ(s, &typ);
533 0 : logicalrep_typmap_update(&typ);
534 0 : }
535 :
536 : /*
537 : * Get replica identity index or if it is not defined a primary key.
538 : *
539 : * If neither is defined, returns InvalidOid
540 : */
541 : static Oid
542 0 : GetRelationIdentityOrPK(Relation rel)
543 : {
544 : Oid idxoid;
545 :
546 0 : idxoid = RelationGetReplicaIndex(rel);
547 :
548 0 : if (!OidIsValid(idxoid))
549 0 : idxoid = RelationGetPrimaryKeyIndex(rel);
550 :
551 0 : return idxoid;
552 : }
553 :
554 : /*
555 : * Handle INSERT message.
556 : */
557 : static void
558 0 : apply_handle_insert(StringInfo s)
559 : {
560 : LogicalRepRelMapEntry *rel;
561 : LogicalRepTupleData newtup;
562 : LogicalRepRelId relid;
563 : EState *estate;
564 : TupleTableSlot *remoteslot;
565 : MemoryContext oldctx;
566 :
567 0 : ensure_transaction();
568 :
569 0 : relid = logicalrep_read_insert(s, &newtup);
570 0 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
571 0 : if (!should_apply_changes_for_rel(rel))
572 : {
573 : /*
574 : * The relation can't become interesting in the middle of the
575 : * transaction so it's safe to unlock it.
576 : */
577 0 : logicalrep_rel_close(rel, RowExclusiveLock);
578 0 : return;
579 : }
580 :
581 : /* Initialize the executor state. */
582 0 : estate = create_estate_for_relation(rel);
583 0 : remoteslot = ExecInitExtraTupleSlot(estate);
584 0 : ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
585 :
586 : /* Process and store remote tuple in the slot */
587 0 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
588 0 : slot_store_cstrings(remoteslot, rel, newtup.values);
589 0 : slot_fill_defaults(rel, estate, remoteslot);
590 0 : MemoryContextSwitchTo(oldctx);
591 :
592 0 : PushActiveSnapshot(GetTransactionSnapshot());
593 0 : ExecOpenIndices(estate->es_result_relation_info, false);
594 :
595 : /* Do the insert. */
596 0 : ExecSimpleRelationInsert(estate, remoteslot);
597 :
598 : /* Cleanup. */
599 0 : ExecCloseIndices(estate->es_result_relation_info);
600 0 : PopActiveSnapshot();
601 :
602 : /* Handle queued AFTER triggers. */
603 0 : AfterTriggerEndQuery(estate);
604 :
605 0 : ExecResetTupleTable(estate->es_tupleTable, false);
606 0 : FreeExecutorState(estate);
607 :
608 0 : logicalrep_rel_close(rel, NoLock);
609 :
610 0 : CommandCounterIncrement();
611 : }
612 :
613 : /*
614 : * Check if the logical replication relation is updatable and throw
615 : * appropriate error if it isn't.
616 : */
617 : static void
618 0 : check_relation_updatable(LogicalRepRelMapEntry *rel)
619 : {
620 : /* Updatable, no error. */
621 0 : if (rel->updatable)
622 0 : return;
623 :
624 : /*
625 : * We are in error mode so it's fine this is somewhat slow. It's better to
626 : * give user correct error.
627 : */
628 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
629 : {
630 0 : ereport(ERROR,
631 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
632 : errmsg("publisher does not send replica identity column "
633 : "expected by the logical replication target relation \"%s.%s\"",
634 : rel->remoterel.nspname, rel->remoterel.relname)));
635 : }
636 :
637 0 : ereport(ERROR,
638 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
639 : errmsg("logical replication target relation \"%s.%s\" has "
640 : "neither REPLICA IDENTITY index nor PRIMARY "
641 : "KEY and published relation does not have "
642 : "REPLICA IDENTITY FULL",
643 : rel->remoterel.nspname, rel->remoterel.relname)));
644 : }
645 :
646 : /*
647 : * Handle UPDATE message.
648 : *
649 : * TODO: FDW support
650 : */
651 : static void
652 0 : apply_handle_update(StringInfo s)
653 : {
654 : LogicalRepRelMapEntry *rel;
655 : LogicalRepRelId relid;
656 : Oid idxoid;
657 : EState *estate;
658 : EPQState epqstate;
659 : LogicalRepTupleData oldtup;
660 : LogicalRepTupleData newtup;
661 : bool has_oldtup;
662 : TupleTableSlot *localslot;
663 : TupleTableSlot *remoteslot;
664 : bool found;
665 : MemoryContext oldctx;
666 :
667 0 : ensure_transaction();
668 :
669 0 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
670 : &newtup);
671 0 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
672 0 : if (!should_apply_changes_for_rel(rel))
673 : {
674 : /*
675 : * The relation can't become interesting in the middle of the
676 : * transaction so it's safe to unlock it.
677 : */
678 0 : logicalrep_rel_close(rel, RowExclusiveLock);
679 0 : return;
680 : }
681 :
682 : /* Check if we can do the update. */
683 0 : check_relation_updatable(rel);
684 :
685 : /* Initialize the executor state. */
686 0 : estate = create_estate_for_relation(rel);
687 0 : remoteslot = ExecInitExtraTupleSlot(estate);
688 0 : ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
689 0 : localslot = ExecInitExtraTupleSlot(estate);
690 0 : ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
691 0 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
692 :
693 0 : PushActiveSnapshot(GetTransactionSnapshot());
694 0 : ExecOpenIndices(estate->es_result_relation_info, false);
695 :
696 : /* Build the search tuple. */
697 0 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
698 0 : slot_store_cstrings(remoteslot, rel,
699 0 : has_oldtup ? oldtup.values : newtup.values);
700 0 : MemoryContextSwitchTo(oldctx);
701 :
702 : /*
703 : * Try to find tuple using either replica identity index, primary key or
704 : * if needed, sequential scan.
705 : */
706 0 : idxoid = GetRelationIdentityOrPK(rel->localrel);
707 0 : Assert(OidIsValid(idxoid) ||
708 : (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
709 :
710 0 : if (OidIsValid(idxoid))
711 0 : found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
712 : LockTupleExclusive,
713 : remoteslot, localslot);
714 : else
715 0 : found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
716 : remoteslot, localslot);
717 :
718 0 : ExecClearTuple(remoteslot);
719 :
720 : /*
721 : * Tuple found.
722 : *
723 : * Note this will fail if there are other conflicting unique indexes.
724 : */
725 0 : if (found)
726 : {
727 : /* Process and store remote tuple in the slot */
728 0 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
729 0 : ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
730 0 : slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
731 0 : MemoryContextSwitchTo(oldctx);
732 :
733 0 : EvalPlanQualSetSlot(&epqstate, remoteslot);
734 :
735 : /* Do the actual update. */
736 0 : ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
737 : }
738 : else
739 : {
740 : /*
741 : * The tuple to be updated could not be found.
742 : *
743 : * TODO what to do here, change the log level to LOG perhaps?
744 : */
745 0 : elog(DEBUG1,
746 : "logical replication did not find row for update "
747 : "in replication target relation \"%s\"",
748 : RelationGetRelationName(rel->localrel));
749 : }
750 :
751 : /* Cleanup. */
752 0 : ExecCloseIndices(estate->es_result_relation_info);
753 0 : PopActiveSnapshot();
754 :
755 : /* Handle queued AFTER triggers. */
756 0 : AfterTriggerEndQuery(estate);
757 :
758 0 : EvalPlanQualEnd(&epqstate);
759 0 : ExecResetTupleTable(estate->es_tupleTable, false);
760 0 : FreeExecutorState(estate);
761 :
762 0 : logicalrep_rel_close(rel, NoLock);
763 :
764 0 : CommandCounterIncrement();
765 : }
766 :
767 : /*
768 : * Handle DELETE message.
769 : *
770 : * TODO: FDW support
771 : */
772 : static void
773 0 : apply_handle_delete(StringInfo s)
774 : {
775 : LogicalRepRelMapEntry *rel;
776 : LogicalRepTupleData oldtup;
777 : LogicalRepRelId relid;
778 : Oid idxoid;
779 : EState *estate;
780 : EPQState epqstate;
781 : TupleTableSlot *remoteslot;
782 : TupleTableSlot *localslot;
783 : bool found;
784 : MemoryContext oldctx;
785 :
786 0 : ensure_transaction();
787 :
788 0 : relid = logicalrep_read_delete(s, &oldtup);
789 0 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
790 0 : if (!should_apply_changes_for_rel(rel))
791 : {
792 : /*
793 : * The relation can't become interesting in the middle of the
794 : * transaction so it's safe to unlock it.
795 : */
796 0 : logicalrep_rel_close(rel, RowExclusiveLock);
797 0 : return;
798 : }
799 :
800 : /* Check if we can do the delete. */
801 0 : check_relation_updatable(rel);
802 :
803 : /* Initialize the executor state. */
804 0 : estate = create_estate_for_relation(rel);
805 0 : remoteslot = ExecInitExtraTupleSlot(estate);
806 0 : ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
807 0 : localslot = ExecInitExtraTupleSlot(estate);
808 0 : ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
809 0 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
810 :
811 0 : PushActiveSnapshot(GetTransactionSnapshot());
812 0 : ExecOpenIndices(estate->es_result_relation_info, false);
813 :
814 : /* Find the tuple using the replica identity index. */
815 0 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
816 0 : slot_store_cstrings(remoteslot, rel, oldtup.values);
817 0 : MemoryContextSwitchTo(oldctx);
818 :
819 : /*
820 : * Try to find tuple using either replica identity index, primary key or
821 : * if needed, sequential scan.
822 : */
823 0 : idxoid = GetRelationIdentityOrPK(rel->localrel);
824 0 : Assert(OidIsValid(idxoid) ||
825 : (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
826 :
827 0 : if (OidIsValid(idxoid))
828 0 : found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
829 : LockTupleExclusive,
830 : remoteslot, localslot);
831 : else
832 0 : found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
833 : remoteslot, localslot);
834 : /* If found delete it. */
835 0 : if (found)
836 : {
837 0 : EvalPlanQualSetSlot(&epqstate, localslot);
838 :
839 : /* Do the actual delete. */
840 0 : ExecSimpleRelationDelete(estate, &epqstate, localslot);
841 : }
842 : else
843 : {
844 : /* The tuple to be deleted could not be found. */
845 0 : ereport(DEBUG1,
846 : (errmsg("logical replication could not find row for delete "
847 : "in replication target %s",
848 : RelationGetRelationName(rel->localrel))));
849 : }
850 :
851 : /* Cleanup. */
852 0 : ExecCloseIndices(estate->es_result_relation_info);
853 0 : PopActiveSnapshot();
854 :
855 : /* Handle queued AFTER triggers. */
856 0 : AfterTriggerEndQuery(estate);
857 :
858 0 : EvalPlanQualEnd(&epqstate);
859 0 : ExecResetTupleTable(estate->es_tupleTable, false);
860 0 : FreeExecutorState(estate);
861 :
862 0 : logicalrep_rel_close(rel, NoLock);
863 :
864 0 : CommandCounterIncrement();
865 : }
866 :
867 :
868 : /*
869 : * Logical replication protocol message dispatcher.
870 : */
871 : static void
872 0 : apply_dispatch(StringInfo s)
873 : {
874 0 : char action = pq_getmsgbyte(s);
875 :
876 0 : switch (action)
877 : {
878 : /* BEGIN */
879 : case 'B':
880 0 : apply_handle_begin(s);
881 0 : break;
882 : /* COMMIT */
883 : case 'C':
884 0 : apply_handle_commit(s);
885 0 : break;
886 : /* INSERT */
887 : case 'I':
888 0 : apply_handle_insert(s);
889 0 : break;
890 : /* UPDATE */
891 : case 'U':
892 0 : apply_handle_update(s);
893 0 : break;
894 : /* DELETE */
895 : case 'D':
896 0 : apply_handle_delete(s);
897 0 : break;
898 : /* RELATION */
899 : case 'R':
900 0 : apply_handle_relation(s);
901 0 : break;
902 : /* TYPE */
903 : case 'Y':
904 0 : apply_handle_type(s);
905 0 : break;
906 : /* ORIGIN */
907 : case 'O':
908 0 : apply_handle_origin(s);
909 0 : break;
910 : default:
911 0 : ereport(ERROR,
912 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
913 : errmsg("invalid logical replication message type %c", action)));
914 : }
915 0 : }
916 :
917 : /*
918 : * Figure out which write/flush positions to report to the walsender process.
919 : *
920 : * We can't simply report back the last LSN the walsender sent us because the
921 : * local transaction might not yet be flushed to disk locally. Instead we
922 : * build a list that associates local with remote LSNs for every commit. When
923 : * reporting back the flush position to the sender we iterate that list and
924 : * check which entries on it are already locally flushed. Those we can report
925 : * as having been flushed.
926 : *
927 : * The have_pending_txes is true if there are outstanding transactions that
928 : * need to be flushed.
929 : */
930 : static void
931 0 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
932 : bool *have_pending_txes)
933 : {
934 : dlist_mutable_iter iter;
935 0 : XLogRecPtr local_flush = GetFlushRecPtr();
936 :
937 0 : *write = InvalidXLogRecPtr;
938 0 : *flush = InvalidXLogRecPtr;
939 :
940 0 : dlist_foreach_modify(iter, &lsn_mapping)
941 : {
942 0 : FlushPosition *pos =
943 0 : dlist_container(FlushPosition, node, iter.cur);
944 :
945 0 : *write = pos->remote_end;
946 :
947 0 : if (pos->local_end <= local_flush)
948 : {
949 0 : *flush = pos->remote_end;
950 0 : dlist_delete(iter.cur);
951 0 : pfree(pos);
952 : }
953 : else
954 : {
955 : /*
956 : * Don't want to uselessly iterate over the rest of the list which
957 : * could potentially be long. Instead get the last element and
958 : * grab the write position from there.
959 : */
960 0 : pos = dlist_tail_element(FlushPosition, node,
961 : &lsn_mapping);
962 0 : *write = pos->remote_end;
963 0 : *have_pending_txes = true;
964 0 : return;
965 : }
966 : }
967 :
968 0 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
969 : }
970 :
971 : /*
972 : * Store current remote/local lsn pair in the tracking list.
973 : */
974 : static void
975 0 : store_flush_position(XLogRecPtr remote_lsn)
976 : {
977 : FlushPosition *flushpos;
978 :
979 : /* Need to do this in permanent context */
980 0 : MemoryContextSwitchTo(ApplyContext);
981 :
982 : /* Track commit lsn */
983 0 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
984 0 : flushpos->local_end = XactLastCommitEnd;
985 0 : flushpos->remote_end = remote_lsn;
986 :
987 0 : dlist_push_tail(&lsn_mapping, &flushpos->node);
988 0 : MemoryContextSwitchTo(ApplyMessageContext);
989 0 : }
990 :
991 :
992 : /* Update statistics of the worker. */
993 : static void
994 0 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
995 : {
996 0 : MyLogicalRepWorker->last_lsn = last_lsn;
997 0 : MyLogicalRepWorker->last_send_time = send_time;
998 0 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
999 0 : if (reply)
1000 : {
1001 0 : MyLogicalRepWorker->reply_lsn = last_lsn;
1002 0 : MyLogicalRepWorker->reply_time = send_time;
1003 : }
1004 0 : }
1005 :
1006 : /*
1007 : * Apply main loop.
1008 : */
1009 : static void
1010 0 : LogicalRepApplyLoop(XLogRecPtr last_received)
1011 : {
1012 : /*
1013 : * Init the ApplyMessageContext which we clean up after each replication
1014 : * protocol message.
1015 : */
1016 0 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1017 : "ApplyMessageContext",
1018 : ALLOCSET_DEFAULT_SIZES);
1019 :
1020 : /* mark as idle, before starting to loop */
1021 0 : pgstat_report_activity(STATE_IDLE, NULL);
1022 :
1023 : for (;;)
1024 : {
1025 0 : pgsocket fd = PGINVALID_SOCKET;
1026 : int rc;
1027 : int len;
1028 0 : char *buf = NULL;
1029 0 : bool endofstream = false;
1030 0 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1031 0 : bool ping_sent = false;
1032 : long wait_time;
1033 :
1034 0 : CHECK_FOR_INTERRUPTS();
1035 :
1036 0 : MemoryContextSwitchTo(ApplyMessageContext);
1037 :
1038 0 : len = walrcv_receive(wrconn, &buf, &fd);
1039 :
1040 0 : if (len != 0)
1041 : {
1042 : /* Process the data */
1043 : for (;;)
1044 : {
1045 0 : CHECK_FOR_INTERRUPTS();
1046 :
1047 0 : if (len == 0)
1048 : {
1049 0 : break;
1050 : }
1051 0 : else if (len < 0)
1052 : {
1053 0 : ereport(LOG,
1054 : (errmsg("data stream from publisher has ended")));
1055 0 : endofstream = true;
1056 0 : break;
1057 : }
1058 : else
1059 : {
1060 : int c;
1061 : StringInfoData s;
1062 :
1063 : /* Reset timeout. */
1064 0 : last_recv_timestamp = GetCurrentTimestamp();
1065 0 : ping_sent = false;
1066 :
1067 : /* Ensure we are reading the data into our memory context. */
1068 0 : MemoryContextSwitchTo(ApplyMessageContext);
1069 :
1070 0 : s.data = buf;
1071 0 : s.len = len;
1072 0 : s.cursor = 0;
1073 0 : s.maxlen = -1;
1074 :
1075 0 : c = pq_getmsgbyte(&s);
1076 :
1077 0 : if (c == 'w')
1078 : {
1079 : XLogRecPtr start_lsn;
1080 : XLogRecPtr end_lsn;
1081 : TimestampTz send_time;
1082 :
1083 0 : start_lsn = pq_getmsgint64(&s);
1084 0 : end_lsn = pq_getmsgint64(&s);
1085 0 : send_time = pq_getmsgint64(&s);
1086 :
1087 0 : if (last_received < start_lsn)
1088 0 : last_received = start_lsn;
1089 :
1090 0 : if (last_received < end_lsn)
1091 0 : last_received = end_lsn;
1092 :
1093 0 : UpdateWorkerStats(last_received, send_time, false);
1094 :
1095 0 : apply_dispatch(&s);
1096 : }
1097 0 : else if (c == 'k')
1098 : {
1099 : XLogRecPtr end_lsn;
1100 : TimestampTz timestamp;
1101 : bool reply_requested;
1102 :
1103 0 : end_lsn = pq_getmsgint64(&s);
1104 0 : timestamp = pq_getmsgint64(&s);
1105 0 : reply_requested = pq_getmsgbyte(&s);
1106 :
1107 0 : if (last_received < end_lsn)
1108 0 : last_received = end_lsn;
1109 :
1110 0 : send_feedback(last_received, reply_requested, false);
1111 0 : UpdateWorkerStats(last_received, timestamp, true);
1112 : }
1113 : /* other message types are purposefully ignored */
1114 :
1115 0 : MemoryContextReset(ApplyMessageContext);
1116 : }
1117 :
1118 0 : len = walrcv_receive(wrconn, &buf, &fd);
1119 0 : }
1120 : }
1121 :
1122 : /* confirm all writes so far */
1123 0 : send_feedback(last_received, false, false);
1124 :
1125 0 : if (!in_remote_transaction)
1126 : {
1127 : /*
1128 : * If we didn't get any transactions for a while there might be
1129 : * unconsumed invalidation messages in the queue, consume them
1130 : * now.
1131 : */
1132 0 : AcceptInvalidationMessages();
1133 0 : maybe_reread_subscription();
1134 :
1135 : /* Process any table synchronization changes. */
1136 0 : process_syncing_tables(last_received);
1137 : }
1138 :
1139 : /* Cleanup the memory. */
1140 0 : MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1141 0 : MemoryContextSwitchTo(TopMemoryContext);
1142 :
1143 : /* Check if we need to exit the streaming loop. */
1144 0 : if (endofstream)
1145 : {
1146 : TimeLineID tli;
1147 :
1148 0 : walrcv_endstreaming(wrconn, &tli);
1149 0 : break;
1150 : }
1151 :
1152 : /*
1153 : * Wait for more data or latch. If we have unflushed transactions,
1154 : * wake up after WalWriterDelay to see if they've been flushed yet (in
1155 : * which case we should send a feedback message). Otherwise, there's
1156 : * no particular urgency about waking up unless we get data or a
1157 : * signal.
1158 : */
1159 0 : if (!dlist_is_empty(&lsn_mapping))
1160 0 : wait_time = WalWriterDelay;
1161 : else
1162 0 : wait_time = NAPTIME_PER_CYCLE;
1163 :
1164 0 : rc = WaitLatchOrSocket(MyLatch,
1165 : WL_SOCKET_READABLE | WL_LATCH_SET |
1166 : WL_TIMEOUT | WL_POSTMASTER_DEATH,
1167 : fd, wait_time,
1168 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
1169 :
1170 : /* Emergency bailout if postmaster has died */
1171 0 : if (rc & WL_POSTMASTER_DEATH)
1172 0 : proc_exit(1);
1173 :
1174 0 : if (rc & WL_LATCH_SET)
1175 : {
1176 0 : ResetLatch(MyLatch);
1177 0 : CHECK_FOR_INTERRUPTS();
1178 : }
1179 :
1180 0 : if (got_SIGHUP)
1181 : {
1182 0 : got_SIGHUP = false;
1183 0 : ProcessConfigFile(PGC_SIGHUP);
1184 : }
1185 :
1186 0 : if (rc & WL_TIMEOUT)
1187 : {
1188 : /*
1189 : * We didn't receive anything new. If we haven't heard anything
1190 : * from the server for more than wal_receiver_timeout / 2, ping
1191 : * the server. Also, if it's been longer than
1192 : * wal_receiver_status_interval since the last update we sent,
1193 : * send a status update to the master anyway, to report any
1194 : * progress in applying WAL.
1195 : */
1196 0 : bool requestReply = false;
1197 :
1198 : /*
1199 : * Check if time since last receive from standby has reached the
1200 : * configured limit.
1201 : */
1202 0 : if (wal_receiver_timeout > 0)
1203 : {
1204 0 : TimestampTz now = GetCurrentTimestamp();
1205 : TimestampTz timeout;
1206 :
1207 0 : timeout =
1208 0 : TimestampTzPlusMilliseconds(last_recv_timestamp,
1209 : wal_receiver_timeout);
1210 :
1211 0 : if (now >= timeout)
1212 0 : ereport(ERROR,
1213 : (errmsg("terminating logical replication worker due to timeout")));
1214 :
1215 : /*
1216 : * We didn't receive anything new, for half of receiver
1217 : * replication timeout. Ping the server.
1218 : */
1219 0 : if (!ping_sent)
1220 : {
1221 0 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1222 : (wal_receiver_timeout / 2));
1223 0 : if (now >= timeout)
1224 : {
1225 0 : requestReply = true;
1226 0 : ping_sent = true;
1227 : }
1228 : }
1229 : }
1230 :
1231 0 : send_feedback(last_received, requestReply, requestReply);
1232 : }
1233 0 : }
1234 0 : }
1235 :
1236 : /*
1237 : * Send a Standby Status Update message to server.
1238 : *
1239 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
1240 : * to send a response to avoid timeouts.
1241 : */
1242 : static void
1243 0 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1244 : {
1245 : static StringInfo reply_message = NULL;
1246 : static TimestampTz send_time = 0;
1247 :
1248 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1249 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1250 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1251 :
1252 : XLogRecPtr writepos;
1253 : XLogRecPtr flushpos;
1254 : TimestampTz now;
1255 : bool have_pending_txes;
1256 :
1257 : /*
1258 : * If the user doesn't want status to be reported to the publisher, be
1259 : * sure to exit before doing anything at all.
1260 : */
1261 0 : if (!force && wal_receiver_status_interval <= 0)
1262 0 : return;
1263 :
1264 : /* It's legal to not pass a recvpos */
1265 0 : if (recvpos < last_recvpos)
1266 0 : recvpos = last_recvpos;
1267 :
1268 0 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
1269 :
1270 : /*
1271 : * No outstanding transactions to flush, we can report the latest received
1272 : * position. This is important for synchronous replication.
1273 : */
1274 0 : if (!have_pending_txes)
1275 0 : flushpos = writepos = recvpos;
1276 :
1277 0 : if (writepos < last_writepos)
1278 0 : writepos = last_writepos;
1279 :
1280 0 : if (flushpos < last_flushpos)
1281 0 : flushpos = last_flushpos;
1282 :
1283 0 : now = GetCurrentTimestamp();
1284 :
1285 : /* if we've already reported everything we're good */
1286 0 : if (!force &&
1287 0 : writepos == last_writepos &&
1288 0 : flushpos == last_flushpos &&
1289 0 : !TimestampDifferenceExceeds(send_time, now,
1290 : wal_receiver_status_interval * 1000))
1291 0 : return;
1292 0 : send_time = now;
1293 :
1294 0 : if (!reply_message)
1295 : {
1296 0 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1297 :
1298 0 : reply_message = makeStringInfo();
1299 0 : MemoryContextSwitchTo(oldctx);
1300 : }
1301 : else
1302 0 : resetStringInfo(reply_message);
1303 :
1304 0 : pq_sendbyte(reply_message, 'r');
1305 0 : pq_sendint64(reply_message, recvpos); /* write */
1306 0 : pq_sendint64(reply_message, flushpos); /* flush */
1307 0 : pq_sendint64(reply_message, writepos); /* apply */
1308 0 : pq_sendint64(reply_message, now); /* sendTime */
1309 0 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
1310 :
1311 0 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1312 : force,
1313 : (uint32) (recvpos >> 32), (uint32) recvpos,
1314 : (uint32) (writepos >> 32), (uint32) writepos,
1315 : (uint32) (flushpos >> 32), (uint32) flushpos
1316 : );
1317 :
1318 0 : walrcv_send(wrconn, reply_message->data, reply_message->len);
1319 :
1320 0 : if (recvpos > last_recvpos)
1321 0 : last_recvpos = recvpos;
1322 0 : if (writepos > last_writepos)
1323 0 : last_writepos = writepos;
1324 0 : if (flushpos > last_flushpos)
1325 0 : last_flushpos = flushpos;
1326 : }
1327 :
1328 : /*
1329 : * Reread subscription info if needed. Most changes will be exit.
1330 : */
1331 : static void
1332 0 : maybe_reread_subscription(void)
1333 : {
1334 : MemoryContext oldctx;
1335 : Subscription *newsub;
1336 0 : bool started_tx = false;
1337 :
1338 : /* When cache state is valid there is nothing to do here. */
1339 0 : if (MySubscriptionValid)
1340 0 : return;
1341 :
1342 : /* This function might be called inside or outside of transaction. */
1343 0 : if (!IsTransactionState())
1344 : {
1345 0 : StartTransactionCommand();
1346 0 : started_tx = true;
1347 : }
1348 :
1349 : /* Ensure allocations in permanent context. */
1350 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
1351 :
1352 0 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1353 :
1354 : /*
1355 : * Exit if the subscription was removed. This normally should not happen
1356 : * as the worker gets killed during DROP SUBSCRIPTION.
1357 : */
1358 0 : if (!newsub)
1359 : {
1360 0 : ereport(LOG,
1361 : (errmsg("logical replication apply worker for subscription \"%s\" will "
1362 : "stop because the subscription was removed",
1363 : MySubscription->name)));
1364 :
1365 0 : proc_exit(0);
1366 : }
1367 :
1368 : /*
1369 : * Exit if the subscription was disabled. This normally should not happen
1370 : * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1371 : */
1372 0 : if (!newsub->enabled)
1373 : {
1374 0 : ereport(LOG,
1375 : (errmsg("logical replication apply worker for subscription \"%s\" will "
1376 : "stop because the subscription was disabled",
1377 : MySubscription->name)));
1378 :
1379 0 : proc_exit(0);
1380 : }
1381 :
1382 : /*
1383 : * Exit if connection string was changed. The launcher will start new
1384 : * worker.
1385 : */
1386 0 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1387 : {
1388 0 : ereport(LOG,
1389 : (errmsg("logical replication apply worker for subscription \"%s\" will "
1390 : "restart because the connection information was changed",
1391 : MySubscription->name)));
1392 :
1393 0 : proc_exit(0);
1394 : }
1395 :
1396 : /*
1397 : * Exit if subscription name was changed (it's used for
1398 : * fallback_application_name). The launcher will start new worker.
1399 : */
1400 0 : if (strcmp(newsub->name, MySubscription->name) != 0)
1401 : {
1402 0 : ereport(LOG,
1403 : (errmsg("logical replication apply worker for subscription \"%s\" will "
1404 : "restart because subscription was renamed",
1405 : MySubscription->name)));
1406 :
1407 0 : proc_exit(0);
1408 : }
1409 :
1410 : /* !slotname should never happen when enabled is true. */
1411 0 : Assert(newsub->slotname);
1412 :
1413 : /*
1414 : * We need to make new connection to new slot if slot name has changed so
1415 : * exit here as well if that's the case.
1416 : */
1417 0 : if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1418 : {
1419 0 : ereport(LOG,
1420 : (errmsg("logical replication apply worker for subscription \"%s\" will "
1421 : "restart because the replication slot name was changed",
1422 : MySubscription->name)));
1423 :
1424 0 : proc_exit(0);
1425 : }
1426 :
1427 : /*
1428 : * Exit if publication list was changed. The launcher will start new
1429 : * worker.
1430 : */
1431 0 : if (!equal(newsub->publications, MySubscription->publications))
1432 : {
1433 0 : ereport(LOG,
1434 : (errmsg("logical replication apply worker for subscription \"%s\" will "
1435 : "restart because subscription's publications were changed",
1436 : MySubscription->name)));
1437 :
1438 0 : proc_exit(0);
1439 : }
1440 :
1441 : /* Check for other changes that should never happen too. */
1442 0 : if (newsub->dbid != MySubscription->dbid)
1443 : {
1444 0 : elog(ERROR, "subscription %u changed unexpectedly",
1445 : MyLogicalRepWorker->subid);
1446 : }
1447 :
1448 : /* Clean old subscription info and switch to new one. */
1449 0 : FreeSubscription(MySubscription);
1450 0 : MySubscription = newsub;
1451 :
1452 0 : MemoryContextSwitchTo(oldctx);
1453 :
1454 : /* Change synchronous commit according to the user's wishes */
1455 0 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
1456 : PGC_BACKEND, PGC_S_OVERRIDE);
1457 :
1458 0 : if (started_tx)
1459 0 : CommitTransactionCommand();
1460 :
1461 0 : MySubscriptionValid = true;
1462 : }
1463 :
1464 : /*
1465 : * Callback from subscription syscache invalidation.
1466 : */
1467 : static void
1468 0 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1469 : {
1470 0 : MySubscriptionValid = false;
1471 0 : }
1472 :
1473 : /* SIGHUP: set flag to reload configuration at next convenient time */
1474 : static void
1475 0 : logicalrep_worker_sighup(SIGNAL_ARGS)
1476 : {
1477 0 : int save_errno = errno;
1478 :
1479 0 : got_SIGHUP = true;
1480 :
1481 : /* Waken anything waiting on the process latch */
1482 0 : SetLatch(MyLatch);
1483 :
1484 0 : errno = save_errno;
1485 0 : }
1486 :
1487 : /* Logical Replication Apply worker entry point */
1488 : void
1489 0 : ApplyWorkerMain(Datum main_arg)
1490 : {
1491 0 : int worker_slot = DatumGetInt32(main_arg);
1492 : MemoryContext oldctx;
1493 : char originname[NAMEDATALEN];
1494 : XLogRecPtr origin_startpos;
1495 : char *myslotname;
1496 : WalRcvStreamOptions options;
1497 :
1498 : /* Attach to slot */
1499 0 : logicalrep_worker_attach(worker_slot);
1500 :
1501 : /* Setup signal handling */
1502 0 : pqsignal(SIGHUP, logicalrep_worker_sighup);
1503 0 : pqsignal(SIGTERM, die);
1504 0 : BackgroundWorkerUnblockSignals();
1505 :
1506 : /* Initialise stats to a sanish value */
1507 0 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
1508 0 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
1509 :
1510 : /* Load the libpq-specific functions */
1511 0 : load_file("libpqwalreceiver", false);
1512 :
1513 0 : Assert(CurrentResourceOwner == NULL);
1514 0 : CurrentResourceOwner = ResourceOwnerCreate(NULL,
1515 : "logical replication apply");
1516 :
1517 : /* Run as replica session replication role. */
1518 0 : SetConfigOption("session_replication_role", "replica",
1519 : PGC_SUSET, PGC_S_OVERRIDE);
1520 :
1521 : /* Connect to our database. */
1522 0 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
1523 0 : MyLogicalRepWorker->userid);
1524 :
1525 : /* Load the subscription into persistent memory context. */
1526 0 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
1527 : "ApplyContext",
1528 : ALLOCSET_DEFAULT_SIZES);
1529 0 : StartTransactionCommand();
1530 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
1531 0 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
1532 0 : MySubscriptionValid = true;
1533 0 : MemoryContextSwitchTo(oldctx);
1534 :
1535 : /* Setup synchronous commit according to the user's wishes */
1536 0 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
1537 : PGC_BACKEND, PGC_S_OVERRIDE);
1538 :
1539 0 : if (!MySubscription->enabled)
1540 : {
1541 0 : ereport(LOG,
1542 : (errmsg("logical replication apply worker for subscription \"%s\" will not "
1543 : "start because the subscription was disabled during startup",
1544 : MySubscription->name)));
1545 :
1546 0 : proc_exit(0);
1547 : }
1548 :
1549 : /* Keep us informed about subscription changes. */
1550 0 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
1551 : subscription_change_cb,
1552 : (Datum) 0);
1553 :
1554 0 : if (am_tablesync_worker())
1555 0 : ereport(LOG,
1556 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1557 : MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1558 : else
1559 0 : ereport(LOG,
1560 : (errmsg("logical replication apply worker for subscription \"%s\" has started",
1561 : MySubscription->name)));
1562 :
1563 0 : CommitTransactionCommand();
1564 :
1565 : /* Connect to the origin and start the replication. */
1566 0 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1567 : MySubscription->conninfo);
1568 :
1569 0 : if (am_tablesync_worker())
1570 : {
1571 : char *syncslotname;
1572 :
1573 : /* This is table synchroniation worker, call initial sync. */
1574 0 : syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1575 :
1576 : /* The slot name needs to be allocated in permanent memory context. */
1577 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
1578 0 : myslotname = pstrdup(syncslotname);
1579 0 : MemoryContextSwitchTo(oldctx);
1580 :
1581 0 : pfree(syncslotname);
1582 : }
1583 : else
1584 : {
1585 : /* This is main apply worker */
1586 : RepOriginId originid;
1587 : TimeLineID startpointTLI;
1588 : char *err;
1589 : int server_version;
1590 :
1591 0 : myslotname = MySubscription->slotname;
1592 :
1593 : /*
1594 : * This shouldn't happen if the subscription is enabled, but guard
1595 : * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1596 : * crash if slot is NULL.)
1597 : */
1598 0 : if (!myslotname)
1599 0 : ereport(ERROR,
1600 : (errmsg("subscription has no replication slot set")));
1601 :
1602 : /* Setup replication origin tracking. */
1603 0 : StartTransactionCommand();
1604 0 : snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1605 0 : originid = replorigin_by_name(originname, true);
1606 0 : if (!OidIsValid(originid))
1607 0 : originid = replorigin_create(originname);
1608 0 : replorigin_session_setup(originid);
1609 0 : replorigin_session_origin = originid;
1610 0 : origin_startpos = replorigin_session_get_progress(false);
1611 0 : CommitTransactionCommand();
1612 :
1613 0 : wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1614 : &err);
1615 0 : if (wrconn == NULL)
1616 0 : ereport(ERROR,
1617 : (errmsg("could not connect to the publisher: %s", err)));
1618 :
1619 : /*
1620 : * We don't really use the output identify_system for anything but it
1621 : * does some initializations on the upstream so let's still call it.
1622 : */
1623 0 : (void) walrcv_identify_system(wrconn, &startpointTLI,
1624 : &server_version);
1625 :
1626 : }
1627 :
1628 : /*
1629 : * Setup callback for syscache so that we know when something changes in
1630 : * the subscription relation state.
1631 : */
1632 0 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
1633 : invalidate_syncing_table_states,
1634 : (Datum) 0);
1635 :
1636 : /* Build logical replication streaming options. */
1637 0 : options.logical = true;
1638 0 : options.startpoint = origin_startpos;
1639 0 : options.slotname = myslotname;
1640 0 : options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1641 0 : options.proto.logical.publication_names = MySubscription->publications;
1642 :
1643 : /* Start normal logical streaming replication. */
1644 0 : walrcv_startstreaming(wrconn, &options);
1645 :
1646 : /* Run the main loop. */
1647 0 : LogicalRepApplyLoop(origin_startpos);
1648 :
1649 0 : proc_exit(0);
1650 : }
1651 :
1652 : /*
1653 : * Is current process a logical replication worker?
1654 : */
1655 : bool
1656 2 : IsLogicalWorker(void)
1657 : {
1658 2 : return MyLogicalRepWorker != NULL;
1659 : }
|