Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgxlog.c
4 : * WAL replay logic for SP-GiST
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgxlog.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/bufmask.h"
18 : #include "access/spgist_private.h"
19 : #include "access/spgxlog.h"
20 : #include "access/transam.h"
21 : #include "access/xlog.h"
22 : #include "access/xlogutils.h"
23 : #include "storage/standby.h"
24 : #include "utils/memutils.h"
25 :
26 :
27 : static MemoryContext opCtx; /* working memory for redo operations: created in spg_xlog_startup(), made current for the duration of each spg_redo() call and reset afterwards, deleted in spg_xlog_cleanup() */
28 :
29 :
30 : /*
31 : * Prepare a dummy SpGistState, with just the minimum info needed for replay.
32 : *
33 : * At present, all we need is enough info to support spgFormDeadTuple(),
34 : * plus the isBuild flag.
35 : */
36 : static void
37 0 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
38 : {
39 0 : memset(state, 0, sizeof(*state));
40 :
41 0 : state->myXid = stateSrc.myXid;
42 0 : state->isBuild = stateSrc.isBuild;
43 0 : state->deadTupleStorage = palloc0(SGDTSIZE);
44 0 : }
45 :
46 : /*
47 : * Add a leaf tuple, or replace an existing placeholder tuple. This is used
48 : * to replay SpGistPageAddNewItem() operations. If the offset points at an
49 : * existing tuple, it had better be a placeholder tuple.
50 : */
51 : static void
52 0 : addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
53 : {
54 0 : if (offset <= PageGetMaxOffsetNumber(page))
55 : {
56 0 : SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
57 : PageGetItemId(page, offset));
58 :
59 0 : if (dt->tupstate != SPGIST_PLACEHOLDER)
60 0 : elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
61 :
62 0 : Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
63 0 : SpGistPageGetOpaque(page)->nPlaceholder--;
64 :
65 0 : PageIndexTupleDelete(page, offset);
66 : }
67 :
68 0 : Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
69 :
70 0 : if (PageAddItem(page, tuple, size, offset, false, false) != offset)
71 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
72 : size);
73 0 : }
74 :
75 : static void
76 0 : spgRedoCreateIndex(XLogReaderState *record)
77 : {
78 0 : XLogRecPtr lsn = record->EndRecPtr;
79 : Buffer buffer;
80 : Page page;
81 :
82 0 : buffer = XLogInitBufferForRedo(record, 0);
83 0 : Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO);
84 0 : page = (Page) BufferGetPage(buffer);
85 0 : SpGistInitMetapage(page);
86 0 : PageSetLSN(page, lsn);
87 0 : MarkBufferDirty(buffer);
88 0 : UnlockReleaseBuffer(buffer);
89 :
90 0 : buffer = XLogInitBufferForRedo(record, 1);
91 0 : Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO);
92 0 : SpGistInitBuffer(buffer, SPGIST_LEAF);
93 0 : page = (Page) BufferGetPage(buffer);
94 0 : PageSetLSN(page, lsn);
95 0 : MarkBufferDirty(buffer);
96 0 : UnlockReleaseBuffer(buffer);
97 :
98 0 : buffer = XLogInitBufferForRedo(record, 2);
99 0 : Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO);
100 0 : SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
101 0 : page = (Page) BufferGetPage(buffer);
102 0 : PageSetLSN(page, lsn);
103 0 : MarkBufferDirty(buffer);
104 0 : UnlockReleaseBuffer(buffer);
105 0 : }
106 :
/*
 * Replay an XLOG_SPGIST_ADD_LEAF record (dispatched from spg_redo).
 *
 * The record's main data is a spgxlogAddLeaf header immediately followed by
 * the image of the leaf tuple.  Block reference 0 is the leaf page.  If
 * offnumParent is valid, block reference 1 is the page holding the parent
 * inner tuple, whose link for node nodeI is set to point at the new leaf
 * tuple (block of reference 0, offset offnumLeaf).
 */
