Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgutils.c
4 : * various support functions for SP-GiST
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgutils.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "access/reloptions.h"
19 : #include "access/spgist_private.h"
20 : #include "access/transam.h"
21 : #include "access/xact.h"
22 : #include "storage/bufmgr.h"
23 : #include "storage/indexfsm.h"
24 : #include "storage/lmgr.h"
25 : #include "utils/builtins.h"
26 : #include "utils/index_selfuncs.h"
27 : #include "utils/lsyscache.h"
28 :
29 :
30 : /*
31 : * SP-GiST handler function: return IndexAmRoutine with access method parameters
32 : * and callbacks.
33 : */
34 : Datum
35 85 : spghandler(PG_FUNCTION_ARGS)
36 : {
37 85 : IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
38 :
39 85 : amroutine->amstrategies = 0;
40 85 : amroutine->amsupport = SPGISTNProc;
41 85 : amroutine->amcanorder = false;
42 85 : amroutine->amcanorderbyop = false;
43 85 : amroutine->amcanbackward = false;
44 85 : amroutine->amcanunique = false;
45 85 : amroutine->amcanmulticol = false;
46 85 : amroutine->amoptionalkey = true;
47 85 : amroutine->amsearcharray = false;
48 85 : amroutine->amsearchnulls = true;
49 85 : amroutine->amstorage = false;
50 85 : amroutine->amclusterable = false;
51 85 : amroutine->ampredlocks = false;
52 85 : amroutine->amcanparallel = false;
53 85 : amroutine->amkeytype = InvalidOid;
54 :
55 85 : amroutine->ambuild = spgbuild;
56 85 : amroutine->ambuildempty = spgbuildempty;
57 85 : amroutine->aminsert = spginsert;
58 85 : amroutine->ambulkdelete = spgbulkdelete;
59 85 : amroutine->amvacuumcleanup = spgvacuumcleanup;
60 85 : amroutine->amcanreturn = spgcanreturn;
61 85 : amroutine->amcostestimate = spgcostestimate;
62 85 : amroutine->amoptions = spgoptions;
63 85 : amroutine->amproperty = NULL;
64 85 : amroutine->amvalidate = spgvalidate;
65 85 : amroutine->ambeginscan = spgbeginscan;
66 85 : amroutine->amrescan = spgrescan;
67 85 : amroutine->amgettuple = spggettuple;
68 85 : amroutine->amgetbitmap = spggetbitmap;
69 85 : amroutine->amendscan = spgendscan;
70 85 : amroutine->ammarkpos = NULL;
71 85 : amroutine->amrestrpos = NULL;
72 85 : amroutine->amestimateparallelscan = NULL;
73 85 : amroutine->aminitparallelscan = NULL;
74 85 : amroutine->amparallelrescan = NULL;
75 :
76 85 : PG_RETURN_POINTER(amroutine);
77 : }
78 :
79 : /* Fill in a SpGistTypeDesc struct with info about the specified data type */
80 : static void
81 90 : fillTypeDesc(SpGistTypeDesc *desc, Oid type)
82 : {
83 90 : desc->type = type;
84 90 : get_typlenbyval(type, &desc->attlen, &desc->attbyval);
85 90 : }
86 :
87 : /*
88 : * Fetch local cache of AM-specific info about the index, initializing it
89 : * if necessary
90 : */
91 : SpGistCache *
92 282900 : spgGetCache(Relation index)
93 : {
94 : SpGistCache *cache;
95 :
96 282900 : if (index->rd_amcache == NULL)
97 : {
98 : Oid atttype;
99 : spgConfigIn in;
100 : FmgrInfo *procinfo;
101 : Buffer metabuffer;
102 : SpGistMetaPageData *metadata;
103 :
104 30 : cache = MemoryContextAllocZero(index->rd_indexcxt,
105 : sizeof(SpGistCache));
106 :
107 : /* SPGiST doesn't support multi-column indexes */
108 30 : Assert(index->rd_att->natts == 1);
109 :
110 : /*
111 : * Get the actual data type of the indexed column from the index
112 : * tupdesc. We pass this to the opclass config function so that
113 : * polymorphic opclasses are possible.
114 : */
115 30 : atttype = TupleDescAttr(index->rd_att, 0)->atttypid;
116 :
117 : /* Call the config function to get config info for the opclass */
118 30 : in.attType = atttype;
119 :
120 30 : procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
121 60 : FunctionCall2Coll(procinfo,
122 30 : index->rd_indcollation[0],
123 : PointerGetDatum(&in),
124 30 : PointerGetDatum(&cache->config));
125 :
126 : /* Get the information we need about each relevant datatype */
127 30 : fillTypeDesc(&cache->attType, atttype);
128 30 : fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
129 30 : fillTypeDesc(&cache->attLabelType, cache->config.labelType);
130 :
131 : /* Last, get the lastUsedPages data from the metapage */
132 30 : metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
133 30 : LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
134 :
135 30 : metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
136 :
137 30 : if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
138 0 : elog(ERROR, "index \"%s\" is not an SP-GiST index",
139 : RelationGetRelationName(index));
140 :
141 30 : cache->lastUsedPages = metadata->lastUsedPages;
142 :
143 30 : UnlockReleaseBuffer(metabuffer);
144 :
145 30 : index->rd_amcache = (void *) cache;
146 : }
147 : else
148 : {
149 : /* assume it's up to date */
150 282870 : cache = (SpGistCache *) index->rd_amcache;
151 : }
152 :
153 282900 : return cache;
154 : }
155 :
156 : /* Initialize SpGistState for working with the given index */
157 : void
158 37454 : initSpGistState(SpGistState *state, Relation index)
159 : {
160 : SpGistCache *cache;
161 :
162 : /* Get cached static information about index */
163 37454 : cache = spgGetCache(index);
164 :
165 37454 : state->config = cache->config;
166 37454 : state->attType = cache->attType;
167 37454 : state->attPrefixType = cache->attPrefixType;
168 37454 : state->attLabelType = cache->attLabelType;
169 :
170 : /* Make workspace for constructing dead tuples */
171 37454 : state->deadTupleStorage = palloc0(SGDTSIZE);
172 :
173 : /* Set XID to use in redirection tuples */
174 37454 : state->myXid = GetTopTransactionIdIfAny();
175 :
176 : /* Assume we're not in an index build (spgbuild will override) */
177 37454 : state->isBuild = false;
178 37454 : }
179 :
180 : /*
181 : * Allocate a new page (either by recycling, or by extending the index file).
182 : *
183 : * The returned buffer is already pinned and exclusive-locked.
184 : * Caller is responsible for initializing the page by calling SpGistInitBuffer.
185 : */
186 : Buffer
187 559 : SpGistNewBuffer(Relation index)
188 : {
189 : Buffer buffer;
190 : bool needLock;
191 :
192 : /* First, try to get a page from FSM */
193 : for (;;)
194 : {
195 559 : BlockNumber blkno = GetFreeIndexPage(index);
196 :
197 559 : if (blkno == InvalidBlockNumber)
198 559 : break; /* nothing known to FSM */
199 :
200 : /*
201 : * The fixed pages shouldn't ever be listed in FSM, but just in case
202 : * one is, ignore it.
203 : */
204 0 : if (SpGistBlockIsFixed(blkno))
205 0 : continue;
206 :
207 0 : buffer = ReadBuffer(index, blkno);
208 :
209 : /*
210 : * We have to guard against the possibility that someone else already
211 : * recycled this page; the buffer may be locked if so.
212 : */
213 0 : if (ConditionalLockBuffer(buffer))
214 : {
215 0 : Page page = BufferGetPage(buffer);
216 :
217 0 : if (PageIsNew(page))
218 0 : return buffer; /* OK to use, if never initialized */
219 :
220 0 : if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
221 0 : return buffer; /* OK to use */
222 :
223 0 : LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
224 : }
225 :
226 : /* Can't use it, so release buffer and try again */
227 0 : ReleaseBuffer(buffer);
228 0 : }
229 :
230 : /* Must extend the file */
231 559 : needLock = !RELATION_IS_LOCAL(index);
232 559 : if (needLock)
233 199 : LockRelationForExtension(index, ExclusiveLock);
234 :
235 559 : buffer = ReadBuffer(index, P_NEW);
236 559 : LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
237 :
238 559 : if (needLock)
239 199 : UnlockRelationForExtension(index, ExclusiveLock);
240 :
241 559 : return buffer;
242 : }
243 :
244 : /*
245 : * Update index metapage's lastUsedPages info from local cache, if possible
246 : *
247 : * Updating meta page isn't critical for index working, so
248 : * 1 use ConditionalLockBuffer to improve concurrency
249 : * 2 don't WAL-log metabuffer changes to decrease WAL traffic
250 : */
251 : void
252 37334 : SpGistUpdateMetaPage(Relation index)
253 : {
254 37334 : SpGistCache *cache = (SpGistCache *) index->rd_amcache;
255 :
256 37334 : if (cache != NULL)
257 : {
258 : Buffer metabuffer;
259 : SpGistMetaPageData *metadata;
260 :
261 37334 : metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
262 :
263 37334 : if (ConditionalLockBuffer(metabuffer))
264 : {
265 37334 : metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
266 37334 : metadata->lastUsedPages = cache->lastUsedPages;
267 :
268 37334 : MarkBufferDirty(metabuffer);
269 37334 : UnlockReleaseBuffer(metabuffer);
270 : }
271 : else
272 : {
273 0 : ReleaseBuffer(metabuffer);
274 : }
275 : }
276 37334 : }
277 :
/*
 * Select the element of the lastUsedPages cache that corresponds to the
 * given flags.  Reducing the flags modulo SPGIST_CACHED_PAGES is just for
 * paranoia's sake.
 */
