LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgxlog.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 0 463 0.0 %
Date: 2017-09-29 15:12:54 Functions: 0 15 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * spgxlog.c
       4             :  *    WAL replay logic for SP-GiST
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *           src/backend/access/spgist/spgxlog.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/bufmask.h"
      18             : #include "access/spgist_private.h"
      19             : #include "access/spgxlog.h"
      20             : #include "access/transam.h"
      21             : #include "access/xlog.h"
      22             : #include "access/xlogutils.h"
      23             : #include "storage/standby.h"
      24             : #include "utils/memutils.h"
      25             : 
      26             : 
      27             : static MemoryContext opCtx;     /* working memory for operations */
      28             : 
      29             : 
      30             : /*
      31             :  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
      32             :  *
      33             :  * At present, all we need is enough info to support spgFormDeadTuple(),
      34             :  * plus the isBuild flag.
                     :  *
                     :  * state: caller-supplied struct; it is zeroed and then partially filled in.
                     :  * stateSrc: the myXid/isBuild pair that the redo routines in this file
                     :  *      take from the WAL record's main data.
                     :  *
                     :  * Note that deadTupleStorage is freshly palloc0'd (SGDTSIZE bytes) on every
                     :  * call and is not freed by this function.
      35             :  */
      36             : static void
      37           0 : fillFakeState(SpGistState *state, spgxlogState stateSrc)
      38             : {
      39           0 :     memset(state, 0, sizeof(*state));   /* every field not set below stays zero */
      40             : 
      41           0 :     state->myXid = stateSrc.myXid;
      42           0 :     state->isBuild = stateSrc.isBuild;
      43           0 :     state->deadTupleStorage = palloc0(SGDTSIZE);
      44           0 : }
      45             : 
      46             : /*
      47             :  * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
      48             :  * to replay SpGistPageAddNewItem() operations.  If the offset points at an
      49             :  * existing tuple, it had better be a placeholder tuple.
                     :  *
                     :  * page: the page being modified.
                     :  * tuple, size: the tuple image to store and the size passed to PageAddItem().
                     :  * offset: the exact slot the tuple must end up in.
                     :  *
                     :  * Raises ERROR if the slot is occupied by a non-placeholder tuple, or if
                     :  * PageAddItem() does not place the tuple at the requested offset.
                     :  *
                     :  * Callers in this file also use it for inner tuples, not only leaf tuples.
      50             :  */
      51             : static void
      52           0 : addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
      53             : {
      54           0 :     if (offset <= PageGetMaxOffsetNumber(page))
      55             :     {
      56           0 :         SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
      57             :                                                            PageGetItemId(page, offset));
      58             : 
      59           0 :         if (dt->tupstate != SPGIST_PLACEHOLDER)
      60           0 :             elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
      61             :         /* keep the page's placeholder count in step with the deletion below */
      62           0 :         Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
      63           0 :         SpGistPageGetOpaque(page)->nPlaceholder--;
      64             : 
      65           0 :         PageIndexTupleDelete(page, offset);
      66             :     }
      67             :     /* the target slot may be at most one past the page's current last offset */
      68           0 :     Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
      69             :     /* NOTE(review): size is an int but is printed with %u below -- confirm intended */
      70           0 :     if (PageAddItem(page, tuple, size, offset, false, false) != offset)
      71           0 :         elog(ERROR, "failed to add item of size %u to SPGiST index page",
      72             :              size);
      73           0 : }
      74             : /*
                     :  * Replay creation of an SP-GiST index by re-initializing the three pages
                     :  * registered as block references 0, 1 and 2 of the record.
                     :  *
                     :  * Block 0 is initialized as the metapage, block 1 as a leaf page and
                     :  * block 2 as a leaf page flagged SPGIST_NULLS.  The Asserts check that
                     :  * these are SPGIST_METAPAGE_BLKNO, SPGIST_ROOT_BLKNO and
                     :  * SPGIST_NULL_BLKNO respectively.  Each page is stamped with the record's
                     :  * end LSN and marked dirty before its buffer is released.
                     :  */
      75             : static void
      76           0 : spgRedoCreateIndex(XLogReaderState *record)
      77             : {
      78           0 :     XLogRecPtr  lsn = record->EndRecPtr;
      79             :     Buffer      buffer;
      80             :     Page        page;
      81             :     /* block reference 0: the metapage */
      82           0 :     buffer = XLogInitBufferForRedo(record, 0);
      83           0 :     Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO);
      84           0 :     page = (Page) BufferGetPage(buffer);
      85           0 :     SpGistInitMetapage(page);
      86           0 :     PageSetLSN(page, lsn);
      87           0 :     MarkBufferDirty(buffer);
      88           0 :     UnlockReleaseBuffer(buffer);
      89             :     /* block reference 1: the root page, initialized as a leaf page */
      90           0 :     buffer = XLogInitBufferForRedo(record, 1);
      91           0 :     Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO);
      92           0 :     SpGistInitBuffer(buffer, SPGIST_LEAF);
      93           0 :     page = (Page) BufferGetPage(buffer);
      94           0 :     PageSetLSN(page, lsn);
      95           0 :     MarkBufferDirty(buffer);
      96           0 :     UnlockReleaseBuffer(buffer);
      97             :     /* block reference 2: the nulls page, a leaf page with SPGIST_NULLS set */
      98           0 :     buffer = XLogInitBufferForRedo(record, 2);
      99           0 :     Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO);
     100           0 :     SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
     101           0 :     page = (Page) BufferGetPage(buffer);
     102           0 :     PageSetLSN(page, lsn);
     103           0 :     MarkBufferDirty(buffer);
     104           0 :     UnlockReleaseBuffer(buffer);
     105           0 : }
     106             : /*
                     :  * Replay an spgxlogAddLeaf record: store one leaf tuple on the leaf page
                     :  * (block reference 0) and, when offnumParent is valid, update the
                     :  * downlink in the parent inner tuple (block reference 1).
                     :  *
                     :  * The leaf tuple image follows the fixed-size spgxlogAddLeaf struct in
                     :  * the record's main data.
                     :  */
     107             : static void
     108           0 : spgRedoAddLeaf(XLogReaderState *record)
     109             : {
     110           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     111           0 :     char       *ptr = XLogRecGetData(record);
     112           0 :     spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
     113             :     char       *leafTuple;
     114             :     SpGistLeafTupleData leafTupleHdr;
     115             :     Buffer      buffer;
     116             :     Page        page;
     117             :     XLogRedoAction action;
     118             : 
     119           0 :     ptr += sizeof(spgxlogAddLeaf);
     120           0 :     leafTuple = ptr;
     121             :     /* the leaf tuple is unaligned, so make a copy to access its header */
     122           0 :     memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
     123             : 
     124             :     /*
     125             :      * In normal operation we would have both current and parent pages locked
     126             :      * simultaneously; but in WAL replay it should be safe to update the leaf
     127             :      * page before updating the parent.
     128             :      */
     129           0 :     if (xldata->newPage)
     130             :     {
     131           0 :         buffer = XLogInitBufferForRedo(record, 0);
     132           0 :         SpGistInitBuffer(buffer,
     133           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     134           0 :         action = BLK_NEEDS_REDO;    /* a just-initialized page always gets the tuple */
     135             :     }
     136             :     else
     137           0 :         action = XLogReadBufferForRedo(record, 0, &buffer);
     138             : 
     139           0 :     if (action == BLK_NEEDS_REDO)
     140             :     {
     141           0 :         page = BufferGetPage(buffer);
     142             : 
     143             :         /* insert new tuple */
     144           0 :         if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
     145             :         {
     146             :             /* normal cases, tuple was added by SpGistPageAddNewItem */
     147           0 :             addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
     148           0 :                               xldata->offnumLeaf);
     149             : 
     150             :             /* update head tuple's chain link if needed */
     151           0 :             if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
     152             :             {
     153             :                 SpGistLeafTuple head;
     154             :                 /* the new tuple already carries the head's old successor (see Assert) */
     155           0 :                 head = (SpGistLeafTuple) PageGetItem(page,
     156             :                                                      PageGetItemId(page, xldata->offnumHeadLeaf));
     157           0 :                 Assert(head->nextOffset == leafTupleHdr.nextOffset);
     158           0 :                 head->nextOffset = xldata->offnumLeaf;
     159             :             }
     160             :         }
     161             :         else
     162             :         {
     163             :             /* replacing a DEAD tuple */
     164           0 :             PageIndexTupleDelete(page, xldata->offnumLeaf);
     165           0 :             if (PageAddItem(page,
     166             :                             (Item) leafTuple, leafTupleHdr.size,
     167           0 :                             xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
     168           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     169             :                      leafTupleHdr.size);
     170             :         }
     171             : 
     172           0 :         PageSetLSN(page, lsn);
     173           0 :         MarkBufferDirty(buffer);
     174             :     }
     175           0 :     if (BufferIsValid(buffer))
     176           0 :         UnlockReleaseBuffer(buffer);
     177             : 
     178             :     /* update parent downlink if necessary */
     179           0 :     if (xldata->offnumParent != InvalidOffsetNumber)
     180             :     {
     181           0 :         if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
     182             :         {
     183             :             SpGistInnerTuple tuple;
     184             :             BlockNumber blknoLeaf;
     185             :             /* the downlink is set to the block number of block reference 0 */
     186           0 :             XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
     187             : 
     188           0 :             page = BufferGetPage(buffer);
     189             : 
     190           0 :             tuple = (SpGistInnerTuple) PageGetItem(page,
     191             :                                                    PageGetItemId(page, xldata->offnumParent));
     192             : 
     193           0 :             spgUpdateNodeLink(tuple, xldata->nodeI,
     194           0 :                               blknoLeaf, xldata->offnumLeaf);
     195             : 
     196           0 :             PageSetLSN(page, lsn);
     197           0 :             MarkBufferDirty(buffer);
     198             :         }
     199           0 :         if (BufferIsValid(buffer))
     200           0 :             UnlockReleaseBuffer(buffer);
     201             :     }
     202           0 : }
     203             : /*
                     :  * Replay an spgxlogMoveLeafs record.  Leaf tuples are inserted on the
                     :  * destination page (block reference 1), the listed tuples are deleted
                     :  * from the source page (block reference 0), and the parent inner tuple's
                     :  * downlink (block reference 2) is pointed at the destination.
                     :  *
                     :  * Main-data layout after the fixed-size header: nMoves offsets to delete,
                     :  * nInsert offsets to insert at, then nInsert leaf tuple images.
                     :  */
     204             : static void
     205           0 : spgRedoMoveLeafs(XLogReaderState *record)
     206             : {
     207           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     208           0 :     char       *ptr = XLogRecGetData(record);
     209           0 :     spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
     210             :     SpGistState state;
     211             :     OffsetNumber *toDelete;
     212             :     OffsetNumber *toInsert;
     213             :     int         nInsert;
     214             :     Buffer      buffer;
     215             :     Page        page;
     216             :     XLogRedoAction action;
     217             :     BlockNumber blknoDst;
     218             : 
     219           0 :     XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
     220             : 
     221           0 :     fillFakeState(&state, xldata->stateSrc);
     222             :     /* a replaceDead record carries one tuple; otherwise nMoves + 1 tuples */
     223           0 :     nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
     224             :     /* step through the variable-length arrays that follow the header */
     225           0 :     ptr += SizeOfSpgxlogMoveLeafs;
     226           0 :     toDelete = (OffsetNumber *) ptr;
     227           0 :     ptr += sizeof(OffsetNumber) * xldata->nMoves;
     228           0 :     toInsert = (OffsetNumber *) ptr;
     229           0 :     ptr += sizeof(OffsetNumber) * nInsert;
     230             : 
     231             :     /* now ptr points to the list of leaf tuples */
     232             : 
     233             :     /*
     234             :      * In normal operation we would have all three pages (source, dest, and
     235             :      * parent) locked simultaneously; but in WAL replay it should be safe to
     236             :      * update them one at a time, as long as we do it in the right order.
     237             :      */
     238             : 
     239             :     /* Insert tuples on the dest page (do first, so redirect is valid) */
     240           0 :     if (xldata->newPage)
     241             :     {
     242           0 :         buffer = XLogInitBufferForRedo(record, 1);
     243           0 :         SpGistInitBuffer(buffer,
     244           0 :                          SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
     245           0 :         action = BLK_NEEDS_REDO;    /* a just-initialized page always gets the tuples */
     246             :     }
     247             :     else
     248           0 :         action = XLogReadBufferForRedo(record, 1, &buffer);
     249             : 
     250           0 :     if (action == BLK_NEEDS_REDO)
     251             :     {
     252             :         int         i;
     253             : 
     254           0 :         page = BufferGetPage(buffer);
     255             : 
     256           0 :         for (i = 0; i < nInsert; i++)
     257             :         {
     258             :             char       *leafTuple;
     259             :             SpGistLeafTupleData leafTupleHdr;
     260             : 
     261             :             /*
     262             :              * the tuples are not aligned, so must copy to access the size
     263             :              * field.
     264             :              */
     265           0 :             leafTuple = ptr;
     266           0 :             memcpy(&leafTupleHdr, leafTuple,
     267             :                    sizeof(SpGistLeafTupleData));
     268             :             /* store at the logged offset, then advance by this tuple's size */
     269           0 :             addOrReplaceTuple(page, (Item) leafTuple,
     270           0 :                               leafTupleHdr.size, toInsert[i]);
     271           0 :             ptr += leafTupleHdr.size;
     272             :         }
     273             : 
     274           0 :         PageSetLSN(page, lsn);
     275           0 :         MarkBufferDirty(buffer);
     276             :     }
     277           0 :     if (BufferIsValid(buffer))
     278           0 :         UnlockReleaseBuffer(buffer);
     279             : 
     280             :     /* Delete tuples from the source page, inserting a redirection pointer */
     281           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     282             :     {
     283           0 :         page = BufferGetPage(buffer);
     284             :         /* first state is a placeholder rather than a redirect when isBuild is set */
     285           0 :         spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
     286           0 :                                 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
     287             :                                 SPGIST_PLACEHOLDER,
     288             :                                 blknoDst,
     289           0 :                                 toInsert[nInsert - 1]);
     290             : 
     291           0 :         PageSetLSN(page, lsn);
     292           0 :         MarkBufferDirty(buffer);
     293             :     }
     294           0 :     if (BufferIsValid(buffer))
     295           0 :         UnlockReleaseBuffer(buffer);
     296             : 
     297             :     /* And update the parent downlink */
     298           0 :     if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     299             :     {
     300             :         SpGistInnerTuple tuple;
     301             : 
     302           0 :         page = BufferGetPage(buffer);
     303             : 
     304           0 :         tuple = (SpGistInnerTuple) PageGetItem(page,
     305             :                                                PageGetItemId(page, xldata->offnumParent));
     306             :         /* the link targets the last entry of toInsert on the dest block */
     307           0 :         spgUpdateNodeLink(tuple, xldata->nodeI,
     308           0 :                           blknoDst, toInsert[nInsert - 1]);
     309             : 
     310           0 :         PageSetLSN(page, lsn);
     311           0 :         MarkBufferDirty(buffer);
     312             :     }
     313           0 :     if (BufferIsValid(buffer))
     314           0 :         UnlockReleaseBuffer(buffer);
     315           0 : }
     316             : /*
                     :  * Replay an spgxlogAddNode record, which installs a new version of an
                     :  * inner tuple whose image follows the fixed-size struct in the main data.
                     :  *
                     :  * If the record has no block reference 1, the tuple at xldata->offnum on
                     :  * block 0 is replaced in place.  Otherwise the new version is stored on
                     :  * block 1, the old tuple on block 0 is replaced by a redirect (or by a
                     :  * placeholder when isBuild is set), and the parent's downlink is updated
                     :  * on whichever of block references 0, 1 or 2 xldata->parentBlk names.
                     :  */
     317             : static void
     318           0 : spgRedoAddNode(XLogReaderState *record)
     319             : {
     320           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     321           0 :     char       *ptr = XLogRecGetData(record);
     322           0 :     spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
     323             :     char       *innerTuple;
     324             :     SpGistInnerTupleData innerTupleHdr;
     325             :     SpGistState state;
     326             :     Buffer      buffer;
     327             :     Page        page;
     328             :     XLogRedoAction action;
     329             : 
     330           0 :     ptr += sizeof(spgxlogAddNode);
     331           0 :     innerTuple = ptr;
     332             :     /* the tuple is unaligned, so make a copy to access its header */
     333           0 :     memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
     334             : 
     335           0 :     fillFakeState(&state, xldata->stateSrc);
     336             : 
     337           0 :     if (!XLogRecHasBlockRef(record, 1))
     338             :     {
     339             :         /* update in place */
     340           0 :         Assert(xldata->parentBlk == -1);
     341           0 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     342             :         {
     343           0 :             page = BufferGetPage(buffer);
     344             :             /* swap the old tuple for the new image at the same offset */
     345           0 :             PageIndexTupleDelete(page, xldata->offnum);
     346           0 :             if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
     347             :                             xldata->offnum,
     348           0 :                             false, false) != xldata->offnum)
     349           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     350             :                      innerTupleHdr.size);
     351             : 
     352           0 :             PageSetLSN(page, lsn);
     353           0 :             MarkBufferDirty(buffer);
     354             :         }
     355           0 :         if (BufferIsValid(buffer))
     356           0 :             UnlockReleaseBuffer(buffer);
     357             :     }
     358             :     else
     359             :     {
     360             :         BlockNumber blkno;
     361             :         BlockNumber blknoNew;
     362             : 
     363           0 :         XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
     364           0 :         XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
     365             : 
     366             :         /*
     367             :          * In normal operation we would have all three pages (source, dest,
     368             :          * and parent) locked simultaneously; but in WAL replay it should be
     369             :          * safe to update them one at a time, as long as we do it in the right
     370             :          * order. We must insert the new tuple before replacing the old tuple
     371             :          * with the redirect tuple.
     372             :          */
     373             : 
     374             :         /* Install new tuple first so redirect is valid */
     375           0 :         if (xldata->newPage)
     376             :         {
     377             :             /* AddNode is not used for nulls pages */
     378           0 :             buffer = XLogInitBufferForRedo(record, 1);
     379           0 :             SpGistInitBuffer(buffer, 0);
     380           0 :             action = BLK_NEEDS_REDO;
     381             :         }
     382             :         else
     383           0 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     384           0 :         if (action == BLK_NEEDS_REDO)
     385             :         {
     386           0 :             page = BufferGetPage(buffer);
     387             : 
     388           0 :             addOrReplaceTuple(page, (Item) innerTuple,
     389           0 :                               innerTupleHdr.size, xldata->offnumNew);
     390             : 
     391             :             /*
     392             :              * If parent is in this same page, update it now.
     393             :              */
     394           0 :             if (xldata->parentBlk == 1)
     395             :             {
     396             :                 SpGistInnerTuple parentTuple;
     397             : 
     398           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     399             :                                                              PageGetItemId(page, xldata->offnumParent));
     400             : 
     401           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     402           0 :                                   blknoNew, xldata->offnumNew);
     403             :             }
     404           0 :             PageSetLSN(page, lsn);
     405           0 :             MarkBufferDirty(buffer);
     406             :         }
     407           0 :         if (BufferIsValid(buffer))
     408           0 :             UnlockReleaseBuffer(buffer);
     409             : 
     410             :         /* Delete old tuple, replacing it with redirect or placeholder tuple */
     411           0 :         if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     412             :         {
     413             :             SpGistDeadTuple dt;
     414             : 
     415           0 :             page = BufferGetPage(buffer);
     416             :             /* build the dead tuple that will take over the old tuple's slot */
     417           0 :             if (state.isBuild)
     418           0 :                 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
     419             :                                       InvalidBlockNumber,
     420             :                                       InvalidOffsetNumber);
     421             :             else
     422           0 :                 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
     423             :                                       blknoNew,
     424           0 :                                       xldata->offnumNew);
     425             : 
     426           0 :             PageIndexTupleDelete(page, xldata->offnum);
     427           0 :             if (PageAddItem(page, (Item) dt, dt->size,
     428             :                             xldata->offnum,
     429           0 :                             false, false) != xldata->offnum)
     430           0 :                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
     431             :                      dt->size);
     432             :             /* count the dead tuple just stored in the page's opaque counters */
     433           0 :             if (state.isBuild)
     434           0 :                 SpGistPageGetOpaque(page)->nPlaceholder++;
     435             :             else
     436           0 :                 SpGistPageGetOpaque(page)->nRedirection++;
     437             : 
     438             :             /*
     439             :              * If parent is in this same page, update it now.
     440             :              */
     441           0 :             if (xldata->parentBlk == 0)
     442             :             {
     443             :                 SpGistInnerTuple parentTuple;
     444             : 
     445           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     446             :                                                              PageGetItemId(page, xldata->offnumParent));
     447             : 
     448           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     449           0 :                                   blknoNew, xldata->offnumNew);
     450             :             }
     451           0 :             PageSetLSN(page, lsn);
     452           0 :             MarkBufferDirty(buffer);
     453             :         }
     454           0 :         if (BufferIsValid(buffer))
     455           0 :             UnlockReleaseBuffer(buffer);
     456             : 
     457             :         /*
     458             :          * Update parent downlink (if we didn't do it as part of the source or
     459             :          * destination page update already).
     460             :          */
     461           0 :         if (xldata->parentBlk == 2)
     462             :         {
     463           0 :             if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
     464             :             {
     465             :                 SpGistInnerTuple parentTuple;
     466             : 
     467           0 :                 page = BufferGetPage(buffer);
     468             : 
     469           0 :                 parentTuple = (SpGistInnerTuple) PageGetItem(page,
     470             :                                                              PageGetItemId(page, xldata->offnumParent));
     471             : 
     472           0 :                 spgUpdateNodeLink(parentTuple, xldata->nodeI,
     473           0 :                                   blknoNew, xldata->offnumNew);
     474             : 
     475           0 :                 PageSetLSN(page, lsn);
     476           0 :                 MarkBufferDirty(buffer);
     477             :             }
     478           0 :             if (BufferIsValid(buffer))
     479           0 :                 UnlockReleaseBuffer(buffer);
     480             :         }
     481             :     }
     482           0 : }
     483             : /*
                     :  * Replay an spgxlogSplitTuple record: replace the tuple at offnumPrefix
                     :  * on block reference 0 with the logged prefix tuple, and store the logged
                     :  * postfix tuple at offnumPostfix, either on that same page
                     :  * (postfixBlkSame) or on block reference 1.
                     :  *
                     :  * The prefix and postfix tuple images follow the fixed-size struct in the
                     :  * main data, in that order.
                     :  */
     484             : static void
     485           0 : spgRedoSplitTuple(XLogReaderState *record)
     486             : {
     487           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     488           0 :     char       *ptr = XLogRecGetData(record);
     489           0 :     spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
     490             :     char       *prefixTuple;
     491             :     SpGistInnerTupleData prefixTupleHdr;
     492             :     char       *postfixTuple;
     493             :     SpGistInnerTupleData postfixTupleHdr;
     494             :     Buffer      buffer;
     495             :     Page        page;
     496             :     XLogRedoAction action;
     497             : 
     498           0 :     ptr += sizeof(spgxlogSplitTuple);
     499           0 :     prefixTuple = ptr;
     500             :     /* the prefix tuple is unaligned, so make a copy to access its header */
     501           0 :     memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
     502           0 :     ptr += prefixTupleHdr.size;
     503           0 :     postfixTuple = ptr;
     504             :     /* postfix tuple is also unaligned */
     505           0 :     memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
     506             : 
     507             :     /*
     508             :      * In normal operation we would have both pages locked simultaneously; but
     509             :      * in WAL replay it should be safe to update them one at a time, as long
     510             :      * as we do it in the right order.
     511             :      */
     512             : 
     513             :     /* insert postfix tuple first to avoid dangling link */
     514           0 :     if (!xldata->postfixBlkSame)
     515             :     {
     516           0 :         if (xldata->newPage)
     517             :         {
     518           0 :             buffer = XLogInitBufferForRedo(record, 1);
     519             :             /* SplitTuple is not used for nulls pages */
     520           0 :             SpGistInitBuffer(buffer, 0);
     521           0 :             action = BLK_NEEDS_REDO;
     522             :         }
     523             :         else
     524           0 :             action = XLogReadBufferForRedo(record, 1, &buffer);
     525           0 :         if (action == BLK_NEEDS_REDO)
     526             :         {
     527           0 :             page = BufferGetPage(buffer);
     528             : 
     529           0 :             addOrReplaceTuple(page, (Item) postfixTuple,
     530           0 :                               postfixTupleHdr.size, xldata->offnumPostfix);
     531             : 
     532           0 :             PageSetLSN(page, lsn);
     533           0 :             MarkBufferDirty(buffer);
     534             :         }
     535           0 :         if (BufferIsValid(buffer))
     536           0 :             UnlockReleaseBuffer(buffer);
     537             :     }
     538             : 
     539             :     /* now handle the original page */
     540           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     541             :     {
     542           0 :         page = BufferGetPage(buffer);
     543             :         /* swap the old tuple for the prefix tuple at the same offset */
     544           0 :         PageIndexTupleDelete(page, xldata->offnumPrefix);
     545           0 :         if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
     546           0 :                         xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
     547           0 :             elog(ERROR, "failed to add item of size %u to SPGiST index page",
     548             :                  prefixTupleHdr.size);
     549             :         /* same-page case: the postfix was skipped above, so store it here */
     550           0 :         if (xldata->postfixBlkSame)
     551           0 :             addOrReplaceTuple(page, (Item) postfixTuple,
     552           0 :                               postfixTupleHdr.size,
     553           0 :                               xldata->offnumPostfix);
     554             : 
     555           0 :         PageSetLSN(page, lsn);
     556           0 :         MarkBufferDirty(buffer);
     557             :     }
     558           0 :     if (BufferIsValid(buffer))
     559           0 :         UnlockReleaseBuffer(buffer);
     560           0 : }
     561             : 