107 : static void
108 0 : spgRedoAddLeaf(XLogReaderState *record)
109 : {
110 0 : XLogRecPtr lsn = record->EndRecPtr;
111 0 : char *ptr = XLogRecGetData(record);
112 0 : spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
113 : char *leafTuple;
114 : SpGistLeafTupleData leafTupleHdr;
115 : Buffer buffer;
116 : Page page;
117 : XLogRedoAction action;
118 :
119 0 : ptr += sizeof(spgxlogAddLeaf);
120 0 : leafTuple = ptr;
121 : /* the leaf tuple is unaligned, so make a copy to access its header */
122 0 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
123 :
124 : /*
125 : * In normal operation we would have both current and parent pages locked
126 : * simultaneously; but in WAL replay it should be safe to update the leaf
127 : * page before updating the parent.
128 : */
129 0 : if (xldata->newPage)
130 : {
         /* page is (re)initialized here, so the insertion below always runs */
131 0 : buffer = XLogInitBufferForRedo(record, 0);
132 0 : SpGistInitBuffer(buffer,
133 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
134 0 : action = BLK_NEEDS_REDO;
135 : }
136 : else
137 0 : action = XLogReadBufferForRedo(record, 0, &buffer);
138 :
139 0 : if (action == BLK_NEEDS_REDO)
140 : {
141 0 : page = BufferGetPage(buffer);
142 :
143 : /* insert new tuple */
144 0 : if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
145 : {
146 : /* normal cases, tuple was added by SpGistPageAddNewItem */
147 0 : addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
148 0 : xldata->offnumLeaf);
149 :
150 : /* update head tuple's chain link if needed */
151 0 : if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
152 : {
153 : SpGistLeafTuple head;
154 :
155 0 : head = (SpGistLeafTuple) PageGetItem(page,
156 : PageGetItemId(page, xldata->offnumHeadLeaf));
         /* the new tuple's nextOffset must equal the head's old link; the head is then pointed at the new tuple */
157 0 : Assert(head->nextOffset == leafTupleHdr.nextOffset);
158 0 : head->nextOffset = xldata->offnumLeaf;
159 : }
160 : }
161 : else
162 : {
163 : /* offnumLeaf == offnumHeadLeaf: replacing a DEAD tuple in that same slot */
164 0 : PageIndexTupleDelete(page, xldata->offnumLeaf);
165 0 : if (PageAddItem(page,
166 : (Item) leafTuple, leafTupleHdr.size,
167 0 : xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
168 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
169 : leafTupleHdr.size);
170 : }
171 :
172 0 : PageSetLSN(page, lsn);
173 0 : MarkBufferDirty(buffer);
174 : }
175 0 : if (BufferIsValid(buffer))
176 0 : UnlockReleaseBuffer(buffer);
177 :
178 : /* update parent downlink if necessary */
179 0 : if (xldata->offnumParent != InvalidOffsetNumber)
180 : {
181 0 : if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
182 : {
183 : SpGistInnerTuple tuple;
184 : BlockNumber blknoLeaf;
185 :
         /* the downlink target is the block number recorded for block reference 0 */
186 0 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
187 :
188 0 : page = BufferGetPage(buffer);
189 :
190 0 : tuple = (SpGistInnerTuple) PageGetItem(page,
191 : PageGetItemId(page, xldata->offnumParent));
192 :
193 0 : spgUpdateNodeLink(tuple, xldata->nodeI,
194 0 : blknoLeaf, xldata->offnumLeaf);
195 :
196 0 : PageSetLSN(page, lsn);
197 0 : MarkBufferDirty(buffer);
198 : }
199 0 : if (BufferIsValid(buffer))
200 0 : UnlockReleaseBuffer(buffer);
201 : }
202 0 : }
203 :
/*
 * Replay an XLOG_SPGIST_MOVE_LEAFS record (dispatched from spg_redo).
 *
 * Main data layout, as parsed below: the spgxlogMoveLeafs header
 * (SizeOfSpgxlogMoveLeafs bytes), then nMoves source offsets (toDelete),
 * then nInsert destination offsets (toInsert), then nInsert leaf tuple
 * images packed back to back.  nInsert is 1 when replaceDead is set,
 * otherwise nMoves + 1.
 *
 * Block reference 0 is the source page, 1 the destination page, and 2 the
 * page holding the parent inner tuple.  The last entry of toInsert is the
 * offset used both for the redirect left on the source page and for the
 * parent's updated downlink.
 */
204 : static void
205 0 : spgRedoMoveLeafs(XLogReaderState *record)
206 : {
207 0 : XLogRecPtr lsn = record->EndRecPtr;
208 0 : char *ptr = XLogRecGetData(record);
209 0 : spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
210 : SpGistState state;
211 : OffsetNumber *toDelete;
212 : OffsetNumber *toInsert;
213 : int nInsert;
214 : Buffer buffer;
215 : Page page;
216 : XLogRedoAction action;
217 : BlockNumber blknoDst;
218 :
219 0 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
220 :
221 0 : fillFakeState(&state, xldata->stateSrc);
222 :
223 0 : nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
224 :
225 0 : ptr += SizeOfSpgxlogMoveLeafs;
226 0 : toDelete = (OffsetNumber *) ptr;
227 0 : ptr += sizeof(OffsetNumber) * xldata->nMoves;
228 0 : toInsert = (OffsetNumber *) ptr;
229 0 : ptr += sizeof(OffsetNumber) * nInsert;
230 :
231 : /* now ptr points to the list of leaf tuples */
232 :
233 : /*
234 : * In normal operation we would have all three pages (source, dest, and
235 : * parent) locked simultaneously; but in WAL replay it should be safe to
236 : * update them one at a time, as long as we do it in the right order.
237 : */
238 :
239 : /* Insert tuples on the dest page (do first, so redirect is valid) */
240 0 : if (xldata->newPage)
241 : {
242 0 : buffer = XLogInitBufferForRedo(record, 1);
243 0 : SpGistInitBuffer(buffer,
244 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
245 0 : action = BLK_NEEDS_REDO;
246 : }
247 : else
248 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
249 :
250 0 : if (action == BLK_NEEDS_REDO)
251 : {
252 : int i;
253 :
254 0 : page = BufferGetPage(buffer);
255 :
256 0 : for (i = 0; i < nInsert; i++)
257 : {
258 : char *leafTuple;
259 : SpGistLeafTupleData leafTupleHdr;
260 :
261 : /*
262 : * the tuples are not aligned, so must copy to access the size
263 : * field.
264 : */
265 0 : leafTuple = ptr;
266 0 : memcpy(&leafTupleHdr, leafTuple,
267 : sizeof(SpGistLeafTupleData));
268 :
269 0 : addOrReplaceTuple(page, (Item) leafTuple,
270 0 : leafTupleHdr.size, toInsert[i]);
         /* step to the next packed tuple image */
271 0 : ptr += leafTupleHdr.size;
272 : }
273 :
274 0 : PageSetLSN(page, lsn);
275 0 : MarkBufferDirty(buffer);
276 : }
277 0 : if (BufferIsValid(buffer))
278 0 : UnlockReleaseBuffer(buffer);
279 :
280 : /* Delete tuples from the source page, inserting a redirection pointer */
281 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
282 : {
283 0 : page = BufferGetPage(buffer);
284 :
         /* when isBuild is set, SPGIST_PLACEHOLDER is passed in place of SPGIST_REDIRECT */
285 0 : spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
286 0 : state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
287 : SPGIST_PLACEHOLDER,
288 : blknoDst,
289 0 : toInsert[nInsert - 1]);
290 :
291 0 : PageSetLSN(page, lsn);
292 0 : MarkBufferDirty(buffer);
293 : }
294 0 : if (BufferIsValid(buffer))
295 0 : UnlockReleaseBuffer(buffer);
296 :
297 : /* And update the parent downlink */
298 0 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
299 : {
300 : SpGistInnerTuple tuple;
301 :
302 0 : page = BufferGetPage(buffer);
303 :
304 0 : tuple = (SpGistInnerTuple) PageGetItem(page,
305 : PageGetItemId(page, xldata->offnumParent));
306 :
307 0 : spgUpdateNodeLink(tuple, xldata->nodeI,
308 0 : blknoDst, toInsert[nInsert - 1]);
309 :
310 0 : PageSetLSN(page, lsn);
311 0 : MarkBufferDirty(buffer);
312 : }
313 0 : if (BufferIsValid(buffer))
314 0 : UnlockReleaseBuffer(buffer);
315 0 : }
316 :
/*
 * Replay an XLOG_SPGIST_ADD_NODE record (dispatched from spg_redo).
 *
 * Main data is a spgxlogAddNode header immediately followed by the image of
 * the replacement inner tuple.  Two cases are handled:
 *
 * - The record has no block reference 1: the tuple at offnum on block
 *   reference 0 is deleted and the new image is stored in the same slot.
 *
 * - Otherwise: the new tuple is stored at offnumNew on block reference 1,
 *   the old tuple at offnum on block reference 0 is replaced by a dead tuple
 *   (a placeholder when isBuild is set, else a redirect to the new
 *   location), and the parent's link for node nodeI is pointed at the new
 *   location.  parentBlk says which block reference (0, 1 or 2) holds the
 *   parent tuple at offnumParent.
 */
317 : static void
318 0 : spgRedoAddNode(XLogReaderState *record)
319 : {
320 0 : XLogRecPtr lsn = record->EndRecPtr;
321 0 : char *ptr = XLogRecGetData(record);
322 0 : spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
323 : char *innerTuple;
324 : SpGistInnerTupleData innerTupleHdr;
325 : SpGistState state;
326 : Buffer buffer;
327 : Page page;
328 : XLogRedoAction action;
329 :
330 0 : ptr += sizeof(spgxlogAddNode);
331 0 : innerTuple = ptr;
332 : /* the tuple is unaligned, so make a copy to access its header */
333 0 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
334 :
335 0 : fillFakeState(&state, xldata->stateSrc);
336 :
337 0 : if (!XLogRecHasBlockRef(record, 1))
338 : {
339 : /* update in place: only block reference 0 is involved */
340 0 : Assert(xldata->parentBlk == -1);
341 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
342 : {
343 0 : page = BufferGetPage(buffer);
344 :
345 0 : PageIndexTupleDelete(page, xldata->offnum);
346 0 : if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
347 : xldata->offnum,
348 0 : false, false) != xldata->offnum)
349 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
350 : innerTupleHdr.size);
351 :
352 0 : PageSetLSN(page, lsn);
353 0 : MarkBufferDirty(buffer);
354 : }
355 0 : if (BufferIsValid(buffer))
356 0 : UnlockReleaseBuffer(buffer);
357 : }
358 : else
359 : {
360 : BlockNumber blkno;
361 : BlockNumber blknoNew;
362 :
363 0 : XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
364 0 : XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
365 :
366 : /*
367 : * In normal operation we would have all three pages (source, dest,
368 : * and parent) locked simultaneously; but in WAL replay it should be
369 : * safe to update them one at a time, as long as we do it in the right
370 : * order. We must insert the new tuple before replacing the old tuple
371 : * with the redirect tuple.
372 : */
373 :
374 : /* Install new tuple first so redirect is valid */
375 0 : if (xldata->newPage)
376 : {
377 : /* AddNode is not used for nulls pages */
378 0 : buffer = XLogInitBufferForRedo(record, 1);
379 0 : SpGistInitBuffer(buffer, 0);
380 0 : action = BLK_NEEDS_REDO;
381 : }
382 : else
383 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
384 0 : if (action == BLK_NEEDS_REDO)
385 : {
386 0 : page = BufferGetPage(buffer);
387 :
388 0 : addOrReplaceTuple(page, (Item) innerTuple,
389 0 : innerTupleHdr.size, xldata->offnumNew);
390 :
391 : /*
392 : * If parent is in this same page, update it now.
393 : */
394 0 : if (xldata->parentBlk == 1)
395 : {
396 : SpGistInnerTuple parentTuple;
397 :
398 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
399 : PageGetItemId(page, xldata->offnumParent));
400 :
401 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
402 0 : blknoNew, xldata->offnumNew);
403 : }
404 0 : PageSetLSN(page, lsn);
405 0 : MarkBufferDirty(buffer);
406 : }
407 0 : if (BufferIsValid(buffer))
408 0 : UnlockReleaseBuffer(buffer);
409 :
410 : /* Delete old tuple, replacing it with redirect or placeholder tuple */
411 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
412 : {
413 : SpGistDeadTuple dt;
414 :
415 0 : page = BufferGetPage(buffer);
416 :
         /* isBuild: leave a plain placeholder; otherwise a redirect to (blknoNew, offnumNew) */
417 0 : if (state.isBuild)
418 0 : dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
419 : InvalidBlockNumber,
420 : InvalidOffsetNumber);
421 : else
422 0 : dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
423 : blknoNew,
424 0 : xldata->offnumNew);
425 :
426 0 : PageIndexTupleDelete(page, xldata->offnum);
427 0 : if (PageAddItem(page, (Item) dt, dt->size,
428 : xldata->offnum,
429 0 : false, false) != xldata->offnum)
430 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
431 : dt->size);
432 :
         /* keep the page's dead-tuple counters in step with what was just stored */
