|           Line data    Source code 
       1             : /*-------------------------------------------------------------------------
       2             :  * logical.c
       3             :  *     PostgreSQL logical decoding coordination
       4             :  *
       5             :  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/logical.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file coordinates interaction between the various modules that
      12             :  *    together provide logical decoding, primarily by providing so
      13             :  *    called LogicalDecodingContexts. The goal is to encapsulate most of the
      14             :  *    internal complexity for consumers of logical decoding, so they can
      15             :  *    create and consume a changestream with a low amount of code. Builtin
      16             :  *    consumers are the walsender and SQL SRF interface, but it's possible to
      17             :  *    add further ones without changing core code, e.g. to consume changes in
      18             :  *    a bgworker.
      19             :  *
      20             :  *    The idea is that a consumer provides three callbacks, one to read WAL,
      21             :  *    one to prepare a data write, and a final one for actually writing since
      22             :  *    their implementation depends on the type of consumer.  Check
      23             :  *    logicalfuncs.c for an example implementation of a fairly simple consumer
      24             :  *    and an implementation of a WAL reading callback that's suitable for
      25             :  *    simple consumers.
      26             :  *-------------------------------------------------------------------------
      27             :  */
      28             : 
      29             : #include "postgres.h"
      30             : 
      31             : #include "miscadmin.h"
      32             : 
      33             : #include "access/xact.h"
      34             : #include "access/xlog_internal.h"
      35             : 
      36             : #include "replication/decode.h"
      37             : #include "replication/logical.h"
      38             : #include "replication/reorderbuffer.h"
      39             : #include "replication/origin.h"
      40             : #include "replication/snapbuild.h"
      41             : 
      42             : #include "storage/proc.h"
      43             : #include "storage/procarray.h"
      44             : 
      45             : #include "utils/memutils.h"
      46             : 
      47             : /* data for errcontext callback */
      48             : typedef struct LogicalErrorCallbackState
      49             : {
      50             :     LogicalDecodingContext *ctx;
      51             :     const char *callback_name;
      52             :     XLogRecPtr  report_location;
      53             : } LogicalErrorCallbackState;
      54             : 
      55             : /* wrappers around output plugin callbacks */
      56             : static void output_plugin_error_callback(void *arg);
      57             : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      58             :                    bool is_init);
      59             : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
      60             : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      61             : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      62             :                   XLogRecPtr commit_lsn);
      63             : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      64             :                   Relation relation, ReorderBufferChange *change);
      65             : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      66             :                    XLogRecPtr message_lsn, bool transactional,
      67             :                    const char *prefix, Size message_size, const char *message);
      68             : 
      69             : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
      70             : 
      71             : /*
      72             :  * Make sure the current settings & environment are capable of doing logical
      73             :  * decoding.
      74             :  */
      75             : void
      76           0 : CheckLogicalDecodingRequirements(void)
      77             : {
      78           0 :     CheckSlotRequirements();
      79             : 
      80           0 :     if (wal_level < WAL_LEVEL_LOGICAL)
      81           0 :         ereport(ERROR,
      82             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      83             :                  errmsg("logical decoding requires wal_level >= logical")));
      84             : 
      85           0 :     if (MyDatabaseId == InvalidOid)
      86           0 :         ereport(ERROR,
      87             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      88             :                  errmsg("logical decoding requires a database connection")));
      89             : 
      90             :     /* ----
      91             :      * TODO: We got to change that someday soon...
      92             :      *
      93             :      * There's basically three things missing to allow this:
      94             :      * 1) We need to be able to correctly and quickly identify the timeline a
      95             :      *    LSN belongs to
      96             :      * 2) We need to force hot_standby_feedback to be enabled at all times so
      97             :      *    the primary cannot remove rows we need.
      98             :      * 3) support dropping replication slots referring to a database, in
      99             :      *    dbase_redo. There can't be any active ones due to HS recovery
     100             :      *    conflicts, so that should be relatively easy.
     101             :      * ----
     102             :      */
     103           0 :     if (RecoveryInProgress())
     104           0 :         ereport(ERROR,
     105             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     106             :                  errmsg("logical decoding cannot be used while in recovery")));
     107           0 : }
     108             : 
     109             : /*
     110             :  * Helper function for CreateInitialDecodingContext() and
     111             :  * CreateDecodingContext() performing common tasks.
     112             :  */
     113             : static LogicalDecodingContext *
     114           0 : StartupDecodingContext(List *output_plugin_options,
     115             :                        XLogRecPtr start_lsn,
     116             :                        TransactionId xmin_horizon,
     117             :                        bool need_full_snapshot,
     118             :                        XLogPageReadCB read_page,
     119             :                        LogicalOutputPluginWriterPrepareWrite prepare_write,
     120             :                        LogicalOutputPluginWriterWrite do_write,
     121             :                        LogicalOutputPluginWriterUpdateProgress update_progress)
     122             : {
     123             :     ReplicationSlot *slot;
     124             :     MemoryContext context,
     125             :                 old_context;
     126             :     LogicalDecodingContext *ctx;
     127             : 
     128             :     /* shorter lines... */
     129           0 :     slot = MyReplicationSlot;
     130             : 
     131           0 :     context = AllocSetContextCreate(CurrentMemoryContext,
     132             :                                     "Logical decoding context",
     133             :                                     ALLOCSET_DEFAULT_SIZES);
     134           0 :     old_context = MemoryContextSwitchTo(context);
     135           0 :     ctx = palloc0(sizeof(LogicalDecodingContext));
     136             : 
     137           0 :     ctx->context = context;
     138             : 
     139             :     /*
     140             :      * (re-)load output plugins, so we detect a bad (removed) output plugin
     141             :      * now.
     142             :      */
     143           0 :     LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
     144             : 
     145             :     /*
     146             :      * Now that the slot's xmin has been set, we can announce ourselves as a
     147             :      * logical decoding backend which doesn't need to be checked individually
     148             :      * when computing the xmin horizon because the xmin is enforced via
     149             :      * replication slots.
     150             :      *
     151             :      * We can only do so if we're outside of a transaction (i.e. the case when
     152             :      * streaming changes via walsender), otherwise an already setup
     153             :      * snapshot/xid would end up being ignored. That's not a particularly
     154             :      * bothersome restriction since the SQL interface can't be used for
     155             :      * streaming anyway.
     156             :      */
     157           0 :     if (!IsTransactionOrTransactionBlock())
     158             :     {
     159           0 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     160           0 :         MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING;
     161           0 :         LWLockRelease(ProcArrayLock);
     162             :     }
     163             : 
     164           0 :     ctx->slot = slot;
     165             : 
     166           0 :     ctx->reader = XLogReaderAllocate(read_page, ctx);
     167           0 :     if (!ctx->reader)
     168           0 :         ereport(ERROR,
     169             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     170             :                  errmsg("out of memory")));
     171             : 
     172           0 :     ctx->reader->private_data = ctx;
     173             : 
     174           0 :     ctx->reorder = ReorderBufferAllocate();
     175           0 :     ctx->snapshot_builder =
     176           0 :         AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
     177             :                                 need_full_snapshot);
     178             : 
     179           0 :     ctx->reorder->private_data = ctx;
     180             : 
     181             :     /* wrap output plugin callbacks, so we can add error context information */
     182           0 :     ctx->reorder->begin = begin_cb_wrapper;
     183           0 :     ctx->reorder->apply_change = change_cb_wrapper;
     184           0 :     ctx->reorder->commit = commit_cb_wrapper;
     185           0 :     ctx->reorder->message = message_cb_wrapper;
     186             : 
     187           0 :     ctx->out = makeStringInfo();
     188           0 :     ctx->prepare_write = prepare_write;
     189           0 :     ctx->write = do_write;
     190           0 :     ctx->update_progress = update_progress;
     191             : 
     192           0 :     ctx->output_plugin_options = output_plugin_options;
     193             : 
     194           0 :     MemoryContextSwitchTo(old_context);
     195             : 
     196           0 :     return ctx;
     197             : }
     198             : 
     199             : /*
     200             :  * Create a new decoding context, for a new logical slot.
     201             :  *
     202             :  * plugin contains the name of the output plugin
     203             :  * output_plugin_options contains options passed to the output plugin
     204             :  * read_page, prepare_write, do_write, update_progress
     205             :  *      callbacks that have to be filled to perform the use-case dependent,
     206             :  *      actual, work.
     207             :  *
     208             :  * Needs to be called while in a memory context that's at least as long lived
     209             :  * as the decoding context because further memory contexts will be created
     210             :  * inside it.
     211             :  *
     212             :  * Returns an initialized decoding context after calling the output plugin's
     213             :  * startup function.
     214             :  */
     215             : LogicalDecodingContext *
     216           0 : CreateInitDecodingContext(char *plugin,
     217             :                           List *output_plugin_options,
     218             :                           bool need_full_snapshot,
     219             :                           XLogPageReadCB read_page,
     220             :                           LogicalOutputPluginWriterPrepareWrite prepare_write,
     221             :                           LogicalOutputPluginWriterWrite do_write,
     222             :                           LogicalOutputPluginWriterUpdateProgress update_progress)
     223             : {
     224           0 :     TransactionId xmin_horizon = InvalidTransactionId;
     225             :     ReplicationSlot *slot;
     226             :     LogicalDecodingContext *ctx;
     227             :     MemoryContext old_context;
     228             : 
     229             :     /* shorter lines... */
     230           0 :     slot = MyReplicationSlot;
     231             : 
     232             :     /* first some sanity checks that are unlikely to be violated */
     233           0 :     if (slot == NULL)
     234           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     235             : 
     236           0 :     if (plugin == NULL)
     237           0 :         elog(ERROR, "cannot initialize logical decoding without a specified plugin");
     238             : 
     239             :     /* Make sure the passed slot is suitable. These are user facing errors. */
     240           0 :     if (SlotIsPhysical(slot))
     241           0 :         ereport(ERROR,
     242             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     243             :                  errmsg("cannot use physical replication slot for logical decoding")));
     244             : 
     245           0 :     if (slot->data.database != MyDatabaseId)
     246           0 :         ereport(ERROR,
     247             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     248             :                  errmsg("replication slot \"%s\" was not created in this database",
     249             :                         NameStr(slot->data.name))));
     250             : 
     251           0 :     if (IsTransactionState() &&
     252           0 :         GetTopTransactionIdIfAny() != InvalidTransactionId)
     253           0 :         ereport(ERROR,
     254             :                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
     255             :                  errmsg("cannot create logical replication slot in transaction that has performed writes")));
     256             : 
     257             :     /* register output plugin name with slot */
     258           0 :     SpinLockAcquire(&slot->mutex);
     259           0 :     StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
     260           0 :     SpinLockRelease(&slot->mutex);
     261             : 
     262           0 :     ReplicationSlotReserveWal();
     263             : 
     264             :     /* ----
     265             :      * This is a bit tricky: We need to determine a safe xmin horizon to start
     266             :      * decoding from, to avoid starting from a running xacts record referring
     267             :      * to xids whose rows have been vacuumed or pruned
     268             :      * already. GetOldestSafeDecodingTransactionId() returns such a value, but
     269             :      * without further interlock its return value might immediately be out of
     270             :      * date.
     271             :      *
     272             :      * So we have to acquire the ProcArrayLock to prevent computation of new
     273             :      * xmin horizons by other backends, get the safe decoding xid, and inform
     274             :      * the slot machinery about the new limit. Once that's done the
     275             :      * ProcArrayLock can be released as the slot machinery now is
     276             :      * protecting against vacuum.
     277             :      *
     278             :      * Note that, temporarily, the data, not just the catalog, xmin has to be
     279             :      * reserved if a data snapshot is to be exported.  Otherwise the initial
     280             :      * data snapshot created here is not guaranteed to be valid. After that
     281             :      * the data xmin doesn't need to be managed anymore and the global xmin
     282             :      * should be recomputed. As we are fine with losing the pegged data xmin
     283             :      * after crash - no chance a snapshot would get exported anymore - we can
     284             :      * get away with just setting the slot's
     285             :      * effective_xmin. ReplicationSlotRelease will reset it again.
     286             :      *
     287             :      * ----
     288             :      */
     289           0 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     290             : 
     291           0 :     xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
     292             : 
     293           0 :     slot->effective_catalog_xmin = xmin_horizon;
     294           0 :     slot->data.catalog_xmin = xmin_horizon;
     295           0 :     if (need_full_snapshot)
     296           0 :         slot->effective_xmin = xmin_horizon;
     297             : 
     298           0 :     ReplicationSlotsComputeRequiredXmin(true);
     299             : 
     300           0 :     LWLockRelease(ProcArrayLock);
     301             : 
     302           0 :     ReplicationSlotMarkDirty();
     303           0 :     ReplicationSlotSave();
     304             : 
     305           0 :     ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
     306             :                                  need_full_snapshot, read_page, prepare_write,
     307             :                                  do_write, update_progress);
     308             : 
     309             :     /* call output plugin initialization callback */
     310           0 :     old_context = MemoryContextSwitchTo(ctx->context);
     311           0 :     if (ctx->callbacks.startup_cb != NULL)
     312           0 :         startup_cb_wrapper(ctx, &ctx->options, true);
     313           0 :     MemoryContextSwitchTo(old_context);
     314             : 
     315           0 :     return ctx;
     316             : }
     317             : 
     318             : /*
     319             :  * Create a new decoding context, for a logical slot that has previously been
     320             :  * used already.
     321             :  *
     322             :  * start_lsn
     323             :  *      The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
     324             :  *      from the slot's confirmed_flush; otherwise, start from the specified
     325             :  *      location (but move it forwards to confirmed_flush if it's older than
     326             :  *      that, see below).
     327             :  *
     328             :  * output_plugin_options
     329             :  *      contains options passed to the output plugin.
     330             :  *
     331             :  * read_page, prepare_write, do_write, update_progress
     332             :  *      callbacks that have to be filled to perform the use-case dependent,
     333             :  *      actual work.
     334             :  *
     335             :  * Needs to be called while in a memory context that's at least as long lived
     336             :  * as the decoding context because further memory contexts will be created
     337             :  * inside it.
     338             :  *
     339             :  * Returns an initialized decoding context after calling the output plugin's
     340             :  * startup function.
     341             :  */
     342             : LogicalDecodingContext *
     343           0 : CreateDecodingContext(XLogRecPtr start_lsn,
     344             :                       List *output_plugin_options,
     345             :                       XLogPageReadCB read_page,
     346             :                       LogicalOutputPluginWriterPrepareWrite prepare_write,
     347             :                       LogicalOutputPluginWriterWrite do_write,
     348             :                       LogicalOutputPluginWriterUpdateProgress update_progress)
     349             : {
     350             :     LogicalDecodingContext *ctx;
     351             :     ReplicationSlot *slot;
     352             :     MemoryContext old_context;
     353             : 
     354             :     /* shorter lines... */
     355           0 :     slot = MyReplicationSlot;
     356             : 
     357             :     /* first some sanity checks that are unlikely to be violated */
     358           0 :     if (slot == NULL)
     359           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     360             : 
     361             :     /* make sure the passed slot is suitable, these are user facing errors */
     362           0 :     if (SlotIsPhysical(slot))
     363           0 :         ereport(ERROR,
     364             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     365             :                  (errmsg("cannot use physical replication slot for logical decoding"))));
     366             : 
     367           0 :     if (slot->data.database != MyDatabaseId)
     368           0 :         ereport(ERROR,
     369             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     370             :                  (errmsg("replication slot \"%s\" was not created in this database",
     371             :                          NameStr(slot->data.name)))));
     372             : 
     373           0 :     if (start_lsn == InvalidXLogRecPtr)
     374             :     {
     375             :         /* continue from last position */
     376           0 :         start_lsn = slot->data.confirmed_flush;
     377             :     }
     378           0 :     else if (start_lsn < slot->data.confirmed_flush)
     379             :     {
     380             :         /*
     381             :          * It might seem like we should error out in this case, but it's
     382             :          * pretty common for a client to acknowledge a LSN it doesn't have to
     383             :          * do anything for, and thus didn't store persistently, because the
     384             :          * xlog records didn't result in anything relevant for logical
     385             :          * decoding. Clients have to be able to do that to support synchronous
     386             :          * replication.
     387             :          */
     388           0 :         elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding",
     389             :              (uint32) (start_lsn >> 32), (uint32) start_lsn,
     390             :              (uint32) (slot->data.confirmed_flush >> 32),
     391             :              (uint32) slot->data.confirmed_flush);
     392             : 
     393           0 :         start_lsn = slot->data.confirmed_flush;
     394             :     }
     395             : 
     396           0 :     ctx = StartupDecodingContext(output_plugin_options,
     397             :                                  start_lsn, InvalidTransactionId, false,
     398             :                                  read_page, prepare_write, do_write,
     399             :                                  update_progress);
     400             : 
     401             :     /* call output plugin initialization callback */
     402           0 :     old_context = MemoryContextSwitchTo(ctx->context);
     403           0 :     if (ctx->callbacks.startup_cb != NULL)
     404           0 :         startup_cb_wrapper(ctx, &ctx->options, false);
     405           0 :     MemoryContextSwitchTo(old_context);
     406             : 
     407           0 :     ereport(LOG,
     408             :             (errmsg("starting logical decoding for slot \"%s\"",
     409             :                     NameStr(slot->data.name)),
     410             :              errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X",
     411             :                        (uint32) (slot->data.confirmed_flush >> 32),
     412             :                        (uint32) slot->data.confirmed_flush,
     413             :                        (uint32) (slot->data.restart_lsn >> 32),
     414             :                        (uint32) slot->data.restart_lsn)));
     415             : 
     416           0 :     return ctx;
     417             : }
     418             : 
     419             : /*
     420             :  * Returns true if a consistent initial decoding snapshot has been built.
     421             :  */
     422             : bool
     423           0 : DecodingContextReady(LogicalDecodingContext *ctx)
     424             : {
     425           0 :     return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
     426             : }
     427             : 
     428             : /*
     429             :  * Read from the decoding slot, until it is ready to start extracting changes.
     430             :  */
     431             : void
     432           0 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
     433             : {
     434             :     XLogRecPtr  startptr;
     435             : 
     436             :     /* Initialize from where to start reading WAL. */
     437           0 :     startptr = ctx->slot->data.restart_lsn;
     438             : 
     439           0 :     elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
     440             :          (uint32) (ctx->slot->data.restart_lsn >> 32),
     441             :          (uint32) ctx->slot->data.restart_lsn);
     442             : 
     443             :     /* Wait for a consistent starting point */
     444             :     for (;;)
     445             :     {
     446             :         XLogRecord *record;
     447           0 :         char       *err = NULL;
     448             : 
     449             :         /* the read_page callback waits for new WAL */
     450           0 :         record = XLogReadRecord(ctx->reader, startptr, &err);
     451           0 :         if (err)
     452           0 :             elog(ERROR, "%s", err);
     453           0 :         if (!record)
     454           0 :             elog(ERROR, "no record found"); /* shouldn't happen */
     455             : 
     456           0 :         startptr = InvalidXLogRecPtr;
     457             : 
     458           0 :         LogicalDecodingProcessRecord(ctx, ctx->reader);
     459             : 
     460             :         /* only continue till we found a consistent spot */
     461           0 :         if (DecodingContextReady(ctx))
     462           0 :             break;
     463             : 
     464           0 :         CHECK_FOR_INTERRUPTS();
     465           0 :     }
     466             : 
     467           0 :     ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
     468           0 : }
     469             : 
     470             : /*
     471             :  * Free a previously allocated decoding context, invoking the shutdown
     472             :  * callback if necessary.
     473             :  */
     474             : void
     475           0 : FreeDecodingContext(LogicalDecodingContext *ctx)
     476             : {
     477           0 :     if (ctx->callbacks.shutdown_cb != NULL)
     478           0 :         shutdown_cb_wrapper(ctx);
     479             : 
     480           0 :     ReorderBufferFree(ctx->reorder);
     481           0 :     FreeSnapshotBuilder(ctx->snapshot_builder);
     482           0 :     XLogReaderFree(ctx->reader);
     483           0 :     MemoryContextDelete(ctx->context);
     484           0 : }
     485             : 
     486             : /*
     487             :  * Prepare a write using the context's output routine.
     488             :  */
     489             : void
     490           0 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     491             : {
     492           0 :     if (!ctx->accept_writes)
     493           0 :         elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
     494             : 
     495           0 :     ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
     496           0 :     ctx->prepared_write = true;
     497           0 : }
     498             : 
     499             : /*
     500             :  * Perform a write using the context's output routine.
     501             :  */
     502             : void
     503           0 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
     504             : {
     505           0 :     if (!ctx->prepared_write)
     506           0 :         elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
     507             : 
     508           0 :     ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
     509           0 :     ctx->prepared_write = false;
     510           0 : }
     511             : 
     512             : /*
     513             :  * Update progress tracking (if supported).
     514             :  */
     515             : void
     516           0 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
     517             : {
     518           0 :     if (!ctx->update_progress)
     519           0 :         return;
     520             : 
     521           0 :     ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
     522             : }
     523             : 
     524             : /*
     525             :  * Load the output plugin, lookup its output plugin init function, and check
     526             :  * that it provides the required callbacks.
     527             :  */
     528             : static void
     529           0 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin)
     530             : {
     531             :     LogicalOutputPluginInit plugin_init;
     532             : 
     533           0 :     plugin_init = (LogicalOutputPluginInit)
     534             :         load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
     535             : 
     536           0 :     if (plugin_init == NULL)
     537           0 :         elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
     538             : 
     539             :     /* ask the output plugin to fill the callback struct */
     540           0 :     plugin_init(callbacks);
     541             : 
     542           0 :     if (callbacks->begin_cb == NULL)
     543           0 :         elog(ERROR, "output plugins have to register a begin callback");
     544           0 :     if (callbacks->change_cb == NULL)
     545           0 :         elog(ERROR, "output plugins have to register a change callback");
     546           0 :     if (callbacks->commit_cb == NULL)
     547           0 :         elog(ERROR, "output plugins have to register a commit callback");
     548           0 : }
     549             : 
     550             : static void
     551           0 : output_plugin_error_callback(void *arg)
     552             : {
     553           0 :     LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
     554             : 
     555             :     /* not all callbacks have an associated LSN  */
     556           0 :     if (state->report_location != InvalidXLogRecPtr)
     557           0 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
     558           0 :                    NameStr(state->ctx->slot->data.name),
     559           0 :                    NameStr(state->ctx->slot->data.plugin),
     560             :                    state->callback_name,
     561           0 :                    (uint32) (state->report_location >> 32),
     562           0 :                    (uint32) state->report_location);
     563             :     else
     564           0 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
     565           0 :                    NameStr(state->ctx->slot->data.name),
     566           0 :                    NameStr(state->ctx->slot->data.plugin),
     567             :                    state->callback_name);
     568           0 : }
     569             : 
     570             : static void
     571           0 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
     572             : {
     573             :     LogicalErrorCallbackState state;
     574             :     ErrorContextCallback errcallback;
     575             : 
     576             :     /* Push callback + info on the error context stack */
     577           0 :     state.ctx = ctx;
     578           0 :     state.callback_name = "startup";
     579           0 :     state.report_location = InvalidXLogRecPtr;
     580           0 :     errcallback.callback = output_plugin_error_callback;
     581           0 :     errcallback.arg = (void *) &state;
     582           0 :     errcallback.previous = error_context_stack;
     583           0 :     error_context_stack = &errcallback;
     584             : 
     585             :     /* set output state */
     586           0 :     ctx->accept_writes = false;
     587             : 
     588             :     /* do the actual work: call callback */
     589           0 :     ctx->callbacks.startup_cb(ctx, opt, is_init);
     590             : 
     591             :     /* Pop the error context stack */
     592           0 :     error_context_stack = errcallback.previous;
     593           0 : }
     594             : 
     595             : static void
     596           0 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
     597             : {
     598             :     LogicalErrorCallbackState state;
     599             :     ErrorContextCallback errcallback;
     600             : 
     601             :     /* Push callback + info on the error context stack */
     602           0 :     state.ctx = ctx;
     603           0 :     state.callback_name = "shutdown";
     604           0 :     state.report_location = InvalidXLogRecPtr;
     605           0 :     errcallback.callback = output_plugin_error_callback;
     606           0 :     errcallback.arg = (void *) &state;
     607           0 :     errcallback.previous = error_context_stack;
     608           0 :     error_context_stack = &errcallback;
     609             : 
     610             :     /* set output state */
     611           0 :     ctx->accept_writes = false;
     612             : 
     613             :     /* do the actual work: call callback */
     614           0 :     ctx->callbacks.shutdown_cb(ctx);
     615             : 
     616             :     /* Pop the error context stack */
     617           0 :     error_context_stack = errcallback.previous;
     618           0 : }
     619             : 
     620             : 
     621             : /*
     622             :  * Callbacks for ReorderBuffer which add in some more information and then call
     623             :  * output_plugin.h plugins.
     624             :  */
     625             : static void
     626           0 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     627             : {
     628           0 :     LogicalDecodingContext *ctx = cache->private_data;
     629             :     LogicalErrorCallbackState state;
     630             :     ErrorContextCallback errcallback;
     631             : 
     632             :     /* Push callback + info on the error context stack */
     633           0 :     state.ctx = ctx;
     634           0 :     state.callback_name = "begin";
     635           0 :     state.report_location = txn->first_lsn;
     636           0 :     errcallback.callback = output_plugin_error_callback;
     637           0 :     errcallback.arg = (void *) &state;
     638           0 :     errcallback.previous = error_context_stack;
     639           0 :     error_context_stack = &errcallback;
     640             : 
     641             :     /* set output state */
     642           0 :     ctx->accept_writes = true;
     643           0 :     ctx->write_xid = txn->xid;
     644           0 :     ctx->write_location = txn->first_lsn;
     645             : 
     646             :     /* do the actual work: call callback */
     647           0 :     ctx->callbacks.begin_cb(ctx, txn);
     648             : 
     649             :     /* Pop the error context stack */
     650           0 :     error_context_stack = errcallback.previous;
     651           0 : }
     652             : 
     653             : static void
     654           0 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     655             :                   XLogRecPtr commit_lsn)
     656             : {
     657           0 :     LogicalDecodingContext *ctx = cache->private_data;
     658             :     LogicalErrorCallbackState state;
     659             :     ErrorContextCallback errcallback;
     660             : 
     661             :     /* Push callback + info on the error context stack */
     662           0 :     state.ctx = ctx;
     663           0 :     state.callback_name = "commit";
     664           0 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     665           0 :     errcallback.callback = output_plugin_error_callback;
     666           0 :     errcallback.arg = (void *) &state;
     667           0 :     errcallback.previous = error_context_stack;
     668           0 :     error_context_stack = &errcallback;
     669             : 
     670             :     /* set output state */
     671           0 :     ctx->accept_writes = true;
     672           0 :     ctx->write_xid = txn->xid;
     673           0 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     674             : 
     675             :     /* do the actual work: call callback */
     676           0 :     ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
     677             : 
     678             :     /* Pop the error context stack */
     679           0 :     error_context_stack = errcallback.previous;
     680           0 : }
     681             : 
     682             : static void
     683           0 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     684             :                   Relation relation, ReorderBufferChange *change)
     685             : {
     686           0 :     LogicalDecodingContext *ctx = cache->private_data;
     687             :     LogicalErrorCallbackState state;
     688             :     ErrorContextCallback errcallback;
     689             : 
     690             :     /* Push callback + info on the error context stack */
     691           0 :     state.ctx = ctx;
     692           0 :     state.callback_name = "change";
     693           0 :     state.report_location = change->lsn;
     694           0 :     errcallback.callback = output_plugin_error_callback;
     695           0 :     errcallback.arg = (void *) &state;
     696           0 :     errcallback.previous = error_context_stack;
     697           0 :     error_context_stack = &errcallback;
     698             : 
     699             :     /* set output state */
     700           0 :     ctx->accept_writes = true;
     701           0 :     ctx->write_xid = txn->xid;
     702             : 
     703             :     /*
     704             :      * report this change's lsn so replies from clients can give an up2date
     705             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
     706             :      * receipt of this transaction, but it might allow another transaction's
     707             :      * commit to be confirmed with one message.
     708             :      */
     709           0 :     ctx->write_location = change->lsn;
     710             : 
     711           0 :     ctx->callbacks.change_cb(ctx, txn, relation, change);
     712             : 
     713             :     /* Pop the error context stack */
     714           0 :     error_context_stack = errcallback.previous;
     715           0 : }
     716             : 
     717             : bool
     718           0 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
     719             : {
     720             :     LogicalErrorCallbackState state;
     721             :     ErrorContextCallback errcallback;
     722             :     bool        ret;
     723             : 
     724             :     /* Push callback + info on the error context stack */
     725           0 :     state.ctx = ctx;
     726           0 :     state.callback_name = "filter_by_origin";
     727           0 :     state.report_location = InvalidXLogRecPtr;
     728           0 :     errcallback.callback = output_plugin_error_callback;
     729           0 :     errcallback.arg = (void *) &state;
     730           0 :     errcallback.previous = error_context_stack;
     731           0 :     error_context_stack = &errcallback;
     732             : 
     733             :     /* set output state */
     734           0 :     ctx->accept_writes = false;
     735             : 
     736             :     /* do the actual work: call callback */
     737           0 :     ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
     738             : 
     739             :     /* Pop the error context stack */
     740           0 :     error_context_stack = errcallback.previous;
     741             : 
     742           0 :     return ret;
     743             : }
     744             : 
     745             : static void
     746           0 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     747             :                    XLogRecPtr message_lsn, bool transactional,
     748             :                    const char *prefix, Size message_size, const char *message)
     749             : {
     750           0 :     LogicalDecodingContext *ctx = cache->private_data;
     751             :     LogicalErrorCallbackState state;
     752             :     ErrorContextCallback errcallback;
     753             : 
     754           0 :     if (ctx->callbacks.message_cb == NULL)
     755           0 :         return;
     756             : 
     757             :     /* Push callback + info on the error context stack */
     758           0 :     state.ctx = ctx;
     759           0 :     state.callback_name = "message";
     760           0 :     state.report_location = message_lsn;
     761           0 :     errcallback.callback = output_plugin_error_callback;
     762           0 :     errcallback.arg = (void *) &state;
     763           0 :     errcallback.previous = error_context_stack;
     764           0 :     error_context_stack = &errcallback;
     765             : 
     766             :     /* set output state */
     767           0 :     ctx->accept_writes = true;
     768           0 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
     769           0 :     ctx->write_location = message_lsn;
     770             : 
     771             :     /* do the actual work: call callback */
     772           0 :     ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
     773             :                               message_size, message);
     774             : 
     775             :     /* Pop the error context stack */
     776           0 :     error_context_stack = errcallback.previous;
     777             : }
     778             : 
     779             : /*
     780             :  * Set the required catalog xmin horizon for historic snapshots in the current
     781             :  * replication slot.
     782             :  *
     783             :  * Note that in the most cases, we won't be able to immediately use the xmin
     784             :  * to increase the xmin horizon: we need to wait till the client has confirmed
     785             :  * receiving current_lsn with LogicalConfirmReceivedLocation().
     786             :  */
     787             : void
     788           0 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
     789             : {
     790           0 :     bool        updated_xmin = false;
     791             :     ReplicationSlot *slot;
     792             : 
     793           0 :     slot = MyReplicationSlot;
     794             : 
     795           0 :     Assert(slot != NULL);
     796             : 
     797           0 :     SpinLockAcquire(&slot->mutex);
     798             : 
     799             :     /*
     800             :      * don't overwrite if we already have a newer xmin. This can happen if we
     801             :      * restart decoding in a slot.
     802             :      */
     803           0 :     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
     804             :     {
     805             :     }
     806             : 
     807             :     /*
     808             :      * If the client has already confirmed up to this lsn, we directly can
     809             :      * mark this as accepted. This can happen if we restart decoding in a
     810             :      * slot.
     811             :      */
     812           0 :     else if (current_lsn <= slot->data.confirmed_flush)
     813             :     {
     814           0 :         slot->candidate_catalog_xmin = xmin;
     815           0 :         slot->candidate_xmin_lsn = current_lsn;
     816             : 
     817             :         /* our candidate can directly be used */
     818           0 :         updated_xmin = true;
     819             :     }
     820             : 
     821             :     /*
     822             :      * Only increase if the previous values have been applied, otherwise we
     823             :      * might never end up updating if the receiver acks too slowly.
     824             :      */
     825           0 :     else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
     826             :     {
     827           0 :         slot->candidate_catalog_xmin = xmin;
     828           0 :         slot->candidate_xmin_lsn = current_lsn;
     829             :     }
     830           0 :     SpinLockRelease(&slot->mutex);
     831             : 
     832             :     /* candidate already valid with the current flush position, apply */
     833           0 :     if (updated_xmin)
     834           0 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
     835           0 : }
     836             : 
     837             : /*
     838             :  * Mark the minimal LSN (restart_lsn) we need to read to replay all
     839             :  * transactions that have not yet committed at current_lsn.
     840             :  *
     841             :  * Just like IncreaseRestartDecodingForSlot this only takes effect when the
     842             :  * client has confirmed to have received current_lsn.
     843             :  */
     844             : void
     845           0 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
     846             : {
     847           0 :     bool        updated_lsn = false;
     848             :     ReplicationSlot *slot;
     849             : 
     850           0 :     slot = MyReplicationSlot;
     851             : 
     852           0 :     Assert(slot != NULL);
     853           0 :     Assert(restart_lsn != InvalidXLogRecPtr);
     854           0 :     Assert(current_lsn != InvalidXLogRecPtr);
     855             : 
     856           0 :     SpinLockAcquire(&slot->mutex);
     857             : 
     858             :     /* don't overwrite if have a newer restart lsn */
     859           0 :     if (restart_lsn <= slot->data.restart_lsn)
     860             :     {
     861             :     }
     862             : 
     863             :     /*
     864             :      * We might have already flushed far enough to directly accept this lsn,
     865             :      * in this case there is no need to check for existing candidate LSNs
     866             :      */
     867           0 :     else if (current_lsn <= slot->data.confirmed_flush)
     868             :     {
     869           0 :         slot->candidate_restart_valid = current_lsn;
     870           0 :         slot->candidate_restart_lsn = restart_lsn;
     871             : 
     872             :         /* our candidate can directly be used */
     873           0 :         updated_lsn = true;
     874             :     }
     875             : 
     876             :     /*
     877             :      * Only increase if the previous values have been applied, otherwise we
     878             :      * might never end up updating if the receiver acks too slowly. A missed
     879             :      * value here will just cause some extra effort after reconnecting.
     880             :      */
     881           0 :     if (slot->candidate_restart_valid == InvalidXLogRecPtr)
     882             :     {
     883           0 :         slot->candidate_restart_valid = current_lsn;
     884           0 :         slot->candidate_restart_lsn = restart_lsn;
     885             : 
     886           0 :         elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
     887             :              (uint32) (restart_lsn >> 32), (uint32) restart_lsn,
     888             :              (uint32) (current_lsn >> 32), (uint32) current_lsn);
     889             :     }
     890             :     else
     891             :     {
     892           0 :         elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
     893             :              (uint32) (restart_lsn >> 32), (uint32) restart_lsn,
     894             :              (uint32) (current_lsn >> 32), (uint32) current_lsn,
     895             :              (uint32) (slot->candidate_restart_lsn >> 32),
     896             :              (uint32) slot->candidate_restart_lsn,
     897             :              (uint32) (slot->candidate_restart_valid >> 32),
     898             :              (uint32) slot->candidate_restart_valid,
     899             :              (uint32) (slot->data.confirmed_flush >> 32),
     900             :              (uint32) slot->data.confirmed_flush
     901             :             );
     902             :     }
     903           0 :     SpinLockRelease(&slot->mutex);
     904             : 
     905             :     /* candidates are already valid with the current flush position, apply */
     906           0 :     if (updated_lsn)
     907           0 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
     908           0 : }
     909             : 