/*
 * Replay a pick-split WAL record.
 *
 * The record's main data starts with a spgxlogPickSplit header, followed in
 * order by:
 *   - nDelete offset numbers to delete from the source page,
 *   - nInsert offset numbers at which the leaf tuples are to be placed,
 *   - nInsert one-byte page selectors (nonzero = dest page, zero = source),
 *   - the new inner tuple (unaligned),
 *   - nInsert leaf tuples (unaligned), each carrying its own size.
 *
 * Block references used here:
 *   0 - source leaf page (not touched when isRootSplit is set)
 *   1 - destination leaf page (optional)
 *   2 - page that receives the new inner tuple
 *   3 - parent page whose downlink is updated (optional)
 *
 * The source and destination buffers stay locked until after the inner
 * tuple has been restored; see the comments in the body.
 */
static void
spgRedoPickSplit(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    char       *ptr = XLogRecGetData(record);
    spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
    char       *innerTuple;
    SpGistInnerTupleData innerTupleHdr;
    SpGistState state;
    OffsetNumber *toDelete;
    OffsetNumber *toInsert;
    uint8      *leafPageSelect;
    Buffer      srcBuffer;
    Buffer      destBuffer;
    Buffer      innerBuffer;
    Page        srcPage;
    Page        destPage;
    Page        page;
    int         i;
    BlockNumber blknoInner;
    XLogRedoAction action;

    /* block 2 is where the inner tuple goes; we need its number for links */
    XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);

    fillFakeState(&state, xldata->stateSrc);

    /* walk the variable-length arrays that follow the fixed header */
    ptr += SizeOfSpgxlogPickSplit;
    toDelete = (OffsetNumber *) ptr;
    ptr += sizeof(OffsetNumber) * xldata->nDelete;
    toInsert = (OffsetNumber *) ptr;
    ptr += sizeof(OffsetNumber) * xldata->nInsert;
    leafPageSelect = (uint8 *) ptr;
    ptr += sizeof(uint8) * xldata->nInsert;

    innerTuple = ptr;
    /* the inner tuple is unaligned, so make a copy to access its header */
    memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
    ptr += innerTupleHdr.size;

    /* now ptr points to the list of leaf tuples */

    /*
     * Set up the source page.  srcPage is left NULL whenever no changes are
     * to be applied to it; srcBuffer is InvalidBuffer only for a root split.
     */
    if (xldata->isRootSplit)
    {
        /* when splitting root, we touch it only in the guise of new inner */
        srcBuffer = InvalidBuffer;
        srcPage = NULL;
    }
    else if (xldata->initSrc)
    {
        /* just re-init the source page */
        srcBuffer = XLogInitBufferForRedo(record, 0);
        srcPage = (Page) BufferGetPage(srcBuffer);

        SpGistInitBuffer(srcBuffer,
                         SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
        /* don't update LSN etc till we're done with it */
    }
    else
    {
        /*
         * Delete the specified tuples from source page.  (In case we're in
         * Hot Standby, we need to hold lock on the page till we're done
         * inserting leaf tuples and the new inner tuple, else the added
         * redirect tuple will be a dangling link.)
         */
        srcPage = NULL;
        if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
        {
            srcPage = BufferGetPage(srcBuffer);

            /*
             * We have it a bit easier here than in doPickSplit(), because we
             * know the inner tuple's location already, so we can inject the
             * correct redirection tuple now.
             */
            if (!state.isBuild)
                spgPageIndexMultiDelete(&state, srcPage,
                                        toDelete, xldata->nDelete,
                                        SPGIST_REDIRECT,
                                        SPGIST_PLACEHOLDER,
                                        blknoInner,
                                        xldata->offnumInner);
            else
                spgPageIndexMultiDelete(&state, srcPage,
                                        toDelete, xldata->nDelete,
                                        SPGIST_PLACEHOLDER,
                                        SPGIST_PLACEHOLDER,
                                        InvalidBlockNumber,
                                        InvalidOffsetNumber);

            /* don't update LSN etc till we're done with it */
        }
    }

    /* try to access dest page if any */
    if (!XLogRecHasBlockRef(record, 1))
    {
        destBuffer = InvalidBuffer;
        destPage = NULL;
    }
    else if (xldata->initDest)
    {
        /* just re-init the dest page */
        destBuffer = XLogInitBufferForRedo(record, 1);
        destPage = (Page) BufferGetPage(destBuffer);

        SpGistInitBuffer(destBuffer,
                         SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
        /* don't update LSN etc till we're done with it */
    }
    else
    {
        /*
         * We could probably release the page lock immediately in the
         * full-page-image case, but for safety let's hold it till later.
         */
        if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
            destPage = (Page) BufferGetPage(destBuffer);
        else
            destPage = NULL;    /* don't do any page updates */
    }

    /* restore leaf tuples to src and/or dest page */
    for (i = 0; i < xldata->nInsert; i++)
    {
        char       *leafTuple;
        SpGistLeafTupleData leafTupleHdr;

        /* the tuples are not aligned, so must copy to access the size field. */
        leafTuple = ptr;
        memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
        /* advance past this tuple even if we end up skipping its page */
        ptr += leafTupleHdr.size;

        /* a nonzero selector byte sends the tuple to the dest page */
        page = leafPageSelect[i] ? destPage : srcPage;
        if (page == NULL)
            continue;           /* no need to touch this page */

        addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
                          toInsert[i]);
    }

    /* Now update src and dest page LSNs if needed */
    if (srcPage != NULL)
    {
        PageSetLSN(srcPage, lsn);
        MarkBufferDirty(srcBuffer);
    }
    if (destPage != NULL)
    {
        PageSetLSN(destPage, lsn);
        MarkBufferDirty(destBuffer);
    }

    /* restore new inner tuple */
    if (xldata->initInner)
    {
        /* a freshly initialized page is always redone */
        innerBuffer = XLogInitBufferForRedo(record, 2);
        SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
        action = BLK_NEEDS_REDO;
    }
    else
        action = XLogReadBufferForRedo(record, 2, &innerBuffer);

    if (action == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(innerBuffer);

        addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
                          xldata->offnumInner);

        /* if inner is also parent, update link while we're here */
        if (xldata->innerIsParent)
        {
            SpGistInnerTuple parent;

            parent = (SpGistInnerTuple) PageGetItem(page,
                                                    PageGetItemId(page, xldata->offnumParent));
            spgUpdateNodeLink(parent, xldata->nodeI,
                              blknoInner, xldata->offnumInner);
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(innerBuffer);
    }
    if (BufferIsValid(innerBuffer))
        UnlockReleaseBuffer(innerBuffer);

    /*
     * Now we can release the leaf-page locks.  It's okay to do this before
     * updating the parent downlink.
     */
    if (BufferIsValid(srcBuffer))
        UnlockReleaseBuffer(srcBuffer);
    if (BufferIsValid(destBuffer))
        UnlockReleaseBuffer(destBuffer);

    /* update parent downlink, unless we did it above */
    if (XLogRecHasBlockRef(record, 3))
    {
        Buffer      parentBuffer;

        if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
        {
            SpGistInnerTuple parent;

            page = BufferGetPage(parentBuffer);

            parent = (SpGistInnerTuple) PageGetItem(page,
                                                    PageGetItemId(page, xldata->offnumParent));
            spgUpdateNodeLink(parent, xldata->nodeI,
                              blknoInner, xldata->offnumInner);

            PageSetLSN(page, lsn);
            MarkBufferDirty(parentBuffer);
        }
        if (BufferIsValid(parentBuffer))
            UnlockReleaseBuffer(parentBuffer);
    }
    else
        /* with no block 3, the link was set above or this is a root split */
        Assert(xldata->innerIsParent || xldata->isRootSplit);
}
     783             : 
     784             : static void
     785           0 : spgRedoVacuumLeaf(XLogReaderState *record)
     786             : {
     787           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     788           0 :     char       *ptr = XLogRecGetData(record);
     789           0 :     spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
     790             :     OffsetNumber *toDead;
     791             :     OffsetNumber *toPlaceholder;
     792             :     OffsetNumber *moveSrc;
     793             :     OffsetNumber *moveDest;
     794             :     OffsetNumber *chainSrc;
     795             :     OffsetNumber *chainDest;
     796             :     SpGistState state;
     797             :     Buffer      buffer;
     798             :     Page        page;
     799             :     int         i;
     800             : 
     801           0 :     fillFakeState(&state, xldata->stateSrc);
     802             : 
     803           0 :     ptr += SizeOfSpgxlogVacuumLeaf;
     804           0 :     toDead = (OffsetNumber *) ptr;
     805           0 :     ptr += sizeof(OffsetNumber) * xldata->nDead;
     806           0 :     toPlaceholder = (OffsetNumber *) ptr;
     807           0 :     ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
     808           0 :     moveSrc = (OffsetNumber *) ptr;
     809           0 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     810           0 :     moveDest = (OffsetNumber *) ptr;
     811           0 :     ptr += sizeof(OffsetNumber) * xldata->nMove;
     812           0 :     chainSrc = (OffsetNumber *) ptr;
     813           0 :     ptr += sizeof(OffsetNumber) * xldata->nChain;
     814           0 :     chainDest = (OffsetNumber *) ptr;
     815             : 
     816           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     817             :     {
     818           0 :         page = BufferGetPage(buffer);
     819             : 
     820           0 :         spgPageIndexMultiDelete(&state, page,
     821           0 :                                 toDead, xldata->nDead,
     822             :                                 SPGIST_DEAD, SPGIST_DEAD,
     823             :                                 InvalidBlockNumber,
     824             :                                 InvalidOffsetNumber);
     825             : 
     826           0 :         spgPageIndexMultiDelete(&state, page,
     827           0 :                                 toPlaceholder, xldata->nPlaceholder,
     828             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     829             :                                 InvalidBlockNumber,
     830             :                                 InvalidOffsetNumber);
     831             : 
     832             :         /* see comments in vacuumLeafPage() */
     833           0 :         for (i = 0; i < xldata->nMove; i++)
     834             :         {
     835           0 :             ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
     836           0 :             ItemId      idDest = PageGetItemId(page, moveDest[i]);
     837             :             ItemIdData  tmp;
     838             : 
     839           0 :             tmp = *idSrc;
     840           0 :             *idSrc = *idDest;
     841           0 :             *idDest = tmp;
     842             :         }
     843             : 
     844           0 :         spgPageIndexMultiDelete(&state, page,
     845           0 :                                 moveSrc, xldata->nMove,
     846             :                                 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
     847             :                                 InvalidBlockNumber,
     848             :                                 InvalidOffsetNumber);
     849             : 
     850           0 :         for (i = 0; i < xldata->nChain; i++)
     851             :         {
     852             :             SpGistLeafTuple lt;
     853             : 
     854           0 :             lt = (SpGistLeafTuple) PageGetItem(page,
     855             :                                                PageGetItemId(page, chainSrc[i]));
     856           0 :             Assert(lt->tupstate == SPGIST_LIVE);
     857           0 :             lt->nextOffset = chainDest[i];
     858             :         }
     859             : 
     860           0 :         PageSetLSN(page, lsn);
     861           0 :         MarkBufferDirty(buffer);
     862             :     }
     863           0 :     if (BufferIsValid(buffer))
     864           0 :         UnlockReleaseBuffer(buffer);
     865           0 : }
     866             : 
     867             : static void
     868           0 : spgRedoVacuumRoot(XLogReaderState *record)
     869             : {
     870           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     871           0 :     char       *ptr = XLogRecGetData(record);
     872           0 :     spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
     873             :     OffsetNumber *toDelete;
     874             :     Buffer      buffer;
     875             :     Page        page;
     876             : 
     877           0 :     toDelete = xldata->offsets;
     878             : 
     879           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     880             :     {
     881           0 :         page = BufferGetPage(buffer);
     882             : 
     883             :         /* The tuple numbers are in order */
     884           0 :         PageIndexMultiDelete(page, toDelete, xldata->nDelete);
     885             : 
     886           0 :         PageSetLSN(page, lsn);
     887           0 :         MarkBufferDirty(buffer);
     888             :     }
     889           0 :     if (BufferIsValid(buffer))
     890           0 :         UnlockReleaseBuffer(buffer);
     891           0 : }
     892             : 
     893             : static void
     894           0 : spgRedoVacuumRedirect(XLogReaderState *record)
     895             : {
     896           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     897           0 :     char       *ptr = XLogRecGetData(record);
     898           0 :     spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
     899             :     OffsetNumber *itemToPlaceholder;
     900             :     Buffer      buffer;
     901             : 
     902           0 :     itemToPlaceholder = xldata->offsets;
     903             : 
     904             :     /*
     905             :      * If any redirection tuples are being removed, make sure there are no
     906             :      * live Hot Standby transactions that might need to see them.
     907             :      */
     908           0 :     if (InHotStandby)
     909             :     {
     910           0 :         if (TransactionIdIsValid(xldata->newestRedirectXid))
     911             :         {
     912             :             RelFileNode node;
     913             : 
     914           0 :             XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
     915           0 :             ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
     916             :                                                 node);
     917             :         }
     918             :     }
     919             : 
     920           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
     921             :     {
     922           0 :         Page        page = BufferGetPage(buffer);
     923           0 :         SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
     924             :         int         i;
     925             : 
     926             :         /* Convert redirect pointers to plain placeholders */
     927           0 :         for (i = 0; i < xldata->nToPlaceholder; i++)
     928             :         {
     929             :             SpGistDeadTuple dt;
     930             : 
     931           0 :             dt = (SpGistDeadTuple) PageGetItem(page,
     932             :                                                PageGetItemId(page, itemToPlaceholder[i]));
     933           0 :             Assert(dt->tupstate == SPGIST_REDIRECT);
     934           0 :             dt->tupstate = SPGIST_PLACEHOLDER;
     935           0 :             ItemPointerSetInvalid(&dt->pointer);
     936             :         }
     937             : 
     938           0 :         Assert(opaque->nRedirection >= xldata->nToPlaceholder);
     939           0 :         opaque->nRedirection -= xldata->nToPlaceholder;
     940           0 :         opaque->nPlaceholder += xldata->nToPlaceholder;
     941             : 
     942             :         /* Remove placeholder tuples at end of page */
     943           0 :         if (xldata->firstPlaceholder != InvalidOffsetNumber)
     944             :         {
     945           0 :             int         max = PageGetMaxOffsetNumber(page);
     946             :             OffsetNumber *toDelete;
     947             : 
     948           0 :             toDelete = palloc(sizeof(OffsetNumber) * max);
     949             : 
     950           0 :             for (i = xldata->firstPlaceholder; i <= max; i++)
     951           0 :                 toDelete[i - xldata->firstPlaceholder] = i;
     952             : 
     953           0 :             i = max - xldata->firstPlaceholder + 1;
     954           0 :             Assert(opaque->nPlaceholder >= i);
     955           0 :             opaque->nPlaceholder -= i;
     956             : 
     957             :             /* The array is sorted, so can use PageIndexMultiDelete */
     958           0 :             PageIndexMultiDelete(page, toDelete, i);
     959             : 
     960           0 :             pfree(toDelete);
     961             :         }
     962             : 
     963           0 :         PageSetLSN(page, lsn);
     964           0 :         MarkBufferDirty(buffer);
     965             :     }
     966           0 :     if (BufferIsValid(buffer))
     967           0 :         UnlockReleaseBuffer(buffer);
     968           0 : }
     969             : 
     970             : void
     971           0 : spg_redo(XLogReaderState *record)
     972             : {
     973           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     974             :     MemoryContext oldCxt;
     975             : 
     976           0 :     oldCxt = MemoryContextSwitchTo(opCtx);
     977           0 :     switch (info)
     978             :     {
     979             :         case XLOG_SPGIST_CREATE_INDEX:
     980           0 :             spgRedoCreateIndex(record);
     981           0 :             break;
     982             :         case XLOG_SPGIST_ADD_LEAF:
     983           0 :             spgRedoAddLeaf(record);
     984           0 :             break;
     985             :         case XLOG_SPGIST_MOVE_LEAFS:
     986           0 :             spgRedoMoveLeafs(record);
     987           0 :             break;
     988             :         case XLOG_SPGIST_ADD_NODE:
     989           0 :             spgRedoAddNode(record);
     990           0 :             break;
     991             :         case XLOG_SPGIST_SPLIT_TUPLE:
     992           0 :             spgRedoSplitTuple(record);
     993           0 :             break;
     994             :         case XLOG_SPGIST_PICKSPLIT:
     995           0 :             spgRedoPickSplit(record);
     996           0 :             break;
     997             :         case XLOG_SPGIST_VACUUM_LEAF:
     998           0 :             spgRedoVacuumLeaf(record);
     999           0 :             break;
    1000             :         case XLOG_SPGIST_VACUUM_ROOT:
    1001           0 :             spgRedoVacuumRoot(record);
    1002           0 :             break;
    1003             :         case XLOG_SPGIST_VACUUM_REDIRECT:
    1004           0 :             spgRedoVacuumRedirect(record);
    1005           0 :             break;
    1006             :         default:
    1007           0 :             elog(PANIC, "spg_redo: unknown op code %u", info);
    1008             :     }
    1009             : 
    1010           0 :     MemoryContextSwitchTo(oldCxt);
    1011           0 :     MemoryContextReset(opCtx);
    1012           0 : }
    1013             : 
    1014             : void
    1015           0 : spg_xlog_startup(void)
    1016             : {
    1017           0 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
    1018             :                                   "SP-GiST temporary context",
    1019             :                                   ALLOCSET_DEFAULT_SIZES);
    1020           0 : }
    1021             : 
    1022             : void
    1023           0 : spg_xlog_cleanup(void)
    1024             : {
    1025           0 :     MemoryContextDelete(opCtx);
    1026           0 :     opCtx = NULL;
    1027           0 : }
    1028             : 
    1029             : /*
    1030             :  * Mask a SpGist page before performing consistency checks on it.
    1031             :  */
    1032             : void
    1033           0 : spg_mask(char *pagedata, BlockNumber blkno)
    1034             : {
    1035           0 :     Page        page = (Page) pagedata;
    1036             : 
    1037           0 :     mask_page_lsn(page);
    1038             : 
    1039           0 :     mask_page_hint_bits(page);
    1040             : 
    1041             :     /*
    1042             :      * Any SpGist page other than meta contains unused space which needs to be
    1043             :      * masked.
    1044             :      */
    1045           0 :     if (!SpGistPageIsMeta(page))
    1046           0 :         mask_unused_space(page);
    1047           0 : }

Generated by: LCOV version 1.11