433 0 : if (state.isBuild)
434 0 : SpGistPageGetOpaque(page)->nPlaceholder++;
435 : else
436 0 : SpGistPageGetOpaque(page)->nRedirection++;
437 :
438 : /*
439 : * If parent is in this same page, update it now.
440 : */
441 0 : if (xldata->parentBlk == 0)
442 : {
443 : SpGistInnerTuple parentTuple;
444 :
445 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
446 : PageGetItemId(page, xldata->offnumParent));
447 :
448 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
449 0 : blknoNew, xldata->offnumNew);
450 : }
451 0 : PageSetLSN(page, lsn);
452 0 : MarkBufferDirty(buffer);
453 : }
454 0 : if (BufferIsValid(buffer))
455 0 : UnlockReleaseBuffer(buffer);
456 :
457 : /*
458 : * Update parent downlink (if we didn't do it as part of the source or
459 : * destination page update already).
460 : */
461 0 : if (xldata->parentBlk == 2)
462 : {
463 0 : if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
464 : {
465 : SpGistInnerTuple parentTuple;
466 :
467 0 : page = BufferGetPage(buffer);
468 :
469 0 : parentTuple = (SpGistInnerTuple) PageGetItem(page,
470 : PageGetItemId(page, xldata->offnumParent));
471 :
472 0 : spgUpdateNodeLink(parentTuple, xldata->nodeI,
473 0 : blknoNew, xldata->offnumNew);
474 :
475 0 : PageSetLSN(page, lsn);
476 0 : MarkBufferDirty(buffer);
477 : }
478 0 : if (BufferIsValid(buffer))
479 0 : UnlockReleaseBuffer(buffer);
480 : }
481 : }
482 0 : }
483 :
/*
 * Replay an XLOG_SPGIST_SPLIT_TUPLE record (dispatched from spg_redo).
 *
 * Main data is a spgxlogSplitTuple header followed by the prefix inner tuple
 * image and then the postfix inner tuple image (the postfix starts
 * prefixTupleHdr.size bytes after the prefix).
 *
 * The prefix tuple replaces the tuple at offnumPrefix on block reference 0.
 * The postfix tuple is stored at offnumPostfix, on block reference 0 when
 * postfixBlkSame is set and on block reference 1 otherwise.
 */
