LCOV - code coverage report
Current view: top level - src/backend/access/nbtree - nbtinsert.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 506 625 81.0 %
Date: 2017-09-29 13:40:31 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * nbtinsert.c
       4             :  *    Item insertion in Lehman and Yao btrees for Postgres.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/access/nbtree/nbtinsert.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "access/heapam.h"
      19             : #include "access/nbtree.h"
      20             : #include "access/nbtxlog.h"
      21             : #include "access/transam.h"
      22             : #include "access/xloginsert.h"
      23             : #include "miscadmin.h"
      24             : #include "storage/lmgr.h"
      25             : #include "storage/predicate.h"
      26             : #include "utils/tqual.h"
      27             : 
      28             : 
      29             : typedef struct
      30             : {
      31             :     /* context data for _bt_checksplitloc */
      32             :     Size        newitemsz;      /* size of new item to be inserted */
      33             :     int         fillfactor;     /* needed when splitting rightmost page */
      34             :     bool        is_leaf;        /* T if splitting a leaf page */
      35             :     bool        is_rightmost;   /* T if splitting a rightmost page */
      36             :     OffsetNumber newitemoff;    /* where the new item is to be inserted */
      37             :     int         leftspace;      /* space available for items on left page */
      38             :     int         rightspace;     /* space available for items on right page */
      39             :     int         olddataitemstotal;  /* space taken by old items */
      40             : 
      41             :     bool        have_split;     /* found a valid split? */
      42             : 
      43             :     /* these fields valid only if have_split is true */
      44             :     bool        newitemonleft;  /* new item on left or right of best split */
      45             :     OffsetNumber firstright;    /* best split point */
      46             :     int         best_delta;     /* best size delta so far */
      47             : } FindSplitData;
      48             : 
      49             : 
      50             : static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
      51             : 
      52             : static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
      53             :                  Relation heapRel, Buffer buf, OffsetNumber offset,
      54             :                  ScanKey itup_scankey,
      55             :                  IndexUniqueCheck checkUnique, bool *is_unique,
      56             :                  uint32 *speculativeToken);
      57             : static void _bt_findinsertloc(Relation rel,
      58             :                   Buffer *bufptr,
      59             :                   OffsetNumber *offsetptr,
      60             :                   int keysz,
      61             :                   ScanKey scankey,
      62             :                   IndexTuple newtup,
      63             :                   BTStack stack,
      64             :                   Relation heapRel);
      65             : static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
      66             :                BTStack stack,
      67             :                IndexTuple itup,
      68             :                OffsetNumber newitemoff,
      69             :                bool split_only_page);
      70             : static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
      71             :           OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
      72             :           IndexTuple newitem, bool newitemonleft);
      73             : static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
      74             :                   BTStack stack, bool is_root, bool is_only);
      75             : static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
      76             :                  OffsetNumber newitemoff,
      77             :                  Size newitemsz,
      78             :                  bool *newitemonleft);
      79             : static void _bt_checksplitloc(FindSplitData *state,
      80             :                   OffsetNumber firstoldonright, bool newitemonleft,
      81             :                   int dataitemstoleft, Size firstoldonrightsz);
      82             : static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
      83             :              OffsetNumber itup_off);
      84             : static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
      85             :             int keysz, ScanKey scankey);
      86             : static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
      87             : 
      88             : 
      89             : /*
      90             :  *  _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
      91             :  *
      92             :  *      This routine is called by the public interface routine, btinsert.
      93             :  *      By here, itup is filled in, including the TID.
      94             :  *
      95             :  *      If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
      96             :  *      will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
      97             :  *      UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
      98             :  *      For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
      99             :  *      don't actually insert.
     100             :  *
     101             :  *      The result value is only significant for UNIQUE_CHECK_PARTIAL:
     102             :  *      it must be TRUE if the entry is known unique, else FALSE.
     103             :  *      (In the current implementation we'll also return TRUE after a
     104             :  *      successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
     105             :  *      that's just a coding artifact.)
     106             :  */
     107             : bool
     108      205514 : _bt_doinsert(Relation rel, IndexTuple itup,
     109             :              IndexUniqueCheck checkUnique, Relation heapRel)
     110             : {
     111      205514 :     bool        is_unique = false;
     112      205514 :     int         natts = rel->rd_rel->relnatts;
     113             :     ScanKey     itup_scankey;
     114             :     BTStack     stack;
     115             :     Buffer      buf;
     116             :     OffsetNumber offset;
     117             : 
     118             :     /* we need an insertion scan key to do our search, so build one */
     119      205514 :     itup_scankey = _bt_mkscankey(rel, itup);
     120             : 
     121             : top:
     122             :     /* find the first page containing this key */
     123      205514 :     stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
     124             : 
     125      205514 :     offset = InvalidOffsetNumber;
     126             : 
     127             :     /* trade in our read lock for a write lock */
     128      205514 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     129      205514 :     LockBuffer(buf, BT_WRITE);
     130             : 
     131             :     /*
     132             :      * If the page was split between the time that we surrendered our read
     133             :      * lock and acquired our write lock, then this page may no longer be the
     134             :      * right place for the key we want to insert.  In this case, we need to
     135             :      * move right in the tree.  See Lehman and Yao for an excruciatingly
     136             :      * precise description.
     137             :      */
     138      205514 :     buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
     139             :                         true, stack, BT_WRITE, NULL);
     140             : 
     141             :     /*
     142             :      * If we're not allowing duplicates, make sure the key isn't already in
     143             :      * the index.
     144             :      *
     145             :      * NOTE: obviously, _bt_check_unique can only detect keys that are already
     146             :      * in the index; so it cannot defend against concurrent insertions of the
     147             :      * same key.  We protect against that by means of holding a write lock on
     148             :      * the target page.  Any other would-be inserter of the same key must
     149             :      * acquire a write lock on the same target page, so only one would-be
     150             :      * inserter can be making the check at one time.  Furthermore, once we are
     151             :      * past the check we hold write locks continuously until we have performed
     152             :      * our insertion, so no later inserter can fail to see our insertion.
     153             :      * (This requires some care in _bt_insertonpg.)
     154             :      *
     155             :      * If we must wait for another xact, we release the lock while waiting,
     156             :      * and then must start over completely.
     157             :      *
     158             :      * For a partial uniqueness check, we don't wait for the other xact. Just
     159             :      * let the tuple in and return false for possibly non-unique, or true for
     160             :      * definitely unique.
     161             :      */
     162      205514 :     if (checkUnique != UNIQUE_CHECK_NO)
     163             :     {
     164             :         TransactionId xwait;
     165             :         uint32      speculativeToken;
     166             : 
     167      114495 :         offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
     168      114495 :         xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
     169             :                                  checkUnique, &is_unique, &speculativeToken);
     170             : 
     171      114463 :         if (TransactionIdIsValid(xwait))
     172             :         {
     173             :             /* Have to wait for the other guy ... */
     174           0 :             _bt_relbuf(rel, buf);
     175             : 
     176             :             /*
     177             :              * If it's a speculative insertion, wait for it to finish (ie. to
     178             :              * go ahead with the insertion, or kill the tuple).  Otherwise
     179             :              * wait for the transaction to finish as usual.
     180             :              */
     181           0 :             if (speculativeToken)
     182           0 :                 SpeculativeInsertionWait(xwait, speculativeToken);
     183             :             else
     184           0 :                 XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
     185             : 
     186             :             /* start over... */
     187           0 :             _bt_freestack(stack);
     188           0 :             goto top;
     189             :         }
     190             :     }
     191             : 
     192      205482 :     if (checkUnique != UNIQUE_CHECK_EXISTING)
     193             :     {
     194             :         /*
     195             :          * The only conflict predicate locking cares about for indexes is when
     196             :          * an index tuple insert conflicts with an existing lock.  Since the
     197             :          * actual location of the insert is hard to predict because of the
     198             :          * random search used to prevent O(N^2) performance when there are
     199             :          * many duplicate entries, we can just use the "first valid" page.
     200             :          */
     201      205473 :         CheckForSerializableConflictIn(rel, NULL, buf);
     202             :         /* do the insertion */
     203      205473 :         _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
     204             :                           stack, heapRel);
     205      205473 :         _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
     206             :     }
     207             :     else
     208             :     {
     209             :         /* just release the buffer */
     210           9 :         _bt_relbuf(rel, buf);
     211             :     }
     212             : 
     213             :     /* be tidy */
     214      205482 :     _bt_freestack(stack);
     215      205482 :     _bt_freeskey(itup_scankey);
     216             : 
     217      205482 :     return is_unique;
     218             : }
     219             : 
     220             : /*
     221             :  *  _bt_check_unique() -- Check for violation of unique index constraint
     222             :  *
     223             :  * offset points to the first possible item that could conflict. It can
     224             :  * also point to end-of-page, which means that the first tuple to check
     225             :  * is the first tuple on the next page.
     226             :  *
     227             :  * Returns InvalidTransactionId if there is no conflict, else an xact ID
     228             :  * we must wait for to see if it commits a conflicting tuple.   If an actual
     229             :  * conflict is detected, no return --- just ereport().  If an xact ID is
     230             :  * returned, and the conflicting tuple still has a speculative insertion in
     231             :  * progress, *speculativeToken is set to non-zero, and the caller can wait for
     232             :  * the verdict on the insertion using SpeculativeInsertionWait().
     233             :  *
     234             :  * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
     235             :  * InvalidTransactionId because we don't want to wait.  In this case we
     236             :  * set *is_unique to false if there is a potential conflict, and the
     237             :  * core code must redo the uniqueness check later.
     238             :  */
     239             : static TransactionId
     240      114495 : _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
     241             :                  Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
     242             :                  IndexUniqueCheck checkUnique, bool *is_unique,
     243             :                  uint32 *speculativeToken)
     244             : {
     245      114495 :     TupleDesc   itupdesc = RelationGetDescr(rel);
     246      114495 :     int         natts = rel->rd_rel->relnatts;
     247             :     SnapshotData SnapshotDirty;
     248             :     OffsetNumber maxoff;
     249             :     Page        page;
     250             :     BTPageOpaque opaque;
     251      114495 :     Buffer      nbuf = InvalidBuffer;
     252      114495 :     bool        found = false;
     253             : 
     254             :     /* Assume unique until we find a duplicate */
     255      114495 :     *is_unique = true;
     256             : 
     257      114495 :     InitDirtySnapshot(SnapshotDirty);
     258             : 
     259      114495 :     page = BufferGetPage(buf);
     260      114495 :     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
     261      114495 :     maxoff = PageGetMaxOffsetNumber(page);
     262             : 
     263             :     /*
     264             :      * Scan over all equal tuples, looking for live conflicts.
     265             :      */
     266             :     for (;;)
     267             :     {
     268             :         ItemId      curitemid;
     269             :         IndexTuple  curitup;
     270             :         BlockNumber nblkno;
     271             : 
     272             :         /*
     273             :          * make sure the offset points to an actual item before trying to
     274             :          * examine it...
     275             :          */
     276      126354 :         if (offset <= maxoff)
     277             :         {
     278       62144 :             curitemid = PageGetItemId(page, offset);
     279             : 
     280             :             /*
     281             :              * We can skip items that are marked killed.
     282             :              *
     283             :              * Formerly, we applied _bt_isequal() before checking the kill
     284             :              * flag, so as to fall out of the item loop as soon as possible.
     285             :              * However, in the presence of heavy update activity an index may
     286             :              * contain many killed items with the same key; running
     287             :              * _bt_isequal() on each killed item gets expensive. Furthermore
     288             :              * it is likely that the non-killed version of each key appears
     289             :              * first, so that we didn't actually get to exit any sooner
     290             :              * anyway. So now we just advance over killed items as quickly as
     291             :              * we can. We only apply _bt_isequal() when we get to a non-killed
     292             :              * item or the end of the page.
     293             :              */
     294       62144 :             if (!ItemIdIsDead(curitemid))
     295             :             {
     296             :                 ItemPointerData htid;
     297             :                 bool        all_dead;
     298             : 
     299             :                 /*
     300             :                  * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
     301             :                  * how we handling NULLs - and so we must not use _bt_compare
     302             :                  * in real comparison, but only for ordering/finding items on
     303             :                  * pages. - vadim 03/24/97
     304             :                  */
     305       56401 :                 if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
     306       99878 :                     break;      /* we're past all the equal tuples */
     307             : 
     308             :                 /* okay, we gotta fetch the heap tuple ... */
     309        6462 :                 curitup = (IndexTuple) PageGetItem(page, curitemid);
     310        6462 :                 htid = curitup->t_tid;
     311             : 
     312             :                 /*
     313             :                  * If we are doing a recheck, we expect to find the tuple we
     314             :                  * are rechecking.  It's not a duplicate, but we have to keep
     315             :                  * scanning.
     316             :                  */
     317        6495 :                 if (checkUnique == UNIQUE_CHECK_EXISTING &&
     318          33 :                     ItemPointerCompare(&htid, &itup->t_tid) == 0)
     319             :                 {
     320          14 :                     found = true;
     321             :                 }
     322             : 
     323             :                 /*
     324             :                  * We check the whole HOT-chain to see if there is any tuple
     325             :                  * that satisfies SnapshotDirty.  This is necessary because we
     326             :                  * have just a single index entry for the entire chain.
     327             :                  */
     328        6448 :                 else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
     329             :                                          &all_dead))
     330             :                 {
     331             :                     TransactionId xwait;
     332             : 
     333             :                     /*
     334             :                      * It is a duplicate. If we are only doing a partial
     335             :                      * check, then don't bother checking if the tuple is being
     336             :                      * updated in another transaction. Just return the fact
     337             :                      * that it is a potential conflict and leave the full
     338             :                      * check till later.
     339             :                      */
     340          46 :                     if (checkUnique == UNIQUE_CHECK_PARTIAL)
     341             :                     {
     342          14 :                         if (nbuf != InvalidBuffer)
     343           0 :                             _bt_relbuf(rel, nbuf);
     344          14 :                         *is_unique = false;
     345          28 :                         return InvalidTransactionId;
     346             :                     }
     347             : 
     348             :                     /*
     349             :                      * If this tuple is being updated by other transaction
     350             :                      * then we have to wait for its commit/abort.
     351             :                      */
     352          64 :                     xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
     353          32 :                         SnapshotDirty.xmin : SnapshotDirty.xmax;
     354             : 
     355          32 :                     if (TransactionIdIsValid(xwait))
     356             :                     {
     357           0 :                         if (nbuf != InvalidBuffer)
     358           0 :                             _bt_relbuf(rel, nbuf);
     359             :                         /* Tell _bt_doinsert to wait... */
     360           0 :                         *speculativeToken = SnapshotDirty.speculativeToken;
     361           0 :                         return xwait;
     362             :                     }
     363             : 
     364             :                     /*
     365             :                      * Otherwise we have a definite conflict.  But before
     366             :                      * complaining, look to see if the tuple we want to insert
     367             :                      * is itself now committed dead --- if so, don't complain.
     368             :                      * This is a waste of time in normal scenarios but we must
     369             :                      * do it to support CREATE INDEX CONCURRENTLY.
     370             :                      *
     371             :                      * We must follow HOT-chains here because during
     372             :                      * concurrent index build, we insert the root TID though
     373             :                      * the actual tuple may be somewhere in the HOT-chain.
     374             :                      * While following the chain we might not stop at the
     375             :                      * exact tuple which triggered the insert, but that's OK
     376             :                      * because if we find a live tuple anywhere in this chain,
     377             :                      * we have a unique key conflict.  The other live tuple is
     378             :                      * not part of this chain because it had a different index
     379             :                      * entry.
     380             :                      */
     381          32 :                     htid = itup->t_tid;
     382          32 :                     if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
     383             :                     {
     384             :                         /* Normal case --- it's still live */
     385             :                     }
     386             :                     else
     387             :                     {
     388             :                         /*
     389             :                          * It's been deleted, so no error, and no need to
     390             :                          * continue searching
     391             :                          */
     392           0 :                         break;
     393             :                     }
     394             : 
     395             :                     /*
     396             :                      * Check for a conflict-in as we would if we were going to
     397             :                      * write to this page.  We aren't actually going to write,
     398             :                      * but we want a chance to report SSI conflicts that would
     399             :                      * otherwise be masked by this unique constraint
     400             :                      * violation.
     401             :                      */
     402          32 :                     CheckForSerializableConflictIn(rel, NULL, buf);
     403             : 
     404             :                     /*
     405             :                      * This is a definite conflict.  Break the tuple down into
     406             :                      * datums and report the error.  But first, make sure we
     407             :                      * release the buffer locks we're holding ---
     408             :                      * BuildIndexValueDescription could make catalog accesses,
     409             :                      * which in the worst case might touch this same index and
     410             :                      * cause deadlocks.
     411             :                      */
     412          32 :                     if (nbuf != InvalidBuffer)
     413           0 :                         _bt_relbuf(rel, nbuf);
     414          32 :                     _bt_relbuf(rel, buf);
     415             : 
     416             :                     {
     417             :                         Datum       values[INDEX_MAX_KEYS];
     418             :                         bool        isnull[INDEX_MAX_KEYS];
     419             :                         char       *key_desc;
     420             : 
     421          32 :                         index_deform_tuple(itup, RelationGetDescr(rel),
     422             :                                            values, isnull);
     423             : 
     424          32 :                         key_desc = BuildIndexValueDescription(rel, values,
     425             :                                                               isnull);
     426             : 
     427          32 :                         ereport(ERROR,
     428             :                                 (errcode(ERRCODE_UNIQUE_VIOLATION),
     429             :                                  errmsg("duplicate key value violates unique constraint \"%s\"",
     430             :                                         RelationGetRelationName(rel)),
     431             :                                  key_desc ? errdetail("Key %s already exists.",
     432             :                                                       key_desc) : 0,
     433             :                                  errtableconstraint(heapRel,
     434             :                                                     RelationGetRelationName(rel))));
     435             :                     }
     436             :                 }
     437        6402 :                 else if (all_dead)
     438             :                 {
     439             :                     /*
     440             :                      * The conflicting tuple (or whole HOT chain) is dead to
     441             :                      * everyone, so we may as well mark the index entry
     442             :                      * killed.
     443             :                      */
     444         260 :                     ItemIdMarkDead(curitemid);
     445         260 :                     opaque->btpo_flags |= BTP_HAS_GARBAGE;
     446             : 
     447             :                     /*
     448             :                      * Mark buffer with a dirty hint, since state is not
     449             :                      * crucial. Be sure to mark the proper buffer dirty.
     450             :                      */
     451         260 :                     if (nbuf != InvalidBuffer)
     452           0 :                         MarkBufferDirtyHint(nbuf, true);
     453             :                     else
     454         260 :                         MarkBufferDirtyHint(buf, true);
     455             :                 }
     456             :             }
     457             :         }
     458             : 
     459             :         /*
     460             :          * Advance to next tuple to continue checking.
     461             :          */
     462       76369 :         if (offset < maxoff)
     463       11855 :             offset = OffsetNumberNext(offset);
     464             :         else
     465             :         {
     466             :             /* If scankey == hikey we gotta check the next page too */
     467       64514 :             if (P_RIGHTMOST(opaque))
     468       64291 :                 break;
     469         223 :             if (!_bt_isequal(itupdesc, page, P_HIKEY,
     470             :                              natts, itup_scankey))
     471         219 :                 break;
     472             :             /* Advance to next non-dead page --- there must be one */
     473             :             for (;;)
     474             :             {
     475           4 :                 nblkno = opaque->btpo_next;
     476           4 :                 nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
     477           4 :                 page = BufferGetPage(nbuf);
     478           4 :                 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
     479           4 :                 if (!P_IGNORE(opaque))
     480           4 :                     break;
     481           0 :                 if (P_RIGHTMOST(opaque))
     482           0 :                     elog(ERROR, "fell off the end of index \"%s\"",
     483             :                          RelationGetRelationName(rel));
     484           0 :             }
     485           4 :             maxoff = PageGetMaxOffsetNumber(page);
     486           4 :             offset = P_FIRSTDATAKEY(opaque);
     487             :         }
     488       11859 :     }
     489             : 
     490             :     /*
     491             :      * If we are doing a recheck then we should have found the tuple we are
     492             :      * checking.  Otherwise there's something very wrong --- probably, the
     493             :      * index is on a non-immutable expression.
     494             :      */
     495      114449 :     if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
     496           0 :         ereport(ERROR,
     497             :                 (errcode(ERRCODE_INTERNAL_ERROR),
     498             :                  errmsg("failed to re-find tuple within index \"%s\"",
     499             :                         RelationGetRelationName(rel)),
     500             :                  errhint("This may be because of a non-immutable index expression."),
     501             :                  errtableconstraint(heapRel,
     502             :                                     RelationGetRelationName(rel))));
     503             : 
     504      114449 :     if (nbuf != InvalidBuffer)
     505           4 :         _bt_relbuf(rel, nbuf);
     506             : 
     507      114449 :     return InvalidTransactionId;
     508             : }
     509             : 
     510             : 