/*
 * Handle a consumer's confirmation having received all changes up to lsn.
 *
 * Always records lsn as the slot's data.confirmed_flush.  In addition, any
 * pending candidate catalog xmin (candidate_xmin_lsn <= lsn) and/or candidate
 * restart LSN (candidate_restart_valid <= lsn) is promoted into the slot's
 * data and the slot is saved.  Only after that save is effective_catalog_xmin
 * advanced and the global required xmin/LSN recomputed.
 */
void
LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
    Assert(lsn != InvalidXLogRecPtr);

    /*
     * Do an unlocked check for candidate_lsn first.
     *
     * NOTE(review): presumably safe because the candidate_* fields visible in
     * this file are only written through MyReplicationSlot, i.e. by this
     * backend -- confirm no other writer exists.
     */
    if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
        MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
    {
        bool        updated_xmin = false;
        bool        updated_restart = false;

        SpinLockAcquire(&MyReplicationSlot->mutex);

        MyReplicationSlot->data.confirmed_flush = lsn;

        /* if we're past the location required for bumping xmin, do so */
        if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
            MyReplicationSlot->candidate_xmin_lsn <= lsn)
        {
            /*
             * We have to write the changed xmin to disk *before* we change
             * the in-memory value, otherwise after a crash we wouldn't know
             * that some catalog tuples might have been removed already.
             *
             * Ensure that by first writing to ->xmin and only update
             * ->effective_xmin once the new state is synced to disk. After a
             * crash ->effective_xmin is set to ->xmin.  (Here those are
             * data.catalog_xmin and effective_catalog_xmin respectively.)
             *
             * NOTE(review): if the candidate xmin is invalid or equal to the
             * current catalog_xmin, the candidate fields are left set, which
             * blocks new candidates in LogicalIncreaseXminForSlot -- confirm
             * that this state cannot arise.
             */
            if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
                MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
            {
                MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
                MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
                MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
                updated_xmin = true;
            }
        }

        /* likewise promote the candidate restart_lsn once lsn covers it */
        if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
            MyReplicationSlot->candidate_restart_valid <= lsn)
        {
            Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);

            MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
            MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
            MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
            updated_restart = true;
        }

        SpinLockRelease(&MyReplicationSlot->mutex);

        /* first write new xmin to disk, so we know what's up after a crash */
        if (updated_xmin || updated_restart)
        {
            ReplicationSlotMarkDirty();
            ReplicationSlotSave();
            elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
        }

        /*
         * Now the new xmin is safely on disk, we can let the global value
         * advance. We do not take ProcArrayLock or similar since we only
         * advance xmin here and there's not much harm done by a concurrent
         * computation missing that.
         *
         * NOTE(review): ReplicationSlotsComputeRequiredLSN() only runs when
         * the xmin changed, so an advance of restart_lsn alone is not
         * propagated from here -- confirm that is intended.
         */
        if (updated_xmin)
        {
            SpinLockAcquire(&MyReplicationSlot->mutex);
            MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
            SpinLockRelease(&MyReplicationSlot->mutex);

            ReplicationSlotsComputeRequiredXmin(false);
            ReplicationSlotsComputeRequiredLSN();
        }
    }
    else
    {
        /* no pending candidates: just record the confirmed position */
        SpinLockAcquire(&MyReplicationSlot->mutex);
        MyReplicationSlot->data.confirmed_flush = lsn;
        SpinLockRelease(&MyReplicationSlot->mutex);
    }
}
 |