Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * logical.c
3 : * PostgreSQL logical decoding coordination
4 : *
5 : * Copyright (c) 2012-2017, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/logical.c
9 : *
10 : * NOTES
11 : * This file coordinates interaction between the various modules that
12 : * together provide logical decoding, primarily by providing so
13 : * called LogicalDecodingContexts. The goal is to encapsulate most of the
14 : * internal complexity for consumers of logical decoding, so they can
15 : * create and consume a changestream with a low amount of code. Builtin
16 : * consumers are the walsender and SQL SRF interface, but it's possible to
17 : * add further ones without changing core code, e.g. to consume changes in
18 : * a bgworker.
19 : *
20 : * The idea is that a consumer provides three callbacks, one to read WAL,
21 : * one to prepare a data write, and a final one for actually writing since
22 : * their implementation depends on the type of consumer. Check
23 : * logicalfuncs.c for an example implementation of a fairly simple consumer
24 : * and an implementation of a WAL reading callback that's suitable for
25 : * simple consumers.
26 : *-------------------------------------------------------------------------
27 : */
28 :
29 : #include "postgres.h"
30 :
31 : #include "miscadmin.h"
32 :
33 : #include "access/xact.h"
34 : #include "access/xlog_internal.h"
35 :
36 : #include "replication/decode.h"
37 : #include "replication/logical.h"
38 : #include "replication/reorderbuffer.h"
39 : #include "replication/origin.h"
40 : #include "replication/snapbuild.h"
41 :
42 : #include "storage/proc.h"
43 : #include "storage/procarray.h"
44 :
45 : #include "utils/memutils.h"
46 :
47 : /* data for errcontext callback */
48 : typedef struct LogicalErrorCallbackState
49 : {
50 : LogicalDecodingContext *ctx;
51 : const char *callback_name;
52 : XLogRecPtr report_location;
53 : } LogicalErrorCallbackState;
54 :
55 : /* wrappers around output plugin callbacks */
56 : static void output_plugin_error_callback(void *arg);
57 : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
58 : bool is_init);
59 : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
60 : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
61 : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
62 : XLogRecPtr commit_lsn);
63 : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
64 : Relation relation, ReorderBufferChange *change);
65 : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
66 : XLogRecPtr message_lsn, bool transactional,
67 : const char *prefix, Size message_size, const char *message);
68 :
69 : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
70 :
71 : /*
72 : * Make sure the current settings & environment are capable of doing logical
73 : * decoding.
74 : */
75 : void
76 0 : CheckLogicalDecodingRequirements(void)
77 : {
78 0 : CheckSlotRequirements();
79 :
80 0 : if (wal_level < WAL_LEVEL_LOGICAL)
81 0 : ereport(ERROR,
82 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
83 : errmsg("logical decoding requires wal_level >= logical")));
84 :
85 0 : if (MyDatabaseId == InvalidOid)
86 0 : ereport(ERROR,
87 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
88 : errmsg("logical decoding requires a database connection")));
89 :
90 : /* ----
91 : * TODO: We got to change that someday soon...
92 : *
93 : * There's basically three things missing to allow this:
94 : * 1) We need to be able to correctly and quickly identify the timeline a
95 : * LSN belongs to
96 : * 2) We need to force hot_standby_feedback to be enabled at all times so
97 : * the primary cannot remove rows we need.
98 : * 3) support dropping replication slots referring to a database, in
99 : * dbase_redo. There can't be any active ones due to HS recovery
100 : * conflicts, so that should be relatively easy.
101 : * ----
102 : */
103 0 : if (RecoveryInProgress())
104 0 : ereport(ERROR,
105 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
106 : errmsg("logical decoding cannot be used while in recovery")));
107 0 : }
108 :
109 : /*
110 : * Helper function for CreateInitialDecodingContext() and
111 : * CreateDecodingContext() performing common tasks.
112 : */
113 : static LogicalDecodingContext *
114 0 : StartupDecodingContext(List *output_plugin_options,
115 : XLogRecPtr start_lsn,
116 : TransactionId xmin_horizon,
117 : bool need_full_snapshot,
118 : XLogPageReadCB read_page,
119 : LogicalOutputPluginWriterPrepareWrite prepare_write,
120 : LogicalOutputPluginWriterWrite do_write,
121 : LogicalOutputPluginWriterUpdateProgress update_progress)
122 : {
123 : ReplicationSlot *slot;
124 : MemoryContext context,
125 : old_context;
126 : LogicalDecodingContext *ctx;
127 :
128 : /* shorter lines... */
129 0 : slot = MyReplicationSlot;
130 :
131 0 : context = AllocSetContextCreate(CurrentMemoryContext,
132 : "Logical decoding context",
133 : ALLOCSET_DEFAULT_SIZES);
134 0 : old_context = MemoryContextSwitchTo(context);
135 0 : ctx = palloc0(sizeof(LogicalDecodingContext));
136 :
137 0 : ctx->context = context;
138 :
139 : /*
140 : * (re-)load output plugins, so we detect a bad (removed) output plugin
141 : * now.
142 : */
143 0 : LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
144 :
145 : /*
146 : * Now that the slot's xmin has been set, we can announce ourselves as a
147 : * logical decoding backend which doesn't need to be checked individually
148 : * when computing the xmin horizon because the xmin is enforced via
149 : * replication slots.
150 : *
151 : * We can only do so if we're outside of a transaction (i.e. the case when
152 : * streaming changes via walsender), otherwise an already setup
153 : * snapshot/xid would end up being ignored. That's not a particularly
154 : * bothersome restriction since the SQL interface can't be used for
155 : * streaming anyway.
156 : */
157 0 : if (!IsTransactionOrTransactionBlock())
158 : {
159 0 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
160 0 : MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING;
161 0 : LWLockRelease(ProcArrayLock);
162 : }
163 :
164 0 : ctx->slot = slot;
165 :
166 0 : ctx->reader = XLogReaderAllocate(read_page, ctx);
167 0 : if (!ctx->reader)
168 0 : ereport(ERROR,
169 : (errcode(ERRCODE_OUT_OF_MEMORY),
170 : errmsg("out of memory")));
171 :
172 0 : ctx->reader->private_data = ctx;
173 :
174 0 : ctx->reorder = ReorderBufferAllocate();
175 0 : ctx->snapshot_builder =
176 0 : AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
177 : need_full_snapshot);
178 :
179 0 : ctx->reorder->private_data = ctx;
180 :
181 : /* wrap output plugin callbacks, so we can add error context information */
182 0 : ctx->reorder->begin = begin_cb_wrapper;
183 0 : ctx->reorder->apply_change = change_cb_wrapper;
184 0 : ctx->reorder->commit = commit_cb_wrapper;
185 0 : ctx->reorder->message = message_cb_wrapper;
186 :
187 0 : ctx->out = makeStringInfo();
188 0 : ctx->prepare_write = prepare_write;
189 0 : ctx->write = do_write;
190 0 : ctx->update_progress = update_progress;
191 :
192 0 : ctx->output_plugin_options = output_plugin_options;
193 :
194 0 : MemoryContextSwitchTo(old_context);
195 :
196 0 : return ctx;
197 : }
198 :
199 : /*
200 : * Create a new decoding context, for a new logical slot.
201 : *
202 : * plugin contains the name of the output plugin
203 : * output_plugin_options contains options passed to the output plugin
204 : * read_page, prepare_write, do_write, update_progress
205 : * callbacks that have to be filled to perform the use-case dependent,
206 : * actual, work.
207 : *
208 : * Needs to be called while in a memory context that's at least as long lived
209 : * as the decoding context because further memory contexts will be created
210 : * inside it.
211 : *
212 : * Returns an initialized decoding context after calling the output plugin's
213 : * startup function.
214 : */
215 : LogicalDecodingContext *
216 0 : CreateInitDecodingContext(char *plugin,
217 : List *output_plugin_options,
218 : bool need_full_snapshot,
219 : XLogPageReadCB read_page,
220 : LogicalOutputPluginWriterPrepareWrite prepare_write,
221 : LogicalOutputPluginWriterWrite do_write,
222 : LogicalOutputPluginWriterUpdateProgress update_progress)
223 : {
224 0 : TransactionId xmin_horizon = InvalidTransactionId;
225 : ReplicationSlot *slot;
226 : LogicalDecodingContext *ctx;
227 : MemoryContext old_context;
228 :
229 : /* shorter lines... */
230 0 : slot = MyReplicationSlot;
231 :
232 : /* first some sanity checks that are unlikely to be violated */
233 0 : if (slot == NULL)
234 0 : elog(ERROR, "cannot perform logical decoding without an acquired slot");
235 :
236 0 : if (plugin == NULL)
237 0 : elog(ERROR, "cannot initialize logical decoding without a specified plugin");
238 :
239 : /* Make sure the passed slot is suitable. These are user facing errors. */
240 0 : if (SlotIsPhysical(slot))
241 0 : ereport(ERROR,
242 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
243 : errmsg("cannot use physical replication slot for logical decoding")));
244 :
245 0 : if (slot->data.database != MyDatabaseId)
246 0 : ereport(ERROR,
247 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
248 : errmsg("replication slot \"%s\" was not created in this database",
249 : NameStr(slot->data.name))));
250 :
251 0 : if (IsTransactionState() &&
252 0 : GetTopTransactionIdIfAny() != InvalidTransactionId)
253 0 : ereport(ERROR,
254 : (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
255 : errmsg("cannot create logical replication slot in transaction that has performed writes")));
256 :
257 : /* register output plugin name with slot */
258 0 : SpinLockAcquire(&slot->mutex);
259 0 : StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
260 0 : SpinLockRelease(&slot->mutex);
261 :
262 0 : ReplicationSlotReserveWal();
263 :
264 : /* ----
265 : * This is a bit tricky: We need to determine a safe xmin horizon to start
266 : * decoding from, to avoid starting from a running xacts record referring
267 : * to xids whose rows have been vacuumed or pruned
268 : * already. GetOldestSafeDecodingTransactionId() returns such a value, but
269 : * without further interlock its return value might immediately be out of
270 : * date.
271 : *
272 : * So we have to acquire the ProcArrayLock to prevent computation of new
273 : * xmin horizons by other backends, get the safe decoding xid, and inform
274 : * the slot machinery about the new limit. Once that's done the
275 : * ProcArrayLock can be released as the slot machinery now is
276 : * protecting against vacuum.
277 : *
278 : * Note that, temporarily, the data, not just the catalog, xmin has to be
279 : * reserved if a data snapshot is to be exported. Otherwise the initial
280 : * data snapshot created here is not guaranteed to be valid. After that
281 : * the data xmin doesn't need to be managed anymore and the global xmin
282 : * should be recomputed. As we are fine with losing the pegged data xmin
283 : * after crash - no chance a snapshot would get exported anymore - we can
284 : * get away with just setting the slot's
285 : * effective_xmin. ReplicationSlotRelease will reset it again.
286 : *
287 : * ----
288 : */
289 0 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
290 :
291 0 : xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
292 :
293 0 : slot->effective_catalog_xmin = xmin_horizon;
294 0 : slot->data.catalog_xmin = xmin_horizon;
295 0 : if (need_full_snapshot)
296 0 : slot->effective_xmin = xmin_horizon;
297 :
298 0 : ReplicationSlotsComputeRequiredXmin(true);
299 :
300 0 : LWLockRelease(ProcArrayLock);
301 :
302 0 : ReplicationSlotMarkDirty();
303 0 : ReplicationSlotSave();
304 :
305 0 : ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
306 : need_full_snapshot, read_page, prepare_write,
307 : do_write, update_progress);
308 :
309 : /* call output plugin initialization callback */
310 0 : old_context = MemoryContextSwitchTo(ctx->context);
311 0 : if (ctx->callbacks.startup_cb != NULL)
312 0 : startup_cb_wrapper(ctx, &ctx->options, true);
313 0 : MemoryContextSwitchTo(old_context);
314 :
315 0 : return ctx;
316 : }
317 :
318 : /*
319 : * Create a new decoding context, for a logical slot that has previously been
320 : * used already.
321 : *
322 : * start_lsn
323 : * The LSN at which to start decoding. If InvalidXLogRecPtr, restart
324 : * from the slot's confirmed_flush; otherwise, start from the specified
325 : * location (but move it forwards to confirmed_flush if it's older than
326 : * that, see below).
327 : *
328 : * output_plugin_options
329 : * contains options passed to the output plugin.
330 : *
331 : * read_page, prepare_write, do_write, update_progress
332 : * callbacks that have to be filled to perform the use-case dependent,
333 : * actual work.
334 : *
335 : * Needs to be called while in a memory context that's at least as long lived
336 : * as the decoding context because further memory contexts will be created
337 : * inside it.
338 : *
339 : * Returns an initialized decoding context after calling the output plugin's
340 : * startup function.
341 : */
342 : LogicalDecodingContext *
343 0 : CreateDecodingContext(XLogRecPtr start_lsn,
344 : List *output_plugin_options,
345 : XLogPageReadCB read_page,
346 : LogicalOutputPluginWriterPrepareWrite prepare_write,
347 : LogicalOutputPluginWriterWrite do_write,
348 : LogicalOutputPluginWriterUpdateProgress update_progress)
349 : {
350 : LogicalDecodingContext *ctx;
351 : ReplicationSlot *slot;
352 : MemoryContext old_context;
353 :
354 : /* shorter lines... */
355 0 : slot = MyReplicationSlot;
356 :
357 : /* first some sanity checks that are unlikely to be violated */
358 0 : if (slot == NULL)
359 0 : elog(ERROR, "cannot perform logical decoding without an acquired slot");
360 :
361 : /* make sure the passed slot is suitable, these are user facing errors */
362 0 : if (SlotIsPhysical(slot))
363 0 : ereport(ERROR,
364 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
365 : (errmsg("cannot use physical replication slot for logical decoding"))));
366 :
367 0 : if (slot->data.database != MyDatabaseId)
368 0 : ereport(ERROR,
369 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
370 : (errmsg("replication slot \"%s\" was not created in this database",
371 : NameStr(slot->data.name)))));
372 :
373 0 : if (start_lsn == InvalidXLogRecPtr)
374 : {
375 : /* continue from last position */
376 0 : start_lsn = slot->data.confirmed_flush;
377 : }
378 0 : else if (start_lsn < slot->data.confirmed_flush)
379 : {
380 : /*
381 : * It might seem like we should error out in this case, but it's
382 : * pretty common for a client to acknowledge a LSN it doesn't have to
383 : * do anything for, and thus didn't store persistently, because the
384 : * xlog records didn't result in anything relevant for logical
385 : * decoding. Clients have to be able to do that to support synchronous
386 : * replication.
387 : */
388 0 : elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding",
389 : (uint32) (start_lsn >> 32), (uint32) start_lsn,
390 : (uint32) (slot->data.confirmed_flush >> 32),
391 : (uint32) slot->data.confirmed_flush);
392 :
393 0 : start_lsn = slot->data.confirmed_flush;
394 : }
395 :
396 0 : ctx = StartupDecodingContext(output_plugin_options,
397 : start_lsn, InvalidTransactionId, false,
398 : read_page, prepare_write, do_write,
399 : update_progress);
400 :
401 : /* call output plugin initialization callback */
402 0 : old_context = MemoryContextSwitchTo(ctx->context);
403 0 : if (ctx->callbacks.startup_cb != NULL)
404 0 : startup_cb_wrapper(ctx, &ctx->options, false);
405 0 : MemoryContextSwitchTo(old_context);
406 :
407 0 : ereport(LOG,
408 : (errmsg("starting logical decoding for slot \"%s\"",
409 : NameStr(slot->data.name)),
410 : errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X",
411 : (uint32) (slot->data.confirmed_flush >> 32),
412 : (uint32) slot->data.confirmed_flush,
413 : (uint32) (slot->data.restart_lsn >> 32),
414 : (uint32) slot->data.restart_lsn)));
415 :
416 0 : return ctx;
417 : }
418 :
419 : /*
420 : * Returns true if a consistent initial decoding snapshot has been built.
421 : */
422 : bool
423 0 : DecodingContextReady(LogicalDecodingContext *ctx)
424 : {
425 0 : return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
426 : }
427 :
428 : /*
429 : * Read from the decoding slot, until it is ready to start extracting changes.
430 : */
431 : void
432 0 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
433 : {
434 : XLogRecPtr startptr;
435 :
436 : /* Initialize from where to start reading WAL. */
437 0 : startptr = ctx->slot->data.restart_lsn;
438 :
439 0 : elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
440 : (uint32) (ctx->slot->data.restart_lsn >> 32),
441 : (uint32) ctx->slot->data.restart_lsn);
442 :
443 : /* Wait for a consistent starting point */
444 : for (;;)
445 : {
446 : XLogRecord *record;
447 0 : char *err = NULL;
448 :
449 : /* the read_page callback waits for new WAL */
450 0 : record = XLogReadRecord(ctx->reader, startptr, &err);
451 0 : if (err)
452 0 : elog(ERROR, "%s", err);
453 0 : if (!record)
454 0 : elog(ERROR, "no record found"); /* shouldn't happen */
455 :
456 0 : startptr = InvalidXLogRecPtr;
457 :
458 0 : LogicalDecodingProcessRecord(ctx, ctx->reader);
459 :
460 : /* only continue till we found a consistent spot */
461 0 : if (DecodingContextReady(ctx))
462 0 : break;
463 :
464 0 : CHECK_FOR_INTERRUPTS();
465 0 : }
466 :
467 0 : ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
468 0 : }
469 :
470 : /*
471 : * Free a previously allocated decoding context, invoking the shutdown
472 : * callback if necessary.
473 : */
474 : void
475 0 : FreeDecodingContext(LogicalDecodingContext *ctx)
476 : {
477 0 : if (ctx->callbacks.shutdown_cb != NULL)
478 0 : shutdown_cb_wrapper(ctx);
479 :
480 0 : ReorderBufferFree(ctx->reorder);
481 0 : FreeSnapshotBuilder(ctx->snapshot_builder);
482 0 : XLogReaderFree(ctx->reader);
483 0 : MemoryContextDelete(ctx->context);
484 0 : }
485 :
486 : /*
487 : * Prepare a write using the context's output routine.
488 : */
489 : void
490 0 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
491 : {
492 0 : if (!ctx->accept_writes)
493 0 : elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
494 :
495 0 : ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
496 0 : ctx->prepared_write = true;
497 0 : }
498 :
499 : /*
500 : * Perform a write using the context's output routine.
501 : */
502 : void
503 0 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
504 : {
505 0 : if (!ctx->prepared_write)
506 0 : elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
507 :
508 0 : ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
509 0 : ctx->prepared_write = false;
510 0 : }
511 :
512 : /*
513 : * Update progress tracking (if supported).
514 : */
515 : void
516 0 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
517 : {
518 0 : if (!ctx->update_progress)
519 0 : return;
520 :
521 0 : ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
522 : }
523 :
524 : /*
525 : * Load the output plugin, lookup its output plugin init function, and check
526 : * that it provides the required callbacks.
527 : */
528 : static void
529 0 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin)
530 : {
531 : LogicalOutputPluginInit plugin_init;
532 :
533 0 : plugin_init = (LogicalOutputPluginInit)
534 : load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
535 :
536 0 : if (plugin_init == NULL)
537 0 : elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
538 :
539 : /* ask the output plugin to fill the callback struct */
540 0 : plugin_init(callbacks);
541 :
542 0 : if (callbacks->begin_cb == NULL)
543 0 : elog(ERROR, "output plugins have to register a begin callback");
544 0 : if (callbacks->change_cb == NULL)
545 0 : elog(ERROR, "output plugins have to register a change callback");
546 0 : if (callbacks->commit_cb == NULL)
547 0 : elog(ERROR, "output plugins have to register a commit callback");
548 0 : }
549 :
550 : static void
551 0 : output_plugin_error_callback(void *arg)
552 : {
553 0 : LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
554 :
555 : /* not all callbacks have an associated LSN */
556 0 : if (state->report_location != InvalidXLogRecPtr)
557 0 : errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
558 0 : NameStr(state->ctx->slot->data.name),
559 0 : NameStr(state->ctx->slot->data.plugin),
560 : state->callback_name,
561 0 : (uint32) (state->report_location >> 32),
562 0 : (uint32) state->report_location);
563 : else
564 0 : errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
565 0 : NameStr(state->ctx->slot->data.name),
566 0 : NameStr(state->ctx->slot->data.plugin),
567 : state->callback_name);
568 0 : }
569 :
570 : static void
571 0 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
572 : {
573 : LogicalErrorCallbackState state;
574 : ErrorContextCallback errcallback;
575 :
576 : /* Push callback + info on the error context stack */
577 0 : state.ctx = ctx;
578 0 : state.callback_name = "startup";
579 0 : state.report_location = InvalidXLogRecPtr;
580 0 : errcallback.callback = output_plugin_error_callback;
581 0 : errcallback.arg = (void *) &state;
582 0 : errcallback.previous = error_context_stack;
583 0 : error_context_stack = &errcallback;
584 :
585 : /* set output state */
586 0 : ctx->accept_writes = false;
587 :
588 : /* do the actual work: call callback */
589 0 : ctx->callbacks.startup_cb(ctx, opt, is_init);
590 :
591 : /* Pop the error context stack */
592 0 : error_context_stack = errcallback.previous;
593 0 : }
594 :
595 : static void
596 0 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
597 : {
598 : LogicalErrorCallbackState state;
599 : ErrorContextCallback errcallback;
600 :
601 : /* Push callback + info on the error context stack */
602 0 : state.ctx = ctx;
603 0 : state.callback_name = "shutdown";
604 0 : state.report_location = InvalidXLogRecPtr;
605 0 : errcallback.callback = output_plugin_error_callback;
606 0 : errcallback.arg = (void *) &state;
607 0 : errcallback.previous = error_context_stack;
608 0 : error_context_stack = &errcallback;
609 :
610 : /* set output state */
611 0 : ctx->accept_writes = false;
612 :
613 : /* do the actual work: call callback */
614 0 : ctx->callbacks.shutdown_cb(ctx);
615 :
616 : /* Pop the error context stack */
617 0 : error_context_stack = errcallback.previous;
618 0 : }
619 :
620 :
621 : /*
622 : * Callbacks for ReorderBuffer which add in some more information and then call
623 : * output_plugin.h plugins.
624 : */
625 : static void
626 0 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
627 : {
628 0 : LogicalDecodingContext *ctx = cache->private_data;
629 : LogicalErrorCallbackState state;
630 : ErrorContextCallback errcallback;
631 :
632 : /* Push callback + info on the error context stack */
633 0 : state.ctx = ctx;
634 0 : state.callback_name = "begin";
635 0 : state.report_location = txn->first_lsn;
636 0 : errcallback.callback = output_plugin_error_callback;
637 0 : errcallback.arg = (void *) &state;
638 0 : errcallback.previous = error_context_stack;
639 0 : error_context_stack = &errcallback;
640 :
641 : /* set output state */
642 0 : ctx->accept_writes = true;
643 0 : ctx->write_xid = txn->xid;
644 0 : ctx->write_location = txn->first_lsn;
645 :
646 : /* do the actual work: call callback */
647 0 : ctx->callbacks.begin_cb(ctx, txn);
648 :
649 : /* Pop the error context stack */
650 0 : error_context_stack = errcallback.previous;
651 0 : }
652 :
653 : static void
654 0 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
655 : XLogRecPtr commit_lsn)
656 : {
657 0 : LogicalDecodingContext *ctx = cache->private_data;
658 : LogicalErrorCallbackState state;
659 : ErrorContextCallback errcallback;
660 :
661 : /* Push callback + info on the error context stack */
662 0 : state.ctx = ctx;
663 0 : state.callback_name = "commit";
664 0 : state.report_location = txn->final_lsn; /* beginning of commit record */
665 0 : errcallback.callback = output_plugin_error_callback;
666 0 : errcallback.arg = (void *) &state;
667 0 : errcallback.previous = error_context_stack;
668 0 : error_context_stack = &errcallback;
669 :
670 : /* set output state */
671 0 : ctx->accept_writes = true;
672 0 : ctx->write_xid = txn->xid;
673 0 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
674 :
675 : /* do the actual work: call callback */
676 0 : ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
677 :
678 : /* Pop the error context stack */
679 0 : error_context_stack = errcallback.previous;
680 0 : }
681 :
682 : static void
683 0 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
684 : Relation relation, ReorderBufferChange *change)
685 : {
686 0 : LogicalDecodingContext *ctx = cache->private_data;
687 : LogicalErrorCallbackState state;
688 : ErrorContextCallback errcallback;
689 :
690 : /* Push callback + info on the error context stack */
691 0 : state.ctx = ctx;
692 0 : state.callback_name = "change";
693 0 : state.report_location = change->lsn;
694 0 : errcallback.callback = output_plugin_error_callback;
695 0 : errcallback.arg = (void *) &state;
696 0 : errcallback.previous = error_context_stack;
697 0 : error_context_stack = &errcallback;
698 :
699 : /* set output state */
700 0 : ctx->accept_writes = true;
701 0 : ctx->write_xid = txn->xid;
702 :
703 : /*
704 : * report this change's lsn so replies from clients can give an up2date
705 : * answer. This won't ever be enough (and shouldn't be!) to confirm
706 : * receipt of this transaction, but it might allow another transaction's
707 : * commit to be confirmed with one message.
708 : */
709 0 : ctx->write_location = change->lsn;
710 :
711 0 : ctx->callbacks.change_cb(ctx, txn, relation, change);
712 :
713 : /* Pop the error context stack */
714 0 : error_context_stack = errcallback.previous;
715 0 : }
716 :
717 : bool
718 0 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
719 : {
720 : LogicalErrorCallbackState state;
721 : ErrorContextCallback errcallback;
722 : bool ret;
723 :
724 : /* Push callback + info on the error context stack */
725 0 : state.ctx = ctx;
726 0 : state.callback_name = "filter_by_origin";
727 0 : state.report_location = InvalidXLogRecPtr;
728 0 : errcallback.callback = output_plugin_error_callback;
729 0 : errcallback.arg = (void *) &state;
730 0 : errcallback.previous = error_context_stack;
731 0 : error_context_stack = &errcallback;
732 :
733 : /* set output state */
734 0 : ctx->accept_writes = false;
735 :
736 : /* do the actual work: call callback */
737 0 : ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
738 :
739 : /* Pop the error context stack */
740 0 : error_context_stack = errcallback.previous;
741 :
742 0 : return ret;
743 : }
744 :
745 : static void
746 0 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
747 : XLogRecPtr message_lsn, bool transactional,
748 : const char *prefix, Size message_size, const char *message)
749 : {
750 0 : LogicalDecodingContext *ctx = cache->private_data;
751 : LogicalErrorCallbackState state;
752 : ErrorContextCallback errcallback;
753 :
754 0 : if (ctx->callbacks.message_cb == NULL)
755 0 : return;
756 :
757 : /* Push callback + info on the error context stack */
758 0 : state.ctx = ctx;
759 0 : state.callback_name = "message";
760 0 : state.report_location = message_lsn;
761 0 : errcallback.callback = output_plugin_error_callback;
762 0 : errcallback.arg = (void *) &state;
763 0 : errcallback.previous = error_context_stack;
764 0 : error_context_stack = &errcallback;
765 :
766 : /* set output state */
767 0 : ctx->accept_writes = true;
768 0 : ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
769 0 : ctx->write_location = message_lsn;
770 :
771 : /* do the actual work: call callback */
772 0 : ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
773 : message_size, message);
774 :
775 : /* Pop the error context stack */
776 0 : error_context_stack = errcallback.previous;
777 : }
778 :
779 : /*
780 : * Set the required catalog xmin horizon for historic snapshots in the current
781 : * replication slot.
782 : *
783 : * Note that in the most cases, we won't be able to immediately use the xmin
784 : * to increase the xmin horizon: we need to wait till the client has confirmed
785 : * receiving current_lsn with LogicalConfirmReceivedLocation().
786 : */
787 : void
788 0 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
789 : {
790 0 : bool updated_xmin = false;
791 : ReplicationSlot *slot;
792 :
793 0 : slot = MyReplicationSlot;
794 :
795 0 : Assert(slot != NULL);
796 :
797 0 : SpinLockAcquire(&slot->mutex);
798 :
799 : /*
800 : * don't overwrite if we already have a newer xmin. This can happen if we
801 : * restart decoding in a slot.
802 : */
803 0 : if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
804 : {
805 : }
806 :
807 : /*
808 : * If the client has already confirmed up to this lsn, we directly can
809 : * mark this as accepted. This can happen if we restart decoding in a
810 : * slot.
811 : */
812 0 : else if (current_lsn <= slot->data.confirmed_flush)
813 : {
814 0 : slot->candidate_catalog_xmin = xmin;
815 0 : slot->candidate_xmin_lsn = current_lsn;
816 :
817 : /* our candidate can directly be used */
818 0 : updated_xmin = true;
819 : }
820 :
821 : /*
822 : * Only increase if the previous values have been applied, otherwise we
823 : * might never end up updating if the receiver acks too slowly.
824 : */
825 0 : else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
826 : {
827 0 : slot->candidate_catalog_xmin = xmin;
828 0 : slot->candidate_xmin_lsn = current_lsn;
829 : }
830 0 : SpinLockRelease(&slot->mutex);
831 :
832 : /* candidate already valid with the current flush position, apply */
833 0 : if (updated_xmin)
834 0 : LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
835 0 : }
836 :
837 : /*
838 : * Mark the minimal LSN (restart_lsn) we need to read to replay all
839 : * transactions that have not yet committed at current_lsn.
840 : *
841 : * Just like IncreaseRestartDecodingForSlot this only takes effect when the
842 : * client has confirmed to have received current_lsn.
843 : */
844 : void
845 0 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
846 : {
847 0 : bool updated_lsn = false;
848 : ReplicationSlot *slot;
849 :
850 0 : slot = MyReplicationSlot;
851 :
852 0 : Assert(slot != NULL);
853 0 : Assert(restart_lsn != InvalidXLogRecPtr);
854 0 : Assert(current_lsn != InvalidXLogRecPtr);
855 :
856 0 : SpinLockAcquire(&slot->mutex);
857 :
858 : /* don't overwrite if have a newer restart lsn */
859 0 : if (restart_lsn <= slot->data.restart_lsn)
860 : {
861 : }
862 :
863 : /*
864 : * We might have already flushed far enough to directly accept this lsn,
865 : * in this case there is no need to check for existing candidate LSNs
866 : */
867 0 : else if (current_lsn <= slot->data.confirmed_flush)
868 : {
869 0 : slot->candidate_restart_valid = current_lsn;
870 0 : slot->candidate_restart_lsn = restart_lsn;
871 :
872 : /* our candidate can directly be used */
873 0 : updated_lsn = true;
874 : }
875 :
876 : /*
877 : * Only increase if the previous values have been applied, otherwise we
878 : * might never end up updating if the receiver acks too slowly. A missed
879 : * value here will just cause some extra effort after reconnecting.
880 : */
881 0 : if (slot->candidate_restart_valid == InvalidXLogRecPtr)
882 : {
883 0 : slot->candidate_restart_valid = current_lsn;
884 0 : slot->candidate_restart_lsn = restart_lsn;
885 :
886 0 : elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
887 : (uint32) (restart_lsn >> 32), (uint32) restart_lsn,
888 : (uint32) (current_lsn >> 32), (uint32) current_lsn);
889 : }
890 : else
891 : {
892 0 : elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
893 : (uint32) (restart_lsn >> 32), (uint32) restart_lsn,
894 : (uint32) (current_lsn >> 32), (uint32) current_lsn,
895 : (uint32) (slot->candidate_restart_lsn >> 32),
896 : (uint32) slot->candidate_restart_lsn,
897 : (uint32) (slot->candidate_restart_valid >> 32),
898 : (uint32) slot->candidate_restart_valid,
899 : (uint32) (slot->data.confirmed_flush >> 32),
900 : (uint32) slot->data.confirmed_flush
901 : );
902 : }
903 0 : SpinLockRelease(&slot->mutex);
904 :
905 : /* candidates are already valid with the current flush position, apply */
906 0 : if (updated_lsn)
907 0 : LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
908 0 : }
909 :
910 : /*
911 : * Handle a consumer's confirmation having received all changes up to lsn.
912 : */
913 : void
914 0 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
915 : {
916 0 : Assert(lsn != InvalidXLogRecPtr);
917 :
918 : /* Do an unlocked check for candidate_lsn first. */
919 0 : if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
920 0 : MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
921 0 : {
922 0 : bool updated_xmin = false;
923 0 : bool updated_restart = false;
924 :
925 0 : SpinLockAcquire(&MyReplicationSlot->mutex);
926 :
927 0 : MyReplicationSlot->data.confirmed_flush = lsn;
928 :
929 : /* if we're past the location required for bumping xmin, do so */
930 0 : if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
931 0 : MyReplicationSlot->candidate_xmin_lsn <= lsn)
932 : {
933 : /*
934 : * We have to write the changed xmin to disk *before* we change
935 : * the in-memory value, otherwise after a crash we wouldn't know
936 : * that some catalog tuples might have been removed already.
937 : *
938 : * Ensure that by first writing to ->xmin and only update
939 : * ->effective_xmin once the new state is synced to disk. After a
940 : * crash ->effective_xmin is set to ->xmin.
941 : */
942 0 : if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
943 0 : MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
944 : {
945 0 : MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
946 0 : MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
947 0 : MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
948 0 : updated_xmin = true;
949 : }
950 : }
951 :
952 0 : if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
953 0 : MyReplicationSlot->candidate_restart_valid <= lsn)
954 : {
955 0 : Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
956 :
957 0 : MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
958 0 : MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
959 0 : MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
960 0 : updated_restart = true;
961 : }
962 :
963 0 : SpinLockRelease(&MyReplicationSlot->mutex);
964 :
965 : /* first write new xmin to disk, so we know what's up after a crash */
966 0 : if (updated_xmin || updated_restart)
967 : {
968 0 : ReplicationSlotMarkDirty();
969 0 : ReplicationSlotSave();
970 0 : elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
971 : }
972 :
973 : /*
974 : * Now the new xmin is safely on disk, we can let the global value
975 : * advance. We do not take ProcArrayLock or similar since we only
976 : * advance xmin here and there's not much harm done by a concurrent
977 : * computation missing that.
978 : */
979 0 : if (updated_xmin)
980 : {
981 0 : SpinLockAcquire(&MyReplicationSlot->mutex);
982 0 : MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
983 0 : SpinLockRelease(&MyReplicationSlot->mutex);
984 :
985 0 : ReplicationSlotsComputeRequiredXmin(false);
986 0 : ReplicationSlotsComputeRequiredLSN();
987 : }
988 : }
989 : else
990 : {
991 0 : SpinLockAcquire(&MyReplicationSlot->mutex);
992 0 : MyReplicationSlot->data.confirmed_flush = lsn;
993 0 : SpinLockRelease(&MyReplicationSlot->mutex);
994 : }
995 0 : }
|