484 : static void
485 0 : spgRedoSplitTuple(XLogReaderState *record)
486 : {
487 0 : XLogRecPtr lsn = record->EndRecPtr;
488 0 : char *ptr = XLogRecGetData(record);
489 0 : spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
490 : char *prefixTuple;
491 : SpGistInnerTupleData prefixTupleHdr;
492 : char *postfixTuple;
493 : SpGistInnerTupleData postfixTupleHdr;
494 : Buffer buffer;
495 : Page page;
496 : XLogRedoAction action;
497 :
498 0 : ptr += sizeof(spgxlogSplitTuple);
499 0 : prefixTuple = ptr;
500 : /* the prefix tuple is unaligned, so make a copy to access its header */
501 0 : memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
502 0 : ptr += prefixTupleHdr.size;
503 0 : postfixTuple = ptr;
504 : /* postfix tuple is also unaligned */
505 0 : memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
506 :
507 : /*
508 : * In normal operation we would have both pages locked simultaneously; but
509 : * in WAL replay it should be safe to update them one at a time, as long
510 : * as we do it in the right order.
511 : */
512 :
513 : /* insert postfix tuple first to avoid dangling link */
514 0 : if (!xldata->postfixBlkSame)
515 : {
516 0 : if (xldata->newPage)
517 : {
518 0 : buffer = XLogInitBufferForRedo(record, 1);
519 : /* SplitTuple is not used for nulls pages */
520 0 : SpGistInitBuffer(buffer, 0);
521 0 : action = BLK_NEEDS_REDO;
522 : }
523 : else
524 0 : action = XLogReadBufferForRedo(record, 1, &buffer);
525 0 : if (action == BLK_NEEDS_REDO)
526 : {
527 0 : page = BufferGetPage(buffer);
528 :
529 0 : addOrReplaceTuple(page, (Item) postfixTuple,
530 0 : postfixTupleHdr.size, xldata->offnumPostfix);
531 :
532 0 : PageSetLSN(page, lsn);
533 0 : MarkBufferDirty(buffer);
534 : }
535 0 : if (BufferIsValid(buffer))
536 0 : UnlockReleaseBuffer(buffer);
537 : }
538 :
539 : /* now handle the original page */
540 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
541 : {
542 0 : page = BufferGetPage(buffer);
543 :
         /* swap the old tuple at offnumPrefix for the prefix tuple, keeping the same slot */
544 0 : PageIndexTupleDelete(page, xldata->offnumPrefix);
545 0 : if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
546 0 : xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
547 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
548 : prefixTupleHdr.size);
549 :
         /* postfix lives on this same page: add it under the same page update */
550 0 : if (xldata->postfixBlkSame)
551 0 : addOrReplaceTuple(page, (Item) postfixTuple,
552 0 : postfixTupleHdr.size,
553 0 : xldata->offnumPostfix);
554 :
555 0 : PageSetLSN(page, lsn);
556 0 : MarkBufferDirty(buffer);
557 : }
558 0 : if (BufferIsValid(buffer))
559 0 : UnlockReleaseBuffer(buffer);
560 0 : }
561 :
/*
 * Replay an XLOG_SPGIST_PICKSPLIT record (dispatched from spg_redo).
 *
 * Main data layout, as parsed below: the spgxlogPickSplit header
 * (SizeOfSpgxlogPickSplit bytes), nDelete offsets (toDelete), nInsert
 * offsets (toInsert), nInsert uint8 selectors (leafPageSelect), the new
 * inner tuple image, and finally nInsert leaf tuple images packed back to
 * back.  A nonzero leafPageSelect[i] sends leaf tuple i to the dest page,
 * zero sends it to the source page.
 *
 * Block references: 0 = source leaf page (not touched when isRootSplit),
 * 1 = destination leaf page (optional), 2 = page receiving the new inner
 * tuple, 3 = parent page (optional; absent when the inner page is also the
 * parent or when splitting the root).
 *
 * The source and destination buffers stay locked until after the inner
 * tuple has been stored; only then are they released.
 */
