Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * reorderbuffer.c
4 : * PostgreSQL logical replay/reorder buffer management
5 : *
6 : *
7 : * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/reorderbuffer.c
12 : *
13 : * NOTES
14 : * This module gets handed individual pieces of transactions in the order
15 : * they are written to the WAL and is responsible to reassemble them into
16 : * toplevel transaction sized pieces. When a transaction is completely
17 : * reassembled - signalled by reading the transaction commit record - it
18 : * will then call the output plugin (c.f. ReorderBufferCommit()) with the
19 : * individual changes. The output plugins rely on snapshots built by
20 : * snapbuild.c which hands them to us.
21 : *
22 : * Transactions and subtransactions/savepoints in postgres are not
23 : * immediately linked to each other from outside the performing
24 : * backend. Only at commit/abort (or special xact_assignment records) they
25 : * are linked together. Which means that we will have to splice together a
26 : * toplevel transaction from its subtransactions. To do that efficiently we
27 : * build a binary heap indexed by the smallest current lsn of the individual
28 : * subtransactions' changestreams. As the individual streams are inherently
29 : * ordered by LSN - since that is where we build them from - the transaction
30 : * can easily be reassembled by always using the subtransaction with the
31 : * smallest current LSN from the heap.
32 : *
33 : * In order to cope with large transactions - which can be several times as
34 : * big as the available memory - this module supports spooling the contents
35 : * of large transactions to disk. When the transaction is replayed the
36 : * contents of individual (sub-)transactions will be read from disk in
37 : * chunks.
38 : *
39 : * This module also has to deal with reassembling toast records from the
40 : * individual chunks stored in WAL. When a new (or initial) version of a
41 : * tuple is stored in WAL it will always be preceded by the toast chunks
42 : * emitted for the columns stored out of line. Within a single toplevel
43 : * transaction there will be no other data carrying records between a row's
44 : * toast chunks and the row data itself. See ReorderBufferToast* for
45 : * details.
46 : * -------------------------------------------------------------------------
47 : */
48 : #include "postgres.h"
49 :
50 : #include <unistd.h>
51 : #include <sys/stat.h>
52 :
53 : #include "access/rewriteheap.h"
54 : #include "access/transam.h"
55 : #include "access/tuptoaster.h"
56 : #include "access/xact.h"
57 : #include "access/xlog_internal.h"
58 : #include "catalog/catalog.h"
59 : #include "lib/binaryheap.h"
60 : #include "miscadmin.h"
61 : #include "pgstat.h"
62 : #include "replication/logical.h"
63 : #include "replication/reorderbuffer.h"
64 : #include "replication/slot.h"
65 : #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
66 : #include "storage/bufmgr.h"
67 : #include "storage/fd.h"
68 : #include "storage/sinval.h"
69 : #include "utils/builtins.h"
70 : #include "utils/combocid.h"
71 : #include "utils/memdebug.h"
72 : #include "utils/memutils.h"
73 : #include "utils/rel.h"
74 : #include "utils/relfilenodemap.h"
75 : #include "utils/tqual.h"
76 :
77 :
78 : /* entry for a hash table we use to map from xid to our transaction state */
79 : typedef struct ReorderBufferTXNByIdEnt
80 : {
81 : TransactionId xid;
82 : ReorderBufferTXN *txn;
83 : } ReorderBufferTXNByIdEnt;
84 :
85 : /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
86 : typedef struct ReorderBufferTupleCidKey
87 : {
88 : RelFileNode relnode;
89 : ItemPointerData tid;
90 : } ReorderBufferTupleCidKey;
91 :
92 : typedef struct ReorderBufferTupleCidEnt
93 : {
94 : ReorderBufferTupleCidKey key;
95 : CommandId cmin;
96 : CommandId cmax;
97 : CommandId combocid; /* just for debugging */
98 : } ReorderBufferTupleCidEnt;
99 :
100 : /* k-way in-order change iteration support structures */
101 : typedef struct ReorderBufferIterTXNEntry
102 : {
103 : XLogRecPtr lsn;
104 : ReorderBufferChange *change;
105 : ReorderBufferTXN *txn;
106 : int fd;
107 : XLogSegNo segno;
108 : } ReorderBufferIterTXNEntry;
109 :
110 : typedef struct ReorderBufferIterTXNState
111 : {
112 : binaryheap *heap;
113 : Size nr_txns;
114 : dlist_head old_change;
115 : ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
116 : } ReorderBufferIterTXNState;
117 :
118 : /* toast datastructures */
119 : typedef struct ReorderBufferToastEnt
120 : {
121 : Oid chunk_id; /* toast_table.chunk_id */
122 : int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
123 : * have seen */
124 : Size num_chunks; /* number of chunks we've already seen */
125 : Size size; /* combined size of chunks seen */
126 : dlist_head chunks; /* linked list of chunks */
127 : struct varlena *reconstructed; /* reconstructed varlena now pointed to in
128 : * main tup */
129 : } ReorderBufferToastEnt;
130 :
131 : /* Disk serialization support datastructures */
132 : typedef struct ReorderBufferDiskChange
133 : {
134 : Size size;
135 : ReorderBufferChange change;
136 : /* data follows */
137 : } ReorderBufferDiskChange;
138 :
139 : /*
140 : * Maximum number of changes kept in memory, per transaction. After that,
141 : * changes are spooled to disk.
142 : *
143 : * The current value should be sufficient to decode the entire transaction
144 : * without hitting disk in OLTP workloads, while starting to spool to disk in
145 : * other workloads reasonably fast.
146 : *
147 : * At some point in the future it probably makes sense to have a more elaborate
148 : * resource management here, but it's not entirely clear what that would look
149 : * like.
150 : */
151 : static const Size max_changes_in_memory = 4096;
152 :
153 : /*
154 : * We use a very simple form of a slab allocator for frequently allocated
155 : * objects, simply keeping a fixed number in a linked list when unused,
156 : * instead pfree()ing them. Without that in many workloads aset.c becomes a
157 : * major bottleneck, especially when spilling to disk while decoding batch
158 : * workloads.
159 : */
160 : static const Size max_cached_tuplebufs = 4096 * 2; /* ~8MB */
161 :
162 : /* ---------------------------------------
163 : * primary reorderbuffer support routines
164 : * ---------------------------------------
165 : */
166 : static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
167 : static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
168 : static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
169 : TransactionId xid, bool create, bool *is_new,
170 : XLogRecPtr lsn, bool create_as_top);
171 :
172 : static void AssertTXNLsnOrder(ReorderBuffer *rb);
173 :
174 : /* ---------------------------------------
175 : * support functions for lsn-order iterating over the ->changes of a
176 : * transaction and its subtransactions
177 : *
178 : * used for iteration over the k-way heap merge of a transaction and its
179 : * subtransactions
180 : * ---------------------------------------
181 : */
182 : static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
183 : static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
184 : static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
185 : ReorderBufferIterTXNState *state);
186 : static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
187 :
188 : /*
189 : * ---------------------------------------
190 : * Disk serialization support functions
191 : * ---------------------------------------
192 : */
193 : static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194 : static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
195 : static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
196 : int fd, ReorderBufferChange *change);
197 : static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
198 : int *fd, XLogSegNo *segno);
199 : static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
200 : char *change);
201 : static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
202 :
203 : static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
204 : static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
205 : ReorderBufferTXN *txn, CommandId cid);
206 :
207 : /* ---------------------------------------
208 : * toast reassembly support
209 : * ---------------------------------------
210 : */
211 : static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
212 : static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
213 : static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
214 : Relation relation, ReorderBufferChange *change);
215 : static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
216 : Relation relation, ReorderBufferChange *change);
217 :
218 :
219 : /*
220 : * Allocate a new ReorderBuffer
221 : */
222 : ReorderBuffer *
223 0 : ReorderBufferAllocate(void)
224 : {
225 : ReorderBuffer *buffer;
226 : HASHCTL hash_ctl;
227 : MemoryContext new_ctx;
228 :
229 : /* allocate memory in own context, to have better accountability */
230 0 : new_ctx = AllocSetContextCreate(CurrentMemoryContext,
231 : "ReorderBuffer",
232 : ALLOCSET_DEFAULT_SIZES);
233 :
234 0 : buffer =
235 : (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
236 :
237 0 : memset(&hash_ctl, 0, sizeof(hash_ctl));
238 :
239 0 : buffer->context = new_ctx;
240 :
241 0 : buffer->change_context = SlabContextCreate(new_ctx,
242 : "Change",
243 : SLAB_DEFAULT_BLOCK_SIZE,
244 : sizeof(ReorderBufferChange));
245 :
246 0 : buffer->txn_context = SlabContextCreate(new_ctx,
247 : "TXN",
248 : SLAB_DEFAULT_BLOCK_SIZE,
249 : sizeof(ReorderBufferTXN));
250 :
251 0 : hash_ctl.keysize = sizeof(TransactionId);
252 0 : hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
253 0 : hash_ctl.hcxt = buffer->context;
254 :
255 0 : buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
256 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
257 :
258 0 : buffer->by_txn_last_xid = InvalidTransactionId;
259 0 : buffer->by_txn_last_txn = NULL;
260 :
261 0 : buffer->nr_cached_tuplebufs = 0;
262 :
263 0 : buffer->outbuf = NULL;
264 0 : buffer->outbufsize = 0;
265 :
266 0 : buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
267 :
268 0 : dlist_init(&buffer->toplevel_by_lsn);
269 0 : slist_init(&buffer->cached_tuplebufs);
270 :
271 0 : return buffer;
272 : }
273 :
274 : /*
275 : * Free a ReorderBuffer
276 : */
277 : void
278 0 : ReorderBufferFree(ReorderBuffer *rb)
279 : {
280 0 : MemoryContext context = rb->context;
281 :
282 : /*
283 : * We free separately allocated data by entirely scrapping reorderbuffer's
284 : * memory context.
285 : */
286 0 : MemoryContextDelete(context);
287 0 : }
288 :
289 : /*
290 : * Get an unused, possibly preallocated, ReorderBufferTXN.
291 : */
292 : static ReorderBufferTXN *
293 0 : ReorderBufferGetTXN(ReorderBuffer *rb)
294 : {
295 : ReorderBufferTXN *txn;
296 :
297 0 : txn = (ReorderBufferTXN *)
298 0 : MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
299 :
300 0 : memset(txn, 0, sizeof(ReorderBufferTXN));
301 :
302 0 : dlist_init(&txn->changes);
303 0 : dlist_init(&txn->tuplecids);
304 0 : dlist_init(&txn->subtxns);
305 :
306 0 : return txn;
307 : }
308 :
309 : /*
310 : * Free a ReorderBufferTXN.
311 : *
312 : * Deallocation might be delayed for efficiency purposes, for details check
313 : * the comments above max_cached_changes's definition.
314 : */
315 : static void
316 0 : ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
317 : {
318 : /* clean the lookup cache if we were cached (quite likely) */
319 0 : if (rb->by_txn_last_xid == txn->xid)
320 : {
321 0 : rb->by_txn_last_xid = InvalidTransactionId;
322 0 : rb->by_txn_last_txn = NULL;
323 : }
324 :
325 : /* free data that's contained */
326 :
327 0 : if (txn->tuplecid_hash != NULL)
328 : {
329 0 : hash_destroy(txn->tuplecid_hash);
330 0 : txn->tuplecid_hash = NULL;
331 : }
332 :
333 0 : if (txn->invalidations)
334 : {
335 0 : pfree(txn->invalidations);
336 0 : txn->invalidations = NULL;
337 : }
338 :
339 0 : pfree(txn);
340 0 : }
341 :
342 : /*
343 : * Get an unused, possibly preallocated, ReorderBufferChange.
344 : */
345 : ReorderBufferChange *
346 0 : ReorderBufferGetChange(ReorderBuffer *rb)
347 : {
348 : ReorderBufferChange *change;
349 :
350 0 : change = (ReorderBufferChange *)
351 0 : MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
352 :
353 0 : memset(change, 0, sizeof(ReorderBufferChange));
354 0 : return change;
355 : }
356 :
357 : /*
358 : * Free an ReorderBufferChange.
359 : *
360 : * Deallocation might be delayed for efficiency purposes, for details check
361 : * the comments above max_cached_changes's definition.
362 : */
363 : void
364 0 : ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
365 : {
366 : /* free contained data */
367 0 : switch (change->action)
368 : {
369 : case REORDER_BUFFER_CHANGE_INSERT:
370 : case REORDER_BUFFER_CHANGE_UPDATE:
371 : case REORDER_BUFFER_CHANGE_DELETE:
372 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
373 0 : if (change->data.tp.newtuple)
374 : {
375 0 : ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
376 0 : change->data.tp.newtuple = NULL;
377 : }
378 :
379 0 : if (change->data.tp.oldtuple)
380 : {
381 0 : ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
382 0 : change->data.tp.oldtuple = NULL;
383 : }
384 0 : break;
385 : case REORDER_BUFFER_CHANGE_MESSAGE:
386 0 : if (change->data.msg.prefix != NULL)
387 0 : pfree(change->data.msg.prefix);
388 0 : change->data.msg.prefix = NULL;
389 0 : if (change->data.msg.message != NULL)
390 0 : pfree(change->data.msg.message);
391 0 : change->data.msg.message = NULL;
392 0 : break;
393 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
394 0 : if (change->data.snapshot)
395 : {
396 0 : ReorderBufferFreeSnap(rb, change->data.snapshot);
397 0 : change->data.snapshot = NULL;
398 : }
399 0 : break;
400 : /* no data in addition to the struct itself */
401 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
402 : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
403 : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
404 0 : break;
405 : }
406 :
407 0 : pfree(change);
408 0 : }
409 :
410 : /*
411 : * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
412 : * least a tuple of size tuple_len (excluding header overhead).
413 : */
414 : ReorderBufferTupleBuf *
415 0 : ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
416 : {
417 : ReorderBufferTupleBuf *tuple;
418 : Size alloc_len;
419 :
420 0 : alloc_len = tuple_len + SizeofHeapTupleHeader;
421 :
422 : /*
423 : * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
424 : * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
425 : * generated for oldtuples can be bigger, as they don't have out-of-line
426 : * toast columns.
427 : */
428 0 : if (alloc_len < MaxHeapTupleSize)
429 0 : alloc_len = MaxHeapTupleSize;
430 :
431 :
432 : /* if small enough, check the slab cache */
433 0 : if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
434 : {
435 0 : rb->nr_cached_tuplebufs--;
436 0 : tuple = slist_container(ReorderBufferTupleBuf, node,
437 : slist_pop_head_node(&rb->cached_tuplebufs));
438 0 : Assert(tuple->alloc_tuple_size == MaxHeapTupleSize);
439 : #ifdef USE_ASSERT_CHECKING
440 0 : memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
441 : VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData));
442 : #endif
443 0 : tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
444 : #ifdef USE_ASSERT_CHECKING
445 0 : memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
446 : VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
447 : #endif
448 : }
449 : else
450 : {
451 0 : tuple = (ReorderBufferTupleBuf *)
452 0 : MemoryContextAlloc(rb->context,
453 : sizeof(ReorderBufferTupleBuf) +
454 : MAXIMUM_ALIGNOF + alloc_len);
455 0 : tuple->alloc_tuple_size = alloc_len;
456 0 : tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
457 : }
458 :
459 0 : return tuple;
460 : }
461 :
462 : /*
463 : * Free an ReorderBufferTupleBuf.
464 : *
465 : * Deallocation might be delayed for efficiency purposes, for details check
466 : * the comments above max_cached_changes's definition.
467 : */
468 : void
469 0 : ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
470 : {
471 : /* check whether to put into the slab cache, oversized tuples never are */
472 0 : if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
473 0 : rb->nr_cached_tuplebufs < max_cached_tuplebufs)
474 : {
475 0 : rb->nr_cached_tuplebufs++;
476 0 : slist_push_head(&rb->cached_tuplebufs, &tuple->node);
477 : VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
478 : VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
479 : VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
480 : VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
481 : }
482 : else
483 : {
484 0 : pfree(tuple);
485 : }
486 0 : }
487 :
488 : /*
489 : * Return the ReorderBufferTXN from the given buffer, specified by Xid.
490 : * If create is true, and a transaction doesn't already exist, create it
491 : * (with the given LSN, and as top transaction if that's specified);
492 : * when this happens, is_new is set to true.
493 : */
494 : static ReorderBufferTXN *
495 0 : ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
496 : bool *is_new, XLogRecPtr lsn, bool create_as_top)
497 : {
498 : ReorderBufferTXN *txn;
499 : ReorderBufferTXNByIdEnt *ent;
500 : bool found;
501 :
502 0 : Assert(TransactionIdIsValid(xid));
503 0 : Assert(!create || lsn != InvalidXLogRecPtr);
504 :
505 : /*
506 : * Check the one-entry lookup cache first
507 : */
508 0 : if (TransactionIdIsValid(rb->by_txn_last_xid) &&
509 0 : rb->by_txn_last_xid == xid)
510 : {
511 0 : txn = rb->by_txn_last_txn;
512 :
513 0 : if (txn != NULL)
514 : {
515 : /* found it, and it's valid */
516 0 : if (is_new)
517 0 : *is_new = false;
518 0 : return txn;
519 : }
520 :
521 : /*
522 : * cached as non-existent, and asked not to create? Then nothing else
523 : * to do.
524 : */
525 0 : if (!create)
526 0 : return NULL;
527 : /* otherwise fall through to create it */
528 : }
529 :
530 : /*
531 : * If the cache wasn't hit or it yielded an "does-not-exist" and we want
532 : * to create an entry.
533 : */
534 :
535 : /* search the lookup table */
536 0 : ent = (ReorderBufferTXNByIdEnt *)
537 0 : hash_search(rb->by_txn,
538 : (void *) &xid,
539 : create ? HASH_ENTER : HASH_FIND,
540 : &found);
541 0 : if (found)
542 0 : txn = ent->txn;
543 0 : else if (create)
544 : {
545 : /* initialize the new entry, if creation was requested */
546 0 : Assert(ent != NULL);
547 :
548 0 : ent->txn = ReorderBufferGetTXN(rb);
549 0 : ent->txn->xid = xid;
550 0 : txn = ent->txn;
551 0 : txn->first_lsn = lsn;
552 0 : txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
553 :
554 0 : if (create_as_top)
555 : {
556 0 : dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
557 0 : AssertTXNLsnOrder(rb);
558 : }
559 : }
560 : else
561 0 : txn = NULL; /* not found and not asked to create */
562 :
563 : /* update cache */
564 0 : rb->by_txn_last_xid = xid;
565 0 : rb->by_txn_last_txn = txn;
566 :
567 0 : if (is_new)
568 0 : *is_new = !found;
569 :
570 0 : Assert(!create || txn != NULL);
571 0 : return txn;
572 : }
573 :
574 : /*
575 : * Queue a change into a transaction so it can be replayed upon commit.
576 : */
577 : void
578 0 : ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
579 : ReorderBufferChange *change)
580 : {
581 : ReorderBufferTXN *txn;
582 :
583 0 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
584 :
585 0 : change->lsn = lsn;
586 0 : Assert(InvalidXLogRecPtr != lsn);
587 0 : dlist_push_tail(&txn->changes, &change->node);
588 0 : txn->nentries++;
589 0 : txn->nentries_mem++;
590 :
591 0 : ReorderBufferCheckSerializeTXN(rb, txn);
592 0 : }
593 :
594 : /*
595 : * Queue message into a transaction so it can be processed upon commit.
596 : */
597 : void
598 0 : ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
599 : Snapshot snapshot, XLogRecPtr lsn,
600 : bool transactional, const char *prefix,
601 : Size message_size, const char *message)
602 : {
603 0 : if (transactional)
604 : {
605 : MemoryContext oldcontext;
606 : ReorderBufferChange *change;
607 :
608 0 : Assert(xid != InvalidTransactionId);
609 :
610 0 : oldcontext = MemoryContextSwitchTo(rb->context);
611 :
612 0 : change = ReorderBufferGetChange(rb);
613 0 : change->action = REORDER_BUFFER_CHANGE_MESSAGE;
614 0 : change->data.msg.prefix = pstrdup(prefix);
615 0 : change->data.msg.message_size = message_size;
616 0 : change->data.msg.message = palloc(message_size);
617 0 : memcpy(change->data.msg.message, message, message_size);
618 :
619 0 : ReorderBufferQueueChange(rb, xid, lsn, change);
620 :
621 0 : MemoryContextSwitchTo(oldcontext);
622 : }
623 : else
624 : {
625 0 : ReorderBufferTXN *txn = NULL;
626 0 : volatile Snapshot snapshot_now = snapshot;
627 :
628 0 : if (xid != InvalidTransactionId)
629 0 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
630 :
631 : /* setup snapshot to allow catalog access */
632 0 : SetupHistoricSnapshot(snapshot_now, NULL);
633 0 : PG_TRY();
634 : {
635 0 : rb->message(rb, txn, lsn, false, prefix, message_size, message);
636 :
637 0 : TeardownHistoricSnapshot(false);
638 : }
639 0 : PG_CATCH();
640 : {
641 0 : TeardownHistoricSnapshot(true);
642 0 : PG_RE_THROW();
643 : }
644 0 : PG_END_TRY();
645 : }
646 0 : }
647 :
648 :
649 : static void
650 0 : AssertTXNLsnOrder(ReorderBuffer *rb)
651 : {
652 : #ifdef USE_ASSERT_CHECKING
653 : dlist_iter iter;
654 0 : XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
655 :
656 0 : dlist_foreach(iter, &rb->toplevel_by_lsn)
657 : {
658 : ReorderBufferTXN *cur_txn;
659 :
660 0 : cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
661 0 : Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
662 :
663 0 : if (cur_txn->end_lsn != InvalidXLogRecPtr)
664 0 : Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
665 :
666 0 : if (prev_first_lsn != InvalidXLogRecPtr)
667 0 : Assert(prev_first_lsn < cur_txn->first_lsn);
668 :
669 0 : Assert(!cur_txn->is_known_as_subxact);
670 0 : prev_first_lsn = cur_txn->first_lsn;
671 : }
672 : #endif
673 0 : }
674 :
675 : ReorderBufferTXN *
676 0 : ReorderBufferGetOldestTXN(ReorderBuffer *rb)
677 : {
678 : ReorderBufferTXN *txn;
679 :
680 0 : if (dlist_is_empty(&rb->toplevel_by_lsn))
681 0 : return NULL;
682 :
683 0 : AssertTXNLsnOrder(rb);
684 :
685 0 : txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
686 :
687 0 : Assert(!txn->is_known_as_subxact);
688 0 : Assert(txn->first_lsn != InvalidXLogRecPtr);
689 0 : return txn;
690 : }
691 :
692 : void
693 0 : ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
694 : {
695 0 : rb->current_restart_decoding_lsn = ptr;
696 0 : }
697 :
698 : void
699 0 : ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
700 : TransactionId subxid, XLogRecPtr lsn)
701 : {
702 : ReorderBufferTXN *txn;
703 : ReorderBufferTXN *subtxn;
704 : bool new_top;
705 : bool new_sub;
706 :
707 0 : txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
708 0 : subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
709 :
710 0 : if (new_sub)
711 : {
712 : /*
713 : * we assign subtransactions to top level transaction even if we don't
714 : * have data for it yet, assignment records frequently reference xids
715 : * that have not yet produced any records. Knowing those aren't top
716 : * level xids allows us to make processing cheaper in some places.
717 : */
718 0 : dlist_push_tail(&txn->subtxns, &subtxn->node);
719 0 : txn->nsubtxns++;
720 : }
721 0 : else if (!subtxn->is_known_as_subxact)
722 : {
723 0 : subtxn->is_known_as_subxact = true;
724 0 : Assert(subtxn->nsubtxns == 0);
725 :
726 : /* remove from lsn order list of top-level transactions */
727 0 : dlist_delete(&subtxn->node);
728 :
729 : /* add to toplevel transaction */
730 0 : dlist_push_tail(&txn->subtxns, &subtxn->node);
731 0 : txn->nsubtxns++;
732 : }
733 0 : else if (new_top)
734 : {
735 0 : elog(ERROR, "existing subxact assigned to unknown toplevel xact");
736 : }
737 0 : }
738 :
739 : /*
740 : * Associate a subtransaction with its toplevel transaction at commit
741 : * time. There may be no further changes added after this.
742 : */
743 : void
744 0 : ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
745 : TransactionId subxid, XLogRecPtr commit_lsn,
746 : XLogRecPtr end_lsn)
747 : {
748 : ReorderBufferTXN *txn;
749 : ReorderBufferTXN *subtxn;
750 :
751 0 : subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
752 : InvalidXLogRecPtr, false);
753 :
754 : /*
755 : * No need to do anything if that subtxn didn't contain any changes
756 : */
757 0 : if (!subtxn)
758 0 : return;
759 :
760 0 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
761 :
762 0 : if (txn == NULL)
763 0 : elog(ERROR, "subxact logged without previous toplevel record");
764 :
765 : /*
766 : * Pass our base snapshot to the parent transaction if it doesn't have
767 : * one, or ours is older. That can happen if there are no changes in the
768 : * toplevel transaction but in one of the child transactions. This allows
769 : * the parent to simply use its base snapshot initially.
770 : */
771 0 : if (subtxn->base_snapshot != NULL &&
772 0 : (txn->base_snapshot == NULL ||
773 0 : txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
774 : {
775 0 : txn->base_snapshot = subtxn->base_snapshot;
776 0 : txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
777 0 : subtxn->base_snapshot = NULL;
778 0 : subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
779 : }
780 :
781 0 : subtxn->final_lsn = commit_lsn;
782 0 : subtxn->end_lsn = end_lsn;
783 :
784 0 : if (!subtxn->is_known_as_subxact)
785 : {
786 0 : subtxn->is_known_as_subxact = true;
787 0 : Assert(subtxn->nsubtxns == 0);
788 :
789 : /* remove from lsn order list of top-level transactions */
790 0 : dlist_delete(&subtxn->node);
791 :
792 : /* add to subtransaction list */
793 0 : dlist_push_tail(&txn->subtxns, &subtxn->node);
794 0 : txn->nsubtxns++;
795 : }
796 : }
797 :
798 :
799 : /*
800 : * Support for efficiently iterating over a transaction's and its
801 : * subtransactions' changes.
802 : *
803 : * We do this by doing a k-way merge between transactions/subtransactions. For that
804 : * we model the current heads of the different transactions as a binary heap
805 : * so we easily know which (sub-)transaction has the change with the smallest
806 : * lsn next.
807 : *
808 : * We assume the changes in individual transactions are already sorted by LSN.
809 : */
810 :
811 : /*
812 : * Binary heap comparison function.
813 : */
814 : static int
815 0 : ReorderBufferIterCompare(Datum a, Datum b, void *arg)
816 : {
817 0 : ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
818 0 : XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
819 0 : XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
820 :
821 0 : if (pos_a < pos_b)
822 0 : return 1;
823 0 : else if (pos_a == pos_b)
824 0 : return 0;
825 0 : return -1;
826 : }
827 :
828 : /*
829 : * Allocate & initialize an iterator which iterates in lsn order over a
830 : * transaction and all its subtransactions.
831 : */
832 : static ReorderBufferIterTXNState *
833 0 : ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
834 : {
835 0 : Size nr_txns = 0;
836 : ReorderBufferIterTXNState *state;
837 : dlist_iter cur_txn_i;
838 : int32 off;
839 :
840 : /*
841 : * Calculate the size of our heap: one element for every transaction that
842 : * contains changes. (Besides the transactions already in the reorder
843 : * buffer, we count the one we were directly passed.)
844 : */
845 0 : if (txn->nentries > 0)
846 0 : nr_txns++;
847 :
848 0 : dlist_foreach(cur_txn_i, &txn->subtxns)
849 : {
850 : ReorderBufferTXN *cur_txn;
851 :
852 0 : cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
853 :
854 0 : if (cur_txn->nentries > 0)
855 0 : nr_txns++;
856 : }
857 :
858 : /*
859 : * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
860 : * need to allocate/build a heap then.
861 : */
862 :
863 : /* allocate iteration state */
864 0 : state = (ReorderBufferIterTXNState *)
865 0 : MemoryContextAllocZero(rb->context,
866 : sizeof(ReorderBufferIterTXNState) +
867 0 : sizeof(ReorderBufferIterTXNEntry) * nr_txns);
868 :
869 0 : state->nr_txns = nr_txns;
870 0 : dlist_init(&state->old_change);
871 :
872 0 : for (off = 0; off < state->nr_txns; off++)
873 : {
874 0 : state->entries[off].fd = -1;
875 0 : state->entries[off].segno = 0;
876 : }
877 :
878 : /* allocate heap */
879 0 : state->heap = binaryheap_allocate(state->nr_txns,
880 : ReorderBufferIterCompare,
881 : state);
882 :
883 : /*
884 : * Now insert items into the binary heap, in an unordered fashion. (We
885 : * will run a heap assembly step at the end; this is more efficient.)
886 : */
887 :
888 0 : off = 0;
889 :
890 : /* add toplevel transaction if it contains changes */
891 0 : if (txn->nentries > 0)
892 : {
893 : ReorderBufferChange *cur_change;
894 :
895 0 : if (txn->serialized)
896 : {
897 : /* serialize remaining changes */
898 0 : ReorderBufferSerializeTXN(rb, txn);
899 0 : ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
900 : &state->entries[off].segno);
901 : }
902 :
903 0 : cur_change = dlist_head_element(ReorderBufferChange, node,
904 : &txn->changes);
905 :
906 0 : state->entries[off].lsn = cur_change->lsn;
907 0 : state->entries[off].change = cur_change;
908 0 : state->entries[off].txn = txn;
909 :
910 0 : binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
911 : }
912 :
913 : /* add subtransactions if they contain changes */
914 0 : dlist_foreach(cur_txn_i, &txn->subtxns)
915 : {
916 : ReorderBufferTXN *cur_txn;
917 :
918 0 : cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
919 :
920 0 : if (cur_txn->nentries > 0)
921 : {
922 : ReorderBufferChange *cur_change;
923 :
924 0 : if (cur_txn->serialized)
925 : {
926 : /* serialize remaining changes */
927 0 : ReorderBufferSerializeTXN(rb, cur_txn);
928 0 : ReorderBufferRestoreChanges(rb, cur_txn,
929 : &state->entries[off].fd,
930 : &state->entries[off].segno);
931 : }
932 0 : cur_change = dlist_head_element(ReorderBufferChange, node,
933 : &cur_txn->changes);
934 :
935 0 : state->entries[off].lsn = cur_change->lsn;
936 0 : state->entries[off].change = cur_change;
937 0 : state->entries[off].txn = cur_txn;
938 :
939 0 : binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
940 : }
941 : }
942 :
943 : /* assemble a valid binary heap */
944 0 : binaryheap_build(state->heap);
945 :
946 0 : return state;
947 : }
948 :
949 : /*
950 : * Return the next change when iterating over a transaction and its
951 : * subtransactions.
952 : *
953 : * Returns NULL when no further changes exist.
954 : */
955 : static ReorderBufferChange *
956 0 : ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
957 : {
958 : ReorderBufferChange *change;
959 : ReorderBufferIterTXNEntry *entry;
960 : int32 off;
961 :
962 : /* nothing there anymore */
963 0 : if (state->heap->bh_size == 0)
964 0 : return NULL;
965 :
966 0 : off = DatumGetInt32(binaryheap_first(state->heap));
967 0 : entry = &state->entries[off];
968 :
969 : /* free memory we might have "leaked" in the previous *Next call */
970 0 : if (!dlist_is_empty(&state->old_change))
971 : {
972 0 : change = dlist_container(ReorderBufferChange, node,
973 : dlist_pop_head_node(&state->old_change));
974 0 : ReorderBufferReturnChange(rb, change);
975 0 : Assert(dlist_is_empty(&state->old_change));
976 : }
977 :
978 0 : change = entry->change;
979 :
980 : /*
981 : * update heap with information about which transaction has the next
982 : * relevant change in LSN order
983 : */
984 :
985 : /* there are in-memory changes */
986 0 : if (dlist_has_next(&entry->txn->changes, &entry->change->node))
987 : {
988 0 : dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
989 0 : ReorderBufferChange *next_change =
990 0 : dlist_container(ReorderBufferChange, node, next);
991 :
992 : /* txn stays the same */
993 0 : state->entries[off].lsn = next_change->lsn;
994 0 : state->entries[off].change = next_change;
995 :
996 0 : binaryheap_replace_first(state->heap, Int32GetDatum(off));
997 0 : return change;
998 : }
999 :
1000 : /* try to load changes from disk */
1001 0 : if (entry->txn->nentries != entry->txn->nentries_mem)
1002 : {
1003 : /*
1004 : * Ugly: restoring changes will reuse *Change records, thus delete the
1005 : * current one from the per-tx list and only free in the next call.
1006 : */
1007 0 : dlist_delete(&change->node);
1008 0 : dlist_push_tail(&state->old_change, &change->node);
1009 :
1010 0 : if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1011 : &state->entries[off].segno))
1012 : {
1013 : /* successfully restored changes from disk */
1014 0 : ReorderBufferChange *next_change =
1015 0 : dlist_head_element(ReorderBufferChange, node,
1016 : &entry->txn->changes);
1017 :
1018 0 : elog(DEBUG2, "restored %u/%u changes from disk",
1019 : (uint32) entry->txn->nentries_mem,
1020 : (uint32) entry->txn->nentries);
1021 :
1022 0 : Assert(entry->txn->nentries_mem);
1023 : /* txn stays the same */
1024 0 : state->entries[off].lsn = next_change->lsn;
1025 0 : state->entries[off].change = next_change;
1026 0 : binaryheap_replace_first(state->heap, Int32GetDatum(off));
1027 :
1028 0 : return change;
1029 : }
1030 : }
1031 :
1032 : /* ok, no changes there anymore, remove */
1033 0 : binaryheap_remove_first(state->heap);
1034 :
1035 0 : return change;
1036 : }
1037 :
1038 : /*
1039 : * Deallocate the iterator
1040 : */
1041 : static void
1042 0 : ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1043 : ReorderBufferIterTXNState *state)
1044 : {
1045 : int32 off;
1046 :
1047 0 : for (off = 0; off < state->nr_txns; off++)
1048 : {
1049 0 : if (state->entries[off].fd != -1)
1050 0 : CloseTransientFile(state->entries[off].fd);
1051 : }
1052 :
1053 : /* free memory we might have "leaked" in the last *Next call */
1054 0 : if (!dlist_is_empty(&state->old_change))
1055 : {
1056 : ReorderBufferChange *change;
1057 :
1058 0 : change = dlist_container(ReorderBufferChange, node,
1059 : dlist_pop_head_node(&state->old_change));
1060 0 : ReorderBufferReturnChange(rb, change);
1061 0 : Assert(dlist_is_empty(&state->old_change));
1062 : }
1063 :
1064 0 : binaryheap_free(state->heap);
1065 0 : pfree(state);
1066 0 : }
1067 :
1068 : /*
1069 : * Cleanup the contents of a transaction, usually after the transaction
1070 : * committed or aborted.
1071 : */
1072 : static void
1073 0 : ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1074 : {
1075 : bool found;
1076 : dlist_mutable_iter iter;
1077 :
1078 : /* cleanup subtransactions & their changes */
1079 0 : dlist_foreach_modify(iter, &txn->subtxns)
1080 : {
1081 : ReorderBufferTXN *subtxn;
1082 :
1083 0 : subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1084 :
1085 : /*
1086 : * Subtransactions are always associated to the toplevel TXN, even if
1087 : * they originally were happening inside another subtxn, so we won't
1088 : * ever recurse more than one level deep here.
1089 : */
1090 0 : Assert(subtxn->is_known_as_subxact);
1091 0 : Assert(subtxn->nsubtxns == 0);
1092 :
1093 0 : ReorderBufferCleanupTXN(rb, subtxn);
1094 : }
1095 :
1096 : /* cleanup changes in the toplevel txn */
1097 0 : dlist_foreach_modify(iter, &txn->changes)
1098 : {
1099 : ReorderBufferChange *change;
1100 :
1101 0 : change = dlist_container(ReorderBufferChange, node, iter.cur);
1102 :
1103 0 : ReorderBufferReturnChange(rb, change);
1104 : }
1105 :
1106 : /*
1107 : * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1108 : * They are always stored in the toplevel transaction.
1109 : */
1110 0 : dlist_foreach_modify(iter, &txn->tuplecids)
1111 : {
1112 : ReorderBufferChange *change;
1113 :
1114 0 : change = dlist_container(ReorderBufferChange, node, iter.cur);
1115 0 : Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1116 0 : ReorderBufferReturnChange(rb, change);
1117 : }
1118 :
1119 0 : if (txn->base_snapshot != NULL)
1120 : {
1121 0 : SnapBuildSnapDecRefcount(txn->base_snapshot);
1122 0 : txn->base_snapshot = NULL;
1123 0 : txn->base_snapshot_lsn = InvalidXLogRecPtr;
1124 : }
1125 :
1126 : /*
1127 : * Remove TXN from its containing list.
1128 : *
1129 : * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1130 : * parent's list of known subxacts; this leaves the parent's nsubxacts
1131 : * count too high, but we don't care. Otherwise, we are deleting the TXN
1132 : * from the LSN-ordered list of toplevel TXNs.
1133 : */
1134 0 : dlist_delete(&txn->node);
1135 :
1136 : /* now remove reference from buffer */
1137 0 : hash_search(rb->by_txn,
1138 0 : (void *) &txn->xid,
1139 : HASH_REMOVE,
1140 : &found);
1141 0 : Assert(found);
1142 :
1143 : /* remove entries spilled to disk */
1144 0 : if (txn->serialized)
1145 0 : ReorderBufferRestoreCleanup(rb, txn);
1146 :
1147 : /* deallocate */
1148 0 : ReorderBufferReturnTXN(rb, txn);
1149 0 : }
1150 :
1151 : /*
1152 : * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1153 : * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1154 : */
1155 : static void
1156 0 : ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1157 : {
1158 : dlist_iter iter;
1159 : HASHCTL hash_ctl;
1160 :
1161 0 : if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1162 0 : return;
1163 :
1164 0 : memset(&hash_ctl, 0, sizeof(hash_ctl));
1165 :
1166 0 : hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1167 0 : hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1168 0 : hash_ctl.hcxt = rb->context;
1169 :
1170 : /*
1171 : * create the hash with the exact number of to-be-stored tuplecids from
1172 : * the start
1173 : */
1174 0 : txn->tuplecid_hash =
1175 0 : hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1176 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1177 :
1178 0 : dlist_foreach(iter, &txn->tuplecids)
1179 : {
1180 : ReorderBufferTupleCidKey key;
1181 : ReorderBufferTupleCidEnt *ent;
1182 : bool found;
1183 : ReorderBufferChange *change;
1184 :
1185 0 : change = dlist_container(ReorderBufferChange, node, iter.cur);
1186 :
1187 0 : Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1188 :
1189 : /* be careful about padding */
1190 0 : memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1191 :
1192 0 : key.relnode = change->data.tuplecid.node;
1193 :
1194 0 : ItemPointerCopy(&change->data.tuplecid.tid,
1195 : &key.tid);
1196 :
1197 0 : ent = (ReorderBufferTupleCidEnt *)
1198 0 : hash_search(txn->tuplecid_hash,
1199 : (void *) &key,
1200 : HASH_ENTER | HASH_FIND,
1201 : &found);
1202 0 : if (!found)
1203 : {
1204 0 : ent->cmin = change->data.tuplecid.cmin;
1205 0 : ent->cmax = change->data.tuplecid.cmax;
1206 0 : ent->combocid = change->data.tuplecid.combocid;
1207 : }
1208 : else
1209 : {
1210 0 : Assert(ent->cmin == change->data.tuplecid.cmin);
1211 0 : Assert(ent->cmax == InvalidCommandId ||
1212 : ent->cmax == change->data.tuplecid.cmax);
1213 :
1214 : /*
1215 : * if the tuple got valid in this transaction and now got deleted
1216 : * we already have a valid cmin stored. The cmax will be
1217 : * InvalidCommandId though.
1218 : */
1219 0 : ent->cmax = change->data.tuplecid.cmax;
1220 : }
1221 : }
1222 : }
1223 :
1224 : /*
1225 : * Copy a provided snapshot so we can modify it privately. This is needed so
1226 : * that catalog modifying transactions can look into intermediate catalog
1227 : * states.
1228 : */
1229 : static Snapshot
1230 0 : ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1231 : ReorderBufferTXN *txn, CommandId cid)
1232 : {
1233 : Snapshot snap;
1234 : dlist_iter iter;
1235 0 : int i = 0;
1236 : Size size;
1237 :
1238 0 : size = sizeof(SnapshotData) +
1239 0 : sizeof(TransactionId) * orig_snap->xcnt +
1240 0 : sizeof(TransactionId) * (txn->nsubtxns + 1);
1241 :
1242 0 : snap = MemoryContextAllocZero(rb->context, size);
1243 0 : memcpy(snap, orig_snap, sizeof(SnapshotData));
1244 :
1245 0 : snap->copied = true;
1246 0 : snap->active_count = 1; /* mark as active so nobody frees it */
1247 0 : snap->regd_count = 0;
1248 0 : snap->xip = (TransactionId *) (snap + 1);
1249 :
1250 0 : memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1251 :
1252 : /*
1253 : * snap->subxip contains all txids that belong to our transaction which we
1254 : * need to check via cmin/cmax. That's why we store the toplevel
1255 : * transaction in there as well.
1256 : */
1257 0 : snap->subxip = snap->xip + snap->xcnt;
1258 0 : snap->subxip[i++] = txn->xid;
1259 :
1260 : /*
1261 : * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1262 : * Since it's an upper boundary it is safe to use it for the allocation
1263 : * above.
1264 : */
1265 0 : snap->subxcnt = 1;
1266 :
1267 0 : dlist_foreach(iter, &txn->subtxns)
1268 : {
1269 : ReorderBufferTXN *sub_txn;
1270 :
1271 0 : sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1272 0 : snap->subxip[i++] = sub_txn->xid;
1273 0 : snap->subxcnt++;
1274 : }
1275 :
1276 : /* sort so we can bsearch() later */
1277 0 : qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1278 :
1279 : /* store the specified current CommandId */
1280 0 : snap->curcid = cid;
1281 :
1282 0 : return snap;
1283 : }
1284 :
1285 : /*
1286 : * Free a previously ReorderBufferCopySnap'ed snapshot
1287 : */
1288 : static void
1289 0 : ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1290 : {
1291 0 : if (snap->copied)
1292 0 : pfree(snap);
1293 : else
1294 0 : SnapBuildSnapDecRefcount(snap);
1295 0 : }
1296 :
1297 : /*
1298 : * Perform the replay of a transaction and it's non-aborted subtransactions.
1299 : *
1300 : * Subtransactions previously have to be processed by
1301 : * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1302 : * transaction with ReorderBufferAssignChild.
1303 : *
1304 : * We currently can only decode a transaction's contents in when their commit
1305 : * record is read because that's currently the only place where we know about
1306 : * cache invalidations. Thus, once a toplevel commit is read, we iterate over
1307 : * the top and subtransactions (using a k-way merge) and replay the changes in
1308 : * lsn order.
1309 : */
1310 : void
1311 0 : ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1312 : XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1313 : TimestampTz commit_time,
1314 : RepOriginId origin_id, XLogRecPtr origin_lsn)
1315 : {
1316 : ReorderBufferTXN *txn;
1317 : volatile Snapshot snapshot_now;
1318 0 : volatile CommandId command_id = FirstCommandId;
1319 : bool using_subtxn;
1320 0 : ReorderBufferIterTXNState *volatile iterstate = NULL;
1321 :
1322 0 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1323 : false);
1324 :
1325 : /* unknown transaction, nothing to replay */
1326 0 : if (txn == NULL)
1327 0 : return;
1328 :
1329 0 : txn->final_lsn = commit_lsn;
1330 0 : txn->end_lsn = end_lsn;
1331 0 : txn->commit_time = commit_time;
1332 0 : txn->origin_id = origin_id;
1333 0 : txn->origin_lsn = origin_lsn;
1334 :
1335 : /*
1336 : * If this transaction didn't have any real changes in our database, it's
1337 : * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1338 : * transferred its snapshot to this transaction if it had one and the
1339 : * toplevel tx didn't.
1340 : */
1341 0 : if (txn->base_snapshot == NULL)
1342 : {
1343 0 : Assert(txn->ninvalidations == 0);
1344 0 : ReorderBufferCleanupTXN(rb, txn);
1345 0 : return;
1346 : }
1347 :
1348 0 : snapshot_now = txn->base_snapshot;
1349 :
1350 : /* build data to be able to lookup the CommandIds of catalog tuples */
1351 0 : ReorderBufferBuildTupleCidHash(rb, txn);
1352 :
1353 : /* setup the initial snapshot */
1354 0 : SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1355 :
1356 : /*
1357 : * Decoding needs access to syscaches et al., which in turn use
1358 : * heavyweight locks and such. Thus we need to have enough state around to
1359 : * keep track of those. The easiest way is to simply use a transaction
1360 : * internally. That also allows us to easily enforce that nothing writes
1361 : * to the database by checking for xid assignments.
1362 : *
1363 : * When we're called via the SQL SRF there's already a transaction
1364 : * started, so start an explicit subtransaction there.
1365 : */
1366 0 : using_subtxn = IsTransactionOrTransactionBlock();
1367 :
1368 0 : PG_TRY();
1369 : {
1370 : ReorderBufferChange *change;
1371 0 : ReorderBufferChange *specinsert = NULL;
1372 :
1373 0 : if (using_subtxn)
1374 0 : BeginInternalSubTransaction("replay");
1375 : else
1376 0 : StartTransactionCommand();
1377 :
1378 0 : rb->begin(rb, txn);
1379 :
1380 0 : iterstate = ReorderBufferIterTXNInit(rb, txn);
1381 0 : while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1382 : {
1383 0 : Relation relation = NULL;
1384 : Oid reloid;
1385 :
1386 0 : switch (change->action)
1387 : {
1388 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1389 :
1390 : /*
1391 : * Confirmation for speculative insertion arrived. Simply
1392 : * use as a normal record. It'll be cleaned up at the end
1393 : * of INSERT processing.
1394 : */
1395 0 : Assert(specinsert->data.tp.oldtuple == NULL);
1396 0 : change = specinsert;
1397 0 : change->action = REORDER_BUFFER_CHANGE_INSERT;
1398 :
1399 : /* intentionally fall through */
1400 : case REORDER_BUFFER_CHANGE_INSERT:
1401 : case REORDER_BUFFER_CHANGE_UPDATE:
1402 : case REORDER_BUFFER_CHANGE_DELETE:
1403 0 : Assert(snapshot_now);
1404 :
1405 0 : reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1406 : change->data.tp.relnode.relNode);
1407 :
1408 : /*
1409 : * Catalog tuple without data, emitted while catalog was
1410 : * in the process of being rewritten.
1411 : */
1412 0 : if (reloid == InvalidOid &&
1413 0 : change->data.tp.newtuple == NULL &&
1414 0 : change->data.tp.oldtuple == NULL)
1415 : goto change_done;
1416 0 : else if (reloid == InvalidOid)
1417 0 : elog(ERROR, "could not map filenode \"%s\" to relation OID",
1418 : relpathperm(change->data.tp.relnode,
1419 : MAIN_FORKNUM));
1420 :
1421 0 : relation = RelationIdGetRelation(reloid);
1422 :
1423 0 : if (relation == NULL)
1424 0 : elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1425 : reloid,
1426 : relpathperm(change->data.tp.relnode,
1427 : MAIN_FORKNUM));
1428 :
1429 0 : if (!RelationIsLogicallyLogged(relation))
1430 : goto change_done;
1431 :
1432 : /*
1433 : * For now ignore sequence changes entirely. Most of the
1434 : * time they don't log changes using records we
1435 : * understand, so it doesn't make sense to handle the few
1436 : * cases we do.
1437 : */
1438 0 : if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1439 0 : goto change_done;
1440 :
1441 : /* user-triggered change */
1442 0 : if (!IsToastRelation(relation))
1443 : {
1444 0 : ReorderBufferToastReplace(rb, txn, relation, change);
1445 0 : rb->apply_change(rb, txn, relation, change);
1446 :
1447 : /*
1448 : * Only clear reassembled toast chunks if we're sure
1449 : * they're not required anymore. The creator of the
1450 : * tuple tells us.
1451 : */
1452 0 : if (change->data.tp.clear_toast_afterwards)
1453 0 : ReorderBufferToastReset(rb, txn);
1454 : }
1455 : /* we're not interested in toast deletions */
1456 0 : else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1457 : {
1458 : /*
1459 : * Need to reassemble the full toasted Datum in
1460 : * memory, to ensure the chunks don't get reused till
1461 : * we're done remove it from the list of this
1462 : * transaction's changes. Otherwise it will get
1463 : * freed/reused while restoring spooled data from
1464 : * disk.
1465 : */
1466 0 : dlist_delete(&change->node);
1467 0 : ReorderBufferToastAppendChunk(rb, txn, relation,
1468 : change);
1469 : }
1470 :
1471 : change_done:
1472 :
1473 : /*
1474 : * Either speculative insertion was confirmed, or it was
1475 : * unsuccessful and the record isn't needed anymore.
1476 : */
1477 0 : if (specinsert != NULL)
1478 : {
1479 0 : ReorderBufferReturnChange(rb, specinsert);
1480 0 : specinsert = NULL;
1481 : }
1482 :
1483 0 : if (relation != NULL)
1484 : {
1485 0 : RelationClose(relation);
1486 0 : relation = NULL;
1487 : }
1488 0 : break;
1489 :
1490 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1491 :
1492 : /*
1493 : * Speculative insertions are dealt with by delaying the
1494 : * processing of the insert until the confirmation record
1495 : * arrives. For that we simply unlink the record from the
1496 : * chain, so it does not get freed/reused while restoring
1497 : * spooled data from disk.
1498 : *
1499 : * This is safe in the face of concurrent catalog changes
1500 : * because the relevant relation can't be changed between
1501 : * speculative insertion and confirmation due to
1502 : * CheckTableNotInUse() and locking.
1503 : */
1504 :
1505 : /* clear out a pending (and thus failed) speculation */
1506 0 : if (specinsert != NULL)
1507 : {
1508 0 : ReorderBufferReturnChange(rb, specinsert);
1509 0 : specinsert = NULL;
1510 : }
1511 :
1512 : /* and memorize the pending insertion */
1513 0 : dlist_delete(&change->node);
1514 0 : specinsert = change;
1515 0 : break;
1516 :
1517 : case REORDER_BUFFER_CHANGE_MESSAGE:
1518 0 : rb->message(rb, txn, change->lsn, true,
1519 0 : change->data.msg.prefix,
1520 : change->data.msg.message_size,
1521 0 : change->data.msg.message);
1522 0 : break;
1523 :
1524 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1525 : /* get rid of the old */
1526 0 : TeardownHistoricSnapshot(false);
1527 :
1528 0 : if (snapshot_now->copied)
1529 : {
1530 0 : ReorderBufferFreeSnap(rb, snapshot_now);
1531 0 : snapshot_now =
1532 0 : ReorderBufferCopySnap(rb, change->data.snapshot,
1533 : txn, command_id);
1534 : }
1535 :
1536 : /*
1537 : * Restored from disk, need to be careful not to double
1538 : * free. We could introduce refcounting for that, but for
1539 : * now this seems infrequent enough not to care.
1540 : */
1541 0 : else if (change->data.snapshot->copied)
1542 : {
1543 0 : snapshot_now =
1544 0 : ReorderBufferCopySnap(rb, change->data.snapshot,
1545 : txn, command_id);
1546 : }
1547 : else
1548 : {
1549 0 : snapshot_now = change->data.snapshot;
1550 : }
1551 :
1552 :
1553 : /* and continue with the new one */
1554 0 : SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1555 0 : break;
1556 :
1557 : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1558 0 : Assert(change->data.command_id != InvalidCommandId);
1559 :
1560 0 : if (command_id < change->data.command_id)
1561 : {
1562 0 : command_id = change->data.command_id;
1563 :
1564 0 : if (!snapshot_now->copied)
1565 : {
1566 : /* we don't use the global one anymore */
1567 0 : snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1568 : txn, command_id);
1569 : }
1570 :
1571 0 : snapshot_now->curcid = command_id;
1572 :
1573 0 : TeardownHistoricSnapshot(false);
1574 0 : SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1575 :
1576 : /*
1577 : * Every time the CommandId is incremented, we could
1578 : * see new catalog contents, so execute all
1579 : * invalidations.
1580 : */
1581 0 : ReorderBufferExecuteInvalidations(rb, txn);
1582 : }
1583 :
1584 0 : break;
1585 :
1586 : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1587 0 : elog(ERROR, "tuplecid value in changequeue");
1588 : break;
1589 : }
1590 : }
1591 :
1592 : /*
1593 : * There's a speculative insertion remaining, just clean in up, it
1594 : * can't have been successful, otherwise we'd gotten a confirmation
1595 : * record.
1596 : */
1597 0 : if (specinsert)
1598 : {
1599 0 : ReorderBufferReturnChange(rb, specinsert);
1600 0 : specinsert = NULL;
1601 : }
1602 :
1603 : /* clean up the iterator */
1604 0 : ReorderBufferIterTXNFinish(rb, iterstate);
1605 0 : iterstate = NULL;
1606 :
1607 : /* call commit callback */
1608 0 : rb->commit(rb, txn, commit_lsn);
1609 :
1610 : /* this is just a sanity check against bad output plugin behaviour */
1611 0 : if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1612 0 : elog(ERROR, "output plugin used XID %u",
1613 : GetCurrentTransactionId());
1614 :
1615 : /* cleanup */
1616 0 : TeardownHistoricSnapshot(false);
1617 :
1618 : /*
1619 : * Aborting the current (sub-)transaction as a whole has the right
1620 : * semantics. We want all locks acquired in here to be released, not
1621 : * reassigned to the parent and we do not want any database access
1622 : * have persistent effects.
1623 : */
1624 0 : AbortCurrentTransaction();
1625 :
1626 : /* make sure there's no cache pollution */
1627 0 : ReorderBufferExecuteInvalidations(rb, txn);
1628 :
1629 0 : if (using_subtxn)
1630 0 : RollbackAndReleaseCurrentSubTransaction();
1631 :
1632 0 : if (snapshot_now->copied)
1633 0 : ReorderBufferFreeSnap(rb, snapshot_now);
1634 :
1635 : /* remove potential on-disk data, and deallocate */
1636 0 : ReorderBufferCleanupTXN(rb, txn);
1637 : }
1638 0 : PG_CATCH();
1639 : {
1640 : /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1641 0 : if (iterstate)
1642 0 : ReorderBufferIterTXNFinish(rb, iterstate);
1643 :
1644 0 : TeardownHistoricSnapshot(true);
1645 :
1646 : /*
1647 : * Force cache invalidation to happen outside of a valid transaction
1648 : * to prevent catalog access as we just caught an error.
1649 : */
1650 0 : AbortCurrentTransaction();
1651 :
1652 : /* make sure there's no cache pollution */
1653 0 : ReorderBufferExecuteInvalidations(rb, txn);
1654 :
1655 0 : if (using_subtxn)
1656 0 : RollbackAndReleaseCurrentSubTransaction();
1657 :
1658 0 : if (snapshot_now->copied)
1659 0 : ReorderBufferFreeSnap(rb, snapshot_now);
1660 :
1661 : /* remove potential on-disk data, and deallocate */
1662 0 : ReorderBufferCleanupTXN(rb, txn);
1663 :
1664 0 : PG_RE_THROW();
1665 : }
1666 0 : PG_END_TRY();
1667 : }
1668 :
1669 : /*
1670 : * Abort a transaction that possibly has previous changes. Needs to be first
1671 : * called for subtransactions and then for the toplevel xid.
1672 : *
1673 : * NB: Transactions handled here have to have actively aborted (i.e. have
1674 : * produced an abort record). Implicitly aborted transactions are handled via
1675 : * ReorderBufferAbortOld(); transactions we're just not interested in, but
1676 : * which have committed are handled in ReorderBufferForget().
1677 : *
1678 : * This function purges this transaction and its contents from memory and
1679 : * disk.
1680 : */
1681 : void
1682 0 : ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1683 : {
1684 : ReorderBufferTXN *txn;
1685 :
1686 0 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1687 : false);
1688 :
1689 : /* unknown, nothing to remove */
1690 0 : if (txn == NULL)
1691 0 : return;
1692 :
1693 : /* cosmetic... */
1694 0 : txn->final_lsn = lsn;
1695 :
1696 : /* remove potential on-disk data, and deallocate */
1697 0 : ReorderBufferCleanupTXN(rb, txn);
1698 : }
1699 :
1700 : /*
1701 : * Abort all transactions that aren't actually running anymore because the
1702 : * server restarted.
1703 : *
1704 : * NB: These really have to be transactions that have aborted due to a server
1705 : * crash/immediate restart, as we don't deal with invalidations here.
1706 : */
1707 : void
1708 0 : ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1709 : {
1710 : dlist_mutable_iter it;
1711 :
1712 : /*
1713 : * Iterate through all (potential) toplevel TXNs and abort all that are
1714 : * older than what possibly can be running. Once we've found the first
1715 : * that is alive we stop, there might be some that acquired an xid earlier
1716 : * but started writing later, but it's unlikely and they will cleaned up
1717 : * in a later call to ReorderBufferAbortOld().
1718 : */
1719 0 : dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1720 : {
1721 : ReorderBufferTXN *txn;
1722 :
1723 0 : txn = dlist_container(ReorderBufferTXN, node, it.cur);
1724 :
1725 0 : if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1726 : {
1727 0 : elog(DEBUG2, "aborting old transaction %u", txn->xid);
1728 :
1729 : /* remove potential on-disk data, and deallocate this tx */
1730 0 : ReorderBufferCleanupTXN(rb, txn);
1731 : }
1732 : else
1733 0 : return;
1734 : }
1735 : }
1736 :
1737 : /*
1738 : * Forget the contents of a transaction if we aren't interested in it's
1739 : * contents. Needs to be first called for subtransactions and then for the
1740 : * toplevel xid.
1741 : *
1742 : * This is significantly different to ReorderBufferAbort() because
1743 : * transactions that have committed need to be treated differently from aborted
1744 : * ones since they may have modified the catalog.
1745 : *
1746 : * Note that this is only allowed to be called in the moment a transaction
1747 : * commit has just been read, not earlier; otherwise later records referring
1748 : * to this xid might re-create the transaction incompletely.
1749 : */
1750 : void
1751 0 : ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1752 : {
1753 : ReorderBufferTXN *txn;
1754 :
1755 0 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1756 : false);
1757 :
1758 : /* unknown, nothing to forget */
1759 0 : if (txn == NULL)
1760 0 : return;
1761 :
1762 : /* cosmetic... */
1763 0 : txn->final_lsn = lsn;
1764 :
1765 : /*
1766 : * Process cache invalidation messages if there are any. Even if we're not
1767 : * interested in the transaction's contents, it could have manipulated the
1768 : * catalog and we need to update the caches according to that.
1769 : */
1770 0 : if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1771 0 : ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1772 : txn->invalidations);
1773 : else
1774 0 : Assert(txn->ninvalidations == 0);
1775 :
1776 : /* remove potential on-disk data, and deallocate */
1777 0 : ReorderBufferCleanupTXN(rb, txn);
1778 : }
1779 :
1780 : /*
1781 : * Execute invalidations happening outside the context of a decoded
1782 : * transaction. That currently happens either for xid-less commits
1783 : * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
1784 : * transactions (via ReorderBufferForget()).
1785 : */
1786 : void
1787 0 : ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
1788 : SharedInvalidationMessage *invalidations)
1789 : {
1790 0 : bool use_subtxn = IsTransactionOrTransactionBlock();
1791 : int i;
1792 :
1793 0 : if (use_subtxn)
1794 0 : BeginInternalSubTransaction("replay");
1795 :
1796 : /*
1797 : * Force invalidations to happen outside of a valid transaction - that way
1798 : * entries will just be marked as invalid without accessing the catalog.
1799 : * That's advantageous because we don't need to setup the full state
1800 : * necessary for catalog access.
1801 : */
1802 0 : if (use_subtxn)
1803 0 : AbortCurrentTransaction();
1804 :
1805 0 : for (i = 0; i < ninvalidations; i++)
1806 0 : LocalExecuteInvalidationMessage(&invalidations[i]);
1807 :
1808 0 : if (use_subtxn)
1809 0 : RollbackAndReleaseCurrentSubTransaction();
1810 0 : }
1811 :
1812 : /*
1813 : * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1814 : * least once for every xid in XLogRecord->xl_xid (other places in records
1815 : * may, but do not have to be passed through here).
1816 : *
1817 : * Reorderbuffer keeps some datastructures about transactions in LSN order,
1818 : * for efficiency. To do that it has to know about when transactions are seen
1819 : * first in the WAL. As many types of records are not actually interesting for
1820 : * logical decoding, they do not necessarily pass though here.
1821 : */
1822 : void
1823 0 : ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1824 : {
1825 : /* many records won't have an xid assigned, centralize check here */
1826 0 : if (xid != InvalidTransactionId)
1827 0 : ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1828 0 : }
1829 :
1830 : /*
1831 : * Add a new snapshot to this transaction that may only used after lsn 'lsn'
1832 : * because the previous snapshot doesn't describe the catalog correctly for
1833 : * following rows.
1834 : */
1835 : void
1836 0 : ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
1837 : XLogRecPtr lsn, Snapshot snap)
1838 : {
1839 0 : ReorderBufferChange *change = ReorderBufferGetChange(rb);
1840 :
1841 0 : change->data.snapshot = snap;
1842 0 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
1843 :
1844 0 : ReorderBufferQueueChange(rb, xid, lsn, change);
1845 0 : }
1846 :
1847 : /*
1848 : * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1849 : * that is used to decode all changes until either this transaction modifies
1850 : * the catalog or another catalog modifying transaction commits.
1851 : *
1852 : * Needs to be called before any changes are added with
1853 : * ReorderBufferQueueChange().
1854 : */
1855 : void
1856 0 : ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
1857 : XLogRecPtr lsn, Snapshot snap)
1858 : {
1859 : ReorderBufferTXN *txn;
1860 : bool is_new;
1861 :
1862 0 : txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1863 0 : Assert(txn->base_snapshot == NULL);
1864 0 : Assert(snap != NULL);
1865 :
1866 0 : txn->base_snapshot = snap;
1867 0 : txn->base_snapshot_lsn = lsn;
1868 0 : }
1869 :
1870 : /*
1871 : * Access the catalog with this CommandId at this point in the changestream.
1872 : *
1873 : * May only be called for command ids > 1
1874 : */
1875 : void
1876 0 : ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
1877 : XLogRecPtr lsn, CommandId cid)
1878 : {
1879 0 : ReorderBufferChange *change = ReorderBufferGetChange(rb);
1880 :
1881 0 : change->data.command_id = cid;
1882 0 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
1883 :
1884 0 : ReorderBufferQueueChange(rb, xid, lsn, change);
1885 0 : }
1886 :
1887 :
1888 : /*
1889 : * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
1890 : */
1891 : void
1892 0 : ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
1893 : XLogRecPtr lsn, RelFileNode node,
1894 : ItemPointerData tid, CommandId cmin,
1895 : CommandId cmax, CommandId combocid)
1896 : {
1897 0 : ReorderBufferChange *change = ReorderBufferGetChange(rb);
1898 : ReorderBufferTXN *txn;
1899 :
1900 0 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1901 :
1902 0 : change->data.tuplecid.node = node;
1903 0 : change->data.tuplecid.tid = tid;
1904 0 : change->data.tuplecid.cmin = cmin;
1905 0 : change->data.tuplecid.cmax = cmax;
1906 0 : change->data.tuplecid.combocid = combocid;
1907 0 : change->lsn = lsn;
1908 0 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
1909 :
1910 0 : dlist_push_tail(&txn->tuplecids, &change->node);
1911 0 : txn->ntuplecids++;
1912 0 : }
1913 :
1914 : /*
1915 : * Setup the invalidation of the toplevel transaction.
1916 : *
1917 : * This needs to be done before ReorderBufferCommit is called!
1918 : */
1919 : void
1920 0 : ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
1921 : XLogRecPtr lsn, Size nmsgs,
1922 : SharedInvalidationMessage *msgs)
1923 : {
1924 : ReorderBufferTXN *txn;
1925 :
1926 0 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1927 :
1928 0 : if (txn->ninvalidations != 0)
1929 0 : elog(ERROR, "only ever add one set of invalidations");
1930 :
1931 0 : Assert(nmsgs > 0);
1932 :
1933 0 : txn->ninvalidations = nmsgs;
1934 0 : txn->invalidations = (SharedInvalidationMessage *)
1935 0 : MemoryContextAlloc(rb->context,
1936 : sizeof(SharedInvalidationMessage) * nmsgs);
1937 0 : memcpy(txn->invalidations, msgs,
1938 : sizeof(SharedInvalidationMessage) * nmsgs);
1939 0 : }
1940 :
1941 : /*
1942 : * Apply all invalidations we know. Possibly we only need parts at this point
1943 : * in the changestream but we don't know which those are.
1944 : */
1945 : static void
1946 0 : ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
1947 : {
1948 : int i;
1949 :
1950 0 : for (i = 0; i < txn->ninvalidations; i++)
1951 0 : LocalExecuteInvalidationMessage(&txn->invalidations[i]);
1952 0 : }
1953 :
1954 : /*
1955 : * Mark a transaction as containing catalog changes
1956 : */
1957 : void
1958 0 : ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
1959 : XLogRecPtr lsn)
1960 : {
1961 : ReorderBufferTXN *txn;
1962 :
1963 0 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1964 :
1965 0 : txn->has_catalog_changes = true;
1966 0 : }
1967 :
1968 : /*
1969 : * Query whether a transaction is already *known* to contain catalog
1970 : * changes. This can be wrong until directly before the commit!
1971 : */
1972 : bool
1973 0 : ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
1974 : {
1975 : ReorderBufferTXN *txn;
1976 :
1977 0 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1978 : false);
1979 0 : if (txn == NULL)
1980 0 : return false;
1981 :
1982 0 : return txn->has_catalog_changes;
1983 : }
1984 :
1985 : /*
1986 : * Have we already added the first snapshot?
1987 : */
1988 : bool
1989 0 : ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
1990 : {
1991 : ReorderBufferTXN *txn;
1992 :
1993 0 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1994 : false);
1995 :
1996 : /* transaction isn't known yet, ergo no snapshot */
1997 0 : if (txn == NULL)
1998 0 : return false;
1999 :
2000 : /*
2001 : * TODO: It would be a nice improvement if we would check the toplevel
2002 : * transaction in subtransactions, but we'd need to keep track of a bit
2003 : * more state.
2004 : */
2005 0 : return txn->base_snapshot != NULL;
2006 : }
2007 :
2008 :
2009 : /*
2010 : * ---------------------------------------
2011 : * Disk serialization support
2012 : * ---------------------------------------
2013 : */
2014 :
2015 : /*
2016 : * Ensure the IO buffer is >= sz.
2017 : */
2018 : static void
2019 0 : ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2020 : {
2021 0 : if (!rb->outbufsize)
2022 : {
2023 0 : rb->outbuf = MemoryContextAlloc(rb->context, sz);
2024 0 : rb->outbufsize = sz;
2025 : }
2026 0 : else if (rb->outbufsize < sz)
2027 : {
2028 0 : rb->outbuf = repalloc(rb->outbuf, sz);
2029 0 : rb->outbufsize = sz;
2030 : }
2031 0 : }
2032 :
2033 : /*
2034 : * Check whether the transaction tx should spill its data to disk.
2035 : */
2036 : static void
2037 0 : ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2038 : {
2039 : /*
2040 : * TODO: improve accounting so we cheaply can take subtransactions into
2041 : * account here.
2042 : */
2043 0 : if (txn->nentries_mem >= max_changes_in_memory)
2044 : {
2045 0 : ReorderBufferSerializeTXN(rb, txn);
2046 0 : Assert(txn->nentries_mem == 0);
2047 : }
2048 0 : }
2049 :
2050 : /*
2051 : * Spill data of a large transaction (and its subtransactions) to disk.
2052 : */
2053 : static void
2054 0 : ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2055 : {
2056 : dlist_iter subtxn_i;
2057 : dlist_mutable_iter change_i;
2058 0 : int fd = -1;
2059 0 : XLogSegNo curOpenSegNo = 0;
2060 0 : Size spilled = 0;
2061 : char path[MAXPGPATH];
2062 :
2063 0 : elog(DEBUG2, "spill %u changes in XID %u to disk",
2064 : (uint32) txn->nentries_mem, txn->xid);
2065 :
2066 : /* do the same to all child TXs */
2067 0 : dlist_foreach(subtxn_i, &txn->subtxns)
2068 : {
2069 : ReorderBufferTXN *subtxn;
2070 :
2071 0 : subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2072 0 : ReorderBufferSerializeTXN(rb, subtxn);
2073 : }
2074 :
2075 : /* serialize changestream */
2076 0 : dlist_foreach_modify(change_i, &txn->changes)
2077 : {
2078 : ReorderBufferChange *change;
2079 :
2080 0 : change = dlist_container(ReorderBufferChange, node, change_i.cur);
2081 :
2082 : /*
2083 : * store in segment in which it belongs by start lsn, don't split over
2084 : * multiple segments tho
2085 : */
2086 0 : if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2087 : {
2088 : XLogRecPtr recptr;
2089 :
2090 0 : if (fd != -1)
2091 0 : CloseTransientFile(fd);
2092 :
2093 0 : XLByteToSeg(change->lsn, curOpenSegNo);
2094 0 : XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2095 :
2096 : /*
2097 : * No need to care about TLIs here, only used during a single run,
2098 : * so each LSN only maps to a specific WAL record.
2099 : */
2100 0 : sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2101 0 : NameStr(MyReplicationSlot->data.name), txn->xid,
2102 0 : (uint32) (recptr >> 32), (uint32) recptr);
2103 :
2104 : /* open segment, create it if necessary */
2105 0 : fd = OpenTransientFile(path,
2106 : O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2107 : S_IRUSR | S_IWUSR);
2108 :
2109 0 : if (fd < 0)
2110 0 : ereport(ERROR,
2111 : (errcode_for_file_access(),
2112 : errmsg("could not open file \"%s\": %m",
2113 : path)));
2114 : }
2115 :
2116 0 : ReorderBufferSerializeChange(rb, txn, fd, change);
2117 0 : dlist_delete(&change->node);
2118 0 : ReorderBufferReturnChange(rb, change);
2119 :
2120 0 : spilled++;
2121 : }
2122 :
2123 0 : Assert(spilled == txn->nentries_mem);
2124 0 : Assert(dlist_is_empty(&txn->changes));
2125 0 : txn->nentries_mem = 0;
2126 0 : txn->serialized = true;
2127 :
2128 0 : if (fd != -1)
2129 0 : CloseTransientFile(fd);
2130 0 : }
2131 :
2132 : /*
2133 : * Serialize individual change to disk.
2134 : */
2135 : static void
2136 0 : ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2137 : int fd, ReorderBufferChange *change)
2138 : {
2139 : ReorderBufferDiskChange *ondisk;
2140 0 : Size sz = sizeof(ReorderBufferDiskChange);
2141 :
2142 0 : ReorderBufferSerializeReserve(rb, sz);
2143 :
2144 0 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2145 0 : memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2146 :
2147 0 : switch (change->action)
2148 : {
2149 : /* fall through these, they're all similar enough */
2150 : case REORDER_BUFFER_CHANGE_INSERT:
2151 : case REORDER_BUFFER_CHANGE_UPDATE:
2152 : case REORDER_BUFFER_CHANGE_DELETE:
2153 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2154 : {
2155 : char *data;
2156 : ReorderBufferTupleBuf *oldtup,
2157 : *newtup;
2158 0 : Size oldlen = 0;
2159 0 : Size newlen = 0;
2160 :
2161 0 : oldtup = change->data.tp.oldtuple;
2162 0 : newtup = change->data.tp.newtuple;
2163 :
2164 0 : if (oldtup)
2165 : {
2166 0 : sz += sizeof(HeapTupleData);
2167 0 : oldlen = oldtup->tuple.t_len;
2168 0 : sz += oldlen;
2169 : }
2170 :
2171 0 : if (newtup)
2172 : {
2173 0 : sz += sizeof(HeapTupleData);
2174 0 : newlen = newtup->tuple.t_len;
2175 0 : sz += newlen;
2176 : }
2177 :
2178 : /* make sure we have enough space */
2179 0 : ReorderBufferSerializeReserve(rb, sz);
2180 :
2181 0 : data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2182 : /* might have been reallocated above */
2183 0 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2184 :
2185 0 : if (oldlen)
2186 : {
2187 0 : memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2188 0 : data += sizeof(HeapTupleData);
2189 :
2190 0 : memcpy(data, oldtup->tuple.t_data, oldlen);
2191 0 : data += oldlen;
2192 : }
2193 :
2194 0 : if (newlen)
2195 : {
2196 0 : memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2197 0 : data += sizeof(HeapTupleData);
2198 :
2199 0 : memcpy(data, newtup->tuple.t_data, newlen);
2200 0 : data += newlen;
2201 : }
2202 0 : break;
2203 : }
2204 : case REORDER_BUFFER_CHANGE_MESSAGE:
2205 : {
2206 : char *data;
2207 0 : Size prefix_size = strlen(change->data.msg.prefix) + 1;
2208 :
2209 0 : sz += prefix_size + change->data.msg.message_size +
2210 : sizeof(Size) + sizeof(Size);
2211 0 : ReorderBufferSerializeReserve(rb, sz);
2212 :
2213 0 : data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2214 :
2215 : /* might have been reallocated above */
2216 0 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2217 :
2218 : /* write the prefix including the size */
2219 0 : memcpy(data, &prefix_size, sizeof(Size));
2220 0 : data += sizeof(Size);
2221 0 : memcpy(data, change->data.msg.prefix,
2222 : prefix_size);
2223 0 : data += prefix_size;
2224 :
2225 : /* write the message including the size */
2226 0 : memcpy(data, &change->data.msg.message_size, sizeof(Size));
2227 0 : data += sizeof(Size);
2228 0 : memcpy(data, change->data.msg.message,
2229 : change->data.msg.message_size);
2230 0 : data += change->data.msg.message_size;
2231 :
2232 0 : break;
2233 : }
2234 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2235 : {
2236 : Snapshot snap;
2237 : char *data;
2238 :
2239 0 : snap = change->data.snapshot;
2240 :
2241 0 : sz += sizeof(SnapshotData) +
2242 0 : sizeof(TransactionId) * snap->xcnt +
2243 0 : sizeof(TransactionId) * snap->subxcnt
2244 : ;
2245 :
2246 : /* make sure we have enough space */
2247 0 : ReorderBufferSerializeReserve(rb, sz);
2248 0 : data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2249 : /* might have been reallocated above */
2250 0 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2251 :
2252 0 : memcpy(data, snap, sizeof(SnapshotData));
2253 0 : data += sizeof(SnapshotData);
2254 :
2255 0 : if (snap->xcnt)
2256 : {
2257 0 : memcpy(data, snap->xip,
2258 0 : sizeof(TransactionId) * snap->xcnt);
2259 0 : data += sizeof(TransactionId) * snap->xcnt;
2260 : }
2261 :
2262 0 : if (snap->subxcnt)
2263 : {
2264 0 : memcpy(data, snap->subxip,
2265 0 : sizeof(TransactionId) * snap->subxcnt);
2266 0 : data += sizeof(TransactionId) * snap->subxcnt;
2267 : }
2268 0 : break;
2269 : }
2270 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2271 : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2272 : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2273 : /* ReorderBufferChange contains everything important */
2274 0 : break;
2275 : }
2276 :
2277 0 : ondisk->size = sz;
2278 :
2279 0 : pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
2280 0 : if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2281 : {
2282 0 : int save_errno = errno;
2283 :
2284 0 : CloseTransientFile(fd);
2285 0 : errno = save_errno;
2286 0 : ereport(ERROR,
2287 : (errcode_for_file_access(),
2288 : errmsg("could not write to data file for XID %u: %m",
2289 : txn->xid)));
2290 : }
2291 0 : pgstat_report_wait_end();
2292 :
2293 0 : Assert(ondisk->change.action == change->action);
2294 0 : }
2295 :
2296 : /*
2297 : * Restore a number of changes spilled to disk back into memory.
2298 : */
2299 : static Size
2300 0 : ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2301 : int *fd, XLogSegNo *segno)
2302 : {
2303 0 : Size restored = 0;
2304 : XLogSegNo last_segno;
2305 : dlist_mutable_iter cleanup_iter;
2306 :
2307 0 : Assert(txn->first_lsn != InvalidXLogRecPtr);
2308 0 : Assert(txn->final_lsn != InvalidXLogRecPtr);
2309 :
2310 : /* free current entries, so we have memory for more */
2311 0 : dlist_foreach_modify(cleanup_iter, &txn->changes)
2312 : {
2313 0 : ReorderBufferChange *cleanup =
2314 0 : dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2315 :
2316 0 : dlist_delete(&cleanup->node);
2317 0 : ReorderBufferReturnChange(rb, cleanup);
2318 : }
2319 0 : txn->nentries_mem = 0;
2320 0 : Assert(dlist_is_empty(&txn->changes));
2321 :
2322 0 : XLByteToSeg(txn->final_lsn, last_segno);
2323 :
2324 0 : while (restored < max_changes_in_memory && *segno <= last_segno)
2325 : {
2326 : int readBytes;
2327 : ReorderBufferDiskChange *ondisk;
2328 :
2329 0 : if (*fd == -1)
2330 : {
2331 : XLogRecPtr recptr;
2332 : char path[MAXPGPATH];
2333 :
2334 : /* first time in */
2335 0 : if (*segno == 0)
2336 : {
2337 0 : XLByteToSeg(txn->first_lsn, *segno);
2338 : }
2339 :
2340 0 : Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2341 0 : XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2342 :
2343 : /*
2344 : * No need to care about TLIs here, only used during a single run,
2345 : * so each LSN only maps to a specific WAL record.
2346 : */
2347 0 : sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2348 0 : NameStr(MyReplicationSlot->data.name), txn->xid,
2349 0 : (uint32) (recptr >> 32), (uint32) recptr);
2350 :
2351 0 : *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2352 0 : if (*fd < 0 && errno == ENOENT)
2353 : {
2354 0 : *fd = -1;
2355 0 : (*segno)++;
2356 0 : continue;
2357 : }
2358 0 : else if (*fd < 0)
2359 0 : ereport(ERROR,
2360 : (errcode_for_file_access(),
2361 : errmsg("could not open file \"%s\": %m",
2362 : path)));
2363 :
2364 : }
2365 :
2366 : /*
2367 : * Read the statically sized part of a change which has information
2368 : * about the total size. If we couldn't read a record, we're at the
2369 : * end of this file.
2370 : */
2371 0 : ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2372 0 : pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2373 0 : readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2374 0 : pgstat_report_wait_end();
2375 :
2376 : /* eof */
2377 0 : if (readBytes == 0)
2378 : {
2379 0 : CloseTransientFile(*fd);
2380 0 : *fd = -1;
2381 0 : (*segno)++;
2382 0 : continue;
2383 : }
2384 0 : else if (readBytes < 0)
2385 0 : ereport(ERROR,
2386 : (errcode_for_file_access(),
2387 : errmsg("could not read from reorderbuffer spill file: %m")));
2388 0 : else if (readBytes != sizeof(ReorderBufferDiskChange))
2389 0 : ereport(ERROR,
2390 : (errcode_for_file_access(),
2391 : errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2392 : readBytes,
2393 : (uint32) sizeof(ReorderBufferDiskChange))));
2394 :
2395 0 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2396 :
2397 0 : ReorderBufferSerializeReserve(rb,
2398 0 : sizeof(ReorderBufferDiskChange) + ondisk->size);
2399 0 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2400 :
2401 0 : pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2402 0 : readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2403 0 : ondisk->size - sizeof(ReorderBufferDiskChange));
2404 0 : pgstat_report_wait_end();
2405 :
2406 0 : if (readBytes < 0)
2407 0 : ereport(ERROR,
2408 : (errcode_for_file_access(),
2409 : errmsg("could not read from reorderbuffer spill file: %m")));
2410 0 : else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2411 0 : ereport(ERROR,
2412 : (errcode_for_file_access(),
2413 : errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2414 : readBytes,
2415 : (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2416 :
2417 : /*
2418 : * ok, read a full change from disk, now restore it into proper
2419 : * in-memory format
2420 : */
2421 0 : ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2422 0 : restored++;
2423 : }
2424 :
2425 0 : return restored;
2426 : }
2427 :
2428 : /*
2429 : * Convert change from its on-disk format to in-memory format and queue it onto
2430 : * the TXN's ->changes list.
2431 : *
2432 : * Note: although "data" is declared char*, at entry it points to a
2433 : * maxalign'd buffer, making it safe in most of this function to assume
2434 : * that the pointed-to data is suitably aligned for direct access.
2435 : */
2436 : static void
2437 0 : ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2438 : char *data)
2439 : {
2440 : ReorderBufferDiskChange *ondisk;
2441 : ReorderBufferChange *change;
2442 :
2443 0 : ondisk = (ReorderBufferDiskChange *) data;
2444 :
2445 0 : change = ReorderBufferGetChange(rb);
2446 :
2447 : /* copy static part */
2448 0 : memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2449 :
2450 0 : data += sizeof(ReorderBufferDiskChange);
2451 :
2452 : /* restore individual stuff */
2453 0 : switch (change->action)
2454 : {
2455 : /* fall through these, they're all similar enough */
2456 : case REORDER_BUFFER_CHANGE_INSERT:
2457 : case REORDER_BUFFER_CHANGE_UPDATE:
2458 : case REORDER_BUFFER_CHANGE_DELETE:
2459 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2460 0 : if (change->data.tp.oldtuple)
2461 : {
2462 0 : uint32 tuplelen = ((HeapTuple) data)->t_len;
2463 :
2464 0 : change->data.tp.oldtuple =
2465 0 : ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2466 :
2467 : /* restore ->tuple */
2468 0 : memcpy(&change->data.tp.oldtuple->tuple, data,
2469 : sizeof(HeapTupleData));
2470 0 : data += sizeof(HeapTupleData);
2471 :
2472 : /* reset t_data pointer into the new tuplebuf */
2473 0 : change->data.tp.oldtuple->tuple.t_data =
2474 0 : ReorderBufferTupleBufData(change->data.tp.oldtuple);
2475 :
2476 : /* restore tuple data itself */
2477 0 : memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2478 0 : data += tuplelen;
2479 : }
2480 :
2481 0 : if (change->data.tp.newtuple)
2482 : {
2483 : /* here, data might not be suitably aligned! */
2484 : uint32 tuplelen;
2485 :
2486 0 : memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2487 : sizeof(uint32));
2488 :
2489 0 : change->data.tp.newtuple =
2490 0 : ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2491 :
2492 : /* restore ->tuple */
2493 0 : memcpy(&change->data.tp.newtuple->tuple, data,
2494 : sizeof(HeapTupleData));
2495 0 : data += sizeof(HeapTupleData);
2496 :
2497 : /* reset t_data pointer into the new tuplebuf */
2498 0 : change->data.tp.newtuple->tuple.t_data =
2499 0 : ReorderBufferTupleBufData(change->data.tp.newtuple);
2500 :
2501 : /* restore tuple data itself */
2502 0 : memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2503 0 : data += tuplelen;
2504 : }
2505 :
2506 0 : break;
2507 : case REORDER_BUFFER_CHANGE_MESSAGE:
2508 : {
2509 : Size prefix_size;
2510 :
2511 : /* read prefix */
2512 0 : memcpy(&prefix_size, data, sizeof(Size));
2513 0 : data += sizeof(Size);
2514 0 : change->data.msg.prefix = MemoryContextAlloc(rb->context,
2515 : prefix_size);
2516 0 : memcpy(change->data.msg.prefix, data, prefix_size);
2517 0 : Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2518 0 : data += prefix_size;
2519 :
2520 : /* read the message */
2521 0 : memcpy(&change->data.msg.message_size, data, sizeof(Size));
2522 0 : data += sizeof(Size);
2523 0 : change->data.msg.message = MemoryContextAlloc(rb->context,
2524 : change->data.msg.message_size);
2525 0 : memcpy(change->data.msg.message, data,
2526 : change->data.msg.message_size);
2527 0 : data += change->data.msg.message_size;
2528 :
2529 0 : break;
2530 : }
2531 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2532 : {
2533 : Snapshot oldsnap;
2534 : Snapshot newsnap;
2535 : Size size;
2536 :
2537 0 : oldsnap = (Snapshot) data;
2538 :
2539 0 : size = sizeof(SnapshotData) +
2540 0 : sizeof(TransactionId) * oldsnap->xcnt +
2541 0 : sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2542 :
2543 0 : change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2544 :
2545 0 : newsnap = change->data.snapshot;
2546 :
2547 0 : memcpy(newsnap, data, size);
2548 0 : newsnap->xip = (TransactionId *)
2549 : (((char *) newsnap) + sizeof(SnapshotData));
2550 0 : newsnap->subxip = newsnap->xip + newsnap->xcnt;
2551 0 : newsnap->copied = true;
2552 0 : break;
2553 : }
2554 : /* the base struct contains all the data, easy peasy */
2555 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2556 : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2557 : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2558 0 : break;
2559 : }
2560 :
2561 0 : dlist_push_tail(&txn->changes, &change->node);
2562 0 : txn->nentries_mem++;
2563 0 : }
2564 :
2565 : /*
2566 : * Remove all on-disk stored for the passed in transaction.
2567 : */
2568 : static void
2569 0 : ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2570 : {
2571 : XLogSegNo first;
2572 : XLogSegNo cur;
2573 : XLogSegNo last;
2574 :
2575 0 : Assert(txn->first_lsn != InvalidXLogRecPtr);
2576 0 : Assert(txn->final_lsn != InvalidXLogRecPtr);
2577 :
2578 0 : XLByteToSeg(txn->first_lsn, first);
2579 0 : XLByteToSeg(txn->final_lsn, last);
2580 :
2581 : /* iterate over all possible filenames, and delete them */
2582 0 : for (cur = first; cur <= last; cur++)
2583 : {
2584 : char path[MAXPGPATH];
2585 : XLogRecPtr recptr;
2586 :
2587 0 : XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2588 :
2589 0 : sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2590 0 : NameStr(MyReplicationSlot->data.name), txn->xid,
2591 0 : (uint32) (recptr >> 32), (uint32) recptr);
2592 0 : if (unlink(path) != 0 && errno != ENOENT)
2593 0 : ereport(ERROR,
2594 : (errcode_for_file_access(),
2595 : errmsg("could not remove file \"%s\": %m", path)));
2596 : }
2597 0 : }
2598 :
2599 : /*
2600 : * Delete all data spilled to disk after we've restarted/crashed. It will be
2601 : * recreated when the respective slots are reused.
2602 : */
2603 : void
2604 6 : StartupReorderBuffer(void)
2605 : {
2606 : DIR *logical_dir;
2607 : struct dirent *logical_de;
2608 :
2609 : DIR *spill_dir;
2610 : struct dirent *spill_de;
2611 :
2612 6 : logical_dir = AllocateDir("pg_replslot");
2613 24 : while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2614 : {
2615 : struct stat statbuf;
2616 : char path[MAXPGPATH * 2 + 12];
2617 :
2618 18 : if (strcmp(logical_de->d_name, ".") == 0 ||
2619 6 : strcmp(logical_de->d_name, "..") == 0)
2620 24 : continue;
2621 :
2622 : /* if it cannot be a slot, skip the directory */
2623 0 : if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2624 0 : continue;
2625 :
2626 : /*
2627 : * ok, has to be a surviving logical slot, iterate and delete
2628 : * everything starting with xid-*
2629 : */
2630 0 : sprintf(path, "pg_replslot/%s", logical_de->d_name);
2631 :
2632 : /* we're only creating directories here, skip if it's not our's */
2633 0 : if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2634 0 : continue;
2635 :
2636 0 : spill_dir = AllocateDir(path);
2637 0 : while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2638 : {
2639 0 : if (strcmp(spill_de->d_name, ".") == 0 ||
2640 0 : strcmp(spill_de->d_name, "..") == 0)
2641 0 : continue;
2642 :
2643 : /* only look at names that can be ours */
2644 0 : if (strncmp(spill_de->d_name, "xid", 3) == 0)
2645 : {
2646 0 : sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2647 0 : spill_de->d_name);
2648 :
2649 0 : if (unlink(path) != 0)
2650 0 : ereport(PANIC,
2651 : (errcode_for_file_access(),
2652 : errmsg("could not remove file \"%s\": %m",
2653 : path)));
2654 : }
2655 : }
2656 0 : FreeDir(spill_dir);
2657 : }
2658 6 : FreeDir(logical_dir);
2659 6 : }
2660 :
2661 : /* ---------------------------------------
2662 : * toast reassembly support
2663 : * ---------------------------------------
2664 : */
2665 :
2666 : /*
2667 : * Initialize per tuple toast reconstruction support.
2668 : */
2669 : static void
2670 0 : ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2671 : {
2672 : HASHCTL hash_ctl;
2673 :
2674 0 : Assert(txn->toast_hash == NULL);
2675 :
2676 0 : memset(&hash_ctl, 0, sizeof(hash_ctl));
2677 0 : hash_ctl.keysize = sizeof(Oid);
2678 0 : hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2679 0 : hash_ctl.hcxt = rb->context;
2680 0 : txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2681 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2682 0 : }
2683 :
2684 : /*
2685 : * Per toast-chunk handling for toast reconstruction
2686 : *
2687 : * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2688 : * toasted Datum comes along.
2689 : */
2690 : static void
2691 0 : ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2692 : Relation relation, ReorderBufferChange *change)
2693 : {
2694 : ReorderBufferToastEnt *ent;
2695 : ReorderBufferTupleBuf *newtup;
2696 : bool found;
2697 : int32 chunksize;
2698 : bool isnull;
2699 : Pointer chunk;
2700 0 : TupleDesc desc = RelationGetDescr(relation);
2701 : Oid chunk_id;
2702 : int32 chunk_seq;
2703 :
2704 0 : if (txn->toast_hash == NULL)
2705 0 : ReorderBufferToastInitHash(rb, txn);
2706 :
2707 0 : Assert(IsToastRelation(relation));
2708 :
2709 0 : newtup = change->data.tp.newtuple;
2710 0 : chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2711 0 : Assert(!isnull);
2712 0 : chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2713 0 : Assert(!isnull);
2714 :
2715 0 : ent = (ReorderBufferToastEnt *)
2716 0 : hash_search(txn->toast_hash,
2717 : (void *) &chunk_id,
2718 : HASH_ENTER,
2719 : &found);
2720 :
2721 0 : if (!found)
2722 : {
2723 0 : Assert(ent->chunk_id == chunk_id);
2724 0 : ent->num_chunks = 0;
2725 0 : ent->last_chunk_seq = 0;
2726 0 : ent->size = 0;
2727 0 : ent->reconstructed = NULL;
2728 0 : dlist_init(&ent->chunks);
2729 :
2730 0 : if (chunk_seq != 0)
2731 0 : elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2732 : chunk_seq, chunk_id);
2733 : }
2734 0 : else if (found && chunk_seq != ent->last_chunk_seq + 1)
2735 0 : elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2736 : chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2737 :
2738 0 : chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2739 0 : Assert(!isnull);
2740 :
2741 : /* calculate size so we can allocate the right size at once later */
2742 0 : if (!VARATT_IS_EXTENDED(chunk))
2743 0 : chunksize = VARSIZE(chunk) - VARHDRSZ;
2744 0 : else if (VARATT_IS_SHORT(chunk))
2745 : /* could happen due to heap_form_tuple doing its thing */
2746 0 : chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2747 : else
2748 0 : elog(ERROR, "unexpected type of toast chunk");
2749 :
2750 0 : ent->size += chunksize;
2751 0 : ent->last_chunk_seq = chunk_seq;
2752 0 : ent->num_chunks++;
2753 0 : dlist_push_tail(&ent->chunks, &change->node);
2754 0 : }
2755 :
2756 : /*
2757 : * Rejigger change->newtuple to point to in-memory toast tuples instead to
2758 : * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
2759 : *
2760 : * We cannot replace unchanged toast tuples though, so those will still point
2761 : * to on-disk toast data.
2762 : */
2763 : static void
2764 0 : ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
2765 : Relation relation, ReorderBufferChange *change)
2766 : {
2767 : TupleDesc desc;
2768 : int natt;
2769 : Datum *attrs;
2770 : bool *isnull;
2771 : bool *free;
2772 : HeapTuple tmphtup;
2773 : Relation toast_rel;
2774 : TupleDesc toast_desc;
2775 : MemoryContext oldcontext;
2776 : ReorderBufferTupleBuf *newtup;
2777 :
2778 : /* no toast tuples changed */
2779 0 : if (txn->toast_hash == NULL)
2780 0 : return;
2781 :
2782 0 : oldcontext = MemoryContextSwitchTo(rb->context);
2783 :
2784 : /* we should only have toast tuples in an INSERT or UPDATE */
2785 0 : Assert(change->data.tp.newtuple);
2786 :
2787 0 : desc = RelationGetDescr(relation);
2788 :
2789 0 : toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2790 0 : toast_desc = RelationGetDescr(toast_rel);
2791 :
2792 : /* should we allocate from stack instead? */
2793 0 : attrs = palloc0(sizeof(Datum) * desc->natts);
2794 0 : isnull = palloc0(sizeof(bool) * desc->natts);
2795 0 : free = palloc0(sizeof(bool) * desc->natts);
2796 :
2797 0 : newtup = change->data.tp.newtuple;
2798 :
2799 0 : heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2800 :
2801 0 : for (natt = 0; natt < desc->natts; natt++)
2802 : {
2803 0 : Form_pg_attribute attr = TupleDescAttr(desc, natt);
2804 : ReorderBufferToastEnt *ent;
2805 : struct varlena *varlena;
2806 :
2807 : /* va_rawsize is the size of the original datum -- including header */
2808 : struct varatt_external toast_pointer;
2809 : struct varatt_indirect redirect_pointer;
2810 0 : struct varlena *new_datum = NULL;
2811 : struct varlena *reconstructed;
2812 : dlist_iter it;
2813 0 : Size data_done = 0;
2814 :
2815 : /* system columns aren't toasted */
2816 0 : if (attr->attnum < 0)
2817 0 : continue;
2818 :
2819 0 : if (attr->attisdropped)
2820 0 : continue;
2821 :
2822 : /* not a varlena datatype */
2823 0 : if (attr->attlen != -1)
2824 0 : continue;
2825 :
2826 : /* no data */
2827 0 : if (isnull[natt])
2828 0 : continue;
2829 :
2830 : /* ok, we know we have a toast datum */
2831 0 : varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2832 :
2833 : /* no need to do anything if the tuple isn't external */
2834 0 : if (!VARATT_IS_EXTERNAL(varlena))
2835 0 : continue;
2836 :
2837 0 : VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2838 :
2839 : /*
2840 : * Check whether the toast tuple changed, replace if so.
2841 : */
2842 0 : ent = (ReorderBufferToastEnt *)
2843 0 : hash_search(txn->toast_hash,
2844 : (void *) &toast_pointer.va_valueid,
2845 : HASH_FIND,
2846 : NULL);
2847 0 : if (ent == NULL)
2848 0 : continue;
2849 :
2850 0 : new_datum =
2851 : (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2852 :
2853 0 : free[natt] = true;
2854 :
2855 0 : reconstructed = palloc0(toast_pointer.va_rawsize);
2856 :
2857 0 : ent->reconstructed = reconstructed;
2858 :
2859 : /* stitch toast tuple back together from its parts */
2860 0 : dlist_foreach(it, &ent->chunks)
2861 : {
2862 : bool isnull;
2863 : ReorderBufferChange *cchange;
2864 : ReorderBufferTupleBuf *ctup;
2865 : Pointer chunk;
2866 :
2867 0 : cchange = dlist_container(ReorderBufferChange, node, it.cur);
2868 0 : ctup = cchange->data.tp.newtuple;
2869 0 : chunk = DatumGetPointer(
2870 : fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2871 :
2872 0 : Assert(!isnull);
2873 0 : Assert(!VARATT_IS_EXTERNAL(chunk));
2874 0 : Assert(!VARATT_IS_SHORT(chunk));
2875 :
2876 0 : memcpy(VARDATA(reconstructed) + data_done,
2877 0 : VARDATA(chunk),
2878 0 : VARSIZE(chunk) - VARHDRSZ);
2879 0 : data_done += VARSIZE(chunk) - VARHDRSZ;
2880 : }
2881 0 : Assert(data_done == toast_pointer.va_extsize);
2882 :
2883 : /* make sure its marked as compressed or not */
2884 0 : if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2885 0 : SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2886 : else
2887 0 : SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2888 :
2889 0 : memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2890 0 : redirect_pointer.pointer = reconstructed;
2891 :
2892 0 : SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
2893 0 : memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2894 : sizeof(redirect_pointer));
2895 :
2896 0 : attrs[natt] = PointerGetDatum(new_datum);
2897 : }
2898 :
2899 : /*
2900 : * Build tuple in separate memory & copy tuple back into the tuplebuf
2901 : * passed to the output plugin. We can't directly heap_fill_tuple() into
2902 : * the tuplebuf because attrs[] will point back into the current content.
2903 : */
2904 0 : tmphtup = heap_form_tuple(desc, attrs, isnull);
2905 0 : Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2906 0 : Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2907 :
2908 0 : memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2909 0 : newtup->tuple.t_len = tmphtup->t_len;
2910 :
2911 : /*
2912 : * free resources we won't further need, more persistent stuff will be
2913 : * free'd in ReorderBufferToastReset().
2914 : */
2915 0 : RelationClose(toast_rel);
2916 0 : pfree(tmphtup);
2917 0 : for (natt = 0; natt < desc->natts; natt++)
2918 : {
2919 0 : if (free[natt])
2920 0 : pfree(DatumGetPointer(attrs[natt]));
2921 : }
2922 0 : pfree(attrs);
2923 0 : pfree(free);
2924 0 : pfree(isnull);
2925 :
2926 0 : MemoryContextSwitchTo(oldcontext);
2927 : }
2928 :
2929 : /*
2930 : * Free all resources allocated for toast reconstruction.
2931 : */
2932 : static void
2933 0 : ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
2934 : {
2935 : HASH_SEQ_STATUS hstat;
2936 : ReorderBufferToastEnt *ent;
2937 :
2938 0 : if (txn->toast_hash == NULL)
2939 0 : return;
2940 :
2941 : /* sequentially walk over the hash and free everything */
2942 0 : hash_seq_init(&hstat, txn->toast_hash);
2943 0 : while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2944 : {
2945 : dlist_mutable_iter it;
2946 :
2947 0 : if (ent->reconstructed != NULL)
2948 0 : pfree(ent->reconstructed);
2949 :
2950 0 : dlist_foreach_modify(it, &ent->chunks)
2951 : {
2952 0 : ReorderBufferChange *change =
2953 0 : dlist_container(ReorderBufferChange, node, it.cur);
2954 :
2955 0 : dlist_delete(&change->node);
2956 0 : ReorderBufferReturnChange(rb, change);
2957 : }
2958 : }
2959 :
2960 0 : hash_destroy(txn->toast_hash);
2961 0 : txn->toast_hash = NULL;
2962 : }
2963 :
2964 :
2965 : /* ---------------------------------------
2966 : * Visibility support for logical decoding
2967 : *
2968 : *
2969 : * Lookup actual cmin/cmax values when using decoding snapshot. We can't
2970 : * always rely on stored cmin/cmax values because of two scenarios:
2971 : *
2972 : * * A tuple got changed multiple times during a single transaction and thus
2973 : * has got a combocid. Combocid's are only valid for the duration of a
2974 : * single transaction.
2975 : * * A tuple with a cmin but no cmax (and thus no combocid) got
2976 : * deleted/updated in another transaction than the one which created it
2977 : * which we are looking at right now. As only one of cmin, cmax or combocid
2978 : * is actually stored in the heap we don't have access to the value we
2979 : * need anymore.
2980 : *
2981 : * To resolve those problems we have a per-transaction hash of (cmin,
2982 : * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
2983 : * (cmin, cmax) values. That also takes care of combocids by simply
2984 : * not caring about them at all. As we have the real cmin/cmax values
2985 : * combocids aren't interesting.
2986 : *
2987 : * As we only care about catalog tuples here the overhead of this
2988 : * hashtable should be acceptable.
2989 : *
2990 : * Heap rewrites complicate this a bit, check rewriteheap.c for
2991 : * details.
2992 : * -------------------------------------------------------------------------
2993 : */
2994 :
/*
 * struct for qsort()ing mapping files by lsn somewhat efficiently
 *
 * One instance describes a single rewrite mapping file that has been queued
 * for application to the tuplecid hash.
 */
typedef struct RewriteMappingFile
{
	XLogRecPtr	lsn;			/* LSN taken from the file name; the sort key */
	char		fname[MAXPGPATH];	/* file name, without directory part */
} RewriteMappingFile;
3001 :
#if NOT_USED
/*
 * Debugging aid: emit one DEBUG3 log line per entry of 'tuplecid_data',
 * showing the entry's key (relfilenode and tid) and its cmin/cmax values.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS scan;
	ReorderBufferTupleCidEnt *entry;

	hash_seq_init(&scan, tuplecid_data);
	for (;;)
	{
		entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan);
		if (entry == NULL)
			break;

		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 entry->key.relnode.dbNode,
			 entry->key.relnode.spcNode,
			 entry->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&entry->key.tid),
			 ItemPointerGetOffsetNumber(&entry->key.tid),
			 entry->cmin,
			 entry->cmax);
	}
}
#endif
3024 :
3025 : /*
3026 : * Apply a single mapping file to tuplecid_data.
3027 : *
3028 : * The mapping file has to have been verified to be a) committed b) for our
3029 : * transaction c) applied in LSN order.
3030 : */
3031 : static void
3032 0 : ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3033 : {
3034 : char path[MAXPGPATH];
3035 : int fd;
3036 : int readBytes;
3037 : LogicalRewriteMappingData map;
3038 :
3039 0 : sprintf(path, "pg_logical/mappings/%s", fname);
3040 0 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3041 0 : if (fd < 0)
3042 0 : ereport(ERROR,
3043 : (errcode_for_file_access(),
3044 : errmsg("could not open file \"%s\": %m", path)));
3045 :
3046 : while (true)
3047 : {
3048 : ReorderBufferTupleCidKey key;
3049 : ReorderBufferTupleCidEnt *ent;
3050 : ReorderBufferTupleCidEnt *new_ent;
3051 : bool found;
3052 :
3053 : /* be careful about padding */
3054 0 : memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3055 :
3056 : /* read all mappings till the end of the file */
3057 0 : pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3058 0 : readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3059 0 : pgstat_report_wait_end();
3060 :
3061 0 : if (readBytes < 0)
3062 0 : ereport(ERROR,
3063 : (errcode_for_file_access(),
3064 : errmsg("could not read file \"%s\": %m",
3065 : path)));
3066 0 : else if (readBytes == 0) /* EOF */
3067 0 : break;
3068 0 : else if (readBytes != sizeof(LogicalRewriteMappingData))
3069 0 : ereport(ERROR,
3070 : (errcode_for_file_access(),
3071 : errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3072 : path, readBytes,
3073 : (int32) sizeof(LogicalRewriteMappingData))));
3074 :
3075 0 : key.relnode = map.old_node;
3076 0 : ItemPointerCopy(&map.old_tid,
3077 : &key.tid);
3078 :
3079 :
3080 0 : ent = (ReorderBufferTupleCidEnt *)
3081 : hash_search(tuplecid_data,
3082 : (void *) &key,
3083 : HASH_FIND,
3084 : NULL);
3085 :
3086 : /* no existing mapping, no need to update */
3087 0 : if (!ent)
3088 0 : continue;
3089 :
3090 0 : key.relnode = map.new_node;
3091 0 : ItemPointerCopy(&map.new_tid,
3092 : &key.tid);
3093 :
3094 0 : new_ent = (ReorderBufferTupleCidEnt *)
3095 : hash_search(tuplecid_data,
3096 : (void *) &key,
3097 : HASH_ENTER,
3098 : &found);
3099 :
3100 0 : if (found)
3101 : {
3102 : /*
3103 : * Make sure the existing mapping makes sense. We sometime update
3104 : * old records that did not yet have a cmax (e.g. pg_class' own
3105 : * entry while rewriting it) during rewrites, so allow that.
3106 : */
3107 0 : Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3108 0 : Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3109 : }
3110 : else
3111 : {
3112 : /* update mapping */
3113 0 : new_ent->cmin = ent->cmin;
3114 0 : new_ent->cmax = ent->cmax;
3115 0 : new_ent->combocid = ent->combocid;
3116 : }
3117 0 : }
3118 0 : }
3119 :
3120 :
3121 : /*
3122 : * Check whether the TransactionOId 'xid' is in the pre-sorted array 'xip'.
3123 : */
3124 : static bool
3125 0 : TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
3126 : {
3127 0 : return bsearch(&xid, xip, num,
3128 0 : sizeof(TransactionId), xidComparator) != NULL;
3129 : }
3130 :
3131 : /*
3132 : * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3133 : */
3134 : static int
3135 0 : file_sort_by_lsn(const void *a_p, const void *b_p)
3136 : {
3137 0 : RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3138 0 : RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3139 :
3140 0 : if (a->lsn < b->lsn)
3141 0 : return -1;
3142 0 : else if (a->lsn > b->lsn)
3143 0 : return 1;
3144 0 : return 0;
3145 : }
3146 :
3147 : /*
3148 : * Apply any existing logical remapping files if there are any targeted at our
3149 : * transaction for relid.
3150 : */
3151 : static void
3152 0 : UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
3153 : {
3154 : DIR *mapping_dir;
3155 : struct dirent *mapping_de;
3156 0 : List *files = NIL;
3157 : ListCell *file;
3158 : RewriteMappingFile **files_a;
3159 : size_t off;
3160 0 : Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3161 :
3162 0 : mapping_dir = AllocateDir("pg_logical/mappings");
3163 0 : while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3164 : {
3165 : Oid f_dboid;
3166 : Oid f_relid;
3167 : TransactionId f_mapped_xid;
3168 : TransactionId f_create_xid;
3169 : XLogRecPtr f_lsn;
3170 : uint32 f_hi,
3171 : f_lo;
3172 : RewriteMappingFile *f;
3173 :
3174 0 : if (strcmp(mapping_de->d_name, ".") == 0 ||
3175 0 : strcmp(mapping_de->d_name, "..") == 0)
3176 0 : continue;
3177 :
3178 : /* Ignore files that aren't ours */
3179 0 : if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3180 0 : continue;
3181 :
3182 0 : if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3183 : &f_dboid, &f_relid, &f_hi, &f_lo,
3184 : &f_mapped_xid, &f_create_xid) != 6)
3185 0 : elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3186 :
3187 0 : f_lsn = ((uint64) f_hi) << 32 | f_lo;
3188 :
3189 : /* mapping for another database */
3190 0 : if (f_dboid != dboid)
3191 0 : continue;
3192 :
3193 : /* mapping for another relation */
3194 0 : if (f_relid != relid)
3195 0 : continue;
3196 :
3197 : /* did the creating transaction abort? */
3198 0 : if (!TransactionIdDidCommit(f_create_xid))
3199 0 : continue;
3200 :
3201 : /* not for our transaction */
3202 0 : if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3203 0 : continue;
3204 :
3205 : /* ok, relevant, queue for apply */
3206 0 : f = palloc(sizeof(RewriteMappingFile));
3207 0 : f->lsn = f_lsn;
3208 0 : strcpy(f->fname, mapping_de->d_name);
3209 0 : files = lappend(files, f);
3210 : }
3211 0 : FreeDir(mapping_dir);
3212 :
3213 : /* build array we can easily sort */
3214 0 : files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3215 0 : off = 0;
3216 0 : foreach(file, files)
3217 : {
3218 0 : files_a[off++] = lfirst(file);
3219 : }
3220 :
3221 : /* sort files so we apply them in LSN order */
3222 0 : qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3223 : file_sort_by_lsn);
3224 :
3225 0 : for (off = 0; off < list_length(files); off++)
3226 : {
3227 0 : RewriteMappingFile *f = files_a[off];
3228 :
3229 0 : elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3230 : snapshot->subxip[0]);
3231 0 : ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3232 0 : pfree(f);
3233 : }
3234 0 : }
3235 :
3236 : /*
3237 : * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3238 : * combocids.
3239 : */
3240 : bool
3241 0 : ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
3242 : Snapshot snapshot,
3243 : HeapTuple htup, Buffer buffer,
3244 : CommandId *cmin, CommandId *cmax)
3245 : {
3246 : ReorderBufferTupleCidKey key;
3247 : ReorderBufferTupleCidEnt *ent;
3248 : ForkNumber forkno;
3249 : BlockNumber blockno;
3250 0 : bool updated_mapping = false;
3251 :
3252 : /* be careful about padding */
3253 0 : memset(&key, 0, sizeof(key));
3254 :
3255 0 : Assert(!BufferIsLocal(buffer));
3256 :
3257 : /*
3258 : * get relfilenode from the buffer, no convenient way to access it other
3259 : * than that.
3260 : */
3261 0 : BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3262 :
3263 : /* tuples can only be in the main fork */
3264 0 : Assert(forkno == MAIN_FORKNUM);
3265 0 : Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3266 :
3267 0 : ItemPointerCopy(&htup->t_self,
3268 : &key.tid);
3269 :
3270 : restart:
3271 0 : ent = (ReorderBufferTupleCidEnt *)
3272 : hash_search(tuplecid_data,
3273 : (void *) &key,
3274 : HASH_FIND,
3275 : NULL);
3276 :
3277 : /*
3278 : * failed to find a mapping, check whether the table was rewritten and
3279 : * apply mapping if so, but only do that once - there can be no new
3280 : * mappings while we are in here since we have to hold a lock on the
3281 : * relation.
3282 : */
3283 0 : if (ent == NULL && !updated_mapping)
3284 : {
3285 0 : UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3286 : /* now check but don't update for a mapping again */
3287 0 : updated_mapping = true;
3288 0 : goto restart;
3289 : }
3290 0 : else if (ent == NULL)
3291 0 : return false;
3292 :
3293 0 : if (cmin)
3294 0 : *cmin = ent->cmin;
3295 0 : if (cmax)
3296 0 : *cmax = ent->cmax;
3297 0 : return true;
3298 : }
|