LCOV - code coverage report
Current view: top level - src/backend/access/gin - ginfast.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 330 367 89.9 %
Date: 2017-09-29 15:12:54 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * ginfast.c
       4             :  *    Fast insert routines for the Postgres inverted index access method.
       5             :  *    Pending entries are stored in a linear list of pages.  Later on
       6             :  *    (typically during VACUUM), ginInsertCleanup() will be invoked to
       7             :  *    transfer pending entries into the regular index structure.  This
       8             :  *    wins because bulk insertion is much more efficient than retail.
       9             :  *
      10             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
      11             :  * Portions Copyright (c) 1994, Regents of the University of California
      12             :  *
      13             :  * IDENTIFICATION
      14             :  *          src/backend/access/gin/ginfast.c
      15             :  *
      16             :  *-------------------------------------------------------------------------
      17             :  */
      18             : 
      19             : #include "postgres.h"
      20             : 
      21             : #include "access/gin_private.h"
      22             : #include "access/ginxlog.h"
      23             : #include "access/xloginsert.h"
      24             : #include "access/xlog.h"
      25             : #include "commands/vacuum.h"
      26             : #include "catalog/pg_am.h"
      27             : #include "miscadmin.h"
      28             : #include "utils/memutils.h"
      29             : #include "utils/rel.h"
      30             : #include "utils/acl.h"
      31             : #include "postmaster/autovacuum.h"
      32             : #include "storage/indexfsm.h"
      33             : #include "storage/lmgr.h"
      34             : #include "utils/builtins.h"
      35             : 
      36             : /* GUC parameter */
      37             : int         gin_pending_list_limit = 0;
      38             : 
      39             : #define GIN_PAGE_FREESIZE \
      40             :     ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
      41             : 
      42             : typedef struct KeyArray
      43             : {
      44             :     Datum      *keys;           /* expansible array */
      45             :     GinNullCategory *categories;    /* another expansible array */
      46             :     int32       nvalues;        /* current number of valid entries */
      47             :     int32       maxvalues;      /* allocated size of arrays */
      48             : } KeyArray;
      49             : 
      50             : 
      51             : /*
      52             :  * Build a pending-list page from the given array of tuples, and write it out.
      53             :  *
      54             :  * Returns amount of free space left on the page.
      55             :  */
      56             : static int32
      57         132 : writeListPage(Relation index, Buffer buffer,
      58             :               IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
      59             : {
      60         132 :     Page        page = BufferGetPage(buffer);
      61             :     int32       i,
      62             :                 freesize,
      63         132 :                 size = 0;
      64             :     OffsetNumber l,
      65             :                 off;
      66             :     char       *workspace;
      67             :     char       *ptr;
      68             : 
      69             :     /* workspace could be a local array; we use palloc for alignment */
      70         132 :     workspace = palloc(BLCKSZ);
      71             : 
      72         132 :     START_CRIT_SECTION();
      73             : 
      74         132 :     GinInitBuffer(buffer, GIN_LIST);
      75             : 
      76         132 :     off = FirstOffsetNumber;
      77         132 :     ptr = workspace;
      78             : 
      79         524 :     for (i = 0; i < ntuples; i++)
      80             :     {
      81         392 :         int         this_size = IndexTupleSize(tuples[i]);
      82             : 
      83         392 :         memcpy(ptr, tuples[i], this_size);
      84         392 :         ptr += this_size;
      85         392 :         size += this_size;
      86             : 
      87         392 :         l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
      88             : 
      89         392 :         if (l == InvalidOffsetNumber)
      90           0 :             elog(ERROR, "failed to add item to index page in \"%s\"",
      91             :                  RelationGetRelationName(index));
      92             : 
      93         392 :         off++;
      94             :     }
      95             : 
      96         132 :     Assert(size <= BLCKSZ);      /* else we overran workspace */
      97             : 
      98         132 :     GinPageGetOpaque(page)->rightlink = rightlink;
      99             : 
     100             :     /*
     101             :      * tail page may contain only whole row(s) or final part of row placed on
     102             :      * previous pages (a "row" here meaning all the index tuples generated for
     103             :      * one heap tuple)
     104             :      */
     105         132 :     if (rightlink == InvalidBlockNumber)
     106             :     {
     107         132 :         GinPageSetFullRow(page);
     108         132 :         GinPageGetOpaque(page)->maxoff = 1;
     109             :     }
     110             :     else
     111             :     {
     112           0 :         GinPageGetOpaque(page)->maxoff = 0;
     113             :     }
     114             : 
     115         132 :     MarkBufferDirty(buffer);
     116             : 
     117         132 :     if (RelationNeedsWAL(index))
     118             :     {
     119             :         ginxlogInsertListPage data;
     120             :         XLogRecPtr  recptr;
     121             : 
     122         131 :         data.rightlink = rightlink;
     123         131 :         data.ntuples = ntuples;
     124             : 
     125         131 :         XLogBeginInsert();
     126         131 :         XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
     127             : 
     128         131 :         XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
     129         131 :         XLogRegisterBufData(0, workspace, size);
     130             : 
     131         131 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
     132         131 :         PageSetLSN(page, recptr);
     133             :     }
     134             : 
     135             :     /* get free space before releasing buffer */
     136         132 :     freesize = PageGetExactFreeSpace(page);
     137             : 
     138         132 :     UnlockReleaseBuffer(buffer);
     139             : 
     140         132 :     END_CRIT_SECTION();
     141             : 
     142         132 :     pfree(workspace);
     143             : 
     144         132 :     return freesize;
     145             : }
     146             : 
     147             : static void
     148         132 : makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
     149             :             GinMetaPageData *res)
     150             : {
     151         132 :     Buffer      curBuffer = InvalidBuffer;
     152         132 :     Buffer      prevBuffer = InvalidBuffer;
     153             :     int         i,
     154         132 :                 size = 0,
     155             :                 tupsize;
     156         132 :     int         startTuple = 0;
     157             : 
     158         132 :     Assert(ntuples > 0);
     159             : 
     160             :     /*
     161             :      * Split tuples into pages
     162             :      */
     163         524 :     for (i = 0; i < ntuples; i++)
     164             :     {
     165         392 :         if (curBuffer == InvalidBuffer)
     166             :         {
     167         132 :             curBuffer = GinNewBuffer(index);
     168             : 
     169         132 :             if (prevBuffer != InvalidBuffer)
     170             :             {
     171           0 :                 res->nPendingPages++;
     172           0 :                 writeListPage(index, prevBuffer,
     173           0 :                               tuples + startTuple,
     174             :                               i - startTuple,
     175             :                               BufferGetBlockNumber(curBuffer));
     176             :             }
     177             :             else
     178             :             {
     179         132 :                 res->head = BufferGetBlockNumber(curBuffer);
     180             :             }
     181             : 
     182         132 :             prevBuffer = curBuffer;
     183         132 :             startTuple = i;
     184         132 :             size = 0;
     185             :         }
     186             : 
     187         392 :         tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
     188             : 
     189         392 :         if (size + tupsize > GinListPageSize)
     190             :         {
     191             :             /* won't fit, force a new page and reprocess */
     192           0 :             i--;
     193           0 :             curBuffer = InvalidBuffer;
     194             :         }
     195             :         else
     196             :         {
     197         392 :             size += tupsize;
     198             :         }
     199             :     }
     200             : 
     201             :     /*
     202             :      * Write last page
     203             :      */
     204         132 :     res->tail = BufferGetBlockNumber(curBuffer);
     205         264 :     res->tailFreeSize = writeListPage(index, curBuffer,
     206         132 :                                       tuples + startTuple,
     207             :                                       ntuples - startTuple,
     208             :                                       InvalidBlockNumber);
     209         132 :     res->nPendingPages++;
     210             :     /* that was only one heap tuple */
     211         132 :     res->nPendingHeapTuples = 1;
     212         132 : }
     213             : 
     214             : /*
     215             :  * Write the index tuples contained in *collector into the index's
     216             :  * pending list.
     217             :  *
     218             :  * Function guarantees that all these tuples will be inserted consecutively,
     219             :  * preserving order
     220             :  */
     221             : void
     222       22006 : ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
     223             : {
     224       22006 :     Relation    index = ginstate->index;
     225             :     Buffer      metabuffer;
     226             :     Page        metapage;
     227       22006 :     GinMetaPageData *metadata = NULL;
     228       22006 :     Buffer      buffer = InvalidBuffer;
     229       22006 :     Page        page = NULL;
     230             :     ginxlogUpdateMeta data;
     231       22006 :     bool        separateList = false;
     232       22006 :     bool        needCleanup = false;
     233             :     int         cleanupSize;
     234             :     bool        needWal;
     235             : 
     236       22006 :     if (collector->ntuples == 0)
     237       22006 :         return;
     238             : 
     239       22006 :     needWal = RelationNeedsWAL(index);
     240             : 
     241       22006 :     data.node = index->rd_node;
     242       22006 :     data.ntuples = 0;
     243       22006 :     data.newRightlink = data.prevTail = InvalidBlockNumber;
     244             : 
     245       22006 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     246       22006 :     metapage = BufferGetPage(metabuffer);
     247             : 
     248       22006 :     if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
     249             :     {
     250             :         /*
     251             :          * Total size is greater than one page => make sublist
     252             :          */
     253           0 :         separateList = true;
     254             :     }
     255             :     else
     256             :     {
     257       22006 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     258       22006 :         metadata = GinPageGetMeta(metapage);
     259             : 
     260       44008 :         if (metadata->head == InvalidBlockNumber ||
     261       22002 :             collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
     262             :         {
     263             :             /*
     264             :              * Pending list is empty or total size is greater than freespace
     265             :              * on tail page => make sublist
     266             :              *
     267             :              * We unlock metabuffer to keep high concurrency
     268             :              */
     269         132 :             separateList = true;
     270         132 :             LockBuffer(metabuffer, GIN_UNLOCK);
     271             :         }
     272             :     }
     273             : 
     274       22006 :     if (separateList)
     275             :     {
     276             :         /*
     277             :          * We should make sublist separately and append it to the tail
     278             :          */
     279             :         GinMetaPageData sublist;
     280             : 
     281         132 :         memset(&sublist, 0, sizeof(GinMetaPageData));
     282         132 :         makeSublist(index, collector->tuples, collector->ntuples, &sublist);
     283             : 
     284         132 :         if (needWal)
     285         131 :             XLogBeginInsert();
     286             : 
     287             :         /*
     288             :          * metapage was unlocked, see above
     289             :          */
     290         132 :         LockBuffer(metabuffer, GIN_EXCLUSIVE);
     291         132 :         metadata = GinPageGetMeta(metapage);
     292             : 
     293         132 :         if (metadata->head == InvalidBlockNumber)
     294             :         {
     295             :             /*
     296             :              * Main list is empty, so just insert sublist as main list
     297             :              */
     298           4 :             START_CRIT_SECTION();
     299             : 
     300           4 :             metadata->head = sublist.head;
     301           4 :             metadata->tail = sublist.tail;
     302           4 :             metadata->tailFreeSize = sublist.tailFreeSize;
     303             : 
     304           4 :             metadata->nPendingPages = sublist.nPendingPages;
     305           4 :             metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
     306             :         }
     307             :         else
     308             :         {
     309             :             /*
     310             :              * Merge lists
     311             :              */
     312         128 :             data.prevTail = metadata->tail;
     313         128 :             data.newRightlink = sublist.head;
     314             : 
     315         128 :             buffer = ReadBuffer(index, metadata->tail);
     316         128 :             LockBuffer(buffer, GIN_EXCLUSIVE);
     317         128 :             page = BufferGetPage(buffer);
     318             : 
     319         128 :             Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
     320             : 
     321         128 :             START_CRIT_SECTION();
     322             : 
     323         128 :             GinPageGetOpaque(page)->rightlink = sublist.head;
     324             : 
     325         128 :             MarkBufferDirty(buffer);
     326             : 
     327         128 :             metadata->tail = sublist.tail;
     328         128 :             metadata->tailFreeSize = sublist.tailFreeSize;
     329             : 
     330         128 :             metadata->nPendingPages += sublist.nPendingPages;
     331         128 :             metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
     332             : 
     333         128 :             if (needWal)
     334         128 :                 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     335             :         }
     336             :     }
     337             :     else
     338             :     {
     339             :         /*
     340             :          * Insert into tail page.  Metapage is already locked
     341             :          */
     342             :         OffsetNumber l,
     343             :                     off;
     344             :         int         i,
     345             :                     tupsize;
     346             :         char       *ptr;
     347             :         char       *collectordata;
     348             : 
     349       21874 :         buffer = ReadBuffer(index, metadata->tail);
     350       21874 :         LockBuffer(buffer, GIN_EXCLUSIVE);
     351       21874 :         page = BufferGetPage(buffer);
     352             : 
     353       65622 :         off = (PageIsEmpty(page)) ? FirstOffsetNumber :
     354       43748 :             OffsetNumberNext(PageGetMaxOffsetNumber(page));
     355             : 
     356       21874 :         collectordata = ptr = (char *) palloc(collector->sumsize);
     357             : 
     358       21874 :         data.ntuples = collector->ntuples;
     359             : 
     360       21874 :         if (needWal)
     361       21873 :             XLogBeginInsert();
     362             : 
     363       21874 :         START_CRIT_SECTION();
     364             : 
     365             :         /*
     366             :          * Increase counter of heap tuples
     367             :          */
     368       21874 :         Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
     369       21874 :         GinPageGetOpaque(page)->maxoff++;
     370       21874 :         metadata->nPendingHeapTuples++;
     371             : 
     372       87487 :         for (i = 0; i < collector->ntuples; i++)
     373             :         {
     374       65613 :             tupsize = IndexTupleSize(collector->tuples[i]);
     375       65613 :             l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
     376             : 
     377       65613 :             if (l == InvalidOffsetNumber)
     378           0 :                 elog(ERROR, "failed to add item to index page in \"%s\"",
     379             :                      RelationGetRelationName(index));
     380             : 
     381       65613 :             memcpy(ptr, collector->tuples[i], tupsize);
     382       65613 :             ptr += tupsize;
     383             : 
     384       65613 :             off++;
     385             :         }
     386             : 
     387       21874 :         Assert((ptr - collectordata) <= collector->sumsize);
     388       21874 :         if (needWal)
     389             :         {
     390       21873 :             XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
     391       21873 :             XLogRegisterBufData(1, collectordata, collector->sumsize);
     392             :         }
     393             : 
     394       21874 :         metadata->tailFreeSize = PageGetExactFreeSpace(page);
     395             : 
     396       21874 :         MarkBufferDirty(buffer);
     397             :     }
     398             : 
     399             :     /*
     400             :      * Write metabuffer, make xlog entry
     401             :      */
     402       22006 :     MarkBufferDirty(metabuffer);
     403             : 
     404       22006 :     if (needWal)
     405             :     {
     406             :         XLogRecPtr  recptr;
     407             : 
     408       22004 :         memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     409             : 
     410       22004 :         XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
     411       22004 :         XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
     412             : 
     413       22004 :         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
     414       22004 :         PageSetLSN(metapage, recptr);
     415             : 
     416       22004 :         if (buffer != InvalidBuffer)
     417             :         {
     418       22001 :             PageSetLSN(page, recptr);
     419             :         }
     420             :     }
     421             : 
     422       22006 :     if (buffer != InvalidBuffer)
     423       22002 :         UnlockReleaseBuffer(buffer);
     424             : 
     425             :     /*
     426             :      * Force pending list cleanup when it becomes too long. And,
     427             :      * ginInsertCleanup could take significant amount of time, so we prefer to
     428             :      * call it when it can do all the work in a single collection cycle. In
     429             :      * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
     430             :      * while pending list is still small enough to fit into
     431             :      * gin_pending_list_limit.
     432             :      *
     433             :      * ginInsertCleanup() should not be called inside our CRIT_SECTION.
     434             :      */
     435       22006 :     cleanupSize = GinGetPendingListCleanupSize(index);
     436       22006 :     if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
     437           0 :         needCleanup = true;
     438             : 
     439       22006 :     UnlockReleaseBuffer(metabuffer);
     440             : 
     441       22006 :     END_CRIT_SECTION();
     442             : 
     443       22006 :     if (needCleanup)
     444           0 :         ginInsertCleanup(ginstate, false, true, NULL);
     445             : }
     446             : 
     447             : /*
     448             :  * Create temporary index tuples for a single indexable item (one index column
     449             :  * for the heap tuple specified by ht_ctid), and append them to the array
     450             :  * in *collector.  They will subsequently be written out using
     451             :  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
     452             :  * temp tuples for a given heap tuple must be written in one call to
     453             :  * ginHeapTupleFastInsert.
     454             :  */
     455             : void
     456       22006 : ginHeapTupleFastCollect(GinState *ginstate,
     457             :                         GinTupleCollector *collector,
     458             :                         OffsetNumber attnum, Datum value, bool isNull,
     459             :                         ItemPointer ht_ctid)
     460             : {
     461             :     Datum      *entries;
     462             :     GinNullCategory *categories;
     463             :     int32       i,
     464             :                 nentries;
     465             : 
     466             :     /*
     467             :      * Extract the key values that need to be inserted in the index
     468             :      */
     469       22006 :     entries = ginExtractEntries(ginstate, attnum, value, isNull,
     470             :                                 &nentries, &categories);
     471             : 
     472             :     /*
     473             :      * Allocate/reallocate memory for storing collected tuples
     474             :      */
     475       22006 :     if (collector->tuples == NULL)
     476             :     {
     477       22006 :         collector->lentuples = nentries * ginstate->origTupdesc->natts;
     478       22006 :         collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
     479             :     }
     480             : 
     481       44012 :     while (collector->ntuples + nentries > collector->lentuples)
     482             :     {
     483           0 :         collector->lentuples *= 2;
     484           0 :         collector->tuples = (IndexTuple *) repalloc(collector->tuples,
     485           0 :                                                     sizeof(IndexTuple) * collector->lentuples);
     486             :     }
     487             : 
     488             :     /*
     489             :      * Build an index tuple for each key value, and add to array.  In pending
     490             :      * tuples we just stick the heap TID into t_tid.
     491             :      */
     492       88011 :     for (i = 0; i < nentries; i++)
     493             :     {
     494             :         IndexTuple  itup;
     495             : 
     496       66005 :         itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
     497             :                             NULL, 0, 0, true);
     498       66005 :         itup->t_tid = *ht_ctid;
     499       66005 :         collector->tuples[collector->ntuples++] = itup;
     500       66005 :         collector->sumsize += IndexTupleSize(itup);
     501             :     }
     502       22006 : }
     503             : 
     504             : /*
     505             :  * Deletes pending list pages up to (not including) newHead page.
     506             :  * If newHead == InvalidBlockNumber then function drops the whole list.
     507             :  *
     508             :  * metapage is pinned and exclusive-locked throughout this function.
     509             :  */
     510             : static void
     511           2 : shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
     512             :           bool fill_fsm, IndexBulkDeleteResult *stats)
     513             : {
     514             :     Page        metapage;
     515             :     GinMetaPageData *metadata;
     516             :     BlockNumber blknoToDelete;
     517             : 
     518           2 :     metapage = BufferGetPage(metabuffer);
     519           2 :     metadata = GinPageGetMeta(metapage);
     520           2 :     blknoToDelete = metadata->head;
     521             : 
     522             :     do
     523             :     {
     524             :         Page        page;
     525             :         int         i;
     526           9 :         int64       nDeletedHeapTuples = 0;
     527             :         ginxlogDeleteListPages data;
     528             :         Buffer      buffers[GIN_NDELETE_AT_ONCE];
     529             :         BlockNumber freespace[GIN_NDELETE_AT_ONCE];
     530             : 
     531           9 :         data.ndeleted = 0;
     532         148 :         while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
     533             :         {
     534         130 :             freespace[data.ndeleted] = blknoToDelete;
     535         130 :             buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
     536         130 :             LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
     537         130 :             page = BufferGetPage(buffers[data.ndeleted]);
     538             : 
     539         130 :             data.ndeleted++;
     540             : 
     541         130 :             Assert(!GinPageIsDeleted(page));
     542             : 
     543         130 :             nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
     544         130 :             blknoToDelete = GinPageGetOpaque(page)->rightlink;
     545             :         }
     546             : 
     547           9 :         if (stats)
     548           9 :             stats->pages_deleted += data.ndeleted;
     549             : 
     550             :         /*
     551             :          * This operation touches an unusually large number of pages, so
     552             :          * prepare the XLogInsert machinery for that before entering the
     553             :          * critical section.
     554             :          */
     555           9 :         if (RelationNeedsWAL(index))
     556           9 :             XLogEnsureRecordSpace(data.ndeleted, 0);
     557             : 
     558           9 :         START_CRIT_SECTION();
     559             : 
     560           9 :         metadata->head = blknoToDelete;
     561             : 
     562           9 :         Assert(metadata->nPendingPages >= data.ndeleted);
     563           9 :         metadata->nPendingPages -= data.ndeleted;
     564           9 :         Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
     565           9 :         metadata->nPendingHeapTuples -= nDeletedHeapTuples;
     566             : 
     567           9 :         if (blknoToDelete == InvalidBlockNumber)
     568             :         {
     569           2 :             metadata->tail = InvalidBlockNumber;
     570           2 :             metadata->tailFreeSize = 0;
     571           2 :             metadata->nPendingPages = 0;
     572           2 :             metadata->nPendingHeapTuples = 0;
     573             :         }
     574             : 
     575           9 :         MarkBufferDirty(metabuffer);
     576             : 
     577         139 :         for (i = 0; i < data.ndeleted; i++)
     578             :         {
     579         130 :             page = BufferGetPage(buffers[i]);
     580         130 :             GinPageGetOpaque(page)->flags = GIN_DELETED;
     581         130 :             MarkBufferDirty(buffers[i]);
     582             :         }
     583             : 
     584           9 :         if (RelationNeedsWAL(index))
     585             :         {
     586             :             XLogRecPtr  recptr;
     587             : 
     588           9 :             XLogBeginInsert();
     589           9 :             XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
     590         139 :             for (i = 0; i < data.ndeleted; i++)
     591         130 :                 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
     592             : 
     593           9 :             memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
     594             : 
     595           9 :             XLogRegisterData((char *) &data,
     596             :                              sizeof(ginxlogDeleteListPages));
     597             : 
     598           9 :             recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
     599           9 :             PageSetLSN(metapage, recptr);
     600             : 
     601         139 :             for (i = 0; i < data.ndeleted; i++)
     602             :             {
     603         130 :                 page = BufferGetPage(buffers[i]);
     604         130 :                 PageSetLSN(page, recptr);
     605             :             }
     606             :         }
     607             : 
     608         139 :         for (i = 0; i < data.ndeleted; i++)
     609         130 :             UnlockReleaseBuffer(buffers[i]);
     610             : 
     611           9 :         END_CRIT_SECTION();
     612             : 
     613         133 :         for (i = 0; fill_fsm && i < data.ndeleted; i++)
     614         124 :             RecordFreeIndexPage(index, freespace[i]);
     615             : 
     616           9 :     } while (blknoToDelete != newHead);
     617           2 : }
     618             : 
     619             : /*
                     :  * Initialize an empty KeyArray.
                     :  *
                     :  * Allocates, with palloc(), the two parallel arrays keys[] and categories[]
                     :  * with room for maxvalues entries each, records that capacity in
                     :  * keys->maxvalues, and sets keys->nvalues to zero.
                     :  */
     620             : static void
     621           2 : initKeyArray(KeyArray *keys, int32 maxvalues)
     622             : {
     623           2 :     keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
     624           2 :     keys->categories = (GinNullCategory *)
     625           2 :         palloc(sizeof(GinNullCategory) * maxvalues);
     626           2 :     keys->nvalues = 0;
     627           2 :     keys->maxvalues = maxvalues;
     628           2 : }
     629             : 
     630             : /*
                     :  * Append one (datum, category) pair to a KeyArray, resizing if needed.
                     :  *
                     :  * When the array is full (nvalues >= maxvalues), the capacity is doubled
                     :  * and both parallel arrays are enlarged with repalloc() before the new
                     :  * entry is stored at index nvalues.
                     :  *
                     :  * NOTE(review): doubling only makes room if maxvalues is nonzero.  The
                     :  * caller visible here (ginInsertCleanup) starts with a capacity of 128;
                     :  * confirm no caller initializes a KeyArray with maxvalues == 0.
                     :  */
     631             : static void
     632       65994 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
     633             : {
     634       65994 :     if (keys->nvalues >= keys->maxvalues)
     635             :     {
     636           0 :         keys->maxvalues *= 2;   /* grow geometrically */
     637           0 :         keys->keys = (Datum *)
     638           0 :             repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
     639           0 :         keys->categories = (GinNullCategory *)
     640           0 :             repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
     641             :     }
     642             : 
     643       65994 :     keys->keys[keys->nvalues] = datum;
     644       65994 :     keys->categories[keys->nvalues] = category;
     645       65994 :     keys->nvalues++;
     646       65994 : }
     647             : 
     648             : /*
     649             :  * Collect data from a pending-list page in preparation for insertion into
     650             :  * the main index.
     651             :  *
     652             :  * Go through all tuples >= startoff on page and collect values in accum.
                     :  *
                     :  * Consecutive tuples that share the same heap TID and attribute number are
                     :  * gathered into ka and handed to ginInsertBAEntries() in a single call;
                     :  * whenever either the heap TID or the attribute number changes, the keys
                     :  * gathered so far are flushed and ka is restarted.
     653             :  *
     654             :  * Note that ka is just workspace --- it does not carry any state across
     655             :  * calls.
                     :  *
                     :  * NOTE(review): the final ginInsertBAEntries() call is unconditional.  If
                     :  * the loop body never runs (startoff > maxoff) it is invoked with an invalid
                     :  * heapptr and ka->nvalues == 0; presumably that is a no-op --- confirm in
                     :  * ginInsertBAEntries().
     656             :  */
     657             : static void
     658         130 : processPendingPage(BuildAccumulator *accum, KeyArray *ka,
     659             :                    Page page, OffsetNumber startoff)
     660             : {
     661             :     ItemPointerData heapptr;
     662             :     OffsetNumber i,
     663             :                 maxoff;
     664             :     OffsetNumber attrnum;
     665             : 
     666             :     /* reset *ka to empty; any previous contents are simply overwritten */
     667         130 :     ka->nvalues = 0;
     668             : 
     669         130 :     maxoff = PageGetMaxOffsetNumber(page);
     670         130 :     Assert(maxoff >= FirstOffsetNumber);
     671         130 :     ItemPointerSetInvalid(&heapptr);    /* invalid = no tuple seen yet */
     672         130 :     attrnum = 0;
     673             : 
     674       66124 :     for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
     675             :     {
     676       65994 :         IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
     677             :         OffsetNumber curattnum;
     678             :         Datum       curkey;
     679             :         GinNullCategory curcategory;
     680             : 
     681             :         /* Check for change of heap TID or attnum */
     682       65994 :         curattnum = gintuple_get_attrnum(accum->ginstate, itup);
     683             : 
     684       65994 :         if (!ItemPointerIsValid(&heapptr))
     685             :         {
                     :             /* first tuple examined in this call: start the first group */
     686         130 :             heapptr = itup->t_tid;
     687         130 :             attrnum = curattnum;
     688             :         }
     689       65864 :         else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
     690             :                    curattnum == attrnum))
     691             :         {
     692             :             /*
     693             :              * ginInsertBAEntries can insert several datums per call, but only
     694             :              * for one heap tuple and one column.  So call it at a boundary,
     695             :              * and reset ka.
     696             :              */
     697       21870 :             ginInsertBAEntries(accum, &heapptr, attrnum,
     698             :                                ka->keys, ka->categories, ka->nvalues);
     699       21870 :             ka->nvalues = 0;
     700       21870 :             heapptr = itup->t_tid;
     701       21870 :             attrnum = curattnum;
     702             :         }
     703             : 
     704             :         /* Add this tuple's key (and its null category) to the KeyArray */
     705       65994 :         curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
     706       65994 :         addDatum(ka, curkey, curcategory);
     707             :     }
     708             : 
     709             :     /* Dump out all remaining keys, i.e. the last (heap TID, attnum) group */
     710         130 :     ginInsertBAEntries(accum, &heapptr, attrnum,
     711             :                        ka->keys, ka->categories, ka->nvalues);
     712         130 : }
     713             : 
     714             : /*
     715             :  * Move tuples from pending pages into regular GIN structure.
     716             :  *
     717             :  * At first glance this looks completely not crash-safe.  But if we crash
     718             :  * after posting entries to the main index and before removing them from the
     719             :  * pending list, it's okay because when we redo the posting later on, nothing
     720             :  * bad will happen.
     721             :  *
                     :  * full_clean: if false, we stop after cleaning up the page that was the
                     :  * tail of the pending list when we started; if true, we keep going until
                     :  * the pending list is empty.
                     :  *
     722             :  * fill_fsm indicates that ginInsertCleanup should add deleted pages
     723             :  * to the FSM; otherwise the caller is responsible for putting deleted pages
     724             :  * into the FSM.
     725             :  *
     726             :  * If stats isn't null, we count deleted pending pages into the counts.
                     :  * Whether stats is null also selects the locking and memory-budget
                     :  * behavior at the top of the function (see inVacuum below).
     727             :  */
     728             : void
     729           7 : ginInsertCleanup(GinState *ginstate, bool full_clean,
     730             :                  bool fill_fsm, IndexBulkDeleteResult *stats)
     731             : {
     732           7 :     Relation    index = ginstate->index;
     733             :     Buffer      metabuffer,
     734             :                 buffer;
     735             :     Page        metapage,
     736             :                 page;
     737             :     GinMetaPageData *metadata;
     738             :     MemoryContext opCtx,
     739             :                 oldCtx;
     740             :     BuildAccumulator accum;
     741             :     KeyArray    datums;
     742             :     BlockNumber blkno,
     743             :                 blknoFinish;
     744           7 :     bool        cleanupFinish = false;
     745           7 :     bool        fsm_vac = false;
     746             :     Size        workMemory;
     747           7 :     bool        inVacuum = (stats == NULL);
     748             : 
     749             :     /*
     750             :      * We would like to prevent concurrent cleanup processes.  For that we
     751             :      * lock the metapage in exclusive mode using a LockPage() call.  Nobody
     752             :      * else uses that lock for the metapage, so we keep the possibility of
     753             :      * concurrent insertion into the pending list.
     754             :      */
     755             : 
     756           7 :     if (inVacuum)
     757             :     {
     758             :         /*
     759             :          * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
     760             :          * and we would like to wait for concurrent cleanup to finish.
                     :          *
                     :          * NOTE(review): inVacuum is (stats == NULL), but
                     :          * gin_clean_pending_list() passes a non-NULL stats and therefore
                     :          * takes the else branch (conditional lock, work_mem), not this
                     :          * one.  The test looks inverted relative to this comment ---
                     :          * confirm against the other callers before relying on it.
     761             :          */
     762           0 :         LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     763           0 :         workMemory =
     764           0 :             (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
     765             :             autovacuum_work_mem : maintenance_work_mem;
     766             :     }
     767             :     else
     768             :     {
     769             :         /*
     770             :          * We are called from a regular insert; if we see a concurrent
     771             :          * cleanup, just exit in the hope that the concurrent process will
     772             :          * clean up the pending list.  Nothing is held yet at that point.
     773             :          */
     774           7 :         if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
     775           5 :             return;
     776           7 :         workMemory = work_mem;
     777             :     }
     778             : 
     779           7 :     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
     780           7 :     LockBuffer(metabuffer, GIN_SHARE);
     781           7 :     metapage = BufferGetPage(metabuffer);
     782           7 :     metadata = GinPageGetMeta(metapage);
     783             : 
     784           7 :     if (metadata->head == InvalidBlockNumber)
     785             :     {
     786             :         /* Nothing to do: drop the metapage buffer and the page lock */
     787           5 :         UnlockReleaseBuffer(metabuffer);
     788           5 :         UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     789           5 :         return;
     790             :     }
     791             : 
     792             :     /*
     793             :      * Remember the tail page to prevent infinite cleanup if other backends
     794             :      * add new tuples faster than we can clean up.
     795             :      */
     796           2 :     blknoFinish = metadata->tail;
     797             : 
     798             :     /*
     799             :      * Read and lock head of pending list
     800             :      */
     801           2 :     blkno = metadata->head;
     802           2 :     buffer = ReadBuffer(index, blkno);
     803           2 :     LockBuffer(buffer, GIN_SHARE);
     804           2 :     page = BufferGetPage(buffer);
     805             : 
     806           2 :     LockBuffer(metabuffer, GIN_UNLOCK); /* keep the pin, drop the lock */
     807             : 
     808             :     /*
     809             :      * Initialize.  All temporary space will be in opCtx
     810             :      */
     811           2 :     opCtx = AllocSetContextCreate(CurrentMemoryContext,
     812             :                                   "GIN insert cleanup temporary context",
     813             :                                   ALLOCSET_DEFAULT_SIZES);
     814             : 
     815           2 :     oldCtx = MemoryContextSwitchTo(opCtx);
     816             : 
     817           2 :     initKeyArray(&datums, 128);
     818           2 :     ginInitBA(&accum);
     819           2 :     accum.ginstate = ginstate;
     820             : 
     821             :     /*
     822             :      * At the top of this loop, we have pin and lock on the current page of
     823             :      * the pending list.  However, we'll release that before exiting the loop.
     824             :      * Note we also have pin but not lock on the metapage.
     825             :      */
     826             :     for (;;)
     827             :     {
     828         130 :         Assert(!GinPageIsDeleted(page));
     829             : 
     830             :         /*
     831             :          * Have we reached the page that was the tail when we started the
     832             :          * cleanup?  If so, this is the last page to process --- unless the
     833             :          * caller asked us to clean up the whole pending list, in which case
     834             :          * we ignore the old tail and work until the list becomes empty.
     835             :          */
     836         130 :         if (blkno == blknoFinish && full_clean == false)
     837           0 :             cleanupFinish = true;
     838             : 
     839             :         /*
     840             :          * read page's datums into accum
     841             :          */
     842         130 :         processPendingPage(&accum, &datums, page, FirstOffsetNumber);
     843             : 
     844         130 :         vacuum_delay_point();
     845             : 
     846             :         /*
     847             :          * Is it time to flush memory to disk?  Flush if we are at the end of
     848             :          * the pending list, or if we have a full row and memory is getting
     849             :          * full.  (workMemory is scaled by 1024 for the comparison, so it is
                     :          * presumably in kilobytes and allocatedMemory in bytes --- confirm.)
     850             :          */
     851         258 :         if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
     852         256 :             (GinPageHasFullRow(page) &&
     853         128 :              (accum.allocatedMemory >= workMemory * 1024L)))
     854           0 :         {
     855             :             ItemPointerData *list;
     856             :             uint32      nlist;
     857             :             Datum       key;
     858             :             GinNullCategory category;
     859             :             OffsetNumber maxoff,
     860             :                         attnum;
     861             : 
     862             :             /*
     863             :              * Unlock the current page to increase performance.  Changes to
     864             :              * the page are detected later by comparing maxoff after the
     865             :              * memory flush has completed.
     866             :              */
     867           2 :             maxoff = PageGetMaxOffsetNumber(page);
     868           2 :             LockBuffer(buffer, GIN_UNLOCK);
     869             : 
     870             :             /*
     871             :              * Moving collected data into the regular structure can take a
     872             :              * significant amount of time - so, run it without locking the
     873             :              * pending list.
     874             :              */
     875           2 :             ginBeginBAScan(&accum);
     876       21004 :             while ((list = ginGetBAEntry(&accum,
     877             :                                          &attnum, &key, &category, &nlist)) != NULL)
     878             :             {
     879       21000 :                 ginEntryInsert(ginstate, attnum, key, category,
     880             :                                list, nlist, NULL);
     881       21000 :                 vacuum_delay_point();
     882             :             }
     883             : 
     884             :             /*
     885             :              * Lock the whole list to remove pages
     886             :              */
     887           2 :             LockBuffer(metabuffer, GIN_EXCLUSIVE);
     888           2 :             LockBuffer(buffer, GIN_SHARE);
     889             : 
     890           2 :             Assert(!GinPageIsDeleted(page));
     891             : 
     892             :             /*
     893             :              * While we left the page unlocked, more stuff might have gotten
     894             :              * added to it.  If so, process those entries immediately.  There
     895             :              * shouldn't be very many, so we don't worry about the fact that
     896             :              * we're doing this with exclusive lock. Insertion algorithm
     897             :              * guarantees that inserted row(s) will not continue on next page.
     898             :              * NOTE: intentionally no vacuum_delay_point in this loop.
     899             :              */
     900           2 :             if (PageGetMaxOffsetNumber(page) != maxoff)
     901             :             {
     902           0 :                 ginInitBA(&accum);
     903           0 :                 processPendingPage(&accum, &datums, page, maxoff + 1);
     904             : 
     905           0 :                 ginBeginBAScan(&accum);
     906           0 :                 while ((list = ginGetBAEntry(&accum,
     907             :                                              &attnum, &key, &category, &nlist)) != NULL)
     908           0 :                     ginEntryInsert(ginstate, attnum, key, category,
     909             :                                    list, nlist, NULL);
     910             :             }
     911             : 
     912             :             /*
     913             :              * Remember next page - it will become the new list head
     914             :              */
     915           2 :             blkno = GinPageGetOpaque(page)->rightlink;
     916           2 :             UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive
     917             :                                              * locking */
     918             : 
     919             :             /*
     920             :              * Remove the pages we have read from the pending list; at this
     921             :              * point all their content is in the regular structure.
     922             :              */
     923           2 :             shiftList(index, metabuffer, blkno, fill_fsm, stats);
     924             : 
     925             :             /* At this point, some pending pages have been freed up */
     926           2 :             fsm_vac = true;
     927             : 
     928           2 :             Assert(blkno == metadata->head);
     929           2 :             LockBuffer(metabuffer, GIN_UNLOCK);
     930             : 
     931             :             /*
     932             :              * If we removed the whole pending list, or we just cleaned up the
     933             :              * tail page remembered at the start of cleanup, then just exit.
     934             :              */
     935           2 :             if (blkno == InvalidBlockNumber || cleanupFinish)
     936             :                 break;
     937             : 
     938             :             /*
     939             :              * Release memory used so far and reinit state.  datums itself is
                     :              * a local struct, so its maxvalues field is still readable after
                     :              * the context reset and is reused as the new initial capacity.
     940             :              */
     941           0 :             MemoryContextReset(opCtx);
     942           0 :             initKeyArray(&datums, datums.maxvalues);
     943           0 :             ginInitBA(&accum);
     944             :         }
     945             :         else
     946             :         {
                     :             /* not flushing yet: just step to the next pending-list page */
     947         128 :             blkno = GinPageGetOpaque(page)->rightlink;
     948         128 :             UnlockReleaseBuffer(buffer);
     949             :         }
     950             : 
     951             :         /*
     952             :          * Read next page in pending list
     953             :          */
     954         128 :         vacuum_delay_point();
     955         128 :         buffer = ReadBuffer(index, blkno);
     956         128 :         LockBuffer(buffer, GIN_SHARE);
     957         128 :         page = BufferGetPage(buffer);
     958         128 :     }
     959             : 
                     :     /* Done: drop the cleanup lock on the metapage and our pin on it */
     960           2 :     UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
     961           2 :     ReleaseBuffer(metabuffer);
     962             : 
     963             :     /*
     964             :      * As pending list pages can have a high churn rate, it is desirable to
     965             :      * recycle them immediately to the FreeSpace Map when ordinary backends
     966             :      * clean the list.
     967             :      */
     968           4 :     if (fsm_vac && fill_fsm)
     969           1 :         IndexFreeSpaceMapVacuum(index);
     970             : 
     971             : 
     972             :     /* Clean up temporary space: restore caller's context, drop opCtx */
     973           2 :     MemoryContextSwitchTo(oldCtx);
     974           2 :     MemoryContextDelete(opCtx);
     975             : }
     976             : 
     977             : /*
     978             :  * SQL-callable function to clean the insert pending list
                     :  *
                     :  * Argument 0 is the OID of the index.  The index is opened with
                     :  * AccessShareLock, checked (not in recovery, is a GIN index, not another
                     :  * session's temporary index, caller owns it), and then cleaned with
                     :  * ginInsertCleanup() using full_clean = true and fill_fsm = true.
                     :  *
                     :  * Returns stats.pages_deleted, as filled in during the cleanup, as int64.
     979             :  */
     980             : Datum
     981           2 : gin_clean_pending_list(PG_FUNCTION_ARGS)
     982             : {
     983           2 :     Oid         indexoid = PG_GETARG_OID(0);
     984           2 :     Relation    indexRel = index_open(indexoid, AccessShareLock);
     985             :     IndexBulkDeleteResult stats;
     986             :     GinState    ginstate;
     987             : 
     988           2 :     if (RecoveryInProgress())
     989           0 :         ereport(ERROR,
     990             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     991             :                  errmsg("recovery is in progress"),
     992             :                  errhint("GIN pending list cannot be cleaned up during recovery.")));
     993             : 
     994             :     /* Must be a GIN index */
     995           4 :     if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
     996           2 :         indexRel->rd_rel->relam != GIN_AM_OID)
     997           0 :         ereport(ERROR,
     998             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     999             :                  errmsg("\"%s\" is not a GIN index",
    1000             :                         RelationGetRelationName(indexRel))));
    1001             : 
    1002             :     /*
    1003             :      * Reject attempts to read non-local temporary relations; we would be
    1004             :      * likely to get wrong data since we have no visibility into the owning
    1005             :      * session's local buffers.
    1006             :      */
    1007           2 :     if (RELATION_IS_OTHER_TEMP(indexRel))
    1008           0 :         ereport(ERROR,
    1009             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1010             :                  errmsg("cannot access temporary indexes of other sessions")));
    1011             : 
    1012             :     /* User must own the index (comparable to privileges needed for VACUUM) */
    1013           2 :     if (!pg_class_ownercheck(indexoid, GetUserId()))
    1014           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
    1015           0 :                        RelationGetRelationName(indexRel));
    1016             : 
                     :     /* Start from zeroed stats so pages_deleted reflects only this call */
    1017           2 :     memset(&stats, 0, sizeof(stats));
    1018           2 :     initGinState(&ginstate, indexRel);
    1019           2 :     ginInsertCleanup(&ginstate, true, true, &stats);
    1020             : 
    1021           2 :     index_close(indexRel, AccessShareLock);
    1022             : 
    1023           2 :     PG_RETURN_INT64((int64) stats.pages_deleted);
    1024             : }

Generated by: LCOV version 1.11