LCOV - code coverage report
Current view: top level - src/backend/access/spgist - spgvacuum.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 236 320 73.8 %
Date: 2017-09-29 15:12:54 Functions: 8 11 72.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * spgvacuum.c
       4             :  *    vacuum for SP-GiST
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *          src/backend/access/spgist/spgvacuum.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/genam.h"
      19             : #include "access/spgist_private.h"
      20             : #include "access/spgxlog.h"
      21             : #include "access/transam.h"
      22             : #include "access/xloginsert.h"
      23             : #include "catalog/storage_xlog.h"
      24             : #include "commands/vacuum.h"
      25             : #include "miscadmin.h"
      26             : #include "storage/bufmgr.h"
      27             : #include "storage/indexfsm.h"
      28             : #include "storage/lmgr.h"
      29             : #include "utils/snapmgr.h"
      30             : 
      31             : 
/*
 * Entry in pending-list of TIDs we need to revisit
 *
 * Items form a singly-linked list headed by spgBulkDeleteState.pendingList.
 * spgAddPendingTID() creates entries with done = false and links them at
 * the tail; spgClearPendingList() asserts that done is set before freeing
 * each entry.
 */
typedef struct spgVacPendingItem
{
    ItemPointerData tid;        /* redirection target to visit */
    bool        done;           /* have we dealt with this? */
    struct spgVacPendingItem *next; /* list link; NULL at end of list */
} spgVacPendingItem;
      39             : 
/*
 * Local state for vacuum operations
 *
 * One of these is threaded through the per-page vacuum routines so they can
 * reach the bulk-delete callback, the statistics being accumulated, and the
 * list of redirect targets that still need a visit.
 */
typedef struct spgBulkDeleteState
{
    /* Parameters passed in to spgvacuumscan */
    IndexVacuumInfo *info;
    IndexBulkDeleteResult *stats;   /* tuples_removed / num_index_tuples
                                     * counters are bumped here */
    IndexBulkDeleteCallback callback;   /* returns true if the heap TID's
                                         * index entry should be removed */
    void       *callback_state; /* opaque argument handed to callback */

    /* Additional working state */
    SpGistState spgstate;       /* for SPGiST operations that need one */
    spgVacPendingItem *pendingList; /* TIDs we need to (re)visit */
    TransactionId myXmin;       /* for detecting newly-added redirects:
                                 * redirects whose xid follows or equals
                                 * this get queued on pendingList */
    BlockNumber lastFilledBlock;    /* last non-deletable block */
} spgBulkDeleteState;
      55             : 
      56             : 
      57             : /*
      58             :  * Add TID to pendingList, but only if not already present.
      59             :  *
      60             :  * Note that new items are always appended at the end of the list; this
      61             :  * ensures that scans of the list don't miss items added during the scan.
      62             :  */
      63             : static void
      64           0 : spgAddPendingTID(spgBulkDeleteState *bds, ItemPointer tid)
      65             : {
      66             :     spgVacPendingItem *pitem;
      67             :     spgVacPendingItem **listLink;
      68             : 
      69             :     /* search the list for pre-existing entry */
      70           0 :     listLink = &bds->pendingList;
      71           0 :     while (*listLink != NULL)
      72             :     {
      73           0 :         pitem = *listLink;
      74           0 :         if (ItemPointerEquals(tid, &pitem->tid))
      75           0 :             return;             /* already in list, do nothing */
      76           0 :         listLink = &pitem->next;
      77             :     }
      78             :     /* not there, so append new entry */
      79           0 :     pitem = (spgVacPendingItem *) palloc(sizeof(spgVacPendingItem));
      80           0 :     pitem->tid = *tid;
      81           0 :     pitem->done = false;
      82           0 :     pitem->next = NULL;
      83           0 :     *listLink = pitem;
      84             : }
      85             : 
      86             : /*
      87             :  * Clear pendingList
      88             :  */
      89             : static void
      90           0 : spgClearPendingList(spgBulkDeleteState *bds)
      91             : {
      92             :     spgVacPendingItem *pitem;
      93             :     spgVacPendingItem *nitem;
      94             : 
      95           0 :     for (pitem = bds->pendingList; pitem != NULL; pitem = nitem)
      96             :     {
      97           0 :         nitem = pitem->next;
      98             :         /* All items in list should have been dealt with */
      99           0 :         Assert(pitem->done);
     100           0 :         pfree(pitem);
     101             :     }
     102           0 :     bds->pendingList = NULL;
     103           0 : }
     104             : 
/*
 * Vacuum a regular (non-root) leaf page
 *
 * We must delete tuples that are targeted for deletion by the VACUUM,
 * but not move any tuples that are referenced by outside links; we assume
 * those are the ones that are heads of chains.
 *
 * If we find a REDIRECT that was made by a concurrently-running transaction,
 * we must add its target TID to pendingList.  (We don't try to visit the
 * target immediately, first because we don't want VACUUM locking more than
 * one buffer at a time, and second because the duplicate-filtering logic
 * in spgAddPendingTID is useful to ensure we can't get caught in an infinite
 * loop in the face of continuous concurrent insertions.)
 *
 * If forPending is true, we are examining the page as a consequence of
 * chasing a redirect link, not as part of the normal sequential scan.
 * We still vacuum the page normally, but we don't increment the stats
 * about live tuples; else we'd double-count those tuples, since the page
 * has been or will be visited in the sequential scan as well.
 *
 * Parameters:
 *  bds         vacuum working state; supplies the deletion callback, the
 *              stats counters, the SpGistState, myXmin and pendingList
 *  index       the index relation; used to decide whether to emit WAL and
 *              to name the index in error messages
 *  buffer      buffer holding the leaf page to process (NOTE(review): the
 *              caller presumably holds an exclusive lock on it -- confirm)
 *  forPending  see above
 *
 * The function works in three phases: (1) scan every tuple, asking the
 * callback which live ones to remove and recording chain predecessors;
 * (2) walk each chain from its head to build the work arrays; (3) apply
 * the work arrays to the page and WAL-log them inside a critical section.
 * If phase 1 finds nothing deletable, the page is left untouched and no
 * WAL is written.
 */
