Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nbtinsert.c
4 : * Item insertion in Lehman and Yao btrees for Postgres.
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/nbtree/nbtinsert.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "access/heapam.h"
19 : #include "access/nbtree.h"
20 : #include "access/nbtxlog.h"
21 : #include "access/transam.h"
22 : #include "access/xloginsert.h"
23 : #include "miscadmin.h"
24 : #include "storage/lmgr.h"
25 : #include "storage/predicate.h"
26 : #include "utils/tqual.h"
27 :
28 :
29 : typedef struct
30 : {
31 : /* context data for _bt_checksplitloc */
32 : Size newitemsz; /* size of new item to be inserted */
33 : int fillfactor; /* needed when splitting rightmost page */
34 : bool is_leaf; /* T if splitting a leaf page */
35 : bool is_rightmost; /* T if splitting a rightmost page */
36 : OffsetNumber newitemoff; /* where the new item is to be inserted */
37 : int leftspace; /* space available for items on left page */
38 : int rightspace; /* space available for items on right page */
39 : int olddataitemstotal; /* space taken by old items */
40 :
41 : bool have_split; /* found a valid split? */
42 :
43 : /* these fields valid only if have_split is true */
44 : bool newitemonleft; /* new item on left or right of best split */
45 : OffsetNumber firstright; /* best split point */
46 : int best_delta; /* best size delta so far */
47 : } FindSplitData;
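/*
 * A minimal sketch (mirroring _bt_findsplitloc below) of how this state is
 * consumed: the caller seeds the sizing fields, feeds every candidate
 * split boundary to _bt_checksplitloc, and that subroutine records the
 * boundary with the smallest post-split free-space imbalance ("delta")
 * seen so far.
 *
 *		state.have_split = false;
 *		... seed the sizing fields as _bt_findsplitloc does ...
 *		for each candidate boundary "off" (new item's side known):
 *			_bt_checksplitloc(&state, off, newitemonleft,
 *							  olddataitemstoleft, itemsz);
 *		if (state.have_split)
 *			split in front of state.firstright, placing the new
 *			item per state.newitemonleft;
 */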
48 :
49 :
50 : static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
51 :
52 : static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
53 : Relation heapRel, Buffer buf, OffsetNumber offset,
54 : ScanKey itup_scankey,
55 : IndexUniqueCheck checkUnique, bool *is_unique,
56 : uint32 *speculativeToken);
57 : static void _bt_findinsertloc(Relation rel,
58 : Buffer *bufptr,
59 : OffsetNumber *offsetptr,
60 : int keysz,
61 : ScanKey scankey,
62 : IndexTuple newtup,
63 : BTStack stack,
64 : Relation heapRel);
65 : static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
66 : BTStack stack,
67 : IndexTuple itup,
68 : OffsetNumber newitemoff,
69 : bool split_only_page);
70 : static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
71 : OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
72 : IndexTuple newitem, bool newitemonleft);
73 : static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
74 : BTStack stack, bool is_root, bool is_only);
75 : static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
76 : OffsetNumber newitemoff,
77 : Size newitemsz,
78 : bool *newitemonleft);
79 : static void _bt_checksplitloc(FindSplitData *state,
80 : OffsetNumber firstoldonright, bool newitemonleft,
81 : int dataitemstoleft, Size firstoldonrightsz);
82 : static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
83 : OffsetNumber itup_off);
84 : static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
85 : int keysz, ScanKey scankey);
86 : static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
87 :
88 :
89 : /*
90 : * _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
91 : *
92 : * This routine is called by the public interface routine, btinsert.
93 : * By here, itup is filled in, including the TID.
94 : *
95 : * If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
96 : * will allow duplicates. Otherwise (UNIQUE_CHECK_YES or
97 : * UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
98 : * For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
99 : * don't actually insert.
100 : *
101 : * The result value is only significant for UNIQUE_CHECK_PARTIAL:
102 : * it must be TRUE if the entry is known unique, else FALSE.
103 : * (In the current implementation we'll also return TRUE after a
104 : * successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
105 : * that's just a coding artifact.)
106 : */
107 : bool
108 206009 : _bt_doinsert(Relation rel, IndexTuple itup,
109 : IndexUniqueCheck checkUnique, Relation heapRel)
110 : {
111 206009 : bool is_unique = false;
112 206009 : int natts = rel->rd_rel->relnatts;
113 : ScanKey itup_scankey;
114 : BTStack stack;
115 : Buffer buf;
116 : OffsetNumber offset;
117 :
118 : /* we need an insertion scan key to do our search, so build one */
119 206009 : itup_scankey = _bt_mkscankey(rel, itup);
120 :
121 : top:
122 : /* find the first page containing this key */
123 206009 : stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
124 :
125 206009 : offset = InvalidOffsetNumber;
126 :
127 : /* trade in our read lock for a write lock */
128 206009 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
129 206009 : LockBuffer(buf, BT_WRITE);
130 :
131 : /*
132 : * If the page was split between the time that we surrendered our read
133 : * lock and acquired our write lock, then this page may no longer be the
134 : * right place for the key we want to insert. In this case, we need to
135 : * move right in the tree. See Lehman and Yao for an excruciatingly
136 : * precise description.
137 : */
138 206009 : buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
139 : true, stack, BT_WRITE, NULL);
140 :
141 : /*
142 : * If we're not allowing duplicates, make sure the key isn't already in
143 : * the index.
144 : *
145 : * NOTE: obviously, _bt_check_unique can only detect keys that are already
146 : * in the index; so it cannot defend against concurrent insertions of the
147 : * same key. We protect against that by means of holding a write lock on
148 : * the target page. Any other would-be inserter of the same key must
149 : * acquire a write lock on the same target page, so only one would-be
150 : * inserter can be making the check at one time. Furthermore, once we are
151 : * past the check we hold write locks continuously until we have performed
152 : * our insertion, so no later inserter can fail to see our insertion.
153 : * (This requires some care in _bt_insertonpg.)
154 : *
155 : * If we must wait for another xact, we release the lock while waiting,
156 : * and then must start over completely.
157 : *
158 : * For a partial uniqueness check, we don't wait for the other xact. Just
159 : * let the tuple in and return false for possibly non-unique, or true for
160 : * definitely unique.
161 : */
162 206009 : if (checkUnique != UNIQUE_CHECK_NO)
163 : {
164 : TransactionId xwait;
165 : uint32 speculativeToken;
166 :
167 114888 : offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
168 114888 : xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
169 : checkUnique, &is_unique, &speculativeToken);
170 :
171 114856 : if (TransactionIdIsValid(xwait))
172 : {
173 : /* Have to wait for the other guy ... */
174 0 : _bt_relbuf(rel, buf);
175 :
176 : /*
177 : * If it's a speculative insertion, wait for it to finish (ie. to
178 : * go ahead with the insertion, or kill the tuple). Otherwise
179 : * wait for the transaction to finish as usual.
180 : */
181 0 : if (speculativeToken)
182 0 : SpeculativeInsertionWait(xwait, speculativeToken);
183 : else
184 0 : XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
185 :
186 : /* start over... */
187 0 : _bt_freestack(stack);
188 0 : goto top;
189 : }
190 : }
191 :
192 205977 : if (checkUnique != UNIQUE_CHECK_EXISTING)
193 : {
194 : /*
195 : * The only conflict predicate locking cares about for indexes is when
196 : * an index tuple insert conflicts with an existing lock. Since the
197 : * actual location of the insert is hard to predict because of the
198 : * random search used to prevent O(N^2) performance when there are
199 : * many duplicate entries, we can just use the "first valid" page.
200 : */
201 205968 : CheckForSerializableConflictIn(rel, NULL, buf);
202 : /* do the insertion */
203 205968 : _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
204 : stack, heapRel);
205 205968 : _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
206 : }
207 : else
208 : {
209 : /* just release the buffer */
210 9 : _bt_relbuf(rel, buf);
211 : }
212 :
213 : /* be tidy */
214 205977 : _bt_freestack(stack);
215 205977 : _bt_freeskey(itup_scankey);
216 :
217 205977 : return is_unique;
218 : }
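/*
 * A condensed usage sketch (not code from this file): under
 * UNIQUE_CHECK_PARTIAL the tuple always goes in, and a false result only
 * means "possibly non-unique", so the caller must arrange a later recheck.
 * "recheck_unique_later" is a hypothetical caller-side hook.
 *
 *		bool		known_unique;
 *
 *		known_unique = _bt_doinsert(rel, itup, UNIQUE_CHECK_PARTIAL, heapRel);
 *		if (!known_unique)
 *			recheck_unique_later(rel, itup);
 */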
219 :
220 : /*
221 : * _bt_check_unique() -- Check for violation of unique index constraint
222 : *
223 : * offset points to the first possible item that could conflict. It can
224 : * also point to end-of-page, which means that the first tuple to check
225 : * is the first tuple on the next page.
226 : *
227 : * Returns InvalidTransactionId if there is no conflict, else an xact ID
228 : * we must wait for to see if it commits a conflicting tuple. If an actual
229 : * conflict is detected, no return --- just ereport(). If an xact ID is
230 : * returned, and the conflicting tuple still has a speculative insertion in
231 : * progress, *speculativeToken is set to non-zero, and the caller can wait for
232 : * the verdict on the insertion using SpeculativeInsertionWait().
233 : *
234 : * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
235 : * InvalidTransactionId because we don't want to wait. In this case we
236 : * set *is_unique to false if there is a potential conflict, and the
237 : * core code must redo the uniqueness check later.
238 : */
239 : static TransactionId
240 114888 : _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
241 : Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
242 : IndexUniqueCheck checkUnique, bool *is_unique,
243 : uint32 *speculativeToken)
244 : {
245 114888 : TupleDesc itupdesc = RelationGetDescr(rel);
246 114888 : int natts = rel->rd_rel->relnatts;
247 : SnapshotData SnapshotDirty;
248 : OffsetNumber maxoff;
249 : Page page;
250 : BTPageOpaque opaque;
251 114888 : Buffer nbuf = InvalidBuffer;
252 114888 : bool found = false;
253 :
254 : /* Assume unique until we find a duplicate */
255 114888 : *is_unique = true;
256 :
257 114888 : InitDirtySnapshot(SnapshotDirty);
258 :
259 114888 : page = BufferGetPage(buf);
260 114888 : opaque = (BTPageOpaque) PageGetSpecialPointer(page);
261 114888 : maxoff = PageGetMaxOffsetNumber(page);
262 :
263 : /*
264 : * Scan over all equal tuples, looking for live conflicts.
265 : */
266 : for (;;)
267 : {
268 : ItemId curitemid;
269 : IndexTuple curitup;
270 : BlockNumber nblkno;
271 :
272 : /*
273 : * make sure the offset points to an actual item before trying to
274 : * examine it...
275 : */
276 126632 : if (offset <= maxoff)
277 : {
278 62457 : curitemid = PageGetItemId(page, offset);
279 :
280 : /*
281 : * We can skip items that are marked killed.
282 : *
283 : * Formerly, we applied _bt_isequal() before checking the kill
284 : * flag, so as to fall out of the item loop as soon as possible.
285 : * However, in the presence of heavy update activity an index may
286 : * contain many killed items with the same key; running
287 : * _bt_isequal() on each killed item gets expensive. Furthermore
288 : * it is likely that the non-killed version of each key appears
289 : * first, so that we didn't actually get to exit any sooner
290 : * anyway. So now we just advance over killed items as quickly as
291 : * we can. We only apply _bt_isequal() when we get to a non-killed
292 : * item or the end of the page.
293 : */
294 62457 : if (!ItemIdIsDead(curitemid))
295 : {
296 : ItemPointerData htid;
297 : bool all_dead;
298 :
299 : /*
300 : * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
301 : * how we handle NULLs - so we must not use _bt_compare for
302 : * real equality comparisons, but only for ordering/finding
303 : * items on pages. - vadim 03/24/97
304 : */
305 56462 : if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
306 100706 : break; /* we're past all the equal tuples */
307 :
308 : /* okay, we gotta fetch the heap tuple ... */
309 6109 : curitup = (IndexTuple) PageGetItem(page, curitemid);
310 6109 : htid = curitup->t_tid;
311 :
312 : /*
313 : * If we are doing a recheck, we expect to find the tuple we
314 : * are rechecking. It's not a duplicate, but we have to keep
315 : * scanning.
316 : */
317 6138 : if (checkUnique == UNIQUE_CHECK_EXISTING &&
318 29 : ItemPointerCompare(&htid, &itup->t_tid) == 0)
319 : {
320 14 : found = true;
321 : }
322 :
323 : /*
324 : * We check the whole HOT-chain to see if there is any tuple
325 : * that satisfies SnapshotDirty. This is necessary because we
326 : * have just a single index entry for the entire chain.
327 : */
328 6095 : else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
329 : &all_dead))
330 : {
331 : TransactionId xwait;
332 :
333 : /*
334 : * It is a duplicate. If we are only doing a partial
335 : * check, then don't bother checking if the tuple is being
336 : * updated in another transaction. Just return the fact
337 : * that it is a potential conflict and leave the full
338 : * check till later.
339 : */
340 46 : if (checkUnique == UNIQUE_CHECK_PARTIAL)
341 : {
342 14 : if (nbuf != InvalidBuffer)
343 0 : _bt_relbuf(rel, nbuf);
344 14 : *is_unique = false;
345 28 : return InvalidTransactionId;
346 : }
347 :
348 : /*
349 : * If this tuple is being updated by another transaction,
350 : * then we have to wait for its commit/abort.
351 : */
352 64 : xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
353 32 : SnapshotDirty.xmin : SnapshotDirty.xmax;
354 :
355 32 : if (TransactionIdIsValid(xwait))
356 : {
357 0 : if (nbuf != InvalidBuffer)
358 0 : _bt_relbuf(rel, nbuf);
359 : /* Tell _bt_doinsert to wait... */
360 0 : *speculativeToken = SnapshotDirty.speculativeToken;
361 0 : return xwait;
362 : }
363 :
364 : /*
365 : * Otherwise we have a definite conflict. But before
366 : * complaining, look to see if the tuple we want to insert
367 : * is itself now committed dead --- if so, don't complain.
368 : * This is a waste of time in normal scenarios but we must
369 : * do it to support CREATE INDEX CONCURRENTLY.
370 : *
371 : * We must follow HOT-chains here because during
372 : * concurrent index build, we insert the root TID though
373 : * the actual tuple may be somewhere in the HOT-chain.
374 : * While following the chain we might not stop at the
375 : * exact tuple which triggered the insert, but that's OK
376 : * because if we find a live tuple anywhere in this chain,
377 : * we have a unique key conflict. The other live tuple is
378 : * not part of this chain because it had a different index
379 : * entry.
380 : */
381 32 : htid = itup->t_tid;
382 32 : if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
383 : {
384 : /* Normal case --- it's still live */
385 : }
386 : else
387 : {
388 : /*
389 : * It's been deleted, so no error, and no need to
390 : * continue searching
391 : */
392 0 : break;
393 : }
394 :
395 : /*
396 : * Check for a conflict-in as we would if we were going to
397 : * write to this page. We aren't actually going to write,
398 : * but we want a chance to report SSI conflicts that would
399 : * otherwise be masked by this unique constraint
400 : * violation.
401 : */
402 32 : CheckForSerializableConflictIn(rel, NULL, buf);
403 :
404 : /*
405 : * This is a definite conflict. Break the tuple down into
406 : * datums and report the error. But first, make sure we
407 : * release the buffer locks we're holding ---
408 : * BuildIndexValueDescription could make catalog accesses,
409 : * which in the worst case might touch this same index and
410 : * cause deadlocks.
411 : */
412 32 : if (nbuf != InvalidBuffer)
413 0 : _bt_relbuf(rel, nbuf);
414 32 : _bt_relbuf(rel, buf);
415 :
416 : {
417 : Datum values[INDEX_MAX_KEYS];
418 : bool isnull[INDEX_MAX_KEYS];
419 : char *key_desc;
420 :
421 32 : index_deform_tuple(itup, RelationGetDescr(rel),
422 : values, isnull);
423 :
424 32 : key_desc = BuildIndexValueDescription(rel, values,
425 : isnull);
426 :
427 32 : ereport(ERROR,
428 : (errcode(ERRCODE_UNIQUE_VIOLATION),
429 : errmsg("duplicate key value violates unique constraint \"%s\"",
430 : RelationGetRelationName(rel)),
431 : key_desc ? errdetail("Key %s already exists.",
432 : key_desc) : 0,
433 : errtableconstraint(heapRel,
434 : RelationGetRelationName(rel))));
435 : }
436 : }
437 6049 : else if (all_dead)
438 : {
439 : /*
440 : * The conflicting tuple (or whole HOT chain) is dead to
441 : * everyone, so we may as well mark the index entry
442 : * killed.
443 : */
444 209 : ItemIdMarkDead(curitemid);
445 209 : opaque->btpo_flags |= BTP_HAS_GARBAGE;
446 :
447 : /*
448 : * Mark buffer with a dirty hint, since state is not
449 : * crucial. Be sure to mark the proper buffer dirty.
450 : */
451 209 : if (nbuf != InvalidBuffer)
452 1 : MarkBufferDirtyHint(nbuf, true);
453 : else
454 208 : MarkBufferDirtyHint(buf, true);
455 : }
456 : }
457 : }
458 :
459 : /*
460 : * Advance to next tuple to continue checking.
461 : */
462 76233 : if (offset < maxoff)
463 11738 : offset = OffsetNumberNext(offset);
464 : else
465 : {
466 : /* If scankey == hikey we gotta check the next page too */
467 64495 : if (P_RIGHTMOST(opaque))
468 64435 : break;
469 60 : if (!_bt_isequal(itupdesc, page, P_HIKEY,
470 : natts, itup_scankey))
471 54 : break;
472 : /* Advance to next non-dead page --- there must be one */
473 : for (;;)
474 : {
475 6 : nblkno = opaque->btpo_next;
476 6 : nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
477 6 : page = BufferGetPage(nbuf);
478 6 : opaque = (BTPageOpaque) PageGetSpecialPointer(page);
479 6 : if (!P_IGNORE(opaque))
480 6 : break;
481 0 : if (P_RIGHTMOST(opaque))
482 0 : elog(ERROR, "fell off the end of index \"%s\"",
483 : RelationGetRelationName(rel));
484 0 : }
485 6 : maxoff = PageGetMaxOffsetNumber(page);
486 6 : offset = P_FIRSTDATAKEY(opaque);
487 : }
488 11744 : }
489 :
490 : /*
491 : * If we are doing a recheck then we should have found the tuple we are
492 : * checking. Otherwise there's something very wrong --- probably, the
493 : * index is on a non-immutable expression.
494 : */
495 114842 : if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
496 0 : ereport(ERROR,
497 : (errcode(ERRCODE_INTERNAL_ERROR),
498 : errmsg("failed to re-find tuple within index \"%s\"",
499 : RelationGetRelationName(rel)),
500 : errhint("This may be because of a non-immutable index expression."),
501 : errtableconstraint(heapRel,
502 : RelationGetRelationName(rel))));
503 :
504 114842 : if (nbuf != InvalidBuffer)
505 6 : _bt_relbuf(rel, nbuf);
506 :
507 114842 : return InvalidTransactionId;
508 : }
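/*
 * A minimal sketch of the dirty-snapshot idiom used above: after
 * heap_hot_search() finds a tuple visible to SnapshotDirty, the snapshot's
 * xmin is set if the inserting transaction is still in progress, and xmax
 * if a deleting one is; whichever is valid is the transaction to wait on.
 *
 *		SnapshotData SnapshotDirty;
 *
 *		InitDirtySnapshot(SnapshotDirty);
 *		if (heap_hot_search(&htid, heapRel, &SnapshotDirty, NULL))
 *		{
 *			TransactionId xwait = TransactionIdIsValid(SnapshotDirty.xmin) ?
 *				SnapshotDirty.xmin : SnapshotDirty.xmax;
 *
 *			if (TransactionIdIsValid(xwait))
 *				XactLockTableWait(xwait, rel, &htid, XLTW_InsertIndex);
 *		}
 */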
509 :
510 :
511 : /*
512 : * _bt_findinsertloc() -- Finds an insert location for a tuple
513 : *
514 : * If the new key is equal to one or more existing keys, we can
515 : * legitimately place it anywhere in the series of equal keys --- in fact,
516 : * if the new key is equal to the page's "high key" we can place it on
517 : * the next page. If it is equal to the high key, and there's not room
518 : * to insert the new tuple on the current page without splitting, then
519 : * we can move right hoping to find more free space and avoid a split.
520 : * (We should not move right indefinitely, however, since that leads to
521 : * O(N^2) insertion behavior in the presence of many equal keys.)
522 : * Once we have chosen the page to put the key on, we'll insert it before
523 : * any existing equal keys because of the way _bt_binsrch() works.
524 : *
525 : * If there's not enough room on the page, we try to make room by
526 : * removing any LP_DEAD tuples.
527 : *
528 : * On entry, *bufptr and *offsetptr point to the first legal position
529 : * where the new tuple could be inserted. The caller should hold an
530 : * exclusive lock on *bufptr. *offsetptr can also be set to
531 : * InvalidOffsetNumber, in which case the function will search for the
532 : * right location within the page if needed. On exit, they point to the
533 : * chosen insert location. If _bt_findinsertloc decides to move right,
534 : * the lock and pin on the original page will be released and the new
535 : * page returned to the caller is exclusively locked instead.
536 : *
537 : * newtup is the new tuple we're inserting, and scankey is an insertion
538 : * type scan key for it.
539 : */
540 : static void
541 205968 : _bt_findinsertloc(Relation rel,
542 : Buffer *bufptr,
543 : OffsetNumber *offsetptr,
544 : int keysz,
545 : ScanKey scankey,
546 : IndexTuple newtup,
547 : BTStack stack,
548 : Relation heapRel)
549 : {
550 205968 : Buffer buf = *bufptr;
551 205968 : Page page = BufferGetPage(buf);
552 : Size itemsz;
553 : BTPageOpaque lpageop;
554 : bool movedright,
555 : vacuumed;
556 : OffsetNumber newitemoff;
557 205968 : OffsetNumber firstlegaloff = *offsetptr;
558 :
559 205968 : lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
560 :
561 205968 : itemsz = IndexTupleDSize(*newtup);
562 205968 : itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
563 : * need to be consistent */
564 :
565 : /*
566 : * Check whether the item can fit on a btree page at all. (Eventually, we
567 : * ought to try to apply TOAST methods if not.) We actually need to be
568 : * able to fit three items on every page, so restrict any one item to 1/3
569 : * the per-page available space. Note that at this point, itemsz doesn't
570 : * include the ItemId.
571 : *
572 : * NOTE: if you change this, see also the similar code in _bt_buildadd().
573 : */
574 205968 : if (itemsz > BTMaxItemSize(page))
575 0 : ereport(ERROR,
576 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
577 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
578 : itemsz, BTMaxItemSize(page),
579 : RelationGetRelationName(rel)),
580 : errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
581 : "Consider a function index of an MD5 hash of the value, "
582 : "or use full text indexing."),
583 : errtableconstraint(heapRel,
584 : RelationGetRelationName(rel))));
585 :
586 : /*----------
587 : * If we will need to split the page to put the item on this page,
588 : * check whether we can put the tuple somewhere to the right,
589 : * instead. Keep scanning right until we
590 : * (a) find a page with enough free space,
591 : * (b) reach the last page where the tuple can legally go, or
592 : * (c) get tired of searching.
593 : * (c) is not flippant; it is important because if there are many
594 : * pages' worth of equal keys, it's better to split one of the early
595 : * pages than to scan all the way to the end of the run of equal keys
596 : * on every insert. We implement "get tired" as a random choice,
597 : * since stopping after scanning a fixed number of pages wouldn't work
598 : * well (we'd never reach the right-hand side of previously split
599 : * pages). Currently the probability of moving right is set at 0.99,
600 : * which may seem too high to change the behavior much, but it does an
601 : * excellent job of preventing O(N^2) behavior with many equal keys.
602 : *----------
603 : */
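	/*
	 * Worked numbers for the heuristic above: the random() test below moves
	 * right with probability 0.99, i.e. we stop with p = 0.01 at each full
	 * page, so the number of pages visited before "getting tired" is
	 * geometric with mean 1/p = 100.  That is almost always enough to reach
	 * free space within a run of equal keys, while still bounding the
	 * worst case.
	 */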
604 205968 : movedright = false;
605 205968 : vacuumed = false;
606 414307 : while (PageGetFreeSpace(page) < itemsz)
607 : {
608 : Buffer rbuf;
609 : BlockNumber rblkno;
610 :
611 : /*
612 : * before considering moving right, see if we can obtain enough space
613 : * by erasing LP_DEAD items
614 : */
615 3454 : if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
616 : {
617 257 : _bt_vacuum_one_page(rel, buf, heapRel);
618 :
619 : /*
620 : * remember that we vacuumed this page, because that makes the
621 : * hint supplied by the caller invalid
622 : */
623 257 : vacuumed = true;
624 :
625 257 : if (PageGetFreeSpace(page) >= itemsz)
626 256 : break; /* OK, now we have enough space */
627 : }
628 :
629 : /*
630 : * nope, so check conditions (b) and (c) enumerated above
631 : */
632 5961 : if (P_RIGHTMOST(lpageop) ||
633 5159 : _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
634 2396 : random() <= (MAX_RANDOM_VALUE / 100))
635 : break;
636 :
637 : /*
638 : * step right to next non-dead page
639 : *
640 : * must write-lock that page before releasing write lock on current
641 : * page; else someone else's _bt_check_unique scan could fail to see
642 : * our insertion. write locks on intermediate dead pages won't do
643 : * because we don't know when they will get de-linked from the tree.
644 : */
645 2371 : rbuf = InvalidBuffer;
646 :
647 2371 : rblkno = lpageop->btpo_next;
648 : for (;;)
649 : {
650 2371 : rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
651 2371 : page = BufferGetPage(rbuf);
652 2371 : lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
653 :
654 : /*
655 : * If this page was incompletely split, finish the split now. We
656 : * do this while holding a lock on the left sibling, which is not
657 : * good because finishing the split could be a fairly lengthy
658 : * operation. But this should happen very seldom.
659 : */
660 2371 : if (P_INCOMPLETE_SPLIT(lpageop))
661 : {
662 0 : _bt_finish_split(rel, rbuf, stack);
663 0 : rbuf = InvalidBuffer;
664 0 : continue;
665 : }
666 :
667 2371 : if (!P_IGNORE(lpageop))
668 2371 : break;
669 0 : if (P_RIGHTMOST(lpageop))
670 0 : elog(ERROR, "fell off the end of index \"%s\"",
671 : RelationGetRelationName(rel));
672 :
673 0 : rblkno = lpageop->btpo_next;
674 0 : }
675 2371 : _bt_relbuf(rel, buf);
676 2371 : buf = rbuf;
677 2371 : movedright = true;
678 2371 : vacuumed = false;
679 : }
680 :
681 : /*
682 : * Now we are on the right page, so find the insert position. If we moved
683 : * right at all, we know we should insert at the start of the page. If we
684 : * didn't move right, we can use the firstlegaloff hint if the caller
685 : * supplied one, unless we vacuumed the page which might have moved tuples
686 : * around making the hint invalid. If we didn't move right or can't use
687 : * the hint, find the position by searching.
688 : */
689 205968 : if (movedright)
690 2325 : newitemoff = P_FIRSTDATAKEY(lpageop);
691 203643 : else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
692 114729 : newitemoff = firstlegaloff;
693 : else
694 88914 : newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
695 :
696 205968 : *bufptr = buf;
697 205968 : *offsetptr = newitemoff;
698 205968 : }
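/*
 * A minimal caller-side sketch of the in/out contract (cf. _bt_doinsert):
 * pass InvalidOffsetNumber when no binary-search hint is available; on
 * return, the buffer and offset together identify the chosen location.
 *
 *		OffsetNumber offset = InvalidOffsetNumber;	(no hint: search inside)
 *
 *		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
 *						  stack, heapRel);
 *		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
 */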
699 :
700 : /*----------
701 : * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
702 : *
703 : * This recursive procedure does the following things:
704 : *
705 : * + if necessary, splits the target page (making sure that the
706 : * split is equitable as far as post-insert free space goes).
707 : * + inserts the tuple.
708 : * + if the page was split, pops the parent stack, and finds the
709 : * right place to insert the new child pointer (by walking
710 : * right using information stored in the parent stack).
711 : * + invokes itself with the appropriate tuple for the right
712 : * child page on the parent.
713 : * + updates the metapage if a true root or fast root is split.
714 : *
715 : * On entry, we must have the correct buffer in which to do the
716 : * insertion, and the buffer must be pinned and write-locked. On return,
717 : * we will have dropped both the pin and the lock on the buffer.
718 : *
719 : * When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
720 : * page we're inserting the downlink for. This function will clear the
721 : * INCOMPLETE_SPLIT flag on it, and release the buffer.
722 : *
723 : * The locking interactions in this code are critical. You should
724 : * grok Lehman and Yao's paper before making any changes. In addition,
725 : * you need to understand how we disambiguate duplicate keys in this
726 : * implementation, in order to be able to find our location using
727 : * L&Y "move right" operations. Since we may insert duplicate user
728 : * keys, and since these dups may propagate up the tree, care is
729 : * needed to position ourselves correctly for the insertion on
730 : * internal pages.
731 : *----------
732 : */
733 : static void
734 206808 : _bt_insertonpg(Relation rel,
735 : Buffer buf,
736 : Buffer cbuf,
737 : BTStack stack,
738 : IndexTuple itup,
739 : OffsetNumber newitemoff,
740 : bool split_only_page)
741 : {
742 : Page page;
743 : BTPageOpaque lpageop;
744 206808 : OffsetNumber firstright = InvalidOffsetNumber;
745 : Size itemsz;
746 :
747 206808 : page = BufferGetPage(buf);
748 206808 : lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
749 :
750 : /* child buffer must be given iff inserting on an internal page */
751 206808 : Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
752 :
753 : /* The caller should've finished any incomplete splits already. */
754 206808 : if (P_INCOMPLETE_SPLIT(lpageop))
755 0 : elog(ERROR, "cannot insert to incompletely split page %u",
756 : BufferGetBlockNumber(buf));
757 :
758 206808 : itemsz = IndexTupleDSize(*itup);
759 206808 : itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
760 : * need to be consistent */
761 :
762 : /*
763 : * Do we need to split the page to fit the item on it?
764 : *
765 : * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
766 : * so this comparison is correct even though we appear to be accounting
767 : * only for the item and not for its line pointer.
768 : */
769 206808 : if (PageGetFreeSpace(page) < itemsz)
770 : {
771 869 : bool is_root = P_ISROOT(lpageop);
772 869 : bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
773 : bool newitemonleft;
774 : Buffer rbuf;
775 :
776 : /* Choose the split point */
777 869 : firstright = _bt_findsplitloc(rel, page,
778 : newitemoff, itemsz,
779 : &newitemonleft);
780 :
781 : /* split the buffer into left and right halves */
782 869 : rbuf = _bt_split(rel, buf, cbuf, firstright,
783 : newitemoff, itemsz, itup, newitemonleft);
784 869 : PredicateLockPageSplit(rel,
785 : BufferGetBlockNumber(buf),
786 : BufferGetBlockNumber(rbuf));
787 :
788 : /*----------
789 : * By here,
790 : *
791 : * + our target page has been split;
792 : * + the original tuple has been inserted;
793 : * + we have write locks on both the old (left half)
794 : * and new (right half) buffers, after the split; and
795 : * + we know the key we want to insert into the parent
796 : * (it's the "high key" on the left child page).
797 : *
798 : * We're ready to do the parent insertion. We need to hold onto the
799 : * locks for the child pages until we locate the parent, but we can
800 : * release them before doing the actual insertion (see Lehman and Yao
801 : * for the reasoning).
802 : *----------
803 : */
804 869 : _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
805 : }
806 : else
807 : {
808 205939 : Buffer metabuf = InvalidBuffer;
809 205939 : Page metapg = NULL;
810 205939 : BTMetaPageData *metad = NULL;
811 : OffsetNumber itup_off;
812 : BlockNumber itup_blkno;
813 :
814 205939 : itup_off = newitemoff;
815 205939 : itup_blkno = BufferGetBlockNumber(buf);
816 :
817 : /*
818 : * If we are doing this insert because we split a page that was the
819 : * only one on its tree level, but was not the root, it may have been
820 : * the "fast root". We need to ensure that the fast root link points
821 : * at or above the current page. We can safely acquire a lock on the
822 : * metapage here --- see comments for _bt_newroot().
823 : */
824 205939 : if (split_only_page)
825 : {
826 1 : Assert(!P_ISLEAF(lpageop));
827 :
828 1 : metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
829 1 : metapg = BufferGetPage(metabuf);
830 1 : metad = BTPageGetMeta(metapg);
831 :
832 1 : if (metad->btm_fastlevel >= lpageop->btpo.level)
833 : {
834 : /* no update wanted */
835 0 : _bt_relbuf(rel, metabuf);
836 0 : metabuf = InvalidBuffer;
837 : }
838 : }
839 :
840 : /* Do the update. No ereport(ERROR) until changes are logged */
841 205939 : START_CRIT_SECTION();
842 :
843 205939 : if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
844 0 : elog(PANIC, "failed to add new item to block %u in index \"%s\"",
845 : itup_blkno, RelationGetRelationName(rel));
846 :
847 205939 : MarkBufferDirty(buf);
848 :
849 205939 : if (BufferIsValid(metabuf))
850 : {
851 1 : metad->btm_fastroot = itup_blkno;
852 1 : metad->btm_fastlevel = lpageop->btpo.level;
853 1 : MarkBufferDirty(metabuf);
854 : }
855 :
856 : /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
857 205939 : if (BufferIsValid(cbuf))
858 : {
859 798 : Page cpage = BufferGetPage(cbuf);
860 798 : BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
861 :
862 798 : Assert(P_INCOMPLETE_SPLIT(cpageop));
863 798 : cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
864 798 : MarkBufferDirty(cbuf);
865 : }
866 :
867 : /* XLOG stuff */
868 205939 : if (RelationNeedsWAL(rel))
869 : {
870 : xl_btree_insert xlrec;
871 : xl_btree_metadata xlmeta;
872 : uint8 xlinfo;
873 : XLogRecPtr recptr;
874 : IndexTupleData trunctuple;
875 :
876 205605 : xlrec.offnum = itup_off;
877 :
878 205605 : XLogBeginInsert();
879 205605 : XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
880 :
881 205605 : if (P_ISLEAF(lpageop))
882 204807 : xlinfo = XLOG_BTREE_INSERT_LEAF;
883 : else
884 : {
885 : /*
886 : * Register the left child whose INCOMPLETE_SPLIT flag was
887 : * cleared.
888 : */
889 798 : XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
890 :
891 798 : xlinfo = XLOG_BTREE_INSERT_UPPER;
892 : }
893 :
894 205605 : if (BufferIsValid(metabuf))
895 : {
896 1 : xlmeta.root = metad->btm_root;
897 1 : xlmeta.level = metad->btm_level;
898 1 : xlmeta.fastroot = metad->btm_fastroot;
899 1 : xlmeta.fastlevel = metad->btm_fastlevel;
900 :
901 1 : XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
902 1 : XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
903 :
904 1 : xlinfo = XLOG_BTREE_INSERT_META;
905 : }
906 :
907 : /* Read comments in _bt_pgaddtup */
908 205605 : XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
909 205605 : if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
910 : {
911 0 : trunctuple = *itup;
912 0 : trunctuple.t_info = sizeof(IndexTupleData);
913 0 : XLogRegisterBufData(0, (char *) &trunctuple,
914 : sizeof(IndexTupleData));
915 : }
916 : else
917 205605 : XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
918 :
919 205605 : recptr = XLogInsert(RM_BTREE_ID, xlinfo);
920 :
921 205605 : if (BufferIsValid(metabuf))
922 : {
923 1 : PageSetLSN(metapg, recptr);
924 : }
925 205605 : if (BufferIsValid(cbuf))
926 : {
927 798 : PageSetLSN(BufferGetPage(cbuf), recptr);
928 : }
929 :
930 205605 : PageSetLSN(page, recptr);
931 : }
932 :
933 205939 : END_CRIT_SECTION();
934 :
935 : /* release buffers */
936 205939 : if (BufferIsValid(metabuf))
937 1 : _bt_relbuf(rel, metabuf);
938 205939 : if (BufferIsValid(cbuf))
939 798 : _bt_relbuf(rel, cbuf);
940 205939 : _bt_relbuf(rel, buf);
941 : }
942 206808 : }
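/*
 * The WAL protocol used above, reduced to a skeleton (a sketch of the
 * simple leaf-insert path only, not a drop-in routine): make all page
 * changes inside the critical section, register every modified buffer,
 * then stamp each registered page with the record's LSN before unlocking.
 *
 *		START_CRIT_SECTION();
 *		_bt_pgaddtup(page, itemsz, itup, newitemoff);
 *		MarkBufferDirty(buf);
 *		if (RelationNeedsWAL(rel))
 *		{
 *			XLogBeginInsert();
 *			XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
 *			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
 *			XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
 *			recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT_LEAF);
 *			PageSetLSN(page, recptr);
 *		}
 *		END_CRIT_SECTION();
 */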
943 :
944 : /*
945 : * _bt_split() -- split a page in the btree.
946 : *
947 : * On entry, buf is the page to split, and is pinned and write-locked.
948 : * firstright is the item index of the first item to be moved to the
949 : * new right page. newitemoff etc. tell us about the new item that
950 : * must be inserted along with the data from the old page.
951 : *
952 : * When splitting a non-leaf page, 'cbuf' is the left-sibling of the
953 : * page we're inserting the downlink for. This function will clear the
954 : * INCOMPLETE_SPLIT flag on it, and release the buffer.
955 : *
956 : * Returns the new right sibling of buf, pinned and write-locked.
957 : * The pin and lock on buf are maintained.
958 : */
959 : static Buffer
960 869 : _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
961 : OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
962 : bool newitemonleft)
963 : {
964 : Buffer rbuf;
965 : Page origpage;
966 : Page leftpage,
967 : rightpage;
968 : BlockNumber origpagenumber,
969 : rightpagenumber;
970 : BTPageOpaque ropaque,
971 : lopaque,
972 : oopaque;
973 869 : Buffer sbuf = InvalidBuffer;
974 869 : Page spage = NULL;
975 869 : BTPageOpaque sopaque = NULL;
976 : Size itemsz;
977 : ItemId itemid;
978 : IndexTuple item;
979 : OffsetNumber leftoff,
980 : rightoff;
981 : OffsetNumber maxoff;
982 : OffsetNumber i;
983 : bool isleaf;
984 :
985 : /* Acquire a new page to split into */
986 869 : rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
987 :
988 : /*
989 : * origpage is the original page to be split. leftpage is a temporary
990 : * buffer that receives the left-sibling data, which will be copied back
991 : * into origpage on success. rightpage is the new page that receives the
992 : * right-sibling data. If we fail before reaching the critical section,
993 : * origpage hasn't been modified and leftpage is only workspace. In
994 : * principle we shouldn't need to worry about rightpage either, because it
995 : * hasn't been linked into the btree page structure; but to avoid leaving
996 : * possibly-confusing junk behind, we are careful to rewrite rightpage as
997 : * zeroes before throwing any error.
998 : */
999 869 : origpage = BufferGetPage(buf);
1000 869 : leftpage = PageGetTempPage(origpage);
1001 869 : rightpage = BufferGetPage(rbuf);
1002 :
1003 869 : origpagenumber = BufferGetBlockNumber(buf);
1004 869 : rightpagenumber = BufferGetBlockNumber(rbuf);
1005 :
1006 869 : _bt_pageinit(leftpage, BufferGetPageSize(buf));
1007 : /* rightpage was already initialized by _bt_getbuf */
1008 :
1009 : /*
1010 : * Copy the original page's LSN into leftpage, which will become the
1011 : * updated version of the page. We need this because XLogInsert will
1012 : * examine the LSN and possibly dump it in a page image.
1013 : */
1014 869 : PageSetLSN(leftpage, PageGetLSN(origpage));
1015 :
1016 : /* init btree private data */
1017 869 : oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
1018 869 : lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
1019 869 : ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
1020 :
1021 869 : isleaf = P_ISLEAF(oopaque);
1022 :
1023 : /* if we're splitting this page, it won't be the root when we're done */
1024 : /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
1025 869 : lopaque->btpo_flags = oopaque->btpo_flags;
1026 869 : lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1027 869 : ropaque->btpo_flags = lopaque->btpo_flags;
1028 : /* set flag in left page indicating that the right page has no downlink */
1029 869 : lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
1030 869 : lopaque->btpo_prev = oopaque->btpo_prev;
1031 869 : lopaque->btpo_next = rightpagenumber;
1032 869 : ropaque->btpo_prev = origpagenumber;
1033 869 : ropaque->btpo_next = oopaque->btpo_next;
1034 869 : lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
1035 : /* Since we already have write-lock on both pages, ok to read cycleid */
1036 869 : lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
1037 869 : ropaque->btpo_cycleid = lopaque->btpo_cycleid;
1038 :
1039 : /*
1040 : * If the page we're splitting is not the rightmost page at its level in
1041 : * the tree, then the first entry on the page is the high key for the
1042 : * page. We need to copy that to the right half. Otherwise (meaning the
1043 : * rightmost page case), all the items on the right half will be user
1044 : * data.
1045 : */
1046 869 : rightoff = P_HIKEY;
1047 :
1048 869 : if (!P_RIGHTMOST(oopaque))
1049 : {
1050 392 : itemid = PageGetItemId(origpage, P_HIKEY);
1051 392 : itemsz = ItemIdGetLength(itemid);
1052 392 : item = (IndexTuple) PageGetItem(origpage, itemid);
1053 392 : if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
1054 : false, false) == InvalidOffsetNumber)
1055 : {
1056 0 : memset(rightpage, 0, BufferGetPageSize(rbuf));
1057 0 : elog(ERROR, "failed to add hikey to the right sibling"
1058 : " while splitting block %u of index \"%s\"",
1059 : origpagenumber, RelationGetRelationName(rel));
1060 : }
1061 392 : rightoff = OffsetNumberNext(rightoff);
1062 : }
1063 :
1064 : /*
1065 : * The "high key" for the new left page will be the first key that's going
1066 : * to go into the new right page. This might be either the existing data
1067 : * item at position firstright, or the incoming tuple.
1068 : */
1069 869 : leftoff = P_HIKEY;
1070 869 : if (!newitemonleft && newitemoff == firstright)
1071 : {
1072 : /* incoming tuple will become first on right page */
1073 1 : itemsz = newitemsz;
1074 1 : item = newitem;
1075 : }
1076 : else
1077 : {
1078 : /* existing item at firstright will become first on right page */
1079 868 : itemid = PageGetItemId(origpage, firstright);
1080 868 : itemsz = ItemIdGetLength(itemid);
1081 868 : item = (IndexTuple) PageGetItem(origpage, itemid);
1082 : }
1083 869 : if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
1084 : false, false) == InvalidOffsetNumber)
1085 : {
1086 0 : memset(rightpage, 0, BufferGetPageSize(rbuf));
1087 0 : elog(ERROR, "failed to add hikey to the left sibling"
1088 : " while splitting block %u of index \"%s\"",
1089 : origpagenumber, RelationGetRelationName(rel));
1090 : }
1091 869 : leftoff = OffsetNumberNext(leftoff);
1092 :
1093 : /*
1094 : * Now transfer all the data items to the appropriate page.
1095 : *
1096 : * Note: we *must* insert at least the right page's items in item-number
1097 : * order, for the benefit of _bt_restore_page().
1098 : */
1099 869 : maxoff = PageGetMaxOffsetNumber(origpage);
1100 :
1101 248163 : for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
1102 : {
1103 247294 : itemid = PageGetItemId(origpage, i);
1104 247294 : itemsz = ItemIdGetLength(itemid);
1105 247294 : item = (IndexTuple) PageGetItem(origpage, itemid);
1106 :
1107 : /* does new item belong before this one? */
1108 247294 : if (i == newitemoff)
1109 : {
1110 525 : if (newitemonleft)
1111 : {
1112 141 : if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
1113 : {
1114 0 : memset(rightpage, 0, BufferGetPageSize(rbuf));
1115 0 : elog(ERROR, "failed to add new item to the left sibling"
1116 : " while splitting block %u of index \"%s\"",
1117 : origpagenumber, RelationGetRelationName(rel));
1118 : }
1119 141 : leftoff = OffsetNumberNext(leftoff);
1120 : }
1121 : else
1122 : {
1123 384 : if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1124 : {
1125 0 : memset(rightpage, 0, BufferGetPageSize(rbuf));
1126 0 : elog(ERROR, "failed to add new item to the right sibling"
1127 : " while splitting block %u of index \"%s\"",
1128 : origpagenumber, RelationGetRelationName(rel));
1129 : }
1130 384 : rightoff = OffsetNumberNext(rightoff);
1131 : }
1132 : }
1133 :
1134 : /* decide which page to put it on */
1135 247294 : if (i < firstright)
1136 : {
1137 161082 : if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
1138 : {
1139 0 : memset(rightpage, 0, BufferGetPageSize(rbuf));
1140 0 : elog(ERROR, "failed to add old item to the left sibling"
1141 : " while splitting block %u of index \"%s\"",
1142 : origpagenumber, RelationGetRelationName(rel));
1143 : }
1144 161082 : leftoff = OffsetNumberNext(leftoff);
1145 : }
1146 : else
1147 : {
1148 86212 : if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
1149 : {
1150 0 : memset(rightpage, 0, BufferGetPageSize(rbuf));
1151 0 : elog(ERROR, "failed to add old item to the right sibling"
1152 : " while splitting block %u of index \"%s\"",
1153 : origpagenumber, RelationGetRelationName(rel));
1154 : }
1155 86212 : rightoff = OffsetNumberNext(rightoff);
1156 : }
1157 : }
1158 :
1159 : /* cope with possibility that newitem goes at the end */
1160 869 : if (i <= newitemoff)
1161 : {
1162 : /*
1163 : * Can't have newitemonleft here; that would imply we were told to put
1164 : * *everything* on the left page, which cannot fit (if it could, we'd
1165 : * not be splitting the page).
1166 : */
1167 344 : Assert(!newitemonleft);
1168 344 : if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1169 : {
1170 0 : memset(rightpage, 0, BufferGetPageSize(rbuf));
1171 0 : elog(ERROR, "failed to add new item to the right sibling"
1172 : " while splitting block %u of index \"%s\"",
1173 : origpagenumber, RelationGetRelationName(rel));
1174 : }
1175 344 : rightoff = OffsetNumberNext(rightoff);
1176 : }
1177 :
1178 : /*
1179 : * We have to grab the right sibling (if any) and fix the prev pointer
1180 : * there. We are guaranteed that this is deadlock-free since no other
1181 : * writer will be holding a lock on that page and trying to move left, and
1182 : * all readers release locks on a page before trying to fetch its
1183 : * neighbors.
1184 : */
1185 :
1186 869 : if (!P_RIGHTMOST(oopaque))
1187 : {
1188 392 : sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
1189 392 : spage = BufferGetPage(sbuf);
1190 392 : sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
1191 392 : if (sopaque->btpo_prev != origpagenumber)
1192 : {
1193 0 : memset(rightpage, 0, BufferGetPageSize(rbuf));
1194 0 : elog(ERROR, "right sibling's left-link doesn't match: "
1195 : "block %u links to %u instead of expected %u in index \"%s\"",
1196 : oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
1197 : RelationGetRelationName(rel));
1198 : }
1199 :
1200 : /*
1201 : * Check to see if we can set the SPLIT_END flag in the right-hand
1202 : * split page; this can save some I/O for vacuum since it need not
1203 : * proceed to the right sibling. We can set the flag if the right
1204 : * sibling has a different cycleid: that means it could not be part of
1205 : * a group of pages that were all split off from the same ancestor
1206 : * page. If you're confused, imagine that page A splits to A B and
1207 : * then again, yielding A C B, while vacuum is in progress. Tuples
1208 : * originally in A could now be in either B or C, hence vacuum must
1209 : * examine both pages. But if D, our right sibling, has a different
1210 : * cycleid then it could not contain any tuples that were in A when
1211 : * the vacuum started.
1212 : */
1213 392 : if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
1214 0 : ropaque->btpo_flags |= BTP_SPLIT_END;
1215 : }
1216 :
1217 : /*
1218 : * Right sibling is locked, new siblings are prepared, but original page
1219 : * is not updated yet.
1220 : *
1221 : * NO EREPORT(ERROR) till right sibling is updated. We can get away with
1222 : * not starting the critical section till here because we haven't been
1223 : * scribbling on the original page yet; see comments above.
1224 : */
1225 869 : START_CRIT_SECTION();
1226 :
1227 : /*
1228 : * By here, the original data page has been split into two new halves, and
1229 : * these are correct. The algorithm requires that the left page never
1230 : * move during a split, so we copy the new left page back on top of the
1231 : * original. Note that this is not a waste of time, since we also require
1232 : * (in the page management code) that the center of a page always be
1233 : * clean, and the most efficient way to guarantee this is just to compact
1234 : * the data by reinserting it into a new left page. (XXX the latter
1235 : * comment is probably obsolete; but in any case it's good to not scribble
1236 : * on the original page until we enter the critical section.)
1237 : *
1238 : * We need to do this before writing the WAL record, so that XLogInsert
1239 : * can WAL log an image of the page if necessary.
1240 : */
1241 869 : PageRestoreTempPage(leftpage, origpage);
1242 : /* leftpage, lopaque must not be used below here */
1243 :
1244 869 : MarkBufferDirty(buf);
1245 869 : MarkBufferDirty(rbuf);
1246 :
1247 869 : if (!P_RIGHTMOST(ropaque))
1248 : {
1249 392 : sopaque->btpo_prev = rightpagenumber;
1250 392 : MarkBufferDirty(sbuf);
1251 : }
1252 :
1253 : /*
1254 : * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
1255 : * a split.
1256 : */
1257 869 : if (!isleaf)
1258 : {
1259 42 : Page cpage = BufferGetPage(cbuf);
1260 42 : BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1261 :
1262 42 : cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1263 42 : MarkBufferDirty(cbuf);
1264 : }
1265 :
1266 : /* XLOG stuff */
1267 869 : if (RelationNeedsWAL(rel))
1268 : {
1269 : xl_btree_split xlrec;
1270 : uint8 xlinfo;
1271 : XLogRecPtr recptr;
1272 :
1273 869 : xlrec.level = ropaque->btpo.level;
1274 869 : xlrec.firstright = firstright;
1275 869 : xlrec.newitemoff = newitemoff;
1276 :
1277 869 : XLogBeginInsert();
1278 869 : XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
1279 :
1280 869 : XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1281 869 : XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
1282 : /* Log the right sibling, because we've changed its prev-pointer. */
1283 869 : if (!P_RIGHTMOST(ropaque))
1284 392 : XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
1285 869 : if (BufferIsValid(cbuf))
1286 42 : XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
1287 :
1288 : /*
1289 : * Log the new item, if it was inserted on the left page. (If it was
1290 : * put on the right page, we don't need to explicitly WAL log it
1291 : * because it's included with all the other items on the right page.)
1292 : * Show the new item as belonging to the left page buffer, so that it
1293 : * is not stored if XLogInsert decides it needs a full-page image of
1294 : * the left page. We store the offset anyway, though, to support
1295 : * archive compression of these records.
1296 : */
1297 869 : if (newitemonleft)
1298 141 : XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
1299 :
1300 : /* Log left page */
1301 869 : if (!isleaf)
1302 : {
1303 : /*
1304 : * We must also log the left page's high key, because the right
1305 : * page's leftmost key is suppressed on non-leaf levels. Show it
1306 : * as belonging to the left page buffer, so that it is not stored
1307 : * if XLogInsert decides it needs a full-page image of the left
1308 : * page.
1309 : */
1310 42 : itemid = PageGetItemId(origpage, P_HIKEY);
1311 42 : item = (IndexTuple) PageGetItem(origpage, itemid);
1312 42 : XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
1313 : }
1314 :
1315 : /*
1316 : * Log the contents of the right page in the format understood by
1317 : * _bt_restore_page(). The whole right page will be recreated, so
1318 : * its current contents need never be stored separately by
1319 : * XLogInsert.
1320 : *
1321 : * Direct access to the page is ugly but fast - ideally the page API
1322 : * would provide a function for this. Note we only store the tuples
1323 : * themselves, knowing that they were inserted in item-number order
1324 : * and so the item pointers can be reconstructed. See comments for
1325 : * _bt_restore_page().
1326 : */
1327 2607 : XLogRegisterBufData(1,
1328 869 : (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
1329 1738 : ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
1330 :
1331 869 : xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1332 869 : recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1333 :
1334 869 : PageSetLSN(origpage, recptr);
1335 869 : PageSetLSN(rightpage, recptr);
1336 869 : if (!P_RIGHTMOST(ropaque))
1337 : {
1338 392 : PageSetLSN(spage, recptr);
1339 : }
1340 869 : if (!isleaf)
1341 : {
1342 42 : PageSetLSN(BufferGetPage(cbuf), recptr);
1343 : }
1344 : }
1345 :
1346 869 : END_CRIT_SECTION();
1347 :
1348 : /* release the old right sibling */
1349 869 : if (!P_RIGHTMOST(ropaque))
1350 392 : _bt_relbuf(rel, sbuf);
1351 :
1352 : /* release the child */
1353 869 : if (!isleaf)
1354 42 : _bt_relbuf(rel, cbuf);
1355 :
1356 : /* split's done */
1357 869 : return rbuf;
1358 : }
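/*
 * The crash-safe page-replacement idiom used above, in miniature (a
 * sketch): the new left half is assembled in scratch space so origpage is
 * untouched if we error out early; only inside the critical section is it
 * copied back, and PageRestoreTempPage also frees the scratch page.
 *
 *		Page		leftpage = PageGetTempPage(origpage);
 *
 *		_bt_pageinit(leftpage, BufferGetPageSize(buf));
 *		... fill leftpage with the left half and its new high key ...
 *		START_CRIT_SECTION();
 *		PageRestoreTempPage(leftpage, origpage);	(frees leftpage)
 *		... WAL-log the split, PageSetLSN() on all touched pages ...
 *		END_CRIT_SECTION();
 */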
1359 :
1360 : /*
1361 : * _bt_findsplitloc() -- find an appropriate place to split a page.
1362 : *
1363 : * The idea here is to equalize the free space that will be on each split
1364 : * page, *after accounting for the inserted tuple*. (If we fail to account
1365 : * for it, we might find ourselves with too little room on the page that
1366 : * it needs to go into!)
1367 : *
1368 : * If the page is the rightmost page on its level, we instead try to arrange
1369 : * to leave the left split page fillfactor% full. In this way, when we are
1370 : * inserting successively increasing keys (consider sequences, timestamps,
1371 : * etc) we will end up with a tree whose pages are about fillfactor% full,
1372 : * instead of the 50% full result that we'd get without this special case.
1373 : * This is the same as nbtsort.c produces for a newly-created tree. Note
1374 : * that leaf and nonleaf pages use different fillfactors.
1375 : *
1376 : * We are passed the intended insert position of the new tuple, expressed as
1377 : * the offsetnumber of the tuple it must go in front of. (This could be
1378 : * maxoff+1 if the tuple is to go at the end.)
1379 : *
1380 : * We return the index of the first existing tuple that should go on the
1381 : * righthand page, plus a boolean indicating whether the new tuple goes on
1382 : * the left or right page. The bool is necessary to disambiguate the case
1383 : * where firstright == newitemoff.
1384 : */
1385 : static OffsetNumber
1386 869 : _bt_findsplitloc(Relation rel,
1387 : Page page,
1388 : OffsetNumber newitemoff,
1389 : Size newitemsz,
1390 : bool *newitemonleft)
1391 : {
1392 : BTPageOpaque opaque;
1393 : OffsetNumber offnum;
1394 : OffsetNumber maxoff;
1395 : ItemId itemid;
1396 : FindSplitData state;
1397 : int leftspace,
1398 : rightspace,
1399 : goodenough,
1400 : olddataitemstotal,
1401 : olddataitemstoleft;
1402 : bool goodenoughfound;
1403 :
1404 869 : opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1405 :
1406 : /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1407 869 : newitemsz += sizeof(ItemIdData);
1408 :
1409 : /* Total free space available on a btree page, after fixed overhead */
1410 869 : leftspace = rightspace =
1411 869 : PageGetPageSize(page) - SizeOfPageHeaderData -
1412 : MAXALIGN(sizeof(BTPageOpaqueData));
1413 :
1414 : /* The right page will have the same high key as the old page */
1415 869 : if (!P_RIGHTMOST(opaque))
1416 : {
1417 392 : itemid = PageGetItemId(page, P_HIKEY);
1418 392 : rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1419 : sizeof(ItemIdData));
1420 : }
1421 :
1422 : /* Count up total space in data items without actually scanning 'em */
1423 869 : olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
1424 :
1425 869 : state.newitemsz = newitemsz;
1426 869 : state.is_leaf = P_ISLEAF(opaque);
1427 869 : state.is_rightmost = P_RIGHTMOST(opaque);
1428 869 : state.have_split = false;
1429 869 : if (state.is_leaf)
1430 827 : state.fillfactor = RelationGetFillFactor(rel,
1431 : BTREE_DEFAULT_FILLFACTOR);
1432 : else
1433 42 : state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1434 869 : state.newitemonleft = false; /* these just to keep compiler quiet */
1435 869 : state.firstright = 0;
1436 869 : state.best_delta = 0;
1437 869 : state.leftspace = leftspace;
1438 869 : state.rightspace = rightspace;
1439 869 : state.olddataitemstotal = olddataitemstotal;
1440 869 : state.newitemoff = newitemoff;
1441 :
1442 : /*
1443 : * Finding the best possible split would require checking all the possible
1444 : * split points, because of the high-key and left-key special cases.
1445 : * That's probably more work than it's worth; instead, stop as soon as we
1446 : * find a "good-enough" split, where good-enough is defined as an
1447 : * imbalance in free space of no more than pagesize/16 (arbitrary...) This
1448 : * should let us stop near the middle on most pages, instead of plowing to
1449 : * the end.
1450 : */
1451 869 : goodenough = leftspace / 16;
1452 :
1453 : /*
1454 : * Scan through the data items and calculate space usage for a split at
1455 : * each possible position.
1456 : */
1457 869 : olddataitemstoleft = 0;
1458 869 : goodenoughfound = false;
1459 869 : maxoff = PageGetMaxOffsetNumber(page);
1460 :
1461 168644 : for (offnum = P_FIRSTDATAKEY(opaque);
1462 : offnum <= maxoff;
1463 166906 : offnum = OffsetNumberNext(offnum))
1464 : {
1465 : Size itemsz;
1466 :
1467 167521 : itemid = PageGetItemId(page, offnum);
1468 167521 : itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1469 :
1470 : /*
1471 : * Will the new item go to left or right of split?
1472 : */
1473 167521 : if (offnum > newitemoff)
1474 15044 : _bt_checksplitloc(&state, offnum, true,
1475 : olddataitemstoleft, itemsz);
1476 :
1477 152477 : else if (offnum < newitemoff)
1478 152308 : _bt_checksplitloc(&state, offnum, false,
1479 : olddataitemstoleft, itemsz);
1480 : else
1481 : {
1482 : /* need to try it both ways! */
1483 169 : _bt_checksplitloc(&state, offnum, true,
1484 : olddataitemstoleft, itemsz);
1485 :
1486 169 : _bt_checksplitloc(&state, offnum, false,
1487 : olddataitemstoleft, itemsz);
1488 : }
1489 :
1490 : /* Abort scan once we find a good-enough choice */
1491 167521 : if (state.have_split && state.best_delta <= goodenough)
1492 : {
1493 615 : goodenoughfound = true;
1494 615 : break;
1495 : }
1496 :
1497 166906 : olddataitemstoleft += itemsz;
1498 : }
1499 :
1500 : /*
1501 : * If the new item would go at the end of the page, also consider a split
1502 : * that sends all the old items to the left page and only the new item to
1503 : * the right page.
1504 : */
1505 869 : if (newitemoff > maxoff && !goodenoughfound)
1506 224 : _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
1507 :
1508 : /*
1509 : * It should not be possible to fail to find a feasible split, but check
1510 : * just in case.
1511 : */
1512 869 : if (!state.have_split)
1513 0 : elog(ERROR, "could not find a feasible split point for index \"%s\"",
1514 : RelationGetRelationName(rel));
1515 :
1516 869 : *newitemonleft = state.newitemonleft;
1517 869 : return state.firstright;
1518 : }
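
/*
 * Editor's note: the sketch below is illustrative and not part of
 * nbtinsert.c.  It models, in plain C with hypothetical names
 * (split_scan_sketch, deltas[]), the "good-enough" stopping rule used
 * above: candidate split points are visited in offset order, and the
 * scan aborts as soon as the best free-space imbalance seen so far is
 * no more than pagesize/16 (roughly 512 bytes on a standard 8KB page).
 */
#include <stdlib.h>

static int
split_scan_sketch(const int *deltas, int ndeltas, int pagesize)
{
    int     goodenough = pagesize / 16;
    int     best = -1;
    int     bestdelta = 0;

    for (int i = 0; i < ndeltas; i++)
    {
        int     delta = abs(deltas[i]);

        if (best < 0 || delta < bestdelta)
        {
            best = i;
            bestdelta = delta;
        }
        /* abort the scan once a good-enough choice has been seen */
        if (bestdelta <= goodenough)
            break;
    }
    return best;                /* chosen split point, or -1 if none */
}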
1519 :
1520 : /*
1521 : * Subroutine to analyze a particular possible split choice (ie, firstright
1522 : * and newitemonleft settings), and record the best split so far in *state.
1523 : *
1524 : * firstoldonright is the offset of the first item on the original page
1525 : * that goes to the right page, and firstoldonrightsz is the size of that
1526 : * tuple. firstoldonright can be > max offset, which means that all the old
1527 : * items go to the left page and only the new item goes to the right page.
1528 : * In that case, firstoldonrightsz is not used.
1529 : *
1530 : * olddataitemstoleft is the total size of all old items to the left of
1531 : * firstoldonright.
1532 : */
1533 : static void
1534 167914 : _bt_checksplitloc(FindSplitData *state,
1535 : OffsetNumber firstoldonright,
1536 : bool newitemonleft,
1537 : int olddataitemstoleft,
1538 : Size firstoldonrightsz)
1539 : {
1540 : int leftfree,
1541 : rightfree;
1542 : Size firstrightitemsz;
1543 : bool newitemisfirstonright;
1544 :
1545 : /* Is the new item going to be the first item on the right page? */
1546 335828 : newitemisfirstonright = (firstoldonright == state->newitemoff
1547 167914 : && !newitemonleft);
1548 :
1549 167914 : if (newitemisfirstonright)
1550 393 : firstrightitemsz = state->newitemsz;
1551 : else
1552 167521 : firstrightitemsz = firstoldonrightsz;
1553 :
1554 : /* Account for all the old tuples */
1555 167914 : leftfree = state->leftspace - olddataitemstoleft;
1556 335828 : rightfree = state->rightspace -
1557 167914 : (state->olddataitemstotal - olddataitemstoleft);
1558 :
1559 : /*
1560 : * The first item on the right page becomes the high key of the left page;
1561 : * therefore it counts against left space as well as right space.
1562 : */
1563 167914 : leftfree -= firstrightitemsz;
1564 :
1565 : /* account for the new item */
1566 167914 : if (newitemonleft)
1567 15213 : leftfree -= (int) state->newitemsz;
1568 : else
1569 152701 : rightfree -= (int) state->newitemsz;
1570 :
1571 : /*
1572 : * If we are not on the leaf level, we will be able to discard the key
1573 : * data from the first item that winds up on the right page.
1574 : */
1575 167914 : if (!state->is_leaf)
1576 252 : rightfree += (int) firstrightitemsz -
1577 : (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1578 :
1579 : /*
1580 : * If feasible split point, remember best delta.
1581 : */
1582 167914 : if (leftfree >= 0 && rightfree >= 0)
1583 : {
1584 : int delta;
1585 :
1586 166777 : if (state->is_rightmost)
1587 : {
1588 : /*
1589 : * If splitting a rightmost page, try to put (100-fillfactor)% of
1590 : * free space on left page. See comments for _bt_findsplitloc.
1591 : */
1592 207184 : delta = (state->fillfactor * leftfree)
1593 103592 : - ((100 - state->fillfactor) * rightfree);
1594 : }
1595 : else
1596 : {
1597 : /* Otherwise, aim for equal free space on both sides */
1598 63185 : delta = leftfree - rightfree;
1599 : }
1600 :
1601 166777 : if (delta < 0)
1602 5944 : delta = -delta;
1603 166777 : if (!state->have_split || delta < state->best_delta)
1604 : {
1605 161101 : state->have_split = true;
1606 161101 : state->newitemonleft = newitemonleft;
1607 161101 : state->firstright = firstoldonright;
1608 161101 : state->best_delta = delta;
1609 : }
1610 : }
1611 167914 : }
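
/*
 * Editor's note (illustrative, not part of nbtinsert.c): a worked
 * example of the rightmost-page delta computed above.  With the
 * default leaf fillfactor of 90, leftfree = 800 and rightfree = 7200
 * give
 *
 *     delta = 90 * 800 - (100 - 90) * 7200 = 72000 - 72000 = 0,
 *
 * i.e. the split is judged perfectly balanced when the left page ends
 * up about 90% full and the new rightmost page keeps about 90% of its
 * space free for future insertions.
 */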
1612 :
1613 : /*
1614 : * _bt_insert_parent() -- Insert downlink into parent after a page split.
1615 : *
1616 : * On entry, buf and rbuf are the left and right split pages, which we
1617 : * still hold write locks on per the L&Y algorithm. We release the
1618 : * write locks once we have write lock on the parent page. (Any sooner,
1619 : * and it'd be possible for some other process to try to split or delete
1620 : * one of these pages, and get confused because it cannot find the downlink.)
1621 : *
1622 : * stack - stack showing how we got here. May be NULL in cases that don't
1623 : * have to be efficient (concurrent ROOT split, WAL recovery)
1624 : * is_root - we split the true root
1625 : * is_only - we split a page alone on its level (might have been fast root)
1626 : */
1627 : static void
1628 869 : _bt_insert_parent(Relation rel,
1629 : Buffer buf,
1630 : Buffer rbuf,
1631 : BTStack stack,
1632 : bool is_root,
1633 : bool is_only)
1634 : {
1635 : /*
1636 : * Here we have to do something Lehman and Yao don't talk about: deal with
1637 : * a root split and construction of a new root. If our stack is empty
1638 : * then we have just split a node on what had been the root level when we
1639 : * descended the tree. If it was still the root then we perform a
1640 : * new-root construction. If it *wasn't* the root anymore, search to find
1641 : * the next higher level that someone constructed meanwhile, and find the
1642 : * right place to insert as for the normal case.
1643 : *
1644 : * If we have to search for the parent level, we do so by re-descending
1645 : * from the root. This is not super-efficient, but it's rare enough not
1646 : * to matter.
1647 : */
1648 869 : if (is_root)
1649 : {
1650 : Buffer rootbuf;
1651 :
1652 29 : Assert(stack == NULL);
1653 29 : Assert(is_only);
1654 : /* create a new root node and update the metapage */
1655 29 : rootbuf = _bt_newroot(rel, buf, rbuf);
1656 : /* release the split buffers */
1657 29 : _bt_relbuf(rel, rootbuf);
1658 29 : _bt_relbuf(rel, rbuf);
1659 29 : _bt_relbuf(rel, buf);
1660 : }
1661 : else
1662 : {
1663 840 : BlockNumber bknum = BufferGetBlockNumber(buf);
1664 840 : BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1665 840 : Page page = BufferGetPage(buf);
1666 : IndexTuple new_item;
1667 : BTStackData fakestack;
1668 : IndexTuple ritem;
1669 : Buffer pbuf;
1670 :
1671 840 : if (stack == NULL)
1672 : {
1673 : BTPageOpaque lpageop;
1674 :
1675 1 : elog(DEBUG2, "concurrent ROOT page split");
1676 1 : lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1677 : /* Find the leftmost page at the next level up */
1678 1 : pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
1679 : NULL);
1680 : /* Set up a phony stack entry pointing there */
1681 1 : stack = &fakestack;
1682 1 : stack->bts_blkno = BufferGetBlockNumber(pbuf);
1683 1 : stack->bts_offset = InvalidOffsetNumber;
1684 : /* bts_btentry will be initialized below */
1685 1 : stack->bts_parent = NULL;
1686 1 : _bt_relbuf(rel, pbuf);
1687 : }
1688 :
1689 : /* get high key from left page == lowest key on new right page */
1690 840 : ritem = (IndexTuple) PageGetItem(page,
1691 : PageGetItemId(page, P_HIKEY));
1692 :
1693 : /* form an index tuple that points at the new right page */
1694 840 : new_item = CopyIndexTuple(ritem);
1695 840 : ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
1696 :
1697 : /*
1698 : * Find the parent buffer and get the parent page.
1699 : *
1700 : * If we were moved right, we must update the stack item so that it
1701 : * points to the parent entry for the page we are actually on.
1702 : * - vadim 05/27/97
1703 : */
1704 840 : ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
1705 840 : pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1706 :
1707 : /*
1708 : * Now we can unlock the right child. The left child will be unlocked
1709 : * by _bt_insertonpg().
1710 : */
1711 840 : _bt_relbuf(rel, rbuf);
1712 :
1713 : /* Check for error only after writing children */
1714 840 : if (pbuf == InvalidBuffer)
1715 0 : elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
1716 : RelationGetRelationName(rel), bknum, rbknum);
1717 :
1718 : /* Recursively update the parent */
1719 1680 : _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
1720 840 : new_item, stack->bts_offset + 1,
1721 : is_only);
1722 :
1723 : /* be tidy */
1724 840 : pfree(new_item);
1725 : }
1726 869 : }
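
/*
 * Editor's note (illustrative, not part of nbtinsert.c): the calls
 * above bottom out at the root.  Schematically:
 *
 *     _bt_split(level N)
 *         -> _bt_insert_parent(level N)
 *             -> _bt_insertonpg(level N + 1)    -- insert the downlink
 *                 -> _bt_split(level N + 1)     -- only if that page
 *                     -> ...                       overflows in turn
 *
 * The chain ends either at a level that absorbs the downlink without
 * splitting, or at _bt_newroot(), which builds a new root above the
 * old one.
 */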
1727 :
1728 : /*
1729 : * _bt_finish_split() -- Finish an incomplete split
1730 : *
1731 : * A crash or other failure can leave a split incomplete. The insertion
1732 : * routines refuse to insert on a page that is incompletely split.
1733 : * Before inserting on such a page, call _bt_finish_split().
1734 : *
1735 : * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked
1736 : * and unpinned.
1737 : */
1738 : void
1739 0 : _bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
1740 : {
1741 0 : Page lpage = BufferGetPage(lbuf);
1742 0 : BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
1743 : Buffer rbuf;
1744 : Page rpage;
1745 : BTPageOpaque rpageop;
1746 : bool was_root;
1747 : bool was_only;
1748 :
1749 0 : Assert(P_INCOMPLETE_SPLIT(lpageop));
1750 :
1751 : /* Lock right sibling, the one missing the downlink */
1752 0 : rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
1753 0 : rpage = BufferGetPage(rbuf);
1754 0 : rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
1755 :
1756 : /* Could this be a root split? */
1757 0 : if (!stack)
1758 : {
1759 : Buffer metabuf;
1760 : Page metapg;
1761 : BTMetaPageData *metad;
1762 :
1763 : /* acquire lock on the metapage */
1764 0 : metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1765 0 : metapg = BufferGetPage(metabuf);
1766 0 : metad = BTPageGetMeta(metapg);
1767 :
1768 0 : was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
1769 :
1770 0 : _bt_relbuf(rel, metabuf);
1771 : }
1772 : else
1773 0 : was_root = false;
1774 :
1775 : /* Was this the only page on the level before split? */
1776 0 : was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
1777 :
1778 0 : elog(DEBUG1, "finishing incomplete split of %u/%u",
1779 : BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
1780 :
1781 0 : _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
1782 0 : }
1783 :
1784 : /*
1785 : * _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1786 : * we last looked at in the parent.
1787 : *
1788 : * This is possible because we save the downlink from the parent item,
1789 : * which is enough to uniquely identify it. Insertions into the parent
1790 : * level could cause the item to move right; deletions could cause it
1791 : * to move left, but not left of the page we previously found it in.
1792 : *
1793 : * Adjusts bts_blkno & bts_offset if changed.
1794 : *
1795 : * Returns InvalidBuffer if item not found (should not happen).
1796 : */
1797 : Buffer
1798 1017 : _bt_getstackbuf(Relation rel, BTStack stack, int access)
1799 : {
1800 : BlockNumber blkno;
1801 : OffsetNumber start;
1802 :
1803 1017 : blkno = stack->bts_blkno;
1804 1017 : start = stack->bts_offset;
1805 :
1806 : for (;;)
1807 : {
1808 : Buffer buf;
1809 : Page page;
1810 : BTPageOpaque opaque;
1811 :
1812 1017 : buf = _bt_getbuf(rel, blkno, access);
1813 1017 : page = BufferGetPage(buf);
1814 1017 : opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1815 :
1816 1017 : if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
1817 : {
1818 0 : _bt_finish_split(rel, buf, stack->bts_parent);
1819 0 : continue;
1820 : }
1821 :
1822 1017 : if (!P_IGNORE(opaque))
1823 : {
1824 : OffsetNumber offnum,
1825 : minoff,
1826 : maxoff;
1827 : ItemId itemid;
1828 : IndexTuple item;
1829 :
1830 1017 : minoff = P_FIRSTDATAKEY(opaque);
1831 1017 : maxoff = PageGetMaxOffsetNumber(page);
1832 :
1833 : /*
1834 : * start = InvalidOffsetNumber means "search the whole page". We
1835 : * need this test anyway because the page may have acquired a high
1836 : * key since we last visited it.
1837 : */
1838 1017 : if (start < minoff)
1839 1 : start = minoff;
1840 :
1841 : /*
1842 : * We need this check too, to guard against the possibility that
1843 : * the page has split since we originally visited it.
1844 : */
1845 1017 : if (start > maxoff)
1846 0 : start = OffsetNumberNext(maxoff);
1847 :
1848 : /*
1849 : * These loops will check every item on the page --- but in an
1850 : * order that's attuned to the probability of where it actually
1851 : * is. Scan to the right first, then to the left.
1852 : */
1853 2038 : for (offnum = start;
1854 : offnum <= maxoff;
1855 4 : offnum = OffsetNumberNext(offnum))
1856 : {
1857 1021 : itemid = PageGetItemId(page, offnum);
1858 1021 : item = (IndexTuple) PageGetItem(page, itemid);
1859 1021 : if (BTEntrySame(item, &stack->bts_btentry))
1860 : {
1861 : /* Return accurate pointer to where link is now */
1862 1017 : stack->bts_blkno = blkno;
1863 1017 : stack->bts_offset = offnum;
1864 1017 : return buf;
1865 : }
1866 : }
1867 :
1868 0 : for (offnum = OffsetNumberPrev(start);
1869 : offnum >= minoff;
1870 0 : offnum = OffsetNumberPrev(offnum))
1871 : {
1872 0 : itemid = PageGetItemId(page, offnum);
1873 0 : item = (IndexTuple) PageGetItem(page, itemid);
1874 0 : if (BTEntrySame(item, &stack->bts_btentry))
1875 : {
1876 : /* Return accurate pointer to where link is now */
1877 0 : stack->bts_blkno = blkno;
1878 0 : stack->bts_offset = offnum;
1879 0 : return buf;
1880 : }
1881 : }
1882 : }
1883 :
1884 : /*
1885 : * The item we're looking for moved right at least one page.
1886 : */
1887 0 : if (P_RIGHTMOST(opaque))
1888 : {
1889 0 : _bt_relbuf(rel, buf);
1890 0 : return InvalidBuffer;
1891 : }
1892 0 : blkno = opaque->btpo_next;
1893 0 : start = InvalidOffsetNumber;
1894 0 : _bt_relbuf(rel, buf);
1895 0 : }
1896 : }
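
/*
 * Editor's note: a self-contained sketch, not part of nbtinsert.c; all
 * names are hypothetical.  It demonstrates the scan order used above:
 * since insertions can only have pushed the remembered item to the
 * right within its page, probe rightward from the saved offset first,
 * then fall back to scanning leftward.
 */
static int
refind_item_sketch(const int *items, int nitems, int start, int target)
{
    for (int i = start; i < nitems; i++)    /* to the right of start */
        if (items[i] == target)
            return i;
    for (int i = start - 1; i >= 0; i--)    /* then to the left */
        if (items[i] == target)
            return i;
    return -1;      /* not here: caller would move right to the next page */
}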
1897 :
1898 : /*
1899 : * _bt_newroot() -- Create a new root page for the index.
1900 : *
1901 : * We've just split the old root page and need to create a new one.
1902 : * In order to do this, we add a new root page to the file, then lock
1903 : * the metadata page and update it. This is guaranteed to be deadlock-
1904 : * free, because all readers release their locks on the metadata page
1905 : * before trying to lock the root, and all writers lock the root before
1906 : * trying to lock the metadata page. We have a write lock on the old
1907 : * root page, so we have not introduced any cycles into the waits-for
1908 : * graph.
1909 : *
1910 : * On entry, lbuf (the old root) and rbuf (its new peer) are write-
1911 : * locked. On exit, a new root page exists with entries for the
1912 : * two new children; the metapage is updated and unlocked/unpinned.
1913 : * The new root buffer is returned to the caller, which must
1914 : * unlock/unpin lbuf, rbuf, and rootbuf.
1915 : */
1916 : static Buffer
1917 29 : _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1918 : {
1919 : Buffer rootbuf;
1920 : Page lpage,
1921 : rootpage;
1922 : BlockNumber lbkno,
1923 : rbkno;
1924 : BlockNumber rootblknum;
1925 : BTPageOpaque rootopaque;
1926 : BTPageOpaque lopaque;
1927 : ItemId itemid;
1928 : IndexTuple item;
1929 : IndexTuple left_item;
1930 : Size left_item_sz;
1931 : IndexTuple right_item;
1932 : Size right_item_sz;
1933 : Buffer metabuf;
1934 : Page metapg;
1935 : BTMetaPageData *metad;
1936 :
1937 29 : lbkno = BufferGetBlockNumber(lbuf);
1938 29 : rbkno = BufferGetBlockNumber(rbuf);
1939 29 : lpage = BufferGetPage(lbuf);
1940 29 : lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
1941 :
1942 : /* get a new root page */
1943 29 : rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1944 29 : rootpage = BufferGetPage(rootbuf);
1945 29 : rootblknum = BufferGetBlockNumber(rootbuf);
1946 :
1947 : /* acquire lock on the metapage */
1948 29 : metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1949 29 : metapg = BufferGetPage(metabuf);
1950 29 : metad = BTPageGetMeta(metapg);
1951 :
1952 : /*
1953 : * Create downlink item for left page (old root). Since this will be the
1954 : * first item in a non-leaf page, it implicitly has minus-infinity key
1955 : * value, so we need not store any actual key in it.
1956 : */
1957 29 : left_item_sz = sizeof(IndexTupleData);
1958 29 : left_item = (IndexTuple) palloc(left_item_sz);
1959 29 : left_item->t_info = left_item_sz;
1960 29 : ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
1961 :
1962 : /*
1963 : * Create downlink item for right page. The key for it is obtained from
1964 : * the "high key" position in the left page.
1965 : */
1966 29 : itemid = PageGetItemId(lpage, P_HIKEY);
1967 29 : right_item_sz = ItemIdGetLength(itemid);
1968 29 : item = (IndexTuple) PageGetItem(lpage, itemid);
1969 29 : right_item = CopyIndexTuple(item);
1970 29 : ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
1971 :
1972 : /* NO EREPORT(ERROR) from here till newroot op is logged */
1973 29 : START_CRIT_SECTION();
1974 :
1975 : /* set btree special data */
1976 29 : rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1977 29 : rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1978 29 : rootopaque->btpo_flags = BTP_ROOT;
1979 29 : rootopaque->btpo.level =
1980 29 : ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1981 29 : rootopaque->btpo_cycleid = 0;
1982 :
1983 : /* update metapage data */
1984 29 : metad->btm_root = rootblknum;
1985 29 : metad->btm_level = rootopaque->btpo.level;
1986 29 : metad->btm_fastroot = rootblknum;
1987 29 : metad->btm_fastlevel = rootopaque->btpo.level;
1988 :
1989 : /*
1990 : * Insert the left page pointer into the new root page. The root page is
1991 : * the rightmost page on its level so there is no "high key" in it; the
1992 : * two items will go into positions P_HIKEY and P_FIRSTKEY.
1993 : *
1994 : * Note: we *must* insert the two items in item-number order, for the
1995 : * benefit of _bt_restore_page().
1996 : */
1997 29 : if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
1998 : false, false) == InvalidOffsetNumber)
1999 0 : elog(PANIC, "failed to add leftkey to new root page"
2000 : " while splitting block %u of index \"%s\"",
2001 : BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
2002 :
2003 : /*
2004 : * Insert the right page pointer into the new root page.
2005 : */
2006 29 : if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
2007 : false, false) == InvalidOffsetNumber)
2008 0 : elog(PANIC, "failed to add rightkey to new root page"
2009 : " while splitting block %u of index \"%s\"",
2010 : BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
2011 :
2012 : /* Clear the incomplete-split flag in the left child */
2013 29 : Assert(P_INCOMPLETE_SPLIT(lopaque));
2014 29 : lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
2015 29 : MarkBufferDirty(lbuf);
2016 :
2017 29 : MarkBufferDirty(rootbuf);
2018 29 : MarkBufferDirty(metabuf);
2019 :
2020 : /* XLOG stuff */
2021 29 : if (RelationNeedsWAL(rel))
2022 : {
2023 : xl_btree_newroot xlrec;
2024 : XLogRecPtr recptr;
2025 : xl_btree_metadata md;
2026 :
2027 29 : xlrec.rootblk = rootblknum;
2028 29 : xlrec.level = metad->btm_level;
2029 :
2030 29 : XLogBeginInsert();
2031 29 : XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
2032 :
2033 29 : XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
2034 29 : XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
2035 29 : XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
2036 :
2037 29 : md.root = rootblknum;
2038 29 : md.level = metad->btm_level;
2039 29 : md.fastroot = rootblknum;
2040 29 : md.fastlevel = metad->btm_level;
2041 :
2042 29 : XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
2043 :
2044 : /*
2045 : * Direct access to the page internals is ugly but faster; ideally the
2046 : * page API would provide a function for this.
2047 : */
2048 87 : XLogRegisterBufData(0,
2049 29 : (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
2050 29 : ((PageHeader) rootpage)->pd_special -
2051 29 : ((PageHeader) rootpage)->pd_upper);
2052 :
2053 29 : recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
2054 :
2055 29 : PageSetLSN(lpage, recptr);
2056 29 : PageSetLSN(rootpage, recptr);
2057 29 : PageSetLSN(metapg, recptr);
2058 : }
2059 :
2060 29 : END_CRIT_SECTION();
2061 :
2062 : /* done with metapage */
2063 29 : _bt_relbuf(rel, metabuf);
2064 :
2065 29 : pfree(left_item);
2066 29 : pfree(right_item);
2067 :
2068 29 : return rootbuf;
2069 : }
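
/*
 * Editor's note (illustrative, not part of nbtinsert.c): the
 * "minus infinity" downlink built above is nothing more than a bare
 * IndexTupleData header: t_info records sizeof(IndexTupleData), so no
 * key bytes follow, and t_tid carries the child's block number.  The
 * descent code correspondingly treats the first data item on any
 * non-leaf page as smaller than every possible search key.
 */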
2070 :
2071 : /*
2072 : * _bt_pgaddtup() -- add a tuple to a particular page in the index.
2073 : *
2074 : * This routine adds the tuple to the page as requested. It does
2075 : * not affect pin/lock status, but you'd better have a write lock
2076 : * and pin on the target buffer! Don't forget to write and release
2077 : * the buffer afterwards, either.
2078 : *
2079 : * The main difference between this routine and a bare PageAddItem call
2080 : * is that this code knows that the leftmost index tuple on a non-leaf
2081 : * btree page doesn't need to have a key. Therefore, it strips such
2082 : * tuples down to just the tuple header. CAUTION: this works ONLY if
2083 : * we insert the tuples in order, so that the given itup_off does
2084 : * represent the final position of the tuple!
2085 : */
2086 : static bool
2087 454102 : _bt_pgaddtup(Page page,
2088 : Size itemsize,
2089 : IndexTuple itup,
2090 : OffsetNumber itup_off)
2091 : {
2092 454102 : BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2093 : IndexTupleData trunctuple;
2094 :
2095 454102 : if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
2096 : {
2097 84 : trunctuple = *itup;
2098 84 : trunctuple.t_info = sizeof(IndexTupleData);
2099 84 : itup = &trunctuple;
2100 84 : itemsize = sizeof(IndexTupleData);
2101 : }
2102 :
2103 454102 : if (PageAddItem(page, (Item) itup, itemsize, itup_off,
2104 : false, false) == InvalidOffsetNumber)
2105 0 : return false;
2106 :
2107 454102 : return true;
2108 : }
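
/*
 * Editor's note (illustrative, not part of nbtinsert.c): this
 * truncation is what keeps the leftmost downlink on every internal
 * page a "minus infinity" entry (cf. _bt_newroot above).  For example,
 * when a split copies tuples onto a new non-leaf page in item-number
 * order, the tuple placed at P_FIRSTDATAKEY is cut down to a bare
 * header here, exactly like the left_item built in _bt_newroot.
 */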
2109 :
2110 : /*
2111 : * _bt_isequal - used in _bt_doinsert in check for duplicates.
2112 : *
2113 : * This is very similar to _bt_compare, except for NULL handling.
2114 : * The rule is simple: NOT_NULL never equals NULL, and NULL never equals NULL.
2115 : */
2116 : static bool
2117 56522 : _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
2118 : int keysz, ScanKey scankey)
2119 : {
2120 : IndexTuple itup;
2121 : int i;
2122 :
2123 : /* Better be comparing to a leaf item */
2124 56522 : Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
2125 :
2126 56522 : itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
2127 :
2128 196076 : for (i = 1; i <= keysz; i++)
2129 : {
2130 : AttrNumber attno;
2131 : Datum datum;
2132 : bool isNull;
2133 : int32 result;
2134 :
2135 91923 : attno = scankey->sk_attno;
2136 91923 : Assert(attno == i);
2137 91923 : datum = index_getattr(itup, attno, itupdesc, &isNull);
2138 :
2139 : /* NULLs are never equal to anything */
2140 91923 : if (isNull || (scankey->sk_flags & SK_ISNULL))
2141 50425 : return false;
2142 :
2143 91905 : result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
2144 : scankey->sk_collation,
2145 : datum,
2146 : scankey->sk_argument));
2147 :
2148 91905 : if (result != 0)
2149 50389 : return false;
2150 :
2151 41516 : scankey++;
2152 : }
2153 :
2154 : /* if we get here, the keys are equal */
2155 6115 : return true;
2156 : }
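
/*
 * Editor's note (illustrative, not part of nbtinsert.c): under the
 * rule above, checking a new tuple (1, NULL) against an existing
 * indexed tuple (1, NULL) reports "not equal", so a unique index
 * happily stores any number of rows whose keys differ only in having
 * NULLs -- mirroring SQL's rule that NULL = NULL is not true.
 */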
2157 :
2158 : /*
2159 : * _bt_vacuum_one_page - vacuum just one index page.
2160 : *
2161 : * Try to remove LP_DEAD items from the given page. The passed buffer
2162 : * must be exclusive-locked, but unlike a real VACUUM, we don't need a
2163 : * super-exclusive "cleanup" lock (see nbtree/README).
2164 : */
2165 : static void
2166 257 : _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
2167 : {
2168 : OffsetNumber deletable[MaxOffsetNumber];
2169 257 : int ndeletable = 0;
2170 : OffsetNumber offnum,
2171 : minoff,
2172 : maxoff;
2173 257 : Page page = BufferGetPage(buffer);
2174 257 : BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2175 :
2176 : /*
2177 : * Scan over all items to see which ones need to be deleted according to
2178 : * LP_DEAD flags.
2179 : */
2180 257 : minoff = P_FIRSTDATAKEY(opaque);
2181 257 : maxoff = PageGetMaxOffsetNumber(page);
2182 85380 : for (offnum = minoff;
2183 : offnum <= maxoff;
2184 84866 : offnum = OffsetNumberNext(offnum))
2185 : {
2186 84866 : ItemId itemId = PageGetItemId(page, offnum);
2187 :
2188 84866 : if (ItemIdIsDead(itemId))
2189 2747 : deletable[ndeletable++] = offnum;
2190 : }
2191 :
2192 257 : if (ndeletable > 0)
2193 257 : _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
2194 :
2195 : /*
2196 : * Note: if we didn't find any LP_DEAD items, then the page's
2197 : * BTP_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
2198 : * separate write to clear it, however. We will clear it when we split
2199 : * the page.
2200 : */
2201 257 : }