/*
 *	_bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *		If the new key is equal to one or more existing keys, we can
 *		legitimately place it anywhere in the series of equal keys --- in fact,
 *		if the new key is equal to the page's "high key" we can place it on
 *		the next page.  If it is equal to the high key, and there's not room
 *		to insert the new tuple on the current page without splitting, then
 *		we can move right hoping to find more free space and avoid a split.
 *		(We should not move right indefinitely, however, since that leads to
 *		O(N^2) insertion behavior in the presence of many equal keys.)
 *		Once we have chosen the page to put the key on, we'll insert it before
 *		any existing equal keys because of the way _bt_binsrch() works.
 *
 *		If there's not enough room in the space, we try to make room by
 *		removing any LP_DEAD tuples.
 *
 *		On entry, *bufptr and *offsetptr point to the first legal position
 *		where the new tuple could be inserted.  The caller should hold an
 *		exclusive lock on *bufptr.  *offsetptr can also be set to
 *		InvalidOffsetNumber, in which case the function will search for the
 *		right location within the page if needed.  On exit, they point to the
 *		chosen insert location.  If _bt_findinsertloc decides to move right,
 *		the lock and pin on the original page will be released and the new
 *		page returned to the caller is exclusively locked instead.
 *
 *		newtup is the new tuple we're inserting, and scankey is an insertion
 *		type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup,
				  BTStack stack,
				  Relation heapRel)
{
	Buffer		buf = *bufptr;
	Page		page = BufferGetPage(buf);
	Size		itemsz;
	BTPageOpaque lpageop;
	bool		movedright,
				vacuumed;
	OffsetNumber newitemoff;
	OffsetNumber firstlegaloff = *offsetptr;	/* caller's position hint (may be invalid) */

	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*newtup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itemsz doesn't
	 * include the ItemId.
	 *
	 * NOTE: if you change this, see also the similar code in _bt_buildadd().
	 */
	if (itemsz > BTMaxItemSize(page))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
						itemsz, BTMaxItemSize(page),
						RelationGetRelationName(rel)),
				 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
						 "Consider a function index of an MD5 hash of the value, "
						 "or use full text indexing."),
				 errtableconstraint(heapRel,
									RelationGetRelationName(rel))));

	/*----------
	 * If we will need to split the page to put the item on this page,
	 * check whether we can put the tuple somewhere to the right,
	 * instead.  Keep scanning right until we
	 *		(a) find a page with enough free space,
	 *		(b) reach the last page where the tuple can legally go, or
	 *		(c) get tired of searching.
	 * (c) is not flippant; it is important because if there are many
	 * pages' worth of equal keys, it's better to split one of the early
	 * pages than to scan all the way to the end of the run of equal keys
	 * on every insert.  We implement "get tired" as a random choice,
	 * since stopping after scanning a fixed number of pages wouldn't work
	 * well (we'd never reach the right-hand side of previously split
	 * pages).  Currently the probability of moving right is set at 0.99,
	 * which may seem too high to change the behavior much, but it does an
	 * excellent job of preventing O(N^2) behavior with many equal keys.
	 *----------
	 */
	/* haven't stepped right or vacuumed anything yet */
	movedright = false;
	vacuumed = false;
	while (PageGetFreeSpace(page) < itemsz)
	{
		Buffer		rbuf;
		BlockNumber rblkno;

		/*
		 * before considering moving right, see if we can obtain enough space
		 * by erasing LP_DEAD items
		 */
		if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
		{
			_bt_vacuum_one_page(rel, buf, heapRel);

			/*
			 * remember that we vacuumed this page, because that makes the
			 * hint supplied by the caller invalid
			 */
			vacuumed = true;

			if (PageGetFreeSpace(page) >= itemsz)
				break;			/* OK, now we have enough space */
		}

		/*
		 * nope, so check conditions (b) and (c) enumerated above
		 */
		if (P_RIGHTMOST(lpageop) ||
			_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
			random() <= (MAX_RANDOM_VALUE / 100))	/* case (c): "got tired", ~1% chance */
			break;

		/*
		 * step right to next non-dead page
		 *
		 * must write-lock that page before releasing write lock on current
		 * page; else someone else's _bt_check_unique scan could fail to see
		 * our insertion.  write locks on intermediate dead pages won't do
		 * because we don't know when they will get de-linked from the tree.
		 */
		rbuf = InvalidBuffer;

		rblkno = lpageop->btpo_next;
		for (;;)
		{
			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
			page = BufferGetPage(rbuf);
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

			/*
			 * If this page was incompletely split, finish the split now. We
			 * do this while holding a lock on the left sibling, which is not
			 * good because finishing the split could be a fairly lengthy
			 * operation.  But this should happen very seldom.
			 */
			if (P_INCOMPLETE_SPLIT(lpageop))
			{
				/* _bt_finish_split releases rbuf's lock; retry the same block */
				_bt_finish_split(rel, rbuf, stack);
				rbuf = InvalidBuffer;
				continue;
			}

			if (!P_IGNORE(lpageop))
				break;
			if (P_RIGHTMOST(lpageop))
				elog(ERROR, "fell off the end of index \"%s\"",
					 RelationGetRelationName(rel));

			rblkno = lpageop->btpo_next;
		}
		/* lock handoff complete: drop the old page, continue on the right sibling */
		_bt_relbuf(rel, buf);
		buf = rbuf;
		movedright = true;
		vacuumed = false;		/* hint can't apply to a page we just moved onto */
	}

	/*
	 * Now we are on the right page, so find the insert position. If we moved
	 * right at all, we know we should insert at the start of the page. If we
	 * didn't move right, we can use the firstlegaloff hint if the caller
	 * supplied one, unless we vacuumed the page which might have moved tuples
	 * around making the hint invalid. If we didn't move right or can't use
	 * the hint, find the position by searching.
	 */
	if (movedright)
		newitemoff = P_FIRSTDATAKEY(lpageop);
	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
		newitemoff = firstlegaloff;
	else
		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

	*bufptr = buf;
	*offsetptr = newitemoff;
}
     699             : 
