LCOV - code coverage report
Current view: top level - src/backend/access/gist - gistxlog.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 34 183 18.6 %
Date: 2017-09-29 15:12:54 Functions: 2 11 18.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * gistxlog.c
       4             :  *    WAL replay logic for GiST.
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *           src/backend/access/gist/gistxlog.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : #include "postgres.h"
      15             : 
      16             : #include "access/bufmask.h"
      17             : #include "access/gist_private.h"
      18             : #include "access/gistxlog.h"
      19             : #include "access/xloginsert.h"
      20             : #include "access/xlogutils.h"
      21             : #include "utils/memutils.h"
      22             : 
      23             : static MemoryContext opCtx;     /* working memory for operations */
      24             : 
      25             : /*
      26             :  * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
      27             :  *
      28             :  * Even if the WAL record includes a full-page image, we have to update the
      29             :  * follow-right flag, because that change is not included in the full-page
      30             :  * image.  To be sure that the intermediate state with the wrong flag value is
      31             :  * not visible to concurrent Hot Standby queries, this function handles
      32             :  * restoring the full-page image as well as updating the flag.  (Note that
      33             :  * we never need to do anything else to the child page in the current WAL
      34             :  * action.)
      35             :  */
      36             : static void
      37           0 : gistRedoClearFollowRight(XLogReaderState *record, uint8 block_id)
      38             : {
      39           0 :     XLogRecPtr  lsn = record->EndRecPtr;
      40             :     Buffer      buffer;
      41             :     Page        page;
      42             :     XLogRedoAction action;
      43             : 
      44             :     /*
      45             :      * Note that we still update the page even if it was restored from a full
      46             :      * page image, because the updated NSN is not included in the image.
      47             :      */
      48           0 :     action = XLogReadBufferForRedo(record, block_id, &buffer);
      49           0 :     if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
      50             :     {
      51           0 :         page = BufferGetPage(buffer);
      52             : 
      53           0 :         GistPageSetNSN(page, lsn);
      54           0 :         GistClearFollowRight(page);
      55             : 
      56           0 :         PageSetLSN(page, lsn);
      57           0 :         MarkBufferDirty(buffer);
      58             :     }
      59           0 :     if (BufferIsValid(buffer))
      60           0 :         UnlockReleaseBuffer(buffer);
      61           0 : }
      62             : 
      63             : /*
      64             :  * redo any page update (except page split)
      65             :  */
      66             : static void
      67           0 : gistRedoPageUpdateRecord(XLogReaderState *record)
      68             : {
      69           0 :     XLogRecPtr  lsn = record->EndRecPtr;
      70           0 :     gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
      71             :     Buffer      buffer;
      72             :     Page        page;
      73             : 
      74           0 :     if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
      75             :     {
      76             :         char       *begin;
      77             :         char       *data;
      78             :         Size        datalen;
      79           0 :         int         ninserted = 0;
      80             : 
      81           0 :         data = begin = XLogRecGetBlockData(record, 0, &datalen);
      82             : 
      83           0 :         page = (Page) BufferGetPage(buffer);
      84             : 
      85           0 :         if (xldata->ntodelete == 1 && xldata->ntoinsert == 1)
      86           0 :         {
      87             :             /*
      88             :              * When replacing one tuple with one other tuple, we must use
      89             :              * PageIndexTupleOverwrite for consistency with gistplacetopage.
      90             :              */
      91           0 :             OffsetNumber offnum = *((OffsetNumber *) data);
      92             :             IndexTuple  itup;
      93             :             Size        itupsize;
      94             : 
      95           0 :             data += sizeof(OffsetNumber);
      96           0 :             itup = (IndexTuple) data;
      97           0 :             itupsize = IndexTupleSize(itup);
      98           0 :             if (!PageIndexTupleOverwrite(page, offnum, (Item) itup, itupsize))
      99           0 :                 elog(ERROR, "failed to add item to GiST index page, size %d bytes",
     100             :                      (int) itupsize);
     101           0 :             data += itupsize;
     102             :             /* should be nothing left after consuming 1 tuple */
     103           0 :             Assert(data - begin == datalen);
     104             :             /* update insertion count for assert check below */
     105           0 :             ninserted++;
     106             :         }
     107           0 :         else if (xldata->ntodelete > 0)
     108             :         {
     109             :             /* Otherwise, delete old tuples if any */
     110           0 :             OffsetNumber *todelete = (OffsetNumber *) data;
     111             : 
     112           0 :             data += sizeof(OffsetNumber) * xldata->ntodelete;
     113             : 
     114           0 :             PageIndexMultiDelete(page, todelete, xldata->ntodelete);
     115           0 :             if (GistPageIsLeaf(page))
     116           0 :                 GistMarkTuplesDeleted(page);
     117             :         }
     118             : 
     119             :         /* Add new tuples if any */
     120           0 :         if (data - begin < datalen)
     121             :         {
     122           0 :             OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
     123           0 :             OffsetNumberNext(PageGetMaxOffsetNumber(page));
     124             : 
     125           0 :             while (data - begin < datalen)
     126             :             {
     127           0 :                 IndexTuple  itup = (IndexTuple) data;
     128           0 :                 Size        sz = IndexTupleSize(itup);
     129             :                 OffsetNumber l;
     130             : 
     131           0 :                 data += sz;
     132             : 
     133           0 :                 l = PageAddItem(page, (Item) itup, sz, off, false, false);
     134           0 :                 if (l == InvalidOffsetNumber)
     135           0 :                     elog(ERROR, "failed to add item to GiST index page, size %d bytes",
     136             :                          (int) sz);
     137           0 :                 off++;
     138           0 :                 ninserted++;
     139             :             }
     140             :         }
     141             : 
     142             :         /* Check that XLOG record contained expected number of tuples */
     143           0 :         Assert(ninserted == xldata->ntoinsert);
     144             : 
     145           0 :         PageSetLSN(page, lsn);
     146           0 :         MarkBufferDirty(buffer);
     147             :     }
     148             : 
     149             :     /*
     150             :      * Fix follow-right data on left child page
     151             :      *
     152             :      * This must be done while still holding the lock on the target page. Note
     153             :      * that even if the target page no longer exists, we still attempt to
     154             :      * replay the change on the child page.
     155             :      */
     156           0 :     if (XLogRecHasBlockRef(record, 1))
     157           0 :         gistRedoClearFollowRight(record, 1);
     158             : 
     159           0 :     if (BufferIsValid(buffer))
     160           0 :         UnlockReleaseBuffer(buffer);
     161           0 : }
     162             : 
     163             : /*
     164             :  * Returns an array of index pointers.
     165             :  */
     166             : static IndexTuple *
     167           0 : decodePageSplitRecord(char *begin, int len, int *n)
     168             : {
     169             :     char       *ptr;
     170           0 :     int         i = 0;
     171             :     IndexTuple *tuples;
     172             : 
     173             :     /* extract the number of tuples */
     174           0 :     memcpy(n, begin, sizeof(int));
     175           0 :     ptr = begin + sizeof(int);
     176             : 
     177           0 :     tuples = palloc(*n * sizeof(IndexTuple));
     178             : 
     179           0 :     for (i = 0; i < *n; i++)
     180             :     {
     181           0 :         Assert(ptr - begin < len);
     182           0 :         tuples[i] = (IndexTuple) ptr;
     183           0 :         ptr += IndexTupleSize((IndexTuple) ptr);
     184             :     }
     185           0 :     Assert(ptr - begin == len);
     186             : 
     187           0 :     return tuples;
     188             : }
     189             : 
     190             : static void
     191           0 : gistRedoPageSplitRecord(XLogReaderState *record)
     192             : {
     193           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     194           0 :     gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
     195           0 :     Buffer      firstbuffer = InvalidBuffer;
     196             :     Buffer      buffer;
     197             :     Page        page;
     198             :     int         i;
     199           0 :     bool        isrootsplit = false;
     200             : 
     201             :     /*
     202             :      * We must hold lock on the first-listed page throughout the action,
     203             :      * including while updating the left child page (if any).  We can unlock
     204             :      * remaining pages in the list as soon as they've been written, because
     205             :      * there is no path for concurrent queries to reach those pages without
     206             :      * first visiting the first-listed page.
     207             :      */
     208             : 
     209             :     /* loop around all pages */
     210           0 :     for (i = 0; i < xldata->npage; i++)
     211             :     {
     212             :         int         flags;
     213             :         char       *data;
     214             :         Size        datalen;
     215             :         int         num;
     216             :         BlockNumber blkno;
     217             :         IndexTuple *tuples;
     218             : 
     219           0 :         XLogRecGetBlockTag(record, i + 1, NULL, NULL, &blkno);
     220           0 :         if (blkno == GIST_ROOT_BLKNO)
     221             :         {
     222           0 :             Assert(i == 0);
     223           0 :             isrootsplit = true;
     224             :         }
     225             : 
     226           0 :         buffer = XLogInitBufferForRedo(record, i + 1);
     227           0 :         page = (Page) BufferGetPage(buffer);
     228           0 :         data = XLogRecGetBlockData(record, i + 1, &datalen);
     229             : 
     230           0 :         tuples = decodePageSplitRecord(data, datalen, &num);
     231             : 
     232             :         /* ok, clear buffer */
     233           0 :         if (xldata->origleaf && blkno != GIST_ROOT_BLKNO)
     234           0 :             flags = F_LEAF;
     235             :         else
     236           0 :             flags = 0;
     237           0 :         GISTInitBuffer(buffer, flags);
     238             : 
     239             :         /* and fill it */
     240           0 :         gistfillbuffer(page, tuples, num, FirstOffsetNumber);
     241             : 
     242           0 :         if (blkno == GIST_ROOT_BLKNO)
     243             :         {
     244           0 :             GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
     245           0 :             GistPageSetNSN(page, xldata->orignsn);
     246           0 :             GistClearFollowRight(page);
     247             :         }
     248             :         else
     249             :         {
     250           0 :             if (i < xldata->npage - 1)
     251             :             {
     252             :                 BlockNumber nextblkno;
     253             : 
     254           0 :                 XLogRecGetBlockTag(record, i + 2, NULL, NULL, &nextblkno);
     255           0 :                 GistPageGetOpaque(page)->rightlink = nextblkno;
     256             :             }
     257             :             else
     258           0 :                 GistPageGetOpaque(page)->rightlink = xldata->origrlink;
     259           0 :             GistPageSetNSN(page, xldata->orignsn);
     260           0 :             if (i < xldata->npage - 1 && !isrootsplit &&
     261           0 :                 xldata->markfollowright)
     262           0 :                 GistMarkFollowRight(page);
     263             :             else
     264           0 :                 GistClearFollowRight(page);
     265             :         }
     266             : 
     267           0 :         PageSetLSN(page, lsn);
     268           0 :         MarkBufferDirty(buffer);
     269             : 
     270           0 :         if (i == 0)
     271           0 :             firstbuffer = buffer;
     272             :         else
     273           0 :             UnlockReleaseBuffer(buffer);
     274             :     }
     275             : 
     276             :     /* Fix follow-right data on left child page, if any */
     277           0 :     if (XLogRecHasBlockRef(record, 0))
     278           0 :         gistRedoClearFollowRight(record, 0);
     279             : 
     280             :     /* Finally, release lock on the first page */
     281           0 :     UnlockReleaseBuffer(firstbuffer);
     282           0 : }
     283             : 
     284             : static void
     285           0 : gistRedoCreateIndex(XLogReaderState *record)
     286             : {
     287           0 :     XLogRecPtr  lsn = record->EndRecPtr;
     288             :     Buffer      buffer;
     289             :     Page        page;
     290             : 
     291           0 :     buffer = XLogInitBufferForRedo(record, 0);
     292           0 :     Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
     293           0 :     page = (Page) BufferGetPage(buffer);
     294             : 
     295           0 :     GISTInitBuffer(buffer, F_LEAF);
     296             : 
     297           0 :     PageSetLSN(page, lsn);
     298             : 
     299           0 :     MarkBufferDirty(buffer);
     300           0 :     UnlockReleaseBuffer(buffer);
     301           0 : }
     302             : 
     303             : void
     304           0 : gist_redo(XLogReaderState *record)
     305             : {
     306           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     307             :     MemoryContext oldCxt;
     308             : 
     309             :     /*
     310             :      * GiST indexes do not require any conflict processing. NB: If we ever
     311             :      * implement a similar optimization we have in b-tree, and remove killed
     312             :      * tuples outside VACUUM, we'll need to handle that here.
     313             :      */
     314             : 
     315           0 :     oldCxt = MemoryContextSwitchTo(opCtx);
     316           0 :     switch (info)
     317             :     {
     318             :         case XLOG_GIST_PAGE_UPDATE:
     319           0 :             gistRedoPageUpdateRecord(record);
     320           0 :             break;
     321             :         case XLOG_GIST_PAGE_SPLIT:
     322           0 :             gistRedoPageSplitRecord(record);
     323           0 :             break;
     324             :         case XLOG_GIST_CREATE_INDEX:
     325           0 :             gistRedoCreateIndex(record);
     326           0 :             break;
     327             :         default:
     328           0 :             elog(PANIC, "gist_redo: unknown op code %u", info);
     329             :     }
     330             : 
     331           0 :     MemoryContextSwitchTo(oldCxt);
     332           0 :     MemoryContextReset(opCtx);
     333           0 : }
     334             : 
     335             : void
     336           0 : gist_xlog_startup(void)
     337             : {
     338           0 :     opCtx = createTempGistContext();
     339           0 : }
     340             : 
     341             : void
     342           0 : gist_xlog_cleanup(void)
     343             : {
     344           0 :     MemoryContextDelete(opCtx);
     345           0 : }
     346             : 
     347             : /*
     348             :  * Mask a Gist page before running consistency checks on it.
     349             :  */
     350             : void
     351           0 : gist_mask(char *pagedata, BlockNumber blkno)
     352             : {
     353           0 :     Page        page = (Page) pagedata;
     354             : 
     355           0 :     mask_page_lsn(page);
     356             : 
     357           0 :     mask_page_hint_bits(page);
     358           0 :     mask_unused_space(page);
     359             : 
     360             :     /*
     361             :      * NSN is nothing but a special purpose LSN. Hence, mask it for the same
     362             :      * reason as mask_page_lsn.
     363             :      */
     364           0 :     GistPageSetNSN(page, (uint64) MASK_MARKER);
     365             : 
     366             :     /*
     367             :      * We update F_FOLLOW_RIGHT flag on the left child after writing WAL
     368             :      * record. Hence, mask this flag. See gistplacetopage() for details.
     369             :      */
     370           0 :     GistMarkFollowRight(page);
     371             : 
     372           0 :     if (GistPageIsLeaf(page))
     373             :     {
     374             :         /*
     375             :          * In gist leaf pages, it is possible to modify the LP_FLAGS without
     376             :          * emitting any WAL record. Hence, mask the line pointer flags. See
     377             :          * gistkillitems() for details.
     378             :          */
     379           0 :         mask_lp_flags(page);
     380             :     }
     381             : 
     382             :     /*
     383             :      * During gist redo, we never mark a page as garbage. Hence, mask it to
     384             :      * ignore any differences.
     385             :      */
     386           0 :     GistClearPageHasGarbage(page);
     387           0 : }
     388             : 
     389             : /*
     390             :  * Write WAL record of a page split.
     391             :  */
     392             : XLogRecPtr
     393        1274 : gistXLogSplit(bool page_is_leaf,
     394             :               SplitedPageLayout *dist,
     395             :               BlockNumber origrlink, GistNSN orignsn,
     396             :               Buffer leftchildbuf, bool markfollowright)
     397             : {
     398             :     gistxlogPageSplit xlrec;
     399             :     SplitedPageLayout *ptr;
     400        1274 :     int         npage = 0;
     401             :     XLogRecPtr  recptr;
     402             :     int         i;
     403             : 
     404        3837 :     for (ptr = dist; ptr; ptr = ptr->next)
     405        2563 :         npage++;
     406             : 
     407        1274 :     xlrec.origrlink = origrlink;
     408        1274 :     xlrec.orignsn = orignsn;
     409        1274 :     xlrec.origleaf = page_is_leaf;
     410        1274 :     xlrec.npage = (uint16) npage;
     411        1274 :     xlrec.markfollowright = markfollowright;
     412             : 
     413        1274 :     XLogBeginInsert();
     414             : 
     415             :     /*
     416             :      * Include a full page image of the child buf. (only necessary if a
     417             :      * checkpoint happened since the child page was split)
     418             :      */
     419        1274 :     if (BufferIsValid(leftchildbuf))
     420           4 :         XLogRegisterBuffer(0, leftchildbuf, REGBUF_STANDARD);
     421             : 
     422             :     /*
     423             :      * NOTE: We register a lot of data. The caller must've called
     424             :      * XLogEnsureRecordSpace() to prepare for that. We cannot do it here,
     425             :      * because we're already in a critical section. If you change the number
     426             :      * of buffer or data registrations here, make sure you modify the
     427             :      * XLogEnsureRecordSpace() calls accordingly!
     428             :      */
     429        1274 :     XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageSplit));
     430             : 
     431        1274 :     i = 1;
     432        3837 :     for (ptr = dist; ptr; ptr = ptr->next)
     433             :     {
     434        2563 :         XLogRegisterBuffer(i, ptr->buffer, REGBUF_WILL_INIT);
     435        2563 :         XLogRegisterBufData(i, (char *) &(ptr->block.num), sizeof(int));
     436        2563 :         XLogRegisterBufData(i, (char *) ptr->list, ptr->lenlist);
     437        2563 :         i++;
     438             :     }
     439             : 
     440        1274 :     recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT);
     441             : 
     442        1274 :     return recptr;
     443             : }
     444             : 
     445             : /*
     446             :  * Write XLOG record describing a page update. The update can include any
     447             :  * number of deletions and/or insertions of tuples on a single index page.
     448             :  *
     449             :  * If this update inserts a downlink for a split page, also record that
     450             :  * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
     451             :  *
     452             :  * Note that both the todelete array and the tuples are marked as belonging
     453             :  * to the target buffer; they need not be stored in XLOG if XLogInsert decides
     454             :  * to log the whole buffer contents instead.
     455             :  */
     456             : XLogRecPtr
     457      202090 : gistXLogUpdate(Buffer buffer,
     458             :                OffsetNumber *todelete, int ntodelete,
     459             :                IndexTuple *itup, int ituplen,
     460             :                Buffer leftchildbuf)
     461             : {
     462             :     gistxlogPageUpdate xlrec;
     463             :     int         i;
     464             :     XLogRecPtr  recptr;
     465             : 
     466      202090 :     xlrec.ntodelete = ntodelete;
     467      202090 :     xlrec.ntoinsert = ituplen;
     468             : 
     469      202090 :     XLogBeginInsert();
     470      202090 :     XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageUpdate));
     471             : 
     472      202090 :     XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
     473      202090 :     XLogRegisterBufData(0, (char *) todelete, sizeof(OffsetNumber) * ntodelete);
     474             : 
     475             :     /* new tuples */
     476      404739 :     for (i = 0; i < ituplen; i++)
     477      202649 :         XLogRegisterBufData(0, (char *) (itup[i]), IndexTupleSize(itup[i]));
     478             : 
     479             :     /*
     480             :      * Include a full page image of the child buf. (only necessary if a
     481             :      * checkpoint happened since the child page was split)
     482             :      */
     483      202090 :     if (BufferIsValid(leftchildbuf))
     484        1048 :         XLogRegisterBuffer(1, leftchildbuf, REGBUF_STANDARD);
     485             : 
     486      202090 :     recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE);
     487             : 
     488      202090 :     return recptr;
     489             : }

Generated by: LCOV version 1.11