Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * ginxlog.c
4 : * WAL replay logic for inverted index.
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/gin/ginxlog.c
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres.h"
15 :
16 : #include "access/bufmask.h"
17 : #include "access/gin_private.h"
18 : #include "access/ginxlog.h"
19 : #include "access/xlogutils.h"
20 : #include "utils/memutils.h"
21 :
22 : static MemoryContext opCtx; /* scratch context for replay: created in gin_xlog_startup(), made current and then reset for every record in gin_redo(), deleted in gin_xlog_cleanup() */
23 :
24 : static void
25 0 : ginRedoClearIncompleteSplit(XLogReaderState *record, uint8 block_id)
26 : {
27 0 : XLogRecPtr lsn = record->EndRecPtr;
28 : Buffer buffer;
29 : Page page;
30 :
31 0 : if (XLogReadBufferForRedo(record, block_id, &buffer) == BLK_NEEDS_REDO)
32 : {
33 0 : page = (Page) BufferGetPage(buffer);
34 0 : GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
35 :
36 0 : PageSetLSN(page, lsn);
37 0 : MarkBufferDirty(buffer);
38 : }
39 0 : if (BufferIsValid(buffer))
40 0 : UnlockReleaseBuffer(buffer);
41 0 : }
42 :
43 : static void
44 0 : ginRedoCreateIndex(XLogReaderState *record)
45 : {
46 0 : XLogRecPtr lsn = record->EndRecPtr;
47 : Buffer RootBuffer,
48 : MetaBuffer;
49 : Page page;
50 :
51 0 : MetaBuffer = XLogInitBufferForRedo(record, 0);
52 0 : Assert(BufferGetBlockNumber(MetaBuffer) == GIN_METAPAGE_BLKNO);
53 0 : page = (Page) BufferGetPage(MetaBuffer);
54 :
55 0 : GinInitMetabuffer(MetaBuffer);
56 :
57 0 : PageSetLSN(page, lsn);
58 0 : MarkBufferDirty(MetaBuffer);
59 :
60 0 : RootBuffer = XLogInitBufferForRedo(record, 1);
61 0 : Assert(BufferGetBlockNumber(RootBuffer) == GIN_ROOT_BLKNO);
62 0 : page = (Page) BufferGetPage(RootBuffer);
63 :
64 0 : GinInitBuffer(RootBuffer, GIN_LEAF);
65 :
66 0 : PageSetLSN(page, lsn);
67 0 : MarkBufferDirty(RootBuffer);
68 :
69 0 : UnlockReleaseBuffer(RootBuffer);
70 0 : UnlockReleaseBuffer(MetaBuffer);
71 0 : }
72 :
73 : static void
74 0 : ginRedoCreatePTree(XLogReaderState *record)
75 : {
76 0 : XLogRecPtr lsn = record->EndRecPtr;
77 0 : ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
78 : char *ptr;
79 : Buffer buffer;
80 : Page page;
81 :
82 0 : buffer = XLogInitBufferForRedo(record, 0);
83 0 : page = (Page) BufferGetPage(buffer);
84 :
85 0 : GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED);
86 :
87 0 : ptr = XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree);
88 :
89 : /* Place page data */
90 0 : memcpy(GinDataLeafPageGetPostingList(page), ptr, data->size);
91 :
92 0 : GinDataPageSetDataSize(page, data->size);
93 :
94 0 : PageSetLSN(page, lsn);
95 :
96 0 : MarkBufferDirty(buffer);
97 0 : UnlockReleaseBuffer(buffer);
98 0 : }
99 :
100 : static void
101 0 : ginRedoInsertEntry(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
102 : {
103 0 : Page page = BufferGetPage(buffer);
104 0 : ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
105 0 : OffsetNumber offset = data->offset;
106 : IndexTuple itup;
107 :
108 0 : if (rightblkno != InvalidBlockNumber)
109 : {
110 : /* update link to right page after split */
111 0 : Assert(!GinPageIsLeaf(page));
112 0 : Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
113 0 : itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
114 0 : GinSetDownlink(itup, rightblkno);
115 : }
116 :
117 0 : if (data->isDelete)
118 : {
119 0 : Assert(GinPageIsLeaf(page));
120 0 : Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
121 0 : PageIndexTupleDelete(page, offset);
122 : }
123 :
124 0 : itup = &data->tuple;
125 :
126 0 : if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
127 : {
128 : RelFileNode node;
129 : ForkNumber forknum;
130 : BlockNumber blknum;
131 :
132 0 : BufferGetTag(buffer, &node, &forknum, &blknum);
133 0 : elog(ERROR, "failed to add item to index page in %u/%u/%u",
134 : node.spcNode, node.dbNode, node.relNode);
135 : }
136 0 : }
137 :
138 : static void
139 0 : ginRedoRecompress(Page page, ginxlogRecompressDataLeaf *data)
140 : {
141 : int actionno;
142 : int segno;
143 : GinPostingList *oldseg;
144 : Pointer segmentend;
145 : char *walbuf;
146 : int totalsize;
147 :
148 : /*
149 : * If the page is in pre-9.4 format, convert to new format first.
150 : */
151 0 : if (!GinPageIsCompressed(page))
152 : {
153 0 : ItemPointer uncompressed = (ItemPointer) GinDataPageGetData(page);
154 0 : int nuncompressed = GinPageGetOpaque(page)->maxoff;
155 : int npacked;
156 : GinPostingList *plist;
157 :
158 0 : plist = ginCompressPostingList(uncompressed, nuncompressed,
159 : BLCKSZ, &npacked);
160 0 : Assert(npacked == nuncompressed);
161 :
162 0 : totalsize = SizeOfGinPostingList(plist);
163 :
164 0 : memcpy(GinDataLeafPageGetPostingList(page), plist, totalsize);
165 0 : GinDataPageSetDataSize(page, totalsize);
166 0 : GinPageSetCompressed(page);
167 0 : GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
168 : }
169 :
170 0 : oldseg = GinDataLeafPageGetPostingList(page);
171 0 : segmentend = (Pointer) oldseg + GinDataLeafPageGetPostingListSize(page);
172 0 : segno = 0;
173 :
174 0 : walbuf = ((char *) data) + sizeof(ginxlogRecompressDataLeaf);
175 0 : for (actionno = 0; actionno < data->nactions; actionno++)
176 : {
177 0 : uint8 a_segno = *((uint8 *) (walbuf++));
178 0 : uint8 a_action = *((uint8 *) (walbuf++));
179 0 : GinPostingList *newseg = NULL;
180 0 : int newsegsize = 0;
181 0 : ItemPointerData *items = NULL;
182 0 : uint16 nitems = 0;
183 : ItemPointerData *olditems;
184 : int nolditems;
185 : ItemPointerData *newitems;
186 : int nnewitems;
187 : int segsize;
188 : Pointer segptr;
189 : int szleft;
190 :
191 : /* Extract all the information we need from the WAL record */
192 0 : if (a_action == GIN_SEGMENT_INSERT ||
193 : a_action == GIN_SEGMENT_REPLACE)
194 : {
195 0 : newseg = (GinPostingList *) walbuf;
196 0 : newsegsize = SizeOfGinPostingList(newseg);
197 0 : walbuf += SHORTALIGN(newsegsize);
198 : }
199 :
200 0 : if (a_action == GIN_SEGMENT_ADDITEMS)
201 : {
202 0 : memcpy(&nitems, walbuf, sizeof(uint16));
203 0 : walbuf += sizeof(uint16);
204 0 : items = (ItemPointerData *) walbuf;
205 0 : walbuf += nitems * sizeof(ItemPointerData);
206 : }
207 :
208 : /* Skip to the segment that this action concerns */
209 0 : Assert(segno <= a_segno);
210 0 : while (segno < a_segno)
211 : {
212 0 : oldseg = GinNextPostingListSegment(oldseg);
213 0 : segno++;
214 : }
215 :
216 : /*
217 : * ADDITEMS action is handled like REPLACE, but the new segment to
218 : * replace the old one is reconstructed using the old segment from
219 : * disk and the new items from the WAL record.
220 : */
221 0 : if (a_action == GIN_SEGMENT_ADDITEMS)
222 : {
223 : int npacked;
224 :
225 0 : olditems = ginPostingListDecode(oldseg, &nolditems);
226 :
227 0 : newitems = ginMergeItemPointers(items, nitems,
228 : olditems, nolditems,
229 : &nnewitems);
230 0 : Assert(nnewitems == nolditems + nitems);
231 :
232 0 : newseg = ginCompressPostingList(newitems, nnewitems,
233 : BLCKSZ, &npacked);
234 0 : Assert(npacked == nnewitems);
235 :
236 0 : newsegsize = SizeOfGinPostingList(newseg);
237 0 : a_action = GIN_SEGMENT_REPLACE;
238 : }
239 :
240 0 : segptr = (Pointer) oldseg;
241 0 : if (segptr != segmentend)
242 0 : segsize = SizeOfGinPostingList(oldseg);
243 : else
244 : {
245 : /*
246 : * Positioned after the last existing segment. Only INSERTs
247 : * expected here.
248 : */
249 0 : Assert(a_action == GIN_SEGMENT_INSERT);
250 0 : segsize = 0;
251 : }
252 0 : szleft = segmentend - segptr;
253 :
254 0 : switch (a_action)
255 : {
256 : case GIN_SEGMENT_DELETE:
257 0 : memmove(segptr, segptr + segsize, szleft - segsize);
258 0 : segmentend -= segsize;
259 :
260 0 : segno++;
261 0 : break;
262 :
263 : case GIN_SEGMENT_INSERT:
264 : /* make room for the new segment */
265 0 : memmove(segptr + newsegsize, segptr, szleft);
266 : /* copy the new segment in place */
267 0 : memcpy(segptr, newseg, newsegsize);
268 0 : segmentend += newsegsize;
269 0 : segptr += newsegsize;
270 0 : break;
271 :
272 : case GIN_SEGMENT_REPLACE:
273 : /* shift the segments that follow */
274 0 : memmove(segptr + newsegsize,
275 : segptr + segsize,
276 0 : szleft - segsize);
277 : /* copy the replacement segment in place */
278 0 : memcpy(segptr, newseg, newsegsize);
279 0 : segmentend -= segsize;
280 0 : segmentend += newsegsize;
281 0 : segptr += newsegsize;
282 0 : segno++;
283 0 : break;
284 :
285 : default:
286 0 : elog(ERROR, "unexpected GIN leaf action: %u", a_action);
287 : }
288 0 : oldseg = (GinPostingList *) segptr;
289 : }
290 :
291 0 : totalsize = segmentend - (Pointer) GinDataLeafPageGetPostingList(page);
292 0 : GinDataPageSetDataSize(page, totalsize);
293 0 : }
294 :
295 : static void
296 0 : ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
297 : {
298 0 : Page page = BufferGetPage(buffer);
299 :
300 0 : if (isLeaf)
301 : {
302 0 : ginxlogRecompressDataLeaf *data = (ginxlogRecompressDataLeaf *) rdata;
303 :
304 0 : Assert(GinPageIsLeaf(page));
305 :
306 0 : ginRedoRecompress(page, data);
307 : }
308 : else
309 : {
310 0 : ginxlogInsertDataInternal *data = (ginxlogInsertDataInternal *) rdata;
311 : PostingItem *oldpitem;
312 :
313 0 : Assert(!GinPageIsLeaf(page));
314 :
315 : /* update link to right page after split */
316 0 : oldpitem = GinDataPageGetPostingItem(page, data->offset);
317 0 : PostingItemSetBlockNumber(oldpitem, rightblkno);
318 :
319 0 : GinDataPageAddPostingItem(page, &data->newitem, data->offset);
320 : }
321 0 : }
322 :
323 : static void
324 0 : ginRedoInsert(XLogReaderState *record)
325 : {
326 0 : XLogRecPtr lsn = record->EndRecPtr;
327 0 : ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
328 : Buffer buffer;
329 : #ifdef NOT_USED
330 : BlockNumber leftChildBlkno = InvalidBlockNumber;
331 : #endif
332 0 : BlockNumber rightChildBlkno = InvalidBlockNumber;
333 0 : bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
334 :
335 : /*
336 : * First clear incomplete-split flag on child page if this finishes a
337 : * split.
338 : */
339 0 : if (!isLeaf)
340 : {
341 0 : char *payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
342 :
343 : #ifdef NOT_USED
344 : leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
345 : #endif
346 0 : payload += sizeof(BlockIdData);
347 0 : rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
348 0 : payload += sizeof(BlockIdData);
349 :
350 0 : ginRedoClearIncompleteSplit(record, 1);
351 : }
352 :
353 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
354 : {
355 0 : Page page = BufferGetPage(buffer);
356 : Size len;
357 0 : char *payload = XLogRecGetBlockData(record, 0, &len);
358 :
359 : /* How to insert the payload is tree-type specific */
360 0 : if (data->flags & GIN_INSERT_ISDATA)
361 : {
362 0 : Assert(GinPageIsData(page));
363 0 : ginRedoInsertData(buffer, isLeaf, rightChildBlkno, payload);
364 : }
365 : else
366 : {
367 0 : Assert(!GinPageIsData(page));
368 0 : ginRedoInsertEntry(buffer, isLeaf, rightChildBlkno, payload);
369 : }
370 :
371 0 : PageSetLSN(page, lsn);
372 0 : MarkBufferDirty(buffer);
373 : }
374 0 : if (BufferIsValid(buffer))
375 0 : UnlockReleaseBuffer(buffer);
376 0 : }
377 :
378 : static void
379 0 : ginRedoSplit(XLogReaderState *record)
380 : {
381 0 : ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
382 : Buffer lbuffer,
383 : rbuffer,
384 : rootbuf;
385 0 : bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
386 0 : bool isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
387 :
388 : /*
389 : * First clear incomplete-split flag on child page if this finishes a
390 : * split
391 : */
392 0 : if (!isLeaf)
393 0 : ginRedoClearIncompleteSplit(record, 3);
394 :
395 0 : if (XLogReadBufferForRedo(record, 0, &lbuffer) != BLK_RESTORED)
396 0 : elog(ERROR, "GIN split record did not contain a full-page image of left page");
397 :
398 0 : if (XLogReadBufferForRedo(record, 1, &rbuffer) != BLK_RESTORED)
399 0 : elog(ERROR, "GIN split record did not contain a full-page image of right page");
400 :
401 0 : if (isRoot)
402 : {
403 0 : if (XLogReadBufferForRedo(record, 2, &rootbuf) != BLK_RESTORED)
404 0 : elog(ERROR, "GIN split record did not contain a full-page image of root page");
405 0 : UnlockReleaseBuffer(rootbuf);
406 : }
407 :
408 0 : UnlockReleaseBuffer(rbuffer);
409 0 : UnlockReleaseBuffer(lbuffer);
410 0 : }
411 :
412 : /*
413 : * VACUUM_PAGE record contains simply a full image of the page, similar to
414 : * an XLOG_FPI record.
415 : */
416 : static void
417 0 : ginRedoVacuumPage(XLogReaderState *record)
418 : {
419 : Buffer buffer;
420 :
421 0 : if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
422 : {
423 0 : elog(ERROR, "replay of gin entry tree page vacuum did not restore the page");
424 : }
425 0 : UnlockReleaseBuffer(buffer);
426 0 : }
427 :
428 : static void
429 0 : ginRedoVacuumDataLeafPage(XLogReaderState *record)
430 : {
431 0 : XLogRecPtr lsn = record->EndRecPtr;
432 : Buffer buffer;
433 :
434 0 : if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
435 : {
436 0 : Page page = BufferGetPage(buffer);
437 : Size len;
438 : ginxlogVacuumDataLeafPage *xlrec;
439 :
440 0 : xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetBlockData(record, 0, &len);
441 :
442 0 : Assert(GinPageIsLeaf(page));
443 0 : Assert(GinPageIsData(page));
444 :
445 0 : ginRedoRecompress(page, &xlrec->data);
446 0 : PageSetLSN(page, lsn);
447 0 : MarkBufferDirty(buffer);
448 : }
449 0 : if (BufferIsValid(buffer))
450 0 : UnlockReleaseBuffer(buffer);
451 0 : }
452 :
453 : static void
454 0 : ginRedoDeletePage(XLogReaderState *record)
455 : {
456 0 : XLogRecPtr lsn = record->EndRecPtr;
457 0 : ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
458 : Buffer dbuffer;
459 : Buffer pbuffer;
460 : Buffer lbuffer;
461 : Page page;
462 :
463 0 : if (XLogReadBufferForRedo(record, 0, &dbuffer) == BLK_NEEDS_REDO)
464 : {
465 0 : page = BufferGetPage(dbuffer);
466 0 : Assert(GinPageIsData(page));
467 0 : GinPageGetOpaque(page)->flags = GIN_DELETED;
468 0 : PageSetLSN(page, lsn);
469 0 : MarkBufferDirty(dbuffer);
470 : }
471 :
472 0 : if (XLogReadBufferForRedo(record, 1, &pbuffer) == BLK_NEEDS_REDO)
473 : {
474 0 : page = BufferGetPage(pbuffer);
475 0 : Assert(GinPageIsData(page));
476 0 : Assert(!GinPageIsLeaf(page));
477 0 : GinPageDeletePostingItem(page, data->parentOffset);
478 0 : PageSetLSN(page, lsn);
479 0 : MarkBufferDirty(pbuffer);
480 : }
481 :
482 0 : if (XLogReadBufferForRedo(record, 2, &lbuffer) == BLK_NEEDS_REDO)
483 : {
484 0 : page = BufferGetPage(lbuffer);
485 0 : Assert(GinPageIsData(page));
486 0 : GinPageGetOpaque(page)->rightlink = data->rightLink;
487 0 : PageSetLSN(page, lsn);
488 0 : MarkBufferDirty(lbuffer);
489 : }
490 :
491 0 : if (BufferIsValid(lbuffer))
492 0 : UnlockReleaseBuffer(lbuffer);
493 0 : if (BufferIsValid(pbuffer))
494 0 : UnlockReleaseBuffer(pbuffer);
495 0 : if (BufferIsValid(dbuffer))
496 0 : UnlockReleaseBuffer(dbuffer);
497 0 : }
498 :
499 : static void
500 0 : ginRedoUpdateMetapage(XLogReaderState *record)
501 : {
502 0 : XLogRecPtr lsn = record->EndRecPtr;
503 0 : ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
504 : Buffer metabuffer;
505 : Page metapage;
506 : Buffer buffer;
507 :
508 : /*
509 : * Restore the metapage. This is essentially the same as a full-page
510 : * image, so restore the metapage unconditionally without looking at the
511 : * LSN, to avoid torn page hazards.
512 : */
513 0 : metabuffer = XLogInitBufferForRedo(record, 0);
514 0 : Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO);
515 0 : metapage = BufferGetPage(metabuffer);
516 :
517 0 : GinInitPage(metapage, GIN_META, BufferGetPageSize(metabuffer));
518 0 : memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
519 0 : PageSetLSN(metapage, lsn);
520 0 : MarkBufferDirty(metabuffer);
521 :
522 0 : if (data->ntuples > 0)
523 : {
524 : /*
525 : * insert into tail page
526 : */
527 0 : if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
528 : {
529 0 : Page page = BufferGetPage(buffer);
530 : OffsetNumber off;
531 : int i;
532 : Size tupsize;
533 : char *payload;
534 : IndexTuple tuples;
535 : Size totaltupsize;
536 :
537 0 : payload = XLogRecGetBlockData(record, 1, &totaltupsize);
538 0 : tuples = (IndexTuple) payload;
539 :
540 0 : if (PageIsEmpty(page))
541 0 : off = FirstOffsetNumber;
542 : else
543 0 : off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
544 :
545 0 : for (i = 0; i < data->ntuples; i++)
546 : {
547 0 : tupsize = IndexTupleSize(tuples);
548 :
549 0 : if (PageAddItem(page, (Item) tuples, tupsize, off,
550 : false, false) == InvalidOffsetNumber)
551 0 : elog(ERROR, "failed to add item to index page");
552 :
553 0 : tuples = (IndexTuple) (((char *) tuples) + tupsize);
554 :
555 0 : off++;
556 : }
557 0 : Assert(payload + totaltupsize == (char *) tuples);
558 :
559 : /*
560 : * Increase counter of heap tuples
561 : */
562 0 : GinPageGetOpaque(page)->maxoff++;
563 :
564 0 : PageSetLSN(page, lsn);
565 0 : MarkBufferDirty(buffer);
566 : }
567 0 : if (BufferIsValid(buffer))
568 0 : UnlockReleaseBuffer(buffer);
569 : }
570 0 : else if (data->prevTail != InvalidBlockNumber)
571 : {
572 : /*
573 : * New tail
574 : */
575 0 : if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
576 : {
577 0 : Page page = BufferGetPage(buffer);
578 :
579 0 : GinPageGetOpaque(page)->rightlink = data->newRightlink;
580 :
581 0 : PageSetLSN(page, lsn);
582 0 : MarkBufferDirty(buffer);
583 : }
584 0 : if (BufferIsValid(buffer))
585 0 : UnlockReleaseBuffer(buffer);
586 : }
587 :
588 0 : UnlockReleaseBuffer(metabuffer);
589 0 : }
590 :
591 : static void
592 0 : ginRedoInsertListPage(XLogReaderState *record)
593 : {
594 0 : XLogRecPtr lsn = record->EndRecPtr;
595 0 : ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
596 : Buffer buffer;
597 : Page page;
598 : OffsetNumber l,
599 0 : off = FirstOffsetNumber;
600 : int i,
601 : tupsize;
602 : char *payload;
603 : IndexTuple tuples;
604 : Size totaltupsize;
605 :
606 : /* We always re-initialize the page. */
607 0 : buffer = XLogInitBufferForRedo(record, 0);
608 0 : page = BufferGetPage(buffer);
609 :
610 0 : GinInitBuffer(buffer, GIN_LIST);
611 0 : GinPageGetOpaque(page)->rightlink = data->rightlink;
612 0 : if (data->rightlink == InvalidBlockNumber)
613 : {
614 : /* tail of sublist */
615 0 : GinPageSetFullRow(page);
616 0 : GinPageGetOpaque(page)->maxoff = 1;
617 : }
618 : else
619 : {
620 0 : GinPageGetOpaque(page)->maxoff = 0;
621 : }
622 :
623 0 : payload = XLogRecGetBlockData(record, 0, &totaltupsize);
624 :
625 0 : tuples = (IndexTuple) payload;
626 0 : for (i = 0; i < data->ntuples; i++)
627 : {
628 0 : tupsize = IndexTupleSize(tuples);
629 :
630 0 : l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
631 :
632 0 : if (l == InvalidOffsetNumber)
633 0 : elog(ERROR, "failed to add item to index page");
634 :
635 0 : tuples = (IndexTuple) (((char *) tuples) + tupsize);
636 0 : off++;
637 : }
638 0 : Assert((char *) tuples == payload + totaltupsize);
639 :
640 0 : PageSetLSN(page, lsn);
641 0 : MarkBufferDirty(buffer);
642 :
643 0 : UnlockReleaseBuffer(buffer);
644 0 : }
645 :
646 : static void
647 0 : ginRedoDeleteListPages(XLogReaderState *record)
648 : {
649 0 : XLogRecPtr lsn = record->EndRecPtr;
650 0 : ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
651 : Buffer metabuffer;
652 : Page metapage;
653 : int i;
654 :
655 0 : metabuffer = XLogInitBufferForRedo(record, 0);
656 0 : Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO);
657 0 : metapage = BufferGetPage(metabuffer);
658 :
659 0 : GinInitPage(metapage, GIN_META, BufferGetPageSize(metabuffer));
660 :
661 0 : memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
662 0 : PageSetLSN(metapage, lsn);
663 0 : MarkBufferDirty(metabuffer);
664 :
665 : /*
666 : * In normal operation, shiftList() takes exclusive lock on all the
667 : * pages-to-be-deleted simultaneously. During replay, however, it should
668 : * be all right to lock them one at a time. This is dependent on the fact
669 : * that we are deleting pages from the head of the list, and that readers
670 : * share-lock the next page before releasing the one they are on. So we
671 : * cannot get past a reader that is on, or due to visit, any page we are
672 : * going to delete. New incoming readers will block behind our metapage
673 : * lock and then see a fully updated page list.
674 : *
675 : * No full-page images are taken of the deleted pages. Instead, they are
676 : * re-initialized as empty, deleted pages. Their right-links don't need to
677 : * be preserved, because no new readers can see the pages, as explained
678 : * above.
679 : */
680 0 : for (i = 0; i < data->ndeleted; i++)
681 : {
682 : Buffer buffer;
683 : Page page;
684 :
685 0 : buffer = XLogInitBufferForRedo(record, i + 1);
686 0 : page = BufferGetPage(buffer);
687 0 : GinInitBuffer(buffer, GIN_DELETED);
688 :
689 0 : PageSetLSN(page, lsn);
690 0 : MarkBufferDirty(buffer);
691 :
692 0 : UnlockReleaseBuffer(buffer);
693 : }
694 0 : UnlockReleaseBuffer(metabuffer);
695 0 : }
696 :
697 : void
698 0 : gin_redo(XLogReaderState *record)
699 : {
700 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
701 : MemoryContext oldCtx;
702 :
703 : /*
704 : * GIN indexes do not require any conflict processing. NB: If we ever
705 : * implement a similar optimization as we have in b-tree, and remove
706 : * killed tuples outside VACUUM, we'll need to handle that here.
707 : */
708 :
709 0 : oldCtx = MemoryContextSwitchTo(opCtx);
710 0 : switch (info)
711 : {
712 : case XLOG_GIN_CREATE_INDEX:
713 0 : ginRedoCreateIndex(record);
714 0 : break;
715 : case XLOG_GIN_CREATE_PTREE:
716 0 : ginRedoCreatePTree(record);
717 0 : break;
718 : case XLOG_GIN_INSERT:
719 0 : ginRedoInsert(record);
720 0 : break;
721 : case XLOG_GIN_SPLIT:
722 0 : ginRedoSplit(record);
723 0 : break;
724 : case XLOG_GIN_VACUUM_PAGE:
725 0 : ginRedoVacuumPage(record);
726 0 : break;
727 : case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
728 0 : ginRedoVacuumDataLeafPage(record);
729 0 : break;
730 : case XLOG_GIN_DELETE_PAGE:
731 0 : ginRedoDeletePage(record);
732 0 : break;
733 : case XLOG_GIN_UPDATE_META_PAGE:
734 0 : ginRedoUpdateMetapage(record);
735 0 : break;
736 : case XLOG_GIN_INSERT_LISTPAGE:
737 0 : ginRedoInsertListPage(record);
738 0 : break;
739 : case XLOG_GIN_DELETE_LISTPAGE:
740 0 : ginRedoDeleteListPages(record);
741 0 : break;
742 : default:
743 0 : elog(PANIC, "gin_redo: unknown op code %u", info);
744 : }
745 0 : MemoryContextSwitchTo(oldCtx);
746 0 : MemoryContextReset(opCtx);
747 0 : }
748 :
749 : void
750 0 : gin_xlog_startup(void)
751 : {
752 0 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
753 : "GIN recovery temporary context",
754 : ALLOCSET_DEFAULT_SIZES);
755 0 : }
756 :
757 : void
758 0 : gin_xlog_cleanup(void)
759 : {
760 0 : MemoryContextDelete(opCtx);
761 0 : opCtx = NULL;
762 0 : }
763 :
764 : /*
765 : * Mask a GIN page before running consistency checks on it.
766 : */
767 : void
768 0 : gin_mask(char *pagedata, BlockNumber blkno)
769 : {
770 0 : Page page = (Page) pagedata;
771 : GinPageOpaque opaque;
772 :
773 0 : mask_page_lsn(page);
774 0 : opaque = GinPageGetOpaque(page);
775 :
776 0 : mask_page_hint_bits(page);
777 :
778 : /*
779 : * GIN metapage doesn't use pd_lower/pd_upper. Other page types do. Hence,
780 : * we need to apply masking for those pages.
781 : */
782 0 : if (opaque->flags != GIN_META)
783 : {
784 : /*
785 : * For GIN_DELETED page, the page is initialized to empty. Hence, mask
786 : * the page content.
787 : */
788 0 : if (opaque->flags & GIN_DELETED)
789 0 : mask_page_content(page);
790 : else
791 0 : mask_unused_space(page);
792 : }
793 0 : }
|