/*----------
 *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *		This recursive procedure does the following things:
 *
 *			+  if necessary, splits the target page (making sure that the
 *			   split is equitable as far as post-insert free space goes).
 *			+  inserts the tuple.
 *			+  if the page was split, pops the parent stack, and finds the
 *			   right place to insert the new child pointer (by walking
 *			   right using information stored in the parent stack).
 *			+  invokes itself with the appropriate tuple for the right
 *			   child page on the parent.
 *			+  updates the metapage if a true root or fast root is split.
 *
 *		On entry, we must have the correct buffer in which to do the
 *		insertion, and the buffer must be pinned and write-locked.  On return,
 *		we will have dropped both the pin and the lock on the buffer.
 *
 *		When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
 *		page we're inserting the downlink for.  This function will clear the
 *		INCOMPLETE_SPLIT flag on it, and release the buffer.
 *
 *		The locking interactions in this code are critical.  You should
 *		grok Lehman and Yao's paper before making any changes.  In addition,
 *		you need to understand how we disambiguate duplicate keys in this
 *		implementation, in order to be able to find our location using
 *		L&Y "move right" operations.  Since we may insert duplicate user
 *		keys, and since these dups may propagate up the tree, we use the
 *		'afteritem' parameter to position ourselves correctly for the
 *		insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
			   Buffer buf,
			   Buffer cbuf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page)
{
	Page		page;
	BTPageOpaque lpageop;
	OffsetNumber firstright = InvalidOffsetNumber;	/* only set on the split path */
	Size		itemsz;

	page = BufferGetPage(buf);
	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	/* child buffer must be given iff inserting on an internal page */
	Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));

	/* The caller should've finished any incomplete splits already. */
	if (P_INCOMPLETE_SPLIT(lpageop))
		elog(ERROR, "cannot insert to incompletely split page %u",
			 BufferGetBlockNumber(buf));

	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Do we need to split the page to fit the item on it?
	 *
	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
	 * so this comparison is correct even though we appear to be accounting
	 * only for the item and not for its line pointer.
	 */
	if (PageGetFreeSpace(page) < itemsz)
	{
		/* Not enough room: split this page, then insert a downlink in parent */
		bool		is_root = P_ISROOT(lpageop);
		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
		bool		newitemonleft;
		Buffer		rbuf;

		/* Choose the split point */
		firstright = _bt_findsplitloc(rel, page,
									  newitemoff, itemsz,
									  &newitemonleft);

		/* split the buffer into left and right halves */
		rbuf = _bt_split(rel, buf, cbuf, firstright,
						 newitemoff, itemsz, itup, newitemonleft);
		PredicateLockPageSplit(rel,
							   BufferGetBlockNumber(buf),
							   BufferGetBlockNumber(rbuf));

		/*----------
		 * By here,
		 *
		 *		+  our target page has been split;
		 *		+  the original tuple has been inserted;
		 *		+  we have write locks on both the old (left half)
		 *		   and new (right half) buffers, after the split; and
		 *		+  we know the key we want to insert into the parent
		 *		   (it's the "high key" on the left child page).
		 *
		 * We're ready to do the parent insertion.  We need to hold onto the
		 * locks for the child pages until we locate the parent, but we can
		 * release them before doing the actual insertion (see Lehman and Yao
		 * for the reasoning).
		 *----------
		 */
		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
	}
	else
	{
		/* Simple case: the tuple fits on this page without splitting */
		Buffer		metabuf = InvalidBuffer;
		Page		metapg = NULL;
		BTMetaPageData *metad = NULL;
		OffsetNumber itup_off;
		BlockNumber itup_blkno;

		itup_off = newitemoff;
		itup_blkno = BufferGetBlockNumber(buf);

		/*
		 * If we are doing this insert because we split a page that was the
		 * only one on its tree level, but was not the root, it may have been
		 * the "fast root".  We need to ensure that the fast root link points
		 * at or above the current page.  We can safely acquire a lock on the
		 * metapage here --- see comments for _bt_newroot().
		 */
		if (split_only_page)
		{
			Assert(!P_ISLEAF(lpageop));

			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
			metapg = BufferGetPage(metabuf);
			metad = BTPageGetMeta(metapg);

			if (metad->btm_fastlevel >= lpageop->btpo.level)
			{
				/* no update wanted */
				_bt_relbuf(rel, metabuf);
				metabuf = InvalidBuffer;
			}
		}

		/* Do the update.  No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
			elog(PANIC, "failed to add new item to block %u in index \"%s\"",
				 itup_blkno, RelationGetRelationName(rel));

		MarkBufferDirty(buf);

		if (BufferIsValid(metabuf))
		{
			/* advance the fast root to this (internal) page's level */
			metad->btm_fastroot = itup_blkno;
			metad->btm_fastlevel = lpageop->btpo.level;
			MarkBufferDirty(metabuf);
		}

		/* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
		if (BufferIsValid(cbuf))
		{
			Page		cpage = BufferGetPage(cbuf);
			BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);

			Assert(P_INCOMPLETE_SPLIT(cpageop));
			cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
			MarkBufferDirty(cbuf);
		}

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			xl_btree_insert xlrec;
			xl_btree_metadata xlmeta;
			uint8		xlinfo;
			XLogRecPtr	recptr;
			IndexTupleData trunctuple;

			xlrec.offnum = itup_off;

			XLogBeginInsert();
			XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);

			if (P_ISLEAF(lpageop))
				xlinfo = XLOG_BTREE_INSERT_LEAF;
			else
			{
				/*
				 * Register the left child whose INCOMPLETE_SPLIT flag was
				 * cleared.
				 */
				XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);

				xlinfo = XLOG_BTREE_INSERT_UPPER;
			}

			if (BufferIsValid(metabuf))
			{
				xlmeta.root = metad->btm_root;
				xlmeta.level = metad->btm_level;
				xlmeta.fastroot = metad->btm_fastroot;
				xlmeta.fastlevel = metad->btm_fastlevel;

				XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
				XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));

				xlinfo = XLOG_BTREE_INSERT_META;
			}

			/* Read comments in _bt_pgaddtup */
			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
				/*
				 * First data key on an internal page: register only the
				 * IndexTupleData header, not the key (t_info is overwritten
				 * to the bare header size before registering).
				 */
				trunctuple = *itup;
				trunctuple.t_info = sizeof(IndexTupleData);
				XLogRegisterBufData(0, (char *) &trunctuple,
									sizeof(IndexTupleData));
			}
			else
				XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));

			recptr = XLogInsert(RM_BTREE_ID, xlinfo);

			/* stamp every page we touched with the record's LSN */
			if (BufferIsValid(metabuf))
			{
				PageSetLSN(metapg, recptr);
			}
			if (BufferIsValid(cbuf))
			{
				PageSetLSN(BufferGetPage(cbuf), recptr);
			}

			PageSetLSN(page, recptr);
		}

		END_CRIT_SECTION();

		/* release buffers */
		if (BufferIsValid(metabuf))
			_bt_relbuf(rel, metabuf);
		if (BufferIsValid(cbuf))
			_bt_relbuf(rel, cbuf);
		_bt_relbuf(rel, buf);
	}
}
     943             : 
     944             : /*
     945             :  *  _bt_split() -- split a page in the btree.
     946             :  *
     947             :  *      On entry, buf is the page to split, and is pinned and write-locked.
     948             :  *      firstright is the item index of the first item to be moved to the
     949             :  *      new right page.  newitemoff etc. tell us about the new item that
     950             :  *      must be inserted along with the data from the old page.
     951             :  *
     952             :  *      When splitting a non-leaf page, 'cbuf' is the left-sibling of the
     953             :  *      page we're inserting the downlink for.  This function will clear the
     954             :  *      INCOMPLETE_SPLIT flag on it, and release the buffer.
     955             :  *
     956             :  *      Returns the new right sibling of buf, pinned and write-locked.
     957             :  *      The pin and lock on buf are maintained.
     958             :  */
     959             : static Buffer
     960         888 : _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
     961             :           OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
     962             :           bool newitemonleft)
     963             : {
     964             :     Buffer      rbuf;
     965             :     Page        origpage;
     966             :     Page        leftpage,
     967             :                 rightpage;
     968             :     BlockNumber origpagenumber,
     969             :                 rightpagenumber;
     970             :     BTPageOpaque ropaque,
     971             :                 lopaque,
     972             :                 oopaque;
     973         888 :     Buffer      sbuf = InvalidBuffer;
     974         888 :     Page        spage = NULL;
     975         888 :     BTPageOpaque sopaque = NULL;
     976             :     Size        itemsz;
     977             :     ItemId      itemid;
     978             :     IndexTuple  item;
     979             :     OffsetNumber leftoff,
     980             :                 rightoff;
     981             :     OffsetNumber maxoff;
     982             :     OffsetNumber i;
     983             :     bool        isleaf;
     984             : 
     985             :     /* Acquire a new page to split into */
     986         888 :     rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
     987             : 
     988             :     /*
     989             :      * origpage is the original page to be split.  leftpage is a temporary
     990             :      * buffer that receives the left-sibling data, which will be copied back
     991             :      * into origpage on success.  rightpage is the new page that receives the
     992             :      * right-sibling data.  If we fail before reaching the critical section,
     993             :      * origpage hasn't been modified and leftpage is only workspace. In
     994             :      * principle we shouldn't need to worry about rightpage either, because it
     995             :      * hasn't been linked into the btree page structure; but to avoid leaving
     996             :      * possibly-confusing junk behind, we are careful to rewrite rightpage as
     997             :      * zeroes before throwing any error.
     998             :      */
     999         888 :     origpage = BufferGetPage(buf);
    1000         888 :     leftpage = PageGetTempPage(origpage);
    1001         888 :     rightpage = BufferGetPage(rbuf);
    1002             : 
    1003         888 :     origpagenumber = BufferGetBlockNumber(buf);
    1004         888 :     rightpagenumber = BufferGetBlockNumber(rbuf);
    1005             : 
    1006         888 :     _bt_pageinit(leftpage, BufferGetPageSize(buf));
    1007             :     /* rightpage was already initialized by _bt_getbuf */
    1008             : 
    1009             :     /*
    1010             :      * Copy the original page's LSN into leftpage, which will become the
    1011             :      * updated version of the page.  We need this because XLogInsert will
    1012             :      * examine the LSN and possibly dump it in a page image.
    1013             :      */
    1014         888 :     PageSetLSN(leftpage, PageGetLSN(origpage));
    1015             : 
    1016             :     /* init btree private data */
    1017         888 :     oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
    1018         888 :     lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
    1019         888 :     ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
    1020             : 
    1021         888 :     isleaf = P_ISLEAF(oopaque);
    1022             : 
    1023             :     /* if we're splitting this page, it won't be the root when we're done */
    1024             :     /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
    1025         888 :     lopaque->btpo_flags = oopaque->btpo_flags;
    1026         888 :     lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
    1027         888 :     ropaque->btpo_flags = lopaque->btpo_flags;
    1028             :     /* set flag in left page indicating that the right page has no downlink */
    1029         888 :     lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
    1030         888 :     lopaque->btpo_prev = oopaque->btpo_prev;
    1031         888 :     lopaque->btpo_next = rightpagenumber;
    1032         888 :     ropaque->btpo_prev = origpagenumber;
    1033         888 :     ropaque->btpo_next = oopaque->btpo_next;
    1034         888 :     lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
    1035             :     /* Since we already have write-lock on both pages, ok to read cycleid */
    1036         888 :     lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
    1037         888 :     ropaque->btpo_cycleid = lopaque->btpo_cycleid;
    1038             : 
    1039             :     /*
    1040             :      * If the page we're splitting is not the rightmost page at its level in
    1041             :      * the tree, then the first entry on the page is the high key for the
    1042             :      * page.  We need to copy that to the right half.  Otherwise (meaning the
    1043             :      * rightmost page case), all the items on the right half will be user
    1044             :      * data.
    1045             :      */
    1046         888 :     rightoff = P_HIKEY;
    1047             : 
    1048         888 :     if (!P_RIGHTMOST(oopaque))
    1049             :     {
    1050         410 :         itemid = PageGetItemId(origpage, P_HIKEY);
    1051         410 :         itemsz = ItemIdGetLength(itemid);
    1052         410 :         item = (IndexTuple) PageGetItem(origpage, itemid);
    1053         410 :         if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
    1054             :                         false, false) == InvalidOffsetNumber)
    1055             :         {
    1056           0 :             memset(rightpage, 0, BufferGetPageSize(rbuf));
    1057           0 :             elog(ERROR, "failed to add hikey to the right sibling"
    1058             :                  " while splitting block %u of index \"%s\"",
    1059             :                  origpagenumber, RelationGetRelationName(rel));
    1060             :         }
    1061         410 :         rightoff = OffsetNumberNext(rightoff);
    1062             :     }
    1063             : 
    1064             :     /*
    1065             :      * The "high key" for the new left page will be the first key that's going
    1066             :      * to go into the new right page.  This might be either the existing data
    1067             :      * item at position firstright, or the incoming tuple.
    1068             :      */
    1069         888 :     leftoff = P_HIKEY;
    1070         888 :     if (!newitemonleft && newitemoff == firstright)
    1071             :     {
    1072             :         /* incoming tuple will become first on right page */
    1073           0 :         itemsz = newitemsz;
    1074           0 :         item = newitem;
    1075             :     }
    1076             :     else
    1077             :     {
    1078             :         /* existing item at firstright will become first on right page */
    1079         888 :         itemid = PageGetItemId(origpage, firstright);
    1080         888 :         itemsz = ItemIdGetLength(itemid);
    1081         888 :         item = (IndexTuple) PageGetItem(origpage, itemid);
    1082             :     }
    1083         888 :     if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
    1084             :                     false, false) == InvalidOffsetNumber)
    1085             :     {
    1086           0 :         memset(rightpage, 0, BufferGetPageSize(rbuf));
    1087           0 :         elog(ERROR, "failed to add hikey to the left sibling"
    1088             :              " while splitting block %u of index \"%s\"",
    1089             :              origpagenumber, RelationGetRelationName(rel));
    1090             :     }
    1091         888 :     leftoff = OffsetNumberNext(leftoff);
    1092             : 
    1093             :     /*
    1094             :      * Now transfer all the data items to the appropriate page.
    1095             :      *
    1096             :      * Note: we *must* insert at least the right page's items in item-number
    1097             :      * order, for the benefit of _bt_restore_page().
    1098             :      */
    1099         888 :     maxoff = PageGetMaxOffsetNumber(origpage);
    1100             : 
    1101      255528 :     for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
    1102             :     {
    1103      254640 :         itemid = PageGetItemId(origpage, i);
    1104      254640 :         itemsz = ItemIdGetLength(itemid);
    1105      254640 :         item = (IndexTuple) PageGetItem(origpage, itemid);
    1106             : 
    1107             :         /* does new item belong before this one? */
    1108      254640 :         if (i == newitemoff)
    1109             :         {
    1110         539 :             if (newitemonleft)
    1111             :             {
    1112         149 :                 if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
    1113             :                 {
    1114           0 :                     memset(rightpage, 0, BufferGetPageSize(rbuf));
    1115           0 :                     elog(ERROR, "failed to add new item to the left sibling"
    1116             :                          " while splitting block %u of index \"%s\"",
    1117             :                          origpagenumber, RelationGetRelationName(rel));
    1118             :                 }
    1119         149 :                 leftoff = OffsetNumberNext(leftoff);
    1120             :             }
    1121             :             else
    1122             :             {
    1123         390 :                 if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
    1124             :                 {
    1125           0 :                     memset(rightpage, 0, BufferGetPageSize(rbuf));
    1126           0 :                     elog(ERROR, "failed to add new item to the right sibling"
    1127             :                          " while splitting block %u of index \"%s\"",
    1128             :                          origpagenumber, RelationGetRelationName(rel));
    1129             :                 }
    1130         390 :                 rightoff = OffsetNumberNext(rightoff);
    1131             :             }
    1132             :         }
    1133             : 
    1134             :         /* decide which page to put it on */
    1135      254640 :         if (i < firstright)
    1136             :         {
    1137      165711 :             if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
    1138             :             {
    1139           0 :                 memset(rightpage, 0, BufferGetPageSize(rbuf));
    1140           0 :                 elog(ERROR, "failed to add old item to the left sibling"
    1141             :                      " while splitting block %u of index \"%s\"",
    1142             :                      origpagenumber, RelationGetRelationName(rel));
    1143             :             }
    1144      165711 :             leftoff = OffsetNumberNext(leftoff);
    1145             :         }
    1146             :         else
    1147             :         {
    1148       88929 :             if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
    1149             :             {
    1150           0 :                 memset(rightpage, 0, BufferGetPageSize(rbuf));
    1151           0 :                 elog(ERROR, "failed to add old item to the right sibling"
    1152             :                      " while splitting block %u of index \"%s\"",
    1153             :                      origpagenumber, RelationGetRelationName(rel));
    1154             :             }
    1155       88929 :             rightoff = OffsetNumberNext(rightoff);
    1156             :         }
    1157             :     }
    1158             : 
    1159             :     /* cope with possibility that newitem goes at the end */
    1160         888 :     if (i <= newitemoff)
    1161             :     {
    1162             :         /*
    1163             :          * Can't have newitemonleft here; that would imply we were told to put
    1164             :          * *everything* on the left page, which cannot fit (if it could, we'd
    1165             :          * not be splitting the page).
    1166             :          */
    1167         349 :         Assert(!newitemonleft);
    1168         349 :         if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
    1169             :         {
    1170           0 :             memset(rightpage, 0, BufferGetPageSize(rbuf));
    1171           0 :             elog(ERROR, "failed to add new item to the right sibling"
    1172             :                  " while splitting block %u of index \"%s\"",
    1173             :                  origpagenumber, RelationGetRelationName(rel));
    1174             :         }
    1175         349 :         rightoff = OffsetNumberNext(rightoff);
    1176             :     }
    1177             : 
    1178             :     /*
    1179             :      * We have to grab the right sibling (if any) and fix the prev pointer
    1180             :      * there. We are guaranteed that this is deadlock-free since no other
    1181             :      * writer will be holding a lock on that page and trying to move left, and
    1182             :      * all readers release locks on a page before trying to fetch its
    1183             :      * neighbors.
    1184             :      */
    1185             : 
    1186         888 :     if (!P_RIGHTMOST(oopaque))
    1187             :     {
    1188         410 :         sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
    1189         410 :         spage = BufferGetPage(sbuf);
    1190         410 :         sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
    1191         410 :         if (sopaque->btpo_prev != origpagenumber)
    1192             :         {
    1193           0 :             memset(rightpage, 0, BufferGetPageSize(rbuf));
    1194           0 :             elog(ERROR, "right sibling's left-link doesn't match: "
    1195             :                  "block %u links to %u instead of expected %u in index \"%s\"",
    1196             :                  oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
    1197             :                  RelationGetRelationName(rel));
    1198             :         }
    1199             : 
    1200             :         /*
    1201             :          * Check to see if we can set the SPLIT_END flag in the right-hand
    1202             :          * split page; this can save some I/O for vacuum since it need not
    1203             :          * proceed to the right sibling.  We can set the flag if the right
    1204             :          * sibling has a different cycleid: that means it could not be part of
    1205             :          * a group of pages that were all split off from the same ancestor
    1206             :          * page.  If you're confused, imagine that page A splits to A B and
    1207             :          * then again, yielding A C B, while vacuum is in progress.  Tuples
    1208             :          * originally in A could now be in either B or C, hence vacuum must
    1209             :          * examine both pages.  But if D, our right sibling, has a different
    1210             :          * cycleid then it could not contain any tuples that were in A when
    1211             :          * the vacuum started.
    1212             :          */
    1213         410 :         if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
    1214           0 :             ropaque->btpo_flags |= BTP_SPLIT_END;
    1215             :     }
    1216             : 
    1217             :     /*
    1218             :      * Right sibling is locked, new siblings are prepared, but original page
    1219             :      * is not updated yet.
    1220             :      *
    1221             :      * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
    1222             :      * not starting the critical section till here because we haven't been
    1223             :      * scribbling on the original page yet; see comments above.
    1224             :      */
    1225         888 :     START_CRIT_SECTION();
    1226             : 
    1227             :     /*
    1228             :      * By here, the original data page has been split into two new halves, and
    1229             :      * these are correct.  The algorithm requires that the left page never
    1230             :      * move during a split, so we copy the new left page back on top of the
    1231             :      * original.  Note that this is not a waste of time, since we also require
    1232             :      * (in the page management code) that the center of a page always be
    1233             :      * clean, and the most efficient way to guarantee this is just to compact
    1234             :      * the data by reinserting it into a new left page.  (XXX the latter
    1235             :      * comment is probably obsolete; but in any case it's good to not scribble
    1236             :      * on the original page until we enter the critical section.)
    1237             :      *
    1238             :      * We need to do this before writing the WAL record, so that XLogInsert
    1239             :      * can WAL log an image of the page if necessary.
    1240             :      */
    1241         888 :     PageRestoreTempPage(leftpage, origpage);
    1242             :     /* leftpage, lopaque must not be used below here */
    1243             : 
    1244         888 :     MarkBufferDirty(buf);
    1245         888 :     MarkBufferDirty(rbuf);
    1246             : 
    1247         888 :     if (!P_RIGHTMOST(ropaque))
    1248             :     {
    1249         410 :         sopaque->btpo_prev = rightpagenumber;
    1250         410 :         MarkBufferDirty(sbuf);
    1251             :     }
    1252             : 
    1253             :     /*
    1254             :      * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
    1255             :      * a split.
    1256             :      */
    1257         888 :     if (!isleaf)
    1258             :     {
    1259          44 :         Page        cpage = BufferGetPage(cbuf);
    1260          44 :         BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
    1261             : 
    1262          44 :         cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
    1263          44 :         MarkBufferDirty(cbuf);
    1264             :     }
    1265             : 
    1266             :     /* XLOG stuff */
    1267         888 :     if (RelationNeedsWAL(rel))
    1268             :     {
    1269             :         xl_btree_split xlrec;
    1270             :         uint8       xlinfo;
    1271             :         XLogRecPtr  recptr;
    1272             : 
    1273         888 :         xlrec.level = ropaque->btpo.level;
    1274         888 :         xlrec.firstright = firstright;
    1275         888 :         xlrec.newitemoff = newitemoff;
    1276             : 
    1277         888 :         XLogBeginInsert();
    1278         888 :         XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
    1279             : 
    1280         888 :         XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
    1281         888 :         XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
    1282             :         /* Log the right sibling, because we've changed its prev-pointer. */
    1283         888 :         if (!P_RIGHTMOST(ropaque))
    1284         410 :             XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
    1285         888 :         if (BufferIsValid(cbuf))
    1286          44 :             XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
    1287             : 
    1288             :         /*
    1289             :          * Log the new item, if it was inserted on the left page. (If it was
    1290             :          * put on the right page, we don't need to explicitly WAL log it
    1291             :          * because it's included with all the other items on the right page.)
    1292             :          * Show the new item as belonging to the left page buffer, so that it
    1293             :          * is not stored if XLogInsert decides it needs a full-page image of
    1294             :          * the left page.  We store the offset anyway, though, to support
    1295             :          * archive compression of these records.
    1296             :          */
    1297         888 :         if (newitemonleft)
    1298         149 :             XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
    1299             : 
    1300             :         /* Log left page */
    1301         888 :         if (!isleaf)
    1302             :         {
    1303             :             /*
    1304             :              * We must also log the left page's high key, because the right
    1305             :              * page's leftmost key is suppressed on non-leaf levels.  Show it
    1306             :              * as belonging to the left page buffer, so that it is not stored
    1307             :              * if XLogInsert decides it needs a full-page image of the left
    1308             :              * page.
    1309             :              */
    1310          44 :             itemid = PageGetItemId(origpage, P_HIKEY);
    1311          44 :             item = (IndexTuple) PageGetItem(origpage, itemid);
    1312          44 :             XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
    1313             :         }
    1314             : 
    1315             :         /*
    1316             :          * Log the contents of the right page in the format understood by
    1317             :          * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
    1318             :          * because we're going to recreate the whole page anyway, so it should
    1319             :          * never be stored by XLogInsert.
    1320             :          *
    1321             :          * Direct access to page is not good but faster - we should implement
    1322             :          * some new func in page API.  Note we only store the tuples
    1323             :          * themselves, knowing that they were inserted in item-number order
    1324             :          * and so the item pointers can be reconstructed.  See comments for
    1325             :          * _bt_restore_page().
    1326             :          */
    1327        2664 :         XLogRegisterBufData(1,
    1328         888 :                             (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
    1329        1776 :                             ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
    1330             : 
    1331         888 :         xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
    1332         888 :         recptr = XLogInsert(RM_BTREE_ID, xlinfo);
    1333             : 
    1334         888 :         PageSetLSN(origpage, recptr);
    1335         888 :         PageSetLSN(rightpage, recptr);
    1336         888 :         if (!P_RIGHTMOST(ropaque))
    1337             :         {
    1338         410 :             PageSetLSN(spage, recptr);
    1339             :         }
    1340         888 :         if (!isleaf)
    1341             :         {
    1342          44 :             PageSetLSN(BufferGetPage(cbuf), recptr);
    1343             :         }
    1344             :     }
    1345             : 
    1346         888 :     END_CRIT_SECTION();
    1347             : 
    1348             :     /* release the old right sibling */
    1349         888 :     if (!P_RIGHTMOST(ropaque))
    1350         410 :         _bt_relbuf(rel, sbuf);
    1351             : 
    1352             :     /* release the child */
    1353         888 :     if (!isleaf)
    1354          44 :         _bt_relbuf(rel, cbuf);
    1355             : 
    1356             :     /* split's done */
    1357         888 :     return rbuf;
    1358             : }
    1359             : 
    1360             : /*
    1361             :  *  _bt_findsplitloc() -- find an appropriate place to split a page.
    1362             :  *
    1363             :  * The idea here is to equalize the free space that will be on each split
    1364             :  * page, *after accounting for the inserted tuple*.  (If we fail to account
    1365             :  * for it, we might find ourselves with too little room on the page that
    1366             :  * it needs to go into!)
    1367             :  *
    1368             :  * If the page is the rightmost page on its level, we instead try to arrange
    1369             :  * to leave the left split page fillfactor% full.  In this way, when we are
    1370             :  * inserting successively increasing keys (consider sequences, timestamps,
    1371             :  * etc) we will end up with a tree whose pages are about fillfactor% full,
    1372             :  * instead of the 50% full result that we'd get without this special case.
    1373             :  * This is the same as nbtsort.c produces for a newly-created tree.  Note
    1374             :  * that leaf and nonleaf pages use different fillfactors.
    1375             :  *
    1376             :  * We are passed the intended insert position of the new tuple, expressed as
    1377             :  * the offsetnumber of the tuple it must go in front of.  (This could be
    1378             :  * maxoff+1 if the tuple is to go at the end.)
    1379             :  *
    1380             :  * We return the index of the first existing tuple that should go on the
    1381             :  * righthand page, plus a boolean indicating whether the new tuple goes on
    1382             :  * the left or right page.  The bool is necessary to disambiguate the case
    1383             :  * where firstright == newitemoff.
    1384             :  */
     1385             : static OffsetNumber
     1386         888 : _bt_findsplitloc(Relation rel,
     1387             :                  Page page,
     1388             :                  OffsetNumber newitemoff,
     1389             :                  Size newitemsz,
     1390             :                  bool *newitemonleft)
     1391             : {
     1392             :     BTPageOpaque opaque;
     1393             :     OffsetNumber offnum;
     1394             :     OffsetNumber maxoff;
     1395             :     ItemId      itemid;
     1396             :     FindSplitData state;
     1397             :     int         leftspace,
     1398             :                 rightspace,
     1399             :                 goodenough,
     1400             :                 olddataitemstotal,
     1401             :                 olddataitemstoleft;
     1402             :     bool        goodenoughfound;
     1403             : 
     1404         888 :     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
     1405             : 
     1406             :     /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
     1407         888 :     newitemsz += sizeof(ItemIdData);
     1408             : 
     1409             :     /* Total free space available on a btree page, after fixed overhead */
     1410         888 :     leftspace = rightspace =
     1411         888 :         PageGetPageSize(page) - SizeOfPageHeaderData -
     1412             :         MAXALIGN(sizeof(BTPageOpaqueData));
     1413             : 
     1414             :     /* The right page will have the same high key as the old page */
     1415         888 :     if (!P_RIGHTMOST(opaque))
     1416             :     {
     1417         410 :         itemid = PageGetItemId(page, P_HIKEY);
     1418         410 :         rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
     1419             :                              sizeof(ItemIdData));
     1420             :     }
     1421             : 
     1422             :     /* Count up total space in data items without actually scanning 'em */
     1423         888 :     olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
     1424             : 
                      :     /* Seed the split-evaluation state consumed by _bt_checksplitloc() */
     1425         888 :     state.newitemsz = newitemsz;
     1426         888 :     state.is_leaf = P_ISLEAF(opaque);
     1427         888 :     state.is_rightmost = P_RIGHTMOST(opaque);
     1428         888 :     state.have_split = false;
     1429         888 :     if (state.is_leaf)
     1430         844 :         state.fillfactor = RelationGetFillFactor(rel,
     1431             :                                                  BTREE_DEFAULT_FILLFACTOR);
     1432             :     else
     1433          44 :         state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
     1434         888 :     state.newitemonleft = false;    /* these just to keep compiler quiet */
     1435         888 :     state.firstright = 0;
     1436         888 :     state.best_delta = 0;
     1437         888 :     state.leftspace = leftspace;
     1438         888 :     state.rightspace = rightspace;
     1439         888 :     state.olddataitemstotal = olddataitemstotal;
     1440         888 :     state.newitemoff = newitemoff;
     1441             : 
     1442             :     /*
     1443             :      * Finding the best possible split would require checking all the possible
     1444             :      * split points, because of the high-key and left-key special cases.
     1445             :      * That's probably more work than it's worth; instead, stop as soon as we
     1446             :      * find a "good-enough" split, where good-enough is defined as an
     1447             :      * imbalance in free space of no more than pagesize/16 (arbitrary...) This
     1448             :      * should let us stop near the middle on most pages, instead of plowing to
     1449             :      * the end.
     1450             :      */
     1451         888 :     goodenough = leftspace / 16;
     1452             : 
     1453             :     /*
     1454             :      * Scan through the data items and calculate space usage for a split at
     1455             :      * each possible position.
     1456             :      */
     1457         888 :     olddataitemstoleft = 0;
     1458         888 :     goodenoughfound = false;
     1459         888 :     maxoff = PageGetMaxOffsetNumber(page);
     1460             : 
     1461      172849 :     for (offnum = P_FIRSTDATAKEY(opaque);
     1462             :          offnum <= maxoff;
     1463      171073 :          offnum = OffsetNumberNext(offnum))
     1464             :     {
     1465             :         Size        itemsz;
     1466             : 
     1467      171709 :         itemid = PageGetItemId(page, offnum);
                      :         /* itemsz includes the line pointer, matching the space accounting above */
     1468      171709 :         itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
     1469             : 
     1470             :         /*
     1471             :          * Will the new item go to left or right of split?
     1472             :          */
                      :         /* offnum > newitemoff: new tuple sorts before this candidate, so it lands on the left page */
     1473      171709 :         if (offnum > newitemoff)
     1474       15204 :             _bt_checksplitloc(&state, offnum, true,
     1475             :                               olddataitemstoleft, itemsz);
     1476             : 
     1477      156505 :         else if (offnum < newitemoff)
     1478      156324 :             _bt_checksplitloc(&state, offnum, false,
     1479             :                               olddataitemstoleft, itemsz);
     1480             :         else
     1481             :         {
     1482             :             /* need to try it both ways! */
     1483         181 :             _bt_checksplitloc(&state, offnum, true,
     1484             :                               olddataitemstoleft, itemsz);
     1485             : 
     1486         181 :             _bt_checksplitloc(&state, offnum, false,
     1487             :                               olddataitemstoleft, itemsz);
     1488             :         }
     1489             : 
     1490             :         /* Abort scan once we find a good-enough choice */
     1491      171709 :         if (state.have_split && state.best_delta <= goodenough)
     1492             :         {
     1493         636 :             goodenoughfound = true;
     1494         636 :             break;
     1495             :         }
     1496             : 
     1497      171073 :         olddataitemstoleft += itemsz;
     1498             :     }
     1499             : 
     1500             :     /*
     1501             :      * If the new item goes as the last item, check for splitting so that all
     1502             :      * the old items go to the left page and the new item goes to the right
     1503             :      * page.
     1504             :      */
     1505         888 :     if (newitemoff > maxoff && !goodenoughfound)
     1506         216 :         _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
     1507             : 
     1508             :     /*
     1509             :      * I believe it is not possible to fail to find a feasible split, but just
     1510             :      * in case ...
     1511             :      */
     1512         888 :     if (!state.have_split)
     1513           0 :         elog(ERROR, "could not find a feasible split point for index \"%s\"",
     1514             :              RelationGetRelationName(rel));
     1515             : 
                      :     /* Return the chosen split: first old item on the right page, plus which side the new tuple goes */
     1516         888 :     *newitemonleft = state.newitemonleft;
     1517         888 :     return state.firstright;
     1518             : }
    1519             : 
    1520             : /*
    1521             :  * Subroutine to analyze a particular possible split choice (ie, firstright
    1522             :  * and newitemonleft settings), and record the best split so far in *state.
    1523             :  *
    1524             :  * firstoldonright is the offset of the first item on the original page
    1525             :  * that goes to the right page, and firstoldonrightsz is the size of that
    1526             :  * tuple. firstoldonright can be > max offset, which means that all the old
    1527             :  * items go to the left page and only the new item goes to the right page.
    1528             :  * In that case, firstoldonrightsz is not used.
    1529             :  *
    1530             :  * olddataitemstoleft is the total size of all old items to the left of
    1531             :  * firstoldonright.
    1532             :  */
    1533             : static void
    1534      172106 : _bt_checksplitloc(FindSplitData *state,
    1535             :                   OffsetNumber firstoldonright,
    1536             :                   bool newitemonleft,
    1537             :                   int olddataitemstoleft,
    1538             :                   Size firstoldonrightsz)
    1539             : {
    1540             :     int         leftfree,
    1541             :                 rightfree;
    1542             :     Size        firstrightitemsz;
    1543             :     bool        newitemisfirstonright;
    1544             : 
    1545             :     /* Is the new item going to be the first item on the right page? */
    1546      344212 :     newitemisfirstonright = (firstoldonright == state->newitemoff
    1547      172106 :                              && !newitemonleft);
    1548             : 
    1549      172106 :     if (newitemisfirstonright)
    1550         397 :         firstrightitemsz = state->newitemsz;
    1551             :     else
    1552      171709 :         firstrightitemsz = firstoldonrightsz;
    1553             : 
    1554             :     /* Account for all the old tuples */
    1555      172106 :     leftfree = state->leftspace - olddataitemstoleft;
    1556      344212 :     rightfree = state->rightspace -
    1557      172106 :         (state->olddataitemstotal - olddataitemstoleft);
    1558             : 
    1559             :     /*
    1560             :      * The first item on the right page becomes the high key of the left page;
    1561             :      * therefore it counts against left space as well as right space.
    1562             :      */
    1563      172106 :     leftfree -= firstrightitemsz;
    1564             : 
    1565             :     /* account for the new item */
    1566      172106 :     if (newitemonleft)
    1567       15385 :         leftfree -= (int) state->newitemsz;
    1568             :     else
    1569      156721 :         rightfree -= (int) state->newitemsz;
    1570             : 
    1571             :     /*
    1572             :      * If we are not on the leaf level, we will be able to discard the key
    1573             :      * data from the first item that winds up on the right page.
    1574             :      */
    1575      172106 :     if (!state->is_leaf)
    1576         291 :         rightfree += (int) firstrightitemsz -
    1577             :             (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
    1578             : 
    1579             :     /*
    1580             :      * If feasible split point, remember best delta.
    1581             :      */
    1582      172106 :     if (leftfree >= 0 && rightfree >= 0)
    1583             :     {
    1584             :         int         delta;
    1585             : 
    1586      170952 :         if (state->is_rightmost)
    1587             :         {
    1588             :             /*
    1589             :              * If splitting a rightmost page, try to put (100-fillfactor)% of
    1590             :              * free space on left page. See comments for _bt_findsplitloc.
    1591             :              */
    1592      210416 :             delta = (state->fillfactor * leftfree)
    1593      105208 :                 - ((100 - state->fillfactor) * rightfree);
    1594             :         }
    1595             :         else
    1596             :         {
    1597             :             /* Otherwise, aim for equal free space on both sides */
    1598       65744 :             delta = leftfree - rightfree;
    1599             :         }
    1600             : 
    1601      170952 :         if (delta < 0)
    1602        5485 :             delta = -delta;
    1603      170952 :         if (!state->have_split || delta < state->best_delta)
    1604             :         {
    1605      165727 :             state->have_split = true;
    1606      165727 :             state->newitemonleft = newitemonleft;
    1607      165727 :             state->firstright = firstoldonright;
    1608      165727 :             state->best_delta = delta;
    1609             :         }
    1610             :     }
    1611      172106 : }
    1612             : 
    1613             : /*
    1614             :  * _bt_insert_parent() -- Insert downlink into parent after a page split.
    1615             :  *
    1616             :  * On entry, buf and rbuf are the left and right split pages, which we
    1617             :  * still hold write locks on per the L&Y algorithm.  We release the
    1618             :  * write locks once we have write lock on the parent page.  (Any sooner,
    1619             :  * and it'd be possible for some other process to try to split or delete
    1620             :  * one of these pages, and get confused because it cannot find the downlink.)
    1621             :  *
    1622             :  * stack - stack showing how we got here.  May be NULL in cases that don't
    1623             :  *          have to be efficient (concurrent ROOT split, WAL recovery)
    1624             :  * is_root - we split the true root
    1625             :  * is_only - we split a page alone on its level (might have been fast root)
    1626             :  */
static void
_bt_insert_parent(Relation rel,
                  Buffer buf,
                  Buffer rbuf,
                  BTStack stack,
                  bool is_root,
                  bool is_only)
{
    /*
     * Here we have to do something Lehman and Yao don't talk about: deal with
     * a root split and construction of a new root.  If our stack is empty
     * then we have just split a node on what had been the root level when we
     * descended the tree.  If it was still the root then we perform a
     * new-root construction.  If it *wasn't* the root anymore, search to find
     * the next higher level that someone constructed meanwhile, and find the
     * right place to insert as for the normal case.
     *
     * If we have to search for the parent level, we do so by re-descending
     * from the root.  This is not super-efficient, but it's rare enough not
     * to matter.
     */
    if (is_root)
    {
        Buffer      rootbuf;

        Assert(stack == NULL);
        Assert(is_only);
        /* create a new root node and update the metapage */
        rootbuf = _bt_newroot(rel, buf, rbuf);
        /* release the split buffers */
        _bt_relbuf(rel, rootbuf);
        _bt_relbuf(rel, rbuf);
        _bt_relbuf(rel, buf);
    }
    else
    {
        BlockNumber bknum = BufferGetBlockNumber(buf);
        BlockNumber rbknum = BufferGetBlockNumber(rbuf);
        Page        page = BufferGetPage(buf);
        IndexTuple  new_item;       /* downlink tuple for the new right page */
        BTStackData fakestack;      /* stack entry built here when stack==NULL */
        IndexTuple  ritem;
        Buffer      pbuf;

        /*
         * No stack entry means a concurrent root split happened after we
         * descended: the page we split is no longer the root.  Locate the
         * leftmost page one level up and fabricate a stack entry for it;
         * _bt_getstackbuf below will move right from there as needed.
         * fakestack lives on this stack frame, which is fine because it is
         * only used within this call (including the _bt_insertonpg below).
         */
        if (stack == NULL)
        {
            BTPageOpaque lpageop;

            elog(DEBUG2, "concurrent ROOT page split");
            lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
            /* Find the leftmost page at the next level up */
            pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
                                    NULL);
            /* Set up a phony stack entry pointing there */
            stack = &fakestack;
            stack->bts_blkno = BufferGetBlockNumber(pbuf);
            stack->bts_offset = InvalidOffsetNumber;
            /* bts_btentry will be initialized below */
            stack->bts_parent = NULL;
            _bt_relbuf(rel, pbuf);
        }

        /* get high key from left page == lowest key on new right page */
        ritem = (IndexTuple) PageGetItem(page,
                                         PageGetItemId(page, P_HIKEY));

        /* form an index tuple that points at the new right page */
        new_item = CopyIndexTuple(ritem);
        ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);

        /*
         * Find the parent buffer and get the parent page.
         *
         * Oops - if we were moved right then we need to change stack item! We
         * want to find parent pointing to where we are, right ?    - vadim
         * 05/27/97
         */
        ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
        pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

        /*
         * Now we can unlock the right child. The left child will be unlocked
         * by _bt_insertonpg().
         */
        _bt_relbuf(rel, rbuf);

        /* Check for error only after writing children */
        if (pbuf == InvalidBuffer)
            elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
                 RelationGetRelationName(rel), bknum, rbknum);

        /* Recursively update the parent */
        _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
                       new_item, stack->bts_offset + 1,
                       is_only);

        /* be tidy */
        pfree(new_item);
    }
}
    1727             : 
    1728             : /*
    1729             :  * _bt_finish_split() -- Finish an incomplete split
    1730             :  *
    1731             :  * A crash or other failure can leave a split incomplete.  The insertion
    1732             :  * routines won't allow to insert on a page that is incompletely split.
    1733             :  * Before inserting on such a page, call _bt_finish_split().
    1734             :  *
    1735             :  * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
    1736             :  * and unpinned.
    1737             :  */
    1738             : void
    1739           0 : _bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
    1740             : {
    1741           0 :     Page        lpage = BufferGetPage(lbuf);
    1742           0 :     BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
    1743             :     Buffer      rbuf;
    1744             :     Page        rpage;
    1745             :     BTPageOpaque rpageop;
    1746             :     bool        was_root;
    1747             :     bool        was_only;
    1748             : 
    1749           0 :     Assert(P_INCOMPLETE_SPLIT(lpageop));
    1750             : 
    1751             :     /* Lock right sibling, the one missing the downlink */
    1752           0 :     rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
    1753           0 :     rpage = BufferGetPage(rbuf);
    1754           0 :     rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
    1755             : 
    1756             :     /* Could this be a root split? */
    1757           0 :     if (!stack)
    1758             :     {
    1759             :         Buffer      metabuf;
    1760             :         Page        metapg;
    1761             :         BTMetaPageData *metad;
    1762             : 
    1763             :         /* acquire lock on the metapage */
    1764           0 :         metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
    1765           0 :         metapg = BufferGetPage(metabuf);
    1766           0 :         metad = BTPageGetMeta(metapg);
    1767             : 
    1768           0 :         was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
    1769             : 
    1770           0 :         _bt_relbuf(rel, metabuf);
    1771             :     }
    1772             :     else
    1773           0 :         was_root = false;
    1774             : 
    1775             :     /* Was this the only page on the level before split? */
    1776           0 :     was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
    1777             : 
    1778           0 :     elog(DEBUG1, "finishing incomplete split of %u/%u",
    1779             :          BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
    1780             : 
    1781           0 :     _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
    1782           0 : }
    1783             : 
    1784             : /*
    1785             :  *  _bt_getstackbuf() -- Walk back up the tree one step, and find the item
    1786             :  *                       we last looked at in the parent.
    1787             :  *
    1788             :  *      This is possible because we save the downlink from the parent item,
    1789             :  *      which is enough to uniquely identify it.  Insertions into the parent
    1790             :  *      level could cause the item to move right; deletions could cause it
    1791             :  *      to move left, but not left of the page we previously found it in.
    1792             :  *
    1793             :  *      Adjusts bts_blkno & bts_offset if changed.
    1794             :  *
    1795             :  *      Returns InvalidBuffer if item not found (should not happen).
    1796             :  */
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
    BlockNumber blkno;
    OffsetNumber start;

    blkno = stack->bts_blkno;
    start = stack->bts_offset;

    /* Outer loop: move right across the parent level until found. */
    for (;;)
    {
        Buffer      buf;
        Page        page;
        BTPageOpaque opaque;

        buf = _bt_getbuf(rel, blkno, access);
        page = BufferGetPage(buf);
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);

        /*
         * If the parent page itself has an unfinished split, finish it
         * before inserting; _bt_finish_split releases buf, so re-read the
         * same block from the top of the loop.
         */
        if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
        {
            _bt_finish_split(rel, buf, stack->bts_parent);
            continue;
        }

        /* Half-dead or deleted pages can't contain the downlink; skip. */
        if (!P_IGNORE(opaque))
        {
            OffsetNumber offnum,
                        minoff,
                        maxoff;
            ItemId      itemid;
            IndexTuple  item;

            minoff = P_FIRSTDATAKEY(opaque);
            maxoff = PageGetMaxOffsetNumber(page);

            /*
             * start = InvalidOffsetNumber means "search the whole page". We
             * need this test anyway due to possibility that page has a high
             * key now when it didn't before.
             */
            if (start < minoff)
                start = minoff;

            /*
             * Need this check too, to guard against possibility that page
             * split since we visited it originally.
             */
            if (start > maxoff)
                start = OffsetNumberNext(maxoff);

            /*
             * These loops will check every item on the page --- but in an
             * order that's attuned to the probability of where it actually
             * is.  Scan to the right first, then to the left.
             */
            for (offnum = start;
                 offnum <= maxoff;
                 offnum = OffsetNumberNext(offnum))
            {
                itemid = PageGetItemId(page, offnum);
                item = (IndexTuple) PageGetItem(page, itemid);
                if (BTEntrySame(item, &stack->bts_btentry))
                {
                    /* Return accurate pointer to where link is now */
                    stack->bts_blkno = blkno;
                    stack->bts_offset = offnum;
                    return buf;
                }
            }

            /* Not to the right of start; scan leftward back to minoff. */
            for (offnum = OffsetNumberPrev(start);
                 offnum >= minoff;
                 offnum = OffsetNumberPrev(offnum))
            {
                itemid = PageGetItemId(page, offnum);
                item = (IndexTuple) PageGetItem(page, itemid);
                if (BTEntrySame(item, &stack->bts_btentry))
                {
                    /* Return accurate pointer to where link is now */
                    stack->bts_blkno = blkno;
                    stack->bts_offset = offnum;
                    return buf;
                }
            }
        }

        /*
         * The item we're looking for moved right at least one page.
         */
        if (P_RIGHTMOST(opaque))
        {
            /* Ran off the end of the level without finding it. */
            _bt_relbuf(rel, buf);
            return InvalidBuffer;
        }
        blkno = opaque->btpo_next;
        start = InvalidOffsetNumber;
        _bt_relbuf(rel, buf);
    }
}
    1897             : 
    1898             : /*
    1899             :  *  _bt_newroot() -- Create a new root page for the index.
    1900             :  *
    1901             :  *      We've just split the old root page and need to create a new one.
    1902             :  *      In order to do this, we add a new root page to the file, then lock
    1903             :  *      the metadata page and update it.  This is guaranteed to be deadlock-
    1904             :  *      free, because all readers release their locks on the metadata page
    1905             :  *      before trying to lock the root, and all writers lock the root before
    1906             :  *      trying to lock the metadata page.  We have a write lock on the old
    1907             :  *      root page, so we have not introduced any cycles into the waits-for
    1908             :  *      graph.
    1909             :  *
    1910             :  *      On entry, lbuf (the old root) and rbuf (its new peer) are write-
    1911             :  *      locked. On exit, a new root page exists with entries for the
    1912             :  *      two new children, metapage is updated and unlocked/unpinned.
    1913             :  *      The new root buffer is returned to caller which has to unlock/unpin
    1914             :  *      lbuf, rbuf & rootbuf.
    1915             :  */
