LCOV - code coverage report
Current view: top level - src/backend/replication/logical - worker.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 2 551 0.4 %
Date: 2017-09-29 13:40:31 Functions: 1 28 3.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * worker.c
       3             :  *     PostgreSQL logical replication worker (apply)
       4             :  *
       5             :  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/worker.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains the worker which applies logical changes as they come
      12             :  *    from remote logical replication stream.
      13             :  *
      14             :  *    The main worker (apply) is started by logical replication worker
      15             :  *    launcher for every enabled subscription in a database. It uses
      16             :  *    walsender protocol to communicate with publisher.
      17             :  *
      18             :  *    This module includes server facing code and shares libpqwalreceiver
      19             :  *    module with walreceiver for providing the libpq specific functionality.
      20             :  *
      21             :  *-------------------------------------------------------------------------
      22             :  */
      23             : 
      24             : #include "postgres.h"
      25             : 
      26             : #include "miscadmin.h"
      27             : #include "pgstat.h"
      28             : #include "funcapi.h"
      29             : 
      30             : #include "access/xact.h"
      31             : #include "access/xlog_internal.h"
      32             : 
      33             : #include "catalog/namespace.h"
      34             : #include "catalog/pg_subscription.h"
      35             : #include "catalog/pg_subscription_rel.h"
      36             : 
      37             : #include "commands/trigger.h"
      38             : 
      39             : #include "executor/executor.h"
      40             : #include "executor/nodeModifyTable.h"
      41             : 
      42             : #include "libpq/pqformat.h"
      43             : #include "libpq/pqsignal.h"
      44             : 
      45             : #include "mb/pg_wchar.h"
      46             : 
      47             : #include "nodes/makefuncs.h"
      48             : 
      49             : #include "optimizer/planner.h"
      50             : 
      51             : #include "parser/parse_relation.h"
      52             : 
      53             : #include "postmaster/bgworker.h"
      54             : #include "postmaster/postmaster.h"
      55             : #include "postmaster/walwriter.h"
      56             : 
      57             : #include "replication/decode.h"
      58             : #include "replication/logical.h"
      59             : #include "replication/logicalproto.h"
      60             : #include "replication/logicalrelation.h"
      61             : #include "replication/logicalworker.h"
      62             : #include "replication/reorderbuffer.h"
      63             : #include "replication/origin.h"
      64             : #include "replication/snapbuild.h"
      65             : #include "replication/walreceiver.h"
      66             : #include "replication/worker_internal.h"
      67             : 
      68             : #include "rewrite/rewriteHandler.h"
      69             : 
      70             : #include "storage/bufmgr.h"
      71             : #include "storage/ipc.h"
      72             : #include "storage/lmgr.h"
      73             : #include "storage/proc.h"
      74             : #include "storage/procarray.h"
      75             : 
      76             : #include "tcop/tcopprot.h"
      77             : 
      78             : #include "utils/builtins.h"
      79             : #include "utils/catcache.h"
      80             : #include "utils/datum.h"
      81             : #include "utils/fmgroids.h"
      82             : #include "utils/guc.h"
      83             : #include "utils/inval.h"
      84             : #include "utils/lsyscache.h"
      85             : #include "utils/memutils.h"
      86             : #include "utils/timeout.h"
      87             : #include "utils/tqual.h"
      88             : #include "utils/syscache.h"
      89             : 
      90             : #define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles (1s) */
      91             : 
      92             : typedef struct FlushPosition
      93             : {
      94             :     dlist_node  node;
      95             :     XLogRecPtr  local_end;
      96             :     XLogRecPtr  remote_end;
      97             : } FlushPosition;
      98             : 
      99             : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
     100             : 
     101             : typedef struct SlotErrCallbackArg
     102             : {
     103             :     LogicalRepRelation *rel;
     104             :     int         attnum;
     105             : } SlotErrCallbackArg;
     106             : 
     107             : static MemoryContext ApplyMessageContext = NULL;
     108             : MemoryContext ApplyContext = NULL;
     109             : 
     110             : WalReceiverConn *wrconn = NULL;
     111             : 
     112             : Subscription *MySubscription = NULL;
     113             : bool        MySubscriptionValid = false;
     114             : 
     115             : bool        in_remote_transaction = false;
     116             : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
     117             : 
     118             : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
     119             : 
     120             : static void store_flush_position(XLogRecPtr remote_lsn);
     121             : 
     122             : static void maybe_reread_subscription(void);
     123             : 
     124             : /* Flags set by signal handlers */
     125             : static volatile sig_atomic_t got_SIGHUP = false;
     126             : 
     127             : /*
     128             :  * Should this worker apply changes for given relation.
     129             :  *
     130             :  * This is mainly needed for initial relation data sync as that runs in
     131             :  * separate worker process running in parallel and we need some way to skip
     132             :  * changes coming to the main apply worker during the sync of a table.
     133             :  *
     134             :  * Note we need to do smaller or equals comparison for SYNCDONE state because
     135             :  * it might hold position of end of initial slot consistent point WAL
     136             :  * record + 1 (ie start of next record) and next record can be COMMIT of
     137             :  * transaction we are now processing (which is what we set remote_final_lsn
     138             :  * to in apply_handle_begin).
     139             :  */
     140             : static bool
     141           0 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
     142             : {
     143           0 :     if (am_tablesync_worker())
     144           0 :         return MyLogicalRepWorker->relid == rel->localreloid;
     145             :     else
     146           0 :         return (rel->state == SUBREL_STATE_READY ||
     147           0 :                 (rel->state == SUBREL_STATE_SYNCDONE &&
     148           0 :                  rel->statelsn <= remote_final_lsn));
     149             : }
     150             : 
     151             : /*
     152             :  * Make sure that we started local transaction.
     153             :  *
     154             :  * Also switches to ApplyMessageContext as necessary.
     155             :  */
     156             : static bool
     157           0 : ensure_transaction(void)
     158             : {
     159           0 :     if (IsTransactionState())
     160             :     {
     161           0 :         SetCurrentStatementStartTimestamp();
     162             : 
     163           0 :         if (CurrentMemoryContext != ApplyMessageContext)
     164           0 :             MemoryContextSwitchTo(ApplyMessageContext);
     165             : 
     166           0 :         return false;
     167             :     }
     168             : 
     169           0 :     SetCurrentStatementStartTimestamp();
     170           0 :     StartTransactionCommand();
     171             : 
     172           0 :     maybe_reread_subscription();
     173             : 
     174           0 :     MemoryContextSwitchTo(ApplyMessageContext);
     175           0 :     return true;
     176             : }
     177             : 
     178             : 
     179             : /*
     180             :  * Executor state preparation for evaluation of constraint expressions,
     181             :  * indexes and triggers.
     182             :  *
     183             :  * This is based on similar code in copy.c
     184             :  */
     185             : static EState *
     186           0 : create_estate_for_relation(LogicalRepRelMapEntry *rel)
     187             : {
     188             :     EState     *estate;
     189             :     ResultRelInfo *resultRelInfo;
     190             :     RangeTblEntry *rte;
     191             : 
     192           0 :     estate = CreateExecutorState();
     193             : 
     194           0 :     rte = makeNode(RangeTblEntry);
     195           0 :     rte->rtekind = RTE_RELATION;
     196           0 :     rte->relid = RelationGetRelid(rel->localrel);
     197           0 :     rte->relkind = rel->localrel->rd_rel->relkind;
     198           0 :     estate->es_range_table = list_make1(rte);
     199             : 
     200           0 :     resultRelInfo = makeNode(ResultRelInfo);
     201           0 :     InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
     202             : 
     203           0 :     estate->es_result_relations = resultRelInfo;
     204           0 :     estate->es_num_result_relations = 1;
     205           0 :     estate->es_result_relation_info = resultRelInfo;
     206             : 
     207             :     /* Triggers might need a slot */
     208           0 :     if (resultRelInfo->ri_TrigDesc)
     209           0 :         estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
     210             : 
     211             :     /* Prepare to catch AFTER triggers. */
     212           0 :     AfterTriggerBeginQuery();
     213             : 
     214           0 :     return estate;
     215             : }
     216             : 
     217             : /*
     218             :  * Executes default values for columns for which we can't map to remote
     219             :  * relation columns.
     220             :  *
     221             :  * This allows us to support tables which have more columns on the downstream
     222             :  * than on the upstream.
     223             :  */
     224             : static void
     225           0 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
     226             :                    TupleTableSlot *slot)
     227             : {
     228           0 :     TupleDesc   desc = RelationGetDescr(rel->localrel);
     229           0 :     int         num_phys_attrs = desc->natts;
     230             :     int         i;
     231             :     int         attnum,
     232           0 :                 num_defaults = 0;
     233             :     int        *defmap;
     234             :     ExprState **defexprs;
     235             :     ExprContext *econtext;
     236             : 
     237           0 :     econtext = GetPerTupleExprContext(estate);
     238             : 
     239             :     /* We got all the data via replication, no need to evaluate anything. */
     240           0 :     if (num_phys_attrs == rel->remoterel.natts)
     241           0 :         return;
     242             : 
     243           0 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     244           0 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
     245             : 
     246           0 :     for (attnum = 0; attnum < num_phys_attrs; attnum++)
     247             :     {
     248             :         Expr       *defexpr;
     249             : 
     250           0 :         if (TupleDescAttr(desc, attnum)->attisdropped)
     251           0 :             continue;
     252             : 
     253           0 :         if (rel->attrmap[attnum] >= 0)
     254           0 :             continue;
     255             : 
     256           0 :         defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
     257             : 
     258           0 :         if (defexpr != NULL)
     259             :         {
     260             :             /* Run the expression through planner */
     261           0 :             defexpr = expression_planner(defexpr);
     262             : 
     263             :             /* Initialize executable expression in copycontext */
     264           0 :             defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
     265           0 :             defmap[num_defaults] = attnum;
     266           0 :             num_defaults++;
     267             :         }
     268             : 
     269             :     }
     270             : 
     271           0 :     for (i = 0; i < num_defaults; i++)
     272           0 :         slot->tts_values[defmap[i]] =
     273           0 :             ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
     274             : }
     275             : 
     276             : /*
     277             :  * Error callback to give more context info about type conversion failure.
     278             :  */
     279             : static void
     280           0 : slot_store_error_callback(void *arg)
     281             : {
     282           0 :     SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
     283             :     Oid         remotetypoid,
     284             :                 localtypoid;
     285             : 
     286           0 :     if (errarg->attnum < 0)
     287           0 :         return;
     288             : 
     289           0 :     remotetypoid = errarg->rel->atttyps[errarg->attnum];
     290           0 :     localtypoid = logicalrep_typmap_getid(remotetypoid);
     291           0 :     errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
     292             :                "remote type %s, local type %s",
     293           0 :                errarg->rel->nspname, errarg->rel->relname,
     294           0 :                errarg->rel->attnames[errarg->attnum],
     295             :                format_type_be(remotetypoid),
     296             :                format_type_be(localtypoid));
     297             : }
     298             : 
     299             : /*
     300             :  * Store data in C string form into slot.
     301             :  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
     302             :  * use better.
     303             :  */
     304             : static void
     305           0 : slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     306             :                     char **values)
     307             : {
     308           0 :     int         natts = slot->tts_tupleDescriptor->natts;
     309             :     int         i;
     310             :     SlotErrCallbackArg errarg;
     311             :     ErrorContextCallback errcallback;
     312             : 
     313           0 :     ExecClearTuple(slot);
     314             : 
     315             :     /* Push callback + info on the error context stack */
     316           0 :     errarg.rel = &rel->remoterel;
     317           0 :     errarg.attnum = -1;
     318           0 :     errcallback.callback = slot_store_error_callback;
     319           0 :     errcallback.arg = (void *) &errarg;
     320           0 :     errcallback.previous = error_context_stack;
     321           0 :     error_context_stack = &errcallback;
     322             : 
     323             :     /* Call the "in" function for each non-dropped attribute */
     324           0 :     for (i = 0; i < natts; i++)
     325             :     {
     326           0 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     327           0 :         int         remoteattnum = rel->attrmap[i];
     328             : 
     329           0 :         if (!att->attisdropped && remoteattnum >= 0 &&
     330           0 :             values[remoteattnum] != NULL)
     331           0 :         {
     332             :             Oid         typinput;
     333             :             Oid         typioparam;
     334             : 
     335           0 :             errarg.attnum = remoteattnum;
     336             : 
     337           0 :             getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     338           0 :             slot->tts_values[i] = OidInputFunctionCall(typinput,
     339           0 :                                                        values[remoteattnum],
     340             :                                                        typioparam,
     341             :                                                        att->atttypmod);
     342           0 :             slot->tts_isnull[i] = false;
     343             :         }
     344             :         else
     345             :         {
     346             :             /*
     347             :              * We assign NULL to dropped attributes, NULL values, and missing
     348             :              * values (missing values should be later filled using
     349             :              * slot_fill_defaults).
     350             :              */
     351           0 :             slot->tts_values[i] = (Datum) 0;
     352           0 :             slot->tts_isnull[i] = true;
     353             :         }
     354             :     }
     355             : 
     356             :     /* Pop the error context stack */
     357           0 :     error_context_stack = errcallback.previous;
     358             : 
     359           0 :     ExecStoreVirtualTuple(slot);
     360           0 : }
     361             : 
     362             : /*
     363             :  * Modify slot with user data provided as C strings.
     364             :  * This is somewhat similar to heap_modify_tuple but also calls the type
     365             :  * input function on the user data as the input is the text representation
     366             :  * of the types.
     367             :  */
     368             : static void
     369           0 : slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     370             :                      char **values, bool *replaces)
     371             : {
     372           0 :     int         natts = slot->tts_tupleDescriptor->natts;
     373             :     int         i;
     374             :     SlotErrCallbackArg errarg;
     375             :     ErrorContextCallback errcallback;
     376             : 
     377           0 :     slot_getallattrs(slot);
     378           0 :     ExecClearTuple(slot);
     379             : 
     380             :     /* Push callback + info on the error context stack */
     381           0 :     errarg.rel = &rel->remoterel;
     382           0 :     errarg.attnum = -1;
     383           0 :     errcallback.callback = slot_store_error_callback;
     384           0 :     errcallback.arg = (void *) &errarg;
     385           0 :     errcallback.previous = error_context_stack;
     386           0 :     error_context_stack = &errcallback;
     387             : 
     388             :     /* Call the "in" function for each replaced attribute */
     389           0 :     for (i = 0; i < natts; i++)
     390             :     {
     391           0 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     392           0 :         int         remoteattnum = rel->attrmap[i];
     393             : 
     394           0 :         if (remoteattnum >= 0 && !replaces[remoteattnum])
     395           0 :             continue;
     396             : 
     397           0 :         if (remoteattnum >= 0 && values[remoteattnum] != NULL)
     398           0 :         {
     399             :             Oid         typinput;
     400             :             Oid         typioparam;
     401             : 
     402           0 :             errarg.attnum = remoteattnum;
     403             : 
     404           0 :             getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     405           0 :             slot->tts_values[i] = OidInputFunctionCall(typinput,
     406           0 :                                                        values[remoteattnum],
     407             :                                                        typioparam,
     408             :                                                        att->atttypmod);
     409           0 :             slot->tts_isnull[i] = false;
     410             :         }
     411             :         else
     412             :         {
     413           0 :             slot->tts_values[i] = (Datum) 0;
     414           0 :             slot->tts_isnull[i] = true;
     415             :         }
     416             :     }
     417             : 
     418             :     /* Pop the error context stack */
     419           0 :     error_context_stack = errcallback.previous;
     420             : 
     421           0 :     ExecStoreVirtualTuple(slot);
     422           0 : }
     423             : 
     424             : /*
     425             :  * Handle BEGIN message.
     426             :  */
     427             : static void
     428           0 : apply_handle_begin(StringInfo s)
     429             : {
     430             :     LogicalRepBeginData begin_data;
     431             : 
     432           0 :     logicalrep_read_begin(s, &begin_data);
     433             : 
     434           0 :     remote_final_lsn = begin_data.final_lsn;
     435             : 
     436           0 :     in_remote_transaction = true;
     437             : 
     438           0 :     pgstat_report_activity(STATE_RUNNING, NULL);
     439           0 : }
     440             : 
     441             : /*
     442             :  * Handle COMMIT message.
     443             :  *
     444             :  * TODO, support tracking of multiple origins
     445             :  */
     446             : static void
     447           0 : apply_handle_commit(StringInfo s)
     448             : {
     449             :     LogicalRepCommitData commit_data;
     450             : 
     451           0 :     logicalrep_read_commit(s, &commit_data);
     452             : 
     453           0 :     Assert(commit_data.commit_lsn == remote_final_lsn);
     454             : 
     455             :     /* The synchronization worker runs in single transaction. */
     456           0 :     if (IsTransactionState() && !am_tablesync_worker())
     457             :     {
     458             :         /*
     459             :          * Update origin state so we can restart streaming from correct
     460             :          * position in case of crash.
     461             :          */
     462           0 :         replorigin_session_origin_lsn = commit_data.end_lsn;
     463           0 :         replorigin_session_origin_timestamp = commit_data.committime;
     464             : 
     465           0 :         CommitTransactionCommand();
     466           0 :         pgstat_report_stat(false);
     467             : 
     468           0 :         store_flush_position(commit_data.end_lsn);
     469             :     }
     470             :     else
     471             :     {
     472             :         /* Process any invalidation messages that might have accumulated. */
     473           0 :         AcceptInvalidationMessages();
     474           0 :         maybe_reread_subscription();
     475             :     }
     476             : 
     477           0 :     in_remote_transaction = false;
     478             : 
     479             :     /* Process any tables that are being synchronized in parallel. */
     480           0 :     process_syncing_tables(commit_data.end_lsn);
     481             : 
     482           0 :     pgstat_report_activity(STATE_IDLE, NULL);
     483           0 : }
     484             : 
     485             : /*
     486             :  * Handle ORIGIN message.
     487             :  *
     488             :  * TODO, support tracking of multiple origins
     489             :  */
     490             : static void
     491           0 : apply_handle_origin(StringInfo s)
     492             : {
     493             :     /*
     494             :      * ORIGIN message can only come inside remote transaction and before any
     495             :      * actual writes.
     496             :      */
     497           0 :     if (!in_remote_transaction ||
     498           0 :         (IsTransactionState() && !am_tablesync_worker()))
     499           0 :         ereport(ERROR,
     500             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     501             :                  errmsg("ORIGIN message sent out of order")));
     502           0 : }
     503             : 
     504             : /*
     505             :  * Handle RELATION message.
     506             :  *
     507             :  * Note we don't do validation against local schema here. The validation
     508             :  * against local schema is postponed until first change for given relation
     509             :  * comes as we only care about it when applying changes for it anyway and we
     510             :  * do less locking this way.
     511             :  */
     512             : static void
     513           0 : apply_handle_relation(StringInfo s)
     514             : {
     515             :     LogicalRepRelation *rel;
     516             : 
     517           0 :     rel = logicalrep_read_rel(s);
     518           0 :     logicalrep_relmap_update(rel);
     519           0 : }
     520             : 
     521             : /*
     522             :  * Handle TYPE message.
     523             :  *
     524             :  * Note we don't do local mapping here, that's done when the type is
     525             :  * actually used.
     526             :  */
     527             : static void
     528           0 : apply_handle_type(StringInfo s)
     529             : {
     530             :     LogicalRepTyp typ;
     531             : 
     532           0 :     logicalrep_read_typ(s, &typ);
     533           0 :     logicalrep_typmap_update(&typ);
     534           0 : }
     535             : 
     536             : /*
     537             :  * Get replica identity index or if it is not defined a primary key.
     538             :  *
     539             :  * If neither is defined, returns InvalidOid
     540             :  */
     541             : static Oid
     542           0 : GetRelationIdentityOrPK(Relation rel)
     543             : {
     544             :     Oid         idxoid;
     545             : 
     546           0 :     idxoid = RelationGetReplicaIndex(rel);
     547             : 
     548           0 :     if (!OidIsValid(idxoid))
     549           0 :         idxoid = RelationGetPrimaryKeyIndex(rel);
     550             : 
     551           0 :     return idxoid;
     552             : }
     553             : 
     554             : /*
     555             :  * Handle INSERT message.
     556             :  */
     557             : static void
     558           0 : apply_handle_insert(StringInfo s)
     559             : {
     560             :     LogicalRepRelMapEntry *rel;
     561             :     LogicalRepTupleData newtup;
     562             :     LogicalRepRelId relid;
     563             :     EState     *estate;
     564             :     TupleTableSlot *remoteslot;
     565             :     MemoryContext oldctx;
     566             : 
     567           0 :     ensure_transaction();
     568             : 
     569           0 :     relid = logicalrep_read_insert(s, &newtup);
     570           0 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     571           0 :     if (!should_apply_changes_for_rel(rel))
     572             :     {
     573             :         /*
     574             :          * The relation can't become interesting in the middle of the
     575             :          * transaction so it's safe to unlock it.
     576             :          */
     577           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
     578           0 :         return;
     579             :     }
     580             : 
     581             :     /* Initialize the executor state. */
     582           0 :     estate = create_estate_for_relation(rel);
     583           0 :     remoteslot = ExecInitExtraTupleSlot(estate);
     584           0 :     ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
     585             : 
     586             :     /* Process and store remote tuple in the slot */
     587           0 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     588           0 :     slot_store_cstrings(remoteslot, rel, newtup.values);
     589           0 :     slot_fill_defaults(rel, estate, remoteslot);
     590           0 :     MemoryContextSwitchTo(oldctx);
     591             : 
     592           0 :     PushActiveSnapshot(GetTransactionSnapshot());
     593           0 :     ExecOpenIndices(estate->es_result_relation_info, false);
     594             : 
     595             :     /* Do the insert. */
     596           0 :     ExecSimpleRelationInsert(estate, remoteslot);
     597             : 
     598             :     /* Cleanup. */
     599           0 :     ExecCloseIndices(estate->es_result_relation_info);
     600           0 :     PopActiveSnapshot();
     601             : 
     602             :     /* Handle queued AFTER triggers. */
     603           0 :     AfterTriggerEndQuery(estate);
     604             : 
     605           0 :     ExecResetTupleTable(estate->es_tupleTable, false);
     606           0 :     FreeExecutorState(estate);
     607             : 
     608           0 :     logicalrep_rel_close(rel, NoLock);
     609             : 
     610           0 :     CommandCounterIncrement();
     611             : }
     612             : 
     613             : /*
     614             :  * Check if the logical replication relation is updatable and throw
     615             :  * appropriate error if it isn't.
     616             :  */
     617             : static void
     618           0 : check_relation_updatable(LogicalRepRelMapEntry *rel)
     619             : {
     620             :     /* Updatable, no error. */
     621           0 :     if (rel->updatable)
     622           0 :         return;
     623             : 
     624             :     /*
     625             :      * We are in error mode so it's fine this is somewhat slow. It's better to
     626             :      * give user correct error.
     627             :      */
     628           0 :     if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
     629             :     {
     630           0 :         ereport(ERROR,
     631             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     632             :                  errmsg("publisher does not send replica identity column "
     633             :                         "expected by the logical replication target relation \"%s.%s\"",
     634             :                         rel->remoterel.nspname, rel->remoterel.relname)));
     635             :     }
     636             : 
     637           0 :     ereport(ERROR,
     638             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     639             :              errmsg("logical replication target relation \"%s.%s\" has "
     640             :                     "neither REPLICA IDENTITY index nor PRIMARY "
     641             :                     "KEY and published relation does not have "
     642             :                     "REPLICA IDENTITY FULL",
     643             :                     rel->remoterel.nspname, rel->remoterel.relname)));
     644             : }
     645             : 
     646             : /*
     647             :  * Handle UPDATE message.
     648             :  *
     649             :  * TODO: FDW support
     650             :  */
     651             : static void
     652           0 : apply_handle_update(StringInfo s)
     653             : {
     654             :     LogicalRepRelMapEntry *rel;
     655             :     LogicalRepRelId relid;
     656             :     Oid         idxoid;
     657             :     EState     *estate;
     658             :     EPQState    epqstate;
     659             :     LogicalRepTupleData oldtup;
     660             :     LogicalRepTupleData newtup;
     661             :     bool        has_oldtup;
     662             :     TupleTableSlot *localslot;
     663             :     TupleTableSlot *remoteslot;
     664             :     bool        found;
     665             :     MemoryContext oldctx;
     666             : 
     667           0 :     ensure_transaction();
     668             : 
     669           0 :     relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
     670             :                                    &newtup);
     671           0 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     672           0 :     if (!should_apply_changes_for_rel(rel))
     673             :     {
     674             :         /*
     675             :          * The relation can't become interesting in the middle of the
     676             :          * transaction so it's safe to unlock it.
     677             :          */
     678           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
     679           0 :         return;
     680             :     }
     681             : 
     682             :     /* Check if we can do the update. */
     683           0 :     check_relation_updatable(rel);
     684             : 
     685             :     /* Initialize the executor state. */
     686           0 :     estate = create_estate_for_relation(rel);
     687           0 :     remoteslot = ExecInitExtraTupleSlot(estate);
     688           0 :     ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
     689           0 :     localslot = ExecInitExtraTupleSlot(estate);
     690           0 :     ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
     691           0 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
     692             : 
     693           0 :     PushActiveSnapshot(GetTransactionSnapshot());
     694           0 :     ExecOpenIndices(estate->es_result_relation_info, false);
     695             : 
     696             :     /* Build the search tuple. */
     697           0 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     698           0 :     slot_store_cstrings(remoteslot, rel,
     699           0 :                         has_oldtup ? oldtup.values : newtup.values);
     700           0 :     MemoryContextSwitchTo(oldctx);
     701             : 
     702             :     /*
     703             :      * Try to find tuple using either replica identity index, primary key or
     704             :      * if needed, sequential scan.
     705             :      */
     706           0 :     idxoid = GetRelationIdentityOrPK(rel->localrel);
     707           0 :     Assert(OidIsValid(idxoid) ||
     708             :            (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
     709             : 
     710           0 :     if (OidIsValid(idxoid))
     711           0 :         found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
     712             :                                              LockTupleExclusive,
     713             :                                              remoteslot, localslot);
     714             :     else
     715           0 :         found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
     716             :                                          remoteslot, localslot);
     717             : 
     718           0 :     ExecClearTuple(remoteslot);
     719             : 
     720             :     /*
     721             :      * Tuple found.
     722             :      *
     723             :      * Note this will fail if there are other conflicting unique indexes.
     724             :      */
     725           0 :     if (found)
     726             :     {
     727             :         /* Process and store remote tuple in the slot */
     728           0 :         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     729           0 :         ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
     730           0 :         slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
     731           0 :         MemoryContextSwitchTo(oldctx);
     732             : 
     733           0 :         EvalPlanQualSetSlot(&epqstate, remoteslot);
     734             : 
     735             :         /* Do the actual update. */
     736           0 :         ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
     737             :     }
     738             :     else
     739             :     {
     740             :         /*
     741             :          * The tuple to be updated could not be found.
     742             :          *
     743             :          * TODO what to do here, change the log level to LOG perhaps?
     744             :          */
     745           0 :         elog(DEBUG1,
     746             :              "logical replication did not find row for update "
     747             :              "in replication target relation \"%s\"",
     748             :              RelationGetRelationName(rel->localrel));
     749             :     }
     750             : 
     751             :     /* Cleanup. */
     752           0 :     ExecCloseIndices(estate->es_result_relation_info);
     753           0 :     PopActiveSnapshot();
     754             : 
     755             :     /* Handle queued AFTER triggers. */
     756           0 :     AfterTriggerEndQuery(estate);
     757             : 
     758           0 :     EvalPlanQualEnd(&epqstate);
     759           0 :     ExecResetTupleTable(estate->es_tupleTable, false);
     760           0 :     FreeExecutorState(estate);
     761             : 
     762           0 :     logicalrep_rel_close(rel, NoLock);
     763             : 
     764           0 :     CommandCounterIncrement();
     765             : }
     766             : 
     767             : /*
     768             :  * Handle DELETE message.
     769             :  *
     770             :  * TODO: FDW support
     771             :  */
     772             : static void
     773           0 : apply_handle_delete(StringInfo s)
     774             : {
     775             :     LogicalRepRelMapEntry *rel;
     776             :     LogicalRepTupleData oldtup;
     777             :     LogicalRepRelId relid;
     778             :     Oid         idxoid;
     779             :     EState     *estate;
     780             :     EPQState    epqstate;
     781             :     TupleTableSlot *remoteslot;
     782             :     TupleTableSlot *localslot;
     783             :     bool        found;
     784             :     MemoryContext oldctx;
     785             : 
     786           0 :     ensure_transaction();
     787             : 
     788           0 :     relid = logicalrep_read_delete(s, &oldtup);
     789           0 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     790           0 :     if (!should_apply_changes_for_rel(rel))
     791             :     {
     792             :         /*
     793             :          * The relation can't become interesting in the middle of the
     794             :          * transaction so it's safe to unlock it.
     795             :          */
     796           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
     797           0 :         return;
     798             :     }
     799             : 
     800             :     /* Check if we can do the delete. */
     801           0 :     check_relation_updatable(rel);
     802             : 
     803             :     /* Initialize the executor state. */
     804           0 :     estate = create_estate_for_relation(rel);
     805           0 :     remoteslot = ExecInitExtraTupleSlot(estate);
     806           0 :     ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
     807           0 :     localslot = ExecInitExtraTupleSlot(estate);
     808           0 :     ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
     809           0 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
     810             : 
     811           0 :     PushActiveSnapshot(GetTransactionSnapshot());
     812           0 :     ExecOpenIndices(estate->es_result_relation_info, false);
     813             : 
     814             :     /* Find the tuple using the replica identity index. */
     815           0 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     816           0 :     slot_store_cstrings(remoteslot, rel, oldtup.values);
     817           0 :     MemoryContextSwitchTo(oldctx);
     818             : 
     819             :     /*
     820             :      * Try to find tuple using either replica identity index, primary key or
     821             :      * if needed, sequential scan.
     822             :      */
     823           0 :     idxoid = GetRelationIdentityOrPK(rel->localrel);
     824           0 :     Assert(OidIsValid(idxoid) ||
     825             :            (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
     826             : 
     827           0 :     if (OidIsValid(idxoid))
     828           0 :         found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
     829             :                                              LockTupleExclusive,
     830             :                                              remoteslot, localslot);
     831             :     else
     832           0 :         found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
     833             :                                          remoteslot, localslot);
     834             :     /* If found delete it. */
     835           0 :     if (found)
     836             :     {
     837           0 :         EvalPlanQualSetSlot(&epqstate, localslot);
     838             : 
     839             :         /* Do the actual delete. */
     840           0 :         ExecSimpleRelationDelete(estate, &epqstate, localslot);
     841             :     }
     842             :     else
     843             :     {
     844             :         /* The tuple to be deleted could not be found. */
     845           0 :         ereport(DEBUG1,
     846             :                 (errmsg("logical replication could not find row for delete "
     847             :                         "in replication target %s",
     848             :                         RelationGetRelationName(rel->localrel))));
     849             :     }
     850             : 
     851             :     /* Cleanup. */
     852           0 :     ExecCloseIndices(estate->es_result_relation_info);
     853           0 :     PopActiveSnapshot();
     854             : 
     855             :     /* Handle queued AFTER triggers. */
     856           0 :     AfterTriggerEndQuery(estate);
     857             : 
     858           0 :     EvalPlanQualEnd(&epqstate);
     859           0 :     ExecResetTupleTable(estate->es_tupleTable, false);
     860           0 :     FreeExecutorState(estate);
     861             : 
     862           0 :     logicalrep_rel_close(rel, NoLock);
     863             : 
     864           0 :     CommandCounterIncrement();
     865             : }
     866             : 
     867             : 
     868             : /*
     869             :  * Logical replication protocol message dispatcher.
     870             :  */
     871             : static void
     872           0 : apply_dispatch(StringInfo s)
     873             : {
     874           0 :     char        action = pq_getmsgbyte(s);
     875             : 
     876           0 :     switch (action)
     877             :     {
     878             :             /* BEGIN */
     879             :         case 'B':
     880           0 :             apply_handle_begin(s);
     881           0 :             break;
     882             :             /* COMMIT */
     883             :         case 'C':
     884           0 :             apply_handle_commit(s);
     885           0 :             break;
     886             :             /* INSERT */
     887             :         case 'I':
     888           0 :             apply_handle_insert(s);
     889           0 :             break;
     890             :             /* UPDATE */
     891             :         case 'U':
     892           0 :             apply_handle_update(s);
     893           0 :             break;
     894             :             /* DELETE */
     895             :         case 'D':
     896           0 :             apply_handle_delete(s);
     897           0 :             break;
     898             :             /* RELATION */
     899             :         case 'R':
     900           0 :             apply_handle_relation(s);
     901           0 :             break;
     902             :             /* TYPE */
     903             :         case 'Y':
     904           0 :             apply_handle_type(s);
     905           0 :             break;
     906             :             /* ORIGIN */
     907             :         case 'O':
     908           0 :             apply_handle_origin(s);
     909           0 :             break;
     910             :         default:
     911           0 :             ereport(ERROR,
     912             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
     913             :                      errmsg("invalid logical replication message type %c", action)));
     914             :     }
     915           0 : }
     916             : 
     917             : /*
     918             :  * Figure out which write/flush positions to report to the walsender process.
     919             :  *
     920             :  * We can't simply report back the last LSN the walsender sent us because the
     921             :  * local transaction might not yet be flushed to disk locally. Instead we
     922             :  * build a list that associates local with remote LSNs for every commit. When
     923             :  * reporting back the flush position to the sender we iterate that list and
     924             :  * check which entries on it are already locally flushed. Those we can report
     925             :  * as having been flushed.
     926             :  *
     927             :  * The have_pending_txes is true if there are outstanding transactions that
     928             :  * need to be flushed.
     929             :  */
     930             : static void
     931           0 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
     932             :                    bool *have_pending_txes)
     933             : {
     934             :     dlist_mutable_iter iter;
     935           0 :     XLogRecPtr  local_flush = GetFlushRecPtr();
     936             : 
     937           0 :     *write = InvalidXLogRecPtr;
     938           0 :     *flush = InvalidXLogRecPtr;
     939             : 
     940           0 :     dlist_foreach_modify(iter, &lsn_mapping)
     941             :     {
     942           0 :         FlushPosition *pos =
     943           0 :         dlist_container(FlushPosition, node, iter.cur);
     944             : 
     945           0 :         *write = pos->remote_end;
     946             : 
     947           0 :         if (pos->local_end <= local_flush)
     948             :         {
     949           0 :             *flush = pos->remote_end;
     950           0 :             dlist_delete(iter.cur);
     951           0 :             pfree(pos);
     952             :         }
     953             :         else
     954             :         {
     955             :             /*
     956             :              * Don't want to uselessly iterate over the rest of the list which
     957             :              * could potentially be long. Instead get the last element and
     958             :              * grab the write position from there.
     959             :              */
     960           0 :             pos = dlist_tail_element(FlushPosition, node,
     961             :                                      &lsn_mapping);
     962           0 :             *write = pos->remote_end;
     963           0 :             *have_pending_txes = true;
     964           0 :             return;
     965             :         }
     966             :     }
     967             : 
     968           0 :     *have_pending_txes = !dlist_is_empty(&lsn_mapping);
     969             : }
     970             : 
     971             : /*
     972             :  * Store current remote/local lsn pair in the tracking list.
     973             :  */
     974             : static void
     975           0 : store_flush_position(XLogRecPtr remote_lsn)
     976             : {
     977             :     FlushPosition *flushpos;
     978             : 
     979             :     /* Need to do this in permanent context */
     980           0 :     MemoryContextSwitchTo(ApplyContext);
     981             : 
     982             :     /* Track commit lsn  */
     983           0 :     flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
     984           0 :     flushpos->local_end = XactLastCommitEnd;
     985           0 :     flushpos->remote_end = remote_lsn;
     986             : 
     987           0 :     dlist_push_tail(&lsn_mapping, &flushpos->node);
     988           0 :     MemoryContextSwitchTo(ApplyMessageContext);
     989           0 : }
     990             : 
     991             : 
     992             : /* Update statistics of the worker. */
     993             : static void
     994           0 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
     995             : {
     996           0 :     MyLogicalRepWorker->last_lsn = last_lsn;
     997           0 :     MyLogicalRepWorker->last_send_time = send_time;
     998           0 :     MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
     999           0 :     if (reply)
    1000             :     {
    1001           0 :         MyLogicalRepWorker->reply_lsn = last_lsn;
    1002           0 :         MyLogicalRepWorker->reply_time = send_time;
    1003             :     }
    1004           0 : }
    1005             : 
    1006             : /*
    1007             :  * Apply main loop.
    1008             :  */
    1009             : static void
    1010           0 : LogicalRepApplyLoop(XLogRecPtr last_received)
    1011             : {
    1012             :     /*
    1013             :      * Init the ApplyMessageContext which we clean up after each replication
    1014             :      * protocol message.
    1015             :      */
    1016           0 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
    1017             :                                                 "ApplyMessageContext",
    1018             :                                                 ALLOCSET_DEFAULT_SIZES);
    1019             : 
    1020             :     /* mark as idle, before starting to loop */
    1021           0 :     pgstat_report_activity(STATE_IDLE, NULL);
    1022             : 
    1023             :     for (;;)
    1024             :     {
    1025           0 :         pgsocket    fd = PGINVALID_SOCKET;
    1026             :         int         rc;
    1027             :         int         len;
    1028           0 :         char       *buf = NULL;
    1029           0 :         bool        endofstream = false;
    1030           0 :         TimestampTz last_recv_timestamp = GetCurrentTimestamp();
    1031           0 :         bool        ping_sent = false;
    1032             :         long        wait_time;
    1033             : 
    1034           0 :         CHECK_FOR_INTERRUPTS();
    1035             : 
    1036           0 :         MemoryContextSwitchTo(ApplyMessageContext);
    1037             : 
    1038           0 :         len = walrcv_receive(wrconn, &buf, &fd);
    1039             : 
    1040           0 :         if (len != 0)
    1041             :         {
    1042             :             /* Process the data */
    1043             :             for (;;)
    1044             :             {
    1045           0 :                 CHECK_FOR_INTERRUPTS();
    1046             : 
    1047           0 :                 if (len == 0)
    1048             :                 {
    1049           0 :                     break;
    1050             :                 }
    1051           0 :                 else if (len < 0)
    1052             :                 {
    1053           0 :                     ereport(LOG,
    1054             :                             (errmsg("data stream from publisher has ended")));
    1055           0 :                     endofstream = true;
    1056           0 :                     break;
    1057             :                 }
    1058             :                 else
    1059             :                 {
    1060             :                     int         c;
    1061             :                     StringInfoData s;
    1062             : 
    1063             :                     /* Reset timeout. */
    1064           0 :                     last_recv_timestamp = GetCurrentTimestamp();
    1065           0 :                     ping_sent = false;
    1066             : 
    1067             :                     /* Ensure we are reading the data into our memory context. */
    1068           0 :                     MemoryContextSwitchTo(ApplyMessageContext);
    1069             : 
    1070           0 :                     s.data = buf;
    1071           0 :                     s.len = len;
    1072           0 :                     s.cursor = 0;
    1073           0 :                     s.maxlen = -1;
    1074             : 
    1075           0 :                     c = pq_getmsgbyte(&s);
    1076             : 
    1077           0 :                     if (c == 'w')
    1078             :                     {
    1079             :                         XLogRecPtr  start_lsn;
    1080             :                         XLogRecPtr  end_lsn;
    1081             :                         TimestampTz send_time;
    1082             : 
    1083           0 :                         start_lsn = pq_getmsgint64(&s);
    1084           0 :                         end_lsn = pq_getmsgint64(&s);
    1085           0 :                         send_time = pq_getmsgint64(&s);
    1086             : 
    1087           0 :                         if (last_received < start_lsn)
    1088           0 :                             last_received = start_lsn;
    1089             : 
    1090           0 :                         if (last_received < end_lsn)
    1091           0 :                             last_received = end_lsn;
    1092             : 
    1093           0 :                         UpdateWorkerStats(last_received, send_time, false);
    1094             : 
    1095           0 :                         apply_dispatch(&s);
    1096             :                     }
    1097           0 :                     else if (c == 'k')
    1098             :                     {
    1099             :                         XLogRecPtr  end_lsn;
    1100             :                         TimestampTz timestamp;
    1101             :                         bool        reply_requested;
    1102             : 
    1103           0 :                         end_lsn = pq_getmsgint64(&s);
    1104           0 :                         timestamp = pq_getmsgint64(&s);
    1105           0 :                         reply_requested = pq_getmsgbyte(&s);
    1106             : 
    1107           0 :                         if (last_received < end_lsn)
    1108           0 :                             last_received = end_lsn;
    1109             : 
    1110           0 :                         send_feedback(last_received, reply_requested, false);
    1111           0 :                         UpdateWorkerStats(last_received, timestamp, true);
    1112             :                     }
    1113             :                     /* other message types are purposefully ignored */
    1114             : 
    1115           0 :                     MemoryContextReset(ApplyMessageContext);
    1116             :                 }
    1117             : 
    1118           0 :                 len = walrcv_receive(wrconn, &buf, &fd);
    1119           0 :             }
    1120             :         }
    1121             : 
    1122             :         /* confirm all writes so far */
    1123           0 :         send_feedback(last_received, false, false);
    1124             : 
    1125           0 :         if (!in_remote_transaction)
    1126             :         {
    1127             :             /*
    1128             :              * If we didn't get any transactions for a while there might be
    1129             :              * unconsumed invalidation messages in the queue, consume them
    1130             :              * now.
    1131             :              */
    1132           0 :             AcceptInvalidationMessages();
    1133           0 :             maybe_reread_subscription();
    1134             : 
    1135             :             /* Process any table synchronization changes. */
    1136           0 :             process_syncing_tables(last_received);
    1137             :         }
    1138             : 
    1139             :         /* Cleanup the memory. */
    1140           0 :         MemoryContextResetAndDeleteChildren(ApplyMessageContext);
    1141           0 :         MemoryContextSwitchTo(TopMemoryContext);
    1142             : 
    1143             :         /* Check if we need to exit the streaming loop. */
    1144           0 :         if (endofstream)
    1145             :         {
    1146             :             TimeLineID  tli;
    1147             : 
    1148           0 :             walrcv_endstreaming(wrconn, &tli);
    1149           0 :             break;
    1150             :         }
    1151             : 
    1152             :         /*
    1153             :          * Wait for more data or latch.  If we have unflushed transactions,
    1154             :          * wake up after WalWriterDelay to see if they've been flushed yet (in
    1155             :          * which case we should send a feedback message).  Otherwise, there's
    1156             :          * no particular urgency about waking up unless we get data or a
    1157             :          * signal.
    1158             :          */
    1159           0 :         if (!dlist_is_empty(&lsn_mapping))
    1160           0 :             wait_time = WalWriterDelay;
    1161             :         else
    1162           0 :             wait_time = NAPTIME_PER_CYCLE;
    1163             : 
    1164           0 :         rc = WaitLatchOrSocket(MyLatch,
    1165             :                                WL_SOCKET_READABLE | WL_LATCH_SET |
    1166             :                                WL_TIMEOUT | WL_POSTMASTER_DEATH,
    1167             :                                fd, wait_time,
    1168             :                                WAIT_EVENT_LOGICAL_APPLY_MAIN);
    1169             : 
    1170             :         /* Emergency bailout if postmaster has died */
    1171           0 :         if (rc & WL_POSTMASTER_DEATH)
    1172           0 :             proc_exit(1);
    1173             : 
    1174           0 :         if (rc & WL_LATCH_SET)
    1175             :         {
    1176           0 :             ResetLatch(MyLatch);
    1177           0 :             CHECK_FOR_INTERRUPTS();
    1178             :         }
    1179             : 
    1180           0 :         if (got_SIGHUP)
    1181             :         {
    1182           0 :             got_SIGHUP = false;
    1183           0 :             ProcessConfigFile(PGC_SIGHUP);
    1184             :         }
    1185             : 
    1186           0 :         if (rc & WL_TIMEOUT)
    1187             :         {
    1188             :             /*
    1189             :              * We didn't receive anything new. If we haven't heard anything
    1190             :              * from the server for more than wal_receiver_timeout / 2, ping
    1191             :              * the server. Also, if it's been longer than
    1192             :              * wal_receiver_status_interval since the last update we sent,
    1193             :              * send a status update to the master anyway, to report any
    1194             :              * progress in applying WAL.
    1195             :              */
    1196           0 :             bool        requestReply = false;
    1197             : 
    1198             :             /*
    1199             :              * Check if time since last receive from standby has reached the
    1200             :              * configured limit.
    1201             :              */
    1202           0 :             if (wal_receiver_timeout > 0)
    1203             :             {
    1204           0 :                 TimestampTz now = GetCurrentTimestamp();
    1205             :                 TimestampTz timeout;
    1206             : 
    1207           0 :                 timeout =
    1208           0 :                     TimestampTzPlusMilliseconds(last_recv_timestamp,
    1209             :                                                 wal_receiver_timeout);
    1210             : 
    1211           0 :                 if (now >= timeout)
    1212           0 :                     ereport(ERROR,
    1213             :                             (errmsg("terminating logical replication worker due to timeout")));
    1214             : 
    1215             :                 /*
    1216             :                  * We didn't receive anything new, for half of receiver
    1217             :                  * replication timeout. Ping the server.
    1218             :                  */
    1219           0 :                 if (!ping_sent)
    1220             :                 {
    1221           0 :                     timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
    1222             :                                                           (wal_receiver_timeout / 2));
    1223           0 :                     if (now >= timeout)
    1224             :                     {
    1225           0 :                         requestReply = true;
    1226           0 :                         ping_sent = true;
    1227             :                     }
    1228             :                 }
    1229             :             }
    1230             : 
    1231           0 :             send_feedback(last_received, requestReply, requestReply);
    1232             :         }
    1233           0 :     }
    1234           0 : }
    1235             : 
    1236             : /*
    1237             :  * Send a Standby Status Update message to server.
    1238             :  *
    1239             :  * 'recvpos' is the latest LSN we've received data to, force is set if we need
    1240             :  * to send a response to avoid timeouts.
    1241             :  */
    1242             : static void
    1243           0 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
    1244             : {
    1245             :     static StringInfo reply_message = NULL;
    1246             :     static TimestampTz send_time = 0;
    1247             : 
    1248             :     static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    1249             :     static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    1250             :     static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
    1251             : 
    1252             :     XLogRecPtr  writepos;
    1253             :     XLogRecPtr  flushpos;
    1254             :     TimestampTz now;
    1255             :     bool        have_pending_txes;
    1256             : 
    1257             :     /*
    1258             :      * If the user doesn't want status to be reported to the publisher, be
    1259             :      * sure to exit before doing anything at all.
    1260             :      */
    1261           0 :     if (!force && wal_receiver_status_interval <= 0)
    1262           0 :         return;
    1263             : 
    1264             :     /* It's legal to not pass a recvpos */
    1265           0 :     if (recvpos < last_recvpos)
    1266           0 :         recvpos = last_recvpos;
    1267             : 
    1268           0 :     get_flush_position(&writepos, &flushpos, &have_pending_txes);
    1269             : 
    1270             :     /*
    1271             :      * No outstanding transactions to flush, we can report the latest received
    1272             :      * position. This is important for synchronous replication.
    1273             :      */
    1274           0 :     if (!have_pending_txes)
    1275           0 :         flushpos = writepos = recvpos;
    1276             : 
    1277           0 :     if (writepos < last_writepos)
    1278           0 :         writepos = last_writepos;
    1279             : 
    1280           0 :     if (flushpos < last_flushpos)
    1281           0 :         flushpos = last_flushpos;
    1282             : 
    1283           0 :     now = GetCurrentTimestamp();
    1284             : 
    1285             :     /* if we've already reported everything we're good */
    1286           0 :     if (!force &&
    1287           0 :         writepos == last_writepos &&
    1288           0 :         flushpos == last_flushpos &&
    1289           0 :         !TimestampDifferenceExceeds(send_time, now,
    1290             :                                     wal_receiver_status_interval * 1000))
    1291           0 :         return;
    1292           0 :     send_time = now;
    1293             : 
    1294           0 :     if (!reply_message)
    1295             :     {
    1296           0 :         MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
    1297             : 
    1298           0 :         reply_message = makeStringInfo();
    1299           0 :         MemoryContextSwitchTo(oldctx);
    1300             :     }
    1301             :     else
    1302           0 :         resetStringInfo(reply_message);
    1303             : 
    1304           0 :     pq_sendbyte(reply_message, 'r');
    1305           0 :     pq_sendint64(reply_message, recvpos);   /* write */
    1306           0 :     pq_sendint64(reply_message, flushpos);  /* flush */
    1307           0 :     pq_sendint64(reply_message, writepos);  /* apply */
    1308           0 :     pq_sendint64(reply_message, now);   /* sendTime */
    1309           0 :     pq_sendbyte(reply_message, requestReply);   /* replyRequested */
    1310             : 
    1311           0 :     elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
    1312             :          force,
    1313             :          (uint32) (recvpos >> 32), (uint32) recvpos,
    1314             :          (uint32) (writepos >> 32), (uint32) writepos,
    1315             :          (uint32) (flushpos >> 32), (uint32) flushpos
    1316             :         );
    1317             : 
    1318           0 :     walrcv_send(wrconn, reply_message->data, reply_message->len);
    1319             : 
    1320           0 :     if (recvpos > last_recvpos)
    1321           0 :         last_recvpos = recvpos;
    1322           0 :     if (writepos > last_writepos)
    1323           0 :         last_writepos = writepos;
    1324           0 :     if (flushpos > last_flushpos)
    1325           0 :         last_flushpos = flushpos;
    1326             : }
    1327             : 
    1328             : /*
    1329             :  * Reread subscription info if needed. Most changes will be exit.
    1330             :  */
    1331             : static void
    1332           0 : maybe_reread_subscription(void)
    1333             : {
    1334             :     MemoryContext oldctx;
    1335             :     Subscription *newsub;
    1336           0 :     bool        started_tx = false;
    1337             : 
    1338             :     /* When cache state is valid there is nothing to do here. */
    1339           0 :     if (MySubscriptionValid)
    1340           0 :         return;
    1341             : 
    1342             :     /* This function might be called inside or outside of transaction. */
    1343           0 :     if (!IsTransactionState())
    1344             :     {
    1345           0 :         StartTransactionCommand();
    1346           0 :         started_tx = true;
    1347             :     }
    1348             : 
    1349             :     /* Ensure allocations in permanent context. */
    1350           0 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    1351             : 
    1352           0 :     newsub = GetSubscription(MyLogicalRepWorker->subid, true);
    1353             : 
    1354             :     /*
    1355             :      * Exit if the subscription was removed. This normally should not happen
    1356             :      * as the worker gets killed during DROP SUBSCRIPTION.
    1357             :      */
    1358           0 :     if (!newsub)
    1359             :     {
    1360           0 :         ereport(LOG,
    1361             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1362             :                         "stop because the subscription was removed",
    1363             :                         MySubscription->name)));
    1364             : 
    1365           0 :         proc_exit(0);
    1366             :     }
    1367             : 
    1368             :     /*
    1369             :      * Exit if the subscription was disabled. This normally should not happen
    1370             :      * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
    1371             :      */
    1372           0 :     if (!newsub->enabled)
    1373             :     {
    1374           0 :         ereport(LOG,
    1375             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1376             :                         "stop because the subscription was disabled",
    1377             :                         MySubscription->name)));
    1378             : 
    1379           0 :         proc_exit(0);
    1380             :     }
    1381             : 
    1382             :     /*
    1383             :      * Exit if connection string was changed. The launcher will start new
    1384             :      * worker.
    1385             :      */
    1386           0 :     if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
    1387             :     {
    1388           0 :         ereport(LOG,
    1389             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1390             :                         "restart because the connection information was changed",
    1391             :                         MySubscription->name)));
    1392             : 
    1393           0 :         proc_exit(0);
    1394             :     }
    1395             : 
    1396             :     /*
    1397             :      * Exit if subscription name was changed (it's used for
    1398             :      * fallback_application_name). The launcher will start new worker.
    1399             :      */
    1400           0 :     if (strcmp(newsub->name, MySubscription->name) != 0)
    1401             :     {
    1402           0 :         ereport(LOG,
    1403             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1404             :                         "restart because subscription was renamed",
    1405             :                         MySubscription->name)));
    1406             : 
    1407           0 :         proc_exit(0);
    1408             :     }
    1409             : 
    1410             :     /* !slotname should never happen when enabled is true. */
    1411           0 :     Assert(newsub->slotname);
    1412             : 
    1413             :     /*
    1414             :      * We need to make new connection to new slot if slot name has changed so
    1415             :      * exit here as well if that's the case.
    1416             :      */
    1417           0 :     if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
    1418             :     {
    1419           0 :         ereport(LOG,
    1420             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1421             :                         "restart because the replication slot name was changed",
    1422             :                         MySubscription->name)));
    1423             : 
    1424           0 :         proc_exit(0);
    1425             :     }
    1426             : 
    1427             :     /*
    1428             :      * Exit if publication list was changed. The launcher will start new
    1429             :      * worker.
    1430             :      */
    1431           0 :     if (!equal(newsub->publications, MySubscription->publications))
    1432             :     {
    1433           0 :         ereport(LOG,
    1434             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1435             :                         "restart because subscription's publications were changed",
    1436             :                         MySubscription->name)));
    1437             : 
    1438           0 :         proc_exit(0);
    1439             :     }
    1440             : 
    1441             :     /* Check for other changes that should never happen too. */
    1442           0 :     if (newsub->dbid != MySubscription->dbid)
    1443             :     {
    1444           0 :         elog(ERROR, "subscription %u changed unexpectedly",
    1445             :              MyLogicalRepWorker->subid);
    1446             :     }
    1447             : 
    1448             :     /* Clean old subscription info and switch to new one. */
    1449           0 :     FreeSubscription(MySubscription);
    1450           0 :     MySubscription = newsub;
    1451             : 
    1452           0 :     MemoryContextSwitchTo(oldctx);
    1453             : 
    1454             :     /* Change synchronous commit according to the user's wishes */
    1455           0 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    1456             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    1457             : 
    1458           0 :     if (started_tx)
    1459           0 :         CommitTransactionCommand();
    1460             : 
    1461           0 :     MySubscriptionValid = true;
    1462             : }
    1463             : 
    1464             : /*
    1465             :  * Callback from subscription syscache invalidation.
    1466             :  */
    1467             : static void
    1468           0 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    1469             : {
    1470           0 :     MySubscriptionValid = false;
    1471           0 : }
    1472             : 
    1473             : /* SIGHUP: set flag to reload configuration at next convenient time */
    1474             : static void
    1475           0 : logicalrep_worker_sighup(SIGNAL_ARGS)
    1476             : {
    1477           0 :     int         save_errno = errno;
    1478             : 
    1479           0 :     got_SIGHUP = true;
    1480             : 
    1481             :     /* Waken anything waiting on the process latch */
    1482           0 :     SetLatch(MyLatch);
    1483             : 
    1484           0 :     errno = save_errno;
    1485           0 : }
    1486             : 
    1487             : /* Logical Replication Apply worker entry point */
    1488             : void
    1489           0 : ApplyWorkerMain(Datum main_arg)
    1490             : {
    1491           0 :     int         worker_slot = DatumGetInt32(main_arg);
    1492             :     MemoryContext oldctx;
    1493             :     char        originname[NAMEDATALEN];
    1494             :     XLogRecPtr  origin_startpos;
    1495             :     char       *myslotname;
    1496             :     WalRcvStreamOptions options;
    1497             : 
    1498             :     /* Attach to slot */
    1499           0 :     logicalrep_worker_attach(worker_slot);
    1500             : 
    1501             :     /* Setup signal handling */
    1502           0 :     pqsignal(SIGHUP, logicalrep_worker_sighup);
    1503           0 :     pqsignal(SIGTERM, die);
    1504           0 :     BackgroundWorkerUnblockSignals();
    1505             : 
    1506             :     /* Initialise stats to a sanish value */
    1507           0 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
    1508           0 :         MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
    1509             : 
    1510             :     /* Load the libpq-specific functions */
    1511           0 :     load_file("libpqwalreceiver", false);
    1512             : 
    1513           0 :     Assert(CurrentResourceOwner == NULL);
    1514           0 :     CurrentResourceOwner = ResourceOwnerCreate(NULL,
    1515             :                                                "logical replication apply");
    1516             : 
    1517             :     /* Run as replica session replication role. */
    1518           0 :     SetConfigOption("session_replication_role", "replica",
    1519             :                     PGC_SUSET, PGC_S_OVERRIDE);
    1520             : 
    1521             :     /* Connect to our database. */
    1522           0 :     BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
    1523           0 :                                               MyLogicalRepWorker->userid);
    1524             : 
    1525             :     /* Load the subscription into persistent memory context. */
    1526           0 :     ApplyContext = AllocSetContextCreate(TopMemoryContext,
    1527             :                                          "ApplyContext",
    1528             :                                          ALLOCSET_DEFAULT_SIZES);
    1529           0 :     StartTransactionCommand();
    1530           0 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    1531           0 :     MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
    1532           0 :     MySubscriptionValid = true;
    1533           0 :     MemoryContextSwitchTo(oldctx);
    1534             : 
    1535             :     /* Setup synchronous commit according to the user's wishes */
    1536           0 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    1537             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    1538             : 
    1539           0 :     if (!MySubscription->enabled)
    1540             :     {
    1541           0 :         ereport(LOG,
    1542             :                 (errmsg("logical replication apply worker for subscription \"%s\" will not "
    1543             :                         "start because the subscription was disabled during startup",
    1544             :                         MySubscription->name)));
    1545             : 
    1546           0 :         proc_exit(0);
    1547             :     }
    1548             : 
    1549             :     /* Keep us informed about subscription changes. */
    1550           0 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
    1551             :                                   subscription_change_cb,
    1552             :                                   (Datum) 0);
    1553             : 
    1554           0 :     if (am_tablesync_worker())
    1555           0 :         ereport(LOG,
    1556             :                 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
    1557             :                         MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
    1558             :     else
    1559           0 :         ereport(LOG,
    1560             :                 (errmsg("logical replication apply worker for subscription \"%s\" has started",
    1561             :                         MySubscription->name)));
    1562             : 
    1563           0 :     CommitTransactionCommand();
    1564             : 
    1565             :     /* Connect to the origin and start the replication. */
    1566           0 :     elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
    1567             :          MySubscription->conninfo);
    1568             : 
    1569           0 :     if (am_tablesync_worker())
    1570             :     {
    1571             :         char       *syncslotname;
    1572             : 
    1573             :         /* This is table synchroniation worker, call initial sync. */
    1574           0 :         syncslotname = LogicalRepSyncTableStart(&origin_startpos);
    1575             : 
    1576             :         /* The slot name needs to be allocated in permanent memory context. */
    1577           0 :         oldctx = MemoryContextSwitchTo(ApplyContext);
    1578           0 :         myslotname = pstrdup(syncslotname);
    1579           0 :         MemoryContextSwitchTo(oldctx);
    1580             : 
    1581           0 :         pfree(syncslotname);
    1582             :     }
    1583             :     else
    1584             :     {
    1585             :         /* This is main apply worker */
    1586             :         RepOriginId originid;
    1587             :         TimeLineID  startpointTLI;
    1588             :         char       *err;
    1589             :         int         server_version;
    1590             : 
    1591           0 :         myslotname = MySubscription->slotname;
    1592             : 
    1593             :         /*
    1594             :          * This shouldn't happen if the subscription is enabled, but guard
    1595             :          * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
    1596             :          * crash if slot is NULL.)
    1597             :          */
    1598           0 :         if (!myslotname)
    1599           0 :             ereport(ERROR,
    1600             :                     (errmsg("subscription has no replication slot set")));
    1601             : 
    1602             :         /* Setup replication origin tracking. */
    1603           0 :         StartTransactionCommand();
    1604           0 :         snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
    1605           0 :         originid = replorigin_by_name(originname, true);
    1606           0 :         if (!OidIsValid(originid))
    1607           0 :             originid = replorigin_create(originname);
    1608           0 :         replorigin_session_setup(originid);
    1609           0 :         replorigin_session_origin = originid;
    1610           0 :         origin_startpos = replorigin_session_get_progress(false);
    1611           0 :         CommitTransactionCommand();
    1612             : 
    1613           0 :         wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
    1614             :                                 &err);
    1615           0 :         if (wrconn == NULL)
    1616           0 :             ereport(ERROR,
    1617             :                     (errmsg("could not connect to the publisher: %s", err)));
    1618             : 
    1619             :         /*
    1620             :          * We don't really use the output identify_system for anything but it
    1621             :          * does some initializations on the upstream so let's still call it.
    1622             :          */
    1623           0 :         (void) walrcv_identify_system(wrconn, &startpointTLI,
    1624             :                                       &server_version);
    1625             : 
    1626             :     }
    1627             : 
    1628             :     /*
    1629             :      * Setup callback for syscache so that we know when something changes in
    1630             :      * the subscription relation state.
    1631             :      */
    1632           0 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
    1633             :                                   invalidate_syncing_table_states,
    1634             :                                   (Datum) 0);
    1635             : 
    1636             :     /* Build logical replication streaming options. */
    1637           0 :     options.logical = true;
    1638           0 :     options.startpoint = origin_startpos;
    1639           0 :     options.slotname = myslotname;
    1640           0 :     options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
    1641           0 :     options.proto.logical.publication_names = MySubscription->publications;
    1642             : 
    1643             :     /* Start normal logical streaming replication. */
    1644           0 :     walrcv_startstreaming(wrconn, &options);
    1645             : 
    1646             :     /* Run the main loop. */
    1647           0 :     LogicalRepApplyLoop(origin_startpos);
    1648             : 
    1649           0 :     proc_exit(0);
    1650             : }
    1651             : 
    1652             : /*
    1653             :  * Is current process a logical replication worker?
    1654             :  */
    1655             : bool
    1656           2 : IsLogicalWorker(void)
    1657             : {
    1658           2 :     return MyLogicalRepWorker != NULL;
    1659             : }

Generated by: LCOV version 1.11