562 : static void
563 0 : spgRedoPickSplit(XLogReaderState *record)
564 : {
565 0 : XLogRecPtr lsn = record->EndRecPtr;
566 0 : char *ptr = XLogRecGetData(record);
567 0 : spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
568 : char *innerTuple;
569 : SpGistInnerTupleData innerTupleHdr;
570 : SpGistState state;
571 : OffsetNumber *toDelete;
572 : OffsetNumber *toInsert;
573 : uint8 *leafPageSelect;
574 : Buffer srcBuffer;
575 : Buffer destBuffer;
576 : Buffer innerBuffer;
577 : Page srcPage;
578 : Page destPage;
579 : Page page;
580 : int i;
581 : BlockNumber blknoInner;
582 : XLogRedoAction action;
583 :
584 0 : XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
585 :
586 0 : fillFakeState(&state, xldata->stateSrc);
587 :
588 0 : ptr += SizeOfSpgxlogPickSplit;
589 0 : toDelete = (OffsetNumber *) ptr;
590 0 : ptr += sizeof(OffsetNumber) * xldata->nDelete;
591 0 : toInsert = (OffsetNumber *) ptr;
592 0 : ptr += sizeof(OffsetNumber) * xldata->nInsert;
593 0 : leafPageSelect = (uint8 *) ptr;
594 0 : ptr += sizeof(uint8) * xldata->nInsert;
595 :
596 0 : innerTuple = ptr;
597 : /* the inner tuple is unaligned, so make a copy to access its header */
598 0 : memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
599 0 : ptr += innerTupleHdr.size;
600 :
601 : /* now ptr points to the list of leaf tuples */
602 :
603 0 : if (xldata->isRootSplit)
604 : {
605 : /* when splitting root, we touch it only in the guise of new inner */
606 0 : srcBuffer = InvalidBuffer;
607 0 : srcPage = NULL;
608 : }
609 0 : else if (xldata->initSrc)
610 : {
611 : /* just re-init the source page */
612 0 : srcBuffer = XLogInitBufferForRedo(record, 0);
613 0 : srcPage = (Page) BufferGetPage(srcBuffer);
614 :
615 0 : SpGistInitBuffer(srcBuffer,
616 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
617 : /* don't update LSN etc till we're done with it */
618 : }
619 : else
620 : {
621 : /*
622 : * Delete the specified tuples from source page. (In case we're in
623 : * Hot Standby, we need to hold lock on the page till we're done
624 : * inserting leaf tuples and the new inner tuple, else the added
625 : * redirect tuple will be a dangling link.)
626 : */
         /* srcPage stays NULL unless the page needs redo; NULL later means "skip this page" */
627 0 : srcPage = NULL;
628 0 : if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
629 : {
630 0 : srcPage = BufferGetPage(srcBuffer);
631 :
632 : /*
633 : * We have it a bit easier here than in doPickSplit(), because we
634 : * know the inner tuple's location already, so we can inject the
635 : * correct redirection tuple now.
636 : */
637 0 : if (!state.isBuild)
638 0 : spgPageIndexMultiDelete(&state, srcPage,
639 0 : toDelete, xldata->nDelete,
640 : SPGIST_REDIRECT,
641 : SPGIST_PLACEHOLDER,
642 : blknoInner,
643 0 : xldata->offnumInner);
644 : else
645 0 : spgPageIndexMultiDelete(&state, srcPage,
646 0 : toDelete, xldata->nDelete,
647 : SPGIST_PLACEHOLDER,
648 : SPGIST_PLACEHOLDER,
649 : InvalidBlockNumber,
650 : InvalidOffsetNumber);
651 :
652 : /* don't update LSN etc till we're done with it */
653 : }
654 : }
655 :
656 : /* try to access dest page if any */
657 0 : if (!XLogRecHasBlockRef(record, 1))
658 : {
659 0 : destBuffer = InvalidBuffer;
660 0 : destPage = NULL;
661 : }
662 0 : else if (xldata->initDest)
663 : {
664 : /* just re-init the dest page */
665 0 : destBuffer = XLogInitBufferForRedo(record, 1);
666 0 : destPage = (Page) BufferGetPage(destBuffer);
667 :
668 0 : SpGistInitBuffer(destBuffer,
669 0 : SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
670 : /* don't update LSN etc till we're done with it */
671 : }
672 : else
673 : {
674 : /*
675 : * We could probably release the page lock immediately in the
676 : * full-page-image case, but for safety let's hold it till later.
677 : */
678 0 : if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
679 0 : destPage = (Page) BufferGetPage(destBuffer);
680 : else
681 0 : destPage = NULL; /* don't do any page updates */
682 : }
683 :
684 : /* restore leaf tuples to src and/or dest page */
685 0 : for (i = 0; i < xldata->nInsert; i++)
686 : {
687 : char *leafTuple;
688 : SpGistLeafTupleData leafTupleHdr;
689 :
690 : /* the tuples are not aligned, so must copy to access the size field. */
691 0 : leafTuple = ptr;
692 0 : memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
         /* advance before the skip check so ptr stays in step even for skipped tuples */
693 0 : ptr += leafTupleHdr.size;
694 :
695 0 : page = leafPageSelect[i] ? destPage : srcPage;
696 0 : if (page == NULL)
697 0 : continue; /* no need to touch this page */
698 :
699 0 : addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
700 0 : toInsert[i]);
701 : }
702 :
703 : /* Now update src and dest page LSNs if needed */
704 0 : if (srcPage != NULL)
705 : {
706 0 : PageSetLSN(srcPage, lsn);
707 0 : MarkBufferDirty(srcBuffer);
708 : }
709 0 : if (destPage != NULL)
710 : {
711 0 : PageSetLSN(destPage, lsn);
712 0 : MarkBufferDirty(destBuffer);
713 : }
714 :
715 : /* restore new inner tuple */
716 0 : if (xldata->initInner)
717 : {
718 0 : innerBuffer = XLogInitBufferForRedo(record, 2);
         /* inner page: no SPGIST_LEAF flag, only SPGIST_NULLS when storesNulls is set */
719 0 : SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
720 0 : action = BLK_NEEDS_REDO;
721 : }
722 : else
723 0 : action = XLogReadBufferForRedo(record, 2, &innerBuffer);
724 :
725 0 : if (action == BLK_NEEDS_REDO)
726 : {
727 0 : page = BufferGetPage(innerBuffer);
728 :
729 0 : addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
730 0 : xldata->offnumInner);
731 :
732 : /* if inner is also parent, update link while we're here */
733 0 : if (xldata->innerIsParent)
734 : {
735 : SpGistInnerTuple parent;
736 :
737 0 : parent = (SpGistInnerTuple) PageGetItem(page,
738 : PageGetItemId(page, xldata->offnumParent));
739 0 : spgUpdateNodeLink(parent, xldata->nodeI,
740 0 : blknoInner, xldata->offnumInner);
741 : }
742 :
743 0 : PageSetLSN(page, lsn);
744 0 : MarkBufferDirty(innerBuffer);
745 : }
746 0 : if (BufferIsValid(innerBuffer))
747 0 : UnlockReleaseBuffer(innerBuffer);
748 :
749 : /*
750 : * Now we can release the leaf-page locks. It's okay to do this before
751 : * updating the parent downlink.
752 : */
753 0 : if (BufferIsValid(srcBuffer))
754 0 : UnlockReleaseBuffer(srcBuffer);
755 0 : if (BufferIsValid(destBuffer))
756 0 : UnlockReleaseBuffer(destBuffer);
757 :
758 : /* update parent downlink, unless we did it above */
759 0 : if (XLogRecHasBlockRef(record, 3))
760 : {
761 : Buffer parentBuffer;
762 :
763 0 : if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
764 : {
765 : SpGistInnerTuple parent;
766 :
767 0 : page = BufferGetPage(parentBuffer);
768 :
769 0 : parent = (SpGistInnerTuple) PageGetItem(page,
770 : PageGetItemId(page, xldata->offnumParent));
771 0 : spgUpdateNodeLink(parent, xldata->nodeI,
772 0 : blknoInner, xldata->offnumInner);
773 :
774 0 : PageSetLSN(page, lsn);
775 0 : MarkBufferDirty(parentBuffer);
776 : }
777 0 : if (BufferIsValid(parentBuffer))
778 0 : UnlockReleaseBuffer(parentBuffer);
779 : }
780 : else
         /* no separate parent block: the link was handled above, or this is a root split */
781 0 : Assert(xldata->innerIsParent || xldata->isRootSplit);
782 0 : }
783 :
/*
 * Replay an XLOG_SPGIST_VACUUM_LEAF record (dispatched from spg_redo).
 *
 * Main data layout, as parsed below: the spgxlogVacuumLeaf header
 * (SizeOfSpgxlogVacuumLeaf bytes) followed by six OffsetNumber arrays:
 * toDead[nDead], toPlaceholder[nPlaceholder], moveSrc[nMove],
 * moveDest[nMove], chainSrc[nChain], chainDest[nChain].
 *
 * Only block reference 0 is modified.  In order: the toDead entries are
 * passed to spgPageIndexMultiDelete with SPGIST_DEAD, the toPlaceholder
 * entries with SPGIST_PLACEHOLDER, each moveSrc/moveDest pair has its line
 * pointers swapped and the moveSrc slots are then turned into placeholders,
 * and finally each chainSrc[i] tuple's nextOffset is set to chainDest[i].
 */
