LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 0 321 0.0 %
Date: 2017-09-29 13:40:31 Functions: 0 16 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * decode.c
       4             :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5             :  *      purpose of logical decoding by passing information to the
       6             :  *      reorderbuffer module (containing the actual changes) and to the
       7             :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8             :  *      properly decode the changes in the reorderbuffer).
       9             :  *
      10             :  * NOTE:
      11             :  *      This basically tries to handle all low level xlog stuff for
      12             :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13             :  *      specific record's struct is used to pass data along, but those just
      14             :  *      happen to contain the right amount of data in a convenient
      15             :  *      format. There isn't and shouldn't be much intelligence about the
      16             :  *      contents of records in here except turning them into a more usable
      17             :  *      format.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/replication/logical/decode.c
      24             :  *
      25             :  * -------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/heapam.h"
      30             : #include "access/heapam_xlog.h"
      31             : #include "access/transam.h"
      32             : #include "access/xact.h"
      33             : #include "access/xlog_internal.h"
      34             : #include "access/xlogutils.h"
      35             : #include "access/xlogreader.h"
      36             : #include "access/xlogrecord.h"
      37             : 
      38             : #include "catalog/pg_control.h"
      39             : 
      40             : #include "replication/decode.h"
      41             : #include "replication/logical.h"
      42             : #include "replication/message.h"
      43             : #include "replication/reorderbuffer.h"
      44             : #include "replication/origin.h"
      45             : #include "replication/snapbuild.h"
      46             : 
      47             : #include "storage/standby.h"
      48             : 
      49             : typedef struct XLogRecordBuffer
      50             : {
      51             :     XLogRecPtr  origptr;
      52             :     XLogRecPtr  endptr;
      53             :     XLogReaderState *record;
      54             : } XLogRecordBuffer;
      55             : 
      56             : /* RMGR Handlers */
      57             : static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      58             : static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      59             : static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      60             : static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      61             : static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      62             : static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      63             : 
      64             : /* individual record(group)'s handlers */
      65             : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      66             : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      67             : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      68             : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      69             : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      70             : 
      71             : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      72             :              xl_xact_parsed_commit *parsed, TransactionId xid);
      73             : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      74             :             xl_xact_parsed_abort *parsed, TransactionId xid);
      75             : 
      76             : /* common function to decode tuples */
      77             : static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
      78             : 
      79             : /*
      80             :  * Take every XLogReadRecord()ed record and perform the actions required to
      81             :  * decode it using the output plugin already set up in the logical decoding
      82             :  * context.
      83             :  *
      84             :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      85             :  * (xids contained in the content of records are not relevant for this rule).
      86             :  * That means that for records which'd otherwise not go through the
      87             :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      88             :  * call ReorderBufferProcessXid for each record type by default, because
      89             :  * e.g. empty xacts can be handled more efficiently if there's no previous
      90             :  * state for them.
      91             :  */
      92             : void
      93           0 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      94             : {
      95             :     XLogRecordBuffer buf;
      96             : 
      97           0 :     buf.origptr = ctx->reader->ReadRecPtr;
      98           0 :     buf.endptr = ctx->reader->EndRecPtr;
      99           0 :     buf.record = record;
     100             : 
     101             :     /* cast so we get a warning when new rmgrs are added */
     102           0 :     switch ((RmgrIds) XLogRecGetRmid(record))
     103             :     {
     104             :             /*
     105             :              * Rmgrs we care about for logical decoding. Add new rmgrs in
     106             :              * rmgrlist.h's order.
     107             :              */
     108             :         case RM_XLOG_ID:
     109           0 :             DecodeXLogOp(ctx, &buf);
     110           0 :             break;
     111             : 
     112             :         case RM_XACT_ID:
     113           0 :             DecodeXactOp(ctx, &buf);
     114           0 :             break;
     115             : 
     116             :         case RM_STANDBY_ID:
     117           0 :             DecodeStandbyOp(ctx, &buf);
     118           0 :             break;
     119             : 
     120             :         case RM_HEAP2_ID:
     121           0 :             DecodeHeap2Op(ctx, &buf);
     122           0 :             break;
     123             : 
     124             :         case RM_HEAP_ID:
     125           0 :             DecodeHeapOp(ctx, &buf);
     126           0 :             break;
     127             : 
     128             :         case RM_LOGICALMSG_ID:
     129           0 :             DecodeLogicalMsgOp(ctx, &buf);
     130           0 :             break;
     131             : 
     132             :             /*
     133             :              * Rmgrs irrelevant for logical decoding; they describe stuff not
     134             :              * represented in logical decoding. Add new rmgrs in rmgrlist.h's
     135             :              * order.
     136             :              */
     137             :         case RM_SMGR_ID:
     138             :         case RM_CLOG_ID:
     139             :         case RM_DBASE_ID:
     140             :         case RM_TBLSPC_ID:
     141             :         case RM_MULTIXACT_ID:
     142             :         case RM_RELMAP_ID:
     143             :         case RM_BTREE_ID:
     144             :         case RM_HASH_ID:
     145             :         case RM_GIN_ID:
     146             :         case RM_GIST_ID:
     147             :         case RM_SEQ_ID:
     148             :         case RM_SPGIST_ID:
     149             :         case RM_BRIN_ID:
     150             :         case RM_COMMIT_TS_ID:
     151             :         case RM_REPLORIGIN_ID:
     152             :         case RM_GENERIC_ID:
     153             :             /* just deal with xid, and done */
     154           0 :             ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     155             :                                     buf.origptr);
     156           0 :             break;
     157             :         case RM_NEXT_ID:
     158           0 :             elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
     159             :     }
     160           0 : }
     161             : 
     162             : /*
     163             :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     164             :  */
     165             : static void
     166           0 : DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     167             : {
     168           0 :     SnapBuild  *builder = ctx->snapshot_builder;
     169           0 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     170             : 
     171           0 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     172             :                             buf->origptr);
     173             : 
     174           0 :     switch (info)
     175             :     {
     176             :             /* this is also used in END_OF_RECOVERY checkpoints */
     177             :         case XLOG_CHECKPOINT_SHUTDOWN:
     178             :         case XLOG_END_OF_RECOVERY:
     179           0 :             SnapBuildSerializationPoint(builder, buf->origptr);
     180             : 
     181           0 :             break;
     182             :         case XLOG_CHECKPOINT_ONLINE:
     183             : 
     184             :             /*
     185             :              * a RUNNING_XACTS record will have been logged near to this, we
     186             :              * can restart from there.
     187             :              */
     188           0 :             break;
     189             :         case XLOG_NOOP:
     190             :         case XLOG_NEXTOID:
     191             :         case XLOG_SWITCH:
     192             :         case XLOG_BACKUP_END:
     193             :         case XLOG_PARAMETER_CHANGE:
     194             :         case XLOG_RESTORE_POINT:
     195             :         case XLOG_FPW_CHANGE:
     196             :         case XLOG_FPI_FOR_HINT:
     197             :         case XLOG_FPI:
     198           0 :             break;
     199             :         default:
     200           0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     201             :     }
     202           0 : }
     203             : 
     204             : /*
     205             :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     206             :  */
     207             : static void
     208           0 : DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     209             : {
     210           0 :     SnapBuild  *builder = ctx->snapshot_builder;
     211           0 :     ReorderBuffer *reorder = ctx->reorder;
     212           0 :     XLogReaderState *r = buf->record;
     213           0 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     214             : 
     215             :     /*
     216             :      * No point in doing anything yet, data could not be decoded anyway. It's
     217             :      * ok not to call ReorderBufferProcessXid() in that case: except in the
     218             :      * assignment case, there'll not be any later records with the same xid,
     219             :      * and in the assignment case we'll not decode those xacts.
     220             :      */
     221           0 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     222           0 :         return;
     223             : 
     224           0 :     switch (info)
     225             :     {
     226             :         case XLOG_XACT_COMMIT:
     227             :         case XLOG_XACT_COMMIT_PREPARED:
     228             :             {
     229             :                 xl_xact_commit *xlrec;
     230             :                 xl_xact_parsed_commit parsed;
     231             :                 TransactionId xid;
     232             : 
     233           0 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     234           0 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     235             : 
     236           0 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     237           0 :                     xid = XLogRecGetXid(r);
     238             :                 else
     239           0 :                     xid = parsed.twophase_xid;
     240             : 
     241           0 :                 DecodeCommit(ctx, buf, &parsed, xid);
     242           0 :                 break;
     243             :             }
     244             :         case XLOG_XACT_ABORT:
     245             :         case XLOG_XACT_ABORT_PREPARED:
     246             :             {
     247             :                 xl_xact_abort *xlrec;
     248             :                 xl_xact_parsed_abort parsed;
     249             :                 TransactionId xid;
     250             : 
     251           0 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     252           0 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     253             : 
     254           0 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     255           0 :                     xid = XLogRecGetXid(r);
     256             :                 else
     257           0 :                     xid = parsed.twophase_xid;
     258             : 
     259           0 :                 DecodeAbort(ctx, buf, &parsed, xid);
     260           0 :                 break;
     261             :             }
     262             :         case XLOG_XACT_ASSIGNMENT:
     263             :             {
     264             :                 xl_xact_assignment *xlrec;
     265             :                 int         i;
     266             :                 TransactionId *sub_xid;
     267             : 
     268           0 :                 xlrec = (xl_xact_assignment *) XLogRecGetData(r);
     269             : 
     270           0 :                 sub_xid = &xlrec->xsub[0];
     271             : 
     272           0 :                 for (i = 0; i < xlrec->nsubxacts; i++)
     273             :                 {
     274           0 :                     ReorderBufferAssignChild(reorder, xlrec->xtop,
     275           0 :                                              *(sub_xid++), buf->origptr);
     276             :                 }
     277           0 :                 break;
     278             :             }
     279             :         case XLOG_XACT_PREPARE:
     280             : 
     281             :             /*
     282             :              * Currently decoding ignores PREPARE TRANSACTION and will just
     283             :              * decode the transaction when the COMMIT PREPARED is sent or
     284             :              * throw away the transaction's contents when a ROLLBACK PREPARED
     285             :              * is received. In the future we could add code to expose prepared
     286             :              * transactions in the changestream allowing for a kind of
     287             :              * distributed 2PC.
     288             :              */
     289           0 :             ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
     290           0 :             break;
     291             :         default:
     292           0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     293             :     }
     294             : }
     295             : 
     296             : /*
     297             :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     298             :  */
     299             : static void
     300           0 : DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     301             : {
     302           0 :     SnapBuild  *builder = ctx->snapshot_builder;
     303           0 :     XLogReaderState *r = buf->record;
     304           0 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     305             : 
     306           0 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     307             : 
     308           0 :     switch (info)
     309             :     {
     310             :         case XLOG_RUNNING_XACTS:
     311             :             {
     312           0 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     313             : 
     314           0 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     315             : 
     316             :                 /*
     317             :                  * Abort all transactions that we keep track of, that are
     318             :                  * older than the record's oldestRunningXid. This is the most
     319             :                  * convenient spot for doing so since, in contrast to shutdown
     320             :                  * or end-of-recovery checkpoints, we have information about
     321             :                  * all running transactions which includes prepared ones,
     322             :                  * while shutdown checkpoints just know that no non-prepared
     323             :                  * transactions are in progress.
     324             :                  */
     325           0 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     326             :             }
     327           0 :             break;
     328             :         case XLOG_STANDBY_LOCK:
     329           0 :             break;
     330             :         case XLOG_INVALIDATIONS:
     331             :             {
     332           0 :                 xl_invalidations *invalidations =
     333             :                 (xl_invalidations *) XLogRecGetData(r);
     334             : 
     335           0 :                 ReorderBufferImmediateInvalidation(
     336           0 :                                                    ctx->reorder, invalidations->nmsgs, invalidations->msgs);
     337             :             }
     338           0 :             break;
     339             :         default:
     340           0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     341             :     }
     342           0 : }
     343             : 
     344             : /*
     345             :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     346             :  */
     347             : static void
     348           0 : DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     349             : {
     350           0 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     351           0 :     TransactionId xid = XLogRecGetXid(buf->record);
     352           0 :     SnapBuild  *builder = ctx->snapshot_builder;
     353             : 
     354           0 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     355             : 
     356             :     /* no point in doing anything yet */
     357           0 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     358           0 :         return;
     359             : 
     360           0 :     switch (info)
     361             :     {
     362             :         case XLOG_HEAP2_MULTI_INSERT:
     363           0 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     364           0 :                 DecodeMultiInsert(ctx, buf);
     365           0 :             break;
     366             :         case XLOG_HEAP2_NEW_CID:
     367             :             {
     368             :                 xl_heap_new_cid *xlrec;
     369             : 
     370           0 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     371           0 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     372             : 
     373           0 :                 break;
     374             :             }
     375             :         case XLOG_HEAP2_REWRITE:
     376             : 
     377             :             /*
     378             :              * Although these records only exist to serve the needs of logical
     379             :              * decoding, all the work happens as part of crash or archive
     380             :              * recovery, so we don't need to do anything here.
     381             :              */
     382           0 :             break;
     383             : 
     384             :             /*
     385             :              * Everything else here is just low level physical stuff we're not
     386             :              * interested in.
     387             :              */
     388             :         case XLOG_HEAP2_FREEZE_PAGE:
     389             :         case XLOG_HEAP2_CLEAN:
     390             :         case XLOG_HEAP2_CLEANUP_INFO:
     391             :         case XLOG_HEAP2_VISIBLE:
     392             :         case XLOG_HEAP2_LOCK_UPDATED:
     393           0 :             break;
     394             :         default:
     395           0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     396             :     }
     397             : }
     398             : 
     399             : /*
     400             :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     401             :  */
     402             : static void
     403           0 : DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     404             : {
     405           0 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     406           0 :     TransactionId xid = XLogRecGetXid(buf->record);
     407           0 :     SnapBuild  *builder = ctx->snapshot_builder;
     408             : 
     409           0 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     410             : 
     411             :     /* no point in doing anything yet */
     412           0 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     413           0 :         return;
     414             : 
     415           0 :     switch (info)
     416             :     {
     417             :         case XLOG_HEAP_INSERT:
     418           0 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     419           0 :                 DecodeInsert(ctx, buf);
     420           0 :             break;
     421             : 
     422             :             /*
     423             :              * Treat HOT update as normal updates. There is no useful
     424             :              * information in the fact that we could make it a HOT update
     425             :              * locally and the WAL layout is compatible.
     426             :              */
     427             :         case XLOG_HEAP_HOT_UPDATE:
     428             :         case XLOG_HEAP_UPDATE:
     429           0 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     430           0 :                 DecodeUpdate(ctx, buf);
     431           0 :             break;
     432             : 
     433             :         case XLOG_HEAP_DELETE:
     434           0 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     435           0 :                 DecodeDelete(ctx, buf);
     436           0 :             break;
     437             : 
     438             :         case XLOG_HEAP_INPLACE:
     439             : 
     440             :             /*
     441             :              * Inplace updates are only ever performed on catalog tuples and
     442             :              * can, per definition, not change tuple visibility.  Since we
     443             :              * don't decode catalog tuples, we're not interested in the
     444             :              * record's contents.
     445             :              *
     446             :              * In-place updates can be used either by XID-bearing transactions
     447             :              * (e.g.  in CREATE INDEX CONCURRENTLY) or by XID-less
     448             :              * transactions (e.g.  VACUUM).  In the former case, the commit
     449             :              * record will include cache invalidations, so we mark the
     450             :              * transaction as catalog modifying here. Currently that's
     451             :              * redundant because the commit will do that as well, but once we
     452             :              * support decoding in-progress relations, this will be important.
     453             :              */
     454           0 :             if (!TransactionIdIsValid(xid))
     455           0 :                 break;
     456             : 
     457           0 :             SnapBuildProcessChange(builder, xid, buf->origptr);
     458           0 :             ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
     459           0 :             break;
     460             : 
     461             :         case XLOG_HEAP_CONFIRM:
     462           0 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     463           0 :                 DecodeSpecConfirm(ctx, buf);
     464           0 :             break;
     465             : 
     466             :         case XLOG_HEAP_LOCK:
     467             :             /* we don't care about row level locks for now */
     468           0 :             break;
     469             : 
     470             :         default:
     471           0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     472             :             break;
     473             :     }
     474             : }
     475             : 
     476             : static inline bool
     477           0 : FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
     478             : {
     479           0 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     480           0 :         return false;
     481             : 
     482           0 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     483             : }
     484             : 
     485             : /*
     486             :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     487             :  */
     488             : static void
     489           0 : DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     490             : {
     491           0 :     SnapBuild  *builder = ctx->snapshot_builder;
     492           0 :     XLogReaderState *r = buf->record;
     493           0 :     TransactionId xid = XLogRecGetXid(r);
     494           0 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     495           0 :     RepOriginId origin_id = XLogRecGetOrigin(r);
     496             :     Snapshot    snapshot;
     497             :     xl_logical_message *message;
     498             : 
     499           0 :     if (info != XLOG_LOGICAL_MESSAGE)
     500           0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     501             : 
     502           0 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     503             : 
     504             :     /* No point in doing anything yet. */
     505           0 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     506           0 :         return;
     507             : 
     508           0 :     message = (xl_logical_message *) XLogRecGetData(r);
     509             : 
     510           0 :     if (message->dbId != ctx->slot->data.database ||
     511           0 :         FilterByOrigin(ctx, origin_id))
     512           0 :         return;
     513             : 
     514           0 :     if (message->transactional &&
     515           0 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     516           0 :         return;
     517           0 :     else if (!message->transactional &&
     518           0 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     519           0 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     520           0 :         return;
     521             : 
     522           0 :     snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
     523           0 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     524           0 :                               message->transactional,
     525           0 :                               message->message, /* first part of message is
     526             :                                                  * prefix */
     527             :                               message->message_size,
     528           0 :                               message->message + message->prefix_size);
     529             : }
     530             : 
     531             : /*
     532             :  * Consolidated commit record handling between the different forms of commit
     533             :  * records.
     534             :  */
     535             : static void
     536           0 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     537             :              xl_xact_parsed_commit *parsed, TransactionId xid)
     538             : {
     539           0 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     540           0 :     TimestampTz commit_time = parsed->xact_time;
     541           0 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     542             :     int         i;
     543             : 
     544           0 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     545             :     {
     546           0 :         origin_lsn = parsed->origin_lsn;
     547           0 :         commit_time = parsed->origin_timestamp;
     548             :     }
     549             : 
     550             :     /*
     551             :      * Process invalidation messages, even if we're not interested in the
     552             :      * transaction's contents, since the various caches need to always be
     553             :      * consistent.
     554             :      */
     555           0 :     if (parsed->nmsgs > 0)
     556             :     {
     557           0 :         ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
     558           0 :                                       parsed->nmsgs, parsed->msgs);
     559           0 :         ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
     560             :     }
     561             : 
     562           0 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     563             :                        parsed->nsubxacts, parsed->subxacts);
     564             : 
     565             :     /* ----
     566             :      * Check whether we are interested in this specific transaction, and tell
     567             :      * the reorderbuffer to forget the content of the (sub-)transactions
     568             :      * if not.
     569             :      *
     570             :      * There can be several reasons we might not be interested in this
     571             :      * transaction:
     572             :      * 1) We might not be interested in decoding transactions up to this
     573             :      *    LSN. This can happen because we previously decoded it and now just
     574             :      *    are restarting or if we haven't assembled a consistent snapshot yet.
     575             :      * 2) The transaction happened in another database.
     576             :      * 3) The output plugin is not interested in the origin.
     577             :      *
     578             :      * We can't just use ReorderBufferAbort() here, because we need to execute
     579             :      * the transaction's invalidations.  This currently won't be needed if
     580             :      * we're just skipping over the transaction because currently we only do
     581             :      * so during startup, to get to the first transaction the client needs. As
     582             :      * we have reset the catalog caches before starting to read WAL, and we
     583             :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     584             :      * But if we're "forgetting" this commit because it's it happened in
     585             :      * another database, the invalidations might be important, because they
     586             :      * could be for shared catalogs and we might have loaded data into the
     587             :      * relevant syscaches.
     588             :      * ---
     589             :      */
     590           0 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
     591           0 :         (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
     592           0 :         FilterByOrigin(ctx, origin_id))
     593             :     {
     594           0 :         for (i = 0; i < parsed->nsubxacts; i++)
     595             :         {
     596           0 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     597             :         }
     598           0 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     599             : 
     600           0 :         return;
     601             :     }
     602             : 
     603             :     /* tell the reorderbuffer about the surviving subtransactions */
     604           0 :     for (i = 0; i < parsed->nsubxacts; i++)
     605             :     {
     606           0 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     607             :                                  buf->origptr, buf->endptr);
     608             :     }
     609             : 
     610             :     /* replay actions of all transaction + subtransactions in order */
     611           0 :     ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     612             :                         commit_time, origin_id, origin_lsn);
     613             : }
     614             : 
     615             : /*
     616             :  * Get the data from the various forms of abort records and pass it on to
     617             :  * reorderbuffer.c: each listed subtransaction, then 'xid' itself, is aborted.
     618             :  */
     619             : static void
     620           0 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     621             :             xl_xact_parsed_abort *parsed, TransactionId xid)
     622             : {
     623             :     int         i;
     624             : 
     625           0 :     for (i = 0; i < parsed->nsubxacts; i++)
     626             :     {
     627           0 :         ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     628           0 :                            buf->record->EndRecPtr);
     629             :     }
     630             : 
     631           0 :     ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
     632           0 : }
     633             : 
     634             : /*
     635             :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     636             :  *
     637             :  * Inserts can contain the new tuple; speculative ones get their own action.
     638             :  */
     639             : static void
     640           0 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     641             : {
     642           0 :     XLogReaderState *r = buf->record;
     643             :     xl_heap_insert *xlrec;
     644             :     ReorderBufferChange *change;
     645             :     RelFileNode target_node;
     646             : 
     647           0 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     648             : 
     649             :     /* only interested in our database */
     650           0 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     651           0 :     if (target_node.dbNode != ctx->slot->data.database)
     652           0 :         return;
     653             : 
     654             :     /* output plugin doesn't look for this origin, no need to queue */
     655           0 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     656           0 :         return;
     657             : 
     658           0 :     change = ReorderBufferGetChange(ctx->reorder);
     659           0 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     660           0 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     661             :     else
     662           0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     663           0 :     change->origin_id = XLogRecGetOrigin(r);
     664             : 
     665           0 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     666             : 
     667           0 :     if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
     668             :     {
     669             :         Size        datalen;
     670           0 :         char       *tupledata = XLogRecGetBlockData(r, 0, &datalen);
     671           0 :         Size        tuplelen = datalen - SizeOfHeapHeader;
     672             : 
     673           0 :         change->data.tp.newtuple =
     674           0 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     675             : 
     676           0 :         DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     677             :     }
     678             : 
     679           0 :     change->data.tp.clear_toast_afterwards = true;
     680             : 
     681           0 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
     682             : }
     683             : 
     684             : /*
     685             :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     686             :  * in the record, from WAL into proper tuplebufs.
     687             :  *
     688             :  * Updates can possibly contain a new tuple (block 0 data) and the old primary key (main data).
     689             :  */
     690             : static void
     691           0 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     692             : {
     693           0 :     XLogReaderState *r = buf->record;
     694             :     xl_heap_update *xlrec;
     695             :     ReorderBufferChange *change;
     696             :     char       *data;
     697             :     RelFileNode target_node;
     698             : 
     699           0 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     700             : 
     701             :     /* only interested in our database */
     702           0 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     703           0 :     if (target_node.dbNode != ctx->slot->data.database)
     704           0 :         return;
     705             : 
     706             :     /* output plugin doesn't look for this origin, no need to queue */
     707           0 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     708           0 :         return;
     709             : 
     710           0 :     change = ReorderBufferGetChange(ctx->reorder);
     711           0 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     712           0 :     change->origin_id = XLogRecGetOrigin(r);
     713           0 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     714             : 
     715           0 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     716             :     {
     717             :         Size        datalen;
     718             :         Size        tuplelen;
     719             : 
     720           0 :         data = XLogRecGetBlockData(r, 0, &datalen);
     721             : 
     722           0 :         tuplelen = datalen - SizeOfHeapHeader;
     723             : 
     724           0 :         change->data.tp.newtuple =
     725           0 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     726             : 
     727           0 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     728             :     }
     729             : 
     730           0 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
     731             :     {
     732             :         Size        datalen;
     733             :         Size        tuplelen;
     734             : 
     735             :         /* caution, remaining data in record is not aligned */
     736           0 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
     737           0 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
     738           0 :         tuplelen = datalen - SizeOfHeapHeader;
     739             : 
     740           0 :         change->data.tp.oldtuple =
     741           0 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     742             : 
     743           0 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
     744             :     }
     745             : 
     746           0 :     change->data.tp.clear_toast_afterwards = true;
     747             : 
     748           0 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
     749             : }
     750             : 
     751             : /*
     752             :  * Parse XLOG_HEAP_DELETE from WAL into proper tuplebufs.
     753             :  *
     754             :  * Deletes can possibly contain the old primary key; super deletions are skipped.
     755             :  */
     756             : static void
     757           0 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     758             : {
     759           0 :     XLogReaderState *r = buf->record;
     760             :     xl_heap_delete *xlrec;
     761             :     ReorderBufferChange *change;
     762             :     RelFileNode target_node;
     763             : 
     764           0 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
     765             : 
     766             :     /* only interested in our database */
     767           0 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     768           0 :     if (target_node.dbNode != ctx->slot->data.database)
     769           0 :         return;
     770             : 
     771             :     /*
     772             :      * Super deletions are irrelevant for logical decoding, it's driven by the
     773             :      * confirmation records.
     774             :      */
     775           0 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
     776           0 :         return;
     777             : 
     778             :     /* output plugin doesn't look for this origin, no need to queue */
     779           0 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     780           0 :         return;
     781             : 
     782           0 :     change = ReorderBufferGetChange(ctx->reorder);
     783           0 :     change->action = REORDER_BUFFER_CHANGE_DELETE;
     784           0 :     change->origin_id = XLogRecGetOrigin(r);
     785             : 
     786           0 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     787             : 
     788             :     /* old primary key stored */
     789           0 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
     790             :     {
     791           0 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
     792           0 :         Size        tuplelen = datalen - SizeOfHeapHeader;
     793             : 
     794           0 :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
     795             : 
     796           0 :         change->data.tp.oldtuple =
     797           0 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     798             : 
     799           0 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
     800             :                         datalen, change->data.tp.oldtuple);
     801             :     }
     802             : 
     803           0 :     change->data.tp.clear_toast_afterwards = true;
     804             : 
     805           0 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
     806             : }
     807             : 
     808             : /*
     809             :  * Decode an XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs, one INSERT change per tuple.
     810             :  *
     811             :  * Currently MULTI_INSERT will always contain the full tuples.
     812             :  */
     813             : static void
     814           0 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     815             : {
     816           0 :     XLogReaderState *r = buf->record;
     817             :     xl_heap_multi_insert *xlrec;
     818             :     int         i;
     819             :     char       *data;
     820             :     char       *tupledata;
     821             :     Size        tuplelen;
     822             :     RelFileNode rnode;
     823             : 
     824           0 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
     825             : 
     826             :     /* only interested in our database */
     827           0 :     XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
     828           0 :     if (rnode.dbNode != ctx->slot->data.database)
     829           0 :         return;
     830             : 
     831             :     /* output plugin doesn't look for this origin, no need to queue */
     832           0 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     833           0 :         return;
     834             : 
     835           0 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
     836             : 
     837           0 :     data = tupledata;
     838           0 :     for (i = 0; i < xlrec->ntuples; i++)
     839             :     {
     840             :         ReorderBufferChange *change;
     841             :         xl_multi_insert_tuple *xlhdr;
     842             :         int         datalen;
     843             :         ReorderBufferTupleBuf *tuple;
     844             : 
     845           0 :         change = ReorderBufferGetChange(ctx->reorder);
     846           0 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     847           0 :         change->origin_id = XLogRecGetOrigin(r);
     848             : 
     849           0 :         memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
     850             : 
     851             :         /*
     852             :          * CONTAINS_NEW_TUPLE will always be set currently as multi_insert
     853             :          * isn't used for catalogs, but better be future proof.
     854             :          *
     855             :          * We decode the tuple in pretty much the same way as DecodeXLogTuple,
     856             :          * but since the layout is slightly different, we can't use it here.
     857             :          */
     858           0 :         if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
     859             :         {
     860             :             HeapTupleHeader header;
     861             : 
     862           0 :             xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
     863           0 :             data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
     864           0 :             datalen = xlhdr->datalen;
     865             : 
     866           0 :             change->data.tp.newtuple =
     867           0 :                 ReorderBufferGetTupleBuf(ctx->reorder, datalen);
     868             : 
     869           0 :             tuple = change->data.tp.newtuple;
     870           0 :             header = tuple->tuple.t_data;
     871             : 
     872             :             /* not a disk based tuple */
     873           0 :             ItemPointerSetInvalid(&tuple->tuple.t_self);
     874             : 
     875             :             /*
     876             :              * We can only figure this out after reassembling the
     877             :              * transactions.
     878             :              */
     879           0 :             tuple->tuple.t_tableOid = InvalidOid;
     880             : 
     881           0 :             tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
     882             : 
     883           0 :             memset(header, 0, SizeofHeapTupleHeader);
     884             : 
     885           0 :             memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
     886             :                    (char *) data,
     887             :                    datalen);
     888           0 :             data += datalen;
     889             : 
     890           0 :             header->t_infomask = xlhdr->t_infomask;
     891           0 :             header->t_infomask2 = xlhdr->t_infomask2;
     892           0 :             header->t_hoff = xlhdr->t_hoff;
     893             :         }
     894             : 
     895             :         /*
     896             :          * Reset toast reassembly state only after the last row in the last
     897             :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
     898             :          * call.
     899             :          */
     900           0 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
     901           0 :             (i + 1) == xlrec->ntuples)
     902           0 :             change->data.tp.clear_toast_afterwards = true;
     903             :         else
     904           0 :             change->data.tp.clear_toast_afterwards = false;
     905             : 
     906           0 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
     907             :                                  buf->origptr, change);
     908             :     }
     909           0 :     Assert(data == tupledata + tuplelen);
     910             : }
     911             : 
     912             : /*
     913             :  * Parse XLOG_HEAP_CONFIRM from WAL into a confirmation change.
     914             :  *
     915             :  * This is pretty trivial, all the state is essentially already set up by the
     916             :  * speculative insertion.
     917             :  */
     918             : static void
     919           0 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     920             : {
     921           0 :     XLogReaderState *r = buf->record;
     922             :     ReorderBufferChange *change;
     923             :     RelFileNode target_node;
     924             : 
     925             :     /* only interested in our database */
     926           0 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     927           0 :     if (target_node.dbNode != ctx->slot->data.database)
     928           0 :         return;
     929             : 
     930             :     /* output plugin doesn't look for this origin, no need to queue */
     931           0 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     932           0 :         return;
     933             : 
     934           0 :     change = ReorderBufferGetChange(ctx->reorder);
     935           0 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
     936           0 :     change->origin_id = XLogRecGetOrigin(r);
     937             : 
     938           0 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     939             : 
     940           0 :     change->data.tp.clear_toast_afterwards = true;
     941             : 
     942           0 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
     943             : }
     944             : 
     945             : 
     946             : /*
     947             :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
     948             :  * (but not by heap_multi_insert) into a tuplebuf.
     949             :  *
     950             :  * The size 'len' and the pointer 'data' in the record need to be
     951             :  * computed outside as they are record specific; 'len' includes the xl_heap_header.
     952             :  */
     953             : static void
     954           0 : DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
     955             : {
     956             :     xl_heap_header xlhdr;
     957           0 :     int         datalen = len - SizeOfHeapHeader;
     958             :     HeapTupleHeader header;
     959             : 
     960           0 :     Assert(datalen >= 0);
     961             : 
     962           0 :     tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
     963           0 :     header = tuple->tuple.t_data;
     964             : 
     965             :     /* not a disk based tuple */
     966           0 :     ItemPointerSetInvalid(&tuple->tuple.t_self);
     967             : 
     968             :     /* we can only figure this out after reassembling the transactions */
     969           0 :     tuple->tuple.t_tableOid = InvalidOid;
     970             : 
     971             :     /* data is not stored aligned, copy to aligned storage */
     972           0 :     memcpy((char *) &xlhdr,
     973             :            data,
     974             :            SizeOfHeapHeader);
     975             : 
     976           0 :     memset(header, 0, SizeofHeapTupleHeader);
     977             : 
     978           0 :     memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
     979           0 :            data + SizeOfHeapHeader,
     980             :            datalen);
     981             : 
     982           0 :     header->t_infomask = xlhdr.t_infomask;
     983           0 :     header->t_infomask2 = xlhdr.t_infomask2;
     984           0 :     header->t_hoff = xlhdr.t_hoff;
     985           0 : }

Generated by: LCOV version 1.11