static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
    Buffer      rootbuf;
    Page        lpage,
                rootpage;
    BlockNumber lbkno,
                rbkno;
    BlockNumber rootblknum;
    BTPageOpaque rootopaque;
    BTPageOpaque lopaque;
    ItemId      itemid;
    IndexTuple  item;
    IndexTuple  left_item;      /* downlink to old root (left half) */
    Size        left_item_sz;
    IndexTuple  right_item;     /* downlink to new right sibling */
    Size        right_item_sz;
    Buffer      metabuf;
    Page        metapg;
    BTMetaPageData *metad;

    lbkno = BufferGetBlockNumber(lbuf);
    rbkno = BufferGetBlockNumber(rbuf);
    lpage = BufferGetPage(lbuf);
    lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);

    /* get a new root page */
    rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
    rootpage = BufferGetPage(rootbuf);
    rootblknum = BufferGetBlockNumber(rootbuf);

    /* acquire lock on the metapage */
    metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
    metapg = BufferGetPage(metabuf);
    metad = BTPageGetMeta(metapg);

    /*
     * Create downlink item for left page (old root).  Since this will be the
     * first item in a non-leaf page, it implicitly has minus-infinity key
     * value, so we need not store any actual key in it.
     */
    left_item_sz = sizeof(IndexTupleData);
    left_item = (IndexTuple) palloc(left_item_sz);
    left_item->t_info = left_item_sz;
    ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);

    /*
     * Create downlink item for right page.  The key for it is obtained from
     * the "high key" position in the left page.
     */
    itemid = PageGetItemId(lpage, P_HIKEY);
    right_item_sz = ItemIdGetLength(itemid);
    item = (IndexTuple) PageGetItem(lpage, itemid);
    right_item = CopyIndexTuple(item);
    ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);

    /* NO EREPORT(ERROR) from here till newroot op is logged */
    START_CRIT_SECTION();

    /* set btree special data */
    rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
    rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
    rootopaque->btpo_flags = BTP_ROOT;
    rootopaque->btpo.level =
        ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
    rootopaque->btpo_cycleid = 0;

    /* update metapage data: new root is both true root and fast root */
    metad->btm_root = rootblknum;
    metad->btm_level = rootopaque->btpo.level;
    metad->btm_fastroot = rootblknum;
    metad->btm_fastlevel = rootopaque->btpo.level;

    /*
     * Insert the left page pointer into the new root page.  The root page is
     * the rightmost page on its level so there is no "high key" in it; the
     * two items will go into positions P_HIKEY and P_FIRSTKEY.
     *
     * Note: we *must* insert the two items in item-number order, for the
     * benefit of _bt_restore_page().
     */
    if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
                    false, false) == InvalidOffsetNumber)
        elog(PANIC, "failed to add leftkey to new root page"
             " while splitting block %u of index \"%s\"",
             BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));

    /*
     * insert the right page pointer into the new root page.
     */
    if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
                    false, false) == InvalidOffsetNumber)
        elog(PANIC, "failed to add rightkey to new root page"
             " while splitting block %u of index \"%s\"",
             BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));

    /* Clear the incomplete-split flag in the left child */
    Assert(P_INCOMPLETE_SPLIT(lopaque));
    lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
    MarkBufferDirty(lbuf);

    MarkBufferDirty(rootbuf);
    MarkBufferDirty(metabuf);

    /* XLOG stuff */
    if (RelationNeedsWAL(rel))
    {
        xl_btree_newroot xlrec;
        XLogRecPtr  recptr;
        xl_btree_metadata md;

        xlrec.rootblk = rootblknum;
        xlrec.level = metad->btm_level;

        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);

        /* rootbuf and metabuf are rebuilt from the record data on replay */
        XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
        XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
        XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);

        md.root = rootblknum;
        md.level = metad->btm_level;
        md.fastroot = rootblknum;
        md.fastlevel = metad->btm_level;

        XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));

        /*
         * Direct access to page is not good but faster - we should implement
         * some new func in page API.
         */
        XLogRegisterBufData(0,
                            (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
                            ((PageHeader) rootpage)->pd_special -
                            ((PageHeader) rootpage)->pd_upper);

        recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);

        PageSetLSN(lpage, recptr);
        PageSetLSN(rootpage, recptr);
        PageSetLSN(metapg, recptr);
    }

    END_CRIT_SECTION();

    /* done with metapage */
    _bt_relbuf(rel, metabuf);

    pfree(left_item);
    pfree(right_item);

    /* Caller must unlock/unpin lbuf, rbuf and the returned rootbuf. */
    return rootbuf;
}
    2070             : 
    2071             : /*
    2072             :  *  _bt_pgaddtup() -- add a tuple to a particular page in the index.
    2073             :  *
    2074             :  *      This routine adds the tuple to the page as requested.  It does
    2075             :  *      not affect pin/lock status, but you'd better have a write lock
    2076             :  *      and pin on the target buffer!  Don't forget to write and release
    2077             :  *      the buffer afterwards, either.
    2078             :  *
    2079             :  *      The main difference between this routine and a bare PageAddItem call
    2080             :  *      is that this code knows that the leftmost index tuple on a non-leaf
    2081             :  *      btree page doesn't need to have a key.  Therefore, it strips such
    2082             :  *      tuples down to just the tuple header.  CAUTION: this works ONLY if
    2083             :  *      we insert the tuples in order, so that the given itup_off does
    2084             :  *      represent the final position of the tuple!
    2085             :  */
    2086             : static bool
    2087      460970 : _bt_pgaddtup(Page page,
    2088             :              Size itemsize,
    2089             :              IndexTuple itup,
    2090             :              OffsetNumber itup_off)
    2091             : {
    2092      460970 :     BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    2093             :     IndexTupleData trunctuple;
    2094             : 
    2095      460970 :     if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
    2096             :     {
    2097          88 :         trunctuple = *itup;
    2098          88 :         trunctuple.t_info = sizeof(IndexTupleData);
    2099          88 :         itup = &trunctuple;
    2100          88 :         itemsize = sizeof(IndexTupleData);
    2101             :     }
    2102             : 
    2103      460970 :     if (PageAddItem(page, (Item) itup, itemsize, itup_off,
    2104             :                     false, false) == InvalidOffsetNumber)
    2105           0 :         return false;
    2106             : 
    2107      460970 :     return true;
    2108             : }
    2109             : 
    2110             : /*
    2111             :  * _bt_isequal - used in _bt_doinsert in check for duplicates.
    2112             :  *
    2113             :  * This is very similar to _bt_compare, except for NULL handling.
    2114             :  * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
    2115             :  */
    2116             : static bool
    2117       56624 : _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
    2118             :             int keysz, ScanKey scankey)
    2119             : {
    2120             :     IndexTuple  itup;
    2121             :     int         i;
    2122             : 
    2123             :     /* Better be comparing to a leaf item */
    2124       56624 :     Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
    2125             : 
    2126       56624 :     itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
    2127             : 
    2128      197704 :     for (i = 1; i <= keysz; i++)
    2129             :     {
    2130             :         AttrNumber  attno;
    2131             :         Datum       datum;
    2132             :         bool        isNull;
    2133             :         int32       result;
    2134             : 
    2135       92386 :         attno = scankey->sk_attno;
    2136       92386 :         Assert(attno == i);
    2137       92386 :         datum = index_getattr(itup, attno, itupdesc, &isNull);
    2138             : 
    2139             :         /* NULLs are never equal to anything */
    2140       92386 :         if (isNull || (scankey->sk_flags & SK_ISNULL))
    2141       50176 :             return false;
    2142             : 
    2143       92368 :         result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
    2144             :                                                  scankey->sk_collation,
    2145             :                                                  datum,
    2146             :                                                  scankey->sk_argument));
    2147             : 
    2148       92368 :         if (result != 0)
    2149       50140 :             return false;
    2150             : 
    2151       42228 :         scankey++;
    2152             :     }
    2153             : 
    2154             :     /* if we get here, the keys are equal */
    2155        6466 :     return true;
    2156             : }
    2157             : 
    2158             : /*
    2159             :  * _bt_vacuum_one_page - vacuum just one index page.
    2160             :  *
    2161             :  * Try to remove LP_DEAD items from the given page.  The passed buffer
    2162             :  * must be exclusive-locked, but unlike a real VACUUM, we don't need a
    2163             :  * super-exclusive "cleanup" lock (see nbtree/README).
    2164             :  */
    2165             : static void
    2166         262 : _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
    2167             : {
    2168             :     OffsetNumber deletable[MaxOffsetNumber];
    2169         262 :     int         ndeletable = 0;
    2170             :     OffsetNumber offnum,
    2171             :                 minoff,
    2172             :                 maxoff;
    2173         262 :     Page        page = BufferGetPage(buffer);
    2174         262 :     BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    2175             : 
    2176             :     /*
    2177             :      * Scan over all items to see which ones need to be deleted according to
    2178             :      * LP_DEAD flags.
    2179             :      */
    2180         262 :     minoff = P_FIRSTDATAKEY(opaque);
    2181         262 :     maxoff = PageGetMaxOffsetNumber(page);
    2182       87607 :     for (offnum = minoff;
    2183             :          offnum <= maxoff;
    2184       87083 :          offnum = OffsetNumberNext(offnum))
    2185             :     {
    2186       87083 :         ItemId      itemId = PageGetItemId(page, offnum);
    2187             : 
    2188       87083 :         if (ItemIdIsDead(itemId))
    2189        3149 :             deletable[ndeletable++] = offnum;
    2190             :     }
    2191             : 
    2192         262 :     if (ndeletable > 0)
    2193         262 :         _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
    2194             : 
    2195             :     /*
    2196             :      * Note: if we didn't find any LP_DEAD items, then the page's
    2197             :      * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
    2198             :      * separate write to clear it, however.  We will clear it when we split
    2199             :      * the page.
    2200             :      */
    2201         262 : }

Generated by: LCOV version 1.11