Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgdoinsert.c
4 : * implementation of insert algorithm
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgdoinsert.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "access/genam.h"
19 : #include "access/spgist_private.h"
20 : #include "access/spgxlog.h"
21 : #include "access/xloginsert.h"
22 : #include "miscadmin.h"
23 : #include "storage/bufmgr.h"
24 : #include "utils/rel.h"
25 :
26 :
27 : /*
28 : * SPPageDesc tracks all info about a page we are inserting into. In some
29 : * situations it actually identifies a tuple, or even a specific node within
30 : * an inner tuple. But any of the fields can be invalid. If the buffer
31 : * field is valid, it implies we hold pin and exclusive lock on that buffer.
32 : * page pointer should be valid exactly when buffer is.
33 : */
34 : typedef struct SPPageDesc
35 : {
36 : BlockNumber blkno; /* block number, or InvalidBlockNumber */
37 : Buffer buffer; /* page's buffer number, or InvalidBuffer */
38 : Page page; /* pointer to page buffer, or NULL */
39 : OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
40 : int node; /* node number within inner tuple, or -1 */
41 : } SPPageDesc;
42 :
43 :
44 : /*
45 : * Set the item pointer in the nodeN'th entry in inner tuple tup. This
46 : * is used to update the parent inner tuple's downlink after a move or
47 : * split operation.
48 : */
49 : void
50 1106 : spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
51 : BlockNumber blkno, OffsetNumber offset)
52 : {
53 : int i;
54 : SpGistNodeTuple node;
55 :
56 5141 : SGITITERATE(tup, i, node)
57 : {
58 5141 : if (i == nodeN)
59 : {
60 1106 : ItemPointerSet(&node->t_tid, blkno, offset);
61 2212 : return;
62 : }
63 : }
64 :
65 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
66 : nodeN);
67 : }
68 :
69 : /*
70 : * Form a new inner tuple containing one more node than the given one, with
71 : * the specified label datum, inserted at offset "offset" in the node array.
72 : * The new tuple's prefix is the same as the old one's.
73 : *
74 : * Note that the new node initially has an invalid downlink. We'll find a
75 : * page to point it to later.
76 : */
77 : static SpGistInnerTuple
78 223 : addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
79 : {
80 : SpGistNodeTuple node,
81 : *nodes;
82 : int i;
83 :
84 : /* if offset is negative, insert at end */
85 223 : if (offset < 0)
86 0 : offset = tuple->nNodes;
87 223 : else if (offset > tuple->nNodes)
88 0 : elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89 :
90 223 : nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91 1297 : SGITITERATE(tuple, i, node)
92 : {
93 1074 : if (i < offset)
94 1064 : nodes[i] = node;
95 : else
96 10 : nodes[i + 1] = node;
97 : }
98 :
99 223 : nodes[offset] = spgFormNodeTuple(state, label, false);
100 :
101 785 : return spgFormInnerTuple(state,
102 223 : (tuple->prefixSize > 0),
103 339 : SGITDATUM(tuple, state),
104 223 : tuple->nNodes + 1,
105 : nodes);
106 : }
107 :
108 : /* qsort comparator for sorting OffsetNumbers */
109 : static int
110 575366 : cmpOffsetNumbers(const void *a, const void *b)
111 : {
112 575366 : if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113 0 : return 0;
114 575366 : return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115 : }
116 :
117 : /*
118 : * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 : *
120 : * The first tuple in the given list is replaced with a dead tuple of type
121 : * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 : * with dead tuples of type "reststate". If either firststate or reststate
123 : * is REDIRECT, blkno/offnum specify where to link to.
124 : *
125 : * NB: this is used during WAL replay, so beware of trying to make it too
126 : * smart. In particular, it shouldn't use "state" except for calling
127 : * spgFormDeadTuple(). This is also used in a critical section, so no
128 : * pallocs either!
129 : */
130 : void
131 953 : spgPageIndexMultiDelete(SpGistState *state, Page page,
132 : OffsetNumber *itemnos, int nitems,
133 : int firststate, int reststate,
134 : BlockNumber blkno, OffsetNumber offnum)
135 : {
136 : OffsetNumber firstItem;
137 : OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 953 : SpGistDeadTuple tuple = NULL;
139 : int i;
140 :
141 953 : if (nitems == 0)
142 1072 : return; /* nothing to do */
143 :
144 : /*
145 : * For efficiency we want to use PageIndexMultiDelete, which requires the
146 : * targets to be listed in sorted order, so we have to sort the itemnos
147 : * array. (This also greatly simplifies the math for reinserting the
148 : * replacement tuples.) However, we must not scribble on the caller's
149 : * array, so we have to make a copy.
150 : */
151 834 : memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 834 : if (nitems > 1)
153 717 : qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 :
155 834 : PageIndexMultiDelete(page, sortednos, nitems);
156 :
157 834 : firstItem = itemnos[0];
158 :
159 87889 : for (i = 0; i < nitems; i++)
160 : {
161 87055 : OffsetNumber itemno = sortednos[i];
162 : int tupstate;
163 :
164 87055 : tupstate = (itemno == firstItem) ? firststate : reststate;
165 87055 : if (tuple == NULL || tuple->tupstate != tupstate)
166 1311 : tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 :
168 87055 : if (PageAddItem(page, (Item) tuple, tuple->size,
169 : itemno, false, false) != itemno)
170 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
171 : tuple->size);
172 :
173 87055 : if (tupstate == SPGIST_REDIRECT)
174 243 : SpGistPageGetOpaque(page)->nRedirection++;
175 86812 : else if (tupstate == SPGIST_PLACEHOLDER)
176 86812 : SpGistPageGetOpaque(page)->nPlaceholder++;
177 : }
178 : }
179 :
180 : /*
181 : * Update the parent inner tuple's downlink, and mark the parent buffer
182 : * dirty (this must be the last change to the parent page in the current
183 : * WAL action).
184 : */
185 : static void
186 894 : saveNodeLink(Relation index, SPPageDesc *parent,
187 : BlockNumber blkno, OffsetNumber offnum)
188 : {
189 : SpGistInnerTuple innerTuple;
190 :
191 894 : innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192 : PageGetItemId(parent->page, parent->offnum));
193 :
194 894 : spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 :
196 894 : MarkBufferDirty(parent->buffer);
197 894 : }
198 :
199 : /*
200 : * Add a leaf tuple to a leaf page where there is known to be room for it
201 : */
202 : static void
203 82227 : addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204 : SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 : {
206 : spgxlogAddLeaf xlrec;
207 :
208 82227 : xlrec.newPage = isNew;
209 82227 : xlrec.storesNulls = isNulls;
210 :
211 : /* these will be filled below as needed */
212 82227 : xlrec.offnumLeaf = InvalidOffsetNumber;
213 82227 : xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214 82227 : xlrec.offnumParent = InvalidOffsetNumber;
215 82227 : xlrec.nodeI = 0;
216 :
217 82227 : START_CRIT_SECTION();
218 :
219 164192 : if (current->offnum == InvalidOffsetNumber ||
220 162261 : SpGistBlockIsRoot(current->blkno))
221 : {
222 : /* Tuple is not part of a chain */
223 1940 : leafTuple->nextOffset = InvalidOffsetNumber;
224 1940 : current->offnum = SpGistPageAddNewItem(state, current->page,
225 1940 : (Item) leafTuple, leafTuple->size,
226 : NULL, false);
227 :
228 1940 : xlrec.offnumLeaf = current->offnum;
229 :
230 : /* Must update parent's downlink if any */
231 3880 : if (parent->buffer != InvalidBuffer)
232 : {
233 262 : xlrec.offnumParent = parent->offnum;
234 262 : xlrec.nodeI = parent->node;
235 :
236 262 : saveNodeLink(index, parent, current->blkno, current->offnum);
237 : }
238 : }
239 : else
240 : {
241 : /*
242 : * Tuple must be inserted into existing chain. We mustn't change the
243 : * chain's head address, but we don't need to chase the entire chain
244 : * to put the tuple at the end; we can insert it second.
245 : *
246 : * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 : * in which case we should replace the DEAD tuple in-place.
248 : */
249 : SpGistLeafTuple head;
250 : OffsetNumber offnum;
251 :
252 80287 : head = (SpGistLeafTuple) PageGetItem(current->page,
253 : PageGetItemId(current->page, current->offnum));
254 80287 : if (head->tupstate == SPGIST_LIVE)
255 : {
256 80287 : leafTuple->nextOffset = head->nextOffset;
257 80287 : offnum = SpGistPageAddNewItem(state, current->page,
258 80287 : (Item) leafTuple, leafTuple->size,
259 : NULL, false);
260 :
261 : /*
262 : * re-get head of list because it could have been moved on page,
263 : * and set new second element
264 : */
265 80287 : head = (SpGistLeafTuple) PageGetItem(current->page,
266 : PageGetItemId(current->page, current->offnum));
267 80287 : head->nextOffset = offnum;
268 :
269 80287 : xlrec.offnumLeaf = offnum;
270 80287 : xlrec.offnumHeadLeaf = current->offnum;
271 : }
272 0 : else if (head->tupstate == SPGIST_DEAD)
273 : {
274 0 : leafTuple->nextOffset = InvalidOffsetNumber;
275 0 : PageIndexTupleDelete(current->page, current->offnum);
276 0 : if (PageAddItem(current->page,
277 : (Item) leafTuple, leafTuple->size,
278 0 : current->offnum, false, false) != current->offnum)
279 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
280 : leafTuple->size);
281 :
282 : /* WAL replay distinguishes this case by equal offnums */
283 0 : xlrec.offnumLeaf = current->offnum;
284 0 : xlrec.offnumHeadLeaf = current->offnum;
285 : }
286 : else
287 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288 : }
289 :
290 82227 : MarkBufferDirty(current->buffer);
291 :
292 82227 : if (RelationNeedsWAL(index))
293 : {
294 : XLogRecPtr recptr;
295 : int flags;
296 :
297 82171 : XLogBeginInsert();
298 82171 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299 82171 : XLogRegisterData((char *) leafTuple, leafTuple->size);
300 :
301 82171 : flags = REGBUF_STANDARD;
302 82171 : if (xlrec.newPage)
303 16 : flags |= REGBUF_WILL_INIT;
304 82171 : XLogRegisterBuffer(0, current->buffer, flags);
305 82171 : if (xlrec.offnumParent != InvalidOffsetNumber)
306 262 : XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307 :
308 82171 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 :
310 82171 : PageSetLSN(current->page, recptr);
311 :
312 : /* update parent only if we actually changed it */
313 82171 : if (xlrec.offnumParent != InvalidOffsetNumber)
314 : {
315 262 : PageSetLSN(parent->page, recptr);
316 : }
317 : }
318 :
319 82227 : END_CRIT_SECTION();
320 82227 : }
321 :
322 : /*
323 : * Count the number and total size of leaf tuples in the chain starting at
324 : * current->offnum. Return number into *nToSplit and total size as function
325 : * result.
326 : *
327 : * Klugy special case when considering the root page (i.e., root is a leaf
328 : * page, but we're about to split for the first time): return fake large
329 : * values to force spgdoinsert() to take the doPickSplit rather than
330 : * moveLeafs code path. moveLeafs is not prepared to deal with root page.
331 : */
332 : static int
333 640 : checkSplitConditions(Relation index, SpGistState *state,
334 : SPPageDesc *current, int *nToSplit)
335 : {
336 : int i,
337 640 : n = 0,
338 640 : totalSize = 0;
339 :
340 640 : if (SpGistBlockIsRoot(current->blkno))
341 : {
342 : /* return impossible values to force split */
343 8 : *nToSplit = BLCKSZ;
344 8 : return BLCKSZ;
345 : }
346 :
347 632 : i = current->offnum;
348 80594 : while (i != InvalidOffsetNumber)
349 : {
350 : SpGistLeafTuple it;
351 :
352 79330 : Assert(i >= FirstOffsetNumber &&
353 : i <= PageGetMaxOffsetNumber(current->page));
354 79330 : it = (SpGistLeafTuple) PageGetItem(current->page,
355 : PageGetItemId(current->page, i));
356 79330 : if (it->tupstate == SPGIST_LIVE)
357 : {
358 79330 : n++;
359 79330 : totalSize += it->size + sizeof(ItemIdData);
360 : }
361 0 : else if (it->tupstate == SPGIST_DEAD)
362 : {
363 : /* We could see a DEAD tuple as first/only chain item */
364 0 : Assert(i == current->offnum);
365 0 : Assert(it->nextOffset == InvalidOffsetNumber);
366 : /* Don't count it in result, because it won't go to other page */
367 : }
368 : else
369 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 :
371 79330 : i = it->nextOffset;
372 : }
373 :
374 632 : *nToSplit = n;
375 :
376 632 : return totalSize;
377 : }
378 :
379 : /*
380 : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381 : * but the chain has to be moved because there's not enough room to add
382 : * newLeafTuple to its page. We use this method when the chain contains
383 : * very little data so a split would be inefficient. We are sure we can
384 : * fit the chain plus newLeafTuple on one other page.
385 : */
386 : static void
387 121 : moveLeafs(Relation index, SpGistState *state,
388 : SPPageDesc *current, SPPageDesc *parent,
389 : SpGistLeafTuple newLeafTuple, bool isNulls)
390 : {
391 : int i,
392 : nDelete,
393 : nInsert,
394 : size;
395 : Buffer nbuf;
396 : Page npage;
397 : SpGistLeafTuple it;
398 121 : OffsetNumber r = InvalidOffsetNumber,
399 121 : startOffset = InvalidOffsetNumber;
400 121 : bool replaceDead = false;
401 : OffsetNumber *toDelete;
402 : OffsetNumber *toInsert;
403 : BlockNumber nblkno;
404 : spgxlogMoveLeafs xlrec;
405 : char *leafdata,
406 : *leafptr;
407 :
408 : /* This doesn't work on root page */
409 121 : Assert(parent->buffer != InvalidBuffer);
410 121 : Assert(parent->buffer != current->buffer);
411 :
412 : /* Locate the tuples to be moved, and count up the space needed */
413 121 : i = PageGetMaxOffsetNumber(current->page);
414 121 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415 121 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416 :
417 121 : size = newLeafTuple->size + sizeof(ItemIdData);
418 :
419 121 : nDelete = 0;
420 121 : i = current->offnum;
421 5398 : while (i != InvalidOffsetNumber)
422 : {
423 : SpGistLeafTuple it;
424 :
425 5156 : Assert(i >= FirstOffsetNumber &&
426 : i <= PageGetMaxOffsetNumber(current->page));
427 5156 : it = (SpGistLeafTuple) PageGetItem(current->page,
428 : PageGetItemId(current->page, i));
429 :
430 5156 : if (it->tupstate == SPGIST_LIVE)
431 : {
432 5156 : toDelete[nDelete] = i;
433 5156 : size += it->size + sizeof(ItemIdData);
434 5156 : nDelete++;
435 : }
436 0 : else if (it->tupstate == SPGIST_DEAD)
437 : {
438 : /* We could see a DEAD tuple as first/only chain item */
439 0 : Assert(i == current->offnum);
440 0 : Assert(it->nextOffset == InvalidOffsetNumber);
441 : /* We don't want to move it, so don't count it in size */
442 0 : toDelete[nDelete] = i;
443 0 : nDelete++;
444 0 : replaceDead = true;
445 : }
446 : else
447 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448 :
449 5156 : i = it->nextOffset;
450 : }
451 :
452 : /* Find a leaf page that will hold them */
453 121 : nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454 : size, &xlrec.newPage);
455 121 : npage = BufferGetPage(nbuf);
456 121 : nblkno = BufferGetBlockNumber(nbuf);
457 121 : Assert(nblkno != current->blkno);
458 :
459 121 : leafdata = leafptr = palloc(size);
460 :
461 121 : START_CRIT_SECTION();
462 :
463 : /* copy all the old tuples to new page, unless they're dead */
464 121 : nInsert = 0;
465 121 : if (!replaceDead)
466 : {
467 5277 : for (i = 0; i < nDelete; i++)
468 : {
469 5156 : it = (SpGistLeafTuple) PageGetItem(current->page,
470 : PageGetItemId(current->page, toDelete[i]));
471 5156 : Assert(it->tupstate == SPGIST_LIVE);
472 :
473 : /*
474 : * Update chain link (notice the chain order gets reversed, but we
475 : * don't care). We're modifying the tuple on the source page
476 : * here, but it's okay since we're about to delete it.
477 : */
478 5156 : it->nextOffset = r;
479 :
480 5156 : r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481 : &startOffset, false);
482 :
483 5156 : toInsert[nInsert] = r;
484 5156 : nInsert++;
485 :
486 : /* save modified tuple into leafdata as well */
487 5156 : memcpy(leafptr, it, it->size);
488 5156 : leafptr += it->size;
489 : }
490 : }
491 :
492 : /* add the new tuple as well */
493 121 : newLeafTuple->nextOffset = r;
494 121 : r = SpGistPageAddNewItem(state, npage,
495 121 : (Item) newLeafTuple, newLeafTuple->size,
496 : &startOffset, false);
497 121 : toInsert[nInsert] = r;
498 121 : nInsert++;
499 121 : memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500 121 : leafptr += newLeafTuple->size;
501 :
502 : /*
503 : * Now delete the old tuples, leaving a redirection pointer behind for the
504 : * first one, unless we're doing an index build; in which case there can't
505 : * be any concurrent scan so we need not provide a redirect.
506 : */
507 242 : spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
508 121 : state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
509 : SPGIST_PLACEHOLDER,
510 : nblkno, r);
511 :
512 : /* Update parent's downlink and mark parent page dirty */
513 121 : saveNodeLink(index, parent, nblkno, r);
514 :
515 : /* Mark the leaf pages too */
516 121 : MarkBufferDirty(current->buffer);
517 121 : MarkBufferDirty(nbuf);
518 :
519 121 : if (RelationNeedsWAL(index))
520 : {
521 : XLogRecPtr recptr;
522 :
523 : /* prepare WAL info */
524 121 : STORE_STATE(state, xlrec.stateSrc);
525 :
526 121 : xlrec.nMoves = nDelete;
527 121 : xlrec.replaceDead = replaceDead;
528 121 : xlrec.storesNulls = isNulls;
529 :
530 121 : xlrec.offnumParent = parent->offnum;
531 121 : xlrec.nodeI = parent->node;
532 :
533 121 : XLogBeginInsert();
534 121 : XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535 121 : XLogRegisterData((char *) toDelete,
536 121 : sizeof(OffsetNumber) * nDelete);
537 121 : XLogRegisterData((char *) toInsert,
538 121 : sizeof(OffsetNumber) * nInsert);
539 121 : XLogRegisterData((char *) leafdata, leafptr - leafdata);
540 :
541 121 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
542 121 : XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
543 121 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
544 :
545 121 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546 :
547 121 : PageSetLSN(current->page, recptr);
548 121 : PageSetLSN(npage, recptr);
549 121 : PageSetLSN(parent->page, recptr);
550 : }
551 :
552 121 : END_CRIT_SECTION();
553 :
554 : /* Update local free-space cache and release new buffer */
555 121 : SpGistSetLastUsedPage(index, nbuf);
556 121 : UnlockReleaseBuffer(nbuf);
557 121 : }
558 :
559 : /*
560 : * Update previously-created redirection tuple with appropriate destination
561 : *
562 : * We use this when it's not convenient to know the destination first.
563 : * The tuple should have been made with the "impossible" destination of
564 : * the metapage.
565 : */
566 : static void
567 185 : setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
568 : BlockNumber blkno, OffsetNumber offnum)
569 : {
570 : SpGistDeadTuple dt;
571 :
572 185 : dt = (SpGistDeadTuple) PageGetItem(current->page,
573 : PageGetItemId(current->page, position));
574 185 : Assert(dt->tupstate == SPGIST_REDIRECT);
575 185 : Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
576 185 : ItemPointerSet(&dt->pointer, blkno, offnum);
577 185 : }
578 :
579 : /*
580 : * Test to see if the user-defined picksplit function failed to do its job,
581 : * ie, it put all the leaf tuples into the same node.
582 : * If so, randomly divide the tuples into several nodes (all with the same
583 : * label) and return TRUE to select allTheSame mode for this inner tuple.
584 : *
585 : * (This code is also used to forcibly select allTheSame mode for nulls.)
586 : *
587 : * If we know that the leaf tuples wouldn't all fit on one page, then we
588 : * exclude the last tuple (which is the incoming new tuple that forced a split)
589 : * from the check to see if more than one node is used. The reason for this
590 : * is that if the existing tuples are put into only one chain, then even if
591 : * we move them all to an empty page, there would still not be room for the
592 : * new tuple, so we'd get into an infinite loop of picksplit attempts.
593 : * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
594 : * be split across pages. (Exercise for the reader: figure out why this
595 : * fixes the problem even when there is only one old tuple.)
596 : */
597 : static bool
598 519 : checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
599 : bool *includeNew)
600 : {
601 : int theNode;
602 : int limit;
603 : int i;
604 :
605 : /* For the moment, assume we can include the new leaf tuple */
606 519 : *includeNew = true;
607 :
608 : /* If there's only the new leaf tuple, don't select allTheSame mode */
609 519 : if (in->nTuples <= 1)
610 0 : return false;
611 :
612 : /* If tuple set doesn't fit on one page, ignore the new tuple in test */
613 519 : limit = tooBig ? in->nTuples - 1 : in->nTuples;
614 :
615 : /* Check to see if more than one node is populated */
616 519 : theNode = out->mapTuplesToNodes[0];
617 16730 : for (i = 1; i < limit; i++)
618 : {
619 16701 : if (out->mapTuplesToNodes[i] != theNode)
620 490 : return false;
621 : }
622 :
623 : /* Nope, so override the picksplit function's decisions */
624 :
625 : /* If the new tuple is in its own node, it can't be included in split */
626 29 : if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627 0 : *includeNew = false;
628 :
629 29 : out->nNodes = 8; /* arbitrary number of child nodes */
630 :
631 : /* Random assignment of tuples to nodes (note we include new tuple) */
632 3071 : for (i = 0; i < in->nTuples; i++)
633 3042 : out->mapTuplesToNodes[i] = i % out->nNodes;
634 :
635 : /* The opclass may not use node labels, but if it does, duplicate 'em */
636 29 : if (out->nodeLabels)
637 : {
638 5 : Datum theLabel = out->nodeLabels[theNode];
639 :
640 5 : out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641 45 : for (i = 0; i < out->nNodes; i++)
642 40 : out->nodeLabels[i] = theLabel;
643 : }
644 :
645 : /* We don't touch the prefix or the leaf tuple datum assignments */
646 :
647 29 : return true;
648 : }
649 :
650 : /*
651 : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
652 : * but the chain has to be split because there's not enough room to add
653 : * newLeafTuple to its page.
654 : *
655 : * This function splits the leaf tuple set according to picksplit's rules,
656 : * creating one or more new chains that are spread across the current page
657 : * and an additional leaf page (we assume that two leaf pages will be
658 : * sufficient). A new inner tuple is created, and the parent downlink
659 : * pointer is updated to point to that inner tuple instead of the leaf chain.
660 : *
661 : * On exit, current contains the address of the new inner tuple.
662 : *
663 : * Returns true if we successfully inserted newLeafTuple during this function,
664 : * false if caller still has to do it (meaning another picksplit operation is
665 : * probably needed). Failure could occur if the picksplit result is fairly
666 : * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
667 : * Because we force the picksplit result to be at least two chains, each
668 : * cycle will get rid of at least one leaf tuple from the chain, so the loop
669 : * will eventually terminate if lack of balance is the issue. If the tuple
670 : * is too big, we assume that repeated picksplit operations will eventually
671 : * make it small enough by repeated prefix-stripping. A broken opclass could
672 : * make this an infinite loop, though.
673 : */
674 : static bool
675 519 : doPickSplit(Relation index, SpGistState *state,
676 : SPPageDesc *current, SPPageDesc *parent,
677 : SpGistLeafTuple newLeafTuple,
678 : int level, bool isNulls, bool isNew)
679 : {
680 519 : bool insertedNew = false;
681 : spgPickSplitIn in;
682 : spgPickSplitOut out;
683 : FmgrInfo *procinfo;
684 : bool includeNew;
685 : int i,
686 : max,
687 : n;
688 : SpGistInnerTuple innerTuple;
689 : SpGistNodeTuple node,
690 : *nodes;
691 : Buffer newInnerBuffer,
692 : newLeafBuffer;
693 : ItemPointerData *heapPtrs;
694 : uint8 *leafPageSelect;
695 : int *leafSizes;
696 : OffsetNumber *toDelete;
697 : OffsetNumber *toInsert;
698 519 : OffsetNumber redirectTuplePos = InvalidOffsetNumber;
699 : OffsetNumber startOffsets[2];
700 : SpGistLeafTuple *newLeafs;
701 : int spaceToDelete;
702 : int currentFreeSpace;
703 : int totalLeafSizes;
704 : bool allTheSame;
705 : spgxlogPickSplit xlrec;
706 : char *leafdata,
707 : *leafptr;
708 : SPPageDesc saveCurrent;
709 : int nToDelete,
710 : nToInsert,
711 : maxToInclude;
712 :
713 519 : in.level = level;
714 :
715 : /*
716 : * Allocate per-leaf-tuple work arrays with max possible size
717 : */
718 519 : max = PageGetMaxOffsetNumber(current->page);
719 519 : n = max + 1;
720 519 : in.datums = (Datum *) palloc(sizeof(Datum) * n);
721 519 : heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
722 519 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
723 519 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724 519 : newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
725 519 : leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
726 :
727 519 : STORE_STATE(state, xlrec.stateSrc);
728 :
729 : /*
730 : * Form list of leaf tuples which will be distributed as split result;
731 : * also, count up the amount of space that will be freed from current.
732 : * (Note that in the non-root case, we won't actually delete the old
733 : * tuples, only replace them with redirects or placeholders.)
734 : *
735 : * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
736 : * page. For a pass-by-value data type we will fetch a word that must
737 : * exist even though it may contain garbage (because of the fact that leaf
738 : * tuples must have size at least SGDTSIZE). For a pass-by-reference type
739 : * we are just computing a pointer that isn't going to get dereferenced.
740 : * So it's not worth guarding the calls with isNulls checks.
741 : */
742 519 : nToInsert = 0;
743 519 : nToDelete = 0;
744 519 : spaceToDelete = 0;
745 527 : if (SpGistBlockIsRoot(current->blkno))
746 : {
747 : /*
748 : * We are splitting the root (which up to now is also a leaf page).
749 : * Its tuples are not linked, so scan sequentially to get them all. We
750 : * ignore the original value of current->offnum.
751 : */
752 1605 : for (i = FirstOffsetNumber; i <= max; i++)
753 : {
754 : SpGistLeafTuple it;
755 :
756 1597 : it = (SpGistLeafTuple) PageGetItem(current->page,
757 : PageGetItemId(current->page, i));
758 1597 : if (it->tupstate == SPGIST_LIVE)
759 : {
760 1597 : in.datums[nToInsert] = SGLTDATUM(it, state);
761 1597 : heapPtrs[nToInsert] = it->heapPtr;
762 1597 : nToInsert++;
763 1597 : toDelete[nToDelete] = i;
764 1597 : nToDelete++;
765 : /* we will delete the tuple altogether, so count full space */
766 1597 : spaceToDelete += it->size + sizeof(ItemIdData);
767 : }
768 : else /* tuples on root should be live */
769 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
770 : }
771 : }
772 : else
773 : {
774 : /* Normal case, just collect the leaf tuples in the chain */
775 511 : i = current->offnum;
776 75196 : while (i != InvalidOffsetNumber)
777 : {
778 : SpGistLeafTuple it;
779 :
780 74174 : Assert(i >= FirstOffsetNumber && i <= max);
781 74174 : it = (SpGistLeafTuple) PageGetItem(current->page,
782 : PageGetItemId(current->page, i));
783 74174 : if (it->tupstate == SPGIST_LIVE)
784 : {
785 74174 : in.datums[nToInsert] = SGLTDATUM(it, state);
786 74174 : heapPtrs[nToInsert] = it->heapPtr;
787 74174 : nToInsert++;
788 74174 : toDelete[nToDelete] = i;
789 74174 : nToDelete++;
790 : /* we will not delete the tuple, only replace with dead */
791 74174 : Assert(it->size >= SGDTSIZE);
792 74174 : spaceToDelete += it->size - SGDTSIZE;
793 : }
794 0 : else if (it->tupstate == SPGIST_DEAD)
795 : {
796 : /* We could see a DEAD tuple as first/only chain item */
797 0 : Assert(i == current->offnum);
798 0 : Assert(it->nextOffset == InvalidOffsetNumber);
799 0 : toDelete[nToDelete] = i;
800 0 : nToDelete++;
801 : /* replacing it with redirect will save no space */
802 : }
803 : else
804 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
805 :
806 74174 : i = it->nextOffset;
807 : }
808 : }
809 519 : in.nTuples = nToInsert;
810 :
811 : /*
812 : * We may not actually insert new tuple because another picksplit may be
813 : * necessary due to too large value, but we will try to allocate enough
814 : * space to include it; and in any case it has to be included in the input
815 : * for the picksplit function. So don't increment nToInsert yet.
816 : */
817 519 : in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
818 519 : heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
819 519 : in.nTuples++;
820 :
821 519 : memset(&out, 0, sizeof(out));
822 :
823 519 : if (!isNulls)
824 : {
825 : /*
826 : * Perform split using user-defined method.
827 : */
828 519 : procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
829 1038 : FunctionCall2Coll(procinfo,
830 519 : index->rd_indcollation[0],
831 : PointerGetDatum(&in),
832 : PointerGetDatum(&out));
833 :
834 : /*
835 : * Form new leaf tuples and count up the total space needed.
836 : */
837 519 : totalLeafSizes = 0;
838 76809 : for (i = 0; i < in.nTuples; i++)
839 : {
840 152580 : newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
841 76290 : out.leafTupleDatums[i],
842 : false);
843 76290 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
844 : }
845 : }
846 : else
847 : {
848 : /*
849 : * Perform dummy split that puts all tuples into one node.
850 : * checkAllTheSame will override this and force allTheSame mode.
851 : */
852 0 : out.hasPrefix = false;
853 0 : out.nNodes = 1;
854 0 : out.nodeLabels = NULL;
855 0 : out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
856 :
857 : /*
858 : * Form new leaf tuples and count up the total space needed.
859 : */
860 0 : totalLeafSizes = 0;
861 0 : for (i = 0; i < in.nTuples; i++)
862 : {
863 0 : newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
864 : (Datum) 0,
865 : true);
866 0 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
867 : }
868 : }
869 :
870 : /*
871 : * Check to see if the picksplit function failed to separate the values,
872 : * ie, it put them all into the same child node. If so, select allTheSame
873 : * mode and create a random split instead. See comments for
874 : * checkAllTheSame as to why we need to know if the new leaf tuples could
875 : * fit on one page.
876 : */
877 519 : allTheSame = checkAllTheSame(&in, &out,
878 : totalLeafSizes > SPGIST_PAGE_CAPACITY,
879 : &includeNew);
880 :
881 : /*
882 : * If checkAllTheSame decided we must exclude the new tuple, don't
883 : * consider it any further.
884 : */
885 519 : if (includeNew)
886 519 : maxToInclude = in.nTuples;
887 : else
888 : {
889 0 : maxToInclude = in.nTuples - 1;
890 0 : totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
891 : }
892 :
893 : /*
894 : * Allocate per-node work arrays. Since checkAllTheSame could replace
895 : * out.nNodes with a value larger than the number of tuples on the input
896 : * page, we can't allocate these arrays before here.
897 : */
898 519 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
899 519 : leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
900 :
901 : /*
902 : * Form nodes of inner tuple and inner tuple itself
903 : */
904 3870 : for (i = 0; i < out.nNodes; i++)
905 : {
906 3351 : Datum label = (Datum) 0;
907 3351 : bool labelisnull = (out.nodeLabels == NULL);
908 :
909 3351 : if (!labelisnull)
910 483 : label = out.nodeLabels[i];
911 3351 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
912 : }
913 1038 : innerTuple = spgFormInnerTuple(state,
914 519 : out.hasPrefix, out.prefixDatum,
915 : out.nNodes, nodes);
916 519 : innerTuple->allTheSame = allTheSame;
917 :
918 : /*
919 : * Update nodes[] array to point into the newly formed innerTuple, so that
920 : * we can adjust their downlinks below.
921 : */
922 3870 : SGITITERATE(innerTuple, i, node)
923 : {
924 3351 : nodes[i] = node;
925 : }
926 :
927 : /*
928 : * Re-scan new leaf tuples and count up the space needed under each node.
929 : */
930 76809 : for (i = 0; i < maxToInclude; i++)
931 : {
932 76290 : n = out.mapTuplesToNodes[i];
933 76290 : if (n < 0 || n >= out.nNodes)
934 0 : elog(ERROR, "inconsistent result of SPGiST picksplit function");
935 76290 : leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
936 : }
937 :
938 : /*
939 : * To perform the split, we must insert a new inner tuple, which can't go
940 : * on a leaf page; and unless we are splitting the root page, we must then
941 : * update the parent tuple's downlink to point to the inner tuple. If
942 : * there is room, we'll put the new inner tuple on the same page as the
943 : * parent tuple, otherwise we need another non-leaf buffer. But if the
944 : * parent page is the root, we can't add the new inner tuple there,
945 : * because the root page must have only one inner tuple.
946 : */
947 519 : xlrec.initInner = false;
948 1030 : if (parent->buffer != InvalidBuffer &&
949 1455 : !SpGistBlockIsRoot(parent->blkno) &&
950 472 : (SpGistPageGetFreeSpace(parent->page, 1) >=
951 472 : innerTuple->size + sizeof(ItemIdData)))
952 : {
953 : /* New inner tuple will fit on parent page */
954 434 : newInnerBuffer = parent->buffer;
955 : }
956 85 : else if (parent->buffer != InvalidBuffer)
957 : {
958 : /* Send tuple to page with next triple parity (see README) */
959 154 : newInnerBuffer = SpGistGetBuffer(index,
960 77 : GBUF_INNER_PARITY(parent->blkno + 1) |
961 : (isNulls ? GBUF_NULLS : 0),
962 77 : innerTuple->size + sizeof(ItemIdData),
963 : &xlrec.initInner);
964 : }
965 : else
966 : {
967 : /* Root page split ... inner tuple will go to root page */
968 8 : newInnerBuffer = InvalidBuffer;
969 : }
970 :
971 : /*
972 : * The new leaf tuples converted from the existing ones should require the
973 : * same or less space, and therefore should all fit onto one page
974 : * (although that's not necessarily the current page, since we can't
975 : * delete the old tuples but only replace them with placeholders).
976 : * However, the incoming new tuple might not also fit, in which case we
977 : * might need another picksplit cycle to reduce it some more.
978 : *
979 : * If there's not room to put everything back onto the current page, then
980 : * we decide on a per-node basis which tuples go to the new page. (We do
981 : * it like that because leaf tuple chains can't cross pages, so we must
982 : * place all leaf tuples belonging to the same parent node on the same
983 : * page.)
984 : *
985 : * If we are splitting the root page (turning it from a leaf page into an
986 : * inner page), then no leaf tuples can go back to the current page; they
987 : * must all go somewhere else.
988 : */
989 519 : if (!SpGistBlockIsRoot(current->blkno))
990 511 : currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
991 : else
992 8 : currentFreeSpace = 0; /* prevent assigning any tuples to current */
993 :
994 519 : xlrec.initDest = false;
995 :
996 519 : if (totalLeafSizes <= currentFreeSpace)
997 : {
998 : /* All the leaf tuples will fit on current page */
999 2 : newLeafBuffer = InvalidBuffer;
1000 : /* mark new leaf tuple as included in insertions, if allowed */
1001 2 : if (includeNew)
1002 : {
1003 2 : nToInsert++;
1004 2 : insertedNew = true;
1005 : }
1006 344 : for (i = 0; i < nToInsert; i++)
1007 342 : leafPageSelect[i] = 0; /* signifies current page */
1008 : }
1009 517 : else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1010 : {
1011 : /*
1012 : * We're trying to split up a long value by repeated suffixing, but
1013 : * it's not going to fit yet. Don't bother allocating a second leaf
1014 : * buffer that we won't be able to use.
1015 : */
1016 0 : newLeafBuffer = InvalidBuffer;
1017 0 : Assert(includeNew);
1018 0 : Assert(nToInsert == 0);
1019 : }
1020 : else
1021 : {
1022 : /* We will need another leaf page */
1023 : uint8 *nodePageSelect;
1024 : int curspace;
1025 : int newspace;
1026 :
1027 517 : newLeafBuffer = SpGistGetBuffer(index,
1028 : GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1029 517 : Min(totalLeafSizes,
1030 : SPGIST_PAGE_CAPACITY),
1031 : &xlrec.initDest);
1032 :
1033 : /*
1034 : * Attempt to assign node groups to the two pages. We might fail to
1035 : * do so, even if totalLeafSizes is less than the available space,
1036 : * because we can't split a group across pages.
1037 : */
1038 517 : nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1039 :
1040 517 : curspace = currentFreeSpace;
1041 517 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1042 3864 : for (i = 0; i < out.nNodes; i++)
1043 : {
1044 3347 : if (leafSizes[i] <= curspace)
1045 : {
1046 1872 : nodePageSelect[i] = 0; /* signifies current page */
1047 1872 : curspace -= leafSizes[i];
1048 : }
1049 : else
1050 : {
1051 1475 : nodePageSelect[i] = 1; /* signifies new leaf page */
1052 1475 : newspace -= leafSizes[i];
1053 : }
1054 : }
1055 517 : if (curspace >= 0 && newspace >= 0)
1056 : {
1057 : /* Successful assignment, so we can include the new leaf tuple */
1058 990 : if (includeNew)
1059 : {
1060 495 : nToInsert++;
1061 495 : insertedNew = true;
1062 : }
1063 : }
1064 22 : else if (includeNew)
1065 : {
1066 : /* We must exclude the new leaf tuple from the split */
1067 22 : int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1068 :
1069 44 : leafSizes[nodeOfNewTuple] -=
1070 22 : newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1071 :
1072 : /* Repeat the node assignment process --- should succeed now */
1073 22 : curspace = currentFreeSpace;
1074 22 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1075 90 : for (i = 0; i < out.nNodes; i++)
1076 : {
1077 68 : if (leafSizes[i] <= curspace)
1078 : {
1079 20 : nodePageSelect[i] = 0; /* signifies current page */
1080 20 : curspace -= leafSizes[i];
1081 : }
1082 : else
1083 : {
1084 48 : nodePageSelect[i] = 1; /* signifies new leaf page */
1085 48 : newspace -= leafSizes[i];
1086 : }
1087 : }
1088 22 : if (curspace < 0 || newspace < 0)
1089 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1090 : }
1091 : else
1092 : {
1093 : /* oops, we already excluded new tuple ... should not get here */
1094 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1095 : }
1096 : /* Expand the per-node assignments to be shown per leaf tuple */
1097 76443 : for (i = 0; i < nToInsert; i++)
1098 : {
1099 75926 : n = out.mapTuplesToNodes[i];
1100 75926 : leafPageSelect[i] = nodePageSelect[n];
1101 : }
1102 : }
1103 :
1104 : /* Start preparing WAL record */
1105 519 : xlrec.nDelete = 0;
1106 519 : xlrec.initSrc = isNew;
1107 519 : xlrec.storesNulls = isNulls;
1108 519 : xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1109 :
1110 519 : leafdata = leafptr = (char *) palloc(totalLeafSizes);
1111 :
1112 : /* Here we begin making the changes to the target pages */
1113 519 : START_CRIT_SECTION();
1114 :
1115 : /*
1116 : * Delete old leaf tuples from current buffer, except when we're splitting
1117 : * the root; in that case there's no need because we'll re-init the page
1118 : * below. We do this first to make room for reinserting new leaf tuples.
1119 : */
1120 519 : if (!SpGistBlockIsRoot(current->blkno))
1121 : {
1122 : /*
1123 : * Init buffer instead of deleting individual tuples, but only if
1124 : * there aren't any other live tuples and only during build; otherwise
1125 : * we need to set a redirection tuple for concurrent scans.
1126 : */
1127 837 : if (state->isBuild &&
1128 326 : nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1129 326 : PageGetMaxOffsetNumber(current->page))
1130 : {
1131 33 : SpGistInitBuffer(current->buffer,
1132 : SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1133 33 : xlrec.initSrc = true;
1134 : }
1135 478 : else if (isNew)
1136 : {
1137 : /* don't expose the freshly init'd buffer as a backup block */
1138 0 : Assert(nToDelete == 0);
1139 : }
1140 : else
1141 : {
1142 478 : xlrec.nDelete = nToDelete;
1143 :
1144 478 : if (!state->isBuild)
1145 : {
1146 : /*
1147 : * Need to create redirect tuple (it will point to new inner
1148 : * tuple) but right now the new tuple's location is not known
1149 : * yet. So, set the redirection pointer to "impossible" value
1150 : * and remember its position to update tuple later.
1151 : */
1152 185 : if (nToDelete > 0)
1153 185 : redirectTuplePos = toDelete[0];
1154 185 : spgPageIndexMultiDelete(state, current->page,
1155 : toDelete, nToDelete,
1156 : SPGIST_REDIRECT,
1157 : SPGIST_PLACEHOLDER,
1158 : SPGIST_METAPAGE_BLKNO,
1159 : FirstOffsetNumber);
1160 : }
1161 : else
1162 : {
1163 : /*
1164 : * During index build there is not concurrent searches, so we
1165 : * don't need to create redirection tuple.
1166 : */
1167 293 : spgPageIndexMultiDelete(state, current->page,
1168 : toDelete, nToDelete,
1169 : SPGIST_PLACEHOLDER,
1170 : SPGIST_PLACEHOLDER,
1171 : InvalidBlockNumber,
1172 : InvalidOffsetNumber);
1173 : }
1174 : }
1175 : }
1176 :
1177 : /*
1178 : * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1179 : * nodes.
1180 : */
1181 519 : startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1182 76787 : for (i = 0; i < nToInsert; i++)
1183 : {
1184 76268 : SpGistLeafTuple it = newLeafs[i];
1185 : Buffer leafBuffer;
1186 : BlockNumber leafBlock;
1187 : OffsetNumber newoffset;
1188 :
1189 : /* Which page is it going to? */
1190 76268 : leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1191 76268 : leafBlock = BufferGetBlockNumber(leafBuffer);
1192 :
1193 : /* Link tuple into correct chain for its node */
1194 76268 : n = out.mapTuplesToNodes[i];
1195 :
1196 76268 : if (ItemPointerIsValid(&nodes[n]->t_tid))
1197 : {
1198 74405 : Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1199 74405 : it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1200 : }
1201 : else
1202 1863 : it->nextOffset = InvalidOffsetNumber;
1203 :
1204 : /* Insert it on page */
1205 152536 : newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1206 76268 : (Item) it, it->size,
1207 76268 : &startOffsets[leafPageSelect[i]],
1208 : false);
1209 76268 : toInsert[i] = newoffset;
1210 :
1211 : /* ... and complete the chain linking */
1212 76268 : ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1213 :
1214 : /* Also copy leaf tuple into WAL data */
1215 76268 : memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1216 76268 : leafptr += newLeafs[i]->size;
1217 : }
1218 :
1219 : /*
1220 : * We're done modifying the other leaf buffer (if any), so mark it dirty.
1221 : * current->buffer will be marked below, after we're entirely done
1222 : * modifying it.
1223 : */
1224 519 : if (newLeafBuffer != InvalidBuffer)
1225 : {
1226 517 : MarkBufferDirty(newLeafBuffer);
1227 : }
1228 :
1229 : /* Remember current buffer, since we're about to change "current" */
1230 519 : saveCurrent = *current;
1231 :
1232 : /*
1233 : * Store the new innerTuple
1234 : */
1235 519 : if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1236 : {
1237 : /*
1238 : * new inner tuple goes to parent page
1239 : */
1240 434 : Assert(current->buffer != parent->buffer);
1241 :
1242 : /* Repoint "current" at the new inner tuple */
1243 434 : current->blkno = parent->blkno;
1244 434 : current->buffer = parent->buffer;
1245 434 : current->page = parent->page;
1246 434 : xlrec.offnumInner = current->offnum =
1247 434 : SpGistPageAddNewItem(state, current->page,
1248 434 : (Item) innerTuple, innerTuple->size,
1249 : NULL, false);
1250 :
1251 : /*
1252 : * Update parent node link and mark parent page dirty
1253 : */
1254 434 : xlrec.innerIsParent = true;
1255 434 : xlrec.offnumParent = parent->offnum;
1256 434 : xlrec.nodeI = parent->node;
1257 434 : saveNodeLink(index, parent, current->blkno, current->offnum);
1258 :
1259 : /*
1260 : * Update redirection link (in old current buffer)
1261 : */
1262 434 : if (redirectTuplePos != InvalidOffsetNumber)
1263 170 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1264 170 : current->blkno, current->offnum);
1265 :
1266 : /* Done modifying old current buffer, mark it dirty */
1267 434 : MarkBufferDirty(saveCurrent.buffer);
1268 : }
1269 85 : else if (parent->buffer != InvalidBuffer)
1270 : {
1271 : /*
1272 : * new inner tuple will be stored on a new page
1273 : */
1274 77 : Assert(newInnerBuffer != InvalidBuffer);
1275 :
1276 : /* Repoint "current" at the new inner tuple */
1277 77 : current->buffer = newInnerBuffer;
1278 77 : current->blkno = BufferGetBlockNumber(current->buffer);
1279 77 : current->page = BufferGetPage(current->buffer);
1280 77 : xlrec.offnumInner = current->offnum =
1281 77 : SpGistPageAddNewItem(state, current->page,
1282 77 : (Item) innerTuple, innerTuple->size,
1283 : NULL, false);
1284 :
1285 : /* Done modifying new current buffer, mark it dirty */
1286 77 : MarkBufferDirty(current->buffer);
1287 :
1288 : /*
1289 : * Update parent node link and mark parent page dirty
1290 : */
1291 77 : xlrec.innerIsParent = (parent->buffer == current->buffer);
1292 77 : xlrec.offnumParent = parent->offnum;
1293 77 : xlrec.nodeI = parent->node;
1294 77 : saveNodeLink(index, parent, current->blkno, current->offnum);
1295 :
1296 : /*
1297 : * Update redirection link (in old current buffer)
1298 : */
1299 77 : if (redirectTuplePos != InvalidOffsetNumber)
1300 15 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1301 15 : current->blkno, current->offnum);
1302 :
1303 : /* Done modifying old current buffer, mark it dirty */
1304 77 : MarkBufferDirty(saveCurrent.buffer);
1305 : }
1306 : else
1307 : {
1308 : /*
1309 : * Splitting root page, which was a leaf but now becomes inner page
1310 : * (and so "current" continues to point at it)
1311 : */
1312 8 : Assert(SpGistBlockIsRoot(current->blkno));
1313 8 : Assert(redirectTuplePos == InvalidOffsetNumber);
1314 :
1315 8 : SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1316 8 : xlrec.initInner = true;
1317 8 : xlrec.innerIsParent = false;
1318 :
1319 8 : xlrec.offnumInner = current->offnum =
1320 8 : PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1321 : InvalidOffsetNumber, false, false);
1322 8 : if (current->offnum != FirstOffsetNumber)
1323 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1324 : innerTuple->size);
1325 :
1326 : /* No parent link to update, nor redirection to do */
1327 8 : xlrec.offnumParent = InvalidOffsetNumber;
1328 8 : xlrec.nodeI = 0;
1329 :
1330 : /* Done modifying new current buffer, mark it dirty */
1331 8 : MarkBufferDirty(current->buffer);
1332 :
1333 : /* saveCurrent doesn't represent a different buffer */
1334 8 : saveCurrent.buffer = InvalidBuffer;
1335 : }
1336 :
1337 519 : if (RelationNeedsWAL(index))
1338 : {
1339 : XLogRecPtr recptr;
1340 : int flags;
1341 :
1342 519 : XLogBeginInsert();
1343 :
1344 519 : xlrec.nInsert = nToInsert;
1345 519 : XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1346 :
1347 519 : XLogRegisterData((char *) toDelete,
1348 519 : sizeof(OffsetNumber) * xlrec.nDelete);
1349 519 : XLogRegisterData((char *) toInsert,
1350 519 : sizeof(OffsetNumber) * xlrec.nInsert);
1351 519 : XLogRegisterData((char *) leafPageSelect,
1352 519 : sizeof(uint8) * xlrec.nInsert);
1353 519 : XLogRegisterData((char *) innerTuple, innerTuple->size);
1354 519 : XLogRegisterData(leafdata, leafptr - leafdata);
1355 :
1356 : /* Old leaf page */
1357 519 : if (BufferIsValid(saveCurrent.buffer))
1358 : {
1359 511 : flags = REGBUF_STANDARD;
1360 511 : if (xlrec.initSrc)
1361 33 : flags |= REGBUF_WILL_INIT;
1362 511 : XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1363 : }
1364 :
1365 : /* New leaf page */
1366 519 : if (BufferIsValid(newLeafBuffer))
1367 : {
1368 517 : flags = REGBUF_STANDARD;
1369 517 : if (xlrec.initDest)
1370 477 : flags |= REGBUF_WILL_INIT;
1371 517 : XLogRegisterBuffer(1, newLeafBuffer, flags);
1372 : }
1373 :
1374 : /* Inner page */
1375 519 : flags = REGBUF_STANDARD;
1376 519 : if (xlrec.initInner)
1377 17 : flags |= REGBUF_WILL_INIT;
1378 519 : XLogRegisterBuffer(2, current->buffer, flags);
1379 :
1380 : /* Parent page, if different from inner page */
1381 519 : if (parent->buffer != InvalidBuffer)
1382 : {
1383 511 : if (parent->buffer != current->buffer)
1384 77 : XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1385 : else
1386 434 : Assert(xlrec.innerIsParent);
1387 : }
1388 :
1389 : /* Issue the WAL record */
1390 519 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1391 :
1392 : /* Update page LSNs on all affected pages */
1393 519 : if (newLeafBuffer != InvalidBuffer)
1394 : {
1395 517 : Page page = BufferGetPage(newLeafBuffer);
1396 :
1397 517 : PageSetLSN(page, recptr);
1398 : }
1399 :
1400 519 : if (saveCurrent.buffer != InvalidBuffer)
1401 : {
1402 511 : Page page = BufferGetPage(saveCurrent.buffer);
1403 :
1404 511 : PageSetLSN(page, recptr);
1405 : }
1406 :
1407 519 : PageSetLSN(current->page, recptr);
1408 :
1409 519 : if (parent->buffer != InvalidBuffer)
1410 : {
1411 511 : PageSetLSN(parent->page, recptr);
1412 : }
1413 : }
1414 :
1415 519 : END_CRIT_SECTION();
1416 :
1417 : /* Update local free-space cache and unlock buffers */
1418 519 : if (newLeafBuffer != InvalidBuffer)
1419 : {
1420 517 : SpGistSetLastUsedPage(index, newLeafBuffer);
1421 517 : UnlockReleaseBuffer(newLeafBuffer);
1422 : }
1423 519 : if (saveCurrent.buffer != InvalidBuffer)
1424 : {
1425 511 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1426 511 : UnlockReleaseBuffer(saveCurrent.buffer);
1427 : }
1428 :
1429 519 : return insertedNew;
1430 : }
1431 :
1432 : /*
1433 : * spgMatchNode action: descend to N'th child node of current inner tuple
1434 : */
1435 : static void
1436 1513137 : spgMatchNodeAction(Relation index, SpGistState *state,
1437 : SpGistInnerTuple innerTuple,
1438 : SPPageDesc *current, SPPageDesc *parent, int nodeN)
1439 : {
1440 : int i;
1441 : SpGistNodeTuple node;
1442 :
1443 : /* Release previous parent buffer if any */
1444 2945109 : if (parent->buffer != InvalidBuffer &&
1445 1431972 : parent->buffer != current->buffer)
1446 : {
1447 78404 : SpGistSetLastUsedPage(index, parent->buffer);
1448 78404 : UnlockReleaseBuffer(parent->buffer);
1449 : }
1450 :
1451 : /* Repoint parent to specified node of current inner tuple */
1452 1513137 : parent->blkno = current->blkno;
1453 1513137 : parent->buffer = current->buffer;
1454 1513137 : parent->page = current->page;
1455 1513137 : parent->offnum = current->offnum;
1456 1513137 : parent->node = nodeN;
1457 :
1458 : /* Locate that node */
1459 2637205 : SGITITERATE(innerTuple, i, node)
1460 : {
1461 2637205 : if (i == nodeN)
1462 1513137 : break;
1463 : }
1464 :
1465 1513137 : if (i != nodeN)
1466 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1467 : nodeN);
1468 :
1469 : /* Point current to the downlink location, if any */
1470 1513137 : if (ItemPointerIsValid(&node->t_tid))
1471 : {
1472 1512875 : current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1473 1512875 : current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1474 : }
1475 : else
1476 : {
1477 : /* Downlink is empty, so we'll need to find a new page */
1478 262 : current->blkno = InvalidBlockNumber;
1479 262 : current->offnum = InvalidOffsetNumber;
1480 : }
1481 :
1482 1513137 : current->buffer = InvalidBuffer;
1483 1513137 : current->page = NULL;
1484 1513137 : }
1485 :
1486 : /*
1487 : * spgAddNode action: add a node to the inner tuple at current
1488 : */
1489 : static void
1490 223 : spgAddNodeAction(Relation index, SpGistState *state,
1491 : SpGistInnerTuple innerTuple,
1492 : SPPageDesc *current, SPPageDesc *parent,
1493 : int nodeN, Datum nodeLabel)
1494 : {
1495 : SpGistInnerTuple newInnerTuple;
1496 : spgxlogAddNode xlrec;
1497 :
1498 : /* Should not be applied to nulls */
1499 223 : Assert(!SpGistPageStoresNulls(current->page));
1500 :
1501 : /* Construct new inner tuple with additional node */
1502 223 : newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1503 :
1504 : /* Prepare WAL record */
1505 223 : STORE_STATE(state, xlrec.stateSrc);
1506 223 : xlrec.offnum = current->offnum;
1507 :
1508 : /* we don't fill these unless we need to change the parent downlink */
1509 223 : xlrec.parentBlk = -1;
1510 223 : xlrec.offnumParent = InvalidOffsetNumber;
1511 223 : xlrec.nodeI = 0;
1512 :
1513 : /* we don't fill these unless tuple has to be moved */
1514 223 : xlrec.offnumNew = InvalidOffsetNumber;
1515 223 : xlrec.newPage = false;
1516 :
1517 446 : if (PageGetExactFreeSpace(current->page) >=
1518 223 : newInnerTuple->size - innerTuple->size)
1519 : {
1520 : /*
1521 : * We can replace the inner tuple by new version in-place
1522 : */
1523 223 : START_CRIT_SECTION();
1524 :
1525 223 : PageIndexTupleDelete(current->page, current->offnum);
1526 446 : if (PageAddItem(current->page,
1527 : (Item) newInnerTuple, newInnerTuple->size,
1528 223 : current->offnum, false, false) != current->offnum)
1529 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1530 : newInnerTuple->size);
1531 :
1532 223 : MarkBufferDirty(current->buffer);
1533 :
1534 223 : if (RelationNeedsWAL(index))
1535 : {
1536 : XLogRecPtr recptr;
1537 :
1538 223 : XLogBeginInsert();
1539 223 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1540 223 : XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1541 :
1542 223 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1543 :
1544 223 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1545 :
1546 223 : PageSetLSN(current->page, recptr);
1547 : }
1548 :
1549 223 : END_CRIT_SECTION();
1550 : }
1551 : else
1552 : {
1553 : /*
1554 : * move inner tuple to another page, and update parent
1555 : */
1556 : SpGistDeadTuple dt;
1557 : SPPageDesc saveCurrent;
1558 :
1559 : /*
1560 : * It should not be possible to get here for the root page, since we
1561 : * allow only one inner tuple on the root page, and spgFormInnerTuple
1562 : * always checks that inner tuples don't exceed the size of a page.
1563 : */
1564 0 : if (SpGistBlockIsRoot(current->blkno))
1565 0 : elog(ERROR, "cannot enlarge root tuple any more");
1566 0 : Assert(parent->buffer != InvalidBuffer);
1567 :
1568 0 : saveCurrent = *current;
1569 :
1570 0 : xlrec.offnumParent = parent->offnum;
1571 0 : xlrec.nodeI = parent->node;
1572 :
1573 : /*
1574 : * obtain new buffer with the same parity as current, since it will be
1575 : * a child of same parent tuple
1576 : */
1577 0 : current->buffer = SpGistGetBuffer(index,
1578 0 : GBUF_INNER_PARITY(current->blkno),
1579 0 : newInnerTuple->size + sizeof(ItemIdData),
1580 : &xlrec.newPage);
1581 0 : current->blkno = BufferGetBlockNumber(current->buffer);
1582 0 : current->page = BufferGetPage(current->buffer);
1583 :
1584 : /*
1585 : * Let's just make real sure new current isn't same as old. Right now
1586 : * that's impossible, but if SpGistGetBuffer ever got smart enough to
1587 : * delete placeholder tuples before checking space, maybe it wouldn't
1588 : * be impossible. The case would appear to work except that WAL
1589 : * replay would be subtly wrong, so I think a mere assert isn't enough
1590 : * here.
1591 : */
1592 0 : if (current->blkno == saveCurrent.blkno)
1593 0 : elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1594 :
1595 : /*
1596 : * New current and parent buffer will both be modified; but note that
1597 : * parent buffer could be same as either new or old current.
1598 : */
1599 0 : if (parent->buffer == saveCurrent.buffer)
1600 0 : xlrec.parentBlk = 0;
1601 0 : else if (parent->buffer == current->buffer)
1602 0 : xlrec.parentBlk = 1;
1603 : else
1604 0 : xlrec.parentBlk = 2;
1605 :
1606 0 : START_CRIT_SECTION();
1607 :
1608 : /* insert new ... */
1609 0 : xlrec.offnumNew = current->offnum =
1610 0 : SpGistPageAddNewItem(state, current->page,
1611 0 : (Item) newInnerTuple, newInnerTuple->size,
1612 : NULL, false);
1613 :
1614 0 : MarkBufferDirty(current->buffer);
1615 :
1616 : /* update parent's downlink and mark parent page dirty */
1617 0 : saveNodeLink(index, parent, current->blkno, current->offnum);
1618 :
1619 : /*
1620 : * Replace old tuple with a placeholder or redirection tuple. Unless
1621 : * doing an index build, we have to insert a redirection tuple for
1622 : * possible concurrent scans. We can't just delete it in any case,
1623 : * because that could change the offsets of other tuples on the page,
1624 : * breaking downlinks from their parents.
1625 : */
1626 0 : if (state->isBuild)
1627 0 : dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1628 : InvalidBlockNumber, InvalidOffsetNumber);
1629 : else
1630 0 : dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1631 0 : current->blkno, current->offnum);
1632 :
1633 0 : PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1634 0 : if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1635 : saveCurrent.offnum,
1636 0 : false, false) != saveCurrent.offnum)
1637 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1638 : dt->size);
1639 :
1640 0 : if (state->isBuild)
1641 0 : SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1642 : else
1643 0 : SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1644 :
1645 0 : MarkBufferDirty(saveCurrent.buffer);
1646 :
1647 0 : if (RelationNeedsWAL(index))
1648 : {
1649 : XLogRecPtr recptr;
1650 : int flags;
1651 :
1652 0 : XLogBeginInsert();
1653 :
1654 : /* orig page */
1655 0 : XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1656 : /* new page */
1657 0 : flags = REGBUF_STANDARD;
1658 0 : if (xlrec.newPage)
1659 0 : flags |= REGBUF_WILL_INIT;
1660 0 : XLogRegisterBuffer(1, current->buffer, flags);
1661 : /* parent page (if different from orig and new) */
1662 0 : if (xlrec.parentBlk == 2)
1663 0 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1664 :
1665 0 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1666 0 : XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1667 :
1668 0 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1669 :
1670 : /* we don't bother to check if any of these are redundant */
1671 0 : PageSetLSN(current->page, recptr);
1672 0 : PageSetLSN(parent->page, recptr);
1673 0 : PageSetLSN(saveCurrent.page, recptr);
1674 : }
1675 :
1676 0 : END_CRIT_SECTION();
1677 :
1678 : /* Release saveCurrent if it's not same as current or parent */
1679 0 : if (saveCurrent.buffer != current->buffer &&
1680 0 : saveCurrent.buffer != parent->buffer)
1681 : {
1682 0 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1683 0 : UnlockReleaseBuffer(saveCurrent.buffer);
1684 : }
1685 : }
1686 223 : }
1687 :
1688 : /*
1689 : * spgSplitNode action: split inner tuple at current into prefix and postfix
1690 : */
1691 : static void
1692 106 : spgSplitNodeAction(Relation index, SpGistState *state,
1693 : SpGistInnerTuple innerTuple,
1694 : SPPageDesc *current, spgChooseOut *out)
1695 : {
1696 : SpGistInnerTuple prefixTuple,
1697 : postfixTuple;
1698 : SpGistNodeTuple node,
1699 : *nodes;
1700 : BlockNumber postfixBlkno;
1701 : OffsetNumber postfixOffset;
1702 : int i;
1703 : spgxlogSplitTuple xlrec;
1704 106 : Buffer newBuffer = InvalidBuffer;
1705 :
1706 : /* Should not be applied to nulls */
1707 106 : Assert(!SpGistPageStoresNulls(current->page));
1708 :
1709 : /* Check opclass gave us sane values */
1710 212 : if (out->result.splitTuple.prefixNNodes <= 0 ||
1711 106 : out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1712 0 : elog(ERROR, "invalid number of prefix nodes: %d",
1713 : out->result.splitTuple.prefixNNodes);
1714 212 : if (out->result.splitTuple.childNodeN < 0 ||
1715 106 : out->result.splitTuple.childNodeN >=
1716 106 : out->result.splitTuple.prefixNNodes)
1717 0 : elog(ERROR, "invalid child node number: %d",
1718 : out->result.splitTuple.childNodeN);
1719 :
1720 : /*
1721 : * Construct new prefix tuple with requested number of nodes. We'll fill
1722 : * in the childNodeN'th node's downlink below.
1723 : */
1724 106 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1725 106 : out->result.splitTuple.prefixNNodes);
1726 :
1727 212 : for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1728 : {
1729 106 : Datum label = (Datum) 0;
1730 : bool labelisnull;
1731 :
1732 106 : labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1733 106 : if (!labelisnull)
1734 106 : label = out->result.splitTuple.prefixNodeLabels[i];
1735 106 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1736 : }
1737 :
1738 212 : prefixTuple = spgFormInnerTuple(state,
1739 106 : out->result.splitTuple.prefixHasPrefix,
1740 : out->result.splitTuple.prefixPrefixDatum,
1741 : out->result.splitTuple.prefixNNodes,
1742 : nodes);
1743 :
1744 : /* it must fit in the space that innerTuple now occupies */
1745 106 : if (prefixTuple->size > innerTuple->size)
1746 0 : elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1747 :
1748 : /*
1749 : * Construct new postfix tuple, containing all nodes of innerTuple with
1750 : * same node datums, but with the prefix specified by the picksplit
1751 : * function.
1752 : */
1753 106 : nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1754 368 : SGITITERATE(innerTuple, i, node)
1755 : {
1756 262 : nodes[i] = node;
1757 : }
1758 :
1759 212 : postfixTuple = spgFormInnerTuple(state,
1760 106 : out->result.splitTuple.postfixHasPrefix,
1761 : out->result.splitTuple.postfixPrefixDatum,
1762 106 : innerTuple->nNodes, nodes);
1763 :
1764 : /* Postfix tuple is allTheSame if original tuple was */
1765 106 : postfixTuple->allTheSame = innerTuple->allTheSame;
1766 :
1767 : /* prep data for WAL record */
1768 106 : xlrec.newPage = false;
1769 :
1770 : /*
1771 : * If we can't fit both tuples on the current page, get a new page for the
1772 : * postfix tuple. In particular, can't split to the root page.
1773 : *
1774 : * For the space calculation, note that prefixTuple replaces innerTuple
1775 : * but postfixTuple will be a new entry.
1776 : */
1777 210 : if (SpGistBlockIsRoot(current->blkno) ||
1778 104 : SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1779 104 : prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1780 : {
1781 : /*
1782 : * Choose page with next triple parity, because postfix tuple is a
1783 : * child of prefix one
1784 : */
1785 6 : newBuffer = SpGistGetBuffer(index,
1786 3 : GBUF_INNER_PARITY(current->blkno + 1),
1787 3 : postfixTuple->size + sizeof(ItemIdData),
1788 : &xlrec.newPage);
1789 : }
1790 :
1791 106 : START_CRIT_SECTION();
1792 :
1793 : /*
1794 : * Replace old tuple by prefix tuple
1795 : */
1796 106 : PageIndexTupleDelete(current->page, current->offnum);
1797 106 : xlrec.offnumPrefix = PageAddItem(current->page,
1798 : (Item) prefixTuple, prefixTuple->size,
1799 : current->offnum, false, false);
1800 106 : if (xlrec.offnumPrefix != current->offnum)
1801 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1802 : prefixTuple->size);
1803 :
1804 : /*
1805 : * put postfix tuple into appropriate page
1806 : */
1807 106 : if (newBuffer == InvalidBuffer)
1808 : {
1809 103 : postfixBlkno = current->blkno;
1810 103 : xlrec.offnumPostfix = postfixOffset =
1811 103 : SpGistPageAddNewItem(state, current->page,
1812 103 : (Item) postfixTuple, postfixTuple->size,
1813 : NULL, false);
1814 103 : xlrec.postfixBlkSame = true;
1815 : }
1816 : else
1817 : {
1818 3 : postfixBlkno = BufferGetBlockNumber(newBuffer);
1819 3 : xlrec.offnumPostfix = postfixOffset =
1820 3 : SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1821 3 : (Item) postfixTuple, postfixTuple->size,
1822 : NULL, false);
1823 3 : MarkBufferDirty(newBuffer);
1824 3 : xlrec.postfixBlkSame = false;
1825 : }
1826 :
1827 : /*
1828 : * And set downlink pointer in the prefix tuple to point to postfix tuple.
1829 : * (We can't avoid this step by doing the above two steps in opposite
1830 : * order, because there might not be enough space on the page to insert
1831 : * the postfix tuple first.) We have to update the local copy of the
1832 : * prefixTuple too, because that's what will be written to WAL.
1833 : */
1834 106 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1835 : postfixBlkno, postfixOffset);
1836 106 : prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1837 : PageGetItemId(current->page, current->offnum));
1838 106 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1839 : postfixBlkno, postfixOffset);
1840 :
1841 106 : MarkBufferDirty(current->buffer);
1842 :
1843 106 : if (RelationNeedsWAL(index))
1844 : {
1845 : XLogRecPtr recptr;
1846 :
1847 106 : XLogBeginInsert();
1848 106 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1849 106 : XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1850 106 : XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1851 :
1852 106 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1853 106 : if (newBuffer != InvalidBuffer)
1854 : {
1855 : int flags;
1856 :
1857 3 : flags = REGBUF_STANDARD;
1858 3 : if (xlrec.newPage)
1859 2 : flags |= REGBUF_WILL_INIT;
1860 3 : XLogRegisterBuffer(1, newBuffer, flags);
1861 : }
1862 :
1863 106 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1864 :
1865 106 : PageSetLSN(current->page, recptr);
1866 :
1867 106 : if (newBuffer != InvalidBuffer)
1868 : {
1869 3 : PageSetLSN(BufferGetPage(newBuffer), recptr);
1870 : }
1871 : }
1872 :
1873 106 : END_CRIT_SECTION();
1874 :
1875 : /* Update local free-space cache and release buffer */
1876 106 : if (newBuffer != InvalidBuffer)
1877 : {
1878 3 : SpGistSetLastUsedPage(index, newBuffer);
1879 3 : UnlockReleaseBuffer(newBuffer);
1880 : }
1881 106 : }
1882 :
1883 : /*
1884 : * Insert one item into the index.
1885 : *
1886 : * Returns true on success, false if we failed to complete the insertion
1887 : * because of conflict with a concurrent insert. In the latter case,
1888 : * caller should re-call spgdoinsert() with the same args.
1889 : */
1890 : bool
1891 82845 : spgdoinsert(Relation index, SpGistState *state,
1892 : ItemPointer heapPtr, Datum datum, bool isnull)
1893 : {
1894 82845 : int level = 0;
1895 : Datum leafDatum;
1896 : int leafSize;
1897 : SPPageDesc current,
1898 : parent;
1899 82845 : FmgrInfo *procinfo = NULL;
1900 :
1901 : /*
1902 : * Look up FmgrInfo of the user-defined choose function once, to save
1903 : * cycles in the loop below.
1904 : */
1905 82845 : if (!isnull)
1906 82836 : procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1907 :
1908 : /*
1909 : * Since we don't use index_form_tuple in this AM, we have to make sure
1910 : * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
1911 : * that.
1912 : */
1913 82845 : if (!isnull && state->attType.attlen == -1)
1914 29768 : datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1915 :
1916 82845 : leafDatum = datum;
1917 :
1918 : /*
1919 : * Compute space needed for a leaf tuple containing the given datum.
1920 : *
1921 : * If it isn't gonna fit, and the opclass can't reduce the datum size by
1922 : * suffixing, bail out now rather than getting into an endless loop.
1923 : */
1924 82845 : if (!isnull)
1925 82836 : leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1926 82836 : SpGistGetTypeSize(&state->attType, leafDatum);
1927 : else
1928 9 : leafSize = SGDTSIZE + sizeof(ItemIdData);
1929 :
1930 82845 : if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
1931 0 : ereport(ERROR,
1932 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1933 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1934 : leafSize - sizeof(ItemIdData),
1935 : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1936 : RelationGetRelationName(index)),
1937 : errhint("Values larger than a buffer page cannot be indexed.")));
1938 :
1939 : /* Initialize "current" to the appropriate root page */
1940 82845 : current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1941 82845 : current.buffer = InvalidBuffer;
1942 82845 : current.page = NULL;
1943 82845 : current.offnum = FirstOffsetNumber;
1944 82845 : current.node = -1;
1945 :
1946 : /* "parent" is invalid for the moment */
1947 82845 : parent.blkno = InvalidBlockNumber;
1948 82845 : parent.buffer = InvalidBuffer;
1949 82845 : parent.page = NULL;
1950 82845 : parent.offnum = InvalidOffsetNumber;
1951 82845 : parent.node = -1;
1952 :
1953 : for (;;)
1954 : {
1955 1595982 : bool isNew = false;
1956 :
1957 : /*
1958 : * Bail out if query cancel is pending. We must have this somewhere
1959 : * in the loop since a broken opclass could produce an infinite
1960 : * picksplit loop.
1961 : */
1962 1595982 : CHECK_FOR_INTERRUPTS();
1963 :
1964 1595982 : if (current.blkno == InvalidBlockNumber)
1965 : {
1966 : /*
1967 : * Create a leaf page. If leafSize is too large to fit on a page,
1968 : * we won't actually use the page yet, but it simplifies the API
1969 : * for doPickSplit to always have a leaf page at hand; so just
1970 : * quietly limit our request to a page size.
1971 : */
1972 262 : current.buffer =
1973 262 : SpGistGetBuffer(index,
1974 : GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1975 262 : Min(leafSize, SPGIST_PAGE_CAPACITY),
1976 : &isNew);
1977 262 : current.blkno = BufferGetBlockNumber(current.buffer);
1978 : }
1979 1595720 : else if (parent.buffer == InvalidBuffer)
1980 : {
1981 : /* we hold no parent-page lock, so no deadlock is possible */
1982 82845 : current.buffer = ReadBuffer(index, current.blkno);
1983 82845 : LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
1984 : }
1985 1512875 : else if (current.blkno != parent.blkno)
1986 : {
1987 : /* descend to a new child page */
1988 159323 : current.buffer = ReadBuffer(index, current.blkno);
1989 :
1990 : /*
1991 : * Attempt to acquire lock on child page. We must beware of
1992 : * deadlock against another insertion process descending from that
1993 : * page to our parent page (see README). If we fail to get lock,
1994 : * abandon the insertion and tell our caller to start over.
1995 : *
1996 : * XXX this could be improved, because failing to get lock on a
1997 : * buffer is not proof of a deadlock situation; the lock might be
1998 : * held by a reader, or even just background writer/checkpointer
1999 : * process. Perhaps it'd be worth retrying after sleeping a bit?
2000 : */
2001 159323 : if (!ConditionalLockBuffer(current.buffer))
2002 : {
2003 0 : ReleaseBuffer(current.buffer);
2004 0 : UnlockReleaseBuffer(parent.buffer);
2005 0 : return false;
2006 : }
2007 : }
2008 : else
2009 : {
2010 : /* inner tuple can be stored on the same page as parent one */
2011 1353552 : current.buffer = parent.buffer;
2012 : }
2013 1595982 : current.page = BufferGetPage(current.buffer);
2014 :
2015 : /* should not arrive at a page of the wrong type */
2016 3191955 : if (isnull ? !SpGistPageStoresNulls(current.page) :
2017 1595973 : SpGistPageStoresNulls(current.page))
2018 0 : elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2019 : current.blkno);
2020 :
2021 1595982 : if (SpGistPageIsLeaf(current.page))
2022 : {
2023 : SpGistLeafTuple leafTuple;
2024 : int nToSplit,
2025 : sizeToSplit;
2026 :
2027 82867 : leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
2028 165734 : if (leafTuple->size + sizeof(ItemIdData) <=
2029 82867 : SpGistPageGetFreeSpace(current.page, 1))
2030 : {
2031 : /* it fits on page, so insert it and we're done */
2032 82227 : addLeafTuple(index, state, leafTuple,
2033 : ¤t, &parent, isnull, isNew);
2034 82227 : break;
2035 : }
2036 1280 : else if ((sizeToSplit =
2037 : checkSplitConditions(index, state, ¤t,
2038 890 : &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2039 371 : nToSplit < 64 &&
2040 121 : leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2041 : {
2042 : /*
2043 : * the amount of data is pretty small, so just move the whole
2044 : * chain to another leaf page rather than splitting it.
2045 : */
2046 121 : Assert(!isNew);
2047 121 : moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
2048 121 : break; /* we're done */
2049 : }
2050 : else
2051 : {
2052 : /* picksplit */
2053 519 : if (doPickSplit(index, state, ¤t, &parent,
2054 : leafTuple, level, isnull, isNew))
2055 497 : break; /* doPickSplit installed new tuples */
2056 :
2057 : /* leaf tuple will not be inserted yet */
2058 22 : pfree(leafTuple);
2059 :
2060 : /*
2061 : * current now describes new inner tuple, go insert into it
2062 : */
2063 22 : Assert(!SpGistPageIsLeaf(current.page));
2064 22 : goto process_inner_tuple;
2065 : }
2066 : }
2067 : else /* non-leaf page */
2068 : {
2069 : /*
2070 : * Apply the opclass choose function to figure out how to insert
2071 : * the given datum into the current inner tuple.
2072 : */
2073 : SpGistInnerTuple innerTuple;
2074 : spgChooseIn in;
2075 : spgChooseOut out;
2076 :
2077 : /*
2078 : * spgAddNode and spgSplitTuple cases will loop back to here to
2079 : * complete the insertion operation. Just in case the choose
2080 : * function is broken and produces add or split requests
2081 : * repeatedly, check for query cancel.
2082 : */
2083 : process_inner_tuple:
2084 1513466 : CHECK_FOR_INTERRUPTS();
2085 :
2086 1513466 : innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2087 : PageGetItemId(current.page, current.offnum));
2088 :
2089 1513466 : in.datum = datum;
2090 1513466 : in.leafDatum = leafDatum;
2091 1513466 : in.level = level;
2092 1513466 : in.allTheSame = innerTuple->allTheSame;
2093 1513466 : in.hasPrefix = (innerTuple->prefixSize > 0);
2094 1513466 : in.prefixDatum = SGITDATUM(innerTuple, state);
2095 1513466 : in.nNodes = innerTuple->nNodes;
2096 1513466 : in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2097 :
2098 1513466 : memset(&out, 0, sizeof(out));
2099 :
2100 1513466 : if (!isnull)
2101 : {
2102 : /* use user-defined choose method */
2103 3026932 : FunctionCall2Coll(procinfo,
2104 1513466 : index->rd_indcollation[0],
2105 : PointerGetDatum(&in),
2106 : PointerGetDatum(&out));
2107 : }
2108 : else
2109 : {
2110 : /* force "match" action (to insert to random subnode) */
2111 0 : out.resultType = spgMatchNode;
2112 : }
2113 :
2114 1513466 : if (innerTuple->allTheSame)
2115 : {
2116 : /*
2117 : * It's not allowed to do an AddNode at an allTheSame tuple.
2118 : * Opclass must say "match", in which case we choose a random
2119 : * one of the nodes to descend into, or "split".
2120 : */
2121 5047 : if (out.resultType == spgAddNode)
2122 0 : elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2123 5047 : else if (out.resultType == spgMatchNode)
2124 5045 : out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2125 : }
2126 :
2127 1513466 : switch (out.resultType)
2128 : {
2129 : case spgMatchNode:
2130 : /* Descend to N'th child node */
2131 1513137 : spgMatchNodeAction(index, state, innerTuple,
2132 : ¤t, &parent,
2133 : out.result.matchNode.nodeN);
2134 : /* Adjust level as per opclass request */
2135 1513137 : level += out.result.matchNode.levelAdd;
2136 : /* Replace leafDatum and recompute leafSize */
2137 1513137 : if (!isnull)
2138 : {
2139 1513137 : leafDatum = out.result.matchNode.restDatum;
2140 1513137 : leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2141 1513137 : SpGistGetTypeSize(&state->attType, leafDatum);
2142 : }
2143 :
2144 : /*
2145 : * Loop around and attempt to insert the new leafDatum at
2146 : * "current" (which might reference an existing child
2147 : * tuple, or might be invalid to force us to find a new
2148 : * page for the tuple).
2149 : *
2150 : * Note: if the opclass sets longValuesOK, we rely on the
2151 : * choose function to eventually shorten the leafDatum
2152 : * enough to fit on a page. We could add a test here to
2153 : * complain if the datum doesn't get visibly shorter each
2154 : * time, but that could get in the way of opclasses that
2155 : * "simplify" datums in a way that doesn't necessarily
2156 : * lead to physical shortening on every cycle.
2157 : */
2158 1513137 : break;
2159 : case spgAddNode:
2160 : /* AddNode is not sensible if nodes don't have labels */
2161 223 : if (in.nodeLabels == NULL)
2162 0 : elog(ERROR, "cannot add a node to an inner tuple without node labels");
2163 : /* Add node to inner tuple, per request */
2164 223 : spgAddNodeAction(index, state, innerTuple,
2165 : ¤t, &parent,
2166 : out.result.addNode.nodeN,
2167 : out.result.addNode.nodeLabel);
2168 :
2169 : /*
2170 : * Retry insertion into the enlarged node. We assume that
2171 : * we'll get a MatchNode result this time.
2172 : */
2173 223 : goto process_inner_tuple;
2174 : break;
2175 : case spgSplitTuple:
2176 : /* Split inner tuple, per request */
2177 106 : spgSplitNodeAction(index, state, innerTuple,
2178 : ¤t, &out);
2179 :
2180 : /* Retry insertion into the split node */
2181 106 : goto process_inner_tuple;
2182 : break;
2183 : default:
2184 0 : elog(ERROR, "unrecognized SPGiST choose result: %d",
2185 : (int) out.resultType);
2186 : break;
2187 : }
2188 : }
2189 1513137 : } /* end loop */
2190 :
2191 : /*
2192 : * Release any buffers we're still holding. Beware of possibility that
2193 : * current and parent reference same buffer.
2194 : */
2195 82845 : if (current.buffer != InvalidBuffer)
2196 : {
2197 82845 : SpGistSetLastUsedPage(index, current.buffer);
2198 82845 : UnlockReleaseBuffer(current.buffer);
2199 : }
2200 164010 : if (parent.buffer != InvalidBuffer &&
2201 81165 : parent.buffer != current.buffer)
2202 : {
2203 80747 : SpGistSetLastUsedPage(index, parent.buffer);
2204 80747 : UnlockReleaseBuffer(parent.buffer);
2205 : }
2206 :
2207 82845 : return true;
2208 : }
|