#define GET_LUP(c, f)  \
	(&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
281 :
282 : /*
283 : * Allocate and initialize a new buffer of the type and parity specified by
284 : * flags. The returned buffer is already pinned and exclusive-locked.
285 : *
286 : * When requesting an inner page, if we get one with the wrong parity,
287 : * we just release the buffer and try again. We will get a different page
288 : * because GetFreeIndexPage will have marked the page used in FSM. The page
289 : * is entered in our local lastUsedPages cache, so there's some hope of
290 : * making use of it later in this session, but otherwise we rely on VACUUM
291 : * to eventually re-enter the page in FSM, making it available for recycling.
292 : * Note that such a page does not get marked dirty here, so unless it's used
293 : * fairly soon, the buffer will just get discarded and the page will remain
294 : * as it was on disk.
295 : *
296 : * When we return a buffer to the caller, the page is *not* entered into
297 : * the lastUsedPages cache; we expect the caller will do so after it's taken
298 : * whatever space it will use. This is because after the caller has used up
299 : * some space, the page might have less space than whatever was cached already
300 : * so we'd rather not trash the old cache entry.
301 : */
302 : static Buffer
303 522 : allocNewBuffer(Relation index, int flags)
304 : {
305 522 : SpGistCache *cache = spgGetCache(index);
306 522 : uint16 pageflags = 0;
307 :
308 522 : if (GBUF_REQ_LEAF(flags))
309 512 : pageflags |= SPGIST_LEAF;
310 522 : if (GBUF_REQ_NULLS(flags))
311 0 : pageflags |= SPGIST_NULLS;
312 :
313 : for (;;)
314 : {
315 : Buffer buffer;
316 :
317 529 : buffer = SpGistNewBuffer(index);
318 529 : SpGistInitBuffer(buffer, pageflags);
319 :
320 529 : if (pageflags & SPGIST_LEAF)
321 : {
322 : /* Leaf pages have no parity concerns, so just use it */
323 512 : return buffer;
324 : }
325 : else
326 : {
327 17 : BlockNumber blkno = BufferGetBlockNumber(buffer);
328 17 : int blkFlags = GBUF_INNER_PARITY(blkno);
329 :
330 17 : if ((flags & GBUF_PARITY_MASK) == blkFlags)
331 : {
332 : /* Page has right parity, use it */
333 10 : return buffer;
334 : }
335 : else
336 : {
337 : /* Page has wrong parity, record it in cache and try again */
338 7 : if (pageflags & SPGIST_NULLS)
339 0 : blkFlags |= GBUF_NULLS;
340 7 : cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
341 7 : cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
342 7 : PageGetExactFreeSpace(BufferGetPage(buffer));
343 7 : UnlockReleaseBuffer(buffer);
344 : }
345 : }
346 7 : }
347 : }
348 :
349 : /*
350 : * Get a buffer of the type and parity specified by flags, having at least
351 : * as much free space as indicated by needSpace. We use the lastUsedPages
352 : * cache to assign the same buffer previously requested when possible.
353 : * The returned buffer is already pinned and exclusive-locked.
354 : *
355 : * *isNew is set true if the page was initialized here, false if it was
356 : * already valid.
357 : */
358 : Buffer
359 980 : SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
360 : {
361 980 : SpGistCache *cache = spgGetCache(index);
362 : SpGistLastUsedPage *lup;
363 :
364 : /* Bail out if even an empty page wouldn't meet the demand */
365 980 : if (needSpace > SPGIST_PAGE_CAPACITY)
366 0 : elog(ERROR, "desired SPGiST tuple size is too big");
367 :
368 : /*
369 : * If possible, increase the space request to include relation's
370 : * fillfactor. This ensures that when we add unrelated tuples to a page,
371 : * we try to keep 100-fillfactor% available for adding tuples that are
372 : * related to the ones already on it. But fillfactor mustn't cause an
373 : * error for requests that would otherwise be legal.
374 : */
375 980 : needSpace += RelationGetTargetPageFreeSpace(index,
376 : SPGIST_DEFAULT_FILLFACTOR);
377 980 : needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
378 :
379 : /* Get the cache entry for this flags setting */
380 980 : lup = GET_LUP(cache, flags);
381 :
382 : /* If we have nothing cached, just turn it over to allocNewBuffer */
383 980 : if (lup->blkno == InvalidBlockNumber)
384 : {
385 17 : *isNew = true;
386 17 : return allocNewBuffer(index, flags);
387 : }
388 :
389 : /* fixed pages should never be in cache */
390 963 : Assert(!SpGistBlockIsFixed(lup->blkno));
391 :
392 : /* If cached freeSpace isn't enough, don't bother looking at the page */
393 963 : if (lup->freeSpace >= needSpace)
394 : {
395 : Buffer buffer;
396 : Page page;
397 :
398 458 : buffer = ReadBuffer(index, lup->blkno);
399 :
400 458 : if (!ConditionalLockBuffer(buffer))
401 : {
402 : /*
403 : * buffer is locked by another process, so return a new buffer
404 : */
405 0 : ReleaseBuffer(buffer);
406 0 : *isNew = true;
407 0 : return allocNewBuffer(index, flags);
408 : }
409 :
410 458 : page = BufferGetPage(buffer);
411 :
412 458 : if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
413 : {
414 : /* OK to initialize the page */
415 17 : uint16 pageflags = 0;
416 :
417 17 : if (GBUF_REQ_LEAF(flags))
418 16 : pageflags |= SPGIST_LEAF;
419 17 : if (GBUF_REQ_NULLS(flags))
420 0 : pageflags |= SPGIST_NULLS;
421 17 : SpGistInitBuffer(buffer, pageflags);
422 17 : lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
423 17 : *isNew = true;
424 17 : return buffer;
425 : }
426 :
427 : /*
428 : * Check that page is of right type and has enough space. We must
429 : * recheck this since our cache isn't necessarily up to date.
430 : */
431 1323 : if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
432 882 : (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
433 : {
434 441 : int freeSpace = PageGetExactFreeSpace(page);
435 :
436 441 : if (freeSpace >= needSpace)
437 : {
438 : /* Success, update freespace info and return the buffer */
439 441 : lup->freeSpace = freeSpace - needSpace;
440 441 : *isNew = false;
441 441 : return buffer;
442 : }
443 : }
444 :
445 : /*
446 : * fallback to allocation of new buffer
447 : */
448 0 : UnlockReleaseBuffer(buffer);
449 : }
450 :
451 : /* No success with cache, so return a new buffer */
452 505 : *isNew = true;
453 505 : return allocNewBuffer(index, flags);
454 : }
455 :
456 : /*
457 : * Update lastUsedPages cache when done modifying a page.
458 : *
459 : * We update the appropriate cache entry if it already contained this page
460 : * (its freeSpace is likely obsolete), or if this page has more space than
461 : * whatever we had cached.
462 : */
463 : void
464 243711 : SpGistSetLastUsedPage(Relation index, Buffer buffer)
465 : {
466 243711 : SpGistCache *cache = spgGetCache(index);
467 : SpGistLastUsedPage *lup;
468 : int freeSpace;
469 243711 : Page page = BufferGetPage(buffer);
470 243711 : BlockNumber blkno = BufferGetBlockNumber(buffer);
471 : int flags;
472 :
473 : /* Never enter fixed pages (root pages) in cache, though */
474 243711 : if (SpGistBlockIsFixed(blkno))
475 326556 : return;
476 :
477 160866 : if (SpGistPageIsLeaf(page))
478 82373 : flags = GBUF_LEAF;
479 : else
480 78493 : flags = GBUF_INNER_PARITY(blkno);
481 160866 : if (SpGistPageStoresNulls(page))
482 0 : flags |= GBUF_NULLS;
483 :
484 160866 : lup = GET_LUP(cache, flags);
485 :
486 160866 : freeSpace = PageGetExactFreeSpace(page);
487 209380 : if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
488 48514 : lup->freeSpace < freeSpace)
489 : {
490 112974 : lup->blkno = blkno;
491 112974 : lup->freeSpace = freeSpace;
492 : }
493 : }
494 :
495 : /*
496 : * Initialize an SPGiST page to empty, with specified flags
497 : */
498 : void
499 617 : SpGistInitPage(Page page, uint16 f)
500 : {
501 : SpGistPageOpaque opaque;
502 :
503 617 : PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
504 617 : opaque = SpGistPageGetOpaque(page);
505 617 : memset(opaque, 0, sizeof(SpGistPageOpaqueData));
506 617 : opaque->flags = f;
507 617 : opaque->spgist_page_id = SPGIST_PAGE_ID;
508 617 : }
509 :
510 : /*
511 : * Initialize a buffer's page to empty, with specified flags
512 : */
513 : void
514 607 : SpGistInitBuffer(Buffer b, uint16 f)
515 : {
516 607 : Assert(BufferGetPageSize(b) == BLCKSZ);
517 607 : SpGistInitPage(BufferGetPage(b), f);
518 607 : }
519 :
520 : /*
521 : * Initialize metadata page
522 : */
523 : void
524 10 : SpGistInitMetapage(Page page)
525 : {
526 : SpGistMetaPageData *metadata;
527 : int i;
528 :
529 10 : SpGistInitPage(page, SPGIST_META);
530 10 : metadata = SpGistPageGetMeta(page);
531 10 : memset(metadata, 0, sizeof(SpGistMetaPageData));
532 10 : metadata->magicNumber = SPGIST_MAGIC_NUMBER;
533 :
534 : /* initialize last-used-page cache to empty */
535 90 : for (i = 0; i < SPGIST_CACHED_PAGES; i++)
536 80 : metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
537 10 : }
538 :
539 : /*
540 : * reloptions processing for SPGiST
541 : */
542 : bytea *
543 14 : spgoptions(Datum reloptions, bool validate)
544 : {
545 14 : return default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
546 : }
547 :
548 : /*
549 : * Get the space needed to store a non-null datum of the indicated type.
550 : * Note the result is already rounded up to a MAXALIGN boundary.
551 : * Also, we follow the SPGiST convention that pass-by-val types are
552 : * just stored in their Datum representation (compare memcpyDatum).
553 : */
554 : unsigned int
555 1756608 : SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
556 : {
557 : unsigned int size;
558 :
559 1756608 : if (att->attbyval)
560 812 : size = sizeof(Datum);
561 1755796 : else if (att->attlen > 0)
562 1525933 : size = att->attlen;
563 : else
564 229863 : size = VARSIZE_ANY(datum);
565 :
566 1756608 : return MAXALIGN(size);
567 : }
568 :
569 : /*
570 : * Copy the given non-null datum to *target
571 : */
572 : static void
573 160635 : memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
574 : {
575 : unsigned int size;
576 :
577 160635 : if (att->attbyval)
578 : {
579 812 : memcpy(target, &datum, sizeof(Datum));
580 : }
581 : else
582 : {
583 159823 : size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
584 159823 : memcpy(target, DatumGetPointer(datum), size);
585 : }
586 160635 : }
587 :
588 : /*
589 : * Construct a leaf tuple containing the given heap TID and datum value
590 : */
591 : SpGistLeafTuple
592 159157 : spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
593 : Datum datum, bool isnull)
594 : {
595 : SpGistLeafTuple tup;
596 : unsigned int size;
597 :
598 : /* compute space needed (note result is already maxaligned) */
599 159157 : size = SGLTHDRSZ;
600 159157 : if (!isnull)
601 159148 : size += SpGistGetTypeSize(&state->attType, datum);
602 :
603 : /*
604 : * Ensure that we can replace the tuple with a dead tuple later. This
605 : * test is unnecessary when !isnull, but let's be safe.
606 : */
607 159157 : if (size < SGDTSIZE)
608 9 : size = SGDTSIZE;
609 :
610 : /* OK, form the tuple */
611 159157 : tup = (SpGistLeafTuple) palloc0(size);
612 :
613 159157 : tup->size = size;
614 159157 : tup->nextOffset = InvalidOffsetNumber;
615 159157 : tup->heapPtr = *heapPtr;
616 159157 : if (!isnull)
617 159148 : memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
618 :
619 159157 : return tup;
620 : }
621 :
622 : /*
623 : * Construct a node (to go into an inner tuple) containing the given label
624 : *
625 : * Note that the node's downlink is just set invalid here. Caller will fill
626 : * it in later.
627 : */
628 : SpGistNodeTuple
629 3680 : spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
630 : {
631 : SpGistNodeTuple tup;
632 : unsigned int size;
633 3680 : unsigned short infomask = 0;
634 :
635 : /* compute space needed (note result is already maxaligned) */
636 3680 : size = SGNTHDRSZ;
637 3680 : if (!isnull)
638 812 : size += SpGistGetTypeSize(&state->attLabelType, label);
639 :
640 : /*
641 : * Here we make sure that the size will fit in the field reserved for it
642 : * in t_info.
643 : */
644 3680 : if ((size & INDEX_SIZE_MASK) != size)
645 0 : ereport(ERROR,
646 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
647 : errmsg("index row requires %zu bytes, maximum size is %zu",
648 : (Size) size, (Size) INDEX_SIZE_MASK)));
649 :
650 3680 : tup = (SpGistNodeTuple) palloc0(size);
651 :
652 3680 : if (isnull)
653 2868 : infomask |= INDEX_NULL_MASK;
654 : /* we don't bother setting the INDEX_VAR_MASK bit */
655 3680 : infomask |= size;
656 3680 : tup->t_info = infomask;
657 :
658 : /* The TID field will be filled in later */
659 3680 : ItemPointerSetInvalid(&tup->t_tid);
660 :
661 3680 : if (!isnull)
662 812 : memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
663 :
664 3680 : return tup;
665 : }
666 :
667 : /*
668 : * Construct an inner tuple containing the given prefix and node array
669 : */
670 : SpGistInnerTuple
671 954 : spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
672 : int nNodes, SpGistNodeTuple *nodes)
673 : {
674 : SpGistInnerTuple tup;
675 : unsigned int size;
676 : unsigned int prefixSize;
677 : int i;
678 : char *ptr;
679 :
680 : /* Compute size needed */
681 954 : if (hasPrefix)
682 675 : prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
683 : else
684 279 : prefixSize = 0;
685 :
686 954 : size = SGITHDRSZ + prefixSize;
687 :
688 : /* Note: we rely on node tuple sizes to be maxaligned already */
689 5970 : for (i = 0; i < nNodes; i++)
690 5016 : size += IndexTupleSize(nodes[i]);
691 :
692 : /*
693 : * Ensure that we can replace the tuple with a dead tuple later. This
694 : * test is unnecessary given current tuple layouts, but let's be safe.
695 : */
696 954 : if (size < SGDTSIZE)
697 0 : size = SGDTSIZE;
698 :
699 : /*
700 : * Inner tuple should be small enough to fit on a page
701 : */
702 954 : if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
703 0 : ereport(ERROR,
704 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
705 : errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
706 : (Size) size,
707 : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
708 : errhint("Values larger than a buffer page cannot be indexed.")));
709 :
710 : /*
711 : * Check for overflow of header fields --- probably can't fail if the
712 : * above succeeded, but let's be paranoid
713 : */
714 954 : if (size > SGITMAXSIZE ||
715 954 : prefixSize > SGITMAXPREFIXSIZE ||
716 : nNodes > SGITMAXNNODES)
717 0 : elog(ERROR, "SPGiST inner tuple header field is too small");
718 :
719 : /* OK, form the tuple */
720 954 : tup = (SpGistInnerTuple) palloc0(size);
721 :
722 954 : tup->nNodes = nNodes;
723 954 : tup->prefixSize = prefixSize;
724 954 : tup->size = size;
725 :
726 954 : if (hasPrefix)
727 675 : memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
728 :
729 954 : ptr = (char *) SGITNODEPTR(tup);
730 :
731 5970 : for (i = 0; i < nNodes; i++)
732 : {
733 5016 : SpGistNodeTuple node = nodes[i];
734 :
735 5016 : memcpy(ptr, node, IndexTupleSize(node));
736 5016 : ptr += IndexTupleSize(node);
737 : }
738 :
739 954 : return tup;
740 : }
741 :
742 : /*
743 : * Construct a "dead" tuple to replace a tuple being deleted.
744 : *
745 : * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
746 : * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
747 : * the xid field is filled in automatically.
748 : *
749 : * This is called in critical sections, so we don't use palloc; the tuple
750 : * is built in preallocated storage. It should be copied before another
751 : * call with different parameters can occur.
752 : */
753 : SpGistDeadTuple
754 1311 : spgFormDeadTuple(SpGistState *state, int tupstate,
755 : BlockNumber blkno, OffsetNumber offnum)
756 : {
757 1311 : SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
758 :
759 1311 : tuple->tupstate = tupstate;
760 1311 : tuple->size = SGDTSIZE;
761 1311 : tuple->nextOffset = InvalidOffsetNumber;
762 :
763 1311 : if (tupstate == SPGIST_REDIRECT)
764 : {
765 243 : ItemPointerSet(&tuple->pointer, blkno, offnum);
766 243 : Assert(TransactionIdIsValid(state->myXid));
767 243 : tuple->xid = state->myXid;
768 : }
769 : else
770 : {
771 1068 : ItemPointerSetInvalid(&tuple->pointer);
772 1068 : tuple->xid = InvalidTransactionId;
773 : }
774 :
775 1311 : return tuple;
776 : }
777 :
778 : /*
779 : * Extract the label datums of the nodes within innerTuple
780 : *
781 : * Returns NULL if label datums are NULLs
782 : */
783 : Datum *
784 1516161 : spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
785 : {
786 : Datum *nodeLabels;
787 : int i;
788 : SpGistNodeTuple node;
789 :
790 : /* Either all the labels must be NULL, or none. */
791 1516161 : node = SGITNODEPTR(innerTuple);
792 1516161 : if (IndexTupleHasNulls(node))
793 : {
794 7939522 : SGITITERATE(innerTuple, i, node)
795 : {
796 6456673 : if (!IndexTupleHasNulls(node))
797 0 : elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
798 : }
799 : /* They're all null, so just return NULL */
800 1482849 : return NULL;
801 : }
802 : else
803 : {
804 33312 : nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
805 361620 : SGITITERATE(innerTuple, i, node)
806 : {
807 328308 : if (IndexTupleHasNulls(node))
808 0 : elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
809 328308 : nodeLabels[i] = SGNTDATUM(node, state);
810 : }
811 33312 : return nodeLabels;
812 : }
813 : }
814 :
815 : /*
816 : * Add a new item to the page, replacing a PLACEHOLDER item if possible.
817 : * Return the location it's inserted at, or InvalidOffsetNumber on failure.
818 : *
819 : * If startOffset isn't NULL, we start searching for placeholders at
820 : * *startOffset, and update that to the next place to search. This is just
821 : * an optimization for repeated insertions.
822 : *
823 : * If errorOK is false, we throw error when there's not enough room,
824 : * rather than returning InvalidOffsetNumber.
825 : */
826 : OffsetNumber
827 164389 : SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
828 : OffsetNumber *startOffset, bool errorOK)
829 : {
830 164389 : SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
831 : OffsetNumber i,
832 : maxoff,
833 : offnum;
834 :
835 205029 : if (opaque->nPlaceholder > 0 &&
836 40640 : PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
837 : {
838 : /* Try to replace a placeholder */
839 40640 : maxoff = PageGetMaxOffsetNumber(page);
840 40640 : offnum = InvalidOffsetNumber;
841 :
842 : for (;;)
843 : {
844 40640 : if (startOffset && *startOffset != InvalidOffsetNumber)
845 8233 : i = *startOffset;
846 : else
847 32407 : i = FirstOffsetNumber;
848 3668380 : for (; i <= maxoff; i++)
849 : {
850 3668380 : SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
851 : PageGetItemId(page, i));
852 :
853 3668380 : if (it->tupstate == SPGIST_PLACEHOLDER)
854 : {
855 40640 : offnum = i;
856 40640 : break;
857 : }
858 : }
859 :
860 : /* Done if we found a placeholder */
861 40640 : if (offnum != InvalidOffsetNumber)
862 40640 : break;
863 :
864 0 : if (startOffset && *startOffset != InvalidOffsetNumber)
865 : {
866 : /* Hint was no good, re-search from beginning */
867 0 : *startOffset = InvalidOffsetNumber;
868 0 : continue;
869 : }
870 :
871 : /* Hmm, no placeholder found? */
872 0 : opaque->nPlaceholder = 0;
873 0 : break;
874 0 : }
875 :
876 40640 : if (offnum != InvalidOffsetNumber)
877 : {
878 : /* Replace the placeholder tuple */
879 40640 : PageIndexTupleDelete(page, offnum);
880 :
881 40640 : offnum = PageAddItem(page, item, size, offnum, false, false);
882 :
883 : /*
884 : * We should not have failed given the size check at the top of
885 : * the function, but test anyway. If we did fail, we must PANIC
886 : * because we've already deleted the placeholder tuple, and
887 : * there's no other way to keep the damage from getting to disk.
888 : */
889 40640 : if (offnum != InvalidOffsetNumber)
890 : {
891 40640 : Assert(opaque->nPlaceholder > 0);
892 40640 : opaque->nPlaceholder--;
893 40640 : if (startOffset)
894 8425 : *startOffset = offnum + 1;
895 : }
896 : else
897 0 : elog(PANIC, "failed to add item of size %u to SPGiST index page",
898 : (int) size);
899 :
900 40640 : return offnum;
901 : }
902 : }
903 :
904 : /* No luck in replacing a placeholder, so just add it to the page */
905 123749 : offnum = PageAddItem(page, item, size,
906 : InvalidOffsetNumber, false, false);
907 :
908 123749 : if (offnum == InvalidOffsetNumber && !errorOK)
909 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
910 : (int) size);
911 :
912 123749 : return offnum;
913 : }
|