Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * gist.c
4 : * interface routines for the postgres GiST index access method.
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/gist/gist.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/gist_private.h"
18 : #include "access/gistscan.h"
19 : #include "catalog/pg_collation.h"
20 : #include "miscadmin.h"
21 : #include "nodes/execnodes.h"
22 : #include "utils/builtins.h"
23 : #include "utils/index_selfuncs.h"
24 : #include "utils/memutils.h"
25 : #include "utils/rel.h"
26 :
27 :
28 : /* non-export function prototypes */
29 : static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
30 : static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
31 : GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
32 : static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
33 : GISTSTATE *giststate,
34 : IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
35 : Buffer leftchild, Buffer rightchild,
36 : bool unlockbuf, bool unlockleftchild);
37 : static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
38 : GISTSTATE *giststate, List *splitinfo, bool releasebuf);
39 : static void gistvacuumpage(Relation rel, Page page, Buffer buffer);
40 :
41 :
42 : #define ROTATEDIST(d) do { \
43 : SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
44 : memset(tmp,0,sizeof(SplitedPageLayout)); \
45 : tmp->block.blkno = InvalidBlockNumber; \
46 : tmp->buffer = InvalidBuffer; \
47 : tmp->next = (d); \
48 : (d)=tmp; \
49 : } while(0)
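/*
 * Illustrative sketch (not from the original source): ROTATEDIST() prepends a
 * freshly zeroed SplitedPageLayout node to the list 'd', so successive
 * invocations build the chain of split halves front-to-back.  Roughly:
 *
 *     SplitedPageLayout *dist = NULL;
 *     ROTATEDIST(dist);            // dist -> nodeA
 *     dist->block.num = nright;    // fill in the half just pushed
 *     ROTATEDIST(dist);            // dist -> nodeB -> nodeA
 *     dist->block.num = nleft;
 */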
50 :
51 :
52 : /*
53 : * GiST handler function: return IndexAmRoutine with access method parameters
54 : * and callbacks.
55 : */
56 : Datum
57 230 : gisthandler(PG_FUNCTION_ARGS)
58 : {
59 230 : IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
60 :
61 230 : amroutine->amstrategies = 0;
62 230 : amroutine->amsupport = GISTNProcs;
63 230 : amroutine->amcanorder = false;
64 230 : amroutine->amcanorderbyop = true;
65 230 : amroutine->amcanbackward = false;
66 230 : amroutine->amcanunique = false;
67 230 : amroutine->amcanmulticol = true;
68 230 : amroutine->amoptionalkey = true;
69 230 : amroutine->amsearcharray = false;
70 230 : amroutine->amsearchnulls = true;
71 230 : amroutine->amstorage = true;
72 230 : amroutine->amclusterable = true;
73 230 : amroutine->ampredlocks = false;
74 230 : amroutine->amcanparallel = false;
75 230 : amroutine->amkeytype = InvalidOid;
76 :
77 230 : amroutine->ambuild = gistbuild;
78 230 : amroutine->ambuildempty = gistbuildempty;
79 230 : amroutine->aminsert = gistinsert;
80 230 : amroutine->ambulkdelete = gistbulkdelete;
81 230 : amroutine->amvacuumcleanup = gistvacuumcleanup;
82 230 : amroutine->amcanreturn = gistcanreturn;
83 230 : amroutine->amcostestimate = gistcostestimate;
84 230 : amroutine->amoptions = gistoptions;
85 230 : amroutine->amproperty = gistproperty;
86 230 : amroutine->amvalidate = gistvalidate;
87 230 : amroutine->ambeginscan = gistbeginscan;
88 230 : amroutine->amrescan = gistrescan;
89 230 : amroutine->amgettuple = gistgettuple;
90 230 : amroutine->amgetbitmap = gistgetbitmap;
91 230 : amroutine->amendscan = gistendscan;
92 230 : amroutine->ammarkpos = NULL;
93 230 : amroutine->amrestrpos = NULL;
94 230 : amroutine->amestimateparallelscan = NULL;
95 230 : amroutine->aminitparallelscan = NULL;
96 230 : amroutine->amparallelrescan = NULL;
97 :
98 230 : PG_RETURN_POINTER(amroutine);
99 : }
100 :
101 : /*
102 : * Create and return a temporary memory context for use by GiST. We
103 : * _always_ invoke user-provided methods in a temporary memory
104 : * context, so that memory leaks in those functions cannot cause
105 : * problems. Also, we use some additional temporary contexts in the
106 : * GiST code itself, to avoid the need to do some awkward manual
107 : * memory management.
108 : */
109 : MemoryContext
110 165 : createTempGistContext(void)
111 : {
112 165 : return AllocSetContextCreate(CurrentMemoryContext,
113 : "GiST temporary context",
114 : ALLOCSET_DEFAULT_SIZES);
115 : }
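/*
 * A minimal sketch of the intended usage pattern (gistinsert() below follows
 * it): switch into the temporary context around calls to user-provided
 * support methods, then reset the context to reclaim anything they leaked.
 *
 *     MemoryContext tempCxt = createTempGistContext();
 *     MemoryContext oldCxt = MemoryContextSwitchTo(tempCxt);
 *     ... invoke GiST support functions ...
 *     MemoryContextSwitchTo(oldCxt);
 *     MemoryContextReset(tempCxt);
 */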
116 :
117 : /*
118 : * gistbuildempty() -- build an empty gist index in the initialization fork
119 : */
120 : void
121 0 : gistbuildempty(Relation index)
122 : {
123 : Buffer buffer;
124 :
125 : /* Initialize the root page */
126 0 : buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
127 0 : LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
128 :
129 : /* Initialize and xlog buffer */
130 0 : START_CRIT_SECTION();
131 0 : GISTInitBuffer(buffer, F_LEAF);
132 0 : MarkBufferDirty(buffer);
133 0 : log_newpage_buffer(buffer, true);
134 0 : END_CRIT_SECTION();
135 :
136 : /* Unlock and release the buffer */
137 0 : UnlockReleaseBuffer(buffer);
138 0 : }
139 :
140 : /*
141 : * gistinsert -- wrapper for GiST tuple insertion.
142 : *
143 : * This is the public interface routine for tuple insertion in GiSTs.
144 : * It doesn't do any work; just locks the relation and passes the buck.
145 : */
146 : bool
147 26219 : gistinsert(Relation r, Datum *values, bool *isnull,
148 : ItemPointer ht_ctid, Relation heapRel,
149 : IndexUniqueCheck checkUnique,
150 : IndexInfo *indexInfo)
151 : {
152 26219 : GISTSTATE *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
153 : IndexTuple itup;
154 : MemoryContext oldCxt;
155 :
156 : /* Initialize GISTSTATE cache if first call in this statement */
157 26219 : if (giststate == NULL)
158 : {
159 28 : oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
160 28 : giststate = initGISTstate(r);
161 28 : giststate->tempCxt = createTempGistContext();
162 28 : indexInfo->ii_AmCache = (void *) giststate;
163 28 : MemoryContextSwitchTo(oldCxt);
164 : }
165 :
166 26219 : oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
167 :
168 26219 : itup = gistFormTuple(giststate, r,
169 : values, isnull, true /* size is currently bogus */ );
170 26219 : itup->t_tid = *ht_ctid;
171 :
172 26219 : gistdoinsert(r, itup, 0, giststate);
173 :
174 : /* cleanup */
175 26219 : MemoryContextSwitchTo(oldCxt);
176 26219 : MemoryContextReset(giststate->tempCxt);
177 :
178 26219 : return false;
179 : }
180 :
181 :
182 : /*
183 : * Place the tuples from 'itup' onto 'buffer'. If 'oldoffnum' is valid, the
184 : * tuple at that offset is removed atomically along with the insertion of
185 : * the new tuples. This is used to replace a tuple with a new one.
186 : *
187 : * If 'leftchildbuf' is valid, we're inserting the downlink for the page
188 : * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
189 : * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
190 : *
191 : * If 'markfollowright' is true and the page is split, the left child is
192 : * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
193 : * index build, however, there is no concurrent access and the page splitting
194 : * is done in a slightly simpler fashion, and false is passed.
195 : *
196 : * If there is not enough room on the page, it is split. All the split
197 : * pages are kept pinned and locked and returned in *splitinfo; the caller
198 : * is responsible for inserting the downlinks for them. However, if
199 : * 'buffer' is the root page and it needs to be split, gistplacetopage()
200 : * performs the split as one atomic operation, and *splitinfo is set to NIL.
201 : * In that case, we continue to hold the root page locked, and the child
202 : * pages are released; note that new tuple(s) are *not* on the root page
203 : * but in one of the new child pages.
204 : *
205 : * If 'newblkno' is not NULL, returns the block number of the page the
206 : * first new/updated tuple was inserted into. Usually it's the given page, but could
207 : * be its right sibling if the page was split.
208 : *
209 : * Returns 'true' if the page was split, 'false' otherwise.
210 : */
211 : bool
212 210768 : gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
213 : Buffer buffer,
214 : IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
215 : BlockNumber *newblkno,
216 : Buffer leftchildbuf,
217 : List **splitinfo,
218 : bool markfollowright)
219 : {
220 210768 : BlockNumber blkno = BufferGetBlockNumber(buffer);
221 210768 : Page page = BufferGetPage(buffer);
222 210768 : bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
223 : XLogRecPtr recptr;
224 : int i;
225 : bool is_split;
226 :
227 : /*
228 : * Refuse to modify a page that's incompletely split. This should not
229 : * happen because we finish any incomplete splits while we walk down the
230 : * tree. However, it's remotely possible that another concurrent inserter
231 : * splits a parent page, and errors out before completing the split. We
232 : * will just throw an error in that case, and leave any split we had in
233 : * progress unfinished too. The next insert that comes along will clean up
234 : * the mess.
235 : */
236 210768 : if (GistFollowRight(page))
237 0 : elog(ERROR, "concurrent GiST page split was incomplete");
238 :
239 210768 : *splitinfo = NIL;
240 :
241 : /*
242 : * if isupdate, remove old key: This node's key has been modified, either
243 : * because a child split occurred or because we needed to adjust our key
244 : * for an insert in a child node. Therefore, remove the old version of
245 : * this node's key.
246 : *
247 : * for WAL replay, in the non-split case we handle this by setting up a
248 : * one-element todelete array; in the split case, it's handled implicitly
249 : * because the tuple vector passed to gistSplit won't include this tuple.
250 : */
251 210768 : is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
252 :
253 : /*
254 : * If the leaf page is full, first try to delete any dead tuples, and
255 : * then check again.
256 : */
257 210768 : if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
258 : {
259 0 : gistvacuumpage(rel, page, buffer);
260 0 : is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
261 : }
262 :
263 210768 : if (is_split)
264 : {
265 : /* no space for insertion */
266 : IndexTuple *itvec;
267 : int tlen;
268 1337 : SplitedPageLayout *dist = NULL,
269 : *ptr;
270 1337 : BlockNumber oldrlink = InvalidBlockNumber;
271 1337 : GistNSN oldnsn = 0;
272 : SplitedPageLayout rootpg;
273 : bool is_rootsplit;
274 : int npage;
275 :
276 1337 : is_rootsplit = (blkno == GIST_ROOT_BLKNO);
277 :
278 : /*
279 : * Form index tuples vector to split. If we're replacing an old tuple,
280 : * remove the old version from the vector.
281 : */
282 1337 : itvec = gistextractpage(page, &tlen);
283 1337 : if (OffsetNumberIsValid(oldoffnum))
284 : {
285 : /* on inner page we should remove old tuple */
286 8 : int pos = oldoffnum - FirstOffsetNumber;
287 :
288 8 : tlen--;
289 8 : if (pos != tlen)
290 3 : memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
291 : }
292 1337 : itvec = gistjoinvector(itvec, &tlen, itup, ntup);
293 1337 : dist = gistSplit(rel, page, itvec, tlen, giststate);
294 :
295 : /*
296 : * Check that split didn't produce too many pages.
297 : */
298 1337 : npage = 0;
299 4011 : for (ptr = dist; ptr; ptr = ptr->next)
300 2674 : npage++;
301 : /* in a root split, we'll add one more page to the list below */
302 1337 : if (is_rootsplit)
303 18 : npage++;
304 1337 : if (npage > GIST_MAX_SPLIT_PAGES)
305 0 : elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
306 : npage, GIST_MAX_SPLIT_PAGES);
307 :
308 : /*
309 : * Set up pages to work with. Allocate new buffers for all but the
310 : * leftmost page. The original page becomes the new leftmost page, and
311 : * is just replaced with the new contents.
312 : *
313 : * For a root split, allocate new buffers for all child pages; the
314 : * original page is overwritten with a new root page containing
315 : * downlinks to the new child pages.
316 : */
317 1337 : ptr = dist;
318 1337 : if (!is_rootsplit)
319 : {
320 : /* save old rightlink and NSN */
321 1319 : oldrlink = GistPageGetOpaque(page)->rightlink;
322 1319 : oldnsn = GistPageGetNSN(page);
323 :
324 1319 : dist->buffer = buffer;
325 1319 : dist->block.blkno = BufferGetBlockNumber(buffer);
326 1319 : dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
327 :
328 : /* clean all flags except F_LEAF */
329 1319 : GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
330 :
331 1319 : ptr = ptr->next;
332 : }
333 2692 : for (; ptr; ptr = ptr->next)
334 : {
335 : /* Allocate new page */
336 1355 : ptr->buffer = gistNewBuffer(rel);
337 1355 : GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
338 1355 : ptr->page = BufferGetPage(ptr->buffer);
339 1355 : ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
340 : }
341 :
342 : /*
343 : * Now that we know which blocks the new pages go to, set up downlink
344 : * tuples to point to them.
345 : */
346 4011 : for (ptr = dist; ptr; ptr = ptr->next)
347 : {
348 2674 : ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
349 2674 : GistTupleSetValid(ptr->itup);
350 : }
351 :
352 : /*
353 : * If this is a root split, we construct the new root page with the
354 : * downlinks here directly, instead of requiring the caller to insert
355 : * them. Add the new root page to the list along with the child pages.
356 : */
357 1337 : if (is_rootsplit)
358 : {
359 : IndexTuple *downlinks;
360 18 : int ndownlinks = 0;
361 : int i;
362 :
363 18 : rootpg.buffer = buffer;
364 18 : rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
365 18 : GistPageGetOpaque(rootpg.page)->flags = 0;
366 :
367 : /* Prepare a vector of all the downlinks */
368 54 : for (ptr = dist; ptr; ptr = ptr->next)
369 36 : ndownlinks++;
370 18 : downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
371 54 : for (i = 0, ptr = dist; ptr; ptr = ptr->next)
372 36 : downlinks[i++] = ptr->itup;
373 :
374 18 : rootpg.block.blkno = GIST_ROOT_BLKNO;
375 18 : rootpg.block.num = ndownlinks;
376 18 : rootpg.list = gistfillitupvec(downlinks, ndownlinks,
377 : &(rootpg.lenlist));
378 18 : rootpg.itup = NULL;
379 :
380 18 : rootpg.next = dist;
381 18 : dist = &rootpg;
382 : }
383 : else
384 : {
385 : /* Prepare split-info to be returned to caller */
386 3957 : for (ptr = dist; ptr; ptr = ptr->next)
387 : {
388 2638 : GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
389 :
390 2638 : si->buf = ptr->buffer;
391 2638 : si->downlink = ptr->itup;
392 2638 : *splitinfo = lappend(*splitinfo, si);
393 : }
394 : }
395 :
396 : /*
397 : * Fill all pages. All the pages are new, i.e. freshly allocated empty
398 : * pages, or a temporary copy of the old page.
399 : */
400 4029 : for (ptr = dist; ptr; ptr = ptr->next)
401 : {
402 2692 : char *data = (char *) (ptr->list);
403 :
404 199041 : for (i = 0; i < ptr->block.num; i++)
405 : {
406 196349 : IndexTuple thistup = (IndexTuple) data;
407 :
408 196349 : if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
409 0 : elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));
410 :
411 : /*
412 : * If this is the first inserted/updated tuple, let the caller
413 : * know which page it landed on.
414 : */
415 196349 : if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
416 208 : *newblkno = ptr->block.blkno;
417 :
418 196349 : data += IndexTupleSize(thistup);
419 : }
420 :
421 : /* Set up rightlinks */
422 2692 : if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
423 2674 : GistPageGetOpaque(ptr->page)->rightlink =
424 1337 : ptr->next->block.blkno;
425 : else
426 1355 : GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
427 :
428 : /*
429 : * Mark all but the right-most page with the follow-right
430 : * flag. It will be cleared as soon as the downlink is inserted
431 : * into the parent, but this ensures that if we error out before
432 : * that, the index is still consistent. (in buffering build mode,
433 : * any error will abort the index build anyway, so this is not
434 : * needed.)
435 : */
436 2692 : if (ptr->next && !is_rootsplit && markfollowright)
437 1112 : GistMarkFollowRight(ptr->page);
438 : else
439 1580 : GistClearFollowRight(ptr->page);
440 :
441 : /*
442 : * Copy the NSN of the original page to all pages. The
443 : * F_FOLLOW_RIGHT flags ensure that scans will follow the
444 : * rightlinks until the downlinks are inserted.
445 : */
446 2692 : GistPageSetNSN(ptr->page, oldnsn);
447 : }
448 :
449 : /*
450 : * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
451 : * insertion for that. NB: The number of pages and data segments
452 : * specified here must match the calculations in gistXLogSplit()!
453 : */
454 1337 : if (RelationNeedsWAL(rel))
455 1274 : XLogEnsureRecordSpace(npage, 1 + npage * 2);
456 :
457 1337 : START_CRIT_SECTION();
458 :
459 : /*
460 : * Must mark buffers dirty before XLogInsert, even though we'll still
461 : * be changing their opaque fields below.
462 : */
463 4029 : for (ptr = dist; ptr; ptr = ptr->next)
464 2692 : MarkBufferDirty(ptr->buffer);
465 1337 : if (BufferIsValid(leftchildbuf))
466 4 : MarkBufferDirty(leftchildbuf);
467 :
468 : /*
469 : * The first page in the chain was a temporary working copy meant to
470 : * replace the old page. Copy it over the old page.
471 : */
472 1337 : PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
473 1337 : dist->page = BufferGetPage(dist->buffer);
474 :
475 : /* Write the WAL record */
476 1337 : if (RelationNeedsWAL(rel))
477 1274 : recptr = gistXLogSplit(is_leaf,
478 : dist, oldrlink, oldnsn, leftchildbuf,
479 : markfollowright);
480 : else
481 63 : recptr = gistGetFakeLSN(rel);
482 :
483 4029 : for (ptr = dist; ptr; ptr = ptr->next)
484 : {
485 2692 : PageSetLSN(ptr->page, recptr);
486 : }
487 :
488 : /*
489 : * Return the new child buffers to the caller.
490 : *
491 : * If this was a root split, we've already inserted the downlink
492 : * pointers, in the form of a new root page. Therefore we can release
493 : * all the new buffers, and keep just the root page locked.
494 : */
495 1337 : if (is_rootsplit)
496 : {
497 54 : for (ptr = dist->next; ptr; ptr = ptr->next)
498 36 : UnlockReleaseBuffer(ptr->buffer);
499 : }
500 : }
501 : else
502 : {
503 : /*
504 : * Enough space. We always get here if ntup==0.
505 : */
506 209431 : START_CRIT_SECTION();
507 :
508 : /*
509 : * Delete old tuple if any, then insert new tuple(s) if any. If
510 : * possible, use the fast path of PageIndexTupleOverwrite.
511 : */
512 209431 : if (OffsetNumberIsValid(oldoffnum))
513 : {
514 83267 : if (ntup == 1)
515 : {
516 : /* One-for-one replacement, so use PageIndexTupleOverwrite */
517 81956 : if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
518 81956 : IndexTupleSize(*itup)))
519 0 : elog(ERROR, "failed to add item to index page in \"%s\"",
520 : RelationGetRelationName(rel));
521 : }
522 : else
523 : {
524 : /* Delete old, then append new tuple(s) to page */
525 1311 : PageIndexTupleDelete(page, oldoffnum);
526 1311 : gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
527 : }
528 : }
529 : else
530 : {
531 : /* Just append new tuples at the end of the page */
532 126164 : gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
533 : }
534 :
535 209431 : MarkBufferDirty(buffer);
536 :
537 209431 : if (BufferIsValid(leftchildbuf))
538 1108 : MarkBufferDirty(leftchildbuf);
539 :
540 209431 : if (RelationNeedsWAL(rel))
541 : {
542 201398 : OffsetNumber ndeloffs = 0,
543 : deloffs[1];
544 :
545 201398 : if (OffsetNumberIsValid(oldoffnum))
546 : {
547 82931 : deloffs[0] = oldoffnum;
548 82931 : ndeloffs = 1;
549 : }
550 :
551 201398 : recptr = gistXLogUpdate(buffer,
552 : deloffs, ndeloffs, itup, ntup,
553 : leftchildbuf);
554 :
555 201398 : PageSetLSN(page, recptr);
556 : }
557 : else
558 : {
559 8033 : recptr = gistGetFakeLSN(rel);
560 8033 : PageSetLSN(page, recptr);
561 : }
562 :
563 209431 : if (newblkno)
564 27609 : *newblkno = blkno;
565 : }
566 :
567 : /*
568 : * If we inserted the downlink for a child page, set NSN and clear
569 : * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
570 : * follow the rightlink if and only if they looked at the parent page
571 : * before we inserted the downlink.
572 : *
573 : * Note that we do this *after* writing the WAL record. That means that
574 : * the possible full page image in the WAL record does not include these
575 : * changes, and they must be replayed even if the page is restored from
576 : * the full page image. There's a chicken-and-egg problem: if we updated
577 : * the child pages first, we wouldn't know the recptr of the WAL record
578 : * we're about to write.
579 : */
580 210768 : if (BufferIsValid(leftchildbuf))
581 : {
582 1112 : Page leftpg = BufferGetPage(leftchildbuf);
583 :
584 1112 : GistPageSetNSN(leftpg, recptr);
585 1112 : GistClearFollowRight(leftpg);
586 :
587 1112 : PageSetLSN(leftpg, recptr);
588 : }
589 :
590 210768 : END_CRIT_SECTION();
591 :
592 210768 : return is_split;
593 : }
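/*
 * A simplified sketch (not part of this file) of the reader-side counterpart
 * of the NSN / F_FOLLOW_RIGHT protocol established above: a scan that reached
 * this page from a parent whose LSN was 'parentlsn' must also visit the right
 * sibling whenever the page was split after the parent was read.  The real
 * logic lives in gistget.c; the helper name below is hypothetical.
 */
static BlockNumber
example_rightlink_to_follow(Page page, GistNSN parentlsn)
{
	/* F_FOLLOW_RIGHT set: the sibling's downlink is not in the parent yet */
	if (GistFollowRight(page) || parentlsn < GistPageGetNSN(page))
		return GistPageGetOpaque(page)->rightlink;

	return InvalidBlockNumber;	/* no concurrent split to worry about */
}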
594 :
595 : /*
596 : * Workhorse routine for doing insertion into a GiST index. Note that
597 : * this routine assumes it is invoked in a short-lived memory context,
598 : * so it does not bother releasing palloc'd allocations.
599 : */
600 : void
601 111589 : gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
602 : {
603 : ItemId iid;
604 : IndexTuple idxtuple;
605 : GISTInsertStack firststack;
606 : GISTInsertStack *stack;
607 : GISTInsertState state;
608 111589 : bool xlocked = false;
609 :
610 111589 : memset(&state, 0, sizeof(GISTInsertState));
611 111589 : state.freespace = freespace;
612 111589 : state.r = r;
613 :
614 : /* Start from the root */
615 111589 : firststack.blkno = GIST_ROOT_BLKNO;
616 111589 : firststack.lsn = 0;
617 111589 : firststack.parent = NULL;
618 111589 : firststack.downlinkoffnum = InvalidOffsetNumber;
619 111589 : state.stack = stack = &firststack;
620 :
621 : /*
622 : * Walk down along the path of smallest penalty, updating the parent
623 : * pointers with the key we're inserting as we go. If we crash in the
624 : * middle, the tree is consistent, although the possible parent updates
625 : * were a waste.
626 : */
627 : for (;;)
628 : {
629 228605 : if (XLogRecPtrIsInvalid(stack->lsn))
630 228605 : stack->buffer = ReadBuffer(state.r, stack->blkno);
631 :
632 : /*
633 : * Be optimistic and grab shared lock first. Swap it for an exclusive
634 : * lock later if we need to update the page.
635 : */
636 228605 : if (!xlocked)
637 : {
638 228605 : LockBuffer(stack->buffer, GIST_SHARE);
639 228605 : gistcheckpage(state.r, stack->buffer);
640 : }
641 :
642 228605 : stack->page = (Page) BufferGetPage(stack->buffer);
643 228605 : stack->lsn = PageGetLSN(stack->page);
644 228605 : Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
645 :
646 : /*
647 : * If this page was split but the downlink was never inserted to the
648 : * parent because the inserting backend crashed before doing that, fix
649 : * that now.
650 : */
651 228605 : if (GistFollowRight(stack->page))
652 : {
653 0 : if (!xlocked)
654 : {
655 0 : LockBuffer(stack->buffer, GIST_UNLOCK);
656 0 : LockBuffer(stack->buffer, GIST_EXCLUSIVE);
657 0 : xlocked = true;
658 : /* someone might've completed the split when we unlocked */
659 0 : if (!GistFollowRight(stack->page))
660 0 : continue;
661 : }
662 0 : gistfixsplit(&state, giststate);
663 :
664 0 : UnlockReleaseBuffer(stack->buffer);
665 0 : xlocked = false;
666 0 : state.stack = stack = stack->parent;
667 0 : continue;
668 : }
669 :
670 345621 : if (stack->blkno != GIST_ROOT_BLKNO &&
671 117016 : stack->parent->lsn < GistPageGetNSN(stack->page))
672 : {
673 : /*
674 : * Concurrent split detected. There's no guarantee that the
675 : * downlink for this page is consistent with the tuple we're
676 : * inserting anymore, so go back to parent and rechoose the best
677 : * child.
678 : */
679 0 : UnlockReleaseBuffer(stack->buffer);
680 0 : xlocked = false;
681 0 : state.stack = stack = stack->parent;
682 0 : continue;
683 : }
684 :
685 228605 : if (!GistPageIsLeaf(stack->page))
686 : {
687 : /*
688 : * This is an internal page so continue to walk down the tree.
689 : * Find the child node that has the minimum insertion penalty.
690 : */
691 : BlockNumber childblkno;
692 : IndexTuple newtup;
693 : GISTInsertStack *item;
694 : OffsetNumber downlinkoffnum;
695 :
696 117016 : downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
697 117016 : iid = PageGetItemId(stack->page, downlinkoffnum);
698 117016 : idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
699 117016 : childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
700 :
701 : /*
702 : * Check that it's not a leftover invalid tuple from pre-9.1
703 : */
704 117016 : if (GistTupleIsInvalid(idxtuple))
705 0 : ereport(ERROR,
706 : (errmsg("index \"%s\" contains an inner tuple marked as invalid",
707 : RelationGetRelationName(r)),
708 : errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
709 : errhint("Please REINDEX it.")));
710 :
711 : /*
712 : * Check that the key representing the target child node is
713 : * consistent with the key we're inserting. Update it if it's not.
714 : */
715 117016 : newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
716 117016 : if (newtup)
717 : {
718 : /*
719 : * Swap shared lock for an exclusive one. Beware, the page may
720 : * change while we unlock/lock the page...
721 : */
722 70250 : if (!xlocked)
723 : {
724 70250 : LockBuffer(stack->buffer, GIST_UNLOCK);
725 70250 : LockBuffer(stack->buffer, GIST_EXCLUSIVE);
726 70250 : xlocked = true;
727 70250 : stack->page = (Page) BufferGetPage(stack->buffer);
728 :
729 70250 : if (PageGetLSN(stack->page) != stack->lsn)
730 : {
731 : /* the page was changed while we unlocked it, retry */
732 0 : continue;
733 : }
734 : }
735 :
736 : /*
737 : * Update the tuple.
738 : *
739 : * We still hold the lock after gistinserttuple(), but it
740 : * might have to split the page to make the updated tuple fit.
741 : * In that case the updated tuple might migrate to the other
742 : * half of the split, so we have to go back to the parent and
743 : * descend back to the half that's a better fit for the new
744 : * tuple.
745 : */
746 70250 : if (gistinserttuple(&state, stack, giststate, newtup,
747 : downlinkoffnum))
748 : {
749 : /*
750 : * If this was a root split, the root page continues to be
751 : * the parent and the updated tuple went to one of the
752 : * child pages, so we just need to retry from the root
753 : * page.
754 : */
755 0 : if (stack->blkno != GIST_ROOT_BLKNO)
756 : {
757 0 : UnlockReleaseBuffer(stack->buffer);
758 0 : xlocked = false;
759 0 : state.stack = stack = stack->parent;
760 : }
761 0 : continue;
762 : }
763 : }
764 117016 : LockBuffer(stack->buffer, GIST_UNLOCK);
765 117016 : xlocked = false;
766 :
767 : /* descend to the chosen child */
768 117016 : item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
769 117016 : item->blkno = childblkno;
770 117016 : item->parent = stack;
771 117016 : item->downlinkoffnum = downlinkoffnum;
772 117016 : state.stack = stack = item;
773 : }
774 : else
775 : {
776 : /*
777 : * Leaf page. Insert the new key. We've already updated all the
778 : * parents on the way down, but we might have to split the page if
779 : * it doesn't fit. gistinserttuple() will take care of that.
780 : */
781 :
782 : /*
783 : * Swap shared lock for an exclusive one. Be careful, the page may
784 : * change while we unlock/lock the page...
785 : */
786 111589 : if (!xlocked)
787 : {
788 111589 : LockBuffer(stack->buffer, GIST_UNLOCK);
789 111589 : LockBuffer(stack->buffer, GIST_EXCLUSIVE);
790 111589 : xlocked = true;
791 111589 : stack->page = (Page) BufferGetPage(stack->buffer);
792 111589 : stack->lsn = PageGetLSN(stack->page);
793 :
794 111589 : if (stack->blkno == GIST_ROOT_BLKNO)
795 : {
796 : /*
797 : * the root page is the only page that can change from a leaf
798 : * into an inner page, so we must recheck it here
799 : */
800 2570 : if (!GistPageIsLeaf(stack->page))
801 : {
802 : /*
803 : * very rare situation: while the page was unlocked, the
804 : * one-page index grew and the root is no longer a leaf
805 : */
806 0 : LockBuffer(stack->buffer, GIST_UNLOCK);
807 0 : xlocked = false;
808 0 : continue;
809 : }
810 :
811 : /*
812 : * we don't need a separate check for a root split, because
813 : * checking leaf vs. inner is enough to recognize a root split
814 : */
815 : }
816 218038 : else if (GistFollowRight(stack->page) ||
817 109019 : stack->parent->lsn < GistPageGetNSN(stack->page))
818 : {
819 : /*
820 : * The page was split while we momentarily unlocked the
821 : * page. Go back to parent.
822 : */
823 0 : UnlockReleaseBuffer(stack->buffer);
824 0 : xlocked = false;
825 0 : state.stack = stack = stack->parent;
826 0 : continue;
827 : }
828 : }
829 :
830 : /* now state.stack->(page, buffer and blkno) points to leaf page */
831 :
832 111589 : gistinserttuple(&state, stack, giststate, itup,
833 : InvalidOffsetNumber);
834 111589 : LockBuffer(stack->buffer, GIST_UNLOCK);
835 :
836 : /* Release any pins we might still hold before exiting */
837 340194 : for (; stack; stack = stack->parent)
838 228605 : ReleaseBuffer(stack->buffer);
839 111589 : break;
840 : }
841 117016 : }
842 111589 : }
843 :
844 : /*
845 : * Traverse the tree to find the path from the root page to the specified "child" block.
846 : *
847 : * returns a new insertion stack, starting from the parent of "child", up
848 : * to the root. *downlinkoffnum is set to the offset of the downlink in the
849 : * direct parent of child.
850 : *
851 : * To prevent deadlocks, this should lock only one page at a time.
852 : */
853 : static GISTInsertStack *
854 0 : gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
855 : {
856 : Page page;
857 : Buffer buffer;
858 : OffsetNumber i,
859 : maxoff;
860 : ItemId iid;
861 : IndexTuple idxtuple;
862 : List *fifo;
863 : GISTInsertStack *top,
864 : *ptr;
865 : BlockNumber blkno;
866 :
867 0 : top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
868 0 : top->blkno = GIST_ROOT_BLKNO;
869 0 : top->downlinkoffnum = InvalidOffsetNumber;
870 :
871 0 : fifo = list_make1(top);
872 0 : while (fifo != NIL)
873 : {
874 : /* Get next page to visit */
875 0 : top = linitial(fifo);
876 0 : fifo = list_delete_first(fifo);
877 :
878 0 : buffer = ReadBuffer(r, top->blkno);
879 0 : LockBuffer(buffer, GIST_SHARE);
880 0 : gistcheckpage(r, buffer);
881 0 : page = (Page) BufferGetPage(buffer);
882 :
883 0 : if (GistPageIsLeaf(page))
884 : {
885 : /*
886 : * Because we scan the index top-down, all the rest of the pages
887 : * in the queue must be leaf pages as well.
888 : */
889 0 : UnlockReleaseBuffer(buffer);
890 0 : break;
891 : }
892 :
893 0 : top->lsn = PageGetLSN(page);
894 :
895 : /*
896 : * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
897 : * downlink. This should not normally happen..
898 : */
899 0 : if (GistFollowRight(page))
900 0 : elog(ERROR, "concurrent GiST page split was incomplete");
901 :
902 0 : if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
903 0 : GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
904 : {
905 : /*
906 : * Page was split while we looked elsewhere. We didn't see the
907 : * downlink to the right page when we scanned the parent, so add
908 : * it to the queue now.
909 : *
910 : * Put the right page at the front of the queue, so that we visit it
911 : * next. That's important, because if this is the lowest internal
912 : * level, just above leaves, we might already have queued up some
913 : * leaf pages, and we assume that there can't be any non-leaf
914 : * pages behind leaf pages.
915 : */
916 0 : ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
917 0 : ptr->blkno = GistPageGetOpaque(page)->rightlink;
918 0 : ptr->downlinkoffnum = InvalidOffsetNumber;
919 0 : ptr->parent = top->parent;
920 :
921 0 : fifo = lcons(ptr, fifo);
922 : }
923 :
924 0 : maxoff = PageGetMaxOffsetNumber(page);
925 :
926 0 : for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
927 : {
928 0 : iid = PageGetItemId(page, i);
929 0 : idxtuple = (IndexTuple) PageGetItem(page, iid);
930 0 : blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
931 0 : if (blkno == child)
932 : {
933 : /* Found it! */
934 0 : UnlockReleaseBuffer(buffer);
935 0 : *downlinkoffnum = i;
936 0 : return top;
937 : }
938 : else
939 : {
940 : /* Append this child to the list of pages to visit later */
941 0 : ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
942 0 : ptr->blkno = blkno;
943 0 : ptr->downlinkoffnum = i;
944 0 : ptr->parent = top;
945 :
946 0 : fifo = lappend(fifo, ptr);
947 : }
948 : }
949 :
950 0 : UnlockReleaseBuffer(buffer);
951 : }
952 :
953 0 : elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
954 : RelationGetRelationName(r), child);
955 : return NULL; /* keep compiler quiet */
956 : }
957 :
958 : /*
959 : * Updates the stack so that child->parent is the correct parent of the
960 : * child. child->parent must be exclusively locked on entry, and will
961 : * remain so at exit, but it might not be the same page anymore.
962 : */
963 : static void
964 1112 : gistFindCorrectParent(Relation r, GISTInsertStack *child)
965 : {
966 1112 : GISTInsertStack *parent = child->parent;
967 :
968 1112 : gistcheckpage(r, parent->buffer);
969 1112 : parent->page = (Page) BufferGetPage(parent->buffer);
970 :
971 : /* here we don't need to distinguish between split and page update */
972 2224 : if (child->downlinkoffnum == InvalidOffsetNumber ||
973 1112 : parent->lsn != PageGetLSN(parent->page))
974 : {
975 : /* parent is changed, look child in right links until found */
976 : OffsetNumber i,
977 : maxoff;
978 : ItemId iid;
979 : IndexTuple idxtuple;
980 : GISTInsertStack *ptr;
981 :
982 : while (true)
983 : {
984 978 : maxoff = PageGetMaxOffsetNumber(parent->page);
985 46084 : for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
986 : {
987 46084 : iid = PageGetItemId(parent->page, i);
988 46084 : idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
989 46084 : if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
990 : {
991 : /* yes!!, found */
992 978 : child->downlinkoffnum = i;
993 978 : return;
994 : }
995 : }
996 :
997 0 : parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
998 0 : UnlockReleaseBuffer(parent->buffer);
999 0 : if (parent->blkno == InvalidBlockNumber)
1000 : {
1001 : /*
1002 : * End of chain and still didn't find the parent. This is a very
1003 : * rare situation that can happen when the root was split.
1004 : */
1005 0 : break;
1006 : }
1007 0 : parent->buffer = ReadBuffer(r, parent->blkno);
1008 0 : LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1009 0 : gistcheckpage(r, parent->buffer);
1010 0 : parent->page = (Page) BufferGetPage(parent->buffer);
1011 0 : }
1012 :
1013 : /*
1014 : * Awful: we need to search the tree to find the parent, but before
1015 : * doing that we should release all the old parent buffers.
1016 : */
1017 :
1018 0 : ptr = child->parent->parent; /* child->parent already released
1019 : * above */
1020 0 : while (ptr)
1021 : {
1022 0 : ReleaseBuffer(ptr->buffer);
1023 0 : ptr = ptr->parent;
1024 : }
1025 :
1026 : /* ok, find new path */
1027 0 : ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
1028 :
1029 : /* read all buffers as expected by caller */
1030 : /* note we don't lock them or gistcheckpage them here! */
1031 0 : while (ptr)
1032 : {
1033 0 : ptr->buffer = ReadBuffer(r, ptr->blkno);
1034 0 : ptr->page = (Page) BufferGetPage(ptr->buffer);
1035 0 : ptr = ptr->parent;
1036 : }
1037 :
1038 : /* install new chain of parents to stack */
1039 0 : child->parent = parent;
1040 :
1041 : /* make recursive call to normal processing */
1042 0 : LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1043 0 : gistFindCorrectParent(r, child);
1044 : }
1045 :
1046 134 : return;
1047 : }
1048 :
1049 : /*
1050 : * Form a downlink pointer for the page in 'buf'.
1051 : */
1052 : static IndexTuple
1053 0 : gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1054 : GISTInsertStack *stack)
1055 : {
1056 0 : Page page = BufferGetPage(buf);
1057 : OffsetNumber maxoff;
1058 : OffsetNumber offset;
1059 0 : IndexTuple downlink = NULL;
1060 :
1061 0 : maxoff = PageGetMaxOffsetNumber(page);
1062 0 : for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1063 : {
1064 0 : IndexTuple ituple = (IndexTuple)
1065 0 : PageGetItem(page, PageGetItemId(page, offset));
1066 :
1067 0 : if (downlink == NULL)
1068 0 : downlink = CopyIndexTuple(ituple);
1069 : else
1070 : {
1071 : IndexTuple newdownlink;
1072 :
1073 0 : newdownlink = gistgetadjusted(rel, downlink, ituple,
1074 : giststate);
1075 0 : if (newdownlink)
1076 0 : downlink = newdownlink;
1077 : }
1078 : }
1079 :
1080 : /*
1081 : * If the page is completely empty, we can't form a meaningful downlink
1082 : * for it. But we have to insert a downlink for the page. Any key will do,
1083 : * as long as it's consistent with the downlink of the parent page, so that
1084 : * we can legally insert it into the parent. A minimal one that matches as
1085 : * few scans as possible would be best, to keep scans from doing useless
1086 : * work, but we don't know how to construct that. So we just use the
1087 : * downlink of the original page that was split - that's as far from
1088 : * optimal as it can get, but it will do.
1089 : */
1090 0 : if (!downlink)
1091 : {
1092 : ItemId iid;
1093 :
1094 0 : LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1095 0 : gistFindCorrectParent(rel, stack);
1096 0 : iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
1097 0 : downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1098 0 : downlink = CopyIndexTuple(downlink);
1099 0 : LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1100 : }
1101 :
1102 0 : ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1103 0 : GistTupleSetValid(downlink);
1104 :
1105 0 : return downlink;
1106 : }
1107 :
1108 :
1109 : /*
1110 : * Complete the incomplete split of state->stack->page.
1111 : */
1112 : static void
1113 0 : gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1114 : {
1115 0 : GISTInsertStack *stack = state->stack;
1116 : Buffer buf;
1117 : Page page;
1118 0 : List *splitinfo = NIL;
1119 :
1120 0 : elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1121 : RelationGetRelationName(state->r), stack->blkno);
1122 :
1123 0 : Assert(GistFollowRight(stack->page));
1124 0 : Assert(OffsetNumberIsValid(stack->downlinkoffnum));
1125 :
1126 0 : buf = stack->buffer;
1127 :
1128 : /*
1129 : * Read the chain of split pages, following the rightlinks. Construct a
1130 : * downlink tuple for each page.
1131 : */
1132 : for (;;)
1133 : {
1134 0 : GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1135 : IndexTuple downlink;
1136 :
1137 0 : page = BufferGetPage(buf);
1138 :
1139 : /* Form the new downlink tuples to insert to parent */
1140 0 : downlink = gistformdownlink(state->r, buf, giststate, stack);
1141 :
1142 0 : si->buf = buf;
1143 0 : si->downlink = downlink;
1144 :
1145 0 : splitinfo = lappend(splitinfo, si);
1146 :
1147 0 : if (GistFollowRight(page))
1148 : {
1149 : /* lock next page */
1150 0 : buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1151 0 : LockBuffer(buf, GIST_EXCLUSIVE);
1152 : }
1153 : else
1154 0 : break;
1155 0 : }
1156 :
1157 : /* Insert the downlinks */
1158 0 : gistfinishsplit(state, stack, giststate, splitinfo, false);
1159 0 : }
1160 :
1161 : /*
1162 : * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
1163 : * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
1164 : * 'stack' represents the path from the root to the page being updated.
1165 : *
1166 : * The caller must hold an exclusive lock on stack->buffer. The lock is still
1167 : * held on return, but the page might not contain the inserted tuple if the
1168 : * page was split. The function returns true if the page was split, false
1169 : * otherwise.
1170 : */
1171 : static bool
1172 181839 : gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
1173 : GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
1174 : {
1175 181839 : return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
1176 : InvalidBuffer, InvalidBuffer, false, false);
1177 : }
1178 :
1179 : /* ----------------
1180 : * An extended workhorse version of gistinserttuple(). This version allows
1181 : * inserting multiple tuples, or replacing a single tuple with multiple tuples.
1182 : * This is used to recursively update the downlinks in the parent when a page
1183 : * is split.
1184 : *
1185 : * If leftchild and rightchild are valid, we're inserting/replacing the
1186 : * downlink for rightchild, and leftchild is its left sibling. We clear the
1187 : * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
1188 : * insertion of the downlink.
1189 : *
1190 : * To avoid holding locks for longer than necessary, when recursing up the
1191 : * tree to update the parents, the locking is a bit peculiar here. On entry,
1192 : * the caller must hold an exclusive lock on stack->buffer, as well as
1193 : * leftchild and rightchild if given. On return:
1194 : *
1195 : * - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
1196 : * always kept pinned, however.
1197 : * - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
1198 : * is kept pinned.
1199 : * - Lock and pin on 'rightchild' are always released.
1200 : *
1201 : * Returns 'true' if the page had to be split. Note that if the page was
1202 : * split, the inserted/updated tuples might've been inserted to a right
1203 : * sibling of stack->buffer instead of stack->buffer itself.
1204 : */
1205 : static bool
1206 182951 : gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1207 : GISTSTATE *giststate,
1208 : IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1209 : Buffer leftchild, Buffer rightchild,
1210 : bool unlockbuf, bool unlockleftchild)
1211 : {
1212 : List *splitinfo;
1213 : bool is_split;
1214 :
1215 : /* Insert the tuple(s) to the page, splitting the page if necessary */
1216 182951 : is_split = gistplacetopage(state->r, state->freespace, giststate,
1217 : stack->buffer,
1218 : tuples, ntup,
1219 : oldoffnum, NULL,
1220 : leftchild,
1221 : &splitinfo,
1222 : true);
1223 :
1224 : /*
1225 : * Before recursing up in case the page was split, release locks on the
1226 : * child pages. We don't need to keep them locked when updating the
1227 : * parent.
1228 : */
1229 182951 : if (BufferIsValid(rightchild))
1230 1112 : UnlockReleaseBuffer(rightchild);
1231 182951 : if (BufferIsValid(leftchild) && unlockleftchild)
1232 2 : LockBuffer(leftchild, GIST_UNLOCK);
1233 :
1234 : /*
1235 : * If we had to split, insert/update the downlinks in the parent. If the
1236 : * caller requested us to release the lock on stack->buffer, tell
1237 : * gistfinishsplit() to do that as soon as it's safe to do so. If we
1238 : * didn't have to split, release it ourselves.
1239 : */
1240 182951 : if (splitinfo)
1241 1112 : gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
1242 181839 : else if (unlockbuf)
1243 1110 : LockBuffer(stack->buffer, GIST_UNLOCK);
1244 :
1245 182951 : return is_split;
1246 : }
1247 :
1248 : /*
1249 : * Finish an incomplete split by inserting/updating the downlinks in parent
1250 : * page. 'splitinfo' contains all the child pages involved in the split,
1251 : * from left-to-right.
1252 : *
1253 : * On entry, the caller must hold a lock on stack->buffer and all the child
1254 : * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
1255 : * released on return. The child pages are always unlocked and unpinned.
1256 : */
1257 : static void
1258 1112 : gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1259 : GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
1260 : {
1261 : ListCell *lc;
1262 : List *reversed;
1263 : GISTPageSplitInfo *right;
1264 : GISTPageSplitInfo *left;
1265 : IndexTuple tuples[2];
1266 :
1267 : /* A split always contains at least two halves */
1268 1112 : Assert(list_length(splitinfo) >= 2);
1269 :
1270 : /*
1271 : * We need to insert downlinks for each new page, and update the downlink
1272 : * for the original (leftmost) page in the split. Begin at the rightmost
1273 : * page, inserting one downlink at a time until there's only two pages
1274 : * left. Finally insert the downlink for the last new page and update the
1275 : * downlink for the original page as one operation.
1276 : */
1277 :
1278 : /* for convenience, create a copy of the list in reverse order */
1279 1112 : reversed = NIL;
1280 3336 : foreach(lc, splitinfo)
1281 : {
1282 2224 : reversed = lcons(lfirst(lc), reversed);
1283 : }
1284 :
1285 1112 : LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1286 1112 : gistFindCorrectParent(state->r, stack);
1287 :
1288 : /*
1289 : * insert downlinks for the siblings from right to left, until there are
1290 : * only two siblings left.
1291 : */
1292 2224 : while (list_length(reversed) > 2)
1293 : {
1294 0 : right = (GISTPageSplitInfo *) linitial(reversed);
1295 0 : left = (GISTPageSplitInfo *) lsecond(reversed);
1296 :
1297 0 : if (gistinserttuples(state, stack->parent, giststate,
1298 : &right->downlink, 1,
1299 : InvalidOffsetNumber,
1300 : left->buf, right->buf, false, false))
1301 : {
1302 : /*
1303 : * If the parent page was split, need to relocate the original
1304 : * parent pointer.
1305 : */
1306 0 : gistFindCorrectParent(state->r, stack);
1307 : }
1308 : /* gistinserttuples() released the lock on right->buf. */
1309 0 : reversed = list_delete_first(reversed);
1310 : }
1311 :
1312 1112 : right = (GISTPageSplitInfo *) linitial(reversed);
1313 1112 : left = (GISTPageSplitInfo *) lsecond(reversed);
1314 :
1315 : /*
1316 : * Finally insert downlink for the remaining right page and update the
1317 : * downlink for the original page to not contain the tuples that were
1318 : * moved to the new pages.
1319 : */
1320 1112 : tuples[0] = left->downlink;
1321 1112 : tuples[1] = right->downlink;
1322 2224 : gistinserttuples(state, stack->parent, giststate,
1323 : tuples, 2,
1324 1112 : stack->downlinkoffnum,
1325 : left->buf, right->buf,
1326 : true, /* Unlock parent */
1327 : unlockbuf /* Unlock stack->buffer if caller wants that */
1328 : );
1329 1112 : Assert(left->buf == stack->buffer);
1330 1112 : }
1331 :
1332 : /*
1333 : * gistSplit -- split a page in the tree and fill the struct
1334 : * used for XLOG and the real write buffers. The function is recursive:
1335 : * it keeps splitting until the keys fit on every page.
1336 : */
1337 : SplitedPageLayout *
1338 1337 : gistSplit(Relation r,
1339 : Page page,
1340 : IndexTuple *itup, /* contains compressed entry */
1341 : int len,
1342 : GISTSTATE *giststate)
1343 : {
1344 : IndexTuple *lvectup,
1345 : *rvectup;
1346 : GistSplitVector v;
1347 : int i;
1348 1337 : SplitedPageLayout *res = NULL;
1349 :
1350 : /* this should never recurse very deeply, but better safe than sorry */
1351 1337 : check_stack_depth();
1352 :
1353 : /* there's no point in splitting an empty page */
1354 1337 : Assert(len > 0);
1355 :
1356 : /*
1357 : * If a single tuple doesn't fit on a page, no amount of splitting will
1358 : * help.
1359 : */
1360 1337 : if (len == 1)
1361 0 : ereport(ERROR,
1362 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1363 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1364 : IndexTupleSize(itup[0]), GiSTPageSize,
1365 : RelationGetRelationName(r))));
1366 :
1367 1337 : memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1368 1337 : memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
1369 1337 : gistSplitByKey(r, page, itup, len, giststate, &v, 0);
1370 :
1371 : /* form left and right vector */
1372 1337 : lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1373 1337 : rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1374 :
1375 98671 : for (i = 0; i < v.splitVector.spl_nleft; i++)
1376 97334 : lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1377 :
1378 100316 : for (i = 0; i < v.splitVector.spl_nright; i++)
1379 98979 : rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1380 :
1381 : /* finalize splitting (may need another split) */
1382 1337 : if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1383 : {
1384 0 : res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1385 : }
1386 : else
1387 : {
1388 1337 : ROTATEDIST(res);
1389 1337 : res->block.num = v.splitVector.spl_nright;
1390 1337 : res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1391 1337 : res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1392 : }
1393 :
1394 1337 : if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1395 : {
1396 : SplitedPageLayout *resptr,
1397 : *subres;
1398 :
1399 0 : resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1400 :
1401 : /* install on list's tail */
1402 0 : while (resptr->next)
1403 0 : resptr = resptr->next;
1404 :
1405 0 : resptr->next = res;
1406 0 : res = subres;
1407 : }
1408 : else
1409 : {
1410 1337 : ROTATEDIST(res);
1411 1337 : res->block.num = v.splitVector.spl_nleft;
1412 1337 : res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1413 1337 : res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1414 : }
1415 :
1416 1337 : return res;
1417 : }
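/*
 * Illustrative note (not from the original comments): for the common two-way
 * split, the chain returned above is assembled by ROTATEDIST() in prepend
 * order, so the caller sees
 *
 *     res -> { left half:  spl_nleft tuples,  itup = union of left keys  }
 *         -> { right half: spl_nright tuples, itup = union of right keys }
 *         -> NULL
 *
 * gistplacetopage() installs the first element over the original page and
 * allocates fresh buffers for the remaining elements.
 */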
1418 :
1419 : /*
1420 : * Create a GISTSTATE and fill it with information about the index
1421 : */
1422 : GISTSTATE *
1423 165 : initGISTstate(Relation index)
1424 : {
1425 : GISTSTATE *giststate;
1426 : MemoryContext scanCxt;
1427 : MemoryContext oldCxt;
1428 : int i;
1429 :
1430 : /* safety check to protect fixed-size arrays in GISTSTATE */
1431 165 : if (index->rd_att->natts > INDEX_MAX_KEYS)
1432 0 : elog(ERROR, "numberOfAttributes %d > %d",
1433 : index->rd_att->natts, INDEX_MAX_KEYS);
1434 :
1435 : /* Create the memory context that will hold the GISTSTATE */
1436 165 : scanCxt = AllocSetContextCreate(CurrentMemoryContext,
1437 : "GiST scan context",
1438 : ALLOCSET_DEFAULT_SIZES);
1439 165 : oldCxt = MemoryContextSwitchTo(scanCxt);
1440 :
1441 : /* Create and fill in the GISTSTATE */
1442 165 : giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
1443 :
1444 165 : giststate->scanCxt = scanCxt;
1445 165 : giststate->tempCxt = scanCxt; /* caller must change this if needed */
1446 165 : giststate->tupdesc = index->rd_att;
1447 :
1448 368 : for (i = 0; i < index->rd_att->natts; i++)
1449 : {
1450 203 : fmgr_info_copy(&(giststate->consistentFn[i]),
1451 : index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1452 : scanCxt);
1453 203 : fmgr_info_copy(&(giststate->unionFn[i]),
1454 : index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1455 : scanCxt);
1456 203 : fmgr_info_copy(&(giststate->compressFn[i]),
1457 : index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1458 : scanCxt);
1459 203 : fmgr_info_copy(&(giststate->decompressFn[i]),
1460 : index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1461 : scanCxt);
1462 203 : fmgr_info_copy(&(giststate->penaltyFn[i]),
1463 : index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1464 : scanCxt);
1465 203 : fmgr_info_copy(&(giststate->picksplitFn[i]),
1466 : index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1467 : scanCxt);
1468 203 : fmgr_info_copy(&(giststate->equalFn[i]),
1469 : index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1470 : scanCxt);
1471 : /* opclasses are not required to provide a Distance method */
1472 203 : if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1473 80 : fmgr_info_copy(&(giststate->distanceFn[i]),
1474 : index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1475 : scanCxt);
1476 : else
1477 123 : giststate->distanceFn[i].fn_oid = InvalidOid;
1478 :
1479 : /* opclasses are not required to provide a Fetch method */
1480 203 : if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
1481 137 : fmgr_info_copy(&(giststate->fetchFn[i]),
1482 : index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
1483 : scanCxt);
1484 : else
1485 66 : giststate->fetchFn[i].fn_oid = InvalidOid;
1486 :
1487 : /*
1488 : * If the index column has a specified collation, we should honor that
1489 : * while doing comparisons. However, we may have a collatable storage
1490 : * type for a noncollatable indexed data type. If there's no index
1491 : * collation then specify default collation in case the support
1492 : * functions need collation. This is harmless if the support
1493 : * functions don't care about collation, so we just do it
1494 : * unconditionally. (We could alternatively call get_typcollation,
1495 : * but that seems like expensive overkill --- there aren't going to be
1496 : * any cases where a GiST storage type has a nondefault collation.)
1497 : */
1498 203 : if (OidIsValid(index->rd_indcollation[i]))
1499 0 : giststate->supportCollation[i] = index->rd_indcollation[i];
1500 : else
1501 203 : giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1502 : }
1503 :
1504 165 : MemoryContextSwitchTo(oldCxt);
1505 :
1506 165 : return giststate;
1507 : }
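/*
 * A minimal sketch (hypothetical helper, not from this file) of how the
 * FmgrInfo slots cached above are typically consumed elsewhere in the GiST
 * code: support functions are invoked through the fmgr interface using the
 * per-column collation stored in supportCollation[].  Compare gistpenalty()
 * in gistutil.c.
 */
static float
example_call_penalty(GISTSTATE *giststate, int attno,
					 GISTENTRY *orig, GISTENTRY *add)
{
	float		penalty = 0.0;

	FunctionCall3Coll(&giststate->penaltyFn[attno],
					  giststate->supportCollation[attno],
					  PointerGetDatum(orig),
					  PointerGetDatum(add),
					  PointerGetDatum(&penalty));

	return penalty;
}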
1508 :
1509 : void
1510 131 : freeGISTstate(GISTSTATE *giststate)
1511 : {
1512 : /* It's sufficient to delete the scanCxt */
1513 131 : MemoryContextDelete(giststate->scanCxt);
1514 131 : }
1515 :
1516 : /*
1517 : * gistvacuumpage() -- try to remove LP_DEAD items from the given page.
1518 : * The function assumes that the buffer is exclusively locked.
1519 : */
1520 : static void
1521 0 : gistvacuumpage(Relation rel, Page page, Buffer buffer)
1522 : {
1523 : OffsetNumber deletable[MaxIndexTuplesPerPage];
1524 0 : int ndeletable = 0;
1525 : OffsetNumber offnum,
1526 : maxoff;
1527 :
1528 0 : Assert(GistPageIsLeaf(page));
1529 :
1530 : /*
1531 : * Scan over all items to see which ones need to be deleted according to
1532 : * LP_DEAD flags.
1533 : */
1534 0 : maxoff = PageGetMaxOffsetNumber(page);
1535 0 : for (offnum = FirstOffsetNumber;
1536 : offnum <= maxoff;
1537 0 : offnum = OffsetNumberNext(offnum))
1538 : {
1539 0 : ItemId itemId = PageGetItemId(page, offnum);
1540 :
1541 0 : if (ItemIdIsDead(itemId))
1542 0 : deletable[ndeletable++] = offnum;
1543 : }
1544 :
1545 0 : if (ndeletable > 0)
1546 : {
1547 0 : START_CRIT_SECTION();
1548 :
1549 0 : PageIndexMultiDelete(page, deletable, ndeletable);
1550 :
1551 : /*
1552 : * Mark the page as not containing any LP_DEAD items. This is not
1553 : * necessarily true (there might be some that have recently been marked,
1554 : * but weren't included in our target-item list), but it will almost
1555 : * always be true and it doesn't seem worth an additional page scan to
1556 : * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
1557 : */
1558 0 : GistClearPageHasGarbage(page);
1559 :
1560 0 : MarkBufferDirty(buffer);
1561 :
1562 : /* XLOG stuff */
1563 0 : if (RelationNeedsWAL(rel))
1564 : {
1565 : XLogRecPtr recptr;
1566 :
1567 0 : recptr = gistXLogUpdate(buffer,
1568 : deletable, ndeletable,
1569 : NULL, 0, InvalidBuffer);
1570 :
1571 0 : PageSetLSN(page, recptr);
1572 : }
1573 : else
1574 0 : PageSetLSN(page, gistGetFakeLSN(rel));
1575 :
1576 0 : END_CRIT_SECTION();
1577 : }
1578 :
1579 : /*
1580 : * Note: if we didn't find any LP_DEAD items, then the page's
1581 : * F_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
1582 : * separate write to clear it, however. We will clear it when we split
1583 : * the page.
1584 : */
1585 0 : }
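/*
 * A simplified sketch (hypothetical helper, not from this file) of where the
 * LP_DEAD hints consumed by gistvacuumpage() come from: when a scan learns
 * that every heap tuple behind an index tuple is dead, it marks the item and
 * sets the page-level F_HAS_GARBAGE hint, roughly as gistkillitems() in
 * gistget.c does.
 */
static void
example_mark_item_dead(Buffer buffer, OffsetNumber offnum)
{
	Page		page = BufferGetPage(buffer);
	ItemId		iid = PageGetItemId(page, offnum);

	ItemIdMarkDead(iid);			/* per-item LP_DEAD hint */
	GistMarkPageHasGarbage(page);	/* page-level hint checked above */

	/* hint-bit change only, so no full WAL record is required */
	MarkBufferDirtyHint(buffer, true);
}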