LCOV - code coverage report
Current view: top level - src/backend/access/brin - brin.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 396 457 86.7 %
Date: 2017-09-29 13:40:31 Functions: 22 25 88.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * brin.c
       3             :  *      Implementation of BRIN indexes for Postgres
       4             :  *
       5             :  * See src/backend/access/brin/README for details.
       6             :  *
       7             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/brin/brin.c
      12             :  *
      13             :  * TODO
      14             :  *      * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
      15             :  */
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/brin.h"
      19             : #include "access/brin_page.h"
      20             : #include "access/brin_pageops.h"
      21             : #include "access/brin_xlog.h"
      22             : #include "access/reloptions.h"
      23             : #include "access/relscan.h"
      24             : #include "access/xloginsert.h"
      25             : #include "catalog/index.h"
      26             : #include "catalog/pg_am.h"
      27             : #include "miscadmin.h"
      28             : #include "pgstat.h"
      29             : #include "postmaster/autovacuum.h"
      30             : #include "storage/bufmgr.h"
      31             : #include "storage/freespace.h"
      32             : #include "utils/builtins.h"
      33             : #include "utils/index_selfuncs.h"
      34             : #include "utils/memutils.h"
      35             : #include "utils/rel.h"
      36             : 
      37             : 
      38             : /*
      39             :  * We use a BrinBuildState during initial construction of a BRIN index.
      40             :  * The running state is kept in a BrinMemTuple (presumably bs_dtuple -- confirm).
      41             :  */
      42             : typedef struct BrinBuildState
      43             : {
      44             :     Relation    bs_irel;    /* presumably the index being built -- confirm */
      45             :     int         bs_numtuples;
      46             :     Buffer      bs_currentInsertBuf;
      47             :     BlockNumber bs_pagesPerRange;   /* presumably as given to initialize_brin_buildstate -- confirm */
      48             :     BlockNumber bs_currRangeStart;
      49             :     BrinRevmap *bs_rmAccess;    /* presumably the revmap given to initialize_brin_buildstate -- confirm */
      50             :     BrinDesc   *bs_bdesc;
      51             :     BrinMemTuple *bs_dtuple;
      52             : } BrinBuildState;
      53             : 
      54             : /*
      55             :  * Struct used as "opaque" (scan->opaque) during index scans; see brinbeginscan
      56             :  */
      57             : typedef struct BrinOpaque
      58             : {
      59             :     BlockNumber bo_pagesPerRange;   /* filled in by brinRevmapInitialize() */
      60             :     BrinRevmap *bo_rmAccess;    /* revmap handle; closed by brinRevmapTerminate() */
      61             :     BrinDesc   *bo_bdesc;   /* from brin_build_desc(); freed by brin_free_desc() */
      62             : } BrinOpaque;
      63             : 
      64             : #define BRIN_ALL_BLOCKRANGES    InvalidBlockNumber /* NOTE(review): presumably a "summarize every range" value for brinsummarize's pageRange -- confirm */
      65             : /* Prototypes for helpers private to this file (all static) */
      66             : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
      67             :                            BrinRevmap *revmap, BlockNumber pagesPerRange);
      68             : static void terminate_brin_buildstate(BrinBuildState *state);
      69             : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
      70             :               double *numSummarized, double *numExisting);
      71             : static void form_and_insert_tuple(BrinBuildState *state);
      72             : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
      73             :              BrinTuple *b);
      74             : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
      75             : 
      76             : 
      77             : /*
      78             :  * BRIN handler function: return a freshly made IndexAmRoutine node filled in
      79             :  * with BRIN's access method parameters and callbacks (NULL = none supplied).
      80             :  */
      81             : Datum
      82          68 : brinhandler(PG_FUNCTION_ARGS)
      83             : {
      84          68 :     IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
      85             : 
      86          68 :     amroutine->amstrategies = 0;
      87          68 :     amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
      88          68 :     amroutine->amcanorder = false;
      89          68 :     amroutine->amcanorderbyop = false;
      90          68 :     amroutine->amcanbackward = false;
      91          68 :     amroutine->amcanunique = false;
      92          68 :     amroutine->amcanmulticol = true;
      93          68 :     amroutine->amoptionalkey = true;
      94          68 :     amroutine->amsearcharray = false;   /* see the TODO in the file header */
      95          68 :     amroutine->amsearchnulls = true;
      96          68 :     amroutine->amstorage = true;
      97          68 :     amroutine->amclusterable = false;
      98          68 :     amroutine->ampredlocks = false;
      99          68 :     amroutine->amcanparallel = false;
     100          68 :     amroutine->amkeytype = InvalidOid;
     101             : 
     102          68 :     amroutine->ambuild = brinbuild;
     103          68 :     amroutine->ambuildempty = brinbuildempty;
     104          68 :     amroutine->aminsert = brininsert;
     105          68 :     amroutine->ambulkdelete = brinbulkdelete;
     106          68 :     amroutine->amvacuumcleanup = brinvacuumcleanup;
     107          68 :     amroutine->amcanreturn = NULL;
     108          68 :     amroutine->amcostestimate = brincostestimate;
     109          68 :     amroutine->amoptions = brinoptions;
     110          68 :     amroutine->amproperty = NULL;
     111          68 :     amroutine->amvalidate = brinvalidate;
     112          68 :     amroutine->ambeginscan = brinbeginscan;
     113          68 :     amroutine->amrescan = brinrescan;
     114          68 :     amroutine->amgettuple = NULL;   /* scans go through amgetbitmap only */
     115          68 :     amroutine->amgetbitmap = bringetbitmap;
     116          68 :     amroutine->amendscan = brinendscan;
     117          68 :     amroutine->ammarkpos = NULL;
     118          68 :     amroutine->amrestrpos = NULL;
     119          68 :     amroutine->amestimateparallelscan = NULL;   /* no parallel-scan callbacks; amcanparallel is false */
     120          68 :     amroutine->aminitparallelscan = NULL;
     121          68 :     amroutine->amparallelrescan = NULL;
     122             : 
     123          68 :     PG_RETURN_POINTER(amroutine);
     124             : }
     125             : 
     126             : /*
     127             :  * brininsert -- a tuple in the heap is being inserted.
     128             :  *
     129             :  * To keep a BRIN index up to date, fetch the summary tuple for the page range
     130             :  * containing heaptid and pass each new value to the column's ADDVALUE support
     131             :  * procedure.  If any of them reports a change, replace the summary tuple using
     132             :  * brin_doupdate(), retrying from the top if that fails.  If the range is not
     133             :  * summarized (the revmap returns NULL for it), there's nothing to do.
     134             :  *
     135             :  * If autosummarization is enabled, check if we need to request summarization
     136             :  * of the previous page range.  heapRel and checkUnique are unused; returns false.
     137             :  */
     138             : bool
     139         264 : brininsert(Relation idxRel, Datum *values, bool *nulls,
     140             :            ItemPointer heaptid, Relation heapRel,
     141             :            IndexUniqueCheck checkUnique,
     142             :            IndexInfo *indexInfo)
     143             : {
     144             :     BlockNumber pagesPerRange;
     145             :     BlockNumber origHeapBlk;
     146             :     BlockNumber heapBlk;
     147         264 :     BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache; /* NULL until built below */
     148             :     BrinRevmap *revmap;
     149         264 :     Buffer      buf = InvalidBuffer;
     150         264 :     MemoryContext tupcxt = NULL;
     151         264 :     MemoryContext oldcxt = CurrentMemoryContext;
     152         264 :     bool        autosummarize = BrinGetAutoSummarize(idxRel);
     153             : 
     154         264 :     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
     155             : 
     156             :     /*
     157             :      * origHeapBlk is the block number where the insertion occurred.  heapBlk
     158             :      * is the first block in the corresponding page range.
     159             :      */
     160         264 :     origHeapBlk = ItemPointerGetBlockNumber(heaptid);
     161         264 :     heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
     162             : 
     163             :     for (;;)
     164             :     {
     165         264 :         bool        need_insert = false;
     166             :         OffsetNumber off;
     167             :         BrinTuple  *brtup;
     168             :         BrinMemTuple *dtup;
     169             :         int         keyno;
     170             : 
     171         264 :         CHECK_FOR_INTERRUPTS();
     172             : 
     173             :         /*
     174             :          * If auto-summarization is enabled and we just inserted the first
     175             :          * tuple into the first block of a new non-first page range, request a
     176             :          * summarization run of the previous range (if it has no summary yet).
     177             :          */
     178         264 :         if (autosummarize &&
     179           0 :             heapBlk > 0 &&
     180           0 :             heapBlk == origHeapBlk &&
     181           0 :             ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
     182             :         {
     183           0 :             BlockNumber lastPageRange = heapBlk - 1;    /* last heap block of the previous range */
     184             :             BrinTuple  *lastPageTuple;
     185             : 
     186           0 :             lastPageTuple =
     187             :                 brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
     188             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     189           0 :             if (!lastPageTuple)
     190           0 :                 AutoVacuumRequestWork(AVW_BRINSummarizeRange,
     191             :                                       RelationGetRelid(idxRel),
     192             :                                       lastPageRange);
     193             :             else
     194           0 :                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);    /* already summarized; just drop the lock */
     195             :         }
     196             : 
     197         264 :         brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
     198             :                                          NULL, BUFFER_LOCK_SHARE, NULL);
     199             : 
     200             :         /* if range is unsummarized, there's nothing to do */
     201         264 :         if (!brtup)
     202         270 :             break;
     203             : 
     204             :         /* First time through in this statement?  Build and cache the descriptor. */
     205         258 :         if (bdesc == NULL)
     206             :         {
     207          50 :             MemoryContextSwitchTo(indexInfo->ii_Context);
     208          50 :             bdesc = brin_build_desc(idxRel);
     209          50 :             indexInfo->ii_AmCache = (void *) bdesc;
     210          50 :             MemoryContextSwitchTo(oldcxt);
     211             :         }
     212             :         /* First time through in this brininsert call?  Set up a scratch context. */
     213         258 :         if (tupcxt == NULL)
     214             :         {
     215         258 :             tupcxt = AllocSetContextCreate(CurrentMemoryContext,
     216             :                                            "brininsert cxt",
     217             :                                            ALLOCSET_DEFAULT_SIZES);
     218         258 :             MemoryContextSwitchTo(tupcxt);
     219             :         }
     220             : 
     221         258 :         dtup = brin_deform_tuple(bdesc, brtup, NULL);
     222             : 
     223             :         /*
     224             :          * Compare the key values of the new tuple to the stored index values;
     225             :          * our deformed tuple will get updated if the new tuple doesn't fit
     226             :          * the original range (note this means we can't break out of the loop
     227             :          * early). Make a note of whether this happens, so that we know to
     228             :          * insert the modified tuple later.
     229             :          */
     230        6606 :         for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
     231             :         {
     232             :             Datum       result;
     233             :             BrinValues *bval;
     234             :             FmgrInfo   *addValue;
     235             : 
     236        6348 :             bval = &dtup->bt_columns[keyno];
     237        6348 :             addValue = index_getprocinfo(idxRel, keyno + 1,
     238             :                                          BRIN_PROCNUM_ADDVALUE);
     239       19044 :             result = FunctionCall4Coll(addValue,
     240        6348 :                                        idxRel->rd_indcollation[keyno],
     241             :                                        PointerGetDatum(bdesc),
     242             :                                        PointerGetDatum(bval),
     243        6348 :                                        values[keyno],
     244        6348 :                                        nulls[keyno]);
     245             :             /* if that returned true, we need to insert the updated tuple */
     246        6348 :             need_insert |= DatumGetBool(result);
     247             :         }
     248             : 
     249         258 :         if (!need_insert)
     250             :         {
     251             :             /*
     252             :              * The tuple is consistent with the new values, so there's nothing
     253             :              * to do.
     254             :              */
     255          47 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     256             :         }
     257             :         else
     258             :         {
     259         211 :             Page        page = BufferGetPage(buf);
     260         211 :             ItemId      lp = PageGetItemId(page, off);
     261             :             Size        origsz;
     262             :             BrinTuple  *origtup;
     263             :             Size        newsz;
     264             :             BrinTuple  *newtup;
     265             :             bool        samepage;
     266             : 
     267             :             /*
     268             :              * Make a copy of the old tuple, so that we can compare it after
     269             :              * re-acquiring the lock.
     270             :              */
     271         211 :             origsz = ItemIdGetLength(lp);
     272         211 :             origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
     273             : 
     274             :             /*
     275             :              * Before releasing the lock, check if we can attempt a same-page
     276             :              * update.  Another process could insert a tuple concurrently in
     277             :              * the same page though, so downstream we must be prepared to cope
     278             :              * if this turns out to not be possible after all.
     279             :              */
     280         211 :             newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
     281         211 :             samepage = brin_can_do_samepage_update(buf, origsz, newsz);
     282         211 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     283             : 
     284             :             /*
     285             :              * Try to update the tuple.  If this doesn't work for whatever
     286             :              * reason, we need to restart from the top; the revmap might be
     287             :              * pointing at a different tuple for this block now, so we need to
     288             :              * recompute to ensure both our new heap tuple and the other
     289             :              * inserter's are covered by the combined tuple.  It might be that
     290             :              * we don't need to update at all.
     291             :              */
     292         211 :             if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
     293             :                                buf, off, origtup, origsz, newtup, newsz,
     294             :                                samepage))
     295             :             {
     296             :                 /* no luck; discard this attempt's allocations and start over */
     297           0 :                 MemoryContextResetAndDeleteChildren(tupcxt);
     298           0 :                 continue;
     299             :             }
     300             :         }
     301             : 
     302             :         /* success! */
     303         258 :         break;
     304           0 :     }
     305             : 
     306         264 :     brinRevmapTerminate(revmap);
     307         264 :     if (BufferIsValid(buf))
     308         258 :         ReleaseBuffer(buf);
     309         264 :     MemoryContextSwitchTo(oldcxt);  /* leave tupcxt (if we entered it) before deleting it */
     310         264 :     if (tupcxt != NULL)
     311         258 :         MemoryContextDelete(tupcxt);
     312             : 
     313         264 :     return false;
     314             : }
     315             : 
     316             : /*
     317             :  * Initialize state for a BRIN index scan.
     318             :  *
     319             :  * We read the metapage here to determine the pages-per-range number that this
     320             :  * index was built with.  Note that since this cannot be changed while we're
     321             :  * holding lock on index, it's not necessary to recompute it during brinrescan.
     322             :  */
     323             : IndexScanDesc
     324         248 : brinbeginscan(Relation r, int nkeys, int norderbys)
     325             : {
     326             :     IndexScanDesc scan;
     327             :     BrinOpaque *opaque;
     328             : 
     329         248 :     scan = RelationGetIndexScan(r, nkeys, norderbys);
     330             : 
     331         248 :     opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
     332         248 :     opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
     333             :                                                scan->xs_snapshot);
     334         248 :     opaque->bo_bdesc = brin_build_desc(r);
     335         248 :     scan->opaque = opaque;
     336             : 
     337         248 :     return scan;
     338             : }
     339             : 
     340             : /*
     341             :  * Execute the index scan.
     342             :  *
     343             :  * This works by reading index TIDs from the revmap, and obtaining the index
     344             :  * tuples pointed to by them; the summary values in the index tuples are
     345             :  * compared to the scan keys.  We return into the TID bitmap all the pages in
     346             :  * ranges corresponding to index tuples that match the scan keys.
     347             :  *
     348             :  * If a TID from the revmap is read as InvalidTID, we know that range is
     349             :  * unsummarized.  Pages in those ranges need to be returned regardless of scan
     350             :  * keys.
     351             :  */
     352             : int64
     353         248 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
     354             : {
     355         248 :     Relation    idxRel = scan->indexRelation;
     356         248 :     Buffer      buf = InvalidBuffer;
     357             :     BrinDesc   *bdesc;
     358             :     Oid         heapOid;
     359             :     Relation    heapRel;
     360             :     BrinOpaque *opaque;
     361             :     BlockNumber nblocks;
     362             :     BlockNumber heapBlk;
     363         248 :     int         totalpages = 0;
     364             :     FmgrInfo   *consistentFn;
     365             :     MemoryContext oldcxt;
     366             :     MemoryContext perRangeCxt;
     367             :     BrinMemTuple *dtup;
     368         248 :     BrinTuple  *btup = NULL;
     369         248 :     Size        btupsz = 0;
     370             : 
     371         248 :     opaque = (BrinOpaque *) scan->opaque;
     372         248 :     bdesc = opaque->bo_bdesc;
     373         248 :     pgstat_count_index_scan(idxRel);
     374             : 
     375             :     /*
     376             :      * We need to know the size of the table so that we know how long to
     377             :      * iterate on the revmap.
     378             :      */
     379         248 :     heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
     380         248 :     heapRel = heap_open(heapOid, AccessShareLock);
     381         248 :     nblocks = RelationGetNumberOfBlocks(heapRel);
     382         248 :     heap_close(heapRel, AccessShareLock);
     383             : 
     384             :     /*
     385             :      * Make room for the consistent support procedures of indexed columns.  We
     386             :      * don't look them up here; we do that lazily the first time we see a scan
     387             :      * key reference each of them.  We rely on zeroing fn_oid to InvalidOid.
     388             :      */
     389         248 :     consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
     390             : 
     391             :     /* allocate an initial in-memory tuple, out of the per-range memcxt */
     392         248 :     dtup = brin_new_memtuple(bdesc);
     393             : 
     394             :     /*
     395             :      * Setup and use a per-range memory context, which is reset every time we
     396             :      * loop below.  This avoids having to free the tuples within the loop.
     397             :      */
     398         248 :     perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
     399             :                                         "bringetbitmap cxt",
     400             :                                         ALLOCSET_DEFAULT_SIZES);
     401         248 :     oldcxt = MemoryContextSwitchTo(perRangeCxt);
     402             : 
     403             :     /*
     404             :      * Now scan the revmap.  We start by querying for heap page 0,
     405             :      * incrementing by the number of pages per range; this gives us a full
     406             :      * view of the table.
     407             :      */
     408       25048 :     for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
     409             :     {
     410             :         bool        addrange;
     411       24800 :         bool        gottuple = false;
     412             :         BrinTuple  *tup;
     413             :         OffsetNumber off;
     414             :         Size        size;
     415             : 
     416       24800 :         CHECK_FOR_INTERRUPTS();
     417             : 
     418       24800 :         MemoryContextResetAndDeleteChildren(perRangeCxt);
     419             : 
     420       24800 :         tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
     421             :                                        &off, &size, BUFFER_LOCK_SHARE,
     422             :                                        scan->xs_snapshot);
     423       24800 :         if (tup)
     424             :         {
     425       24800 :             gottuple = true;
     426       24800 :             btup = brin_copy_tuple(tup, size, btup, &btupsz);
     427       24800 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     428             :         }
     429             : 
     430             :         /*
     431             :          * For page ranges with no indexed tuple, we must return the whole
     432             :          * range; otherwise, compare it to the scan keys.
     433             :          */
     434       24800 :         if (!gottuple)
     435             :         {
     436           0 :             addrange = true;
     437             :         }
     438             :         else
     439             :         {
     440       24800 :             dtup = brin_deform_tuple(bdesc, btup, dtup);
     441       24800 :             if (dtup->bt_placeholder)
     442             :             {
     443             :                 /*
     444             :                  * Placeholder tuples are always returned, regardless of the
     445             :                  * values stored in them.
     446             :                  */
     447           0 :                 addrange = true;
     448             :             }
     449             :             else
     450             :             {
     451             :                 int         keyno;
     452             : 
     453             :                 /*
     454             :                  * Compare scan keys with summary values stored for the range.
     455             :                  * If scan keys are matched, the page range must be added to
     456             :                  * the bitmap.  We initially assume the range needs to be
     457             :                  * added; in particular this serves the case where there are
     458             :                  * no keys.
     459             :                  */
     460       24800 :                 addrange = true;
     461       43249 :                 for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
     462             :                 {
     463       24800 :                     ScanKey     key = &scan->keyData[keyno];
     464       24800 :                     AttrNumber  keyattno = key->sk_attno;
     465       24800 :                     BrinValues *bval = &dtup->bt_columns[keyattno - 1];
     466             :                     Datum       add;
     467             : 
     468             :                     /*
     469             :                      * The collation of the scan key must match the collation
     470             :                      * used in the index column (but only if the search is not
     471             :                      * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
     472             :                      * this index ...
     473             :                      */
     474       24800 :                     Assert((key->sk_flags & SK_ISNULL) ||
     475             :                            (key->sk_collation ==
     476             :                             TupleDescAttr(bdesc->bd_tupdesc,
     477             :                                           keyattno - 1)->attcollation));
     478             : 
     479             :                     /* First time this column? look up consistent function */
     480       24800 :                     if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
     481             :                     {
     482             :                         FmgrInfo   *tmp;
     483             : 
     484         248 :                         tmp = index_getprocinfo(idxRel, keyattno,
     485             :                                                 BRIN_PROCNUM_CONSISTENT);
     486         248 :                         fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
     487             :                                        CurrentMemoryContext);
     488             :                     }
     489             : 
     490             :                     /*
     491             :                      * Check whether the scan key is consistent with the page
     492             :                      * range values; if so, have the pages in the range added
     493             :                      * to the output bitmap.
     494             :                      *
     495             :                      * When there are multiple scan keys, failure to meet the
     496             :                      * criteria for a single one of them is enough to discard
     497             :                      * the range as a whole, so break out of the loop as soon
     498             :                      * as a false return value is obtained.
     499             :                      */
     500       24800 :                     add = FunctionCall3Coll(&consistentFn[keyattno - 1],
     501             :                                             key->sk_collation,
     502             :                                             PointerGetDatum(bdesc),
     503             :                                             PointerGetDatum(bval),
     504             :                                             PointerGetDatum(key));
     505       24800 :                     addrange = DatumGetBool(add);
     506       24800 :                     if (!addrange)
     507        6351 :                         break;
     508             :                 }
     509             :             }
     510             :         }
     511             : 
     512             :         /* add the pages in the range to the output bitmap, if needed */
     513       24800 :         if (addrange)
     514             :         {
     515             :             BlockNumber pageno;
     516             : 
     517       55347 :             for (pageno = heapBlk;
     518       36898 :                  pageno <= heapBlk + opaque->bo_pagesPerRange - 1;
     519       18449 :                  pageno++)
     520             :             {
     521       18449 :                 MemoryContextSwitchTo(oldcxt);
     522       18449 :                 tbm_add_page(tbm, pageno);
     523       18449 :                 totalpages++;
     524       18449 :                 MemoryContextSwitchTo(perRangeCxt);
     525             :             }
     526             :         }
     527             :     }
     528             : 
     529         248 :     MemoryContextSwitchTo(oldcxt);
     530         248 :     MemoryContextDelete(perRangeCxt);
     531             : 
     532         248 :     if (buf != InvalidBuffer)
     533         248 :         ReleaseBuffer(buf);
     534             : 
     535             :     /*
     536             :      * XXX We have an approximation of the number of *pages* that our scan
     537             :      * returns, but we don't have a precise idea of the number of heap tuples
     538             :      * involved.
     539             :      */
     540         248 :     return totalpages * 10;
     541             : }
     542             : 
     543             : /*
     544             :  * Re-initialize state for a BRIN index scan: copy the new keys into scan->keyData
     545             :  */
     546             : void
     547         248 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
     548             :            ScanKey orderbys, int norderbys)
     549             : {
     550             :     /*
     551             :      * Other index AMs preprocess the scan keys at this point, or sometime
     552             :      * early during the scan; this lets them optimize by removing redundant
     553             :      * keys, or doing early returns when they are impossible to satisfy; see
     554             :      * _bt_preprocess_keys for an example.  Something like that could be added
     555             :      * here someday, too.  For now we just copy the keys verbatim.
     556             :      */
     557             : 
     558         248 :     if (scankey && scan->numberOfKeys > 0)
     559         248 :         memmove(scan->keyData, scankey,
     560         248 :                 scan->numberOfKeys * sizeof(ScanKeyData));  /* count comes from scan, not nscankeys */
     561         248 : }
     562             : 
     563             : /*
     564             :  * Close down a BRIN index scan
                     :  *
                     :  * Releases what hangs off scan->opaque (a BrinOpaque): the revmap access
                     :  * object is terminated, the BrinDesc is freed with brin_free_desc, and
                     :  * finally the BrinOpaque struct itself is pfree'd.  scan->opaque is not
                     :  * reset here, so it must not be dereferenced afterwards.
     565             :  */
     566             : void
     567         248 : brinendscan(IndexScanDesc scan)
     568             : {
     569         248 :     BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
     570             : 
     571         248 :     brinRevmapTerminate(opaque->bo_rmAccess);
     572         248 :     brin_free_desc(opaque->bo_bdesc);
     573         248 :     pfree(opaque);
     574         248 : }
     575             : 
     576             : /*
     577             :  * Per-heap-tuple callback for IndexBuildHeapScan.
     578             :  *
     579             :  * Note we don't worry about the page range at the end of the table here; it is
     580             :  * present in the build state struct after we're called the last time, but not
     581             :  * inserted into the index.  Caller must ensure to do so, if appropriate.
                     :  *
                     :  * Arguments as used by this function:
                     :  *  index        - index relation; used to look up each column's
                     :  *                 BRIN_PROCNUM_ADDVALUE support procedure
                     :  *  htup         - heap tuple; only its t_self block number is examined
                     :  *  values/isnull - per-index-column datums and null flags for the tuple
                     :  *  tupleIsAlive - not referenced here
                     :  *  brstate      - the BrinBuildState, passed as an opaque pointer
     582             :  */
     583             : static void
     584       20137 : brinbuildCallback(Relation index,
     585             :                   HeapTuple htup,
     586             :                   Datum *values,
     587             :                   bool *isnull,
     588             :                   bool tupleIsAlive,
     589             :                   void *brstate)
     590             : {
     591       20137 :     BrinBuildState *state = (BrinBuildState *) brstate;
     592             :     BlockNumber thisblock;
     593             :     int         i;
     594             : 
     595       20137 :     thisblock = ItemPointerGetBlockNumber(&htup->t_self);
     596             : 
     597             :     /*
     598             :      * If we're in a block that belongs to a future range, summarize what
     599             :      * we've got and start afresh.  Note the scan might have skipped many
     600             :      * pages, if they were devoid of live tuples; make sure to insert index
     601             :      * tuples for those too.
                     :      *
                     :      * The loop advances one range at a time, so each skipped range gets
                     :      * its own (freshly re-initialized) tuple inserted.  Note that the
                     :      * debug message below prints bs_currRangeStart + bs_pagesPerRange as
                     :      * the upper bound, which is one block past the last block of the
                     :      * completed range as tested by the loop condition.
     602             :      */
     603       40417 :     while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
     604             :     {
     605             : 
     606             :         BRIN_elog((DEBUG2,
     607             :                    "brinbuildCallback: completed a range: %u--%u",
     608             :                    state->bs_currRangeStart,
     609             :                    state->bs_currRangeStart + state->bs_pagesPerRange));
     610             : 
     611             :         /* create the index tuple and insert it */
     612         143 :         form_and_insert_tuple(state);
     613             : 
     614             :         /* set state to correspond to the next range */
     615         143 :         state->bs_currRangeStart += state->bs_pagesPerRange;
     616             : 
     617             :         /* re-initialize state for it */
     618         143 :         brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
     619             :     }
     620             : 
     621             :     /* Accumulate the current tuple into the running state */
     622       44218 :     for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
     623             :     {
     624             :         FmgrInfo   *addValue;
     625             :         BrinValues *col;
     626       24081 :         Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
     627             : 
     628       24081 :         col = &state->bs_dtuple->bt_columns[i];
     629       24081 :         addValue = index_getprocinfo(index, i + 1,
     630             :                                      BRIN_PROCNUM_ADDVALUE);
     631             : 
     632             :         /*
     633             :          * Update dtuple state, if and as necessary.
                     :          *
                     :          * The support procedure is called with the column's collation and
                     :          * receives the BrinDesc, the column's BrinValues, the new value
                     :          * and its null flag.  Its return value is ignored here.
     634             :          */
     635       72243 :         FunctionCall4Coll(addValue,
     636             :                           attr->attcollation,
     637       24081 :                           PointerGetDatum(state->bs_bdesc),
     638             :                           PointerGetDatum(col),
     639       48162 :                           values[i], isnull[i]);
     640             :     }
     641       20137 : }
     642             : 
     643             : /*
     644             :  * brinbuild() -- build a new BRIN index.
                     :  *
                     :  * Steps, in order:
                     :  *  1. Raise an error if the index relation already has any blocks.
                     :  *  2. Allocate and initialize the metapage (asserted to be block
                     :  *     BRIN_METAPAGE_BLKNO), WAL-logging it when the index needs WAL.
                     :  *  3. Set up the revmap accessor and a BrinBuildState.
                     :  *  4. Scan the heap with brinbuildCallback accumulating summary values,
                     :  *     then insert the tuple for the last, still-pending page range.
                     :  *  5. Release the revmap and the build state.
                     :  *
                     :  * Returns a palloc'd IndexBuildResult in which heap_tuples is the value
                     :  * returned by IndexBuildHeapScan and index_tuples is the build state's
                     :  * bs_numtuples counter.
     645             :  */
     646             : IndexBuildResult *
     647           4 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
     648             : {
     649             :     IndexBuildResult *result;
     650             :     double      reltuples;
     651             :     double      idxtuples;
     652             :     BrinRevmap *revmap;
     653             :     BrinBuildState *state;
     654             :     Buffer      meta;
     655             :     BlockNumber pagesPerRange;
     656             : 
     657             :     /*
     658             :      * We expect to be called exactly once for any index relation.
     659             :      */
     660           4 :     if (RelationGetNumberOfBlocks(index) != 0)
     661           0 :         elog(ERROR, "index \"%s\" already contains data",
     662             :              RelationGetRelationName(index));
     663             : 
     664             :     /*
     665             :      * Critical section not required, because on error the creation of the
     666             :      * whole relation will be rolled back.
     667             :      */
     668             : 
     669           4 :     meta = ReadBuffer(index, P_NEW);
     670           4 :     Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
     671           4 :     LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
     672             : 
     673           4 :     brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
     674             :                        BRIN_CURRENT_VERSION);
     675           4 :     MarkBufferDirty(meta);
     676             : 
     677           4 :     if (RelationNeedsWAL(index))
     678             :     {
     679             :         xl_brin_createidx xlrec;
     680             :         XLogRecPtr  recptr;
     681             :         Page        page;
     682             : 
     683           4 :         xlrec.version = BRIN_CURRENT_VERSION;
     684           4 :         xlrec.pagesPerRange = BrinGetPagesPerRange(index);
     685             : 
     686           4 :         XLogBeginInsert();
     687           4 :         XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
     688           4 :         XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT);
     689             : 
     690           4 :         recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
     691             : 
     692           4 :         page = BufferGetPage(meta);
     693           4 :         PageSetLSN(page, recptr);
     694             :     }
     695             : 
     696           4 :     UnlockReleaseBuffer(meta);
     697             : 
     698             :     /*
     699             :      * Initialize our state, including the deformed tuple state.
     700             :      */
     701           4 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
     702           4 :     state = initialize_brin_buildstate(index, revmap, pagesPerRange);
     703             : 
     704             :     /*
     705             :      * Now scan the relation.  No syncscan allowed here because we want the
     706             :      * heap blocks in physical order.
     707             :      */
     708           4 :     reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
     709             :                                    brinbuildCallback, (void *) state);
     710             : 
     711             :     /*
                     :      * process the final batch: brinbuildCallback only inserts a range's
                     :      * tuple once it sees a heap block beyond that range, so the last
                     :      * range is still pending in 'state' when the scan ends.
                     :      */
     712           4 :     form_and_insert_tuple(state);
     713             : 
     714             :     /*
                     :      * release resources.  The tuple count is copied out first because
                     :      * terminate_brin_buildstate frees 'state'.
                     :      */
     715           4 :     idxtuples = state->bs_numtuples;
     716           4 :     brinRevmapTerminate(state->bs_rmAccess);
     717           4 :     terminate_brin_buildstate(state);
     718             : 
     719             :     /*
     720             :      * Return statistics
     721             :      */
     722           4 :     result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
     723             : 
     724           4 :     result->heap_tuples = reltuples;
     725           4 :     result->index_tuples = idxtuples;
     726             : 
     727           4 :     return result;
     728             : }
     729             : /*
                     :  * brinbuildempty() -- write an empty BRIN index into the init fork.
                     :  *
                     :  * Allocates a new page in the index's INIT_FORKNUM fork, initializes it
                     :  * as a BRIN metapage (using the index's pages-per-range setting and
                     :  * BRIN_CURRENT_VERSION), marks the buffer dirty and WAL-logs it with
                     :  * log_newpage_buffer.  The page modification and its WAL record are
                     :  * wrapped in a critical section.
                     :  */
     730             : void
     731           0 : brinbuildempty(Relation index)
     732             : {
     733             :     Buffer      metabuf;
     734             : 
     735             :     /* An empty BRIN index has a metapage only. */
     736           0 :     metabuf =
     737             :         ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
     738           0 :     LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
     739             : 
     740             :     /* Initialize and xlog metabuffer. */
     741           0 :     START_CRIT_SECTION();
     742           0 :     brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
     743             :                        BRIN_CURRENT_VERSION);
     744           0 :     MarkBufferDirty(metabuf);
     745           0 :     log_newpage_buffer(metabuf, false);
     746           0 :     END_CRIT_SECTION();
     747             : 
     748           0 :     UnlockReleaseBuffer(metabuf);
     749           0 : }
     750             : 
     751             : /*
     752             :  * brinbulkdelete
     753             :  *      Since there are no per-heap-tuple index tuples in BRIN indexes,
     754             :  *      there's not a lot we can do here.
     755             :  *
     756             :  * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
     757             :  * tuple is deleted), meaning the need to re-run summarization on the affected
     758             :  * range.  Would need to add an extra flag in brintuples for that.
                     :  *
                     :  * As written, the function never touches the index: 'info', 'callback'
                     :  * and 'callback_state' are not referenced.  It returns 'stats' unchanged,
                     :  * or a newly palloc0'd (all-zero) IndexBulkDeleteResult if 'stats' is
                     :  * NULL.
     759             :  */
     760             : IndexBulkDeleteResult *
     761           0 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     762             :                IndexBulkDeleteCallback callback, void *callback_state)
     763             : {
     764             :     /* allocate stats if first time through, else re-use existing struct */
     765           0 :     if (stats == NULL)
     766           0 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     767             : 
     768           0 :     return stats;
     769             : }
     770             : 
     771             : /*
     772             :  * This routine is in charge of "vacuuming" a BRIN index: we just summarize
     773             :  * ranges that are currently unsummarized.
                     :  *
                     :  * When info->analyze_only is set, 'stats' is returned exactly as passed
                     :  * in, which may be NULL.  Otherwise 'stats' is allocated (zeroed) if
                     :  * needed, num_pages is set to the index's current block count, the
                     :  * parent heap is opened with AccessShareLock for the duration of the
                     :  * call, and brin_vacuum_scan followed by brinsummarize (for all block
                     :  * ranges) are run on the index.
                     :  *
                     :  * NOTE(review): the same field, stats->num_index_tuples, is passed for
                     :  * both output arguments of brinsummarize.  Presumably this is intended
                     :  * so that both counts end up accumulated into one total -- confirm
                     :  * against brinsummarize.
     774             :  */
     775             : IndexBulkDeleteResult *
     776           3 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     777             : {
     778             :     Relation    heapRel;
     779             : 
     780             :     /* No-op in ANALYZE ONLY mode */
     781           3 :     if (info->analyze_only)
     782           0 :         return stats;
     783             : 
     784           3 :     if (!stats)
     785           3 :         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
     786           3 :     stats->num_pages = RelationGetNumberOfBlocks(info->index);
     787             :     /* rest of stats is initialized by zeroing */
     788             : 
     789           3 :     heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
     790             :                         AccessShareLock);
     791             : 
     792           3 :     brin_vacuum_scan(info->index, info->strategy);
     793             : 
     794           3 :     brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES,
     795             :                   &stats->num_index_tuples, &stats->num_index_tuples);
     796             : 
     797           3 :     heap_close(heapRel, AccessShareLock);
     798             : 
     799           3 :     return stats;
     800             : }
     801             : 
     802             : /*
     803             :  * reloptions processor for BRIN indexes
                     :  *
                     :  * Parses 'reloptions' against the RELOPT_KIND_BRIN option set.  Two
                     :  * options are mapped into the BrinOptions struct: "pages_per_range"
                     :  * (integer) and "autosummarize" (boolean).
                     :  *
                     :  * Returns NULL when parseRelOptions reports zero options; otherwise
                     :  * returns a BrinOptions struct obtained from allocateReloptStruct and
                     :  * filled in by fillRelOptions, cast to bytea *.  The intermediate
                     :  * parsed-options array is freed before returning.  'validate' is passed
                     :  * through unchanged to the reloptions helpers.
     804             :  */
     805             : bytea *
     806          17 : brinoptions(Datum reloptions, bool validate)
     807             : {
     808             :     relopt_value *options;
     809             :     BrinOptions *rdopts;
     810             :     int         numoptions;
     811             :     static const relopt_parse_elt tab[] = {
     812             :         {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
     813             :         {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
     814             :     };
     815             : 
     816          17 :     options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
     817             :                               &numoptions);
     818             : 
     819             :     /* if none set, we're done */
     820          17 :     if (numoptions == 0)
     821           0 :         return NULL;
     822             : 
     823          17 :     rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
     824             : 
     825          17 :     fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
     826             :                    validate, tab, lengthof(tab));
     827             : 
     828          17 :     pfree(options);
     829             : 
     830          17 :     return (bytea *) rdopts;
     831             : }
     832             : 
     833             : /*
     834             :  * SQL-callable function to scan through an index and summarize all ranges
     835             :  * that are not currently summarized.
                     :  *
                     :  * This is a thin wrapper: it forwards its single argument to
                     :  * brin_summarize_range, supplying BRIN_ALL_BLOCKRANGES as the block
                     :  * number, and returns that function's result unchanged.
     836             :  */
     837             : Datum
     838           3 : brin_summarize_new_values(PG_FUNCTION_ARGS)
     839             : {
     840           3 :     Datum       relation = PG_GETARG_DATUM(0);
     841             : 
     842           3 :     return DirectFunctionCall2(brin_summarize_range,
     843             :                                relation,
     844             :                                Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
     845             : }
     846             : 
     847             : /*
     848             :  * SQL-callable function to summarize the indicated page range, if not already
     849             :  * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
     850             :  * unsummarized ranges are summarized.
                     :  *
                     :  * Arguments: (0) OID of the index, (1) heap block number as int64.
                     :  *
                     :  * Errors are raised when the block number is negative or greater than
                     :  * BRIN_ALL_BLOCKRANGES, when the relation is not a BRIN index, when the
                     :  * current user does not own the index, or when the parent table could
                     :  * not be identified (or changed between the unlocked lookup and the
                     :  * recheck).
                     :  *
                     :  * Both the heap and the index are opened with ShareUpdateExclusiveLock,
                     :  * heap first, and closed with the same lock level before returning.
                     :  * The result is the count reported by brinsummarize, truncated to int32.
     851             :  */
     852             : Datum
     853           9 : brin_summarize_range(PG_FUNCTION_ARGS)
     854             : {
     855           9 :     Oid         indexoid = PG_GETARG_OID(0);
     856           9 :     int64       heapBlk64 = PG_GETARG_INT64(1);
     857             :     BlockNumber heapBlk;
     858             :     Oid         heapoid;
     859             :     Relation    indexRel;
     860             :     Relation    heapRel;
     861           9 :     double      numSummarized = 0;
     862             : 
     863           9 :     if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
     864             :     {
     865           2 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
     866             : 
     867           2 :         ereport(ERROR,
     868             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     869             :                  errmsg("block number out of range: %s", blk)));
     870             :     }
     871           7 :     heapBlk = (BlockNumber) heapBlk64;
     872             : 
     873             :     /*
     874             :      * We must lock table before index to avoid deadlocks.  However, if the
     875             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
     876             :      * Rather than emitting a not-very-helpful error message, postpone
     877             :      * complaining, expecting that the is-it-an-index test below will fail.
     878             :      */
     879           7 :     heapoid = IndexGetRelation(indexoid, true);
     880           7 :     if (OidIsValid(heapoid))
     881           6 :         heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
     882             :     else
     883           1 :         heapRel = NULL;
     884             : 
     885           7 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
     886             : 
     887             :     /* Must be a BRIN index */
     888          12 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     889           6 :         indexRel->rd_rel->relam != BRIN_AM_OID)
     890           1 :         ereport(ERROR,
     891             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     892             :                  errmsg("\"%s\" is not a BRIN index",
     893             :                         RelationGetRelationName(indexRel))));
     894             : 
     895             :     /* User must own the index (comparable to privileges needed for VACUUM) */
     896           5 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
     897           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
     898           0 :                        RelationGetRelationName(indexRel));
     899             : 
     900             :     /*
     901             :      * Since we did the IndexGetRelation call above without any lock, it's
     902             :      * barely possible that a race against an index drop/recreation could have
     903             :      * netted us the wrong table.  Recheck.
     904             :      */
     905           5 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
     906           0 :         ereport(ERROR,
     907             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
     908             :                  errmsg("could not open parent table of index %s",
     909             :                         RelationGetRelationName(indexRel))));
     910             : 
     911             :     /* OK, do it */
     912           5 :     brinsummarize(indexRel, heapRel, heapBlk, &numSummarized, NULL);
     913             : 
     914           5 :     relation_close(indexRel, ShareUpdateExclusiveLock);
     915           5 :     relation_close(heapRel, ShareUpdateExclusiveLock);
     916             : 
     917           5 :     PG_RETURN_INT32((int32) numSummarized);
     918             : }
     919             : 
     920             : /*
     921             :  * SQL-callable interface to mark a range as no longer summarized
                     :  *
                     :  * Arguments: (0) OID of the index, (1) heap block number as int64.
                     :  *
                     :  * Unlike brin_summarize_range, the block number is validated against
                     :  * MaxBlockNumber here; negative or larger values raise an error.  The
                     :  * remaining checks (is a BRIN index, caller owns it, parent table
                     :  * recheck) and the locking (ShareUpdateExclusiveLock on heap, then
                     :  * index) are the same as in that function.
                     :  *
                     :  * brinRevmapDesummarizeRange is called repeatedly until it reports that
                     :  * it is done.  Returns void.
     922             :  */
     923             : Datum
     924           5 : brin_desummarize_range(PG_FUNCTION_ARGS)
     925             : {
     926           5 :     Oid         indexoid = PG_GETARG_OID(0);
     927           5 :     int64       heapBlk64 = PG_GETARG_INT64(1);
     928             :     BlockNumber heapBlk;
     929             :     Oid         heapoid;
     930             :     Relation    heapRel;
     931             :     Relation    indexRel;
     932             :     bool        done;
     933             : 
     934           5 :     if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
     935             :     {
     936           1 :         char       *blk = psprintf(INT64_FORMAT, heapBlk64);
     937             : 
     938           1 :         ereport(ERROR,
     939             :                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     940             :                  errmsg("block number out of range: %s", blk)));
     941             :     }
     942           4 :     heapBlk = (BlockNumber) heapBlk64;
     943             : 
     944             :     /*
     945             :      * We must lock table before index to avoid deadlocks.  However, if the
     946             :      * passed indexoid isn't an index then IndexGetRelation() will fail.
     947             :      * Rather than emitting a not-very-helpful error message, postpone
     948             :      * complaining, expecting that the is-it-an-index test below will fail.
     949             :      */
     950           4 :     heapoid = IndexGetRelation(indexoid, true);
     951           4 :     if (OidIsValid(heapoid))
     952           4 :         heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
     953             :     else
     954           0 :         heapRel = NULL;
     955             : 
     956           4 :     indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
     957             : 
     958             :     /* Must be a BRIN index */
     959           8 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     960           4 :         indexRel->rd_rel->relam != BRIN_AM_OID)
     961           0 :         ereport(ERROR,
     962             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     963             :                  errmsg("\"%s\" is not a BRIN index",
     964             :                         RelationGetRelationName(indexRel))));
     965             : 
     966             :     /* User must own the index (comparable to privileges needed for VACUUM) */
     967           4 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
     968           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
     969           0 :                        RelationGetRelationName(indexRel));
     970             : 
     971             :     /*
     972             :      * Since we did the IndexGetRelation call above without any lock, it's
     973             :      * barely possible that a race against an index drop/recreation could have
     974             :      * netted us the wrong table.  Recheck.
     975             :      */
     976           4 :     if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
     977           0 :         ereport(ERROR,
     978             :                 (errcode(ERRCODE_UNDEFINED_TABLE),
     979             :                  errmsg("could not open parent table of index %s",
     980             :                         RelationGetRelationName(indexRel))));
     981             : 
     982             :     /*
                     :      * the revmap does the hard work; keep calling it for as long as it
                     :      * returns false
                     :      */
     983             :     do
     984             :     {
     985           4 :         done = brinRevmapDesummarizeRange(indexRel, heapBlk);
     986             :     }
     987           4 :     while (!done);
     988             : 
     989           4 :     relation_close(indexRel, ShareUpdateExclusiveLock);
     990           4 :     relation_close(heapRel, ShareUpdateExclusiveLock);
     991             : 
     992           4 :     PG_RETURN_VOID();
     993             : }
     994             : 
     995             : /*
     996             :  * Build a BrinDesc used to create or scan a BRIN index
                     :  *
                     :  * A dedicated memory context ("brin desc cxt") is created as a child of
                     :  * CurrentMemoryContext, and everything allocated here -- including the
                     :  * returned BrinDesc itself and whatever the opclass info procedures
                     :  * allocate while that context is current -- lives in it.  The caller's
                     :  * memory context is restored before returning.
                     :  *
                     :  * bd_tupdesc is the index relation's own tuple descriptor as returned
                     :  * by RelationGetDescr, not a copy.  bd_totalstored is the sum of
                     :  * oi_nstored over all indexed columns.
                     :  *
                     :  * The result is released with brin_free_desc.
     997             :  */
     998             : BrinDesc *
     999         304 : brin_build_desc(Relation rel)
    1000             : {
    1001             :     BrinOpcInfo **opcinfo;
    1002             :     BrinDesc   *bdesc;
    1003             :     TupleDesc   tupdesc;
    1004         304 :     int         totalstored = 0;
    1005             :     int         keyno;
    1006             :     long        totalsize;
    1007             :     MemoryContext cxt;
    1008             :     MemoryContext oldcxt;
    1009             : 
    1010         304 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1011             :                                 "brin desc cxt",
    1012             :                                 ALLOCSET_SMALL_SIZES);
    1013         304 :     oldcxt = MemoryContextSwitchTo(cxt);
    1014         304 :     tupdesc = RelationGetDescr(rel);
    1015             : 
    1016             :     /*
    1017             :      * Obtain BrinOpcInfo for each indexed column.  While at it, accumulate
    1018             :      * the number of columns stored, since the number is opclass-defined.
                     :      *
                     :      * The pointers are collected in a temporary array first because the
                     :      * result struct is only allocated after this loop.
    1019             :      */
    1020         304 :     opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
    1021        7916 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1022             :     {
    1023             :         FmgrInfo   *opcInfoFn;
    1024        7612 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
    1025             : 
    1026        7612 :         opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
    1027             : 
    1028       15224 :         opcinfo[keyno] = (BrinOpcInfo *)
    1029        7612 :             DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
    1030        7612 :         totalstored += opcinfo[keyno]->oi_nstored;
    1031             :     }
    1032             : 
    1033             :     /*
                     :      * Allocate our result struct and fill it in.  The struct is sized to
                     :      * hold one bd_info pointer per indexed column.
                     :      */
    1034         304 :     totalsize = offsetof(BrinDesc, bd_info) +
    1035         304 :         sizeof(BrinOpcInfo *) * tupdesc->natts;
    1036             : 
    1037         304 :     bdesc = palloc(totalsize);
    1038         304 :     bdesc->bd_context = cxt;
    1039         304 :     bdesc->bd_index = rel;
    1040         304 :     bdesc->bd_tupdesc = tupdesc;
    1041         304 :     bdesc->bd_disktdesc = NULL; /* generated lazily */
    1042         304 :     bdesc->bd_totalstored = totalstored;
    1043             : 
    1044        7916 :     for (keyno = 0; keyno < tupdesc->natts; keyno++)
    1045        7612 :         bdesc->bd_info[keyno] = opcinfo[keyno];
    1046         304 :     pfree(opcinfo);
    1047             : 
    1048         304 :     MemoryContextSwitchTo(oldcxt);
    1049             : 
    1050         304 :     return bdesc;
    1051             : }
    1052             : /*
                     :  * Release a BrinDesc created by brin_build_desc.
                     :  *
                     :  * The whole descriptor is released by deleting its private memory
                     :  * context (bd_context).  Because brin_build_desc allocates the BrinDesc
                     :  * struct itself inside that context, 'bdesc' is no longer valid once
                     :  * this function returns.
                     :  */
    1053             : void
    1054         254 : brin_free_desc(BrinDesc *bdesc)
    1055             : {
    1056             :     /* make sure the tupdesc is still valid */
    1057         254 :     Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
    1058             :     /* no need for retail pfree */
    1059         254 :     MemoryContextDelete(bdesc->bd_context);
    1060         254 : }
    1061             : 
    1062             : /*
    1063             :  * Fetch index's statistical data into *stats
                     :  *
                     :  * Reads the metapage (block BRIN_METAPAGE_BLKNO) under a share lock and
                     :  * copies two values out of it: pagesPerRange is taken as-is, and
                     :  * revmapNumPages is computed as lastRevmapPage - 1.  The buffer is
                     :  * unlocked and released before returning.
    1064             :  */
    1065             : void
    1066         995 : brinGetStats(Relation index, BrinStatsData *stats)
    1067             : {
    1068             :     Buffer      metabuffer;
    1069             :     Page        metapage;
    1070             :     BrinMetaPageData *metadata;
    1071             : 
    1072         995 :     metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
    1073         995 :     LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
    1074         995 :     metapage = BufferGetPage(metabuffer);
    1075         995 :     metadata = (BrinMetaPageData *) PageGetContents(metapage);
    1076             : 
    1077         995 :     stats->pagesPerRange = metadata->pagesPerRange;
    1078         995 :     stats->revmapNumPages = metadata->lastRevmapPage - 1;
    1079             : 
    1080         995 :     UnlockReleaseBuffer(metabuffer);
    1081         995 : }
    1082             : 
    1083             : /*
    1084             :  * Initialize a BrinBuildState appropriate to create tuples on the given index.
                     :  *
                     :  * The state is palloc'd in the current memory context.  It starts with
                     :  * zero tuples counted, no current insert buffer, and the current range
                     :  * starting at heap block 0.  'revmap' and 'pagesPerRange' are stored as
                     :  * given.  A fresh BrinDesc and an initialized in-memory tuple are
                     :  * created for the index.
                     :  *
                     :  * Release with terminate_brin_buildstate.
    1085             :  */
    1086             : static BrinBuildState *
    1087           6 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
    1088             :                            BlockNumber pagesPerRange)
    1089             : {
    1090             :     BrinBuildState *state;
    1091             : 
    1092           6 :     state = palloc(sizeof(BrinBuildState));
    1093             : 
    1094           6 :     state->bs_irel = idxRel;
    1095           6 :     state->bs_numtuples = 0;
    1096           6 :     state->bs_currentInsertBuf = InvalidBuffer;
    1097           6 :     state->bs_pagesPerRange = pagesPerRange;
    1098           6 :     state->bs_currRangeStart = 0;
    1099           6 :     state->bs_rmAccess = revmap;
    1100           6 :     state->bs_bdesc = brin_build_desc(idxRel);
    1101           6 :     state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
    1102             : 
    1103           6 :     brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1104             : 
    1105           6 :     return state;
    1106             : }
    1107             : 
    1108             : /*
    1109             :  * Release resources associated with a BrinBuildState.
                     :  *
                     :  * If an insert buffer is still held, the free space on its page is
                     :  * recorded via RecordPageWithFreeSpace and the buffer is released.  The
                     :  * BrinDesc, the in-memory tuple and the state struct itself are then
                     :  * freed, so 'state' must not be used afterwards.
                     :  *
                     :  * The revmap accessor (bs_rmAccess) is not released here; brinbuild
                     :  * terminates it separately before calling this function.
    1110             :  */
    1111             : static void
    1112           6 : terminate_brin_buildstate(BrinBuildState *state)
    1113             : {
    1114             :     /* release the last index buffer used */
    1115           6 :     if (!BufferIsInvalid(state->bs_currentInsertBuf))
    1116             :     {
    1117             :         Page        page;
    1118             : 
    1119           4 :         page = BufferGetPage(state->bs_currentInsertBuf);
    1120           4 :         RecordPageWithFreeSpace(state->bs_irel,
    1121             :                                 BufferGetBlockNumber(state->bs_currentInsertBuf),
    1122             :                                 PageGetFreeSpace(page));
    1123           4 :         ReleaseBuffer(state->bs_currentInsertBuf);
    1124             :     }
    1125             : 
    1126           6 :     brin_free_desc(state->bs_bdesc);
    1127           6 :     pfree(state->bs_dtuple);
    1128           6 :     pfree(state);
    1129           6 : }
    1130             : 
    1131             : /*
    1132             :  * Summarize the given page range of the given index.
    1133             :  *
    1134             :  * This routine can run in parallel with insertions into the heap.  To avoid
    1135             :  * missing those values from the summary tuple, we first insert a placeholder
    1136             :  * index tuple into the index, then execute the heap scan; transactions
    1137             :  * concurrent with the scan update the placeholder tuple.  After the scan, we
    1138             :  * union the placeholder tuple with the one computed by this routine.  The
    1139             :  * update of the index value happens in a loop, so that if somebody updates
    1140             :  * the placeholder tuple after we read it, we detect the case and try again.
    1141             :  * This ensures that the concurrently inserted tuples are not lost.
    1142             :  */
    1143             : static void
    1144           7 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
    1145             :                 BlockNumber heapBlk, BlockNumber heapNumBlks)
    1146             : {
    1147             :     Buffer      phbuf;
    1148             :     BrinTuple  *phtup;
    1149             :     Size        phsz;
    1150             :     OffsetNumber offset;
    1151             :     BlockNumber scanNumBlks;
    1152             : 
    1153             :     /*
    1154             :      * Insert the placeholder tuple
    1155             :      */
    1156           7 :     phbuf = InvalidBuffer;
    1157           7 :     phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
    1158           7 :     offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
    1159             :                            state->bs_rmAccess, &phbuf,
    1160             :                            heapBlk, phtup, phsz);
    1161             : 
    1162             :     /*
    1163             :      * Execute the partial heap scan covering the heap blocks in the specified
    1164             :      * page range, summarizing the heap tuples in it.  This scan stops just
    1165             :      * short of brinbuildCallback creating the new index entry.
    1166             :      *
    1167             :      * Note that it is critical we use the "any visible" mode of
    1168             :      * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
    1169             :      * by transactions that are still in progress, among other corner cases.
    1170             :      */
    1171           7 :     state->bs_currRangeStart = heapBlk;
    1172          14 :     scanNumBlks = heapBlk + state->bs_pagesPerRange <= heapNumBlks ?
    1173           7 :         state->bs_pagesPerRange : heapNumBlks - heapBlk;
    1174           7 :     IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
    1175             :                             heapBlk, scanNumBlks,
    1176             :                             brinbuildCallback, (void *) state);
    1177             : 
    1178             :     /*
    1179             :      * Now we update the values obtained by the scan with the placeholder
    1180             :      * tuple.  We do this in a loop which only terminates if we're able to
    1181             :      * update the placeholder tuple successfully; if we are not, this means
    1182             :      * somebody else modified the placeholder tuple after we read it.
    1183             :      */
    1184             :     for (;;)
    1185             :     {
    1186             :         BrinTuple  *newtup;
    1187             :         Size        newsize;
    1188             :         bool        didupdate;
    1189             :         bool        samepage;
    1190             : 
    1191           7 :         CHECK_FOR_INTERRUPTS();
    1192             : 
    1193             :         /*
    1194             :          * Update the summary tuple and try to update.
    1195             :          */
    1196           7 :         newtup = brin_form_tuple(state->bs_bdesc,
    1197             :                                  heapBlk, state->bs_dtuple, &newsize);
    1198           7 :         samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
    1199           7 :         didupdate =
    1200           7 :             brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
    1201             :                           state->bs_rmAccess, heapBlk, phbuf, offset,
    1202             :                           phtup, phsz, newtup, newsize, samepage);
    1203           7 :         brin_free_tuple(phtup);
    1204           7 :         brin_free_tuple(newtup);
    1205             : 
    1206             :         /* If the update succeeded, we're done. */
    1207           7 :         if (didupdate)
    1208           7 :             break;
    1209             : 
    1210             :         /*
    1211             :          * If the update didn't work, it might be because somebody updated the
    1212             :          * placeholder tuple concurrently.  Extract the new version, union it
    1213             :          * with the values we have from the scan, and start over.  (There are
    1214             :          * other reasons for the update to fail, but it's simple to treat them
    1215             :          * the same.)
    1216             :          */
    1217           0 :         phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
    1218             :                                          &offset, &phsz, BUFFER_LOCK_SHARE,
    1219             :                                          NULL);
    1220             :         /* the placeholder tuple must exist */
    1221           0 :         if (phtup == NULL)
    1222           0 :             elog(ERROR, "missing placeholder tuple");
    1223           0 :         phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
    1224           0 :         LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
    1225             : 
    1226             :         /* merge it into the tuple from the heap scan */
    1227           0 :         union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
    1228           0 :     }
    1229             : 
    1230           7 :     ReleaseBuffer(phbuf);
    1231           7 : }
    1232             : 
    1233             : /*
    1234             :  * Summarize page ranges that are not already summarized.  If pageRange is
    1235             :  * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
    1236             :  * page range containing the given heap page number is scanned.
    1237             :  *
    1238             :  * For each new index tuple inserted, *numSummarized (if not NULL) is
    1239             :  * incremented; for each existing tuple, *numExisting (if not NULL) is
    1240             :  * incremented.
    1241             :  */
    1242             : static void
    1243           8 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
    1244             :               double *numSummarized, double *numExisting)
    1245             : {
    1246             :     BrinRevmap *revmap;
    1247           8 :     BrinBuildState *state = NULL;
    1248           8 :     IndexInfo  *indexInfo = NULL;
    1249             :     BlockNumber heapNumBlocks;
    1250             :     BlockNumber heapBlk;
    1251             :     BlockNumber pagesPerRange;
    1252             :     Buffer      buf;
    1253             :     BlockNumber startBlk;
    1254             :     BlockNumber endBlk;
    1255             : 
    1256             :     /* determine range of pages to process; nothing to do for an empty table */
    1257           8 :     heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
    1258           8 :     if (heapNumBlocks == 0)
    1259           0 :         return;
    1260             : 
    1261           8 :     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
    1262             : 
    1263           8 :     if (pageRange == BRIN_ALL_BLOCKRANGES)
    1264             :     {
    1265           5 :         startBlk = 0;
    1266           5 :         endBlk = heapNumBlocks;
    1267             :     }
    1268             :     else
    1269             :     {
    1270           3 :         startBlk = (pageRange / pagesPerRange) * pagesPerRange;
    1271             :         /* Nothing to do if start point is beyond end of table */
    1272           3 :         if (startBlk > heapNumBlocks)
    1273             :         {
    1274           0 :             brinRevmapTerminate(revmap);
    1275           0 :             return;
    1276             :         }
    1277           3 :         endBlk = startBlk + pagesPerRange;
    1278           3 :         if (endBlk > heapNumBlocks)
    1279           1 :             endBlk = heapNumBlocks;
    1280             :     }
    1281             : 
    1282             :     /*
    1283             :      * Scan the revmap to find unsummarized items.
    1284             :      */
    1285           8 :     buf = InvalidBuffer;
    1286         269 :     for (heapBlk = startBlk; heapBlk < endBlk; heapBlk += pagesPerRange)
    1287             :     {
    1288             :         BrinTuple  *tup;
    1289             :         OffsetNumber off;
    1290             : 
    1291         261 :         CHECK_FOR_INTERRUPTS();
    1292             : 
    1293         261 :         tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
    1294             :                                        BUFFER_LOCK_SHARE, NULL);
    1295         261 :         if (tup == NULL)
    1296             :         {
    1297             :             /* no revmap entry for this heap range. Summarize it. */
    1298           7 :             if (state == NULL)
    1299             :             {
    1300             :                 /* first time through */
    1301           2 :                 Assert(!indexInfo);
    1302           2 :                 state = initialize_brin_buildstate(index, revmap,
    1303             :                                                    pagesPerRange);
    1304           2 :                 indexInfo = BuildIndexInfo(index);
    1305             :             }
    1306           7 :             summarize_range(indexInfo, state, heapRel, heapBlk, heapNumBlocks);
    1307             : 
    1308             :             /* and re-initialize state for the next range */
    1309           7 :             brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    1310             : 
    1311           7 :             if (numSummarized)
    1312           7 :                 *numSummarized += 1.0;
    1313             :         }
    1314             :         else
    1315             :         {
    1316         254 :             if (numExisting)
    1317         145 :                 *numExisting += 1.0;
    1318         254 :             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    1319             :         }
    1320             :     }
    1321             : 
    1322           8 :     if (BufferIsValid(buf))
    1323           7 :         ReleaseBuffer(buf);
    1324             : 
    1325             :     /* free resources */
    1326           8 :     brinRevmapTerminate(revmap);
    1327           8 :     if (state)
    1328             :     {
    1329           2 :         terminate_brin_buildstate(state);
    1330           2 :         pfree(indexInfo);
    1331             :     }
    1332             : }
    1333             : 
    1334             : /*
    1335             :  * Given a deformed tuple in the build state, convert it into the on-disk
    1336             :  * format and insert it into the index, making the revmap point to it.
    1337             :  */
    1338             : static void
    1339         147 : form_and_insert_tuple(BrinBuildState *state)
    1340             : {
    1341             :     BrinTuple  *tup;
    1342             :     Size        size;
    1343             : 
    1344         147 :     tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
    1345             :                           state->bs_dtuple, &size);
    1346         147 :     brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
    1347             :                   &state->bs_currentInsertBuf, state->bs_currRangeStart,
    1348             :                   tup, size);
    1349         147 :     state->bs_numtuples++;
    1350             : 
    1351         147 :     pfree(tup);
    1352         147 : }
    1353             : 
    1354             : /*
    1355             :  * Given two deformed tuples, adjust the first one so that it's consistent
    1356             :  * with the summary values in both.
    1357             :  */
    1358             : static void
    1359           0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
    1360             : {
    1361             :     int         keyno;
    1362             :     BrinMemTuple *db;
    1363             :     MemoryContext cxt;
    1364             :     MemoryContext oldcxt;
    1365             : 
    1366             :     /* Use our own memory context to avoid retail pfree */
    1367           0 :     cxt = AllocSetContextCreate(CurrentMemoryContext,
    1368             :                                 "brin union",
    1369             :                                 ALLOCSET_DEFAULT_SIZES);
    1370           0 :     oldcxt = MemoryContextSwitchTo(cxt);
    1371           0 :     db = brin_deform_tuple(bdesc, b, NULL);
    1372           0 :     MemoryContextSwitchTo(oldcxt);
    1373             : 
    1374           0 :     for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
    1375             :     {
    1376             :         FmgrInfo   *unionFn;
    1377           0 :         BrinValues *col_a = &a->bt_columns[keyno];
    1378           0 :         BrinValues *col_b = &db->bt_columns[keyno];
    1379             : 
    1380           0 :         unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
    1381             :                                     BRIN_PROCNUM_UNION);
    1382           0 :         FunctionCall3Coll(unionFn,
    1383           0 :                           bdesc->bd_index->rd_indcollation[keyno],
    1384             :                           PointerGetDatum(bdesc),
    1385             :                           PointerGetDatum(col_a),
    1386             :                           PointerGetDatum(col_b));
    1387             :     }
    1388             : 
    1389           0 :     MemoryContextDelete(cxt);
    1390           0 : }
    1391             : 
    1392             : /*
    1393             :  * brin_vacuum_scan
    1394             :  *      Do a complete scan of the index during VACUUM.
    1395             :  *
    1396             :  * This routine scans the complete index looking for uncatalogued index pages,
    1397             :  * i.e. those that might have been lost due to a crash after index extension
    1398             :  * and such.
    1399             :  */
    1400             : static void
    1401           3 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
    1402             : {
    1403           3 :     bool        vacuum_fsm = false;
    1404             :     BlockNumber blkno;
    1405             : 
    1406             :     /*
    1407             :      * Scan the index in physical order, and clean up any possible mess in
    1408             :      * each page.
    1409             :      */
    1410          21 :     for (blkno = 0; blkno < RelationGetNumberOfBlocks(idxrel); blkno++)
    1411             :     {
    1412             :         Buffer      buf;
    1413             : 
    1414          18 :         CHECK_FOR_INTERRUPTS();
    1415             : 
    1416          18 :         buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
    1417             :                                  RBM_NORMAL, strategy);
    1418             : 
    1419          18 :         vacuum_fsm |= brin_page_cleanup(idxrel, buf);
    1420             : 
    1421          18 :         ReleaseBuffer(buf);
    1422             :     }
    1423             : 
    1424             :     /*
    1425             :      * If we made any change to the FSM, make sure the new info is visible all
    1426             :      * the way to the top.
    1427             :      */
    1428           3 :     if (vacuum_fsm)
    1429           3 :         FreeSpaceMapVacuum(idxrel);
    1430           3 : }

Generated by: LCOV version 1.11