784 : static void
785 0 : spgRedoVacuumLeaf(XLogReaderState *record)
786 : {
787 0 : XLogRecPtr lsn = record->EndRecPtr;
788 0 : char *ptr = XLogRecGetData(record);
789 0 : spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
790 : OffsetNumber *toDead;
791 : OffsetNumber *toPlaceholder;
792 : OffsetNumber *moveSrc;
793 : OffsetNumber *moveDest;
794 : OffsetNumber *chainSrc;
795 : OffsetNumber *chainDest;
796 : SpGistState state;
797 : Buffer buffer;
798 : Page page;
799 : int i;
800 :
801 0 : fillFakeState(&state, xldata->stateSrc);
802 :
       /* carve the six offset arrays out of the data following the header */
803 0 : ptr += SizeOfSpgxlogVacuumLeaf;
804 0 : toDead = (OffsetNumber *) ptr;
805 0 : ptr += sizeof(OffsetNumber) * xldata->nDead;
806 0 : toPlaceholder = (OffsetNumber *) ptr;
807 0 : ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
808 0 : moveSrc = (OffsetNumber *) ptr;
809 0 : ptr += sizeof(OffsetNumber) * xldata->nMove;
810 0 : moveDest = (OffsetNumber *) ptr;
811 0 : ptr += sizeof(OffsetNumber) * xldata->nMove;
812 0 : chainSrc = (OffsetNumber *) ptr;
813 0 : ptr += sizeof(OffsetNumber) * xldata->nChain;
814 0 : chainDest = (OffsetNumber *) ptr;
815 :
816 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
817 : {
818 0 : page = BufferGetPage(buffer);
819 :
820 0 : spgPageIndexMultiDelete(&state, page,
821 0 : toDead, xldata->nDead,
822 : SPGIST_DEAD, SPGIST_DEAD,
823 : InvalidBlockNumber,
824 : InvalidOffsetNumber);
825 :
826 0 : spgPageIndexMultiDelete(&state, page,
827 0 : toPlaceholder, xldata->nPlaceholder,
828 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
829 : InvalidBlockNumber,
830 : InvalidOffsetNumber);
831 :
832 : /* see comments in vacuumLeafPage(); each move is done by exchanging the two line pointers */
833 0 : for (i = 0; i < xldata->nMove; i++)
834 : {
835 0 : ItemId idSrc = PageGetItemId(page, moveSrc[i]);
836 0 : ItemId idDest = PageGetItemId(page, moveDest[i]);
837 : ItemIdData tmp;
838 :
839 0 : tmp = *idSrc;
840 0 : *idSrc = *idDest;
841 0 : *idDest = tmp;
842 : }
843 :
         /* after the swaps, the moveSrc slots are replaced with placeholders */
844 0 : spgPageIndexMultiDelete(&state, page,
845 0 : moveSrc, xldata->nMove,
846 : SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
847 : InvalidBlockNumber,
848 : InvalidOffsetNumber);
849 :
         /* re-link chains: tuple at chainSrc[i] now points to chainDest[i] */
850 0 : for (i = 0; i < xldata->nChain; i++)
851 : {
852 : SpGistLeafTuple lt;
853 :
854 0 : lt = (SpGistLeafTuple) PageGetItem(page,
855 : PageGetItemId(page, chainSrc[i]));
856 0 : Assert(lt->tupstate == SPGIST_LIVE);
857 0 : lt->nextOffset = chainDest[i];
858 : }
859 :
860 0 : PageSetLSN(page, lsn);
861 0 : MarkBufferDirty(buffer);
862 : }
863 0 : if (BufferIsValid(buffer))
864 0 : UnlockReleaseBuffer(buffer);
865 0 : }
866 :
867 : static void
868 0 : spgRedoVacuumRoot(XLogReaderState *record)
869 : {
870 0 : XLogRecPtr lsn = record->EndRecPtr;
871 0 : char *ptr = XLogRecGetData(record);
872 0 : spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
873 : OffsetNumber *toDelete;
874 : Buffer buffer;
875 : Page page;
876 :
877 0 : toDelete = xldata->offsets;
878 :
879 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
880 : {
881 0 : page = BufferGetPage(buffer);
882 :
883 : /* The tuple numbers are in order */
884 0 : PageIndexMultiDelete(page, toDelete, xldata->nDelete);
885 :
886 0 : PageSetLSN(page, lsn);
887 0 : MarkBufferDirty(buffer);
888 : }
889 0 : if (BufferIsValid(buffer))
890 0 : UnlockReleaseBuffer(buffer);
891 0 : }
892 :
/*
 * Replay an XLOG_SPGIST_VACUUM_REDIRECT record (dispatched from spg_redo).
 *
 * Main data is a spgxlogVacuumRedirect struct whose offsets array lists
 * nToPlaceholder redirect tuples on block reference 0.
 *
 * When running in Hot Standby and newestRedirectXid is valid, recovery
 * conflicts are resolved first.  Then, if the page needs redo: each listed
 * redirect tuple is turned into a placeholder with an invalid pointer, the
 * page's nRedirection/nPlaceholder counters are adjusted to match, and, if
 * firstPlaceholder is valid, every tuple from firstPlaceholder through the
 * page's max offset is deleted and nPlaceholder is reduced by that count.
 */
