LCOV - code coverage report
Current view: top level - src/backend/replication/logical - reorderbuffer.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 8 1102 0.7 %
Date: 2017-09-29 13:40:31 Functions: 1 56 1.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * reorderbuffer.c
       4             :  *    PostgreSQL logical replay/reorder buffer management
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/logical/reorderbuffer.c
      12             :  *
      13             :  * NOTES
      14             :  *    This module gets handed individual pieces of transactions in the order
      15             :  *    they are written to the WAL and is responsible for reassembling them into
      16             :  *    toplevel transaction sized pieces. When a transaction is completely
      17             :  *    reassembled - signalled by reading the transaction commit record - it
      18             :  *    will then call the output plugin (c.f. ReorderBufferCommit()) with the
      19             :  *    individual changes. The output plugins rely on snapshots built by
      20             :  *    snapbuild.c which hands them to us.
      21             :  *
      22             :  *    Transactions and subtransactions/savepoints in postgres are not
      23             :  *    immediately linked to each other from outside the performing
      24             :  *    backend. Only at commit/abort (or special xact_assignment records) are
      25             :  *    they linked together, which means that we will have to splice together a
      26             :  *    toplevel transaction from its subtransactions. To do that efficiently we
      27             :  *    build a binary heap indexed by the smallest current lsn of the individual
      28             :  *    subtransactions' changestreams. As the individual streams are inherently
      29             :  *    ordered by LSN - since that is where we build them from - the transaction
      30             :  *    can easily be reassembled by always using the subtransaction with the
      31             :  *    smallest current LSN from the heap.
      32             :  *
      33             :  *    In order to cope with large transactions - which can be several times as
      34             :  *    big as the available memory - this module supports spooling the contents
      35             :  *    of large transactions to disk. When the transaction is replayed the
      36             :  *    contents of individual (sub-)transactions will be read from disk in
      37             :  *    chunks.
      38             :  *
      39             :  *    This module also has to deal with reassembling toast records from the
      40             :  *    individual chunks stored in WAL. When a new (or initial) version of a
      41             :  *    tuple is stored in WAL it will always be preceded by the toast chunks
      42             :  *    emitted for the columns stored out of line. Within a single toplevel
      43             :  *    transaction there will be no other data carrying records between a row's
      44             :  *    toast chunks and the row data itself. See ReorderBufferToast* for
      45             :  *    details.
      46             :  * -------------------------------------------------------------------------
      47             :  */
      48             : #include "postgres.h"
      49             : 
      50             : #include <unistd.h>
      51             : #include <sys/stat.h>
      52             : 
      53             : #include "access/rewriteheap.h"
      54             : #include "access/transam.h"
      55             : #include "access/tuptoaster.h"
      56             : #include "access/xact.h"
      57             : #include "access/xlog_internal.h"
      58             : #include "catalog/catalog.h"
      59             : #include "lib/binaryheap.h"
      60             : #include "miscadmin.h"
      61             : #include "pgstat.h"
      62             : #include "replication/logical.h"
      63             : #include "replication/reorderbuffer.h"
      64             : #include "replication/slot.h"
      65             : #include "replication/snapbuild.h"    /* just for SnapBuildSnapDecRefcount */
      66             : #include "storage/bufmgr.h"
      67             : #include "storage/fd.h"
      68             : #include "storage/sinval.h"
      69             : #include "utils/builtins.h"
      70             : #include "utils/combocid.h"
      71             : #include "utils/memdebug.h"
      72             : #include "utils/memutils.h"
      73             : #include "utils/rel.h"
      74             : #include "utils/relfilenodemap.h"
      75             : #include "utils/tqual.h"
      76             : 
      77             : 
      78             : /* entry for a hash table we use to map from xid to our transaction state */
      79             : typedef struct ReorderBufferTXNByIdEnt
      80             : {
      81             :     TransactionId xid;          /* hash key */
      82             :     ReorderBufferTXN *txn;      /* transaction state stored for that xid */
      83             : } ReorderBufferTXNByIdEnt;
      84             : 
      85             : /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
      86             : typedef struct ReorderBufferTupleCidKey
      87             : {
      88             :     RelFileNode relnode;        /* relfilenode part of the key */
      89             :     ItemPointerData tid;        /* ctid part of the key */
      90             : } ReorderBufferTupleCidKey;
      91             : 
      92             : typedef struct ReorderBufferTupleCidEnt
      93             : {
      94             :     ReorderBufferTupleCidKey key;   /* (relfilenode, ctid) being mapped */
      95             :     CommandId   cmin;           /* mapped value: cmin */
      96             :     CommandId   cmax;           /* mapped value: cmax */
      97             :     CommandId   combocid;       /* just for debugging */
      98             : } ReorderBufferTupleCidEnt;
      99             : 
     100             : /* k-way in-order change iteration support structures */
     101             : typedef struct ReorderBufferIterTXNEntry
     102             : {
     103             :     XLogRecPtr  lsn;
     104             :     ReorderBufferChange *change;
     105             :     ReorderBufferTXN *txn;
     106             :     int         fd;             /* NOTE(review): presumably the spill file handed to ReorderBufferRestoreChanges() -- confirm */
     107             :     XLogSegNo   segno;          /* NOTE(review): presumably the segment that fd belongs to -- confirm */
     108             : } ReorderBufferIterTXNEntry;
     109             : 
     110             : typedef struct ReorderBufferIterTXNState
     111             : {
     112             :     binaryheap *heap;           /* NOTE(review): presumably the smallest-LSN heap described in the file header -- confirm */
     113             :     Size        nr_txns;
     114             :     dlist_head  old_change;
     115             :     ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];   /* NOTE(review): presumably nr_txns entries -- confirm */
     116             : } ReorderBufferIterTXNState;
     117             : 
     118             : /* toast datastructures: reassembly state for the chunks of one chunk_id */
     119             : typedef struct ReorderBufferToastEnt
     120             : {
     121             :     Oid         chunk_id;       /* toast_table.chunk_id */
     122             :     int32       last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
     123             :                                  * have seen */
     124             :     Size        num_chunks;     /* number of chunks we've already seen */
     125             :     Size        size;           /* combined size of chunks seen */
     126             :     dlist_head  chunks;         /* linked list of chunks */
     127             :     struct varlena *reconstructed;  /* reconstructed varlena now pointed to in
     128             :                                      * main tup */
     129             : } ReorderBufferToastEnt;
     130             : 
     131             : /* Disk serialization support datastructures */
     132             : typedef struct ReorderBufferDiskChange
     133             : {
     134             :     Size        size;           /* NOTE(review): presumably the length of the whole serialized record -- confirm */
     135             :     ReorderBufferChange change;
     136             :     /* data follows */
     137             : } ReorderBufferDiskChange;
     138             : 
     139             : /*
     140             :  * Maximum number of changes kept in memory, per transaction. After that,
     141             :  * changes are spooled to disk.
     142             :  *
     143             :  * The current value should be sufficient to decode the entire transaction
     144             :  * without hitting disk in OLTP workloads, while starting to spool to disk in
     145             :  * other workloads reasonably fast.
     146             :  *
     147             :  * At some point in the future it probably makes sense to have a more elaborate
     148             :  * resource management here, but it's not entirely clear what that would look
     149             :  * like.
     150             :  */
     151             : static const Size max_changes_in_memory = 4096;
     152             : 
     153             : /*
     154             :  * We use a very simple form of a slab allocator for tuple buffers, simply
     155             :  * keeping a fixed number in a linked list when unused, instead of
     156             :  * pfree()ing them (see ReorderBufferReturnTupleBuf()). Without that in many
     157             :  * workloads aset.c becomes a major bottleneck, especially when spilling to
     158             :  * disk while decoding batch workloads.
     159             :  */
     160             : static const Size max_cached_tuplebufs = 4096 * 2;  /* ~8MB */
     161             : 
     162             : /* ---------------------------------------
     163             :  * primary reorderbuffer support routines
     164             :  * ---------------------------------------
     165             :  */
     166             : static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
     167             : static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
     168             : static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
     169             :                       TransactionId xid, bool create, bool *is_new,
     170             :                       XLogRecPtr lsn, bool create_as_top);
     171             : 
     172             : static void AssertTXNLsnOrder(ReorderBuffer *rb);
     173             : 
     174             : /* ---------------------------------------
     175             :  * support functions for lsn-order iterating over the ->changes of a
     176             :  * transaction and its subtransactions
     177             :  *
     178             :  * used for iteration over the k-way heap merge of a transaction and its
     179             :  * subtransactions
     180             :  * ---------------------------------------
     181             :  */
     182             : static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
     183             : static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
     184             : static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
     185             :                            ReorderBufferIterTXNState *state);
     186             : static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
     187             : 
     188             : /*
     189             :  * ---------------------------------------
     190             :  * Disk serialization support functions
     191             :  * ---------------------------------------
     192             :  */
     193             : static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
     194             : static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
     195             : static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
     196             :                              int fd, ReorderBufferChange *change);
     197             : static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
     198             :                             int *fd, XLogSegNo *segno);
     199             : static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
     200             :                            char *change);
     201             : static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
     202             : 
     203             : static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
     204             : static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
     205             :                       ReorderBufferTXN *txn, CommandId cid);
     206             : 
     207             : /* ---------------------------------------
     208             :  * toast reassembly support
     209             :  * ---------------------------------------
     210             :  */
     211             : static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
     212             : static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
     213             : static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
     214             :                           Relation relation, ReorderBufferChange *change);
     215             : static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
     216             :                               Relation relation, ReorderBufferChange *change);
     217             : 
     218             : 
     219             : /*
     220             :  * Allocate a new ReorderBuffer, its memory contexts and its xid hash table
     221             :  */
     222             : ReorderBuffer *
     223           0 : ReorderBufferAllocate(void)
     224             : {
     225             :     ReorderBuffer *buffer;
     226             :     HASHCTL     hash_ctl;
     227             :     MemoryContext new_ctx;
     228             : 
     229             :     /* allocate memory in own context, to have better accountability */
     230           0 :     new_ctx = AllocSetContextCreate(CurrentMemoryContext,
     231             :                                     "ReorderBuffer",
     232             :                                     ALLOCSET_DEFAULT_SIZES);
     233             : 
     234           0 :     buffer =
     235             :         (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
     236             : 
     237           0 :     memset(&hash_ctl, 0, sizeof(hash_ctl));
     238             : 
     239           0 :     buffer->context = new_ctx;
     240             : 
     241           0 :     buffer->change_context = SlabContextCreate(new_ctx,
     242             :                                                "Change",
     243             :                                                SLAB_DEFAULT_BLOCK_SIZE,
     244             :                                                sizeof(ReorderBufferChange));
     245             : 
     246           0 :     buffer->txn_context = SlabContextCreate(new_ctx,
     247             :                                             "TXN",
     248             :                                             SLAB_DEFAULT_BLOCK_SIZE,
     249             :                                             sizeof(ReorderBufferTXN));
     250             : 
     251           0 :     hash_ctl.keysize = sizeof(TransactionId);
     252           0 :     hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
     253           0 :     hash_ctl.hcxt = buffer->context;
     254             : 
     255           0 :     buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
     256             :                                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     257             : 
     258           0 :     buffer->by_txn_last_xid = InvalidTransactionId;
     259           0 :     buffer->by_txn_last_txn = NULL;
     260             : 
     261           0 :     buffer->nr_cached_tuplebufs = 0;
     262             : 
     263           0 :     buffer->outbuf = NULL;
     264           0 :     buffer->outbufsize = 0;
     265             : 
     266           0 :     buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
     267             : 
     268           0 :     dlist_init(&buffer->toplevel_by_lsn);
     269           0 :     slist_init(&buffer->cached_tuplebufs);
     270             : 
     271           0 :     return buffer;
     272             : }
     273             : 
     274             : /*
     275             :  * Free a ReorderBuffer by deleting the memory context stored in rb->context
     276             :  */
     277             : void
     278           0 : ReorderBufferFree(ReorderBuffer *rb)
     279             : {
     280           0 :     MemoryContext context = rb->context;
     281             : 
     282             :     /*
     283             :      * We free separately allocated data by entirely scrapping reorderbuffer's
     284             :      * memory context.
     285             :      */
     286           0 :     MemoryContextDelete(context);
     287           0 : }
     288             : 
     289             : /*
     290             :  * Allocate a zeroed ReorderBufferTXN from rb->txn_context, lists initialized.
     291             :  */
     292             : static ReorderBufferTXN *
     293           0 : ReorderBufferGetTXN(ReorderBuffer *rb)
     294             : {
     295             :     ReorderBufferTXN *txn;
     296             : 
     297           0 :     txn = (ReorderBufferTXN *)
     298           0 :         MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
     299             : 
     300           0 :     memset(txn, 0, sizeof(ReorderBufferTXN));
     301             : 
     302           0 :     dlist_init(&txn->changes);
     303           0 :     dlist_init(&txn->tuplecids);
     304           0 :     dlist_init(&txn->subtxns);
     305             : 
     306           0 :     return txn;
     307             : }
     308             : 
     309             : /*
     310             :  * Free a ReorderBufferTXN.
     311             :  *
     312             :  * Releases the tuplecid hash and the invalidations array, if present, and
     313             :  * then pfree()s the transaction right away; nothing is cached for reuse.
     314             :  */
     315             : static void
     316           0 : ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
     317             : {
     318             :     /* clean the lookup cache if we were cached (quite likely) */
     319           0 :     if (rb->by_txn_last_xid == txn->xid)
     320             :     {
     321           0 :         rb->by_txn_last_xid = InvalidTransactionId;
     322           0 :         rb->by_txn_last_txn = NULL;
     323             :     }
     324             : 
     325             :     /* free data that's contained */
     326             : 
     327           0 :     if (txn->tuplecid_hash != NULL)
     328             :     {
     329           0 :         hash_destroy(txn->tuplecid_hash);
     330           0 :         txn->tuplecid_hash = NULL;
     331             :     }
     332             : 
     333           0 :     if (txn->invalidations)
     334             :     {
     335           0 :         pfree(txn->invalidations);
     336           0 :         txn->invalidations = NULL;
     337             :     }
     338             : 
     339           0 :     pfree(txn);
     340           0 : }
     341             : 
     342             : /*
     343             :  * Allocate a zeroed ReorderBufferChange from rb->change_context.
     344             :  */
     345             : ReorderBufferChange *
     346           0 : ReorderBufferGetChange(ReorderBuffer *rb)
     347             : {
     348             :     ReorderBufferChange *change;
     349             : 
     350           0 :     change = (ReorderBufferChange *)
     351           0 :         MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
     352             : 
     353           0 :     memset(change, 0, sizeof(ReorderBufferChange));
     354           0 :     return change;
     355             : }
     356             : 
     357             : /*
     358             :  * Free a ReorderBufferChange.
     359             :  *
     360             :  * Data owned by the change (tuple buffers, message strings, snapshot) is
     361             :  * released first; the change itself is then pfree()d right away.
     362             :  */
     363             : void
     364           0 : ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
     365             : {
     366             :     /* free contained data */
     367           0 :     switch (change->action)
     368             :     {
     369             :         case REORDER_BUFFER_CHANGE_INSERT:
     370             :         case REORDER_BUFFER_CHANGE_UPDATE:
     371             :         case REORDER_BUFFER_CHANGE_DELETE:
     372             :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
     373           0 :             if (change->data.tp.newtuple)
     374             :             {
     375           0 :                 ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
     376           0 :                 change->data.tp.newtuple = NULL;
     377             :             }
     378             : 
     379           0 :             if (change->data.tp.oldtuple)
     380             :             {
     381           0 :                 ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
     382           0 :                 change->data.tp.oldtuple = NULL;
     383             :             }
     384           0 :             break;
     385             :         case REORDER_BUFFER_CHANGE_MESSAGE:
     386           0 :             if (change->data.msg.prefix != NULL)
     387           0 :                 pfree(change->data.msg.prefix);
     388           0 :             change->data.msg.prefix = NULL;
     389           0 :             if (change->data.msg.message != NULL)
     390           0 :                 pfree(change->data.msg.message);
     391           0 :             change->data.msg.message = NULL;
     392           0 :             break;
     393             :         case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
     394           0 :             if (change->data.snapshot)
     395             :             {
     396           0 :                 ReorderBufferFreeSnap(rb, change->data.snapshot);
     397           0 :                 change->data.snapshot = NULL;
     398             :             }
     399           0 :             break;
     400             :             /* no data in addition to the struct itself */
     401             :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
     402             :         case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
     403             :         case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
     404           0 :             break;
     405             :     }
     406             : 
     407           0 :     pfree(change);
     408           0 : }
     409             : 
     410             : /*
     411             :  * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
     412             :  * least a tuple of size tuple_len (excluding header overhead).
     413             :  */
     414             : ReorderBufferTupleBuf *
     415           0 : ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
     416             : {
     417             :     ReorderBufferTupleBuf *tuple;
     418             :     Size        alloc_len;
     419             : 
     420           0 :     alloc_len = tuple_len + SizeofHeapTupleHeader;
     421             : 
     422             :     /*
     423             :      * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
     424             :      * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
     425             :      * generated for oldtuples can be bigger, as they don't have out-of-line
     426             :      * toast columns.
     427             :      */
     428           0 :     if (alloc_len < MaxHeapTupleSize)
     429           0 :         alloc_len = MaxHeapTupleSize;
     430             : 
     431             : 
     432             :     /* alloc_len was clamped above, so "<=" means standard size: try the cache */
     433           0 :     if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
     434             :     {
     435           0 :         rb->nr_cached_tuplebufs--;
     436           0 :         tuple = slist_container(ReorderBufferTupleBuf, node,
     437             :                                 slist_pop_head_node(&rb->cached_tuplebufs));
     438           0 :         Assert(tuple->alloc_tuple_size == MaxHeapTupleSize);
     439             : #ifdef USE_ASSERT_CHECKING
     440           0 :         memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
     441             :         VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData));
     442             : #endif
     443           0 :         tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
     444             : #ifdef USE_ASSERT_CHECKING
     445           0 :         memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
     446             :         VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
     447             : #endif
     448             :     }
     449             :     else
     450             :     {
     451           0 :         tuple = (ReorderBufferTupleBuf *)
     452           0 :             MemoryContextAlloc(rb->context,
     453             :                                sizeof(ReorderBufferTupleBuf) +
     454             :                                MAXIMUM_ALIGNOF + alloc_len);
     455           0 :         tuple->alloc_tuple_size = alloc_len;
     456           0 :         tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
     457             :     }
     458             : 
     459           0 :     return tuple;
     460             : }
     461             : 
     462             : /*
     463             :  * Free a ReorderBufferTupleBuf.
     464             :  *
     465             :  * Deallocation might be delayed for efficiency purposes, for details check
     466             :  * the comments above max_cached_tuplebufs's definition.
     467             :  */
     468             : void
     469           0 : ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
     470             : {
     471             :     /* check whether to put into the slab cache, oversized tuples never are */
     472           0 :     if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
     473           0 :         rb->nr_cached_tuplebufs < max_cached_tuplebufs)
     474             :     {
     475           0 :         rb->nr_cached_tuplebufs++;
     476           0 :         slist_push_head(&rb->cached_tuplebufs, &tuple->node);
     477             :         VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
     478             :         VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
     479             :         VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
     480             :         VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
     481             :     }
     482             :     else
     483             :     {
     484           0 :         pfree(tuple);
     485             :     }
     486           0 : }
     487             : 
     488             : /*
     489             :  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
     490             :  * If create is true, and a transaction doesn't already exist, create it
     491             :  * (with the given LSN, and as top transaction if that's specified);
     492             :  * when this happens, *is_new is set to true (is_new may be NULL).
     493             :  */
     494             : static ReorderBufferTXN *
     495           0 : ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
     496             :                       bool *is_new, XLogRecPtr lsn, bool create_as_top)
     497             : {
     498             :     ReorderBufferTXN *txn;
     499             :     ReorderBufferTXNByIdEnt *ent;
     500             :     bool        found;
     501             : 
     502           0 :     Assert(TransactionIdIsValid(xid));
     503           0 :     Assert(!create || lsn != InvalidXLogRecPtr);
     504             : 
     505             :     /*
     506             :      * Check the one-entry lookup cache first
     507             :      */
     508           0 :     if (TransactionIdIsValid(rb->by_txn_last_xid) &&
     509           0 :         rb->by_txn_last_xid == xid)
     510             :     {
     511           0 :         txn = rb->by_txn_last_txn;
     512             : 
     513           0 :         if (txn != NULL)
     514             :         {
     515             :             /* found it, and it's valid */
     516           0 :             if (is_new)
     517           0 :                 *is_new = false;
     518           0 :             return txn;
     519             :         }
     520             : 
     521             :         /*
     522             :          * cached as non-existent, and asked not to create? Then nothing else
     523             :          * to do.
     524             :          */
     525           0 :         if (!create)
     526           0 :             return NULL;
     527             :         /* otherwise fall through to create it */
     528             :     }
     529             : 
     530             :     /*
     531             :      * We get here if the cache wasn't hit, or if it yielded a "does-not-exist"
     532             :      * and we want to create an entry.
     533             :      */
     534             : 
     535             :     /* search the lookup table */
     536           0 :     ent = (ReorderBufferTXNByIdEnt *)
     537           0 :         hash_search(rb->by_txn,
     538             :                     (void *) &xid,
     539             :                     create ? HASH_ENTER : HASH_FIND,
     540             :                     &found);
     541           0 :     if (found)
     542           0 :         txn = ent->txn;
     543           0 :     else if (create)
     544             :     {
     545             :         /* initialize the new entry, if creation was requested */
     546           0 :         Assert(ent != NULL);
     547             : 
     548           0 :         ent->txn = ReorderBufferGetTXN(rb);
     549           0 :         ent->txn->xid = xid;
     550           0 :         txn = ent->txn;
     551           0 :         txn->first_lsn = lsn;
     552           0 :         txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
     553             : 
     554           0 :         if (create_as_top)
     555             :         {
     556           0 :             dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
     557           0 :             AssertTXNLsnOrder(rb);
     558             :         }
     559             :     }
     560             :     else
     561           0 :         txn = NULL;             /* not found and not asked to create */
     562             : 
     563             :     /* update cache; a NULL txn is cached too, as "does not exist" */
     564           0 :     rb->by_txn_last_xid = xid;
     565           0 :     rb->by_txn_last_txn = txn;
     566             : 
     567           0 :     if (is_new)
     568           0 :         *is_new = !found;
     569             : 
     570           0 :     Assert(!create || txn != NULL);
     571           0 :     return txn;
     572             : }
     573             : 
     574             : /*
     575             :  * Queue a change into a transaction so it can be replayed upon commit.
     576             :  */
     577             : void
     578           0 : ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
     579             :                          ReorderBufferChange *change)
     580             : {
     581             :     ReorderBufferTXN *txn;
     582             : 
     583           0 :     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
     584             : 
     585           0 :     change->lsn = lsn;
     586           0 :     Assert(InvalidXLogRecPtr != lsn);
     587           0 :     dlist_push_tail(&txn->changes, &change->node);
     588           0 :     txn->nentries++;
     589           0 :     txn->nentries_mem++;
     590             : 
     591           0 :     ReorderBufferCheckSerializeTXN(rb, txn);
     592           0 : }
     593             : 
     594             : /*
     595             :  * Queue message into a transaction so it can be processed upon commit.
     596             :  */
     597             : void
     598           0 : ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
     599             :                           Snapshot snapshot, XLogRecPtr lsn,
     600             :                           bool transactional, const char *prefix,
     601             :                           Size message_size, const char *message)
     602             : {
     603           0 :     if (transactional)
     604             :     {
     605             :         MemoryContext oldcontext;
     606             :         ReorderBufferChange *change;
     607             : 
     608           0 :         Assert(xid != InvalidTransactionId);
     609             : 
     610           0 :         oldcontext = MemoryContextSwitchTo(rb->context);
     611             : 
     612           0 :         change = ReorderBufferGetChange(rb);
     613           0 :         change->action = REORDER_BUFFER_CHANGE_MESSAGE;
     614           0 :         change->data.msg.prefix = pstrdup(prefix);
     615           0 :         change->data.msg.message_size = message_size;
     616           0 :         change->data.msg.message = palloc(message_size);
     617           0 :         memcpy(change->data.msg.message, message, message_size);
     618             : 
     619           0 :         ReorderBufferQueueChange(rb, xid, lsn, change);
     620             : 
     621           0 :         MemoryContextSwitchTo(oldcontext);
     622             :     }
     623             :     else
     624             :     {
     625           0 :         ReorderBufferTXN *txn = NULL;
     626           0 :         volatile Snapshot snapshot_now = snapshot;
     627             : 
     628           0 :         if (xid != InvalidTransactionId)
     629           0 :             txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
     630             : 
     631             :         /* setup snapshot to allow catalog access */
     632           0 :         SetupHistoricSnapshot(snapshot_now, NULL);
     633           0 :         PG_TRY();
     634             :         {
     635           0 :             rb->message(rb, txn, lsn, false, prefix, message_size, message);
     636             : 
     637           0 :             TeardownHistoricSnapshot(false);
     638             :         }
     639           0 :         PG_CATCH();
     640             :         {
     641           0 :             TeardownHistoricSnapshot(true);
     642           0 :             PG_RE_THROW();
     643             :         }
     644           0 :         PG_END_TRY();
     645             :     }
     646           0 : }
     647             : 
     648             : 
     649             : static void
     650           0 : AssertTXNLsnOrder(ReorderBuffer *rb)
     651             : {
     652             : #ifdef USE_ASSERT_CHECKING
     653             :     dlist_iter  iter;
     654           0 :     XLogRecPtr  prev_first_lsn = InvalidXLogRecPtr;
     655             : 
     656           0 :     dlist_foreach(iter, &rb->toplevel_by_lsn)
     657             :     {
     658             :         ReorderBufferTXN *cur_txn;
     659             : 
     660           0 :         cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
     661           0 :         Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
     662             : 
     663           0 :         if (cur_txn->end_lsn != InvalidXLogRecPtr)
     664           0 :             Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
     665             : 
     666           0 :         if (prev_first_lsn != InvalidXLogRecPtr)
     667           0 :             Assert(prev_first_lsn < cur_txn->first_lsn);
     668             : 
     669           0 :         Assert(!cur_txn->is_known_as_subxact);
     670           0 :         prev_first_lsn = cur_txn->first_lsn;
     671             :     }
     672             : #endif
     673           0 : }
     674             : 
     675             : ReorderBufferTXN *
     676           0 : ReorderBufferGetOldestTXN(ReorderBuffer *rb)
     677             : {
     678             :     ReorderBufferTXN *txn;
     679             : 
     680           0 :     if (dlist_is_empty(&rb->toplevel_by_lsn))
     681           0 :         return NULL;
     682             : 
     683           0 :     AssertTXNLsnOrder(rb);
     684             : 
     685           0 :     txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
     686             : 
     687           0 :     Assert(!txn->is_known_as_subxact);
     688           0 :     Assert(txn->first_lsn != InvalidXLogRecPtr);
     689           0 :     return txn;
     690             : }
     691             : 
     692             : void
     693           0 : ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
     694             : {
     695           0 :     rb->current_restart_decoding_lsn = ptr;
     696           0 : }
     697             : 
     698             : void
     699           0 : ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
     700             :                          TransactionId subxid, XLogRecPtr lsn)
     701             : {
     702             :     ReorderBufferTXN *txn;
     703             :     ReorderBufferTXN *subtxn;
     704             :     bool        new_top;
     705             :     bool        new_sub;
     706             : 
     707           0 :     txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
     708           0 :     subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
     709             : 
     710           0 :     if (new_sub)
     711             :     {
     712             :         /*
     713             :          * we assign subtransactions to top level transaction even if we don't
     714             :          * have data for it yet; assignment records frequently reference xids
     715             :          * that have not yet produced any records. Knowing those aren't top
     716             :          * level xids allows us to make processing cheaper in some places.
     717             :          */
     718           0 :         dlist_push_tail(&txn->subtxns, &subtxn->node);
     719           0 :         txn->nsubtxns++;
     720             :     }
     721           0 :     else if (!subtxn->is_known_as_subxact)
     722             :     {
     723           0 :         subtxn->is_known_as_subxact = true;
     724           0 :         Assert(subtxn->nsubtxns == 0);
     725             : 
     726             :         /* remove from lsn order list of top-level transactions */
     727           0 :         dlist_delete(&subtxn->node);
     728             : 
     729             :         /* add to toplevel transaction */
     730           0 :         dlist_push_tail(&txn->subtxns, &subtxn->node);
     731           0 :         txn->nsubtxns++;
     732             :     }
     733           0 :     else if (new_top)
     734             :     {
     735           0 :         elog(ERROR, "existing subxact assigned to unknown toplevel xact");
     736             :     }
     737           0 : }
     738             : 
     739             : /*
     740             :  * Associate a subtransaction with its toplevel transaction at commit
     741             :  * time. No further changes may be added after this.
     742             :  */
     743             : void
     744           0 : ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
     745             :                          TransactionId subxid, XLogRecPtr commit_lsn,
     746             :                          XLogRecPtr end_lsn)
     747             : {
     748             :     ReorderBufferTXN *txn;
     749             :     ReorderBufferTXN *subtxn;
     750             : 
     751           0 :     subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
     752             :                                    InvalidXLogRecPtr, false);
     753             : 
     754             :     /*
     755             :      * No need to do anything if that subtxn didn't contain any changes
     756             :      */
     757           0 :     if (!subtxn)
     758           0 :         return;
     759             : 
     760           0 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
     761             : 
     762           0 :     if (txn == NULL)
     763           0 :         elog(ERROR, "subxact logged without previous toplevel record");
     764             : 
     765             :     /*
     766             :      * Pass our base snapshot to the parent transaction if it doesn't have
     767             :      * one, or ours is older. That can happen if there are no changes in the
     768             :      * toplevel transaction but in one of the child transactions. This allows
     769             :      * the parent to simply use its base snapshot initially.
     770             :      */
     771           0 :     if (subtxn->base_snapshot != NULL &&
     772           0 :         (txn->base_snapshot == NULL ||
     773           0 :          txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
     774             :     {
     775           0 :         txn->base_snapshot = subtxn->base_snapshot;
     776           0 :         txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
     777           0 :         subtxn->base_snapshot = NULL;
     778           0 :         subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
     779             :     }
     780             : 
     781           0 :     subtxn->final_lsn = commit_lsn;
     782           0 :     subtxn->end_lsn = end_lsn;
     783             : 
     784           0 :     if (!subtxn->is_known_as_subxact)
     785             :     {
     786           0 :         subtxn->is_known_as_subxact = true;
     787           0 :         Assert(subtxn->nsubtxns == 0);
     788             : 
     789             :         /* remove from lsn order list of top-level transactions */
     790           0 :         dlist_delete(&subtxn->node);
     791             : 
     792             :         /* add to subtransaction list */
     793           0 :         dlist_push_tail(&txn->subtxns, &subtxn->node);
     794           0 :         txn->nsubtxns++;
     795             :     }
     796             : }
     797             : 
     798             : 
     799             : /*
     800             :  * Support for efficiently iterating over a transaction's and its
     801             :  * subtransactions' changes.
     802             :  *
     803             :  * We do this by doing a k-way merge between transactions/subtransactions. For that
     804             :  * we model the current heads of the different transactions as a binary heap
     805             :  * so we easily know which (sub-)transaction has the change with the smallest
     806             :  * lsn next.
     807             :  *
     808             :  * We assume the changes in individual transactions are already sorted by LSN.
     809             :  */
     810             : 
     811             : /*
     812             :  * Binary heap comparison function.
     813             :  */
     814             : static int
     815           0 : ReorderBufferIterCompare(Datum a, Datum b, void *arg)
     816             : {
     817           0 :     ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
     818           0 :     XLogRecPtr  pos_a = state->entries[DatumGetInt32(a)].lsn;
     819           0 :     XLogRecPtr  pos_b = state->entries[DatumGetInt32(b)].lsn;
     820             : 
     821           0 :     if (pos_a < pos_b)
     822           0 :         return 1;
     823           0 :     else if (pos_a == pos_b)
     824           0 :         return 0;
     825           0 :     return -1;
     826             : }
     827             : 
     828             : /*
     829             :  * Allocate & initialize an iterator which iterates in lsn order over a
     830             :  * transaction and all its subtransactions.
     831             :  */
     832             : static ReorderBufferIterTXNState *
     833           0 : ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
     834             : {
     835           0 :     Size        nr_txns = 0;
     836             :     ReorderBufferIterTXNState *state;
     837             :     dlist_iter  cur_txn_i;
     838             :     int32       off;
     839             : 
     840             :     /*
     841             :      * Calculate the size of our heap: one element for every transaction that
     842             :      * contains changes.  (Besides the subtransactions attached to it, we
     843             :      * count the toplevel transaction we were directly passed.)
     844             :      */
     845           0 :     if (txn->nentries > 0)
     846           0 :         nr_txns++;
     847             : 
     848           0 :     dlist_foreach(cur_txn_i, &txn->subtxns)
     849             :     {
     850             :         ReorderBufferTXN *cur_txn;
     851             : 
     852           0 :         cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
     853             : 
     854           0 :         if (cur_txn->nentries > 0)
     855           0 :             nr_txns++;
     856             :     }
     857             : 
     858             :     /*
     859             :      * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
     860             :      * need to allocate/build a heap then.
     861             :      */
     862             : 
     863             :     /* allocate iteration state */
     864           0 :     state = (ReorderBufferIterTXNState *)
     865           0 :         MemoryContextAllocZero(rb->context,
     866             :                                sizeof(ReorderBufferIterTXNState) +
     867           0 :                                sizeof(ReorderBufferIterTXNEntry) * nr_txns);
     868             : 
     869           0 :     state->nr_txns = nr_txns;
     870           0 :     dlist_init(&state->old_change);
     871             : 
     872           0 :     for (off = 0; off < state->nr_txns; off++)
     873             :     {
     874           0 :         state->entries[off].fd = -1;
     875           0 :         state->entries[off].segno = 0;
     876             :     }
     877             : 
     878             :     /* allocate heap */
     879           0 :     state->heap = binaryheap_allocate(state->nr_txns,
     880             :                                       ReorderBufferIterCompare,
     881             :                                       state);
     882             : 
     883             :     /*
     884             :      * Now insert items into the binary heap, in an unordered fashion.  (We
     885             :      * will run a heap assembly step at the end; this is more efficient.)
     886             :      */
     887             : 
     888           0 :     off = 0;
     889             : 
     890             :     /* add toplevel transaction if it contains changes */
     891           0 :     if (txn->nentries > 0)
     892             :     {
     893             :         ReorderBufferChange *cur_change;
     894             : 
     895           0 :         if (txn->serialized)
     896             :         {
     897             :             /* serialize remaining changes */
     898           0 :             ReorderBufferSerializeTXN(rb, txn);
     899           0 :             ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
     900             :                                         &state->entries[off].segno);
     901             :         }
     902             : 
     903           0 :         cur_change = dlist_head_element(ReorderBufferChange, node,
     904             :                                         &txn->changes);
     905             : 
     906           0 :         state->entries[off].lsn = cur_change->lsn;
     907           0 :         state->entries[off].change = cur_change;
     908           0 :         state->entries[off].txn = txn;
     909             : 
     910           0 :         binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
     911             :     }
     912             : 
     913             :     /* add subtransactions if they contain changes */
     914           0 :     dlist_foreach(cur_txn_i, &txn->subtxns)
     915             :     {
     916             :         ReorderBufferTXN *cur_txn;
     917             : 
     918           0 :         cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
     919             : 
     920           0 :         if (cur_txn->nentries > 0)
     921             :         {
     922             :             ReorderBufferChange *cur_change;
     923             : 
     924           0 :             if (cur_txn->serialized)
     925             :             {
     926             :                 /* serialize remaining changes */
     927           0 :                 ReorderBufferSerializeTXN(rb, cur_txn);
     928           0 :                 ReorderBufferRestoreChanges(rb, cur_txn,
     929             :                                             &state->entries[off].fd,
     930             :                                             &state->entries[off].segno);
     931             :             }
     932           0 :             cur_change = dlist_head_element(ReorderBufferChange, node,
     933             :                                             &cur_txn->changes);
     934             : 
     935           0 :             state->entries[off].lsn = cur_change->lsn;
     936           0 :             state->entries[off].change = cur_change;
     937           0 :             state->entries[off].txn = cur_txn;
     938             : 
     939           0 :             binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
     940             :         }
     941             :     }
     942             : 
     943             :     /* assemble a valid binary heap */
     944           0 :     binaryheap_build(state->heap);
     945             : 
     946           0 :     return state;
     947             : }
     948             : 
     949             : /*
     950             :  * Return the next change when iterating over a transaction and its
     951             :  * subtransactions.
     952             :  *
     953             :  * Returns NULL when no further changes exist.
     954             :  */
     955             : static ReorderBufferChange *
     956           0 : ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
     957             : {
     958             :     ReorderBufferChange *change;
     959             :     ReorderBufferIterTXNEntry *entry;
     960             :     int32       off;
     961             : 
     962             :     /* nothing there anymore */
     963           0 :     if (state->heap->bh_size == 0)
     964           0 :         return NULL;
     965             : 
     966           0 :     off = DatumGetInt32(binaryheap_first(state->heap));
     967           0 :     entry = &state->entries[off];
     968             : 
     969             :     /* free memory we might have "leaked" in the previous *Next call */
     970           0 :     if (!dlist_is_empty(&state->old_change))
     971             :     {
     972           0 :         change = dlist_container(ReorderBufferChange, node,
     973             :                                  dlist_pop_head_node(&state->old_change));
     974           0 :         ReorderBufferReturnChange(rb, change);
     975           0 :         Assert(dlist_is_empty(&state->old_change));
     976             :     }
     977             : 
     978           0 :     change = entry->change;
     979             : 
     980             :     /*
     981             :      * update heap with information about which transaction has the next
     982             :      * relevant change in LSN order
     983             :      */
     984             : 
     985             :     /* there are in-memory changes */
     986           0 :     if (dlist_has_next(&entry->txn->changes, &entry->change->node))
     987             :     {
     988           0 :         dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
     989           0 :         ReorderBufferChange *next_change =
     990           0 :         dlist_container(ReorderBufferChange, node, next);
     991             : 
     992             :         /* txn stays the same */
     993           0 :         state->entries[off].lsn = next_change->lsn;
     994           0 :         state->entries[off].change = next_change;
     995             : 
     996           0 :         binaryheap_replace_first(state->heap, Int32GetDatum(off));
     997           0 :         return change;
     998             :     }
     999             : 
    1000             :     /* try to load changes from disk */
    1001           0 :     if (entry->txn->nentries != entry->txn->nentries_mem)
    1002             :     {
    1003             :         /*
    1004             :          * Ugly: restoring changes will reuse *Change records, thus delete the
    1005             :          * current one from the per-tx list and only free in the next call.
    1006             :          */
    1007           0 :         dlist_delete(&change->node);
    1008           0 :         dlist_push_tail(&state->old_change, &change->node);
    1009             : 
    1010           0 :         if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
    1011             :                                         &state->entries[off].segno))
    1012             :         {
    1013             :             /* successfully restored changes from disk */
    1014           0 :             ReorderBufferChange *next_change =
    1015           0 :             dlist_head_element(ReorderBufferChange, node,
    1016             :                                &entry->txn->changes);
    1017             : 
    1018           0 :             elog(DEBUG2, "restored %u/%u changes from disk",
    1019             :                  (uint32) entry->txn->nentries_mem,
    1020             :                  (uint32) entry->txn->nentries);
    1021             : 
    1022           0 :             Assert(entry->txn->nentries_mem);
    1023             :             /* txn stays the same */
    1024           0 :             state->entries[off].lsn = next_change->lsn;
    1025           0 :             state->entries[off].change = next_change;
    1026           0 :             binaryheap_replace_first(state->heap, Int32GetDatum(off));
    1027             : 
    1028           0 :             return change;
    1029             :         }
    1030             :     }
    1031             : 
    1032             :     /* ok, no changes there anymore, remove */
    1033           0 :     binaryheap_remove_first(state->heap);
    1034             : 
    1035           0 :     return change;
    1036             : }
    1037             : 
    1038             : /*
    1039             :  * Deallocate the iterator
    1040             :  */
    1041             : static void
    1042           0 : ReorderBufferIterTXNFinish(ReorderBuffer *rb,
    1043             :                            ReorderBufferIterTXNState *state)
    1044             : {
    1045             :     int32       off;
    1046             : 
    1047           0 :     for (off = 0; off < state->nr_txns; off++)
    1048             :     {
    1049           0 :         if (state->entries[off].fd != -1)
    1050           0 :             CloseTransientFile(state->entries[off].fd);
    1051             :     }
    1052             : 
    1053             :     /* free memory we might have "leaked" in the last *Next call */
    1054           0 :     if (!dlist_is_empty(&state->old_change))
    1055             :     {
    1056             :         ReorderBufferChange *change;
    1057             : 
    1058           0 :         change = dlist_container(ReorderBufferChange, node,
    1059             :                                  dlist_pop_head_node(&state->old_change));
    1060           0 :         ReorderBufferReturnChange(rb, change);
    1061           0 :         Assert(dlist_is_empty(&state->old_change));
    1062             :     }
    1063             : 
    1064           0 :     binaryheap_free(state->heap);
    1065           0 :     pfree(state);
    1066           0 : }
    1067             : 
    1068             : /*
    1069             :  * Cleanup the contents of a transaction, usually after the transaction
    1070             :  * committed or aborted.
    1071             :  */
    1072             : static void
    1073           0 : ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    1074             : {
    1075             :     bool        found;
    1076             :     dlist_mutable_iter iter;
    1077             : 
    1078             :     /* cleanup subtransactions & their changes */
    1079           0 :     dlist_foreach_modify(iter, &txn->subtxns)
    1080             :     {
    1081             :         ReorderBufferTXN *subtxn;
    1082             : 
    1083           0 :         subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
    1084             : 
    1085             :         /*
    1086             :          * Subtransactions are always associated to the toplevel TXN, even if
    1087             :          * they originally were happening inside another subtxn, so we won't
    1088             :          * ever recurse more than one level deep here.
    1089             :          */
    1090           0 :         Assert(subtxn->is_known_as_subxact);
    1091           0 :         Assert(subtxn->nsubtxns == 0);
    1092             : 
    1093           0 :         ReorderBufferCleanupTXN(rb, subtxn);
    1094             :     }
    1095             : 
    1096             :     /* cleanup changes in the toplevel txn */
    1097           0 :     dlist_foreach_modify(iter, &txn->changes)
    1098             :     {
    1099             :         ReorderBufferChange *change;
    1100             : 
    1101           0 :         change = dlist_container(ReorderBufferChange, node, iter.cur);
    1102             : 
    1103           0 :         ReorderBufferReturnChange(rb, change);
    1104             :     }
    1105             : 
    1106             :     /*
    1107             :      * Cleanup the tuplecids we stored for decoding catalog snapshot access.
    1108             :      * They are always stored in the toplevel transaction.
    1109             :      */
    1110           0 :     dlist_foreach_modify(iter, &txn->tuplecids)
    1111             :     {
    1112             :         ReorderBufferChange *change;
    1113             : 
    1114           0 :         change = dlist_container(ReorderBufferChange, node, iter.cur);
    1115           0 :         Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
    1116           0 :         ReorderBufferReturnChange(rb, change);
    1117             :     }
    1118             : 
    1119           0 :     if (txn->base_snapshot != NULL)
    1120             :     {
    1121           0 :         SnapBuildSnapDecRefcount(txn->base_snapshot);
    1122           0 :         txn->base_snapshot = NULL;
    1123           0 :         txn->base_snapshot_lsn = InvalidXLogRecPtr;
    1124             :     }
    1125             : 
    1126             :     /*
    1127             :      * Remove TXN from its containing list.
    1128             :      *
    1129             :      * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
    1130             :      * parent's list of known subxacts; this leaves the parent's nsubtxns
    1131             :      * count too high, but we don't care.  Otherwise, we are deleting the TXN
    1132             :      * from the LSN-ordered list of toplevel TXNs.
    1133             :      */
    1134           0 :     dlist_delete(&txn->node);
    1135             : 
    1136             :     /* now remove reference from buffer */
    1137           0 :     hash_search(rb->by_txn,
    1138           0 :                 (void *) &txn->xid,
    1139             :                 HASH_REMOVE,
    1140             :                 &found);
    1141           0 :     Assert(found);
    1142             : 
    1143             :     /* remove entries spilled to disk */
    1144           0 :     if (txn->serialized)
    1145           0 :         ReorderBufferRestoreCleanup(rb, txn);
    1146             : 
    1147             :     /* deallocate */
    1148           0 :     ReorderBufferReturnTXN(rb, txn);
    1149           0 : }
    1150             : 
    1151             : /*
    1152             :  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
    1153             :  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
    1154             :  */
    1155             : static void
    1156           0 : ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
    1157             : {
    1158             :     dlist_iter  iter;
    1159             :     HASHCTL     hash_ctl;
    1160             : 
    1161           0 :     if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
    1162           0 :         return;
    1163             : 
    1164           0 :     memset(&hash_ctl, 0, sizeof(hash_ctl));
    1165             : 
    1166           0 :     hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
    1167           0 :     hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
    1168           0 :     hash_ctl.hcxt = rb->context;
    1169             : 
    1170             :     /*
    1171             :      * create the hash with the exact number of to-be-stored tuplecids from
    1172             :      * the start
    1173             :      */
    1174           0 :     txn->tuplecid_hash =
    1175           0 :         hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
    1176             :                     HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    1177             : 
    1178           0 :     dlist_foreach(iter, &txn->tuplecids)
    1179             :     {
    1180             :         ReorderBufferTupleCidKey key;
    1181             :         ReorderBufferTupleCidEnt *ent;
    1182             :         bool        found;
    1183             :         ReorderBufferChange *change;
    1184             : 
    1185           0 :         change = dlist_container(ReorderBufferChange, node, iter.cur);
    1186             : 
    1187           0 :         Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
    1188             : 
    1189             :         /* be careful about padding */
    1190           0 :         memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
    1191             : 
    1192           0 :         key.relnode = change->data.tuplecid.node;
    1193             : 
    1194           0 :         ItemPointerCopy(&change->data.tuplecid.tid,
    1195             :                         &key.tid);
    1196             : 
    1197           0 :         ent = (ReorderBufferTupleCidEnt *)
    1198           0 :             hash_search(txn->tuplecid_hash,
    1199             :                         (void *) &key,
    1200             :                         HASH_ENTER | HASH_FIND,
    1201             :                         &found);
    1202           0 :         if (!found)
    1203             :         {
    1204           0 :             ent->cmin = change->data.tuplecid.cmin;
    1205           0 :             ent->cmax = change->data.tuplecid.cmax;
    1206           0 :             ent->combocid = change->data.tuplecid.combocid;
    1207             :         }
    1208             :         else
    1209             :         {
    1210           0 :             Assert(ent->cmin == change->data.tuplecid.cmin);
    1211           0 :             Assert(ent->cmax == InvalidCommandId ||
    1212             :                    ent->cmax == change->data.tuplecid.cmax);
    1213             : 
    1214             :             /*
    1215             :              * if the tuple got valid in this transaction and now got deleted
    1216             :              * we already have a valid cmin stored. The cmax will be
    1217             :              * InvalidCommandId though.
    1218             :              */
    1219           0 :             ent->cmax = change->data.tuplecid.cmax;
    1220             :         }
    1221             :     }
    1222             : }
    1223             : 
    1224             : /*
    1225             :  * Copy a provided snapshot so we can modify it privately. This is needed so
    1226             :  * that catalog modifying transactions can look into intermediate catalog
    1227             :  * states.
    1228             :  */
    1229             : static Snapshot
    1230           0 : ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
    1231             :                       ReorderBufferTXN *txn, CommandId cid)
    1232             : {
    1233             :     Snapshot    snap;
    1234             :     dlist_iter  iter;
    1235           0 :     int         i = 0;
    1236             :     Size        size;
    1237             : 
    1238           0 :     size = sizeof(SnapshotData) +
    1239           0 :         sizeof(TransactionId) * orig_snap->xcnt +
    1240           0 :         sizeof(TransactionId) * (txn->nsubtxns + 1);
    1241             : 
    1242           0 :     snap = MemoryContextAllocZero(rb->context, size);
    1243           0 :     memcpy(snap, orig_snap, sizeof(SnapshotData));
    1244             : 
    1245           0 :     snap->copied = true;
    1246           0 :     snap->active_count = 1;      /* mark as active so nobody frees it */
    1247           0 :     snap->regd_count = 0;
    1248           0 :     snap->xip = (TransactionId *) (snap + 1);
    1249             : 
    1250           0 :     memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
    1251             : 
    1252             :     /*
    1253             :      * snap->subxip contains all txids that belong to our transaction which we
    1254             :      * need to check via cmin/cmax. That's why we store the toplevel
    1255             :      * transaction in there as well.
    1256             :      */
    1257           0 :     snap->subxip = snap->xip + snap->xcnt;
    1258           0 :     snap->subxip[i++] = txn->xid;
    1259             : 
    1260             :     /*
    1261             :      * nsubxcnt isn't decreased when subtransactions abort, so count manually.
    1262             :      * Since it's an upper boundary it is safe to use it for the allocation
    1263             :      * above.
    1264             :      */
    1265           0 :     snap->subxcnt = 1;
    1266             : 
    1267           0 :     dlist_foreach(iter, &txn->subtxns)
    1268             :     {
    1269             :         ReorderBufferTXN *sub_txn;
    1270             : 
    1271           0 :         sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
    1272           0 :         snap->subxip[i++] = sub_txn->xid;
    1273           0 :         snap->subxcnt++;
    1274             :     }
    1275             : 
    1276             :     /* sort so we can bsearch() later */
    1277           0 :     qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
    1278             : 
    1279             :     /* store the specified current CommandId */
    1280           0 :     snap->curcid = cid;
    1281             : 
    1282           0 :     return snap;
    1283             : }
    1284             : 
    1285             : /*
    1286             :  * Free a previously ReorderBufferCopySnap'ed snapshot
    1287             :  */
    1288             : static void
    1289           0 : ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
    1290             : {
    1291           0 :     if (snap->copied)
    1292           0 :         pfree(snap);
    1293             :     else
    1294           0 :         SnapBuildSnapDecRefcount(snap);
    1295           0 : }
    1296             : 
    1297             : /*
    1298             :  * Perform the replay of a transaction and it's non-aborted subtransactions.
    1299             :  *
    1300             :  * Subtransactions previously have to be processed by
    1301             :  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
    1302             :  * transaction with ReorderBufferAssignChild.
    1303             :  *
    1304             :  * We currently can only decode a transaction's contents in when their commit
    1305             :  * record is read because that's currently the only place where we know about
    1306             :  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
    1307             :  * the top and subtransactions (using a k-way merge) and replay the changes in
    1308             :  * lsn order.
    1309             :  */
    1310             : void
    1311           0 : ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
    1312             :                     XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
    1313             :                     TimestampTz commit_time,
    1314             :                     RepOriginId origin_id, XLogRecPtr origin_lsn)
    1315             : {
    1316             :     ReorderBufferTXN *txn;
    1317             :     volatile Snapshot snapshot_now;
    1318           0 :     volatile CommandId command_id = FirstCommandId;
    1319             :     bool        using_subtxn;
    1320           0 :     ReorderBufferIterTXNState *volatile iterstate = NULL;
    1321             : 
    1322           0 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    1323             :                                 false);
    1324             : 
    1325             :     /* unknown transaction, nothing to replay */
    1326           0 :     if (txn == NULL)
    1327           0 :         return;
    1328             : 
    1329           0 :     txn->final_lsn = commit_lsn;
    1330           0 :     txn->end_lsn = end_lsn;
    1331           0 :     txn->commit_time = commit_time;
    1332           0 :     txn->origin_id = origin_id;
    1333           0 :     txn->origin_lsn = origin_lsn;
    1334             : 
    1335             :     /*
    1336             :      * If this transaction didn't have any real changes in our database, it's
    1337             :      * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
    1338             :      * transferred its snapshot to this transaction if it had one and the
    1339             :      * toplevel tx didn't.
    1340             :      */
    1341           0 :     if (txn->base_snapshot == NULL)
    1342             :     {
    1343           0 :         Assert(txn->ninvalidations == 0);
    1344           0 :         ReorderBufferCleanupTXN(rb, txn);
    1345           0 :         return;
    1346             :     }
    1347             : 
    1348           0 :     snapshot_now = txn->base_snapshot;
    1349             : 
    1350             :     /* build data to be able to lookup the CommandIds of catalog tuples */
    1351           0 :     ReorderBufferBuildTupleCidHash(rb, txn);
    1352             : 
    1353             :     /* setup the initial snapshot */
    1354           0 :     SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
    1355             : 
    1356             :     /*
    1357             :      * Decoding needs access to syscaches et al., which in turn use
    1358             :      * heavyweight locks and such. Thus we need to have enough state around to
    1359             :      * keep track of those.  The easiest way is to simply use a transaction
    1360             :      * internally.  That also allows us to easily enforce that nothing writes
    1361             :      * to the database by checking for xid assignments.
    1362             :      *
    1363             :      * When we're called via the SQL SRF there's already a transaction
    1364             :      * started, so start an explicit subtransaction there.
    1365             :      */
    1366           0 :     using_subtxn = IsTransactionOrTransactionBlock();
    1367             : 
    1368           0 :     PG_TRY();
    1369             :     {
    1370             :         ReorderBufferChange *change;
    1371           0 :         ReorderBufferChange *specinsert = NULL;
    1372             : 
    1373           0 :         if (using_subtxn)
    1374           0 :             BeginInternalSubTransaction("replay");
    1375             :         else
    1376           0 :             StartTransactionCommand();
    1377             : 
    1378           0 :         rb->begin(rb, txn);
    1379             : 
    1380           0 :         iterstate = ReorderBufferIterTXNInit(rb, txn);
    1381           0 :         while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
    1382             :         {
    1383           0 :             Relation    relation = NULL;
    1384             :             Oid         reloid;
    1385             : 
    1386           0 :             switch (change->action)
    1387             :             {
    1388             :                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
    1389             : 
    1390             :                     /*
    1391             :                      * Confirmation for speculative insertion arrived. Simply
    1392             :                      * use as a normal record. It'll be cleaned up at the end
    1393             :                      * of INSERT processing.
    1394             :                      */
    1395           0 :                     Assert(specinsert->data.tp.oldtuple == NULL);
    1396           0 :                     change = specinsert;
    1397           0 :                     change->action = REORDER_BUFFER_CHANGE_INSERT;
    1398             : 
    1399             :                     /* intentionally fall through */
    1400             :                 case REORDER_BUFFER_CHANGE_INSERT:
    1401             :                 case REORDER_BUFFER_CHANGE_UPDATE:
    1402             :                 case REORDER_BUFFER_CHANGE_DELETE:
    1403           0 :                     Assert(snapshot_now);
    1404             : 
    1405           0 :                     reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
    1406             :                                                 change->data.tp.relnode.relNode);
    1407             : 
    1408             :                     /*
    1409             :                      * Catalog tuple without data, emitted while catalog was
    1410             :                      * in the process of being rewritten.
    1411             :                      */
    1412           0 :                     if (reloid == InvalidOid &&
    1413           0 :                         change->data.tp.newtuple == NULL &&
    1414           0 :                         change->data.tp.oldtuple == NULL)
    1415             :                         goto change_done;
    1416           0 :                     else if (reloid == InvalidOid)
    1417           0 :                         elog(ERROR, "could not map filenode \"%s\" to relation OID",
    1418             :                              relpathperm(change->data.tp.relnode,
    1419             :                                          MAIN_FORKNUM));
    1420             : 
    1421           0 :                     relation = RelationIdGetRelation(reloid);
    1422             : 
    1423           0 :                     if (relation == NULL)
    1424           0 :                         elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
    1425             :                              reloid,
    1426             :                              relpathperm(change->data.tp.relnode,
    1427             :                                          MAIN_FORKNUM));
    1428             : 
    1429           0 :                     if (!RelationIsLogicallyLogged(relation))
    1430             :                         goto change_done;
    1431             : 
    1432             :                     /*
    1433             :                      * For now ignore sequence changes entirely. Most of the
    1434             :                      * time they don't log changes using records we
    1435             :                      * understand, so it doesn't make sense to handle the few
    1436             :                      * cases we do.
    1437             :                      */
    1438           0 :                     if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
    1439           0 :                         goto change_done;
    1440             : 
    1441             :                     /* user-triggered change */
    1442           0 :                     if (!IsToastRelation(relation))
    1443             :                     {
    1444           0 :                         ReorderBufferToastReplace(rb, txn, relation, change);
    1445           0 :                         rb->apply_change(rb, txn, relation, change);
    1446             : 
    1447             :                         /*
    1448             :                          * Only clear reassembled toast chunks if we're sure
    1449             :                          * they're not required anymore. The creator of the
    1450             :                          * tuple tells us.
    1451             :                          */
    1452           0 :                         if (change->data.tp.clear_toast_afterwards)
    1453           0 :                             ReorderBufferToastReset(rb, txn);
    1454             :                     }
    1455             :                     /* we're not interested in toast deletions */
    1456           0 :                     else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
    1457             :                     {
    1458             :                         /*
    1459             :                          * Need to reassemble the full toasted Datum in
    1460             :                          * memory, to ensure the chunks don't get reused till
    1461             :                          * we're done remove it from the list of this
    1462             :                          * transaction's changes. Otherwise it will get
    1463             :                          * freed/reused while restoring spooled data from
    1464             :                          * disk.
    1465             :                          */
    1466           0 :                         dlist_delete(&change->node);
    1467           0 :                         ReorderBufferToastAppendChunk(rb, txn, relation,
    1468             :                                                       change);
    1469             :                     }
    1470             : 
    1471             :             change_done:
    1472             : 
    1473             :                     /*
    1474             :                      * Either speculative insertion was confirmed, or it was
    1475             :                      * unsuccessful and the record isn't needed anymore.
    1476             :                      */
    1477           0 :                     if (specinsert != NULL)
    1478             :                     {
    1479           0 :                         ReorderBufferReturnChange(rb, specinsert);
    1480           0 :                         specinsert = NULL;
    1481             :                     }
    1482             : 
    1483           0 :                     if (relation != NULL)
    1484             :                     {
    1485           0 :                         RelationClose(relation);
    1486           0 :                         relation = NULL;
    1487             :                     }
    1488           0 :                     break;
    1489             : 
    1490             :                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
    1491             : 
    1492             :                     /*
    1493             :                      * Speculative insertions are dealt with by delaying the
    1494             :                      * processing of the insert until the confirmation record
    1495             :                      * arrives. For that we simply unlink the record from the
    1496             :                      * chain, so it does not get freed/reused while restoring
    1497             :                      * spooled data from disk.
    1498             :                      *
    1499             :                      * This is safe in the face of concurrent catalog changes
    1500             :                      * because the relevant relation can't be changed between
    1501             :                      * speculative insertion and confirmation due to
    1502             :                      * CheckTableNotInUse() and locking.
    1503             :                      */
    1504             : 
    1505             :                     /* clear out a pending (and thus failed) speculation */
    1506           0 :                     if (specinsert != NULL)
    1507             :                     {
    1508           0 :                         ReorderBufferReturnChange(rb, specinsert);
    1509           0 :                         specinsert = NULL;
    1510             :                     }
    1511             : 
    1512             :                     /* and memorize the pending insertion */
    1513           0 :                     dlist_delete(&change->node);
    1514           0 :                     specinsert = change;
    1515           0 :                     break;
    1516             : 
    1517             :                 case REORDER_BUFFER_CHANGE_MESSAGE:
    1518           0 :                     rb->message(rb, txn, change->lsn, true,
    1519           0 :                                 change->data.msg.prefix,
    1520             :                                 change->data.msg.message_size,
    1521           0 :                                 change->data.msg.message);
    1522           0 :                     break;
    1523             : 
    1524             :                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
    1525             :                     /* get rid of the old */
    1526           0 :                     TeardownHistoricSnapshot(false);
    1527             : 
    1528           0 :                     if (snapshot_now->copied)
    1529             :                     {
    1530           0 :                         ReorderBufferFreeSnap(rb, snapshot_now);
    1531           0 :                         snapshot_now =
    1532           0 :                             ReorderBufferCopySnap(rb, change->data.snapshot,
    1533             :                                                   txn, command_id);
    1534             :                     }
    1535             : 
    1536             :                     /*
    1537             :                      * Restored from disk, need to be careful not to double
    1538             :                      * free. We could introduce refcounting for that, but for
    1539             :                      * now this seems infrequent enough not to care.
    1540             :                      */
    1541           0 :                     else if (change->data.snapshot->copied)
    1542             :                     {
    1543           0 :                         snapshot_now =
    1544           0 :                             ReorderBufferCopySnap(rb, change->data.snapshot,
    1545             :                                                   txn, command_id);
    1546             :                     }
    1547             :                     else
    1548             :                     {
    1549           0 :                         snapshot_now = change->data.snapshot;
    1550             :                     }
    1551             : 
    1552             : 
    1553             :                     /* and continue with the new one */
    1554           0 :                     SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
    1555           0 :                     break;
    1556             : 
    1557             :                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
    1558           0 :                     Assert(change->data.command_id != InvalidCommandId);
    1559             : 
    1560           0 :                     if (command_id < change->data.command_id)
    1561             :                     {
    1562           0 :                         command_id = change->data.command_id;
    1563             : 
    1564           0 :                         if (!snapshot_now->copied)
    1565             :                         {
    1566             :                             /* we don't use the global one anymore */
    1567           0 :                             snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
    1568             :                                                                  txn, command_id);
    1569             :                         }
    1570             : 
    1571           0 :                         snapshot_now->curcid = command_id;
    1572             : 
    1573           0 :                         TeardownHistoricSnapshot(false);
    1574           0 :                         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
    1575             : 
    1576             :                         /*
    1577             :                          * Every time the CommandId is incremented, we could
    1578             :                          * see new catalog contents, so execute all
    1579             :                          * invalidations.
    1580             :                          */
    1581           0 :                         ReorderBufferExecuteInvalidations(rb, txn);
    1582             :                     }
    1583             : 
    1584           0 :                     break;
    1585             : 
    1586             :                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
    1587           0 :                     elog(ERROR, "tuplecid value in changequeue");
    1588             :                     break;
    1589             :             }
    1590             :         }
    1591             : 
    1592             :         /*
    1593             :          * There's a speculative insertion remaining, just clean in up, it
    1594             :          * can't have been successful, otherwise we'd gotten a confirmation
    1595             :          * record.
    1596             :          */
    1597           0 :         if (specinsert)
    1598             :         {
    1599           0 :             ReorderBufferReturnChange(rb, specinsert);
    1600           0 :             specinsert = NULL;
    1601             :         }
    1602             : 
    1603             :         /* clean up the iterator */
    1604           0 :         ReorderBufferIterTXNFinish(rb, iterstate);
    1605           0 :         iterstate = NULL;
    1606             : 
    1607             :         /* call commit callback */
    1608           0 :         rb->commit(rb, txn, commit_lsn);
    1609             : 
    1610             :         /* this is just a sanity check against bad output plugin behaviour */
    1611           0 :         if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
    1612           0 :             elog(ERROR, "output plugin used XID %u",
    1613             :                  GetCurrentTransactionId());
    1614             : 
    1615             :         /* cleanup */
    1616           0 :         TeardownHistoricSnapshot(false);
    1617             : 
    1618             :         /*
    1619             :          * Aborting the current (sub-)transaction as a whole has the right
    1620             :          * semantics. We want all locks acquired in here to be released, not
    1621             :          * reassigned to the parent and we do not want any database access
    1622             :          * have persistent effects.
    1623             :          */
    1624           0 :         AbortCurrentTransaction();
    1625             : 
    1626             :         /* make sure there's no cache pollution */
    1627           0 :         ReorderBufferExecuteInvalidations(rb, txn);
    1628             : 
    1629           0 :         if (using_subtxn)
    1630           0 :             RollbackAndReleaseCurrentSubTransaction();
    1631             : 
    1632           0 :         if (snapshot_now->copied)
    1633           0 :             ReorderBufferFreeSnap(rb, snapshot_now);
    1634             : 
    1635             :         /* remove potential on-disk data, and deallocate */
    1636           0 :         ReorderBufferCleanupTXN(rb, txn);
    1637             :     }
    1638           0 :     PG_CATCH();
    1639             :     {
    1640             :         /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
    1641           0 :         if (iterstate)
    1642           0 :             ReorderBufferIterTXNFinish(rb, iterstate);
    1643             : 
    1644           0 :         TeardownHistoricSnapshot(true);
    1645             : 
    1646             :         /*
    1647             :          * Force cache invalidation to happen outside of a valid transaction
    1648             :          * to prevent catalog access as we just caught an error.
    1649             :          */
    1650           0 :         AbortCurrentTransaction();
    1651             : 
    1652             :         /* make sure there's no cache pollution */
    1653           0 :         ReorderBufferExecuteInvalidations(rb, txn);
    1654             : 
    1655           0 :         if (using_subtxn)
    1656           0 :             RollbackAndReleaseCurrentSubTransaction();
    1657             : 
    1658           0 :         if (snapshot_now->copied)
    1659           0 :             ReorderBufferFreeSnap(rb, snapshot_now);
    1660             : 
    1661             :         /* remove potential on-disk data, and deallocate */
    1662           0 :         ReorderBufferCleanupTXN(rb, txn);
    1663             : 
    1664           0 :         PG_RE_THROW();
    1665             :     }
    1666           0 :     PG_END_TRY();
    1667             : }
    1668             : 
    1669             : /*
    1670             :  * Abort a transaction that possibly has previous changes. Needs to be first
    1671             :  * called for subtransactions and then for the toplevel xid.
    1672             :  *
    1673             :  * NB: Transactions handled here have to have actively aborted (i.e. have
    1674             :  * produced an abort record). Implicitly aborted transactions are handled via
    1675             :  * ReorderBufferAbortOld(); transactions we're just not interested in, but
    1676             :  * which have committed are handled in ReorderBufferForget().
    1677             :  *
    1678             :  * This function purges this transaction and its contents from memory and
    1679             :  * disk.
    1680             :  */
    1681             : void
    1682           0 : ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
    1683             : {
    1684             :     ReorderBufferTXN *txn;
    1685             : 
    1686           0 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    1687             :                                 false);
    1688             : 
    1689             :     /* unknown, nothing to remove */
    1690           0 :     if (txn == NULL)
    1691           0 :         return;
    1692             : 
    1693             :     /* cosmetic... */
    1694           0 :     txn->final_lsn = lsn;
    1695             : 
    1696             :     /* remove potential on-disk data, and deallocate */
    1697           0 :     ReorderBufferCleanupTXN(rb, txn);
    1698             : }
    1699             : 
    1700             : /*
    1701             :  * Abort all transactions that aren't actually running anymore because the
    1702             :  * server restarted.
    1703             :  *
    1704             :  * NB: These really have to be transactions that have aborted due to a server
    1705             :  * crash/immediate restart, as we don't deal with invalidations here.
    1706             :  */
    1707             : void
    1708           0 : ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
    1709             : {
    1710             :     dlist_mutable_iter it;
    1711             : 
    1712             :     /*
    1713             :      * Iterate through all (potential) toplevel TXNs and abort all that are
    1714             :      * older than what possibly can be running. Once we've found the first
    1715             :      * that is alive we stop, there might be some that acquired an xid earlier
    1716             :      * but started writing later, but it's unlikely and they will be cleaned up
    1717             :      * in a later call to ReorderBufferAbortOld().
    1718             :      */
    1719           0 :     dlist_foreach_modify(it, &rb->toplevel_by_lsn)
    1720             :     {
    1721             :         ReorderBufferTXN *txn;
    1722             : 
    1723           0 :         txn = dlist_container(ReorderBufferTXN, node, it.cur);
    1724             : 
    1725           0 :         if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
    1726             :         {
    1727           0 :             elog(DEBUG2, "aborting old transaction %u", txn->xid);
    1728             : 
    1729             :             /* remove potential on-disk data, and deallocate this tx */
    1730           0 :             ReorderBufferCleanupTXN(rb, txn);
    1731             :         }
    1732             :         else
    1733           0 :             return;
    1734             :     }
    1735             : }
    1736             : 
    1737             : /*
    1738             :  * Forget the contents of a transaction if we aren't interested in its
    1739             :  * contents. Needs to be first called for subtransactions and then for the
    1740             :  * toplevel xid.
    1741             :  *
    1742             :  * This is significantly different to ReorderBufferAbort() because
    1743             :  * transactions that have committed need to be treated differently from aborted
    1744             :  * ones since they may have modified the catalog.
    1745             :  *
    1746             :  * Note that this is only allowed to be called in the moment a transaction
    1747             :  * commit has just been read, not earlier; otherwise later records referring
    1748             :  * to this xid might re-create the transaction incompletely.
    1749             :  */
    1750             : void
    1751           0 : ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
    1752             : {
    1753             :     ReorderBufferTXN *txn;
    1754             : 
    1755           0 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    1756             :                                 false);
    1757             : 
    1758             :     /* unknown, nothing to forget */
    1759           0 :     if (txn == NULL)
    1760           0 :         return;
    1761             : 
    1762             :     /* cosmetic... */
    1763           0 :     txn->final_lsn = lsn;
    1764             : 
    1765             :     /*
    1766             :      * Process cache invalidation messages if there are any. Even if we're not
    1767             :      * interested in the transaction's contents, it could have manipulated the
    1768             :      * catalog and we need to update the caches according to that.
    1769             :      */
    1770           0 :     if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
    1771           0 :         ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
    1772             :                                            txn->invalidations);
    1773             :     else
    1774           0 :         Assert(txn->ninvalidations == 0);
    1775             : 
    1776             :     /* remove potential on-disk data, and deallocate */
    1777           0 :     ReorderBufferCleanupTXN(rb, txn);
    1778             : }
    1779             : 
    1780             : /*
    1781             :  * Execute invalidations happening outside the context of a decoded
    1782             :  * transaction. That currently happens either for xid-less commits
    1783             :  * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
    1784             :  * transactions (via ReorderBufferForget()).
    1785             :  */
    1786             : void
    1787           0 : ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
    1788             :                                    SharedInvalidationMessage *invalidations)
    1789             : {
    1790           0 :     bool        use_subtxn = IsTransactionOrTransactionBlock();
    1791             :     int         i;
    1792             : 
    1793           0 :     if (use_subtxn)
    1794           0 :         BeginInternalSubTransaction("replay");
    1795             : 
    1796             :     /*
    1797             :      * Force invalidations to happen outside of a valid transaction - that way
    1798             :      * entries will just be marked as invalid without accessing the catalog.
    1799             :      * That's advantageous because we don't need to setup the full state
    1800             :      * necessary for catalog access.
    1801             :      */
    1802           0 :     if (use_subtxn)
    1803           0 :         AbortCurrentTransaction();
    1804             : 
    1805           0 :     for (i = 0; i < ninvalidations; i++)
    1806           0 :         LocalExecuteInvalidationMessage(&invalidations[i]);
    1807             : 
    1808           0 :     if (use_subtxn)
    1809           0 :         RollbackAndReleaseCurrentSubTransaction();
    1810           0 : }
    1811             : 
    1812             : /*
    1813             :  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
    1814             :  * least once for every xid in XLogRecord->xl_xid (other places in records
    1815             :  * may, but do not have to be passed through here).
    1816             :  *
    1817             :  * Reorderbuffer keeps some datastructures about transactions in LSN order,
    1818             :  * for efficiency. To do that it has to know about when transactions are seen
    1819             :  * first in the WAL. As many types of records are not actually interesting for
    1820             :  * logical decoding, they do not necessarily pass through here.
    1821             :  */
    1822             : void
    1823           0 : ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
    1824             : {
    1825             :     /* many records won't have an xid assigned, centralize check here */
    1826           0 :     if (xid != InvalidTransactionId)
    1827           0 :         ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
    1828           0 : }
    1829             : 
    1830             : /*
    1831             :  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
    1832             :  * because the previous snapshot doesn't describe the catalog correctly for
    1833             :  * following rows.
    1834             :  */
    1835             : void
    1836           0 : ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
    1837             :                          XLogRecPtr lsn, Snapshot snap)
    1838             : {
    1839           0 :     ReorderBufferChange *change = ReorderBufferGetChange(rb);
    1840             : 
    1841           0 :     change->data.snapshot = snap;
    1842           0 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
    1843             : 
    1844           0 :     ReorderBufferQueueChange(rb, xid, lsn, change);
    1845           0 : }
    1846             : 
    1847             : /*
    1848             :  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
    1849             :  * that is used to decode all changes until either this transaction modifies
    1850             :  * the catalog or another catalog modifying transaction commits.
    1851             :  *
    1852             :  * Needs to be called before any changes are added with
    1853             :  * ReorderBufferQueueChange().
    1854             :  */
    1855             : void
    1856           0 : ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
    1857             :                              XLogRecPtr lsn, Snapshot snap)
    1858             : {
    1859             :     ReorderBufferTXN *txn;
    1860             :     bool        is_new;
    1861             : 
    1862           0 :     txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
    1863           0 :     Assert(txn->base_snapshot == NULL);
    1864           0 :     Assert(snap != NULL);
    1865             : 
    1866           0 :     txn->base_snapshot = snap;
    1867           0 :     txn->base_snapshot_lsn = lsn;
    1868           0 : }
    1869             : 
    1870             : /*
    1871             :  * Access the catalog with this CommandId at this point in the changestream.
    1872             :  *
    1873             :  * May only be called for command ids > 1
    1874             :  */
    1875             : void
    1876           0 : ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
    1877             :                              XLogRecPtr lsn, CommandId cid)
    1878             : {
    1879           0 :     ReorderBufferChange *change = ReorderBufferGetChange(rb);
    1880             : 
    1881           0 :     change->data.command_id = cid;
    1882           0 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
    1883             : 
    1884           0 :     ReorderBufferQueueChange(rb, xid, lsn, change);
    1885           0 : }
    1886             : 
    1887             : 
    1888             : /*
    1889             :  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
    1890             :  */
    1891             : void
    1892           0 : ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
    1893             :                              XLogRecPtr lsn, RelFileNode node,
    1894             :                              ItemPointerData tid, CommandId cmin,
    1895             :                              CommandId cmax, CommandId combocid)
    1896             : {
    1897           0 :     ReorderBufferChange *change = ReorderBufferGetChange(rb);
    1898             :     ReorderBufferTXN *txn;
    1899             : 
    1900           0 :     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
    1901             : 
    1902           0 :     change->data.tuplecid.node = node;
    1903           0 :     change->data.tuplecid.tid = tid;
    1904           0 :     change->data.tuplecid.cmin = cmin;
    1905           0 :     change->data.tuplecid.cmax = cmax;
    1906           0 :     change->data.tuplecid.combocid = combocid;
    1907           0 :     change->lsn = lsn;
    1908           0 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
    1909             : 
    1910           0 :     dlist_push_tail(&txn->tuplecids, &change->node);
    1911           0 :     txn->ntuplecids++;
    1912           0 : }
    1913             : 
    1914             : /*
    1915             :  * Setup the invalidation of the toplevel transaction.
    1916             :  *
    1917             :  * This needs to be done before ReorderBufferCommit is called!
    1918             :  */
    1919             : void
    1920           0 : ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
    1921             :                               XLogRecPtr lsn, Size nmsgs,
    1922             :                               SharedInvalidationMessage *msgs)
    1923             : {
    1924             :     ReorderBufferTXN *txn;
    1925             : 
    1926           0 :     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
    1927             : 
    1928           0 :     if (txn->ninvalidations != 0)
    1929           0 :         elog(ERROR, "only ever add one set of invalidations");
    1930             : 
    1931           0 :     Assert(nmsgs > 0);
    1932             : 
    1933           0 :     txn->ninvalidations = nmsgs;
    1934           0 :     txn->invalidations = (SharedInvalidationMessage *)
    1935           0 :         MemoryContextAlloc(rb->context,
    1936             :                            sizeof(SharedInvalidationMessage) * nmsgs);
    1937           0 :     memcpy(txn->invalidations, msgs,
    1938             :            sizeof(SharedInvalidationMessage) * nmsgs);
    1939           0 : }
    1940             : 
    1941             : /*
    1942             :  * Apply all invalidations we know. Possibly we only need parts at this point
    1943             :  * in the changestream but we don't know which those are.
    1944             :  */
    1945             : static void
    1946           0 : ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
    1947             : {
    1948             :     int         i;
    1949             : 
    1950           0 :     for (i = 0; i < txn->ninvalidations; i++)
    1951           0 :         LocalExecuteInvalidationMessage(&txn->invalidations[i]);
    1952           0 : }
    1953             : 
    1954             : /*
    1955             :  * Mark a transaction as containing catalog changes
    1956             :  */
    1957             : void
    1958           0 : ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
    1959             :                                   XLogRecPtr lsn)
    1960             : {
    1961             :     ReorderBufferTXN *txn;
    1962             : 
    1963           0 :     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
    1964             : 
    1965           0 :     txn->has_catalog_changes = true;
    1966           0 : }
    1967             : 
    1968             : /*
    1969             :  * Query whether a transaction is already *known* to contain catalog
    1970             :  * changes. This can be wrong until directly before the commit!
    1971             :  */
    1972             : bool
    1973           0 : ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
    1974             : {
    1975             :     ReorderBufferTXN *txn;
    1976             : 
    1977           0 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    1978             :                                 false);
    1979           0 :     if (txn == NULL)
    1980           0 :         return false;
    1981             : 
    1982           0 :     return txn->has_catalog_changes;
    1983             : }
    1984             : 
    1985             : /*
    1986             :  * Have we already added the first snapshot?
    1987             :  */
    1988             : bool
    1989           0 : ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
    1990             : {
    1991             :     ReorderBufferTXN *txn;
    1992             : 
    1993           0 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    1994             :                                 false);
    1995             : 
    1996             :     /* transaction isn't known yet, ergo no snapshot */
    1997           0 :     if (txn == NULL)
    1998           0 :         return false;
    1999             : 
    2000             :     /*
    2001             :      * TODO: It would be a nice improvement if we would check the toplevel
    2002             :      * transaction in subtransactions, but we'd need to keep track of a bit
    2003             :      * more state.
    2004             :      */
    2005           0 :     return txn->base_snapshot != NULL;
    2006             : }
    2007             : 
    2008             : 
    2009             : /*
    2010             :  * ---------------------------------------
    2011             :  * Disk serialization support
    2012             :  * ---------------------------------------
    2013             :  */
    2014             : 
    2015             : /*
    2016             :  * Ensure the IO buffer is >= sz.
    2017             :  */
    2018             : static void
    2019           0 : ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
    2020             : {
    2021           0 :     if (!rb->outbufsize)
    2022             :     {
    2023           0 :         rb->outbuf = MemoryContextAlloc(rb->context, sz);
    2024           0 :         rb->outbufsize = sz;
    2025             :     }
    2026           0 :     else if (rb->outbufsize < sz)
    2027             :     {
    2028           0 :         rb->outbuf = repalloc(rb->outbuf, sz);
    2029           0 :         rb->outbufsize = sz;
    2030             :     }
    2031           0 : }
    2032             : 
    2033             : /*
    2034             :  * Check whether the transaction txn should spill its data to disk.
    2035             :  */
    2036             : static void
    2037           0 : ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    2038             : {
    2039             :     /*
    2040             :      * TODO: improve accounting so we cheaply can take subtransactions into
    2041             :      * account here.
    2042             :      */
    2043           0 :     if (txn->nentries_mem >= max_changes_in_memory)
    2044             :     {
    2045           0 :         ReorderBufferSerializeTXN(rb, txn);
    2046           0 :         Assert(txn->nentries_mem == 0);
    2047             :     }
    2048           0 : }
    2049             : 
    2050             : /*
    2051             :  * Spill data of a large transaction (and its subtransactions) to disk.
    2052             :  */
    2053             : static void
    2054           0 : ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    2055             : {
    2056             :     dlist_iter  subtxn_i;
    2057             :     dlist_mutable_iter change_i;
    2058           0 :     int         fd = -1;
    2059           0 :     XLogSegNo   curOpenSegNo = 0;
    2060           0 :     Size        spilled = 0;
    2061             :     char        path[MAXPGPATH];
    2062             : 
    2063           0 :     elog(DEBUG2, "spill %u changes in XID %u to disk",
    2064             :          (uint32) txn->nentries_mem, txn->xid);
    2065             : 
    2066             :     /* do the same to all child TXs */
    2067           0 :     dlist_foreach(subtxn_i, &txn->subtxns)
    2068             :     {
    2069             :         ReorderBufferTXN *subtxn;
    2070             : 
    2071           0 :         subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
    2072           0 :         ReorderBufferSerializeTXN(rb, subtxn);
    2073             :     }
    2074             : 
    2075             :     /* serialize changestream */
    2076           0 :     dlist_foreach_modify(change_i, &txn->changes)
    2077             :     {
    2078             :         ReorderBufferChange *change;
    2079             : 
    2080           0 :         change = dlist_container(ReorderBufferChange, node, change_i.cur);
    2081             : 
    2082             :         /*
    2083             :          * store in segment in which it belongs by start lsn, don't split over
    2084             :          * multiple segments though
    2085             :          */
    2086           0 :         if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
    2087             :         {
    2088             :             XLogRecPtr  recptr;
    2089             : 
    2090           0 :             if (fd != -1)
    2091           0 :                 CloseTransientFile(fd);
    2092             : 
    2093           0 :             XLByteToSeg(change->lsn, curOpenSegNo);
    2094           0 :             XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
    2095             : 
    2096             :             /*
    2097             :              * No need to care about TLIs here, only used during a single run,
    2098             :              * so each LSN only maps to a specific WAL record.
    2099             :              */
    2100           0 :             sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
    2101           0 :                     NameStr(MyReplicationSlot->data.name), txn->xid,
    2102           0 :                     (uint32) (recptr >> 32), (uint32) recptr);
    2103             : 
    2104             :             /* open segment, create it if necessary */
    2105           0 :             fd = OpenTransientFile(path,
    2106             :                                    O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
    2107             :                                    S_IRUSR | S_IWUSR);
    2108             : 
    2109           0 :             if (fd < 0)
    2110           0 :                 ereport(ERROR,
    2111             :                         (errcode_for_file_access(),
    2112             :                          errmsg("could not open file \"%s\": %m",
    2113             :                                 path)));
    2114             :         }
    2115             : 
    2116           0 :         ReorderBufferSerializeChange(rb, txn, fd, change);
    2117           0 :         dlist_delete(&change->node);
    2118           0 :         ReorderBufferReturnChange(rb, change);
    2119             : 
    2120           0 :         spilled++;
    2121             :     }
    2122             : 
    2123           0 :     Assert(spilled == txn->nentries_mem);
    2124           0 :     Assert(dlist_is_empty(&txn->changes));
    2125           0 :     txn->nentries_mem = 0;
    2126           0 :     txn->serialized = true;
    2127             : 
    2128           0 :     if (fd != -1)
    2129           0 :         CloseTransientFile(fd);
    2130           0 : }
    2131             : 
    2132             : /*
    2133             :  * Serialize individual change to disk.
    2134             :  */
    2135             : static void
    2136           0 : ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
    2137             :                              int fd, ReorderBufferChange *change)
    2138             : {
    2139             :     ReorderBufferDiskChange *ondisk;
    2140           0 :     Size        sz = sizeof(ReorderBufferDiskChange);
    2141             : 
    2142           0 :     ReorderBufferSerializeReserve(rb, sz);
    2143             : 
    2144           0 :     ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    2145           0 :     memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
    2146             : 
    2147           0 :     switch (change->action)
    2148             :     {
    2149             :             /* fall through these, they're all similar enough */
    2150             :         case REORDER_BUFFER_CHANGE_INSERT:
    2151             :         case REORDER_BUFFER_CHANGE_UPDATE:
    2152             :         case REORDER_BUFFER_CHANGE_DELETE:
    2153             :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
    2154             :             {
    2155             :                 char       *data;
    2156             :                 ReorderBufferTupleBuf *oldtup,
    2157             :                            *newtup;
    2158           0 :                 Size        oldlen = 0;
    2159           0 :                 Size        newlen = 0;
    2160             : 
    2161           0 :                 oldtup = change->data.tp.oldtuple;
    2162           0 :                 newtup = change->data.tp.newtuple;
    2163             : 
    2164           0 :                 if (oldtup)
    2165             :                 {
    2166           0 :                     sz += sizeof(HeapTupleData);
    2167           0 :                     oldlen = oldtup->tuple.t_len;
    2168           0 :                     sz += oldlen;
    2169             :                 }
    2170             : 
    2171           0 :                 if (newtup)
    2172             :                 {
    2173           0 :                     sz += sizeof(HeapTupleData);
    2174           0 :                     newlen = newtup->tuple.t_len;
    2175           0 :                     sz += newlen;
    2176             :                 }
    2177             : 
    2178             :                 /* make sure we have enough space */
    2179           0 :                 ReorderBufferSerializeReserve(rb, sz);
    2180             : 
    2181           0 :                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
    2182             :                 /* might have been reallocated above */
    2183           0 :                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    2184             : 
    2185           0 :                 if (oldlen)
    2186             :                 {
    2187           0 :                     memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
    2188           0 :                     data += sizeof(HeapTupleData);
    2189             : 
    2190           0 :                     memcpy(data, oldtup->tuple.t_data, oldlen);
    2191           0 :                     data += oldlen;
    2192             :                 }
    2193             : 
    2194           0 :                 if (newlen)
    2195             :                 {
    2196           0 :                     memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
    2197           0 :                     data += sizeof(HeapTupleData);
    2198             : 
    2199           0 :                     memcpy(data, newtup->tuple.t_data, newlen);
    2200           0 :                     data += newlen;
    2201             :                 }
    2202           0 :                 break;
    2203             :             }
    2204             :         case REORDER_BUFFER_CHANGE_MESSAGE:
    2205             :             {
    2206             :                 char       *data;
    2207           0 :                 Size        prefix_size = strlen(change->data.msg.prefix) + 1;
    2208             : 
    2209           0 :                 sz += prefix_size + change->data.msg.message_size +
    2210             :                     sizeof(Size) + sizeof(Size);
    2211           0 :                 ReorderBufferSerializeReserve(rb, sz);
    2212             : 
    2213           0 :                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
    2214             : 
    2215             :                 /* might have been reallocated above */
    2216           0 :                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    2217             : 
    2218             :                 /* write the prefix including the size */
    2219           0 :                 memcpy(data, &prefix_size, sizeof(Size));
    2220           0 :                 data += sizeof(Size);
    2221           0 :                 memcpy(data, change->data.msg.prefix,
    2222             :                        prefix_size);
    2223           0 :                 data += prefix_size;
    2224             : 
    2225             :                 /* write the message including the size */
    2226           0 :                 memcpy(data, &change->data.msg.message_size, sizeof(Size));
    2227           0 :                 data += sizeof(Size);
    2228           0 :                 memcpy(data, change->data.msg.message,
    2229             :                        change->data.msg.message_size);
    2230           0 :                 data += change->data.msg.message_size;
    2231             : 
    2232           0 :                 break;
    2233             :             }
    2234             :         case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
    2235             :             {
    2236             :                 Snapshot    snap;
    2237             :                 char       *data;
    2238             : 
    2239           0 :                 snap = change->data.snapshot;
    2240             : 
    2241           0 :                 sz += sizeof(SnapshotData) +
    2242           0 :                     sizeof(TransactionId) * snap->xcnt +
    2243           0 :                     sizeof(TransactionId) * snap->subxcnt
    2244             :                     ;
    2245             : 
    2246             :                 /* make sure we have enough space */
    2247           0 :                 ReorderBufferSerializeReserve(rb, sz);
    2248           0 :                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
    2249             :                 /* might have been reallocated above */
    2250           0 :                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    2251             : 
    2252           0 :                 memcpy(data, snap, sizeof(SnapshotData));
    2253           0 :                 data += sizeof(SnapshotData);
    2254             : 
    2255           0 :                 if (snap->xcnt)
    2256             :                 {
    2257           0 :                     memcpy(data, snap->xip,
    2258           0 :                            sizeof(TransactionId) * snap->xcnt);
    2259           0 :                     data += sizeof(TransactionId) * snap->xcnt;
    2260             :                 }
    2261             : 
    2262           0 :                 if (snap->subxcnt)
    2263             :                 {
    2264           0 :                     memcpy(data, snap->subxip,
    2265           0 :                            sizeof(TransactionId) * snap->subxcnt);
    2266           0 :                     data += sizeof(TransactionId) * snap->subxcnt;
    2267             :                 }
    2268           0 :                 break;
    2269             :             }
    2270             :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
    2271             :         case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
    2272             :         case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
    2273             :             /* ReorderBufferChange contains everything important */
    2274           0 :             break;
    2275             :     }
    2276             : 
    2277           0 :     ondisk->size = sz;
    2278             : 
    2279           0 :     pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
    2280           0 :     if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
    2281             :     {
    2282           0 :         int         save_errno = errno;
    2283             : 
    2284           0 :         CloseTransientFile(fd);
    2285           0 :         errno = save_errno;
    2286           0 :         ereport(ERROR,
    2287             :                 (errcode_for_file_access(),
    2288             :                  errmsg("could not write to data file for XID %u: %m",
    2289             :                         txn->xid)));
    2290             :     }
    2291           0 :     pgstat_report_wait_end();
    2292             : 
    2293           0 :     Assert(ondisk->change.action == change->action);
    2294           0 : }
    2295             : 
    2296             : /*
    2297             :  * Restore a number of changes spilled to disk back into memory.
    2298             :  */
    2299             : static Size
    2300           0 : ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
    2301             :                             int *fd, XLogSegNo *segno)
    2302             : {
    2303           0 :     Size        restored = 0;
    2304             :     XLogSegNo   last_segno;
    2305             :     dlist_mutable_iter cleanup_iter;
    2306             : 
    2307           0 :     Assert(txn->first_lsn != InvalidXLogRecPtr);
    2308           0 :     Assert(txn->final_lsn != InvalidXLogRecPtr);
    2309             : 
    2310             :     /* free current entries, so we have memory for more */
    2311           0 :     dlist_foreach_modify(cleanup_iter, &txn->changes)
    2312             :     {
    2313           0 :         ReorderBufferChange *cleanup =
    2314           0 :         dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
    2315             : 
    2316           0 :         dlist_delete(&cleanup->node);
    2317           0 :         ReorderBufferReturnChange(rb, cleanup);
    2318             :     }
    2319           0 :     txn->nentries_mem = 0;
    2320           0 :     Assert(dlist_is_empty(&txn->changes));
    2321             : 
    2322           0 :     XLByteToSeg(txn->final_lsn, last_segno);
    2323             : 
    2324           0 :     while (restored < max_changes_in_memory && *segno <= last_segno)
    2325             :     {
    2326             :         int         readBytes;
    2327             :         ReorderBufferDiskChange *ondisk;
    2328             : 
    2329           0 :         if (*fd == -1)
    2330             :         {
    2331             :             XLogRecPtr  recptr;
    2332             :             char        path[MAXPGPATH];
    2333             : 
    2334             :             /* first time in */
    2335           0 :             if (*segno == 0)
    2336             :             {
    2337           0 :                 XLByteToSeg(txn->first_lsn, *segno);
    2338             :             }
    2339             : 
    2340           0 :             Assert(*segno != 0 || dlist_is_empty(&txn->changes));
    2341           0 :             XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
    2342             : 
    2343             :             /*
    2344             :              * No need to care about TLIs here, only used during a single run,
    2345             :              * so each LSN only maps to a specific WAL record.
    2346             :              */
    2347           0 :             sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
    2348           0 :                     NameStr(MyReplicationSlot->data.name), txn->xid,
    2349           0 :                     (uint32) (recptr >> 32), (uint32) recptr);
    2350             : 
    2351           0 :             *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
    2352           0 :             if (*fd < 0 && errno == ENOENT)
    2353             :             {
    2354           0 :                 *fd = -1;
    2355           0 :                 (*segno)++;
    2356           0 :                 continue;
    2357             :             }
    2358           0 :             else if (*fd < 0)
    2359           0 :                 ereport(ERROR,
    2360             :                         (errcode_for_file_access(),
    2361             :                          errmsg("could not open file \"%s\": %m",
    2362             :                                 path)));
    2363             : 
    2364             :         }
    2365             : 
    2366             :         /*
    2367             :          * Read the statically sized part of a change which has information
    2368             :          * about the total size. If we couldn't read a record, we're at the
    2369             :          * end of this file.
    2370             :          */
    2371           0 :         ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
    2372           0 :         pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
    2373           0 :         readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
    2374           0 :         pgstat_report_wait_end();
    2375             : 
    2376             :         /* eof */
    2377           0 :         if (readBytes == 0)
    2378             :         {
    2379           0 :             CloseTransientFile(*fd);
    2380           0 :             *fd = -1;
    2381           0 :             (*segno)++;
    2382           0 :             continue;
    2383             :         }
    2384           0 :         else if (readBytes < 0)
    2385           0 :             ereport(ERROR,
    2386             :                     (errcode_for_file_access(),
    2387             :                      errmsg("could not read from reorderbuffer spill file: %m")));
    2388           0 :         else if (readBytes != sizeof(ReorderBufferDiskChange))
    2389           0 :             ereport(ERROR,
    2390             :                     (errcode_for_file_access(),
    2391             :                      errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
    2392             :                             readBytes,
    2393             :                             (uint32) sizeof(ReorderBufferDiskChange))));
    2394             : 
    2395           0 :         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    2396             : 
    2397           0 :         ReorderBufferSerializeReserve(rb,
    2398           0 :                                       sizeof(ReorderBufferDiskChange) + ondisk->size);
    2399           0 :         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    2400             : 
    2401           0 :         pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
    2402           0 :         readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
    2403           0 :                          ondisk->size - sizeof(ReorderBufferDiskChange));
    2404           0 :         pgstat_report_wait_end();
    2405             : 
    2406           0 :         if (readBytes < 0)
    2407           0 :             ereport(ERROR,
    2408             :                     (errcode_for_file_access(),
    2409             :                      errmsg("could not read from reorderbuffer spill file: %m")));
    2410           0 :         else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
    2411           0 :             ereport(ERROR,
    2412             :                     (errcode_for_file_access(),
    2413             :                      errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
    2414             :                             readBytes,
    2415             :                             (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
    2416             : 
    2417             :         /*
    2418             :          * ok, read a full change from disk, now restore it into proper
    2419             :          * in-memory format
    2420             :          */
    2421           0 :         ReorderBufferRestoreChange(rb, txn, rb->outbuf);
    2422           0 :         restored++;
    2423             :     }
    2424             : 
    2425           0 :     return restored;
    2426             : }
    2427             : 
    2428             : /*
    2429             :  * Convert change from its on-disk format to in-memory format and queue it onto
    2430             :  * the TXN's ->changes list.
    2431             :  *
    2432             :  * Note: although "data" is declared char*, at entry it points to a
    2433             :  * maxalign'd buffer, making it safe in most of this function to assume
    2434             :  * that the pointed-to data is suitably aligned for direct access.
    2435             :  */
    2436             : static void
    2437           0 : ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
    2438             :                            char *data)
    2439             : {
    2440             :     ReorderBufferDiskChange *ondisk;
    2441             :     ReorderBufferChange *change;
    2442             : 
    2443           0 :     ondisk = (ReorderBufferDiskChange *) data;
    2444             : 
    2445           0 :     change = ReorderBufferGetChange(rb);
    2446             : 
    2447             :     /* copy static part */
    2448           0 :     memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
    2449             : 
    2450           0 :     data += sizeof(ReorderBufferDiskChange);
    2451             : 
    2452             :     /* restore individual stuff */
    2453           0 :     switch (change->action)
    2454             :     {
    2455             :             /* fall through these, they're all similar enough */
    2456             :         case REORDER_BUFFER_CHANGE_INSERT:
    2457             :         case REORDER_BUFFER_CHANGE_UPDATE:
    2458             :         case REORDER_BUFFER_CHANGE_DELETE:
    2459             :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
    2460           0 :             if (change->data.tp.oldtuple)
    2461             :             {
    2462           0 :                 uint32      tuplelen = ((HeapTuple) data)->t_len;
    2463             : 
    2464           0 :                 change->data.tp.oldtuple =
    2465           0 :                     ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
    2466             : 
    2467             :                 /* restore ->tuple */
    2468           0 :                 memcpy(&change->data.tp.oldtuple->tuple, data,
    2469             :                        sizeof(HeapTupleData));
    2470           0 :                 data += sizeof(HeapTupleData);
    2471             : 
    2472             :                 /* reset t_data pointer into the new tuplebuf */
    2473           0 :                 change->data.tp.oldtuple->tuple.t_data =
    2474           0 :                     ReorderBufferTupleBufData(change->data.tp.oldtuple);
    2475             : 
    2476             :                 /* restore tuple data itself */
    2477           0 :                 memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
    2478           0 :                 data += tuplelen;
    2479             :             }
    2480             : 
    2481           0 :             if (change->data.tp.newtuple)
    2482             :             {
    2483             :                 /* here, data might not be suitably aligned! */
    2484             :                 uint32      tuplelen;
    2485             : 
    2486           0 :                 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
    2487             :                        sizeof(uint32));
    2488             : 
    2489           0 :                 change->data.tp.newtuple =
    2490           0 :                     ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
    2491             : 
    2492             :                 /* restore ->tuple */
    2493           0 :                 memcpy(&change->data.tp.newtuple->tuple, data,
    2494             :                        sizeof(HeapTupleData));
    2495           0 :                 data += sizeof(HeapTupleData);
    2496             : 
    2497             :                 /* reset t_data pointer into the new tuplebuf */
    2498           0 :                 change->data.tp.newtuple->tuple.t_data =
    2499           0 :                     ReorderBufferTupleBufData(change->data.tp.newtuple);
    2500             : 
    2501             :                 /* restore tuple data itself */
    2502           0 :                 memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
    2503           0 :                 data += tuplelen;
    2504             :             }
    2505             : 
    2506           0 :             break;
    2507             :         case REORDER_BUFFER_CHANGE_MESSAGE:
    2508             :             {
    2509             :                 Size        prefix_size;
    2510             : 
    2511             :                 /* read prefix */
    2512           0 :                 memcpy(&prefix_size, data, sizeof(Size));
    2513           0 :                 data += sizeof(Size);
    2514           0 :                 change->data.msg.prefix = MemoryContextAlloc(rb->context,
    2515             :                                                              prefix_size);
    2516           0 :                 memcpy(change->data.msg.prefix, data, prefix_size);
    2517           0 :                 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
    2518           0 :                 data += prefix_size;
    2519             : 
    2520             :                 /* read the message */
    2521           0 :                 memcpy(&change->data.msg.message_size, data, sizeof(Size));
    2522           0 :                 data += sizeof(Size);
    2523           0 :                 change->data.msg.message = MemoryContextAlloc(rb->context,
    2524             :                                                               change->data.msg.message_size);
    2525           0 :                 memcpy(change->data.msg.message, data,
    2526             :                        change->data.msg.message_size);
    2527           0 :                 data += change->data.msg.message_size;
    2528             : 
    2529           0 :                 break;
    2530             :             }
    2531             :         case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
    2532             :             {
    2533             :                 Snapshot    oldsnap;
    2534             :                 Snapshot    newsnap;
    2535             :                 Size        size;
    2536             : 
    2537           0 :                 oldsnap = (Snapshot) data;
    2538             : 
    2539           0 :                 size = sizeof(SnapshotData) +
    2540           0 :                     sizeof(TransactionId) * oldsnap->xcnt +
    2541           0 :                     sizeof(TransactionId) * (oldsnap->subxcnt + 0);
    2542             : 
    2543           0 :                 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
    2544             : 
    2545           0 :                 newsnap = change->data.snapshot;
    2546             : 
    2547           0 :                 memcpy(newsnap, data, size);
    2548           0 :                 newsnap->xip = (TransactionId *)
    2549             :                     (((char *) newsnap) + sizeof(SnapshotData));
    2550           0 :                 newsnap->subxip = newsnap->xip + newsnap->xcnt;
    2551           0 :                 newsnap->copied = true;
    2552           0 :                 break;
    2553             :             }
    2554             :             /* the base struct contains all the data, easy peasy */
    2555             :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
    2556             :         case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
    2557             :         case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
    2558           0 :             break;
    2559             :     }
    2560             : 
    2561           0 :     dlist_push_tail(&txn->changes, &change->node);
    2562           0 :     txn->nentries_mem++;
    2563           0 : }
    2564             : 
    2565             : /*
    2566             :  * Remove all on-disk data stored for the passed-in transaction.
    2567             :  */
    2568             : static void
    2569           0 : ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
    2570             : {
    2571             :     XLogSegNo   first;
    2572             :     XLogSegNo   cur;
    2573             :     XLogSegNo   last;
    2574             : 
    2575           0 :     Assert(txn->first_lsn != InvalidXLogRecPtr);
    2576           0 :     Assert(txn->final_lsn != InvalidXLogRecPtr);
    2577             : 
    2578           0 :     XLByteToSeg(txn->first_lsn, first);
    2579           0 :     XLByteToSeg(txn->final_lsn, last);
    2580             : 
    2581             :     /* iterate over all possible filenames, and delete them */
    2582           0 :     for (cur = first; cur <= last; cur++)
    2583             :     {
    2584             :         char        path[MAXPGPATH];
    2585             :         XLogRecPtr  recptr;
    2586             : 
    2587           0 :         XLogSegNoOffsetToRecPtr(cur, 0, recptr);
    2588             : 
    2589           0 :         sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
    2590           0 :                 NameStr(MyReplicationSlot->data.name), txn->xid,
    2591           0 :                 (uint32) (recptr >> 32), (uint32) recptr);
    2592           0 :         if (unlink(path) != 0 && errno != ENOENT)
    2593           0 :             ereport(ERROR,
    2594             :                     (errcode_for_file_access(),
    2595             :                      errmsg("could not remove file \"%s\": %m", path)));
    2596             :     }
    2597           0 : }
    2598             : 
    2599             : /*
    2600             :  * Delete all data spilled to disk after we've restarted/crashed. It will be
    2601             :  * recreated when the respective slots are reused.
    2602             :  */
    2603             : void
    2604           6 : StartupReorderBuffer(void)
    2605             : {
    2606             :     DIR        *logical_dir;
    2607             :     struct dirent *logical_de;
    2608             : 
    2609             :     DIR        *spill_dir;
    2610             :     struct dirent *spill_de;
    2611             : 
    2612           6 :     logical_dir = AllocateDir("pg_replslot");
    2613          24 :     while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
    2614             :     {
    2615             :         struct stat statbuf;
    2616             :         char        path[MAXPGPATH * 2 + 12];
    2617             : 
    2618          18 :         if (strcmp(logical_de->d_name, ".") == 0 ||
    2619           6 :             strcmp(logical_de->d_name, "..") == 0)
    2620          24 :             continue;
    2621             : 
    2622             :         /* if it cannot be a slot, skip the directory */
    2623           0 :         if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
    2624           0 :             continue;
    2625             : 
    2626             :         /*
    2627             :          * ok, has to be a surviving logical slot, iterate and delete
    2628             :          * everything starting with xid-*
    2629             :          */
    2630           0 :         sprintf(path, "pg_replslot/%s", logical_de->d_name);
    2631             : 
    2632             :         /* we're only creating directories here, skip if it's not ours */
    2633           0 :         if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
    2634           0 :             continue;
    2635             : 
    2636           0 :         spill_dir = AllocateDir(path);
    2637           0 :         while ((spill_de = ReadDir(spill_dir, path)) != NULL)
    2638             :         {
    2639           0 :             if (strcmp(spill_de->d_name, ".") == 0 ||
    2640           0 :                 strcmp(spill_de->d_name, "..") == 0)
    2641           0 :                 continue;
    2642             : 
    2643             :             /* only look at names that can be ours */
    2644           0 :             if (strncmp(spill_de->d_name, "xid", 3) == 0)
    2645             :             {
    2646           0 :                 sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
    2647           0 :                         spill_de->d_name);
    2648             : 
    2649           0 :                 if (unlink(path) != 0)
    2650           0 :                     ereport(PANIC,
    2651             :                             (errcode_for_file_access(),
    2652             :                              errmsg("could not remove file \"%s\": %m",
    2653             :                                     path)));
    2654             :             }
    2655             :         }
    2656           0 :         FreeDir(spill_dir);
    2657             :     }
    2658           6 :     FreeDir(logical_dir);
    2659           6 : }
    2660             : 
    2661             : /* ---------------------------------------
    2662             :  * toast reassembly support
    2663             :  * ---------------------------------------
    2664             :  */
    2665             : 
    2666             : /*
    2667             :  * Initialize per tuple toast reconstruction support.
    2668             :  */
    2669             : static void
    2670           0 : ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
    2671             : {
    2672             :     HASHCTL     hash_ctl;
    2673             : 
    2674           0 :     Assert(txn->toast_hash == NULL);
    2675             : 
    2676           0 :     memset(&hash_ctl, 0, sizeof(hash_ctl));
    2677           0 :     hash_ctl.keysize = sizeof(Oid);
    2678           0 :     hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
    2679           0 :     hash_ctl.hcxt = rb->context;
    2680           0 :     txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
    2681             :                                   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    2682           0 : }
    2683             : 
    2684             : /*
    2685             :  * Per toast-chunk handling for toast reconstruction
    2686             :  *
    2687             :  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
    2688             :  * toasted Datum comes along.
    2689             :  */
    2690             : static void
    2691           0 : ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
    2692             :                               Relation relation, ReorderBufferChange *change)
    2693             : {
    2694             :     ReorderBufferToastEnt *ent;
    2695             :     ReorderBufferTupleBuf *newtup;
    2696             :     bool        found;
    2697             :     int32       chunksize;
    2698             :     bool        isnull;
    2699             :     Pointer     chunk;
    2700           0 :     TupleDesc   desc = RelationGetDescr(relation);
    2701             :     Oid         chunk_id;
    2702             :     int32       chunk_seq;
    2703             : 
    2704           0 :     if (txn->toast_hash == NULL)
    2705           0 :         ReorderBufferToastInitHash(rb, txn);
    2706             : 
    2707           0 :     Assert(IsToastRelation(relation));
    2708             : 
    2709           0 :     newtup = change->data.tp.newtuple;
    2710           0 :     chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
    2711           0 :     Assert(!isnull);
    2712           0 :     chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
    2713           0 :     Assert(!isnull);
    2714             : 
    2715           0 :     ent = (ReorderBufferToastEnt *)
    2716           0 :         hash_search(txn->toast_hash,
    2717             :                     (void *) &chunk_id,
    2718             :                     HASH_ENTER,
    2719             :                     &found);
    2720             : 
    2721           0 :     if (!found)
    2722             :     {
    2723           0 :         Assert(ent->chunk_id == chunk_id);
    2724           0 :         ent->num_chunks = 0;
    2725           0 :         ent->last_chunk_seq = 0;
    2726           0 :         ent->size = 0;
    2727           0 :         ent->reconstructed = NULL;
    2728           0 :         dlist_init(&ent->chunks);
    2729             : 
    2730           0 :         if (chunk_seq != 0)
    2731           0 :             elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
    2732             :                  chunk_seq, chunk_id);
    2733             :     }
    2734           0 :     else if (found && chunk_seq != ent->last_chunk_seq + 1)
    2735           0 :         elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
    2736             :              chunk_seq, chunk_id, ent->last_chunk_seq + 1);
    2737             : 
    2738           0 :     chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
    2739           0 :     Assert(!isnull);
    2740             : 
    2741             :     /* calculate size so we can allocate the right size at once later */
    2742           0 :     if (!VARATT_IS_EXTENDED(chunk))
    2743           0 :         chunksize = VARSIZE(chunk) - VARHDRSZ;
    2744           0 :     else if (VARATT_IS_SHORT(chunk))
    2745             :         /* could happen due to heap_form_tuple doing its thing */
    2746           0 :         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
    2747             :     else
    2748           0 :         elog(ERROR, "unexpected type of toast chunk");
    2749             : 
    2750           0 :     ent->size += chunksize;
    2751           0 :     ent->last_chunk_seq = chunk_seq;
    2752           0 :     ent->num_chunks++;
    2753           0 :     dlist_push_tail(&ent->chunks, &change->node);
    2754           0 : }
    2755             : 
    2756             : /*
    2757             :  * Rejigger change->newtuple to point to in-memory toast tuples instead of
    2758             :  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
    2759             :  *
    2760             :  * We cannot replace unchanged toast tuples though, so those will still point
    2761             :  * to on-disk toast data.
    2762             :  */
    2763             : static void
    2764           0 : ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
    2765             :                           Relation relation, ReorderBufferChange *change)
    2766             : {
    2767             :     TupleDesc   desc;
    2768             :     int         natt;
    2769             :     Datum      *attrs;
    2770             :     bool       *isnull;
    2771             :     bool       *free;
    2772             :     HeapTuple   tmphtup;
    2773             :     Relation    toast_rel;
    2774             :     TupleDesc   toast_desc;
    2775             :     MemoryContext oldcontext;
    2776             :     ReorderBufferTupleBuf *newtup;
    2777             : 
    2778             :     /* no toast tuples changed */
    2779           0 :     if (txn->toast_hash == NULL)
    2780           0 :         return;
    2781             : 
    2782           0 :     oldcontext = MemoryContextSwitchTo(rb->context);
    2783             : 
    2784             :     /* we should only have toast tuples in an INSERT or UPDATE */
    2785           0 :     Assert(change->data.tp.newtuple);
    2786             : 
    2787           0 :     desc = RelationGetDescr(relation);
    2788             : 
    2789           0 :     toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
    2790           0 :     toast_desc = RelationGetDescr(toast_rel);
    2791             : 
    2792             :     /* should we allocate from stack instead? */
    2793           0 :     attrs = palloc0(sizeof(Datum) * desc->natts);
    2794           0 :     isnull = palloc0(sizeof(bool) * desc->natts);
    2795           0 :     free = palloc0(sizeof(bool) * desc->natts);
    2796             : 
    2797           0 :     newtup = change->data.tp.newtuple;
    2798             : 
    2799           0 :     heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
    2800             : 
    2801           0 :     for (natt = 0; natt < desc->natts; natt++)
    2802             :     {
    2803           0 :         Form_pg_attribute attr = TupleDescAttr(desc, natt);
    2804             :         ReorderBufferToastEnt *ent;
    2805             :         struct varlena *varlena;
    2806             : 
    2807             :         /* va_rawsize is the size of the original datum -- including header */
    2808             :         struct varatt_external toast_pointer;
    2809             :         struct varatt_indirect redirect_pointer;
    2810           0 :         struct varlena *new_datum = NULL;
    2811             :         struct varlena *reconstructed;
    2812             :         dlist_iter  it;
    2813           0 :         Size        data_done = 0;
    2814             : 
    2815             :         /* system columns aren't toasted */
    2816           0 :         if (attr->attnum < 0)
    2817           0 :             continue;
    2818             : 
    2819           0 :         if (attr->attisdropped)
    2820           0 :             continue;
    2821             : 
    2822             :         /* not a varlena datatype */
    2823           0 :         if (attr->attlen != -1)
    2824           0 :             continue;
    2825             : 
    2826             :         /* no data */
    2827           0 :         if (isnull[natt])
    2828           0 :             continue;
    2829             : 
    2830             :         /* ok, we know we have a toast datum */
    2831           0 :         varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
    2832             : 
    2833             :         /* no need to do anything if the tuple isn't external */
    2834           0 :         if (!VARATT_IS_EXTERNAL(varlena))
    2835           0 :             continue;
    2836             : 
    2837           0 :         VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
    2838             : 
    2839             :         /*
    2840             :          * Check whether the toast tuple changed, replace if so.
    2841             :          */
    2842           0 :         ent = (ReorderBufferToastEnt *)
    2843           0 :             hash_search(txn->toast_hash,
    2844             :                         (void *) &toast_pointer.va_valueid,
    2845             :                         HASH_FIND,
    2846             :                         NULL);
    2847           0 :         if (ent == NULL)
    2848           0 :             continue;
    2849             : 
    2850           0 :         new_datum =
    2851             :             (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
    2852             : 
    2853           0 :         free[natt] = true;
    2854             : 
    2855           0 :         reconstructed = palloc0(toast_pointer.va_rawsize);
    2856             : 
    2857           0 :         ent->reconstructed = reconstructed;
    2858             : 
    2859             :         /* stitch toast tuple back together from its parts */
    2860           0 :         dlist_foreach(it, &ent->chunks)
    2861             :         {
    2862             :             bool        isnull;
    2863             :             ReorderBufferChange *cchange;
    2864             :             ReorderBufferTupleBuf *ctup;
    2865             :             Pointer     chunk;
    2866             : 
    2867           0 :             cchange = dlist_container(ReorderBufferChange, node, it.cur);
    2868           0 :             ctup = cchange->data.tp.newtuple;
    2869           0 :             chunk = DatumGetPointer(
    2870             :                                     fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
    2871             : 
    2872           0 :             Assert(!isnull);
    2873           0 :             Assert(!VARATT_IS_EXTERNAL(chunk));
    2874           0 :             Assert(!VARATT_IS_SHORT(chunk));
    2875             : 
    2876           0 :             memcpy(VARDATA(reconstructed) + data_done,
    2877           0 :                    VARDATA(chunk),
    2878           0 :                    VARSIZE(chunk) - VARHDRSZ);
    2879           0 :             data_done += VARSIZE(chunk) - VARHDRSZ;
    2880             :         }
    2881           0 :         Assert(data_done == toast_pointer.va_extsize);
    2882             : 
    2883             :         /* make sure it's marked as compressed or not */
    2884           0 :         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
    2885           0 :             SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
    2886             :         else
    2887           0 :             SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
    2888             : 
    2889           0 :         memset(&redirect_pointer, 0, sizeof(redirect_pointer));
    2890           0 :         redirect_pointer.pointer = reconstructed;
    2891             : 
    2892           0 :         SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
    2893           0 :         memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
    2894             :                sizeof(redirect_pointer));
    2895             : 
    2896           0 :         attrs[natt] = PointerGetDatum(new_datum);
    2897             :     }
    2898             : 
    2899             :     /*
    2900             :      * Build tuple in separate memory & copy tuple back into the tuplebuf
    2901             :      * passed to the output plugin. We can't directly heap_fill_tuple() into
    2902             :      * the tuplebuf because attrs[] will point back into the current content.
    2903             :      */
    2904           0 :     tmphtup = heap_form_tuple(desc, attrs, isnull);
    2905           0 :     Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
    2906           0 :     Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
    2907             : 
    2908           0 :     memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
    2909           0 :     newtup->tuple.t_len = tmphtup->t_len;
    2910             : 
    2911             :     /*
    2912             :      * free resources we won't further need, more persistent stuff will be
    2913             :      * free'd in ReorderBufferToastReset().
    2914             :      */
    2915           0 :     RelationClose(toast_rel);
    2916           0 :     pfree(tmphtup);
    2917           0 :     for (natt = 0; natt < desc->natts; natt++)
    2918             :     {
    2919           0 :         if (free[natt])
    2920           0 :             pfree(DatumGetPointer(attrs[natt]));
    2921             :     }
    2922           0 :     pfree(attrs);
    2923           0 :     pfree(free);
    2924           0 :     pfree(isnull);
    2925             : 
    2926           0 :     MemoryContextSwitchTo(oldcontext);
    2927             : }
    2928             : 
    2929             : /*
    2930             :  * Free all resources allocated for toast reconstruction.
    2931             :  */
    2932             : static void
    2933           0 : ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
    2934             : {
    2935             :     HASH_SEQ_STATUS hstat;
    2936             :     ReorderBufferToastEnt *ent;
    2937             : 
    2938           0 :     if (txn->toast_hash == NULL)
    2939           0 :         return;
    2940             : 
    2941             :     /* sequentially walk over the hash and free everything */
    2942           0 :     hash_seq_init(&hstat, txn->toast_hash);
    2943           0 :     while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
    2944             :     {
    2945             :         dlist_mutable_iter it;
    2946             : 
    2947           0 :         if (ent->reconstructed != NULL)
    2948           0 :             pfree(ent->reconstructed);
    2949             : 
    2950           0 :         dlist_foreach_modify(it, &ent->chunks)
    2951             :         {
    2952           0 :             ReorderBufferChange *change =
    2953           0 :             dlist_container(ReorderBufferChange, node, it.cur);
    2954             : 
    2955           0 :             dlist_delete(&change->node);
    2956           0 :             ReorderBufferReturnChange(rb, change);
    2957             :         }
    2958             :     }
    2959             : 
    2960           0 :     hash_destroy(txn->toast_hash);
    2961           0 :     txn->toast_hash = NULL;
    2962             : }
    2963             : 
    2964             : 
    2965             : /* ---------------------------------------
    2966             :  * Visibility support for logical decoding
    2967             :  *
    2968             :  *
    2969             :  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
    2970             :  * always rely on stored cmin/cmax values because of two scenarios:
    2971             :  *
    2972             :  * * A tuple got changed multiple times during a single transaction and thus
    2973             :  *   has got a combocid. Combocids are only valid for the duration of a
    2974             :  *   single transaction.
    2975             :  * * A tuple with a cmin but no cmax (and thus no combocid) got
    2976             :  *   deleted/updated in another transaction than the one which created it
    2977             :  *   which we are looking at right now. As only one of cmin, cmax or combocid
    2978             :  *   is actually stored in the heap we don't have access to the value we
    2979             :  *   need anymore.
    2980             :  *
    2981             :  * To resolve those problems we have a per-transaction hash of (cmin,
    2982             :  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
    2983             :  * (cmin, cmax) values. That also takes care of combocids by simply
    2984             :  * not caring about them at all. As we have the real cmin/cmax values
    2985             :  * combocids aren't interesting.
    2986             :  *
    2987             :  * As we only care about catalog tuples here the overhead of this
    2988             :  * hashtable should be acceptable.
    2989             :  *
    2990             :  * Heap rewrites complicate this a bit, check rewriteheap.c for
    2991             :  * details.
    2992             :  * -------------------------------------------------------------------------
    2993             :  */
    2994             : 
    2995             : /* struct for qsort()ing mapping files by lsn somewhat efficiently */
    2996             : typedef struct RewriteMappingFile
    2997             : {
    2998             :     XLogRecPtr  lsn;
    2999             :     char        fname[MAXPGPATH];
    3000             : } RewriteMappingFile;
    3001             : 
    3002             : #if NOT_USED
    3003             : static void
    3004             : DisplayMapping(HTAB *tuplecid_data)
    3005             : {
    3006             :     HASH_SEQ_STATUS hstat;
    3007             :     ReorderBufferTupleCidEnt *ent;
    3008             : 
    3009             :     hash_seq_init(&hstat, tuplecid_data);
    3010             :     while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
    3011             :     {
    3012             :         elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
    3013             :              ent->key.relnode.dbNode,
    3014             :              ent->key.relnode.spcNode,
    3015             :              ent->key.relnode.relNode,
    3016             :              ItemPointerGetBlockNumber(&ent->key.tid),
    3017             :              ItemPointerGetOffsetNumber(&ent->key.tid),
    3018             :              ent->cmin,
    3019             :              ent->cmax
    3020             :             );
    3021             :     }
    3022             : }
    3023             : #endif
    3024             : 
    3025             : /*
    3026             :  * Apply a single mapping file to tuplecid_data.
    3027             :  *
    3028             :  * The mapping file has to have been verified to be a) committed b) for our
    3029             :  * transaction c) applied in LSN order.
    3030             :  */
    3031             : static void
    3032           0 : ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
    3033             : {
    3034             :     char        path[MAXPGPATH];
    3035             :     int         fd;
    3036             :     int         readBytes;
    3037             :     LogicalRewriteMappingData map;
    3038             : 
    3039           0 :     sprintf(path, "pg_logical/mappings/%s", fname);
    3040           0 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
    3041           0 :     if (fd < 0)
    3042           0 :         ereport(ERROR,
    3043             :                 (errcode_for_file_access(),
    3044             :                  errmsg("could not open file \"%s\": %m", path)));
    3045             : 
    3046             :     while (true)
    3047             :     {
    3048             :         ReorderBufferTupleCidKey key;
    3049             :         ReorderBufferTupleCidEnt *ent;
    3050             :         ReorderBufferTupleCidEnt *new_ent;
    3051             :         bool        found;
    3052             : 
    3053             :         /* be careful about padding */
    3054           0 :         memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
    3055             : 
    3056             :         /* read all mappings till the end of the file */
    3057           0 :         pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
    3058           0 :         readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
    3059           0 :         pgstat_report_wait_end();
    3060             : 
    3061           0 :         if (readBytes < 0)
    3062           0 :             ereport(ERROR,
    3063             :                     (errcode_for_file_access(),
    3064             :                      errmsg("could not read file \"%s\": %m",
    3065             :                             path)));
    3066           0 :         else if (readBytes == 0)    /* EOF */
    3067           0 :             break;
    3068           0 :         else if (readBytes != sizeof(LogicalRewriteMappingData))
    3069           0 :             ereport(ERROR,
    3070             :                     (errcode_for_file_access(),
    3071             :                      errmsg("could not read from file \"%s\": read %d instead of %d bytes",
    3072             :                             path, readBytes,
    3073             :                             (int32) sizeof(LogicalRewriteMappingData))));
    3074             : 
    3075           0 :         key.relnode = map.old_node;
    3076           0 :         ItemPointerCopy(&map.old_tid,
    3077             :                         &key.tid);
    3078             : 
    3079             : 
    3080           0 :         ent = (ReorderBufferTupleCidEnt *)
    3081             :             hash_search(tuplecid_data,
    3082             :                         (void *) &key,
    3083             :                         HASH_FIND,
    3084             :                         NULL);
    3085             : 
    3086             :         /* no existing mapping, no need to update */
    3087           0 :         if (!ent)
    3088           0 :             continue;
    3089             : 
    3090           0 :         key.relnode = map.new_node;
    3091           0 :         ItemPointerCopy(&map.new_tid,
    3092             :                         &key.tid);
    3093             : 
    3094           0 :         new_ent = (ReorderBufferTupleCidEnt *)
    3095             :             hash_search(tuplecid_data,
    3096             :                         (void *) &key,
    3097             :                         HASH_ENTER,
    3098             :                         &found);
    3099             : 
    3100           0 :         if (found)
    3101             :         {
    3102             :             /*
    3103             :              * Make sure the existing mapping makes sense. We sometime update
    3104             :              * old records that did not yet have a cmax (e.g. pg_class' own
    3105             :              * entry while rewriting it) during rewrites, so allow that.
    3106             :              */
    3107           0 :             Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
    3108           0 :             Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
    3109             :         }
    3110             :         else
    3111             :         {
    3112             :             /* update mapping */
    3113           0 :             new_ent->cmin = ent->cmin;
    3114           0 :             new_ent->cmax = ent->cmax;
    3115           0 :             new_ent->combocid = ent->combocid;
    3116             :         }
    3117           0 :     }
    3118           0 : }
    3119             : 
    3120             : 
    3121             : /*
    3122             :  * Check whether the TransactionOId 'xid' is in the pre-sorted array 'xip'.
    3123             :  */
    3124             : static bool
    3125           0 : TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
    3126             : {
    3127           0 :     return bsearch(&xid, xip, num,
    3128           0 :                    sizeof(TransactionId), xidComparator) != NULL;
    3129             : }
    3130             : 
    3131             : /*
    3132             :  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
    3133             :  */
    3134             : static int
    3135           0 : file_sort_by_lsn(const void *a_p, const void *b_p)
    3136             : {
    3137           0 :     RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
    3138           0 :     RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
    3139             : 
    3140           0 :     if (a->lsn < b->lsn)
    3141           0 :         return -1;
    3142           0 :     else if (a->lsn > b->lsn)
    3143           0 :         return 1;
    3144           0 :     return 0;
    3145             : }
    3146             : 
    3147             : /*
    3148             :  * Apply any existing logical remapping files if there are any targeted at our
    3149             :  * transaction for relid.
    3150             :  */
    3151             : static void
    3152           0 : UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
    3153             : {
    3154             :     DIR        *mapping_dir;
    3155             :     struct dirent *mapping_de;
    3156           0 :     List       *files = NIL;
    3157             :     ListCell   *file;
    3158             :     RewriteMappingFile **files_a;
    3159             :     size_t      off;
    3160           0 :     Oid         dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
    3161             : 
    3162           0 :     mapping_dir = AllocateDir("pg_logical/mappings");
    3163           0 :     while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
    3164             :     {
    3165             :         Oid         f_dboid;
    3166             :         Oid         f_relid;
    3167             :         TransactionId f_mapped_xid;
    3168             :         TransactionId f_create_xid;
    3169             :         XLogRecPtr  f_lsn;
    3170             :         uint32      f_hi,
    3171             :                     f_lo;
    3172             :         RewriteMappingFile *f;
    3173             : 
    3174           0 :         if (strcmp(mapping_de->d_name, ".") == 0 ||
    3175           0 :             strcmp(mapping_de->d_name, "..") == 0)
    3176           0 :             continue;
    3177             : 
    3178             :         /* Ignore files that aren't ours */
    3179           0 :         if (strncmp(mapping_de->d_name, "map-", 4) != 0)
    3180           0 :             continue;
    3181             : 
    3182           0 :         if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
    3183             :                    &f_dboid, &f_relid, &f_hi, &f_lo,
    3184             :                    &f_mapped_xid, &f_create_xid) != 6)
    3185           0 :             elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
    3186             : 
    3187           0 :         f_lsn = ((uint64) f_hi) << 32 | f_lo;
    3188             : 
    3189             :         /* mapping for another database */
    3190           0 :         if (f_dboid != dboid)
    3191           0 :             continue;
    3192             : 
    3193             :         /* mapping for another relation */
    3194           0 :         if (f_relid != relid)
    3195           0 :             continue;
    3196             : 
    3197             :         /* did the creating transaction abort? */
    3198           0 :         if (!TransactionIdDidCommit(f_create_xid))
    3199           0 :             continue;
    3200             : 
    3201             :         /* not for our transaction */
    3202           0 :         if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
    3203           0 :             continue;
    3204             : 
    3205             :         /* ok, relevant, queue for apply */
    3206           0 :         f = palloc(sizeof(RewriteMappingFile));
    3207           0 :         f->lsn = f_lsn;
    3208           0 :         strcpy(f->fname, mapping_de->d_name);
    3209           0 :         files = lappend(files, f);
    3210             :     }
    3211           0 :     FreeDir(mapping_dir);
    3212             : 
    3213             :     /* build array we can easily sort */
    3214           0 :     files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
    3215           0 :     off = 0;
    3216           0 :     foreach(file, files)
    3217             :     {
    3218           0 :         files_a[off++] = lfirst(file);
    3219             :     }
    3220             : 
    3221             :     /* sort files so we apply them in LSN order */
    3222           0 :     qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
    3223             :           file_sort_by_lsn);
    3224             : 
    3225           0 :     for (off = 0; off < list_length(files); off++)
    3226             :     {
    3227           0 :         RewriteMappingFile *f = files_a[off];
    3228             : 
    3229           0 :         elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
    3230             :              snapshot->subxip[0]);
    3231           0 :         ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
    3232           0 :         pfree(f);
    3233             :     }
    3234           0 : }
    3235             : 
    3236             : /*
    3237             :  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
    3238             :  * combocids.
    3239             :  */
    3240             : bool
    3241           0 : ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
    3242             :                               Snapshot snapshot,
    3243             :                               HeapTuple htup, Buffer buffer,
    3244             :                               CommandId *cmin, CommandId *cmax)
    3245             : {
    3246             :     ReorderBufferTupleCidKey key;
    3247             :     ReorderBufferTupleCidEnt *ent;
    3248             :     ForkNumber  forkno;
    3249             :     BlockNumber blockno;
    3250           0 :     bool        updated_mapping = false;
    3251             : 
    3252             :     /* be careful about padding */
    3253           0 :     memset(&key, 0, sizeof(key));
    3254             : 
    3255           0 :     Assert(!BufferIsLocal(buffer));
    3256             : 
    3257             :     /*
    3258             :      * get relfilenode from the buffer, no convenient way to access it other
    3259             :      * than that.
    3260             :      */
    3261           0 :     BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
    3262             : 
    3263             :     /* tuples can only be in the main fork */
    3264           0 :     Assert(forkno == MAIN_FORKNUM);
    3265           0 :     Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
    3266             : 
    3267           0 :     ItemPointerCopy(&htup->t_self,
    3268             :                     &key.tid);
    3269             : 
    3270             : restart:
    3271           0 :     ent = (ReorderBufferTupleCidEnt *)
    3272             :         hash_search(tuplecid_data,
    3273             :                     (void *) &key,
    3274             :                     HASH_FIND,
    3275             :                     NULL);
    3276             : 
    3277             :     /*
    3278             :      * failed to find a mapping, check whether the table was rewritten and
    3279             :      * apply mapping if so, but only do that once - there can be no new
    3280             :      * mappings while we are in here since we have to hold a lock on the
    3281             :      * relation.
    3282             :      */
    3283           0 :     if (ent == NULL && !updated_mapping)
    3284             :     {
    3285           0 :         UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
    3286             :         /* now check but don't update for a mapping again */
    3287           0 :         updated_mapping = true;
    3288           0 :         goto restart;
    3289             :     }
    3290           0 :     else if (ent == NULL)
    3291           0 :         return false;
    3292             : 
    3293           0 :     if (cmin)
    3294           0 :         *cmin = ent->cmin;
    3295           0 :     if (cmax)
    3296           0 :         *cmax = ent->cmax;
    3297           0 :     return true;
    3298             : }

Generated by: LCOV version 1.11