Line data Source code
1 : /*
2 : * brin.c
3 : * Implementation of BRIN indexes for Postgres
4 : *
5 : * See src/backend/access/brin/README for details.
6 : *
7 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/brin/brin.c
12 : *
13 : * TODO
14 : * * ScalarArrayOpExpr (amsearcharray -> SK_SEARCHARRAY)
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/brin.h"
19 : #include "access/brin_page.h"
20 : #include "access/brin_pageops.h"
21 : #include "access/brin_xlog.h"
22 : #include "access/reloptions.h"
23 : #include "access/relscan.h"
24 : #include "access/xloginsert.h"
25 : #include "catalog/index.h"
26 : #include "catalog/pg_am.h"
27 : #include "miscadmin.h"
28 : #include "pgstat.h"
29 : #include "postmaster/autovacuum.h"
30 : #include "storage/bufmgr.h"
31 : #include "storage/freespace.h"
32 : #include "utils/builtins.h"
33 : #include "utils/index_selfuncs.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 :
37 :
38 : /*
39 : * We use a BrinBuildState during initial construction of a BRIN index.
40 : * The running state is kept in a BrinMemTuple.
41 : */
42 : typedef struct BrinBuildState
43 : {
44 : Relation bs_irel;
45 : int bs_numtuples;
46 : Buffer bs_currentInsertBuf;
47 : BlockNumber bs_pagesPerRange;
48 : BlockNumber bs_currRangeStart;
49 : BrinRevmap *bs_rmAccess;
50 : BrinDesc *bs_bdesc;
51 : BrinMemTuple *bs_dtuple;
52 : } BrinBuildState;
53 :
54 : /*
55 : * Struct used as "opaque" during index scans
56 : */
57 : typedef struct BrinOpaque
58 : {
59 : BlockNumber bo_pagesPerRange;
60 : BrinRevmap *bo_rmAccess;
61 : BrinDesc *bo_bdesc;
62 : } BrinOpaque;
63 :
64 : #define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
65 :
66 : static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
67 : BrinRevmap *revmap, BlockNumber pagesPerRange);
68 : static void terminate_brin_buildstate(BrinBuildState *state);
69 : static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
70 : double *numSummarized, double *numExisting);
71 : static void form_and_insert_tuple(BrinBuildState *state);
72 : static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
73 : BrinTuple *b);
74 : static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
75 :
76 :
77 : /*
78 : * BRIN handler function: return IndexAmRoutine with access method parameters
79 : * and callbacks.
80 : */
81 : Datum
82 70 : brinhandler(PG_FUNCTION_ARGS)
83 : {
84 70 : IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
85 :
86 70 : amroutine->amstrategies = 0;
87 70 : amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
88 70 : amroutine->amcanorder = false;
89 70 : amroutine->amcanorderbyop = false;
90 70 : amroutine->amcanbackward = false;
91 70 : amroutine->amcanunique = false;
92 70 : amroutine->amcanmulticol = true;
93 70 : amroutine->amoptionalkey = true;
94 70 : amroutine->amsearcharray = false;
95 70 : amroutine->amsearchnulls = true;
96 70 : amroutine->amstorage = true;
97 70 : amroutine->amclusterable = false;
98 70 : amroutine->ampredlocks = false;
99 70 : amroutine->amcanparallel = false;
100 70 : amroutine->amkeytype = InvalidOid;
101 :
102 70 : amroutine->ambuild = brinbuild;
103 70 : amroutine->ambuildempty = brinbuildempty;
104 70 : amroutine->aminsert = brininsert;
105 70 : amroutine->ambulkdelete = brinbulkdelete;
106 70 : amroutine->amvacuumcleanup = brinvacuumcleanup;
107 70 : amroutine->amcanreturn = NULL;
108 70 : amroutine->amcostestimate = brincostestimate;
109 70 : amroutine->amoptions = brinoptions;
110 70 : amroutine->amproperty = NULL;
111 70 : amroutine->amvalidate = brinvalidate;
112 70 : amroutine->ambeginscan = brinbeginscan;
113 70 : amroutine->amrescan = brinrescan;
114 70 : amroutine->amgettuple = NULL;
115 70 : amroutine->amgetbitmap = bringetbitmap;
116 70 : amroutine->amendscan = brinendscan;
117 70 : amroutine->ammarkpos = NULL;
118 70 : amroutine->amrestrpos = NULL;
119 70 : amroutine->amestimateparallelscan = NULL;
120 70 : amroutine->aminitparallelscan = NULL;
121 70 : amroutine->amparallelrescan = NULL;
122 :
123 70 : PG_RETURN_POINTER(amroutine);
124 : }
125 :
126 : /*
127 : * A tuple in the heap is being inserted. To keep a brin index up to date,
128 : * we need to obtain the relevant index tuple and compare its stored values
129 : * with those of the new tuple. If the tuple values are not consistent with
130 : * the summary tuple, we need to update the index tuple.
131 : *
132 : * If autosummarization is enabled, check if we need to summarize the previous
133 : * page range.
134 : *
135 : * If the range is not currently summarized (i.e. the revmap returns NULL for
136 : * it), there's nothing to do for this tuple.
137 : */
138 : bool
139 264 : brininsert(Relation idxRel, Datum *values, bool *nulls,
140 : ItemPointer heaptid, Relation heapRel,
141 : IndexUniqueCheck checkUnique,
142 : IndexInfo *indexInfo)
143 : {
144 : BlockNumber pagesPerRange;
145 : BlockNumber origHeapBlk;
146 : BlockNumber heapBlk;
147 264 : BrinDesc *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
148 : BrinRevmap *revmap;
149 264 : Buffer buf = InvalidBuffer;
150 264 : MemoryContext tupcxt = NULL;
151 264 : MemoryContext oldcxt = CurrentMemoryContext;
152 264 : bool autosummarize = BrinGetAutoSummarize(idxRel);
153 :
154 264 : revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
155 :
156 : /*
157 : * origHeapBlk is the block number where the insertion occurred. heapBlk
158 : * is the first block in the corresponding page range.
159 : */
160 264 : origHeapBlk = ItemPointerGetBlockNumber(heaptid);
161 264 : heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
162 :
163 : for (;;)
164 : {
165 264 : bool need_insert = false;
166 : OffsetNumber off;
167 : BrinTuple *brtup;
168 : BrinMemTuple *dtup;
169 : int keyno;
170 :
171 264 : CHECK_FOR_INTERRUPTS();
172 :
173 : /*
174 : * If auto-summarization is enabled and we just inserted the first
175 : * tuple into the first block of a new non-first page range, request a
176 : * summarization run of the previous range.
177 : */
178 264 : if (autosummarize &&
179 0 : heapBlk > 0 &&
180 0 : heapBlk == origHeapBlk &&
181 0 : ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
182 : {
183 0 : BlockNumber lastPageRange = heapBlk - 1;
184 : BrinTuple *lastPageTuple;
185 :
186 0 : lastPageTuple =
187 : brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
188 : NULL, BUFFER_LOCK_SHARE, NULL);
189 0 : if (!lastPageTuple)
190 0 : AutoVacuumRequestWork(AVW_BRINSummarizeRange,
191 : RelationGetRelid(idxRel),
192 : lastPageRange);
193 : else
194 0 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
195 : }
196 :
197 264 : brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
198 : NULL, BUFFER_LOCK_SHARE, NULL);
199 :
200 : /* if range is unsummarized, there's nothing to do */
201 264 : if (!brtup)
202 270 : break;
203 :
204 : /* First time through in this statement? */
205 258 : if (bdesc == NULL)
206 : {
207 50 : MemoryContextSwitchTo(indexInfo->ii_Context);
208 50 : bdesc = brin_build_desc(idxRel);
209 50 : indexInfo->ii_AmCache = (void *) bdesc;
210 50 : MemoryContextSwitchTo(oldcxt);
211 : }
212 : /* First time through in this brininsert call? */
213 258 : if (tupcxt == NULL)
214 : {
215 258 : tupcxt = AllocSetContextCreate(CurrentMemoryContext,
216 : "brininsert cxt",
217 : ALLOCSET_DEFAULT_SIZES);
218 258 : MemoryContextSwitchTo(tupcxt);
219 : }
220 :
221 258 : dtup = brin_deform_tuple(bdesc, brtup, NULL);
222 :
223 : /*
224 : * Compare the key values of the new tuple to the stored index values;
225 : * our deformed tuple will get updated if the new tuple doesn't fit
226 : * the original range (note this means we can't break out of the loop
227 : * early). Make a note of whether this happens, so that we know to
228 : * insert the modified tuple later.
229 : */
230 6606 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
231 : {
232 : Datum result;
233 : BrinValues *bval;
234 : FmgrInfo *addValue;
235 :
236 6348 : bval = &dtup->bt_columns[keyno];
237 6348 : addValue = index_getprocinfo(idxRel, keyno + 1,
238 : BRIN_PROCNUM_ADDVALUE);
239 19044 : result = FunctionCall4Coll(addValue,
240 6348 : idxRel->rd_indcollation[keyno],
241 : PointerGetDatum(bdesc),
242 : PointerGetDatum(bval),
243 6348 : values[keyno],
244 6348 : nulls[keyno]);
245 : /* if that returned true, we need to insert the updated tuple */
246 6348 : need_insert |= DatumGetBool(result);
247 : }
248 :
249 258 : if (!need_insert)
250 : {
251 : /*
252 : * The tuple is consistent with the new values, so there's nothing
253 : * to do.
254 : */
255 47 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
256 : }
257 : else
258 : {
259 211 : Page page = BufferGetPage(buf);
260 211 : ItemId lp = PageGetItemId(page, off);
261 : Size origsz;
262 : BrinTuple *origtup;
263 : Size newsz;
264 : BrinTuple *newtup;
265 : bool samepage;
266 :
267 : /*
268 : * Make a copy of the old tuple, so that we can compare it after
269 : * re-acquiring the lock.
270 : */
271 211 : origsz = ItemIdGetLength(lp);
272 211 : origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
273 :
274 : /*
275 : * Before releasing the lock, check if we can attempt a same-page
276 : * update. Another process could insert a tuple concurrently in
277 : * the same page though, so downstream we must be prepared to cope
278 : * if this turns out to not be possible after all.
279 : */
280 211 : newtup = brin_form_tuple(bdesc, heapBlk, dtup, &newsz);
281 211 : samepage = brin_can_do_samepage_update(buf, origsz, newsz);
282 211 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
283 :
284 : /*
285 : * Try to update the tuple. If this doesn't work for whatever
286 : * reason, we need to restart from the top; the revmap might be
287 : * pointing at a different tuple for this block now, so we need to
288 : * recompute to ensure both our new heap tuple and the other
289 : * inserter's are covered by the combined tuple. It might be that
290 : * we don't need to update at all.
291 : */
292 211 : if (!brin_doupdate(idxRel, pagesPerRange, revmap, heapBlk,
293 : buf, off, origtup, origsz, newtup, newsz,
294 : samepage))
295 : {
296 : /* no luck; start over */
297 0 : MemoryContextResetAndDeleteChildren(tupcxt);
298 0 : continue;
299 : }
300 : }
301 :
302 : /* success! */
303 258 : break;
304 0 : }
305 :
306 264 : brinRevmapTerminate(revmap);
307 264 : if (BufferIsValid(buf))
308 258 : ReleaseBuffer(buf);
309 264 : MemoryContextSwitchTo(oldcxt);
310 264 : if (tupcxt != NULL)
311 258 : MemoryContextDelete(tupcxt);
312 :
313 264 : return false;
314 : }
315 :
316 : /*
317 : * Initialize state for a BRIN index scan.
318 : *
319 : * We read the metapage here to determine the pages-per-range number that this
320 : * index was built with. Note that since this cannot be changed while we're
321 : * holding lock on index, it's not necessary to recompute it during brinrescan.
322 : */
323 : IndexScanDesc
324 248 : brinbeginscan(Relation r, int nkeys, int norderbys)
325 : {
326 : IndexScanDesc scan;
327 : BrinOpaque *opaque;
328 :
329 248 : scan = RelationGetIndexScan(r, nkeys, norderbys);
330 :
331 248 : opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
332 248 : opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
333 : scan->xs_snapshot);
334 248 : opaque->bo_bdesc = brin_build_desc(r);
335 248 : scan->opaque = opaque;
336 :
337 248 : return scan;
338 : }
339 :
340 : /*
341 : * Execute the index scan.
342 : *
343 : * This works by reading index TIDs from the revmap, and obtaining the index
344 : * tuples pointed to by them; the summary values in the index tuples are
345 : * compared to the scan keys. We return into the TID bitmap all the pages in
346 : * ranges corresponding to index tuples that match the scan keys.
347 : *
348 : * If a TID from the revmap is read as InvalidTID, we know that range is
349 : * unsummarized. Pages in those ranges need to be returned regardless of scan
350 : * keys.
351 : */
352 : int64
353 248 : bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
354 : {
355 248 : Relation idxRel = scan->indexRelation;
356 248 : Buffer buf = InvalidBuffer;
357 : BrinDesc *bdesc;
358 : Oid heapOid;
359 : Relation heapRel;
360 : BrinOpaque *opaque;
361 : BlockNumber nblocks;
362 : BlockNumber heapBlk;
363 248 : int totalpages = 0;
364 : FmgrInfo *consistentFn;
365 : MemoryContext oldcxt;
366 : MemoryContext perRangeCxt;
367 : BrinMemTuple *dtup;
368 248 : BrinTuple *btup = NULL;
369 248 : Size btupsz = 0;
370 :
371 248 : opaque = (BrinOpaque *) scan->opaque;
372 248 : bdesc = opaque->bo_bdesc;
373 248 : pgstat_count_index_scan(idxRel);
374 :
375 : /*
376 : * We need to know the size of the table so that we know how long to
377 : * iterate on the revmap.
378 : */
379 248 : heapOid = IndexGetRelation(RelationGetRelid(idxRel), false);
380 248 : heapRel = heap_open(heapOid, AccessShareLock);
381 248 : nblocks = RelationGetNumberOfBlocks(heapRel);
382 248 : heap_close(heapRel, AccessShareLock);
383 :
384 : /*
385 : * Make room for the consistent support procedures of indexed columns. We
386 : * don't look them up here; we do that lazily the first time we see a scan
387 : * key reference each of them. We rely on zeroing fn_oid to InvalidOid.
388 : */
389 248 : consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
390 :
391 : /* allocate an initial in-memory tuple, out of the per-range memcxt */
392 248 : dtup = brin_new_memtuple(bdesc);
393 :
394 : /*
395 : * Setup and use a per-range memory context, which is reset every time we
396 : * loop below. This avoids having to free the tuples within the loop.
397 : */
398 248 : perRangeCxt = AllocSetContextCreate(CurrentMemoryContext,
399 : "bringetbitmap cxt",
400 : ALLOCSET_DEFAULT_SIZES);
401 248 : oldcxt = MemoryContextSwitchTo(perRangeCxt);
402 :
403 : /*
404 : * Now scan the revmap. We start by querying for heap page 0,
405 : * incrementing by the number of pages per range; this gives us a full
406 : * view of the table.
407 : */
408 25048 : for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
409 : {
410 : bool addrange;
411 24800 : bool gottuple = false;
412 : BrinTuple *tup;
413 : OffsetNumber off;
414 : Size size;
415 :
416 24800 : CHECK_FOR_INTERRUPTS();
417 :
418 24800 : MemoryContextResetAndDeleteChildren(perRangeCxt);
419 :
420 24800 : tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
421 : &off, &size, BUFFER_LOCK_SHARE,
422 : scan->xs_snapshot);
423 24800 : if (tup)
424 : {
425 24800 : gottuple = true;
426 24800 : btup = brin_copy_tuple(tup, size, btup, &btupsz);
427 24800 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
428 : }
429 :
430 : /*
431 : * For page ranges with no indexed tuple, we must return the whole
432 : * range; otherwise, compare it to the scan keys.
433 : */
434 24800 : if (!gottuple)
435 : {
436 0 : addrange = true;
437 : }
438 : else
439 : {
440 24800 : dtup = brin_deform_tuple(bdesc, btup, dtup);
441 24800 : if (dtup->bt_placeholder)
442 : {
443 : /*
444 : * Placeholder tuples are always returned, regardless of the
445 : * values stored in them.
446 : */
447 0 : addrange = true;
448 : }
449 : else
450 : {
451 : int keyno;
452 :
453 : /*
454 : * Compare scan keys with summary values stored for the range.
455 : * If scan keys are matched, the page range must be added to
456 : * the bitmap. We initially assume the range needs to be
457 : * added; in particular this serves the case where there are
458 : * no keys.
459 : */
460 24800 : addrange = true;
461 43249 : for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
462 : {
463 24800 : ScanKey key = &scan->keyData[keyno];
464 24800 : AttrNumber keyattno = key->sk_attno;
465 24800 : BrinValues *bval = &dtup->bt_columns[keyattno - 1];
466 : Datum add;
467 :
468 : /*
469 : * The collation of the scan key must match the collation
470 : * used in the index column (but only if the search is not
471 : * IS NULL/ IS NOT NULL). Otherwise we shouldn't be using
472 : * this index ...
473 : */
474 24800 : Assert((key->sk_flags & SK_ISNULL) ||
475 : (key->sk_collation ==
476 : TupleDescAttr(bdesc->bd_tupdesc,
477 : keyattno - 1)->attcollation));
478 :
479 : /* First time this column? look up consistent function */
480 24800 : if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
481 : {
482 : FmgrInfo *tmp;
483 :
484 248 : tmp = index_getprocinfo(idxRel, keyattno,
485 : BRIN_PROCNUM_CONSISTENT);
486 248 : fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
487 : CurrentMemoryContext);
488 : }
489 :
490 : /*
491 : * Check whether the scan key is consistent with the page
492 : * range values; if so, have the pages in the range added
493 : * to the output bitmap.
494 : *
495 : * When there are multiple scan keys, failure to meet the
496 : * criteria for a single one of them is enough to discard
497 : * the range as a whole, so break out of the loop as soon
498 : * as a false return value is obtained.
499 : */
500 24800 : add = FunctionCall3Coll(&consistentFn[keyattno - 1],
501 : key->sk_collation,
502 : PointerGetDatum(bdesc),
503 : PointerGetDatum(bval),
504 : PointerGetDatum(key));
505 24800 : addrange = DatumGetBool(add);
506 24800 : if (!addrange)
507 6351 : break;
508 : }
509 : }
510 : }
511 :
512 : /* add the pages in the range to the output bitmap, if needed */
513 24800 : if (addrange)
514 : {
515 : BlockNumber pageno;
516 :
517 55347 : for (pageno = heapBlk;
518 36898 : pageno <= heapBlk + opaque->bo_pagesPerRange - 1;
519 18449 : pageno++)
520 : {
521 18449 : MemoryContextSwitchTo(oldcxt);
522 18449 : tbm_add_page(tbm, pageno);
523 18449 : totalpages++;
524 18449 : MemoryContextSwitchTo(perRangeCxt);
525 : }
526 : }
527 : }
528 :
529 248 : MemoryContextSwitchTo(oldcxt);
530 248 : MemoryContextDelete(perRangeCxt);
531 :
532 248 : if (buf != InvalidBuffer)
533 248 : ReleaseBuffer(buf);
534 :
535 : /*
536 : * XXX We have an approximation of the number of *pages* that our scan
537 : * returns, but we don't have a precise idea of the number of heap tuples
538 : * involved.
539 : */
540 248 : return totalpages * 10;
541 : }
542 :
543 : /*
544 : * Re-initialize state for a BRIN index scan
545 : */
546 : void
547 248 : brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
548 : ScanKey orderbys, int norderbys)
549 : {
550 : /*
551 : * Other index AMs preprocess the scan keys at this point, or sometime
552 : * early during the scan; this lets them optimize by removing redundant
553 : * keys, or doing early returns when they are impossible to satisfy; see
554 : * _bt_preprocess_keys for an example. Something like that could be added
555 : * here someday, too.
556 : */
557 :
558 248 : if (scankey && scan->numberOfKeys > 0)
559 248 : memmove(scan->keyData, scankey,
560 248 : scan->numberOfKeys * sizeof(ScanKeyData));
561 248 : }
562 :
563 : /*
564 : * Close down a BRIN index scan
565 : */
566 : void
567 248 : brinendscan(IndexScanDesc scan)
568 : {
569 248 : BrinOpaque *opaque = (BrinOpaque *) scan->opaque;
570 :
571 248 : brinRevmapTerminate(opaque->bo_rmAccess);
572 248 : brin_free_desc(opaque->bo_bdesc);
573 248 : pfree(opaque);
574 248 : }
575 :
576 : /*
577 : * Per-heap-tuple callback for IndexBuildHeapScan.
578 : *
579 : * Note we don't worry about the page range at the end of the table here; it is
580 : * present in the build state struct after we're called the last time, but not
581 : * inserted into the index. Caller must ensure to do so, if appropriate.
582 : */
583 : static void
584 20143 : brinbuildCallback(Relation index,
585 : HeapTuple htup,
586 : Datum *values,
587 : bool *isnull,
588 : bool tupleIsAlive,
589 : void *brstate)
590 : {
591 20143 : BrinBuildState *state = (BrinBuildState *) brstate;
592 : BlockNumber thisblock;
593 : int i;
594 :
595 20143 : thisblock = ItemPointerGetBlockNumber(&htup->t_self);
596 :
597 : /*
598 : * If we're in a block that belongs to a future range, summarize what
599 : * we've got and start afresh. Note the scan might have skipped many
600 : * pages, if they were devoid of live tuples; make sure to insert index
601 : * tuples for those too.
602 : */
603 40429 : while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
604 : {
605 :
606 : BRIN_elog((DEBUG2,
607 : "brinbuildCallback: completed a range: %u--%u",
608 : state->bs_currRangeStart,
609 : state->bs_currRangeStart + state->bs_pagesPerRange));
610 :
611 : /* create the index tuple and insert it */
612 143 : form_and_insert_tuple(state);
613 :
614 : /* set state to correspond to the next range */
615 143 : state->bs_currRangeStart += state->bs_pagesPerRange;
616 :
617 : /* re-initialize state for it */
618 143 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
619 : }
620 :
621 : /* Accumulate the current tuple into the running state */
622 44404 : for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
623 : {
624 : FmgrInfo *addValue;
625 : BrinValues *col;
626 24261 : Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
627 :
628 24261 : col = &state->bs_dtuple->bt_columns[i];
629 24261 : addValue = index_getprocinfo(index, i + 1,
630 : BRIN_PROCNUM_ADDVALUE);
631 :
632 : /*
633 : * Update dtuple state, if and as necessary.
634 : */
635 72783 : FunctionCall4Coll(addValue,
636 : attr->attcollation,
637 24261 : PointerGetDatum(state->bs_bdesc),
638 : PointerGetDatum(col),
639 48522 : values[i], isnull[i]);
640 : }
641 20143 : }
642 :
643 : /*
644 : * brinbuild() -- build a new BRIN index.
645 : */
646 : IndexBuildResult *
647 4 : brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
648 : {
649 : IndexBuildResult *result;
650 : double reltuples;
651 : double idxtuples;
652 : BrinRevmap *revmap;
653 : BrinBuildState *state;
654 : Buffer meta;
655 : BlockNumber pagesPerRange;
656 :
657 : /*
658 : * We expect to be called exactly once for any index relation.
659 : */
660 4 : if (RelationGetNumberOfBlocks(index) != 0)
661 0 : elog(ERROR, "index \"%s\" already contains data",
662 : RelationGetRelationName(index));
663 :
664 : /*
665 : * Critical section not required, because on error the creation of the
666 : * whole relation will be rolled back.
667 : */
668 :
669 4 : meta = ReadBuffer(index, P_NEW);
670 4 : Assert(BufferGetBlockNumber(meta) == BRIN_METAPAGE_BLKNO);
671 4 : LockBuffer(meta, BUFFER_LOCK_EXCLUSIVE);
672 :
673 4 : brin_metapage_init(BufferGetPage(meta), BrinGetPagesPerRange(index),
674 : BRIN_CURRENT_VERSION);
675 4 : MarkBufferDirty(meta);
676 :
677 4 : if (RelationNeedsWAL(index))
678 : {
679 : xl_brin_createidx xlrec;
680 : XLogRecPtr recptr;
681 : Page page;
682 :
683 4 : xlrec.version = BRIN_CURRENT_VERSION;
684 4 : xlrec.pagesPerRange = BrinGetPagesPerRange(index);
685 :
686 4 : XLogBeginInsert();
687 4 : XLogRegisterData((char *) &xlrec, SizeOfBrinCreateIdx);
688 4 : XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT);
689 :
690 4 : recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
691 :
692 4 : page = BufferGetPage(meta);
693 4 : PageSetLSN(page, recptr);
694 : }
695 :
696 4 : UnlockReleaseBuffer(meta);
697 :
698 : /*
699 : * Initialize our state, including the deformed tuple state.
700 : */
701 4 : revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
702 4 : state = initialize_brin_buildstate(index, revmap, pagesPerRange);
703 :
704 : /*
705 : * Now scan the relation. No syncscan allowed here because we want the
706 : * heap blocks in physical order.
707 : */
708 4 : reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
709 : brinbuildCallback, (void *) state);
710 :
711 : /* process the final batch */
712 4 : form_and_insert_tuple(state);
713 :
714 : /* release resources */
715 4 : idxtuples = state->bs_numtuples;
716 4 : brinRevmapTerminate(state->bs_rmAccess);
717 4 : terminate_brin_buildstate(state);
718 :
719 : /*
720 : * Return statistics
721 : */
722 4 : result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
723 :
724 4 : result->heap_tuples = reltuples;
725 4 : result->index_tuples = idxtuples;
726 :
727 4 : return result;
728 : }
729 :
730 : void
731 0 : brinbuildempty(Relation index)
732 : {
733 : Buffer metabuf;
734 :
735 : /* An empty BRIN index has a metapage only. */
736 0 : metabuf =
737 : ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
738 0 : LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
739 :
740 : /* Initialize and xlog metabuffer. */
741 0 : START_CRIT_SECTION();
742 0 : brin_metapage_init(BufferGetPage(metabuf), BrinGetPagesPerRange(index),
743 : BRIN_CURRENT_VERSION);
744 0 : MarkBufferDirty(metabuf);
745 0 : log_newpage_buffer(metabuf, false);
746 0 : END_CRIT_SECTION();
747 :
748 0 : UnlockReleaseBuffer(metabuf);
749 0 : }
750 :
751 : /*
752 : * brinbulkdelete
753 : * Since there are no per-heap-tuple index tuples in BRIN indexes,
754 : * there's not a lot we can do here.
755 : *
756 : * XXX we could mark item tuples as "dirty" (when a minimum or maximum heap
757 : * tuple is deleted), meaning the need to re-run summarization on the affected
758 : * range. Would need to add an extra flag in brintuples for that.
759 : */
760 : IndexBulkDeleteResult *
761 1 : brinbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
762 : IndexBulkDeleteCallback callback, void *callback_state)
763 : {
764 : /* allocate stats if first time through, else re-use existing struct */
765 1 : if (stats == NULL)
766 1 : stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
767 :
768 1 : return stats;
769 : }
770 :
771 : /*
772 : * This routine is in charge of "vacuuming" a BRIN index: we just summarize
773 : * ranges that are currently unsummarized.
774 : */
775 : IndexBulkDeleteResult *
776 4 : brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
777 : {
778 : Relation heapRel;
779 :
780 : /* No-op in ANALYZE ONLY mode */
781 4 : if (info->analyze_only)
782 0 : return stats;
783 :
784 4 : if (!stats)
785 3 : stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
786 4 : stats->num_pages = RelationGetNumberOfBlocks(info->index);
787 : /* rest of stats is initialized by zeroing */
788 :
789 4 : heapRel = heap_open(IndexGetRelation(RelationGetRelid(info->index), false),
790 : AccessShareLock);
791 :
792 4 : brin_vacuum_scan(info->index, info->strategy);
793 :
794 4 : brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES,
795 : &stats->num_index_tuples, &stats->num_index_tuples);
796 :
797 4 : heap_close(heapRel, AccessShareLock);
798 :
799 4 : return stats;
800 : }
801 :
802 : /*
803 : * reloptions processor for BRIN indexes
804 : */
805 : bytea *
806 19 : brinoptions(Datum reloptions, bool validate)
807 : {
808 : relopt_value *options;
809 : BrinOptions *rdopts;
810 : int numoptions;
811 : static const relopt_parse_elt tab[] = {
812 : {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
813 : {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
814 : };
815 :
816 19 : options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
817 : &numoptions);
818 :
819 : /* if none set, we're done */
820 19 : if (numoptions == 0)
821 0 : return NULL;
822 :
823 19 : rdopts = allocateReloptStruct(sizeof(BrinOptions), options, numoptions);
824 :
825 19 : fillRelOptions((void *) rdopts, sizeof(BrinOptions), options, numoptions,
826 : validate, tab, lengthof(tab));
827 :
828 19 : pfree(options);
829 :
830 19 : return (bytea *) rdopts;
831 : }
832 :
833 : /*
834 : * SQL-callable function to scan through an index and summarize all ranges
835 : * that are not currently summarized.
836 : */
837 : Datum
838 3 : brin_summarize_new_values(PG_FUNCTION_ARGS)
839 : {
840 3 : Datum relation = PG_GETARG_DATUM(0);
841 :
842 3 : return DirectFunctionCall2(brin_summarize_range,
843 : relation,
844 : Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
845 : }
846 :
847 : /*
848 : * SQL-callable function to summarize the indicated page range, if not already
849 : * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
850 : * unsummarized ranges are summarized.
851 : */
852 : Datum
853 9 : brin_summarize_range(PG_FUNCTION_ARGS)
854 : {
855 9 : Oid indexoid = PG_GETARG_OID(0);
856 9 : int64 heapBlk64 = PG_GETARG_INT64(1);
857 : BlockNumber heapBlk;
858 : Oid heapoid;
859 : Relation indexRel;
860 : Relation heapRel;
861 9 : double numSummarized = 0;
862 :
863 9 : if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
864 : {
865 2 : char *blk = psprintf(INT64_FORMAT, heapBlk64);
866 :
867 2 : ereport(ERROR,
868 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
869 : errmsg("block number out of range: %s", blk)));
870 : }
871 7 : heapBlk = (BlockNumber) heapBlk64;
872 :
873 : /*
874 : * We must lock table before index to avoid deadlocks. However, if the
875 : * passed indexoid isn't an index then IndexGetRelation() will fail.
876 : * Rather than emitting a not-very-helpful error message, postpone
877 : * complaining, expecting that the is-it-an-index test below will fail.
878 : */
879 7 : heapoid = IndexGetRelation(indexoid, true);
880 7 : if (OidIsValid(heapoid))
881 6 : heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
882 : else
883 1 : heapRel = NULL;
884 :
885 7 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
886 :
887 : /* Must be a BRIN index */
888 12 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
889 6 : indexRel->rd_rel->relam != BRIN_AM_OID)
890 1 : ereport(ERROR,
891 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
892 : errmsg("\"%s\" is not a BRIN index",
893 : RelationGetRelationName(indexRel))));
894 :
895 : /* User must own the index (comparable to privileges needed for VACUUM) */
896 5 : if (!pg_class_ownercheck(indexoid, GetUserId()))
897 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
898 0 : RelationGetRelationName(indexRel));
899 :
900 : /*
901 : * Since we did the IndexGetRelation call above without any lock, it's
902 : * barely possible that a race against an index drop/recreation could have
903 : * netted us the wrong table. Recheck.
904 : */
905 5 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
906 0 : ereport(ERROR,
907 : (errcode(ERRCODE_UNDEFINED_TABLE),
908 : errmsg("could not open parent table of index %s",
909 : RelationGetRelationName(indexRel))));
910 :
911 : /* OK, do it */
912 5 : brinsummarize(indexRel, heapRel, heapBlk, &numSummarized, NULL);
913 :
914 5 : relation_close(indexRel, ShareUpdateExclusiveLock);
915 5 : relation_close(heapRel, ShareUpdateExclusiveLock);
916 :
917 5 : PG_RETURN_INT32((int32) numSummarized);
918 : }
919 :
920 : /*
921 : * SQL-callable interface to mark a range as no longer summarized
922 : */
923 : Datum
924 5 : brin_desummarize_range(PG_FUNCTION_ARGS)
925 : {
926 5 : Oid indexoid = PG_GETARG_OID(0);
927 5 : int64 heapBlk64 = PG_GETARG_INT64(1);
928 : BlockNumber heapBlk;
929 : Oid heapoid;
930 : Relation heapRel;
931 : Relation indexRel;
932 : bool done;
933 :
934 5 : if (heapBlk64 > MaxBlockNumber || heapBlk64 < 0)
935 : {
936 1 : char *blk = psprintf(INT64_FORMAT, heapBlk64);
937 :
938 1 : ereport(ERROR,
939 : (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
940 : errmsg("block number out of range: %s", blk)));
941 : }
942 4 : heapBlk = (BlockNumber) heapBlk64;
943 :
944 : /*
945 : * We must lock table before index to avoid deadlocks. However, if the
946 : * passed indexoid isn't an index then IndexGetRelation() will fail.
947 : * Rather than emitting a not-very-helpful error message, postpone
948 : * complaining, expecting that the is-it-an-index test below will fail.
949 : */
950 4 : heapoid = IndexGetRelation(indexoid, true);
951 4 : if (OidIsValid(heapoid))
952 4 : heapRel = heap_open(heapoid, ShareUpdateExclusiveLock);
953 : else
954 0 : heapRel = NULL;
955 :
956 4 : indexRel = index_open(indexoid, ShareUpdateExclusiveLock);
957 :
958 : /* Must be a BRIN index */
959 8 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
960 4 : indexRel->rd_rel->relam != BRIN_AM_OID)
961 0 : ereport(ERROR,
962 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
963 : errmsg("\"%s\" is not a BRIN index",
964 : RelationGetRelationName(indexRel))));
965 :
966 : /* User must own the index (comparable to privileges needed for VACUUM) */
967 4 : if (!pg_class_ownercheck(indexoid, GetUserId()))
968 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
969 0 : RelationGetRelationName(indexRel));
970 :
971 : /*
972 : * Since we did the IndexGetRelation call above without any lock, it's
973 : * barely possible that a race against an index drop/recreation could have
974 : * netted us the wrong table. Recheck.
975 : */
976 4 : if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false))
977 0 : ereport(ERROR,
978 : (errcode(ERRCODE_UNDEFINED_TABLE),
979 : errmsg("could not open parent table of index %s",
980 : RelationGetRelationName(indexRel))));
981 :
982 : /* the revmap does the hard work */
983 : do
984 : {
985 4 : done = brinRevmapDesummarizeRange(indexRel, heapBlk);
986 : }
987 4 : while (!done);
988 :
989 4 : relation_close(indexRel, ShareUpdateExclusiveLock);
990 4 : relation_close(heapRel, ShareUpdateExclusiveLock);
991 :
992 4 : PG_RETURN_VOID();
993 : }
994 :
995 : /*
996 : * Build a BrinDesc used to create or scan a BRIN index
997 : */
998 : BrinDesc *
999 305 : brin_build_desc(Relation rel)
1000 : {
1001 : BrinOpcInfo **opcinfo;
1002 : BrinDesc *bdesc;
1003 : TupleDesc tupdesc;
1004 305 : int totalstored = 0;
1005 : int keyno;
1006 : long totalsize;
1007 : MemoryContext cxt;
1008 : MemoryContext oldcxt;
1009 :
1010 305 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1011 : "brin desc cxt",
1012 : ALLOCSET_SMALL_SIZES);
1013 305 : oldcxt = MemoryContextSwitchTo(cxt);
1014 305 : tupdesc = RelationGetDescr(rel);
1015 :
1016 : /*
1017 : * Obtain BrinOpcInfo for each indexed column. While at it, accumulate
1018 : * the number of columns stored, since the number is opclass-defined.
1019 : */
1020 305 : opcinfo = (BrinOpcInfo **) palloc(sizeof(BrinOpcInfo *) * tupdesc->natts);
1021 7947 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1022 : {
1023 : FmgrInfo *opcInfoFn;
1024 7642 : Form_pg_attribute attr = TupleDescAttr(tupdesc, keyno);
1025 :
1026 7642 : opcInfoFn = index_getprocinfo(rel, keyno + 1, BRIN_PROCNUM_OPCINFO);
1027 :
1028 15284 : opcinfo[keyno] = (BrinOpcInfo *)
1029 7642 : DatumGetPointer(FunctionCall1(opcInfoFn, attr->atttypid));
1030 7642 : totalstored += opcinfo[keyno]->oi_nstored;
1031 : }
1032 :
1033 : /* Allocate our result struct and fill it in */
1034 305 : totalsize = offsetof(BrinDesc, bd_info) +
1035 305 : sizeof(BrinOpcInfo *) * tupdesc->natts;
1036 :
1037 305 : bdesc = palloc(totalsize);
1038 305 : bdesc->bd_context = cxt;
1039 305 : bdesc->bd_index = rel;
1040 305 : bdesc->bd_tupdesc = tupdesc;
1041 305 : bdesc->bd_disktdesc = NULL; /* generated lazily */
1042 305 : bdesc->bd_totalstored = totalstored;
1043 :
1044 7947 : for (keyno = 0; keyno < tupdesc->natts; keyno++)
1045 7642 : bdesc->bd_info[keyno] = opcinfo[keyno];
1046 305 : pfree(opcinfo);
1047 :
1048 305 : MemoryContextSwitchTo(oldcxt);
1049 :
1050 305 : return bdesc;
1051 : }
1052 :
1053 : void
1054 255 : brin_free_desc(BrinDesc *bdesc)
1055 : {
1056 : /* make sure the tupdesc is still valid */
1057 255 : Assert(bdesc->bd_tupdesc->tdrefcount >= 1);
1058 : /* no need for retail pfree */
1059 255 : MemoryContextDelete(bdesc->bd_context);
1060 255 : }
1061 :
1062 : /*
1063 : * Fetch index's statistical data into *stats
1064 : */
1065 : void
1066 995 : brinGetStats(Relation index, BrinStatsData *stats)
1067 : {
1068 : Buffer metabuffer;
1069 : Page metapage;
1070 : BrinMetaPageData *metadata;
1071 :
1072 995 : metabuffer = ReadBuffer(index, BRIN_METAPAGE_BLKNO);
1073 995 : LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
1074 995 : metapage = BufferGetPage(metabuffer);
1075 995 : metadata = (BrinMetaPageData *) PageGetContents(metapage);
1076 :
1077 995 : stats->pagesPerRange = metadata->pagesPerRange;
1078 995 : stats->revmapNumPages = metadata->lastRevmapPage - 1;
1079 :
1080 995 : UnlockReleaseBuffer(metabuffer);
1081 995 : }
1082 :
1083 : /*
1084 : * Initialize a BrinBuildState appropriate to create tuples on the given index.
1085 : */
1086 : static BrinBuildState *
1087 7 : initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
1088 : BlockNumber pagesPerRange)
1089 : {
1090 : BrinBuildState *state;
1091 :
1092 7 : state = palloc(sizeof(BrinBuildState));
1093 :
1094 7 : state->bs_irel = idxRel;
1095 7 : state->bs_numtuples = 0;
1096 7 : state->bs_currentInsertBuf = InvalidBuffer;
1097 7 : state->bs_pagesPerRange = pagesPerRange;
1098 7 : state->bs_currRangeStart = 0;
1099 7 : state->bs_rmAccess = revmap;
1100 7 : state->bs_bdesc = brin_build_desc(idxRel);
1101 7 : state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
1102 :
1103 7 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1104 :
1105 7 : return state;
1106 : }
1107 :
1108 : /*
1109 : * Release resources associated with a BrinBuildState.
1110 : */
1111 : static void
1112 7 : terminate_brin_buildstate(BrinBuildState *state)
1113 : {
1114 : /* release the last index buffer used */
1115 7 : if (!BufferIsInvalid(state->bs_currentInsertBuf))
1116 : {
1117 : Page page;
1118 :
1119 4 : page = BufferGetPage(state->bs_currentInsertBuf);
1120 4 : RecordPageWithFreeSpace(state->bs_irel,
1121 : BufferGetBlockNumber(state->bs_currentInsertBuf),
1122 : PageGetFreeSpace(page));
1123 4 : ReleaseBuffer(state->bs_currentInsertBuf);
1124 : }
1125 :
1126 7 : brin_free_desc(state->bs_bdesc);
1127 7 : pfree(state->bs_dtuple);
1128 7 : pfree(state);
1129 7 : }
1130 :
1131 : /*
1132 : * Summarize the given page range of the given index.
1133 : *
1134 : * This routine can run in parallel with insertions into the heap. To avoid
1135 : * missing those values from the summary tuple, we first insert a placeholder
1136 : * index tuple into the index, then execute the heap scan; transactions
1137 : * concurrent with the scan update the placeholder tuple. After the scan, we
1138 : * union the placeholder tuple with the one computed by this routine. The
1139 : * update of the index value happens in a loop, so that if somebody updates
1140 : * the placeholder tuple after we read it, we detect the case and try again.
1141 : * This ensures that the concurrently inserted tuples are not lost.
1142 : */
1143 : static void
1144 8 : summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
1145 : BlockNumber heapBlk, BlockNumber heapNumBlks)
1146 : {
1147 : Buffer phbuf;
1148 : BrinTuple *phtup;
1149 : Size phsz;
1150 : OffsetNumber offset;
1151 : BlockNumber scanNumBlks;
1152 :
1153 : /*
1154 : * Insert the placeholder tuple
1155 : */
1156 8 : phbuf = InvalidBuffer;
1157 8 : phtup = brin_form_placeholder_tuple(state->bs_bdesc, heapBlk, &phsz);
1158 8 : offset = brin_doinsert(state->bs_irel, state->bs_pagesPerRange,
1159 : state->bs_rmAccess, &phbuf,
1160 : heapBlk, phtup, phsz);
1161 :
1162 : /*
1163 : * Execute the partial heap scan covering the heap blocks in the specified
1164 : * page range, summarizing the heap tuples in it. This scan stops just
1165 : * short of brinbuildCallback creating the new index entry.
1166 : *
1167 : * Note that it is critical we use the "any visible" mode of
1168 : * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
1169 : * by transactions that are still in progress, among other corner cases.
1170 : */
1171 8 : state->bs_currRangeStart = heapBlk;
1172 16 : scanNumBlks = heapBlk + state->bs_pagesPerRange <= heapNumBlks ?
1173 8 : state->bs_pagesPerRange : heapNumBlks - heapBlk;
1174 8 : IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
1175 : heapBlk, scanNumBlks,
1176 : brinbuildCallback, (void *) state);
1177 :
1178 : /*
1179 : * Now we update the values obtained by the scan with the placeholder
1180 : * tuple. We do this in a loop which only terminates if we're able to
1181 : * update the placeholder tuple successfully; if we are not, this means
1182 : * somebody else modified the placeholder tuple after we read it.
1183 : */
1184 : for (;;)
1185 : {
1186 : BrinTuple *newtup;
1187 : Size newsize;
1188 : bool didupdate;
1189 : bool samepage;
1190 :
1191 8 : CHECK_FOR_INTERRUPTS();
1192 :
1193 : /*
1194 : * Update the summary tuple and try to update.
1195 : */
1196 8 : newtup = brin_form_tuple(state->bs_bdesc,
1197 : heapBlk, state->bs_dtuple, &newsize);
1198 8 : samepage = brin_can_do_samepage_update(phbuf, phsz, newsize);
1199 8 : didupdate =
1200 8 : brin_doupdate(state->bs_irel, state->bs_pagesPerRange,
1201 : state->bs_rmAccess, heapBlk, phbuf, offset,
1202 : phtup, phsz, newtup, newsize, samepage);
1203 8 : brin_free_tuple(phtup);
1204 8 : brin_free_tuple(newtup);
1205 :
1206 : /* If the update succeeded, we're done. */
1207 8 : if (didupdate)
1208 8 : break;
1209 :
1210 : /*
1211 : * If the update didn't work, it might be because somebody updated the
1212 : * placeholder tuple concurrently. Extract the new version, union it
1213 : * with the values we have from the scan, and start over. (There are
1214 : * other reasons for the update to fail, but it's simple to treat them
1215 : * the same.)
1216 : */
1217 0 : phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
1218 : &offset, &phsz, BUFFER_LOCK_SHARE,
1219 : NULL);
1220 : /* the placeholder tuple must exist */
1221 0 : if (phtup == NULL)
1222 0 : elog(ERROR, "missing placeholder tuple");
1223 0 : phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
1224 0 : LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
1225 :
1226 : /* merge it into the tuple from the heap scan */
1227 0 : union_tuples(state->bs_bdesc, state->bs_dtuple, phtup);
1228 0 : }
1229 :
1230 8 : ReleaseBuffer(phbuf);
1231 8 : }
1232 :
1233 : /*
1234 : * Summarize page ranges that are not already summarized. If pageRange is
1235 : * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
1236 : * page range containing the given heap page number is scanned.
1237 : *
1238 : * For each new index tuple inserted, *numSummarized (if not NULL) is
1239 : * incremented; for each existing tuple, *numExisting (if not NULL) is
1240 : * incremented.
1241 : */
1242 : static void
1243 9 : brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
1244 : double *numSummarized, double *numExisting)
1245 : {
1246 : BrinRevmap *revmap;
1247 9 : BrinBuildState *state = NULL;
1248 9 : IndexInfo *indexInfo = NULL;
1249 : BlockNumber heapNumBlocks;
1250 : BlockNumber heapBlk;
1251 : BlockNumber pagesPerRange;
1252 : Buffer buf;
1253 : BlockNumber startBlk;
1254 : BlockNumber endBlk;
1255 :
1256 : /* determine range of pages to process; nothing to do for an empty table */
1257 9 : heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
1258 9 : if (heapNumBlocks == 0)
1259 0 : return;
1260 :
1261 9 : revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
1262 :
1263 9 : if (pageRange == BRIN_ALL_BLOCKRANGES)
1264 : {
1265 6 : startBlk = 0;
1266 6 : endBlk = heapNumBlocks;
1267 : }
1268 : else
1269 : {
1270 3 : startBlk = (pageRange / pagesPerRange) * pagesPerRange;
1271 : /* Nothing to do if start point is beyond end of table */
1272 3 : if (startBlk > heapNumBlocks)
1273 : {
1274 0 : brinRevmapTerminate(revmap);
1275 0 : return;
1276 : }
1277 3 : endBlk = startBlk + pagesPerRange;
1278 3 : if (endBlk > heapNumBlocks)
1279 1 : endBlk = heapNumBlocks;
1280 : }
1281 :
1282 : /*
1283 : * Scan the revmap to find unsummarized items.
1284 : */
1285 9 : buf = InvalidBuffer;
1286 375 : for (heapBlk = startBlk; heapBlk < endBlk; heapBlk += pagesPerRange)
1287 : {
1288 : BrinTuple *tup;
1289 : OffsetNumber off;
1290 :
1291 366 : CHECK_FOR_INTERRUPTS();
1292 :
1293 366 : tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
1294 : BUFFER_LOCK_SHARE, NULL);
1295 366 : if (tup == NULL)
1296 : {
1297 : /* no revmap entry for this heap range. Summarize it. */
1298 8 : if (state == NULL)
1299 : {
1300 : /* first time through */
1301 3 : Assert(!indexInfo);
1302 3 : state = initialize_brin_buildstate(index, revmap,
1303 : pagesPerRange);
1304 3 : indexInfo = BuildIndexInfo(index);
1305 : }
1306 8 : summarize_range(indexInfo, state, heapRel, heapBlk, heapNumBlocks);
1307 :
1308 : /* and re-initialize state for the next range */
1309 8 : brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
1310 :
1311 8 : if (numSummarized)
1312 8 : *numSummarized += 1.0;
1313 : }
1314 : else
1315 : {
1316 358 : if (numExisting)
1317 249 : *numExisting += 1.0;
1318 358 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1319 : }
1320 : }
1321 :
1322 9 : if (BufferIsValid(buf))
1323 8 : ReleaseBuffer(buf);
1324 :
1325 : /* free resources */
1326 9 : brinRevmapTerminate(revmap);
1327 9 : if (state)
1328 : {
1329 3 : terminate_brin_buildstate(state);
1330 3 : pfree(indexInfo);
1331 : }
1332 : }
1333 :
1334 : /*
1335 : * Given a deformed tuple in the build state, convert it into the on-disk
1336 : * format and insert it into the index, making the revmap point to it.
1337 : */
1338 : static void
1339 147 : form_and_insert_tuple(BrinBuildState *state)
1340 : {
1341 : BrinTuple *tup;
1342 : Size size;
1343 :
1344 147 : tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
1345 : state->bs_dtuple, &size);
1346 147 : brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
1347 : &state->bs_currentInsertBuf, state->bs_currRangeStart,
1348 : tup, size);
1349 147 : state->bs_numtuples++;
1350 :
1351 147 : pfree(tup);
1352 147 : }
1353 :
1354 : /*
1355 : * Given two deformed tuples, adjust the first one so that it's consistent
1356 : * with the summary values in both.
1357 : */
1358 : static void
1359 0 : union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
1360 : {
1361 : int keyno;
1362 : BrinMemTuple *db;
1363 : MemoryContext cxt;
1364 : MemoryContext oldcxt;
1365 :
1366 : /* Use our own memory context to avoid retail pfree */
1367 0 : cxt = AllocSetContextCreate(CurrentMemoryContext,
1368 : "brin union",
1369 : ALLOCSET_DEFAULT_SIZES);
1370 0 : oldcxt = MemoryContextSwitchTo(cxt);
1371 0 : db = brin_deform_tuple(bdesc, b, NULL);
1372 0 : MemoryContextSwitchTo(oldcxt);
1373 :
1374 0 : for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
1375 : {
1376 : FmgrInfo *unionFn;
1377 0 : BrinValues *col_a = &a->bt_columns[keyno];
1378 0 : BrinValues *col_b = &db->bt_columns[keyno];
1379 :
1380 0 : unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
1381 : BRIN_PROCNUM_UNION);
1382 0 : FunctionCall3Coll(unionFn,
1383 0 : bdesc->bd_index->rd_indcollation[keyno],
1384 : PointerGetDatum(bdesc),
1385 : PointerGetDatum(col_a),
1386 : PointerGetDatum(col_b));
1387 : }
1388 :
1389 0 : MemoryContextDelete(cxt);
1390 0 : }
1391 :
1392 : /*
1393 : * brin_vacuum_scan
1394 : * Do a complete scan of the index during VACUUM.
1395 : *
1396 : * This routine scans the complete index looking for uncatalogued index pages,
1397 : * i.e. those that might have been lost due to a crash after index extension
1398 : * and such.
1399 : */
1400 : static void
1401 4 : brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
1402 : {
1403 4 : bool vacuum_fsm = false;
1404 : BlockNumber blkno;
1405 :
1406 : /*
1407 : * Scan the index in physical order, and clean up any possible mess in
1408 : * each page.
1409 : */
1410 35 : for (blkno = 0; blkno < RelationGetNumberOfBlocks(idxrel); blkno++)
1411 : {
1412 : Buffer buf;
1413 :
1414 31 : CHECK_FOR_INTERRUPTS();
1415 :
1416 31 : buf = ReadBufferExtended(idxrel, MAIN_FORKNUM, blkno,
1417 : RBM_NORMAL, strategy);
1418 :
1419 31 : vacuum_fsm |= brin_page_cleanup(idxrel, buf);
1420 :
1421 31 : ReleaseBuffer(buf);
1422 : }
1423 :
1424 : /*
1425 : * If we made any change to the FSM, make sure the new info is visible all
1426 : * the way to the top.
1427 : */
1428 4 : if (vacuum_fsm)
1429 4 : FreeSpaceMapVacuum(idxrel);
1430 4 : }
|