893 : static void
894 0 : spgRedoVacuumRedirect(XLogReaderState *record)
895 : {
896 0 : XLogRecPtr lsn = record->EndRecPtr;
897 0 : char *ptr = XLogRecGetData(record);
898 0 : spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
899 : OffsetNumber *itemToPlaceholder;
900 : Buffer buffer;
901 :
902 0 : itemToPlaceholder = xldata->offsets;
903 :
904 : /*
905 : * If any redirection tuples are being removed, make sure there are no
906 : * live Hot Standby transactions that might need to see them.
907 : */
908 0 : if (InHotStandby)
909 : {
910 0 : if (TransactionIdIsValid(xldata->newestRedirectXid))
911 : {
912 : RelFileNode node;
913 :
914 0 : XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
915 0 : ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
916 : node);
917 : }
918 : }
919 :
920 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
921 : {
922 0 : Page page = BufferGetPage(buffer);
923 0 : SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
924 : int i;
925 :
926 : /* Convert redirect pointers to plain placeholders */
927 0 : for (i = 0; i < xldata->nToPlaceholder; i++)
928 : {
929 : SpGistDeadTuple dt;
930 :
931 0 : dt = (SpGistDeadTuple) PageGetItem(page,
932 : PageGetItemId(page, itemToPlaceholder[i]));
933 0 : Assert(dt->tupstate == SPGIST_REDIRECT);
934 0 : dt->tupstate = SPGIST_PLACEHOLDER;
935 0 : ItemPointerSetInvalid(&dt->pointer);
936 : }
937 :
         /* the converted tuples move from the redirect count to the placeholder count */
938 0 : Assert(opaque->nRedirection >= xldata->nToPlaceholder);
939 0 : opaque->nRedirection -= xldata->nToPlaceholder;
940 0 : opaque->nPlaceholder += xldata->nToPlaceholder;
941 :
942 : /* Remove placeholder tuples at end of page */
943 0 : if (xldata->firstPlaceholder != InvalidOffsetNumber)
944 : {
945 0 : int max = PageGetMaxOffsetNumber(page);
946 : OffsetNumber *toDelete;
947 :
948 0 : toDelete = palloc(sizeof(OffsetNumber) * max);
949 :
         /* list every offset from firstPlaceholder up to and including max */
950 0 : for (i = xldata->firstPlaceholder; i <= max; i++)
951 0 : toDelete[i - xldata->firstPlaceholder] = i;
952 :
         /* i is reused here as the number of tuples being deleted */
953 0 : i = max - xldata->firstPlaceholder + 1;
954 0 : Assert(opaque->nPlaceholder >= i);
955 0 : opaque->nPlaceholder -= i;
956 :
957 : /* The array is sorted, so can use PageIndexMultiDelete */
958 0 : PageIndexMultiDelete(page, toDelete, i);
959 :
960 0 : pfree(toDelete);
961 : }
962 :
963 0 : PageSetLSN(page, lsn);
964 0 : MarkBufferDirty(buffer);
965 : }
966 0 : if (BufferIsValid(buffer))
967 0 : UnlockReleaseBuffer(buffer);
968 0 : }
969 :
970 : void
971 0 : spg_redo(XLogReaderState *record)
972 : {
973 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
974 : MemoryContext oldCxt;
975 :
976 0 : oldCxt = MemoryContextSwitchTo(opCtx);
977 0 : switch (info)
978 : {
979 : case XLOG_SPGIST_CREATE_INDEX:
980 0 : spgRedoCreateIndex(record);
981 0 : break;
982 : case XLOG_SPGIST_ADD_LEAF:
983 0 : spgRedoAddLeaf(record);
984 0 : break;
985 : case XLOG_SPGIST_MOVE_LEAFS:
986 0 : spgRedoMoveLeafs(record);
987 0 : break;
988 : case XLOG_SPGIST_ADD_NODE:
989 0 : spgRedoAddNode(record);
990 0 : break;
991 : case XLOG_SPGIST_SPLIT_TUPLE:
992 0 : spgRedoSplitTuple(record);
993 0 : break;
994 : case XLOG_SPGIST_PICKSPLIT:
995 0 : spgRedoPickSplit(record);
996 0 : break;
997 : case XLOG_SPGIST_VACUUM_LEAF:
998 0 : spgRedoVacuumLeaf(record);
999 0 : break;
1000 : case XLOG_SPGIST_VACUUM_ROOT:
1001 0 : spgRedoVacuumRoot(record);
1002 0 : break;
1003 : case XLOG_SPGIST_VACUUM_REDIRECT:
1004 0 : spgRedoVacuumRedirect(record);
1005 0 : break;
1006 : default:
1007 0 : elog(PANIC, "spg_redo: unknown op code %u", info);
1008 : }
1009 :
1010 0 : MemoryContextSwitchTo(oldCxt);
1011 0 : MemoryContextReset(opCtx);
1012 0 : }
1013 :
1014 : void
1015 0 : spg_xlog_startup(void)
1016 : {
1017 0 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
1018 : "SP-GiST temporary context",
1019 : ALLOCSET_DEFAULT_SIZES);
1020 0 : }
1021 :
1022 : void
1023 0 : spg_xlog_cleanup(void)
1024 : {
1025 0 : MemoryContextDelete(opCtx);
1026 0 : opCtx = NULL;
1027 0 : }
1028 :
1029 : /*
1030 : * Mask a SpGist page before performing consistency checks on it.
1031 : */
1032 : void
1033 0 : spg_mask(char *pagedata, BlockNumber blkno)
1034 : {
1035 0 : Page page = (Page) pagedata;
1036 :
1037 0 : mask_page_lsn(page);
1038 :
1039 0 : mask_page_hint_bits(page);
1040 :
1041 : /*
1042 : * Any SpGist page other than meta contains unused space which needs to be
1043 : * masked.
1044 : */
1045 0 : if (!SpGistPageIsMeta(page))
1046 0 : mask_unused_space(page);
1047 0 : }
|