static void
vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
               bool forPending)
{
    Page        page = BufferGetPage(buffer);
    spgxlogVacuumLeaf xlrec;
    /* work arrays filled by phase 2; entry counts live in xlrec */
    OffsetNumber toDead[MaxIndexTuplesPerPage];
    OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
    OffsetNumber moveSrc[MaxIndexTuplesPerPage];
    OffsetNumber moveDest[MaxIndexTuplesPerPage];
    OffsetNumber chainSrc[MaxIndexTuplesPerPage];
    OffsetNumber chainDest[MaxIndexTuplesPerPage];

    /*
     * The next two arrays are indexed directly by offset number (values up
     * to "max"), which is presumably why they get one extra slot.
     * predecessor[k] is the offset of the live tuple whose nextOffset is k,
     * or zero if no tuple links to k; deletable[k] is true if the callback
     * asked for tuple k to be removed.
     */
    OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
    bool        deletable[MaxIndexTuplesPerPage + 1];
    int         nDeletable;     /* number of true entries in deletable[] */
    OffsetNumber i,
                max = PageGetMaxOffsetNumber(page);

    memset(predecessor, 0, sizeof(predecessor));
    memset(deletable, 0, sizeof(deletable));
    nDeletable = 0;

    /* Scan page, identify tuples to delete, accumulate stats */
    for (i = FirstOffsetNumber; i <= max; i++)
    {
        SpGistLeafTuple lt;

        lt = (SpGistLeafTuple) PageGetItem(page,
                                           PageGetItemId(page, i));
        if (lt->tupstate == SPGIST_LIVE)
        {
            Assert(ItemPointerIsValid(&lt->heapPtr));

            /* callback returning true means "remove this index entry" */
            if (bds->callback(&lt->heapPtr, bds->callback_state))
            {
                bds->stats->tuples_removed += 1;
                deletable[i] = true;
                nDeletable++;
            }
            else
            {
                /* surviving tuple; count it only on the sequential pass */
                if (!forPending)
                    bds->stats->num_index_tuples += 1;
            }

            /* Form predecessor map, too */
            if (lt->nextOffset != InvalidOffsetNumber)
            {
                /*
                 * paranoia about corrupted chain links: the link must point
                 * inside the page, and no other tuple may already have
                 * claimed the same successor
                 */
                if (lt->nextOffset < FirstOffsetNumber ||
                    lt->nextOffset > max ||
                    predecessor[lt->nextOffset] != InvalidOffsetNumber)
                    elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
                         BufferGetBlockNumber(buffer),
                         RelationGetRelationName(index));
                predecessor[lt->nextOffset] = i;
            }
        }
        else if (lt->tupstate == SPGIST_REDIRECT)
        {
            /* redirects are laid out as dead tuples, so view it as one */
            SpGistDeadTuple dt = (SpGistDeadTuple) lt;

            Assert(dt->nextOffset == InvalidOffsetNumber);
            Assert(ItemPointerIsValid(&dt->pointer));

            /*
             * Add target TID to pending list if the redirection could have
             * happened since VACUUM started.
             *
             * Note: we could make a tighter test by seeing if the xid is
             * "running" according to the active snapshot; but tqual.c doesn't
             * currently export a suitable API, and it's not entirely clear
             * that a tighter test is worth the cycles anyway.
             */
            if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin))
                spgAddPendingTID(bds, &dt->pointer);
        }
        else
        {
            /* any other tuple state is expected not to be part of a chain */
            Assert(lt->nextOffset == InvalidOffsetNumber);
        }
    }

    if (nDeletable == 0)
        return;                 /* nothing more to do */

    /*----------
     * Figure out exactly what we have to do.  We do this separately from
     * actually modifying the page, mainly so that we have a representation
     * that can be dumped into WAL and then the replay code can do exactly
     * the same thing.  The output of this step consists of six arrays
     * describing four kinds of operations, to be performed in this order:
     *
     * toDead[]: tuple numbers to be replaced with DEAD tuples
     * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
     * moveSrc[]: tuple numbers that need to be relocated to another offset
     * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
     * moveDest[]: new locations for moveSrc tuples
     * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
     * chainDest[]: new values of nextOffset for chainSrc members
     *
     * It's easiest to figure out what we have to do by processing tuple
     * chains, so we iterate over all the tuples (not just the deletable
     * ones!) to identify chain heads, then chase down each chain and make
     * work item entries for deletable tuples within the chain.
     *----------
     */
    xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;

    for (i = FirstOffsetNumber; i <= max; i++)
    {
        SpGistLeafTuple head;
        bool        interveningDeletable;   /* deletable tuple(s) seen since
                                             * the last surviving one? */
        OffsetNumber prevLive;  /* offset (after any move) of the most
                                 * recent surviving tuple in this chain, or
                                 * InvalidOffsetNumber if none yet */
        OffsetNumber j;

        head = (SpGistLeafTuple) PageGetItem(page,
                                             PageGetItemId(page, i));
        if (head->tupstate != SPGIST_LIVE)
            continue;           /* can't be a chain member */
        /* predecessor[] was zero-filled, so nonzero means something links here */
        if (predecessor[i] != 0)
            continue;           /* not a chain head */

        /* initialize ... */
        interveningDeletable = false;
        prevLive = deletable[i] ? InvalidOffsetNumber : i;

        /* scan down the chain ... */
        j = head->nextOffset;
        while (j != InvalidOffsetNumber)
        {
            SpGistLeafTuple lt;

            lt = (SpGistLeafTuple) PageGetItem(page,
                                               PageGetItemId(page, j));
            if (lt->tupstate != SPGIST_LIVE)
            {
                /* all tuples in chain should be live */
                elog(ERROR, "unexpected SPGiST tuple state: %d",
                     lt->tupstate);
            }

            if (deletable[j])
            {
                /* This tuple should be replaced by a placeholder */
                toPlaceholder[xlrec.nPlaceholder] = j;
                xlrec.nPlaceholder++;
                /* previous live tuple's chain link will need an update */
                interveningDeletable = true;
            }
            else if (prevLive == InvalidOffsetNumber)
            {
                /*
                 * This is the first live tuple in the chain.  It has to move
                 * to the head position.
                 */
                moveSrc[xlrec.nMove] = j;
                moveDest[xlrec.nMove] = i;
                xlrec.nMove++;
                /* Chain updates will be applied after the move */
                prevLive = i;
                interveningDeletable = false;
            }
            else
            {
                /*
                 * Second or later live tuple.  Arrange to re-chain it to the
                 * previous live one, if there was a gap.
                 */
                if (interveningDeletable)
                {
                    chainSrc[xlrec.nChain] = prevLive;
                    chainDest[xlrec.nChain] = j;
                    xlrec.nChain++;
                }
                prevLive = j;
                interveningDeletable = false;
            }

            j = lt->nextOffset;
        }

        if (prevLive == InvalidOffsetNumber)
        {
            /* The chain is entirely removable, so we need a DEAD tuple */
            toDead[xlrec.nDead] = i;
            xlrec.nDead++;
        }
        else if (interveningDeletable)
        {
            /* One or more deletions at end of chain, so close it off */
            chainSrc[xlrec.nChain] = prevLive;
            chainDest[xlrec.nChain] = InvalidOffsetNumber;
            xlrec.nChain++;
        }
    }

    /*
     * sanity check ...  Each deletable tuple should be accounted for exactly
     * once: as a DEAD chain head, as a PLACEHOLDER, or (for a deletable head
     * that a live tuple is moved onto) by the corresponding move entry.
     */
    if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
        elog(ERROR, "inconsistent counts of deletable tuples");

    /* Do the updates */
    START_CRIT_SECTION();

    spgPageIndexMultiDelete(&bds->spgstate, page,
                            toDead, xlrec.nDead,
                            SPGIST_DEAD, SPGIST_DEAD,
                            InvalidBlockNumber, InvalidOffsetNumber);

    spgPageIndexMultiDelete(&bds->spgstate, page,
                            toPlaceholder, xlrec.nPlaceholder,
                            SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
                            InvalidBlockNumber, InvalidOffsetNumber);

    /*
     * We implement the move step by swapping the item pointers of the source
     * and target tuples, then replacing the newly-source tuples with
     * placeholders.  This is perhaps unduly friendly with the page data
     * representation, but it's fast and doesn't risk page overflow when a
     * tuple to be relocated is large.
     */
    for (i = 0; i < xlrec.nMove; i++)
    {
        ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
        ItemId      idDest = PageGetItemId(page, moveDest[i]);
        ItemIdData  tmp;

        tmp = *idSrc;
        *idSrc = *idDest;
        *idDest = tmp;
    }

    /* after the swap, the moveSrc slots hold the tuples being discarded */
    spgPageIndexMultiDelete(&bds->spgstate, page,
                            moveSrc, xlrec.nMove,
                            SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
                            InvalidBlockNumber, InvalidOffsetNumber);

    /* finally, patch nextOffset links so chains skip the removed tuples */
    for (i = 0; i < xlrec.nChain; i++)
    {
        SpGistLeafTuple lt;

        lt = (SpGistLeafTuple) PageGetItem(page,
                                           PageGetItemId(page, chainSrc[i]));
        Assert(lt->tupstate == SPGIST_LIVE);
        lt->nextOffset = chainDest[i];
    }

    MarkBufferDirty(buffer);

    if (RelationNeedsWAL(index))
    {
        XLogRecPtr  recptr;

        XLogBeginInsert();

        STORE_STATE(&bds->spgstate, xlrec.stateSrc);

        /*
         * Record payload: the fixed-size header followed by the six work
         * arrays, each trimmed to its used length, in the same order in
         * which the operations were applied above.
         */
        XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumLeaf);
        /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
        XLogRegisterData((char *) toDead, sizeof(OffsetNumber) * xlrec.nDead);
        XLogRegisterData((char *) toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder);
        XLogRegisterData((char *) moveSrc, sizeof(OffsetNumber) * xlrec.nMove);
        XLogRegisterData((char *) moveDest, sizeof(OffsetNumber) * xlrec.nMove);
        XLogRegisterData((char *) chainSrc, sizeof(OffsetNumber) * xlrec.nChain);
        XLogRegisterData((char *) chainDest, sizeof(OffsetNumber) * xlrec.nChain);

        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF);

        PageSetLSN(page, recptr);
    }

    END_CRIT_SECTION();
}
     400             : 
     401             : /*
     402             :  * Vacuum a root page when it is also a leaf
     403             :  *
     404             :  * On the root, we just delete any dead leaf tuples; no fancy business
     405             :  */
     406             : static void
     407           4 : vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
     408             : {
     409           4 :     Page        page = BufferGetPage(buffer);
     410             :     spgxlogVacuumRoot xlrec;
     411             :     OffsetNumber toDelete[MaxIndexTuplesPerPage];
     412             :     OffsetNumber i,
     413           4 :                 max = PageGetMaxOffsetNumber(page);
     414             : 
     415           4 :     xlrec.nDelete = 0;
     416             : 
     417             :     /* Scan page, identify tuples to delete, accumulate stats */
     418          22 :     for (i = FirstOffsetNumber; i <= max; i++)
     419             :     {
     420             :         SpGistLeafTuple lt;
     421             : 
     422          18 :         lt = (SpGistLeafTuple) PageGetItem(page,
     423             :                                            PageGetItemId(page, i));
     424          18 :         if (lt->tupstate == SPGIST_LIVE)
     425             :         {
     426          18 :             Assert(ItemPointerIsValid(&lt->heapPtr));
     427             : 
     428          18 :             if (bds->callback(&lt->heapPtr, bds->callback_state))
     429             :             {
     430           0 :                 bds->stats->tuples_removed += 1;
     431           0 :                 toDelete[xlrec.nDelete] = i;
     432           0 :                 xlrec.nDelete++;
     433             :             }
     434             :             else
     435             :             {
     436          18 :                 bds->stats->num_index_tuples += 1;
     437             :             }
     438             :         }
     439             :         else
     440             :         {
     441             :             /* all tuples on root should be live */
     442           0 :             elog(ERROR, "unexpected SPGiST tuple state: %d",
     443             :                  lt->tupstate);
     444             :         }
     445             :     }
     446             : 
     447           4 :     if (xlrec.nDelete == 0)
     448           8 :         return;                 /* nothing more to do */
     449             : 
     450             :     /* Do the update */
     451           0 :     START_CRIT_SECTION();
     452             : 
     453             :     /* The tuple numbers are in order, so we can use PageIndexMultiDelete */
     454           0 :     PageIndexMultiDelete(page, toDelete, xlrec.nDelete);
     455             : 
     456           0 :     MarkBufferDirty(buffer);
     457             : 
     458           0 :     if (RelationNeedsWAL(index))
     459             :     {
     460             :         XLogRecPtr  recptr;
     461             : 
     462           0 :         XLogBeginInsert();
     463             : 
     464             :         /* Prepare WAL record */
     465           0 :         STORE_STATE(&bds->spgstate, xlrec.stateSrc);
     466             : 
     467           0 :         XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRoot);
     468             :         /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
     469           0 :         XLogRegisterData((char *) toDelete,
     470           0 :                          sizeof(OffsetNumber) * xlrec.nDelete);
     471             : 
     472           0 :         XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
     473             : 
     474           0 :         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT);
     475             : 
     476           0 :         PageSetLSN(page, recptr);
     477             :     }
     478             : 
     479           0 :     END_CRIT_SECTION();
     480             : }
     481             : 
     482             : /*
     483             :  * Clean up redirect and placeholder tuples on the given page
     484             :  *
     485             :  * Redirect tuples can be marked placeholder once they're old enough.
     486             :  * Placeholder tuples can be removed if it won't change the offsets of
     487             :  * non-placeholder ones.
     488             :  *
     489             :  * Unlike the routines above, this works on both leaf and inner pages.
     490             :  */
     491             : static void
     492         570 : vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
     493             : {
     494         570 :     Page        page = BufferGetPage(buffer);
     495         570 :     SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
     496             :     OffsetNumber i,
     497         570 :                 max = PageGetMaxOffsetNumber(page),
     498         570 :                 firstPlaceholder = InvalidOffsetNumber;
     499         570 :     bool        hasNonPlaceholder = false;
     500         570 :     bool        hasUpdate = false;
     501             :     OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
     502             :     OffsetNumber itemnos[MaxIndexTuplesPerPage];
     503             :     spgxlogVacuumRedirect xlrec;
     504             : 
     505         570 :     xlrec.nToPlaceholder = 0;
     506         570 :     xlrec.newestRedirectXid = InvalidTransactionId;
     507             : 
     508         570 :     START_CRIT_SECTION();
     509             : 
     510             :     /*
     511             :      * Scan backwards to convert old redirection tuples to placeholder tuples,
     512             :      * and identify location of last non-placeholder tuple while at it.
     513             :      */
     514       23585 :     for (i = max;
     515       23008 :          i >= FirstOffsetNumber &&
     516       36208 :          (opaque->nRedirection > 0 || !hasNonPlaceholder);
     517       22445 :          i--)
     518             :     {
     519             :         SpGistDeadTuple dt;
     520             : 
     521       22445 :         dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));
     522             : 
     523       22561 :         if (dt->tupstate == SPGIST_REDIRECT &&
     524         116 :             TransactionIdPrecedes(dt->xid, RecentGlobalXmin))
     525             :         {
     526         116 :             dt->tupstate = SPGIST_PLACEHOLDER;
     527         116 :             Assert(opaque->nRedirection > 0);
     528         116 :             opaque->nRedirection--;
     529         116 :             opaque->nPlaceholder++;
     530             : 
     531             :             /* remember newest XID among the removed redirects */
     532         116 :             if (!TransactionIdIsValid(xlrec.newestRedirectXid) ||
     533           0 :                 TransactionIdPrecedes(xlrec.newestRedirectXid, dt->xid))
     534         116 :                 xlrec.newestRedirectXid = dt->xid;
     535             : 
     536         116 :             ItemPointerSetInvalid(&dt->pointer);
     537             : 
     538         116 :             itemToPlaceholder[xlrec.nToPlaceholder] = i;
     539         116 :             xlrec.nToPlaceholder++;
     540             : 
     541         116 :             hasUpdate = true;
     542             :         }
     543             : 
     544       22445 :         if (dt->tupstate == SPGIST_PLACEHOLDER)
     545             :         {
     546       21807 :             if (!hasNonPlaceholder)
     547       21799 :                 firstPlaceholder = i;
     548             :         }
     549             :         else
     550             :         {
     551         638 :             hasNonPlaceholder = true;
     552             :         }
     553             :     }
     554             : 
     555             :     /*
     556             :      * Any placeholder tuples at the end of page can safely be removed.  We
     557             :      * can't remove ones before the last non-placeholder, though, because we
     558             :      * can't alter the offset numbers of non-placeholder tuples.
     559             :      */
     560         570 :     if (firstPlaceholder != InvalidOffsetNumber)
     561             :     {
     562             :         /*
     563             :          * We do not store this array to rdata because it's easy to recreate.
     564             :          */
     565       22088 :         for (i = firstPlaceholder; i <= max; i++)
     566       21799 :             itemnos[i - firstPlaceholder] = i;
     567             : 
     568         289 :         i = max - firstPlaceholder + 1;
     569         289 :         Assert(opaque->nPlaceholder >= i);
     570         289 :         opaque->nPlaceholder -= i;
     571             : 
     572             :         /* The array is surely sorted, so can use PageIndexMultiDelete */
     573         289 :         PageIndexMultiDelete(page, itemnos, i);
     574             : 
     575         289 :         hasUpdate = true;
     576             :     }
     577             : 
     578         570 :     xlrec.firstPlaceholder = firstPlaceholder;
     579             : 
     580         570 :     if (hasUpdate)
     581         289 :         MarkBufferDirty(buffer);
     582             : 
     583         570 :     if (hasUpdate && RelationNeedsWAL(index))
     584             :     {
     585             :         XLogRecPtr  recptr;
     586             : 
     587         289 :         XLogBeginInsert();
     588             : 
     589         289 :         XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRedirect);
     590         289 :         XLogRegisterData((char *) itemToPlaceholder,
     591         289 :                          sizeof(OffsetNumber) * xlrec.nToPlaceholder);
     592             : 
     593         289 :         XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
     594             : 
     595         289 :         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT);
     596             : 
     597         289 :         PageSetLSN(page, recptr);
     598             :     }
     599             : 
     600         570 :     END_CRIT_SECTION();
     601         570 : }
     602             : 
     603             : /*
     604             :  * Process one page during a bulkdelete scan
                     :  *
                     :  * Reads block blkno of the index's main fork (using bds->info->strategy)
                     :  * and keeps the buffer exclusively locked until the end of the function.
                     :  *
                     :  * What is done depends on the page:
                     :  *  - new (all-zero) or empty page: no vacuuming work;
                     :  *  - leaf page that is a root block: vacuumLeafRoot only;
                     :  *  - any other leaf page: vacuumLeafPage (last argument false), then
                     :  *    vacuumRedirectAndPlaceholder;
                     :  *  - inner page: vacuumRedirectAndPlaceholder only.
                     :  *
                     :  * Afterwards, for non-root blocks only: a new or empty page is passed to
                     :  * RecordFreeIndexPage and counted in bds->stats->pages_deleted; any other
                     :  * page is passed to SpGistSetLastUsedPage and blkno is remembered in
                     :  * bds->lastFilledBlock.
     605             :  */
     606             : static void
     607         584 : spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
     608             : {
     609         584 :     Relation    index = bds->info->index;
     610             :     Buffer      buffer;
     611             :     Page        page;
     612             : 
     613             :     /* call vacuum_delay_point while not holding any buffer lock */
     614         584 :     vacuum_delay_point();
     615             : 
     616         584 :     buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
     617         584 :                                 RBM_NORMAL, bds->info->strategy);
     618         584 :     LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     619         584 :     page = (Page) BufferGetPage(buffer);
     620             : 
     621         584 :     if (PageIsNew(page))
     622             :     {
     623             :         /*
     624             :          * We found an all-zero page, which could happen if the database
     625             :          * crashed just after extending the file.  Recycle it.
                     :          *
                     :          * No action is taken in this branch itself; a non-root new page
                     :          * is recorded as free by the PageIsNew test further down.
     626             :          */
     627             :     }
     628         584 :     else if (PageIsEmpty(page))
     629             :     {
     630             :         /* nothing to do */
     631             :     }
     632         574 :     else if (SpGistPageIsLeaf(page))
     633             :     {
     634         558 :         if (SpGistBlockIsRoot(blkno))
     635             :         {
     636           4 :             vacuumLeafRoot(bds, index, buffer);
     637             :             /* no need for vacuumRedirectAndPlaceholder */
     638             :         }
     639             :         else
     640             :         {
     641         554 :             vacuumLeafPage(bds, index, buffer, false);
     642         554 :             vacuumRedirectAndPlaceholder(index, buffer);
     643             :         }
     644             :     }
     645             :     else
     646             :     {
     647             :         /* inner page */
     648          16 :         vacuumRedirectAndPlaceholder(index, buffer);
     649             :     }
     650             : 
     651             :     /*
     652             :      * The root pages must never be deleted, nor marked as available in FSM,
     653             :      * because we don't want them ever returned by a search for a place to put
     654             :      * a new tuple.  Otherwise, check for empty page, and make sure the FSM
     655             :      * knows about it.
                     :      *
                     :      * The emptiness test is made after the vacuuming calls above, so a
                     :      * page that those calls left empty is also recorded as free here.
     656             :      */
     657         584 :     if (!SpGistBlockIsRoot(blkno))
     658             :     {
     659         568 :         if (PageIsNew(page) || PageIsEmpty(page))
     660             :         {
     661           5 :             RecordFreeIndexPage(index, blkno);
     662           5 :             bds->stats->pages_deleted++;
     663             :         }
     664             :         else
     665             :         {
     666         563 :             SpGistSetLastUsedPage(index, buffer);
     667         563 :             bds->lastFilledBlock = blkno;
     668             :         }
     669             :     }
     670             : 
     671         584 :     UnlockReleaseBuffer(buffer);
     672         584 : }
     673             : 
     674             : /*
     675             :  * Process the pending-TID list between pages of the main scan
                     :  *
                     :  * Walks bds->pendingList from its head, skipping items already marked
                     :  * done.  For each remaining item, the block named by the item's TID is
                     :  * read and exclusively locked, then handled according to the page type:
                     :  *
                     :  *  - new or deleted page: ignored (the item is not marked done);
                     :  *  - leaf page: an error is raised if the block is a root block;
                     :  *    otherwise vacuumLeafPage is called with its last argument true,
                     :  *    followed by vacuumRedirectAndPlaceholder and
                     :  *    SpGistSetLastUsedPage, and this item plus every later item for the
                     :  *    same block is marked done;
                     :  *  - inner page: every not-yet-done item from this one onward that
                     :  *    refers to the same block is resolved.  A live inner tuple has each
                     :  *    of its valid downlinks passed to spgAddPendingTID; a redirect tuple
                     :  *    has its redirect target passed to spgAddPendingTID; any other tuple
                     :  *    state raises an error.
                     :  *
                     :  * NOTE(review): items added by spgAddPendingTID while this loop runs are
                     :  * presumably linked where the loop will still reach them -- confirm
                     :  * against spgAddPendingTID.
                     :  *
                     :  * The list is released with spgClearPendingList once the walk finishes.
     676             :  */
     677             : static void
     678           0 : spgprocesspending(spgBulkDeleteState *bds)
     679             : {
     680           0 :     Relation    index = bds->info->index;
     681             :     spgVacPendingItem *pitem;
     682             :     spgVacPendingItem *nitem;
     683             :     BlockNumber blkno;
     684             :     Buffer      buffer;
     685             :     Page        page;
     686             : 
     687           0 :     for (pitem = bds->pendingList; pitem != NULL; pitem = pitem->next)
     688             :     {
     689           0 :         if (pitem->done)
     690           0 :             continue;           /* ignore already-done items */
     691             : 
     692             :         /* call vacuum_delay_point while not holding any buffer lock */
     693           0 :         vacuum_delay_point();
     694             : 
     695             :         /* examine the referenced page */
     696           0 :         blkno = ItemPointerGetBlockNumber(&pitem->tid);
     697           0 :         buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
     698           0 :                                     RBM_NORMAL, bds->info->strategy);
     699           0 :         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
     700           0 :         page = (Page) BufferGetPage(buffer);
     701             : 
     702           0 :         if (PageIsNew(page) || SpGistPageIsDeleted(page))
     703             :         {
     704             :             /* Probably shouldn't happen, but ignore it */
     705             :         }
     706           0 :         else if (SpGistPageIsLeaf(page))
     707             :         {
     708           0 :             if (SpGistBlockIsRoot(blkno))
     709             :             {
     710             :                 /* this should definitely not happen */
     711           0 :                 elog(ERROR, "redirection leads to root page of index \"%s\"",
     712             :                      RelationGetRelationName(index));
     713             :             }
     714             : 
     715             :             /* deal with any deletable tuples */
     716           0 :             vacuumLeafPage(bds, index, buffer, true);
     717             :             /* might as well do this while we are here */
     718           0 :             vacuumRedirectAndPlaceholder(index, buffer);
     719             : 
     720           0 :             SpGistSetLastUsedPage(index, buffer);
     721             : 
     722             :             /*
     723             :              * We can mark as done not only this item, but any later ones
     724             :              * pointing at the same page, since we vacuumed the whole page.
     725             :              */
     726           0 :             pitem->done = true;
     727           0 :             for (nitem = pitem->next; nitem != NULL; nitem = nitem->next)
     728             :             {
     729           0 :                 if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
     730           0 :                     nitem->done = true;
     731             :             }
     732             :         }
     733             :         else
     734             :         {
     735             :             /*
     736             :              * On an inner page, visit the referenced inner tuple and add all
     737             :              * its downlinks to the pending list.  We might have pending items
     738             :              * for more than one inner tuple on the same page (in fact this is
     739             :              * pretty likely given the way space allocation works), so get
     740             :              * them all while we are here.
                     :              *
                     :              * The inner loop starts at pitem itself, so the current item is
                     :              * handled (and marked done) by the same code as the later ones.
     741             :              */
     742           0 :             for (nitem = pitem; nitem != NULL; nitem = nitem->next)
     743             :             {
     744           0 :                 if (nitem->done)
     745           0 :                     continue;
     746           0 :                 if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
     747             :                 {
     748             :                     OffsetNumber offset;
     749             :                     SpGistInnerTuple innerTuple;
     750             : 
     751           0 :                     offset = ItemPointerGetOffsetNumber(&nitem->tid);
     752           0 :                     innerTuple = (SpGistInnerTuple) PageGetItem(page,
     753             :                                                                 PageGetItemId(page, offset));
     754           0 :                     if (innerTuple->tupstate == SPGIST_LIVE)
     755             :                     {
     756             :                         SpGistNodeTuple node;
     757             :                         int         i;
     758             : 
     759           0 :                         SGITITERATE(innerTuple, i, node)
     760             :                         {
     761           0 :                             if (ItemPointerIsValid(&node->t_tid))
     762           0 :                                 spgAddPendingTID(bds, &node->t_tid);
     763             :                         }
     764             :                     }
     765           0 :                     else if (innerTuple->tupstate == SPGIST_REDIRECT)
     766             :                     {
     767             :                         /* transfer attention to redirect point */
     768           0 :                         spgAddPendingTID(bds,
     769             :                                          &((SpGistDeadTuple) innerTuple)->pointer);
     770             :                     }
     771             :                     else
     772           0 :                         elog(ERROR, "unexpected SPGiST tuple state: %d",
     773             :                              innerTuple->tupstate);
     774             : 
     775           0 :                     nitem->done = true;
     776             :                 }
     777             :             }
     778             :         }
     779             : 
     780           0 :         UnlockReleaseBuffer(buffer);
     781             :     }
     782             : 
     783           0 :     spgClearPendingList(bds);
     784           0 : }
     785             : 
     786             : /*
     787             :  * Perform a bulkdelete scan
                     :  *
                     :  * The callers in this file fill in bds->info, bds->stats, bds->callback
                     :  * and bds->callback_state before calling; this function sets up the
                     :  * remaining fields (spgstate, pendingList, myXmin, lastFilledBlock) and
                     :  * zeroes the counters in bds->stats that the scan increments.
                     :  *
                     :  * It then calls spgvacuumpage for every block after the metapage,
                     :  * running spgprocesspending after any page that left a non-empty pending
                     :  * list, and re-reads the relation length until no unvisited blocks
                     :  * remain.  Finally it calls SpGistUpdateMetaPage and stores the last
                     :  * observed block count in stats->num_pages and the pages_deleted count
                     :  * in stats->pages_free.
     788             :  */
     789             : static void
     790           8 : spgvacuumscan(spgBulkDeleteState *bds)
     791             : {
     792           8 :     Relation    index = bds->info->index;
     793             :     bool        needLock;
     794             :     BlockNumber num_pages,
     795             :                 blkno;
     796             : 
     797             :     /* Finish setting up spgBulkDeleteState */
     798           8 :     initSpGistState(&bds->spgstate, index);
     799           8 :     bds->pendingList = NULL;
     800           8 :     bds->myXmin = GetActiveSnapshot()->xmin;
     801           8 :     bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;
     802             : 
     803             :     /*
     804             :      * Reset counts that will be incremented during the scan; needed in case
     805             :      * of multiple scans during a single VACUUM command
     806             :      */
     807           8 :     bds->stats->estimated_count = false;
     808           8 :     bds->stats->num_index_tuples = 0;
     809           8 :     bds->stats->pages_deleted = 0;
     810             : 
     811             :     /* We can skip locking for new or temp relations */
     812           8 :     needLock = !RELATION_IS_LOCAL(index);
     813             : 
     814             :     /*
     815             :      * The outer loop iterates over all index pages except the metapage, in
     816             :      * physical order (we hope the kernel will cooperate in providing
     817             :      * read-ahead for speed).  It is critical that we visit all leaf pages,
     818             :      * including ones added after we start the scan, else we might fail to
     819             :      * delete some deletable tuples.  See more extensive comments about this
     820             :      * in btvacuumscan().
                     :      *
                     :      * blkno is never reset, so each pass of the outer loop only visits
                     :      * blocks that appeared since the previous length check.
     821             :      */
     822           8 :     blkno = SPGIST_METAPAGE_BLKNO + 1;
     823             :     for (;;)
     824             :     {
     825             :         /* Get the current relation length */
     826          16 :         if (needLock)
     827          16 :             LockRelationForExtension(index, ExclusiveLock);
     828          16 :         num_pages = RelationGetNumberOfBlocks(index);
     829          16 :         if (needLock)
     830          16 :             UnlockRelationForExtension(index, ExclusiveLock);
     831             : 
     832             :         /* Quit if we've scanned the whole relation */
     833          16 :         if (blkno >= num_pages)
     834           8 :             break;
     835             :         /* Iterate over pages, then loop back to recheck length */
     836         592 :         for (; blkno < num_pages; blkno++)
     837             :         {
     838         584 :             spgvacuumpage(bds, blkno);
     839             :             /* empty the pending-list after each page */
     840         584 :             if (bds->pendingList != NULL)
     841           0 :                 spgprocesspending(bds);
     842             :         }
     843           8 :     }
     844             : 
     845             :     /* Propagate local lastUsedPage cache to metablock */
     846           8 :     SpGistUpdateMetaPage(index);
     847             : 
     848             :     /*
     849             :      * Truncate index if possible
     850             :      *
     851             :      * XXX disabled because it's unsafe due to possible concurrent inserts.
     852             :      * We'd have to rescan the pages to make sure they're still empty, and it
     853             :      * doesn't seem worth it.  Note that btree doesn't do this either.
     854             :      *
     855             :      * Another reason not to truncate is that it could invalidate the cached
     856             :      * pages-with-freespace pointers in the metapage and other backends'
     857             :      * relation caches, that is leave them pointing to nonexistent pages.
     858             :      * Adding RelationGetNumberOfBlocks calls to protect the places that use
     859             :      * those pointers would be unduly expensive.
     860             :      */
     861             : #ifdef NOT_USED
     862             :     if (num_pages > bds->lastFilledBlock + 1)
     863             :     {
     864             :         BlockNumber lastBlock = num_pages - 1;
     865             : 
     866             :         num_pages = bds->lastFilledBlock + 1;
     867             :         RelationTruncate(index, num_pages);
     868             :         bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
     869             :         bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
     870             :     }
     871             : #endif
     872             : 
     873             :     /* Report final stats */
     874           8 :     bds->stats->num_pages = num_pages;
     875           8 :     bds->stats->pages_free = bds->stats->pages_deleted;
     876           8 : }
     877             : 
     878             : /*
     879             :  * Bulk deletion of all index entries pointing to a set of heap tuples.
     880             :  * The set of target tuples is specified via a callback routine that tells
     881             :  * whether any given heap tuple (identified by ItemPointer) is being deleted.
     882             :  *
                     :  * info, callback and callback_state are stored in a local
                     :  * spgBulkDeleteState that is handed to spgvacuumscan.  If stats is NULL a
                     :  * zeroed IndexBulkDeleteResult is allocated; otherwise the caller's
                     :  * struct is reused and updated in place.
                     :  *
     883             :  * Result: a palloc'd struct containing statistical info for VACUUM displays.
     884             :  */
     885             : IndexBulkDeleteResult *
     886           2 : spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     887             :               IndexBulkDeleteCallback callback, void *callback_state)
     888             : {
     889             :     spgBulkDeleteState bds;
     890             : 
     891             :     /* allocate stats if first time through, else re-use existing struct */
     892           2 :     if (stats == NULL)
     893           2 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     894           2 :     bds.info = info;
     895           2 :     bds.stats = stats;
     896           2 :     bds.callback = callback;
     897           2 :     bds.callback_state = callback_state;
     898             : 
     899           2 :     spgvacuumscan(&bds);
     900             : 
     901           2 :     return stats;
     902             : }
     903             : 
     904             : /*
                     :  * Dummy callback to delete no tuples during spgvacuumcleanup
                     :  *
                     :  * Both arguments are ignored and the result is always false, so a scan
                     :  * driven by this callback is never told that a heap tuple is being
                     :  * deleted.
                     :  */
     905             : static bool
     906       45472 : dummy_callback(ItemPointer itemptr, void *state)
     907             : {
     908       45472 :     return false;
     909             : }
     910             : 
     911             : /*
     912             :  * Post-VACUUM cleanup.
     913             :  *
                     :  * When info->analyze_only is set, stats is returned untouched (it may be
                     :  * NULL).  Otherwise, if stats is NULL a zeroed result struct is allocated
                     :  * and spgvacuumscan is run with dummy_callback; a non-NULL stats is taken
                     :  * to come from an earlier bulkdelete pass and no scan is made.  In both
                     :  * cases IndexFreeSpaceMapVacuum is then called, and num_index_tuples is
                     :  * capped at info->num_heap_tuples unless info->estimated_count is set.
                     :  *
     914             :  * Result: a palloc'd struct containing statistical info for VACUUM displays.
     915             :  */
     916             : IndexBulkDeleteResult *
     917          13 : spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     918             : {
     919          13 :     Relation    index = info->index;
     920             :     spgBulkDeleteState bds;
     921             : 
     922             :     /* No-op in ANALYZE ONLY mode */
     923          13 :     if (info->analyze_only)
     924           5 :         return stats;
     925             : 
     926             :     /*
     927             :      * We don't need to scan the index if there was a preceding bulkdelete
     928             :      * pass.  Otherwise, make a pass that won't delete any live tuples, but
     929             :      * might still accomplish useful stuff with redirect/placeholder cleanup,
     930             :      * and in any case will provide stats.
     931             :      */
     932           8 :     if (stats == NULL)
     933             :     {
     934           6 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     935           6 :         bds.info = info;
     936           6 :         bds.stats = stats;
     937           6 :         bds.callback = dummy_callback;
     938           6 :         bds.callback_state = NULL;
     939             : 
     940           6 :         spgvacuumscan(&bds);
     941             :     }
     942             : 
     943             :     /* Finally, vacuum the FSM */
     944           8 :     IndexFreeSpaceMapVacuum(index);
     945             : 
     946             :     /*
     947             :      * It's quite possible for us to be fooled by concurrent tuple moves into
     948             :      * double-counting some index tuples, so disbelieve any total that exceeds
     949             :      * the underlying heap's count ... if we know that accurately.  Otherwise
     950             :      * this might just make matters worse.
     951             :      */
     952           8 :     if (!info->estimated_count)
     953             :     {
     954           8 :         if (stats->num_index_tuples > info->num_heap_tuples)
     955           0 :             stats->num_index_tuples = info->num_heap_tuples;
     956             :     }
     957             : 
     958           8 :     return stats;
     959             : }

Generated by: LCOV version 1.11