Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * rewriteheap.c
4 : * Support functions to rewrite tables.
5 : *
6 : * These functions provide a facility to completely rewrite a heap, while
7 : * preserving visibility information and update chains.
8 : *
9 : * INTERFACE
10 : *
11 : * The caller is responsible for creating the new heap, all catalog
12 : * changes, supplying the tuples to be written to the new heap, and
13 : * rebuilding indexes. The caller must hold AccessExclusiveLock on the
14 : * target table, because we assume no one else is writing into it.
15 : *
16 : * To use the facility:
17 : *
18 : * begin_heap_rewrite
19 : * while (fetch next tuple)
20 : * {
21 : * if (tuple is dead)
22 : * rewrite_heap_dead_tuple
23 : * else
24 : * {
25 : * // do any transformations here if required
26 : * rewrite_heap_tuple
27 : * }
28 : * }
29 : * end_heap_rewrite
30 : *
31 : * The contents of the new relation shouldn't be relied on until after
32 : * end_heap_rewrite is called.
33 : *
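    : * As a concrete illustration, here is a minimal sketch, loosely modeled on
    : * what CLUSTER's copy_heap_data() does; error handling, buffer locking, and
    : * TOAST details are omitted, and the variable names are only illustrative:
    : *
    : *     rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin,
    : *                                  FreezeXid, MultiXactCutoff, use_wal);
    : *     scan = heap_beginscan(OldHeap, SnapshotAny, 0, NULL);
    : *     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    : *     {
    : *         if (tuple is dead according to HeapTupleSatisfiesVacuum)
    : *             rewrite_heap_dead_tuple(rwstate, tuple);
    : *         else
    : *         {
    : *             copied = heap_copytuple(tuple);   /* transform here if needed */
    : *             rewrite_heap_tuple(rwstate, tuple, copied);
    : *             heap_freetuple(copied);
    : *         }
    : *     }
    : *     heap_endscan(scan);
    : *     end_heap_rewrite(rwstate);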
34 : *
35 : * IMPLEMENTATION
36 : *
37 : * This would be a fairly trivial affair, except that we need to maintain
38 : * the ctid chains that link versions of an updated tuple together.
39 : * Since the newly stored tuples will have tids different from the original
40 : * ones, if we just copied t_ctid fields to the new table the links would
41 : * be wrong. When we are required to copy a (presumably recently-dead or
42 : * delete-in-progress) tuple whose ctid doesn't point to itself, we have
43 : * to substitute the correct ctid instead.
44 : *
45 : * For each ctid reference from A -> B, we might encounter either A first
46 : * or B first. (Note that a tuple in the middle of a chain is both A and B
47 : * of different pairs.)
48 : *
49 : * If we encounter A first, we'll store the tuple in the unresolved_tups
50 : * hash table. When we later encounter B, we remove A from the hash table,
51 : * fix the ctid to point to the new location of B, and insert both A and B
52 : * to the new heap.
53 : *
54 : * If we encounter B first, we can insert B to the new heap right away.
55 : * We then add an entry to the old_new_tid_map hash table showing B's
56 : * original tid (in the old heap) and new tid (in the new heap).
57 : * When we later encounter A, we get the new location of B from the table,
58 : * and can write A immediately with the correct ctid.
59 : *
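    : * For example, suppose an updated row has versions at old TIDs (0,3) -> (0,7),
    : * so (0,3) is A and (0,7) is B; the hash key for the pair is B's xmin (which
    : * equals A's xmax) plus B's old TID (0,7). If the scan reaches A first, its
    : * copy waits in unresolved_tups; when B is later written to, say, (5,2) in the
    : * new heap, we pull A back out, set its ctid to (5,2), and insert it. If B is
    : * reached first and written to (5,2), we record (0,7) -> (5,2) in
    : * old_new_tid_map, and when A shows up we write it immediately with ctid (5,2).
    : *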
60 : * Entries in the hash tables can be removed as soon as the later tuple
61 : * is encountered. That helps to keep the memory usage down. At the end,
62 : * both tables are usually empty; we should have encountered both A and B
63 : * of each pair. However, it's possible for A to be RECENTLY_DEAD and B
64 : * entirely DEAD according to HeapTupleSatisfiesVacuum, because the test
65 : * for deadness using OldestXmin is not exact. In such a case we might
66 : * encounter B first, and skip it, and find A later. Then A would be added
67 : * to unresolved_tups, and stay there until end of the rewrite. Since
68 : * this case is very unusual, we don't worry about the memory usage.
69 : *
70 : * Using in-memory hash tables means that we use some memory for each live
71 : * update chain in the table, from the time we find one end of the
72 : * reference until we find the other end. That shouldn't be a problem in
73 : * practice, but if you do something like an UPDATE without a where-clause
74 : * on a large table, and then run CLUSTER in the same transaction, you
75 : * could run out of memory. It doesn't seem worthwhile to add support for
76 : * spill-to-disk, as there shouldn't be that many RECENTLY_DEAD tuples in a
77 : * table under normal circumstances. Furthermore, in the typical scenario
78 : * of CLUSTERing on an unchanging key column, we'll see all the versions
79 : * of a given tuple together anyway, and so the peak memory usage is only
80 : * proportional to the number of RECENTLY_DEAD versions of a single row, not
81 : * to the number in the whole table. Note that if we do fail halfway through a CLUSTER,
82 : * the old table is still valid, so failure is not catastrophic.
83 : *
84 : * We can't use the normal heap_insert function to insert into the new
85 : * heap, because heap_insert overwrites the visibility information.
86 : * We use a special-purpose raw_heap_insert function instead, which
87 : * is optimized for bulk inserting a lot of tuples, knowing that we have
88 : * exclusive access to the heap. raw_heap_insert builds new pages in
89 : * local storage. When a page is full, or at the end of the process,
90 : * we insert it to WAL as a single record and then write it to disk
91 : * directly through smgr. Note, however, that any data sent to the new
92 : * heap's TOAST table will go through the normal bufmgr.
93 : *
94 : *
95 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
96 : * Portions Copyright (c) 1994-5, Regents of the University of California
97 : *
98 : * IDENTIFICATION
99 : * src/backend/access/heap/rewriteheap.c
100 : *
101 : *-------------------------------------------------------------------------
102 : */
103 : #include "postgres.h"
104 :
105 : #include <sys/stat.h>
106 : #include <unistd.h>
107 :
108 : #include "miscadmin.h"
109 :
110 : #include "access/heapam.h"
111 : #include "access/heapam_xlog.h"
112 : #include "access/rewriteheap.h"
113 : #include "access/transam.h"
114 : #include "access/tuptoaster.h"
115 : #include "access/xact.h"
116 : #include "access/xloginsert.h"
117 :
118 : #include "catalog/catalog.h"
119 :
120 : #include "lib/ilist.h"
121 :
122 : #include "pgstat.h"
123 :
124 : #include "replication/logical.h"
125 : #include "replication/slot.h"
126 :
127 : #include "storage/bufmgr.h"
128 : #include "storage/fd.h"
129 : #include "storage/smgr.h"
130 :
131 : #include "utils/memutils.h"
132 : #include "utils/rel.h"
133 : #include "utils/tqual.h"
134 :
135 : #include "storage/procarray.h"
136 :
137 : /*
138 : * State associated with a rewrite operation. This is opaque to the user
139 : * of the rewrite facility.
140 : */
141 : typedef struct RewriteStateData
142 : {
143 : Relation rs_old_rel; /* source heap */
144 : Relation rs_new_rel; /* destination heap */
145 : Page rs_buffer; /* page currently being built */
146 : BlockNumber rs_blockno; /* block where page will go */
147 : bool rs_buffer_valid; /* T if any tuples in buffer */
148 : bool rs_use_wal; /* must we WAL-log inserts? */
149 : bool rs_logical_rewrite; /* do we need to do logical rewriting */
150 : TransactionId rs_oldest_xmin; /* oldest xmin used by caller to determine
151 : * tuple visibility */
152 : TransactionId rs_freeze_xid; /* Xid that will be used as freeze cutoff
153 : * point */
154 : TransactionId rs_logical_xmin; /* Xid that will be used as cutoff point
155 : * for logical rewrites */
156 : MultiXactId rs_cutoff_multi; /* MultiXactId that will be used as cutoff
157 : * point for multixacts */
158 : MemoryContext rs_cxt; /* for hash tables and entries and tuples in
159 : * them */
160 : XLogRecPtr rs_begin_lsn; /* XLogInsertLsn when starting the rewrite */
161 : HTAB *rs_unresolved_tups; /* unmatched A tuples */
162 : HTAB *rs_old_new_tid_map; /* unmatched B tuples */
163 : HTAB *rs_logical_mappings; /* logical remapping files */
164 : uint32 rs_num_rewrite_mappings; /* # in memory mappings */
165 : } RewriteStateData;
166 :
167 : /*
168 : * The lookup keys for the hash tables are tuple TID and xmin (we must check
169 : * both to avoid false matches from dead tuples). Beware that there is
170 : * probably some padding space in this struct; it must be zeroed out for
171 : * correct hashtable operation.
172 : */
173 : typedef struct
174 : {
175 : TransactionId xmin; /* tuple xmin */
176 : ItemPointerData tid; /* tuple location in old heap */
177 : } TidHashKey;
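    :
    : /*
    : * For illustration, a minimal sketch of the pattern the functions below
    : * follow when building a lookup key (the memset is what zeroes the padding;
    : * hashtab and entry are placeholders):
    : *
    : *     TidHashKey hashkey;
    : *
    : *     memset(&hashkey, 0, sizeof(hashkey));
    : *     hashkey.xmin = HeapTupleHeaderGetXmin(tuple->t_data);
    : *     hashkey.tid = tuple->t_self;
    : *     entry = hash_search(hashtab, &hashkey, HASH_FIND, NULL);
    : */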
178 :
179 : /*
180 : * Entry structures for the hash tables
181 : */
182 : typedef struct
183 : {
184 : TidHashKey key; /* expected xmin/old location of B tuple */
185 : ItemPointerData old_tid; /* A's location in the old heap */
186 : HeapTuple tuple; /* A's tuple contents */
187 : } UnresolvedTupData;
188 :
189 : typedef UnresolvedTupData *UnresolvedTup;
190 :
191 : typedef struct
192 : {
193 : TidHashKey key; /* actual xmin/old location of B tuple */
194 : ItemPointerData new_tid; /* where we put it in the new heap */
195 : } OldToNewMappingData;
196 :
197 : typedef OldToNewMappingData *OldToNewMapping;
198 :
199 : /*
200 : * In-Memory data for an xid that might need logical remapping entries
201 : * to be logged.
202 : */
203 : typedef struct RewriteMappingFile
204 : {
205 : TransactionId xid; /* xid that might need to see the row */
206 : int vfd; /* fd of mappings file */
207 : off_t off; /* how far we have written so far */
208 : uint32 num_mappings; /* number of in-memory mappings */
209 : dlist_head mappings; /* list of in-memory mappings */
210 : char path[MAXPGPATH]; /* path, for error messages */
211 : } RewriteMappingFile;
212 :
213 : /*
214 : * A single In-Memory logical rewrite mapping, hanging off
215 : * RewriteMappingFile->mappings.
216 : */
217 : typedef struct RewriteMappingDataEntry
218 : {
219 : LogicalRewriteMappingData map; /* map between old and new location of the
220 : * tuple */
221 : dlist_node node;
222 : } RewriteMappingDataEntry;
223 :
224 :
225 : /* prototypes for internal functions */
226 : static void raw_heap_insert(RewriteState state, HeapTuple tup);
227 :
228 : /* internal logical remapping prototypes */
229 : static void logical_begin_heap_rewrite(RewriteState state);
230 : static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
231 : static void logical_end_heap_rewrite(RewriteState state);
232 :
233 :
234 : /*
235 : * Begin a rewrite of a table
236 : *
237 : * old_heap old, locked heap relation tuples will be read from
238 : * new_heap new, locked heap relation to insert tuples to
239 : * oldest_xmin xid used by the caller to determine which tuples are dead
240 : * freeze_xid xid before which tuples will be frozen
241 : * min_multi multixact before which multis will be removed
242 : * use_wal should the inserts to the new heap be WAL-logged?
243 : *
244 : * Returns an opaque RewriteState, allocated in current memory context,
245 : * to be used in subsequent calls to the other functions.
246 : */
247 : RewriteState
248 22 : begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
249 : TransactionId freeze_xid, MultiXactId cutoff_multi,
250 : bool use_wal)
251 : {
252 : RewriteState state;
253 : MemoryContext rw_cxt;
254 : MemoryContext old_cxt;
255 : HASHCTL hash_ctl;
256 :
257 : /*
258 : * To ease cleanup, make a separate context that will contain the
259 : * RewriteState struct itself plus all subsidiary data.
260 : */
261 22 : rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
262 : "Table rewrite",
263 : ALLOCSET_DEFAULT_SIZES);
264 22 : old_cxt = MemoryContextSwitchTo(rw_cxt);
265 :
266 : /* Create and fill in the state struct */
267 22 : state = palloc0(sizeof(RewriteStateData));
268 :
269 22 : state->rs_old_rel = old_heap;
270 22 : state->rs_new_rel = new_heap;
271 22 : state->rs_buffer = (Page) palloc(BLCKSZ);
272 : /* new_heap needn't be empty, just locked */
273 22 : state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
274 22 : state->rs_buffer_valid = false;
275 22 : state->rs_use_wal = use_wal;
276 22 : state->rs_oldest_xmin = oldest_xmin;
277 22 : state->rs_freeze_xid = freeze_xid;
278 22 : state->rs_cutoff_multi = cutoff_multi;
279 22 : state->rs_cxt = rw_cxt;
280 :
281 : /* Initialize hash tables used to track update chains */
282 22 : memset(&hash_ctl, 0, sizeof(hash_ctl));
283 22 : hash_ctl.keysize = sizeof(TidHashKey);
284 22 : hash_ctl.entrysize = sizeof(UnresolvedTupData);
285 22 : hash_ctl.hcxt = state->rs_cxt;
286 :
287 22 : state->rs_unresolved_tups =
288 22 : hash_create("Rewrite / Unresolved ctids",
289 : 128, /* arbitrary initial size */
290 : &hash_ctl,
291 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
292 :
293 22 : hash_ctl.entrysize = sizeof(OldToNewMappingData);
294 :
295 22 : state->rs_old_new_tid_map =
296 22 : hash_create("Rewrite / Old to new tid map",
297 : 128, /* arbitrary initial size */
298 : &hash_ctl,
299 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
300 :
301 22 : MemoryContextSwitchTo(old_cxt);
302 :
303 22 : logical_begin_heap_rewrite(state);
304 :
305 22 : return state;
306 : }
307 :
308 : /*
309 : * End a rewrite.
310 : *
311 : * state and any other resources are freed.
312 : */
313 : void
314 22 : end_heap_rewrite(RewriteState state)
315 : {
316 : HASH_SEQ_STATUS seq_status;
317 : UnresolvedTup unresolved;
318 :
319 : /*
320 : * Write any remaining tuples in the UnresolvedTups table. If we have any
321 : * left, they should in fact be dead, but let's err on the safe side.
322 : */
323 22 : hash_seq_init(&seq_status, state->rs_unresolved_tups);
324 :
325 44 : while ((unresolved = hash_seq_search(&seq_status)) != NULL)
326 : {
327 0 : ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
328 0 : raw_heap_insert(state, unresolved->tuple);
329 : }
330 :
331 : /* Write the last page, if any */
332 22 : if (state->rs_buffer_valid)
333 : {
334 20 : if (state->rs_use_wal)
335 19 : log_newpage(&state->rs_new_rel->rd_node,
336 : MAIN_FORKNUM,
337 : state->rs_blockno,
338 : state->rs_buffer,
339 : true);
340 20 : RelationOpenSmgr(state->rs_new_rel);
341 :
342 20 : PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
343 :
344 20 : smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
345 : (char *) state->rs_buffer, true);
346 : }
347 :
348 : /*
349 : * If the rel is WAL-logged, must fsync before commit. We use heap_sync
350 : * to ensure that the toast table gets fsync'd too.
351 : *
352 : * It's obvious that we must do this when not WAL-logging. It's less
353 : * obvious that we have to do it even if we did WAL-log the pages. The
354 : * reason is the same as in tablecmds.c's copy_relation_data(): we're
355 : * writing data that's not in shared buffers, and so a CHECKPOINT
356 : * occurring during the rewriteheap operation won't have fsync'd data we
357 : * wrote before the checkpoint.
358 : */
359 22 : if (RelationNeedsWAL(state->rs_new_rel))
360 21 : heap_sync(state->rs_new_rel);
361 :
362 22 : logical_end_heap_rewrite(state);
363 :
364 : /* Deleting the context frees everything */
365 22 : MemoryContextDelete(state->rs_cxt);
366 22 : }
367 :
368 : /*
369 : * Add a tuple to the new heap.
370 : *
371 : * Visibility information is copied from the original tuple, except that
372 : * we "freeze" very-old tuples. Note that since we scribble on new_tuple,
373 : * it had better be temp storage, not a pointer to the original tuple.
374 : *
375 : * state opaque state as returned by begin_heap_rewrite
376 : * old_tuple original tuple in the old heap
377 : * new_tuple new, rewritten tuple to be inserted to new heap
378 : */
379 : void
380 22761 : rewrite_heap_tuple(RewriteState state,
381 : HeapTuple old_tuple, HeapTuple new_tuple)
382 : {
383 : MemoryContext old_cxt;
384 : ItemPointerData old_tid;
385 : TidHashKey hashkey;
386 : bool found;
387 : bool free_new;
388 :
389 22761 : old_cxt = MemoryContextSwitchTo(state->rs_cxt);
390 :
391 : /*
392 : * Copy the original tuple's visibility information into new_tuple.
393 : *
394 : * XXX we might later need to copy some t_infomask2 bits, too? Right now,
395 : * we intentionally clear the HOT status bits.
396 : */
397 22761 : memcpy(&new_tuple->t_data->t_choice.t_heap,
398 22761 : &old_tuple->t_data->t_choice.t_heap,
399 : sizeof(HeapTupleFields));
400 :
401 22761 : new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
402 22761 : new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
403 45522 : new_tuple->t_data->t_infomask |=
404 22761 : old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
405 :
406 : /*
407 : * While we have our hands on the tuple, we may as well freeze any
408 : * eligible xmin or xmax, so that future VACUUM effort can be saved.
409 : */
410 22761 : heap_freeze_tuple(new_tuple->t_data, state->rs_freeze_xid,
411 : state->rs_cutoff_multi);
412 :
413 : /*
414 : * Invalid ctid means that ctid should point to the tuple itself. We'll
415 : * override it later if the tuple is part of an update chain.
416 : */
417 22761 : ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
418 :
419 : /*
420 : * If the tuple has been updated, check the old-to-new mapping hash table.
421 : */
422 24822 : if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
423 4122 : HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
424 2061 : !(ItemPointerEquals(&(old_tuple->t_self),
425 2061 : &(old_tuple->t_data->t_ctid))))
426 : {
427 : OldToNewMapping mapping;
428 :
429 5 : memset(&hashkey, 0, sizeof(hashkey));
430 5 : hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
431 5 : hashkey.tid = old_tuple->t_data->t_ctid;
432 :
433 5 : mapping = (OldToNewMapping)
434 5 : hash_search(state->rs_old_new_tid_map, &hashkey,
435 : HASH_FIND, NULL);
436 :
437 5 : if (mapping != NULL)
438 : {
439 : /*
440 : * We've already copied the tuple that t_ctid points to, so we can
441 : * set the ctid of this tuple to point to the new location, and
442 : * insert it right away.
443 : */
444 1 : new_tuple->t_data->t_ctid = mapping->new_tid;
445 :
446 : /* We don't need the mapping entry anymore */
447 1 : hash_search(state->rs_old_new_tid_map, &hashkey,
448 : HASH_REMOVE, &found);
449 1 : Assert(found);
450 : }
451 : else
452 : {
453 : /*
454 : * We haven't seen the tuple t_ctid points to yet. Stash this
455 : * tuple into unresolved_tups to be written later.
456 : */
457 : UnresolvedTup unresolved;
458 :
459 4 : unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
460 : HASH_ENTER, &found);
461 4 : Assert(!found);
462 :
463 4 : unresolved->old_tid = old_tuple->t_self;
464 4 : unresolved->tuple = heap_copytuple(new_tuple);
465 :
466 : /*
467 : * We can't do anything more now, since we don't know where the
468 : * tuple will be written.
469 : */
470 4 : MemoryContextSwitchTo(old_cxt);
471 4 : return;
472 : }
473 : }
474 :
475 : /*
476 : * Now we will write the tuple, and then check to see if it is the B tuple
477 : * in any new or known pair. When we resolve a known pair, we will be
478 : * able to write that pair's A tuple, and then we have to check if it
479 : * resolves some other pair. Hence, we need a loop here.
480 : */
481 22757 : old_tid = old_tuple->t_self;
482 22757 : free_new = false;
483 :
484 : for (;;)
485 : {
486 : ItemPointerData new_tid;
487 :
488 : /* Insert the tuple and find out where it's put in new_heap */
489 22761 : raw_heap_insert(state, new_tuple);
490 22761 : new_tid = new_tuple->t_self;
491 :
492 22761 : logical_rewrite_heap_tuple(state, old_tid, new_tuple);
493 :
494 : /*
495 : * If the tuple is the updated version of a row, and the prior version
496 : * wouldn't be DEAD yet, then we need to either resolve the prior
497 : * version (if it's waiting in rs_unresolved_tups), or make an entry
498 : * in rs_old_new_tid_map (so we can resolve it when we do see it). The
499 : * previous tuple's xmax would equal this one's xmin, so it's
500 : * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
501 : */
502 23027 : if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
503 266 : !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
504 : state->rs_oldest_xmin))
505 : {
506 : /*
507 : * Okay, this is B in an update pair. See if we've seen A.
508 : */
509 : UnresolvedTup unresolved;
510 :
511 5 : memset(&hashkey, 0, sizeof(hashkey));
512 5 : hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
513 5 : hashkey.tid = old_tid;
514 :
515 5 : unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
516 : HASH_FIND, NULL);
517 :
518 5 : if (unresolved != NULL)
519 : {
520 : /*
521 : * We have seen and memorized the previous tuple already. Now
522 : * that we know where we inserted the tuple its t_ctid points
523 : * to, fix its t_ctid and insert it to the new heap.
524 : */
525 4 : if (free_new)
526 2 : heap_freetuple(new_tuple);
527 4 : new_tuple = unresolved->tuple;
528 4 : free_new = true;
529 4 : old_tid = unresolved->old_tid;
530 4 : new_tuple->t_data->t_ctid = new_tid;
531 :
532 : /*
533 : * We don't need the hash entry anymore, but don't free its
534 : * tuple just yet.
535 : */
536 4 : hash_search(state->rs_unresolved_tups, &hashkey,
537 : HASH_REMOVE, &found);
538 4 : Assert(found);
539 :
540 : /* loop back to insert the previous tuple in the chain */
541 4 : continue;
542 : }
543 : else
544 : {
545 : /*
546 : * Remember the new tid of this tuple. We'll use it to set the
547 : * ctid when we find the previous tuple in the chain.
548 : */
549 : OldToNewMapping mapping;
550 :
551 1 : mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
552 : HASH_ENTER, &found);
553 1 : Assert(!found);
554 :
555 1 : mapping->new_tid = new_tid;
556 : }
557 : }
558 :
559 : /* Done with this (chain of) tuples, for now */
560 22757 : if (free_new)
561 2 : heap_freetuple(new_tuple);
562 22757 : break;
563 4 : }
564 :
565 22757 : MemoryContextSwitchTo(old_cxt);
566 : }
567 :
568 : /*
569 : * Register a dead tuple with an ongoing rewrite. Dead tuples are not
570 : * copied to the new table, but we still make note of them so that we
571 : * can release some resources earlier.
572 : *
573 : * Returns true if a tuple was removed from the unresolved_tups table.
574 : * This indicates that that tuple, previously thought to be "recently dead",
575 : * is now known really dead and won't be written to the output.
576 : */
577 : bool
578 4373 : rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
579 : {
580 : /*
581 : * If we have already seen an earlier tuple in the update chain that
582 : * points to this tuple, let's forget about that earlier tuple. It's in
583 : * fact dead as well; our simple xmax < OldestXmin test in
584 : * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
585 : * when xmin of a tuple is greater than xmax, which sounds
586 : * counter-intuitive but is perfectly valid.
587 : *
588 : * We don't bother to try to detect the situation the other way round,
589 : * when we encounter the dead tuple first and then the recently dead one
590 : * that points to it. If that happens, we'll have some unmatched entries
591 : * in the UnresolvedTups hash table at the end. That can happen anyway,
592 : * because a vacuum might have removed the dead tuple in the chain before
593 : * us.
594 : */
595 : UnresolvedTup unresolved;
596 : TidHashKey hashkey;
597 : bool found;
598 :
599 4373 : memset(&hashkey, 0, sizeof(hashkey));
600 4373 : hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
601 4373 : hashkey.tid = old_tuple->t_self;
602 :
603 4373 : unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
604 : HASH_FIND, NULL);
605 :
606 4373 : if (unresolved != NULL)
607 : {
608 : /* Need to free the contained tuple as well as the hashtable entry */
609 0 : heap_freetuple(unresolved->tuple);
610 0 : hash_search(state->rs_unresolved_tups, &hashkey,
611 : HASH_REMOVE, &found);
612 0 : Assert(found);
613 0 : return true;
614 : }
615 :
616 4373 : return false;
617 : }
618 :
619 : /*
620 : * Insert a tuple to the new relation. This has to track heap_insert
621 : * and its subsidiary functions!
622 : *
623 : * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
624 : * tuple is invalid on entry, it's replaced with the new TID as well (in
625 : * the inserted data only, not in the caller's copy).
626 : */
627 : static void
628 22761 : raw_heap_insert(RewriteState state, HeapTuple tup)
629 : {
630 22761 : Page page = state->rs_buffer;
631 : Size pageFreeSpace,
632 : saveFreeSpace;
633 : Size len;
634 : OffsetNumber newoff;
635 : HeapTuple heaptup;
636 :
637 : /*
638 : * If the new tuple is too big for storage or contains already toasted
639 : * out-of-line attributes from some other relation, invoke the toaster.
640 : *
641 : * Note: below this point, heaptup is the data we actually intend to store
642 : * into the relation; tup is the caller's original untoasted data.
643 : */
644 22761 : if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
645 : {
646 : /* toast table entries should never be recursively toasted */
647 0 : Assert(!HeapTupleHasExternal(tup));
648 0 : heaptup = tup;
649 : }
650 22761 : else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
651 1 : heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
652 : HEAP_INSERT_SKIP_FSM |
653 1 : (state->rs_use_wal ?
654 : 0 : HEAP_INSERT_SKIP_WAL));
655 : else
656 22760 : heaptup = tup;
657 :
658 22761 : len = MAXALIGN(heaptup->t_len); /* be conservative */
659 :
660 : /*
661 : * If we're gonna fail for oversize tuple, do it right away
662 : */
663 22761 : if (len > MaxHeapTupleSize)
664 0 : ereport(ERROR,
665 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
666 : errmsg("row is too big: size %zu, maximum size %zu",
667 : len, MaxHeapTupleSize)));
668 :
669 : /* Compute desired extra freespace due to fillfactor option */
670 22761 : saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
671 : HEAP_DEFAULT_FILLFACTOR);
672 :
673 : /* Now we can check to see if there's enough free space already. */
674 22761 : if (state->rs_buffer_valid)
675 : {
676 22741 : pageFreeSpace = PageGetHeapFreeSpace(page);
677 :
678 22741 : if (len + saveFreeSpace > pageFreeSpace)
679 : {
680 : /* Doesn't fit, so write out the existing page */
681 :
682 : /* XLOG stuff */
683 689 : if (state->rs_use_wal)
684 689 : log_newpage(&state->rs_new_rel->rd_node,
685 : MAIN_FORKNUM,
686 : state->rs_blockno,
687 : page,
688 : true);
689 :
690 : /*
691 : * Now write the page. We say isTemp = true even if it's not a
692 : * temp table, because there's no need for smgr to schedule an
693 : * fsync for this write; we'll do it ourselves in
694 : * end_heap_rewrite.
695 : */
696 689 : RelationOpenSmgr(state->rs_new_rel);
697 :
698 689 : PageSetChecksumInplace(page, state->rs_blockno);
699 :
700 689 : smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
701 : state->rs_blockno, (char *) page, true);
702 :
703 689 : state->rs_blockno++;
704 689 : state->rs_buffer_valid = false;
705 : }
706 : }
707 :
708 22761 : if (!state->rs_buffer_valid)
709 : {
710 : /* Initialize a new empty page */
711 709 : PageInit(page, BLCKSZ, 0);
712 709 : state->rs_buffer_valid = true;
713 : }
714 :
715 : /* And now we can insert the tuple into the page */
716 22761 : newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
717 : InvalidOffsetNumber, false, true);
718 22761 : if (newoff == InvalidOffsetNumber)
719 0 : elog(ERROR, "failed to add tuple");
720 :
721 : /* Update caller's t_self to the actual position where it was stored */
722 22761 : ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
723 :
724 : /*
725 : * Insert the correct position into CTID of the stored tuple, too, if the
726 : * caller didn't supply a valid CTID.
727 : */
728 22761 : if (!ItemPointerIsValid(&tup->t_data->t_ctid))
729 : {
730 : ItemId newitemid;
731 : HeapTupleHeader onpage_tup;
732 :
733 22756 : newitemid = PageGetItemId(page, newoff);
734 22756 : onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
735 :
736 22756 : onpage_tup->t_ctid = tup->t_self;
737 : }
738 :
739 : /* If heaptup is a private copy, release it. */
740 22761 : if (heaptup != tup)
741 1 : heap_freetuple(heaptup);
742 22761 : }
743 :
744 : /* ------------------------------------------------------------------------
745 : * Logical rewrite support
746 : *
747 : * When doing logical decoding - which relies on using cmin/cmax of catalog
748 : * tuples, via xl_heap_new_cid records - heap rewrites have to log enough
749 : * information to allow the decoding backend to update its internal mapping
750 : * of (relfilenode,ctid) => (cmin, cmax) to be correct for the rewritten heap.
751 : *
752 : * For that, every time we find a tuple that's been modified in a catalog
753 : * relation within the xmin horizon of any decoding slot, we log a mapping
754 : * from the old to the new location.
755 : *
756 : * To deal with rewrites that abort, the filename of a mapping file contains
757 : * the xid of the transaction performing the rewrite, which can then be
758 : * checked before being read in.
759 : *
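    : * For instance, the file name (built from LOGICAL_REWRITE_FORMAT, defined in
    : * rewriteheap.h) encodes the database oid, the rewritten relation's oid, the
    : * LSN at which the rewrite started, the xid whose mappings the file contains,
    : * and the xid of the rewriting transaction, roughly:
    : *
    : *     pg_logical/mappings/map-<dboid>-<relid>-<start_lsn>-<mapped_xid>-<rewrite_xid>
    : *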
760 : * For efficiency we don't immediately spill every single mapping for a
761 : * row to disk, but only do so in batches when we've collected several of them
762 : * in memory or when end_heap_rewrite() has been called.
763 : *
764 : * Crash-Safety: This module diverts from the usual patterns of doing WAL
765 : * since it cannot rely on checkpoint flushing out all buffers and thus
766 : * waiting for exclusive locks on buffers. Usually the XLogInsert() covering
767 : * buffer modifications is performed while the buffer(s) that are being
768 : * modified are exclusively locked, guaranteeing that the WAL record and the
769 : * modified heap end up on the same side of the checkpoint. But since the
770 : * mapping files we log aren't in shared_buffers that interlock doesn't work.
771 : *
772 : * Instead we simply write the mapping files out to disk, *before* the
773 : * XLogInsert() is performed. That guarantees that either the XLogInsert() is
774 : * inserted after the checkpoint's redo pointer or that the checkpoint (via
775 : * LogicalRewriteHeapCheckpoint()) has flushed the (partial) mapping file to
776 : * disk. That leaves the tail end that has not yet been flushed open to
777 : * corruption, which is solved by including the current offset in the
778 : * xl_heap_rewrite_mapping records and truncating the mapping file to it
779 : * during replay. Every time a rewrite is finished all generated mapping files
780 : * are synced to disk.
781 : *
782 : * Note that if we were only concerned about crash safety we wouldn't have to
783 : * deal with WAL logging at all - an fsync() at the end of a rewrite would be
784 : * sufficient for crash safety. Any mapping that hasn't been safely flushed to
785 : * disk has to belong to an aborted (explicitly or via a crash) transaction and is
786 : * ignored by virtue of the xid in its name being subject to a
787 : * TransactionIdDidCommit() check. But we want to support having standbys via
788 : * physical replication, both for availability and to do logical decoding
789 : * there.
790 : * ------------------------------------------------------------------------
791 : */
792 :
793 : /*
794 : * Do preparations for logging logical mappings during a rewrite if
795 : * necessary. If we detect that we don't need to log anything we'll prevent
796 : * any further action by the various logical rewrite functions.
797 : */
798 : static void
799 22 : logical_begin_heap_rewrite(RewriteState state)
800 : {
801 : HASHCTL hash_ctl;
802 : TransactionId logical_xmin;
803 :
804 : /*
805 : * We only need to persist these mappings if the rewritten table can be
806 : * accessed during logical decoding; if not, we can skip doing any
807 : * additional work.
808 : */
809 22 : state->rs_logical_rewrite =
810 22 : RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
811 :
812 22 : if (!state->rs_logical_rewrite)
813 44 : return;
814 :
815 0 : ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
816 :
817 : /*
818 : * If there are no logical slots in progress, we don't need to do anything:
819 : * there cannot be any remappings for relevant rows yet. The relation's
820 : * lock protects us against races.
821 : */
822 0 : if (logical_xmin == InvalidTransactionId)
823 : {
824 0 : state->rs_logical_rewrite = false;
825 0 : return;
826 : }
827 :
828 0 : state->rs_logical_xmin = logical_xmin;
829 0 : state->rs_begin_lsn = GetXLogInsertRecPtr();
830 0 : state->rs_num_rewrite_mappings = 0;
831 :
832 0 : memset(&hash_ctl, 0, sizeof(hash_ctl));
833 0 : hash_ctl.keysize = sizeof(TransactionId);
834 0 : hash_ctl.entrysize = sizeof(RewriteMappingFile);
835 0 : hash_ctl.hcxt = state->rs_cxt;
836 :
837 0 : state->rs_logical_mappings =
838 0 : hash_create("Logical rewrite mapping",
839 : 128, /* arbitrary initial size */
840 : &hash_ctl,
841 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
842 : }
843 :
844 : /*
845 : * Flush all logical in-memory mappings to disk, but don't fsync them yet.
846 : */
847 : static void
848 0 : logical_heap_rewrite_flush_mappings(RewriteState state)
849 : {
850 : HASH_SEQ_STATUS seq_status;
851 : RewriteMappingFile *src;
852 : dlist_mutable_iter iter;
853 :
854 0 : Assert(state->rs_logical_rewrite);
855 :
856 : /* no logical rewrite in progress, no need to iterate over mappings */
857 0 : if (state->rs_num_rewrite_mappings == 0)
858 0 : return;
859 :
860 0 : elog(DEBUG1, "flushing %u logical rewrite mapping entries",
861 : state->rs_num_rewrite_mappings);
862 :
863 0 : hash_seq_init(&seq_status, state->rs_logical_mappings);
864 0 : while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
865 : {
866 : char *waldata;
867 : char *waldata_start;
868 : xl_heap_rewrite_mapping xlrec;
869 : Oid dboid;
870 : uint32 len;
871 : int written;
872 :
873 : /* this file hasn't got any new mappings */
874 0 : if (src->num_mappings == 0)
875 0 : continue;
876 :
877 0 : if (state->rs_old_rel->rd_rel->relisshared)
878 0 : dboid = InvalidOid;
879 : else
880 0 : dboid = MyDatabaseId;
881 :
882 0 : xlrec.num_mappings = src->num_mappings;
883 0 : xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
884 0 : xlrec.mapped_xid = src->xid;
885 0 : xlrec.mapped_db = dboid;
886 0 : xlrec.offset = src->off;
887 0 : xlrec.start_lsn = state->rs_begin_lsn;
888 :
889 : /* write all mappings consecutively */
890 0 : len = src->num_mappings * sizeof(LogicalRewriteMappingData);
891 0 : waldata_start = waldata = palloc(len);
892 :
893 : /*
894 : * collect data we need to write out, but don't modify ondisk data yet
895 : */
896 0 : dlist_foreach_modify(iter, &src->mappings)
897 : {
898 : RewriteMappingDataEntry *pmap;
899 :
900 0 : pmap = dlist_container(RewriteMappingDataEntry, node, iter.cur);
901 :
902 0 : memcpy(waldata, &pmap->map, sizeof(pmap->map));
903 0 : waldata += sizeof(pmap->map);
904 :
905 : /* remove from the list and free */
906 0 : dlist_delete(&pmap->node);
907 0 : pfree(pmap);
908 :
909 : /* update bookkeeping */
910 0 : state->rs_num_rewrite_mappings--;
911 0 : src->num_mappings--;
912 : }
913 :
914 0 : Assert(src->num_mappings == 0);
915 0 : Assert(waldata == waldata_start + len);
916 :
917 : /*
918 : * Note that we deviate from the usual WAL coding practices here;
919 : * check the above "Logical rewrite support" comment for reasoning.
920 : */
921 0 : written = FileWrite(src->vfd, waldata_start, len,
922 : WAIT_EVENT_LOGICAL_REWRITE_WRITE);
923 0 : if (written != len)
924 0 : ereport(ERROR,
925 : (errcode_for_file_access(),
926 : errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
927 : written, len)));
928 0 : src->off += len;
929 :
930 0 : XLogBeginInsert();
931 0 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
932 0 : XLogRegisterData(waldata_start, len);
933 :
934 : /* write xlog record */
935 0 : XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
936 :
937 0 : pfree(waldata_start);
938 : }
939 0 : Assert(state->rs_num_rewrite_mappings == 0);
940 : }
941 :
942 : /*
943 : * Logical remapping part of end_heap_rewrite().
944 : */
945 : static void
946 22 : logical_end_heap_rewrite(RewriteState state)
947 : {
948 : HASH_SEQ_STATUS seq_status;
949 : RewriteMappingFile *src;
950 :
951 : /* done, no logical rewrite in progress */
952 22 : if (!state->rs_logical_rewrite)
953 44 : return;
954 :
955 : /* writeout remaining in-memory entries */
956 0 : if (state->rs_num_rewrite_mappings > 0)
957 0 : logical_heap_rewrite_flush_mappings(state);
958 :
959 : /* Iterate over all mappings we have written and fsync the files. */
960 0 : hash_seq_init(&seq_status, state->rs_logical_mappings);
961 0 : while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
962 : {
963 0 : if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
964 0 : ereport(ERROR,
965 : (errcode_for_file_access(),
966 : errmsg("could not fsync file \"%s\": %m", src->path)));
967 0 : FileClose(src->vfd);
968 : }
969 : /* memory context cleanup will deal with the rest */
970 : }
971 :
972 : /*
973 : * Log a single (old->new) mapping for 'xid'.
974 : */
975 : static void
976 0 : logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
977 : LogicalRewriteMappingData *map)
978 : {
979 : RewriteMappingFile *src;
980 : RewriteMappingDataEntry *pmap;
981 : Oid relid;
982 : bool found;
983 :
984 0 : relid = RelationGetRelid(state->rs_old_rel);
985 :
986 : /* look for existing mappings for this 'mapped' xid */
987 0 : src = hash_search(state->rs_logical_mappings, &xid,
988 : HASH_ENTER, &found);
989 :
990 : /*
991 : * We haven't yet had the need to map anything for this xid; create
992 : * per-xid data structures.
993 : */
994 0 : if (!found)
995 : {
996 : char path[MAXPGPATH];
997 : Oid dboid;
998 :
999 0 : if (state->rs_old_rel->rd_rel->relisshared)
1000 0 : dboid = InvalidOid;
1001 : else
1002 0 : dboid = MyDatabaseId;
1003 :
1004 0 : snprintf(path, MAXPGPATH,
1005 : "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1006 : dboid, relid,
1007 0 : (uint32) (state->rs_begin_lsn >> 32),
1008 0 : (uint32) state->rs_begin_lsn,
1009 : xid, GetCurrentTransactionId());
1010 :
1011 0 : dlist_init(&src->mappings);
1012 0 : src->num_mappings = 0;
1013 0 : src->off = 0;
1014 0 : memcpy(src->path, path, sizeof(path));
1015 0 : src->vfd = PathNameOpenFile(path,
1016 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1017 : S_IRUSR | S_IWUSR);
1018 0 : if (src->vfd < 0)
1019 0 : ereport(ERROR,
1020 : (errcode_for_file_access(),
1021 : errmsg("could not create file \"%s\": %m", path)));
1022 : }
1023 :
1024 0 : pmap = MemoryContextAlloc(state->rs_cxt,
1025 : sizeof(RewriteMappingDataEntry));
1026 0 : memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
1027 0 : dlist_push_tail(&src->mappings, &pmap->node);
1028 0 : src->num_mappings++;
1029 0 : state->rs_num_rewrite_mappings++;
1030 :
1031 : /*
1032 : * Write out the buffer every time we have accumulated too many in-memory
1033 : * entries across all mapping files.
1034 : */
1035 0 : if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
1036 0 : logical_heap_rewrite_flush_mappings(state);
1037 0 : }
1038 :
1039 : /*
1040 : * Perform logical remapping for a tuple that's mapped from old_tid to
1041 : * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
1042 : */
1043 : static void
1044 22761 : logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
1045 : HeapTuple new_tuple)
1046 : {
1047 22761 : ItemPointerData new_tid = new_tuple->t_self;
1048 22761 : TransactionId cutoff = state->rs_logical_xmin;
1049 : TransactionId xmin;
1050 : TransactionId xmax;
1051 22761 : bool do_log_xmin = false;
1052 22761 : bool do_log_xmax = false;
1053 : LogicalRewriteMappingData map;
1054 :
1055 : /* no logical rewrite in progress, we don't need to log anything */
1056 22761 : if (!state->rs_logical_rewrite)
1057 45522 : return;
1058 :
1059 0 : xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
1060 : /* use *GetUpdateXid to correctly deal with multixacts */
1061 0 : xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
1062 :
1063 : /*
1064 : * Log the mapping iff the tuple has been created recently.
1065 : */
1066 0 : if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
1067 0 : do_log_xmin = true;
1068 :
1069 0 : if (!TransactionIdIsNormal(xmax))
1070 : {
1071 : /*
1072 : * no xmax is set, can't have any permanent ones, so this check is
1073 : * sufficient
1074 : */
1075 : }
1076 0 : else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
1077 : {
1078 : /* only locked, we don't care */
1079 : }
1080 0 : else if (!TransactionIdPrecedes(xmax, cutoff))
1081 : {
1082 : /* tuple has been deleted recently, log */
1083 0 : do_log_xmax = true;
1084 : }
1085 :
1086 : /* if neither needs to be logged, we're done */
1087 0 : if (!do_log_xmin && !do_log_xmax)
1088 0 : return;
1089 :
1090 : /* fill out mapping information */
1091 0 : map.old_node = state->rs_old_rel->rd_node;
1092 0 : map.old_tid = old_tid;
1093 0 : map.new_node = state->rs_new_rel->rd_node;
1094 0 : map.new_tid = new_tid;
1095 :
1096 : /* ---
1097 : * Now persist the mapping for the individual xids that are affected. We
1098 : * need to log for both xmin and xmax if they aren't the same transaction
1099 : * since the mapping files are per "affected" xid.
1100 : * We don't muster all that much effort detecting whether xmin and xmax
1101 : * are actually the same transaction; we just check whether the xid is the
1102 : * same disregarding subtransactions. Logging too much is relatively
1103 : * harmless and we could never do the check fully since subtransaction
1104 : * data is thrown away during restarts.
1105 : * ---
1106 : */
1107 0 : if (do_log_xmin)
1108 0 : logical_rewrite_log_mapping(state, xmin, &map);
1109 : /* separately log mapping for xmax unless it'd be redundant */
1110 0 : if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
1111 0 : logical_rewrite_log_mapping(state, xmax, &map);
1112 : }
1113 :
1114 : /*
1115 : * Replay XLOG_HEAP2_REWRITE records
1116 : */
1117 : void
1118 0 : heap_xlog_logical_rewrite(XLogReaderState *r)
1119 : {
1120 : char path[MAXPGPATH];
1121 : int fd;
1122 : xl_heap_rewrite_mapping *xlrec;
1123 : uint32 len;
1124 : char *data;
1125 :
1126 0 : xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
1127 :
1128 0 : snprintf(path, MAXPGPATH,
1129 : "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
1130 : xlrec->mapped_db, xlrec->mapped_rel,
1131 0 : (uint32) (xlrec->start_lsn >> 32),
1132 0 : (uint32) xlrec->start_lsn,
1133 0 : xlrec->mapped_xid, XLogRecGetXid(r));
1134 :
1135 0 : fd = OpenTransientFile(path,
1136 : O_CREAT | O_WRONLY | PG_BINARY,
1137 : S_IRUSR | S_IWUSR);
1138 0 : if (fd < 0)
1139 0 : ereport(ERROR,
1140 : (errcode_for_file_access(),
1141 : errmsg("could not create file \"%s\": %m", path)));
1142 :
1143 : /*
1144 : * Truncate all data that's not guaranteed to have been safely fsynced (by
1145 : * previous record or by the last checkpoint).
1146 : */
1147 0 : pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
1148 0 : if (ftruncate(fd, xlrec->offset) != 0)
1149 0 : ereport(ERROR,
1150 : (errcode_for_file_access(),
1151 : errmsg("could not truncate file \"%s\" to %u: %m",
1152 : path, (uint32) xlrec->offset)));
1153 0 : pgstat_report_wait_end();
1154 :
1155 : /* now seek to the position we want to write our data to */
1156 0 : if (lseek(fd, xlrec->offset, SEEK_SET) != xlrec->offset)
1157 0 : ereport(ERROR,
1158 : (errcode_for_file_access(),
1159 : errmsg("could not seek to end of file \"%s\": %m",
1160 : path)));
1161 :
1162 0 : data = XLogRecGetData(r) + sizeof(*xlrec);
1163 :
1164 0 : len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
1165 :
1166 : /* write out tail end of mapping file (again) */
1167 0 : pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
1168 0 : if (write(fd, data, len) != len)
1169 0 : ereport(ERROR,
1170 : (errcode_for_file_access(),
1171 : errmsg("could not write to file \"%s\": %m", path)));
1172 0 : pgstat_report_wait_end();
1173 :
1174 : /*
1175 : * Now fsync all previously written data. We could improve things and only
1176 : * do this for the last write to a file, but the required bookkeeping
1177 : * doesn't seem worth the trouble.
1178 : */
1179 0 : pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
1180 0 : if (pg_fsync(fd) != 0)
1181 0 : ereport(ERROR,
1182 : (errcode_for_file_access(),
1183 : errmsg("could not fsync file \"%s\": %m", path)));
1184 0 : pgstat_report_wait_end();
1185 :
1186 0 : CloseTransientFile(fd);
1187 0 : }
1188 :
1189 : /* ---
1190 : * Perform a checkpoint for logical rewrite mappings
1191 : *
1192 : * This serves two tasks:
1193 : * 1) Remove all mappings not needed anymore based on the logical restart LSN
1194 : * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
1195 : * only has to deal with the parts of a mapping that have been written out
1196 : * after the checkpoint started.
1197 : * ---
1198 : */
1199 : void
1200 11 : CheckPointLogicalRewriteHeap(void)
1201 : {
1202 : XLogRecPtr cutoff;
1203 : XLogRecPtr redo;
1204 : DIR *mappings_dir;
1205 : struct dirent *mapping_de;
1206 : char path[MAXPGPATH + 20];
1207 :
1208 : /*
1209 : * We start off with the last redo pointer as a minimum. No new decoding
1210 : * slot will start before that, so that's a safe upper bound for removal.
1211 : */
1212 11 : redo = GetRedoRecPtr();
1213 :
1214 : /* now check for the restart ptrs from existing slots */
1215 11 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1216 :
1217 : /* don't start earlier than the restart lsn */
1218 11 : if (cutoff != InvalidXLogRecPtr && redo < cutoff)
1219 0 : cutoff = redo;
1220 :
1221 11 : mappings_dir = AllocateDir("pg_logical/mappings");
1222 44 : while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
1223 : {
1224 : struct stat statbuf;
1225 : Oid dboid;
1226 : Oid relid;
1227 : XLogRecPtr lsn;
1228 : TransactionId rewrite_xid;
1229 : TransactionId create_xid;
1230 : uint32 hi,
1231 : lo;
1232 :
1233 33 : if (strcmp(mapping_de->d_name, ".") == 0 ||
1234 11 : strcmp(mapping_de->d_name, "..") == 0)
1235 44 : continue;
1236 :
1237 0 : snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
1238 0 : if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1239 0 : continue;
1240 :
1241 : /* Skip over files that cannot be ours. */
1242 0 : if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1243 0 : continue;
1244 :
1245 0 : if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
1246 : &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1247 0 : elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1248 :
1249 0 : lsn = ((uint64) hi) << 32 | lo;
1250 :
1251 0 : if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1252 : {
1253 0 : elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1254 0 : if (unlink(path) < 0)
1255 0 : ereport(ERROR,
1256 : (errcode_for_file_access(),
1257 : errmsg("could not remove file \"%s\": %m", path)));
1258 : }
1259 : else
1260 : {
1261 0 : int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1262 :
1263 : /*
1264 : * The file cannot vanish due to concurrency since this function
1265 : * is the only one removing logical mappings and it's run while
1266 : * CheckpointLock is held exclusively.
1267 : */
1268 0 : if (fd < 0)
1269 0 : ereport(ERROR,
1270 : (errcode_for_file_access(),
1271 : errmsg("could not open file \"%s\": %m", path)));
1272 :
1273 : /*
1274 : * We could try to avoid fsyncing files that either haven't
1275 : * changed or have only been created since the checkpoint's start,
1276 : * but it's currently not deemed worth the effort.
1277 : */
1278 0 : pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
1279 0 : if (pg_fsync(fd) != 0)
1280 0 : ereport(ERROR,
1281 : (errcode_for_file_access(),
1282 : errmsg("could not fsync file \"%s\": %m", path)));
1283 0 : pgstat_report_wait_end();
1284 0 : CloseTransientFile(fd);
1285 : }
1286 : }
1287 11 : FreeDir(mappings_dir);
1288 11 : }