Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * ginfast.c
4 : * Fast insert routines for the Postgres inverted index access method.
5 : * Pending entries are stored in linear list of pages. Later on
6 : * (typically during VACUUM), ginInsertCleanup() will be invoked to
7 : * transfer pending entries into the regular index structure. This
8 : * wins because bulk insertion is much more efficient than retail.
9 : *
10 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11 : * Portions Copyright (c) 1994, Regents of the University of California
12 : *
13 : * IDENTIFICATION
14 : * src/backend/access/gin/ginfast.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "access/gin_private.h"
22 : #include "access/ginxlog.h"
23 : #include "access/xloginsert.h"
24 : #include "access/xlog.h"
25 : #include "commands/vacuum.h"
26 : #include "catalog/pg_am.h"
27 : #include "miscadmin.h"
28 : #include "utils/memutils.h"
29 : #include "utils/rel.h"
30 : #include "utils/acl.h"
31 : #include "postmaster/autovacuum.h"
32 : #include "storage/indexfsm.h"
33 : #include "storage/lmgr.h"
34 : #include "utils/builtins.h"
35 :
36 : /* GUC parameter */
37 : int gin_pending_list_limit = 0;
38 :
39 : #define GIN_PAGE_FREESIZE \
40 : ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
41 :
42 : typedef struct KeyArray /* parallel resizable arrays of keys and their null categories */
43 : {
44 : Datum *keys; /* expansible array */
45 : GinNullCategory *categories; /* another expansible array */
46 : int32 nvalues; /* current number of valid entries */
47 : int32 maxvalues; /* allocated size of arrays */
48 : } KeyArray;
49 :
50 :
51 : /*
52 : * Build a pending-list page from the given array of tuples, and write it out.
53 : *
54 : * Returns amount of free space left on the page.
55 : */
56 : static int32
57 132 : writeListPage(Relation index, Buffer buffer,
58 : IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
59 : {
60 132 : Page page = BufferGetPage(buffer);
61 : int32 i,
62 : freesize,
63 132 : size = 0;
64 : OffsetNumber l,
65 : off;
66 : char *workspace;
67 : char *ptr;
68 :
69 : /* workspace could be a local array; we use palloc for alignment */
70 132 : workspace = palloc(BLCKSZ);
71 :
72 132 : START_CRIT_SECTION(); /* page init, fill, and WAL must be all-or-nothing */
73 :
74 132 : GinInitBuffer(buffer, GIN_LIST);
75 :
76 132 : off = FirstOffsetNumber;
77 132 : ptr = workspace;
78 :
79 524 : for (i = 0; i < ntuples; i++)
80 : {
81 392 : int this_size = IndexTupleSize(tuples[i]);
82 :
83 392 : memcpy(ptr, tuples[i], this_size); /* accumulate tuple images for the WAL record's data */
84 392 : ptr += this_size;
85 392 : size += this_size;
86 :
87 392 : l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
88 :
89 392 : if (l == InvalidOffsetNumber)
90 0 : elog(ERROR, "failed to add item to index page in \"%s\"",
91 : RelationGetRelationName(index));
92 :
93 392 : off++;
94 : }
95 :
96 132 : Assert(size <= BLCKSZ); /* else we overran workspace */
97 :
98 132 : GinPageGetOpaque(page)->rightlink = rightlink;
99 :
100 : /*
101 : * tail page may contain only whole row(s) or final part of row placed on
102 : * previous pages (a "row" here meaning all the index tuples generated for
103 : * one heap tuple)
104 : */
105 132 : if (rightlink == InvalidBlockNumber)
106 : {
107 132 : GinPageSetFullRow(page);
108 132 : GinPageGetOpaque(page)->maxoff = 1; /* on pending pages maxoff counts heap rows, not line pointers (see shiftList) */
109 : }
110 : else
111 : {
112 0 : GinPageGetOpaque(page)->maxoff = 0;
113 : }
114 :
115 132 : MarkBufferDirty(buffer);
116 :
117 132 : if (RelationNeedsWAL(index))
118 : {
119 : ginxlogInsertListPage data;
120 : XLogRecPtr recptr;
121 :
122 131 : data.rightlink = rightlink;
123 131 : data.ntuples = ntuples;
124 :
125 131 : XLogBeginInsert();
126 131 : XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
127 :
128 131 : XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
129 131 : XLogRegisterBufData(0, workspace, size); /* redo rebuilds the page from these tuple images */
130 :
131 131 : recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
132 131 : PageSetLSN(page, recptr);
133 : }
134 :
135 : /* get free space before releasing buffer */
136 132 : freesize = PageGetExactFreeSpace(page);
137 :
138 132 : UnlockReleaseBuffer(buffer); /* released before END_CRIT_SECTION; page content is already final */
139 :
140 132 : END_CRIT_SECTION();
141 :
142 132 : pfree(workspace);
143 :
144 132 : return freesize;
145 : }
146 :
147 : static void /* Build a chain of pending-list pages from tuples; fills *res */
148 132 : makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
149 : GinMetaPageData *res)
150 : {
151 132 : Buffer curBuffer = InvalidBuffer;
152 132 : Buffer prevBuffer = InvalidBuffer;
153 : int i,
154 132 : size = 0,
155 : tupsize;
156 132 : int startTuple = 0;
157 :
158 132 : Assert(ntuples > 0);
159 :
160 : /*
161 : * Split tuples into pages
162 : */
163 524 : for (i = 0; i < ntuples; i++)
164 : {
165 392 : if (curBuffer == InvalidBuffer)
166 : {
167 132 : curBuffer = GinNewBuffer(index); /* allocate next page first so the previous one can point to it */
168 :
169 132 : if (prevBuffer != InvalidBuffer)
170 : {
171 0 : res->nPendingPages++;
172 0 : writeListPage(index, prevBuffer,
173 0 : tuples + startTuple,
174 : i - startTuple,
175 : BufferGetBlockNumber(curBuffer));
176 : }
177 : else
178 : {
179 132 : res->head = BufferGetBlockNumber(curBuffer);
180 : }
181 :
182 132 : prevBuffer = curBuffer;
183 132 : startTuple = i;
184 132 : size = 0;
185 : }
186 :
187 392 : tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData); /* include line-pointer overhead */
188 :
189 392 : if (size + tupsize > GinListPageSize)
190 : {
191 : /* won't fit, force a new page and reprocess */
192 0 : i--;
193 0 : curBuffer = InvalidBuffer;
194 : }
195 : else
196 : {
197 392 : size += tupsize;
198 : }
199 : }
200 :
201 : /*
202 : * Write last page
203 : */
204 132 : res->tail = BufferGetBlockNumber(curBuffer);
205 264 : res->tailFreeSize = writeListPage(index, curBuffer,
206 132 : tuples + startTuple,
207 : ntuples - startTuple,
208 : InvalidBlockNumber);
209 132 : res->nPendingPages++;
210 : /* that was only one heap tuple */
211 132 : res->nPendingHeapTuples = 1;
212 : }
213 :
214 : /*
215 : * Write the index tuples contained in *collector into the index's
216 : * pending list.
217 : *
218 : * Function guarantees that all these tuples will be inserted consecutively,
219 : * preserving order
220 : */
221 : void
222 22006 : ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
223 : {
224 22006 : Relation index = ginstate->index;
225 : Buffer metabuffer;
226 : Page metapage;
227 22006 : GinMetaPageData *metadata = NULL;
228 22006 : Buffer buffer = InvalidBuffer;
229 22006 : Page page = NULL;
230 : ginxlogUpdateMeta data;
231 22006 : bool separateList = false;
232 22006 : bool needCleanup = false;
233 : int cleanupSize;
234 : bool needWal;
235 :
236 22006 : if (collector->ntuples == 0)
237 22006 : return;
238 :
239 22006 : needWal = RelationNeedsWAL(index);
240 :
241 22006 : data.node = index->rd_node;
242 22006 : data.ntuples = 0;
243 22006 : data.newRightlink = data.prevTail = InvalidBlockNumber;
244 :
245 22006 : metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
246 22006 : metapage = BufferGetPage(metabuffer);
247 :
248 22006 : if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
249 : {
250 : /*
251 : * Total size is greater than one page => make sublist
252 : */
253 0 : separateList = true;
254 : }
255 : else
256 : {
257 22006 : LockBuffer(metabuffer, GIN_EXCLUSIVE);
258 22006 : metadata = GinPageGetMeta(metapage);
259 :
260 44008 : if (metadata->head == InvalidBlockNumber ||
261 22002 : collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
262 : {
263 : /*
264 : * Pending list is empty or total size is greater than freespace
265 : * on tail page => make sublist
266 : *
267 : * We unlock metabuffer to keep high concurrency
268 : */
269 132 : separateList = true;
270 132 : LockBuffer(metabuffer, GIN_UNLOCK);
271 : }
272 : }
273 :
274 22006 : if (separateList)
275 : {
276 : /*
277 : * We should make sublist separately and append it to the tail
278 : */
279 : GinMetaPageData sublist;
280 :
281 132 : memset(&sublist, 0, sizeof(GinMetaPageData));
282 132 : makeSublist(index, collector->tuples, collector->ntuples, &sublist);
283 :
284 132 : if (needWal)
285 131 : XLogBeginInsert();
286 :
287 : /*
288 : * metapage was unlocked, see above
289 : */
290 132 : LockBuffer(metabuffer, GIN_EXCLUSIVE);
291 132 : metadata = GinPageGetMeta(metapage);
292 :
293 132 : if (metadata->head == InvalidBlockNumber)
294 : {
295 : /*
296 : * Main list is empty, so just insert sublist as main list
297 : */
298 4 : START_CRIT_SECTION();
299 :
300 4 : metadata->head = sublist.head;
301 4 : metadata->tail = sublist.tail;
302 4 : metadata->tailFreeSize = sublist.tailFreeSize;
303 :
304 4 : metadata->nPendingPages = sublist.nPendingPages;
305 4 : metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
306 : }
307 : else
308 : {
309 : /*
310 : * Merge lists
311 : */
312 128 : data.prevTail = metadata->tail;
313 128 : data.newRightlink = sublist.head;
314 :
315 128 : buffer = ReadBuffer(index, metadata->tail);
316 128 : LockBuffer(buffer, GIN_EXCLUSIVE);
317 128 : page = BufferGetPage(buffer);
318 :
319 128 : Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
320 :
321 128 : START_CRIT_SECTION();
322 :
323 128 : GinPageGetOpaque(page)->rightlink = sublist.head; /* link old tail to new sublist */
324 :
325 128 : MarkBufferDirty(buffer);
326 :
327 128 : metadata->tail = sublist.tail;
328 128 : metadata->tailFreeSize = sublist.tailFreeSize;
329 :
330 128 : metadata->nPendingPages += sublist.nPendingPages;
331 128 : metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
332 :
333 128 : if (needWal)
334 128 : XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
335 : }
336 : }
337 : else
338 : {
339 : /*
340 : * Insert into tail page. Metapage is already locked
341 : */
342 : OffsetNumber l,
343 : off;
344 : int i,
345 : tupsize;
346 : char *ptr;
347 : char *collectordata;
348 :
349 21874 : buffer = ReadBuffer(index, metadata->tail);
350 21874 : LockBuffer(buffer, GIN_EXCLUSIVE);
351 21874 : page = BufferGetPage(buffer);
352 :
353 65622 : off = (PageIsEmpty(page)) ? FirstOffsetNumber :
354 43748 : OffsetNumberNext(PageGetMaxOffsetNumber(page));
355 :
356 21874 : collectordata = ptr = (char *) palloc(collector->sumsize); /* contiguous copy of tuple images for WAL, registered below */
357 :
358 21874 : data.ntuples = collector->ntuples;
359 :
360 21874 : if (needWal)
361 21873 : XLogBeginInsert();
362 :
363 21874 : START_CRIT_SECTION();
364 :
365 : /*
366 : * Increase counter of heap tuples
367 : */
368 21874 : Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
369 21874 : GinPageGetOpaque(page)->maxoff++;
370 21874 : metadata->nPendingHeapTuples++;
371 :
372 87487 : for (i = 0; i < collector->ntuples; i++)
373 : {
374 65613 : tupsize = IndexTupleSize(collector->tuples[i]);
375 65613 : l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
376 :
377 65613 : if (l == InvalidOffsetNumber)
378 0 : elog(ERROR, "failed to add item to index page in \"%s\"",
379 : RelationGetRelationName(index));
380 :
381 65613 : memcpy(ptr, collector->tuples[i], tupsize);
382 65613 : ptr += tupsize;
383 :
384 65613 : off++;
385 : }
386 :
387 21874 : Assert((ptr - collectordata) <= collector->sumsize);
388 21874 : if (needWal)
389 : {
390 21873 : XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
391 21873 : XLogRegisterBufData(1, collectordata, collector->sumsize);
392 : }
393 :
394 21874 : metadata->tailFreeSize = PageGetExactFreeSpace(page);
395 :
396 21874 : MarkBufferDirty(buffer);
397 : }
398 :
399 : /*
400 : * Write metabuffer, make xlog entry
401 : */
402 22006 : MarkBufferDirty(metabuffer);
403 :
404 22006 : if (needWal)
405 : {
406 : XLogRecPtr recptr;
407 :
408 22004 : memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
409 :
410 22004 : XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT); /* WILL_INIT: redo reinitializes the metapage, so no full-page image needed */
411 22004 : XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
412 :
413 22004 : recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
414 22004 : PageSetLSN(metapage, recptr);
415 :
416 22004 : if (buffer != InvalidBuffer)
417 : {
418 22001 : PageSetLSN(page, recptr);
419 : }
420 : }
421 :
422 22006 : if (buffer != InvalidBuffer)
423 22002 : UnlockReleaseBuffer(buffer);
424 :
425 : /*
426 : * Force pending list cleanup when it becomes too long. And,
427 : * ginInsertCleanup could take significant amount of time, so we prefer to
428 : * call it when it can do all the work in a single collection cycle. In
429 : * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
430 : * while pending list is still small enough to fit into
431 : * gin_pending_list_limit.
432 : *
433 : * ginInsertCleanup() should not be called inside our CRIT_SECTION.
434 : */
435 22006 : cleanupSize = GinGetPendingListCleanupSize(index); /* NOTE(review): still inside the critical section — assumed non-erroring; confirm */
436 22006 : if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
437 0 : needCleanup = true;
438 :
439 22006 : UnlockReleaseBuffer(metabuffer);
440 :
441 22006 : END_CRIT_SECTION();
442 :
443 22006 : if (needCleanup)
444 0 : ginInsertCleanup(ginstate, false, true, NULL);
445 : }
446 :
447 : /*
448 : * Create temporary index tuples for a single indexable item (one index column
449 : * for the heap tuple specified by ht_ctid), and append them to the array
450 : * in *collector. They will subsequently be written out using
451 : * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
452 : * temp tuples for a given heap tuple must be written in one call to
453 : * ginHeapTupleFastInsert.
454 : */
455 : void
456 22006 : ginHeapTupleFastCollect(GinState *ginstate,
457 : GinTupleCollector *collector,
458 : OffsetNumber attnum, Datum value, bool isNull,
459 : ItemPointer ht_ctid)
460 : {
461 : Datum *entries;
462 : GinNullCategory *categories;
463 : int32 i,
464 : nentries;
465 :
466 : /*
467 : * Extract the key values that need to be inserted in the index
468 : */
469 22006 : entries = ginExtractEntries(ginstate, attnum, value, isNull,
470 : &nentries, &categories);
471 :
472 : /*
473 : * Allocate/reallocate memory for storing collected tuples
474 : */
475 22006 : if (collector->tuples == NULL)
476 : {
477 22006 : collector->lentuples = nentries * ginstate->origTupdesc->natts; /* first sizing guess: nentries per index column */
478 22006 : collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
479 : }
480 :
481 44012 : while (collector->ntuples + nentries > collector->lentuples)
482 : {
483 0 : collector->lentuples *= 2; /* double to amortize repalloc cost */
484 0 : collector->tuples = (IndexTuple *) repalloc(collector->tuples,
485 0 : sizeof(IndexTuple) * collector->lentuples);
486 : }
487 :
488 : /*
489 : * Build an index tuple for each key value, and add to array. In pending
490 : * tuples we just stick the heap TID into t_tid.
491 : */
492 88011 : for (i = 0; i < nentries; i++)
493 : {
494 : IndexTuple itup;
495 :
496 66005 : itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
497 : NULL, 0, 0, true);
498 66005 : itup->t_tid = *ht_ctid;
499 66005 : collector->tuples[collector->ntuples++] = itup;
500 66005 : collector->sumsize += IndexTupleSize(itup);
501 : }
502 22006 : }
503 :
504 : /*
505 : * Deletes pending list pages up to (not including) newHead page.
506 : * If newHead == InvalidBlockNumber then function drops the whole list.
507 : *
508 : * metapage is pinned and exclusive-locked throughout this function.
509 : */
510 : static void
511 2 : shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
512 : bool fill_fsm, IndexBulkDeleteResult *stats)
513 : {
514 : Page metapage;
515 : GinMetaPageData *metadata;
516 : BlockNumber blknoToDelete;
517 :
518 2 : metapage = BufferGetPage(metabuffer);
519 2 : metadata = GinPageGetMeta(metapage);
520 2 : blknoToDelete = metadata->head;
521 :
522 : do
523 : {
524 : Page page;
525 : int i;
526 9 : int64 nDeletedHeapTuples = 0;
527 : ginxlogDeleteListPages data;
528 : Buffer buffers[GIN_NDELETE_AT_ONCE];
529 : BlockNumber freespace[GIN_NDELETE_AT_ONCE];
530 :
531 9 : data.ndeleted = 0;
532 148 : while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
533 : {
534 130 : freespace[data.ndeleted] = blknoToDelete; /* remember blkno for FSM recording after deletion */
535 130 : buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
536 130 : LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
537 130 : page = BufferGetPage(buffers[data.ndeleted]);
538 :
539 130 : data.ndeleted++;
540 :
541 130 : Assert(!GinPageIsDeleted(page));
542 :
543 130 : nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff; /* maxoff is the heap-row count on pending pages */
544 130 : blknoToDelete = GinPageGetOpaque(page)->rightlink;
545 : }
546 :
547 9 : if (stats)
548 9 : stats->pages_deleted += data.ndeleted;
549 :
550 : /*
551 : * This operation touches an unusually large number of pages, so
552 : * prepare the XLogInsert machinery for that before entering the
553 : * critical section.
554 : */
555 9 : if (RelationNeedsWAL(index))
556 9 : XLogEnsureRecordSpace(data.ndeleted, 0);
557 :
558 9 : START_CRIT_SECTION();
559 :
560 9 : metadata->head = blknoToDelete;
561 :
562 9 : Assert(metadata->nPendingPages >= data.ndeleted);
563 9 : metadata->nPendingPages -= data.ndeleted;
564 9 : Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
565 9 : metadata->nPendingHeapTuples -= nDeletedHeapTuples;
566 :
567 9 : if (blknoToDelete == InvalidBlockNumber) /* whole list was consumed */
568 : {
569 2 : metadata->tail = InvalidBlockNumber;
570 2 : metadata->tailFreeSize = 0;
571 2 : metadata->nPendingPages = 0;
572 2 : metadata->nPendingHeapTuples = 0;
573 : }
574 :
575 9 : MarkBufferDirty(metabuffer);
576 :
577 139 : for (i = 0; i < data.ndeleted; i++)
578 : {
579 130 : page = BufferGetPage(buffers[i]);
580 130 : GinPageGetOpaque(page)->flags = GIN_DELETED;
581 130 : MarkBufferDirty(buffers[i]);
582 : }
583 :
584 9 : if (RelationNeedsWAL(index))
585 : {
586 : XLogRecPtr recptr;
587 :
588 9 : XLogBeginInsert();
589 9 : XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
590 139 : for (i = 0; i < data.ndeleted; i++)
591 130 : XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT); /* deleted pages are reinitialized on replay; no FPIs needed */
592 :
593 9 : memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
594 :
595 9 : XLogRegisterData((char *) &data,
596 : sizeof(ginxlogDeleteListPages));
597 :
598 9 : recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
599 9 : PageSetLSN(metapage, recptr);
600 :
601 139 : for (i = 0; i < data.ndeleted; i++)
602 : {
603 130 : page = BufferGetPage(buffers[i]);
604 130 : PageSetLSN(page, recptr);
605 : }
606 : }
607 :
608 139 : for (i = 0; i < data.ndeleted; i++)
609 130 : UnlockReleaseBuffer(buffers[i]);
610 :
611 9 : END_CRIT_SECTION();
612 :
613 133 : for (i = 0; fill_fsm && i < data.ndeleted; i++)
614 124 : RecordFreeIndexPage(index, freespace[i]);
615 :
616 9 : } while (blknoToDelete != newHead);
617 2 : }
618 :
619 : /* Initialize empty KeyArray */
620 : static void
621 2 : initKeyArray(KeyArray *keys, int32 maxvalues)
622 : {
623 2 : keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues); /* allocated in caller's CurrentMemoryContext */
624 2 : keys->categories = (GinNullCategory *)
625 2 : palloc(sizeof(GinNullCategory) * maxvalues);
626 2 : keys->nvalues = 0;
627 2 : keys->maxvalues = maxvalues;
628 2 : }
629 :
630 : /* Add datum to KeyArray, resizing if needed */
631 : static void
632 65994 : addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
633 : {
634 65994 : if (keys->nvalues >= keys->maxvalues)
635 : {
636 0 : keys->maxvalues *= 2; /* double capacity to amortize repalloc cost */
637 0 : keys->keys = (Datum *)
638 0 : repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
639 0 : keys->categories = (GinNullCategory *)
640 0 : repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
641 : }
642 :
643 65994 : keys->keys[keys->nvalues] = datum;
644 65994 : keys->categories[keys->nvalues] = category;
645 65994 : keys->nvalues++;
646 65994 : }
647 :
648 : /*
649 : * Collect data from a pending-list page in preparation for insertion into
650 : * the main index.
651 : *
652 : * Go through all tuples >= startoff on page and collect values in accum
653 : *
654 : * Note that ka is just workspace --- it does not carry any state across
655 : * calls.
656 : */
657 : static void
658 130 : processPendingPage(BuildAccumulator *accum, KeyArray *ka,
659 : Page page, OffsetNumber startoff)
660 : {
661 : ItemPointerData heapptr;
662 : OffsetNumber i,
663 : maxoff;
664 : OffsetNumber attrnum;
665 :
666 : /* reset *ka to empty */
667 130 : ka->nvalues = 0;
668 :
669 130 : maxoff = PageGetMaxOffsetNumber(page);
670 130 : Assert(maxoff >= FirstOffsetNumber);
671 130 : ItemPointerSetInvalid(&heapptr);
672 130 : attrnum = 0; /* 0 = no current attribute yet */
673 :
674 66124 : for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
675 : {
676 65994 : IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
677 : OffsetNumber curattnum;
678 : Datum curkey;
679 : GinNullCategory curcategory;
680 :
681 : /* Check for change of heap TID or attnum */
682 65994 : curattnum = gintuple_get_attrnum(accum->ginstate, itup);
683 :
684 65994 : if (!ItemPointerIsValid(&heapptr))
685 : {
686 130 : heapptr = itup->t_tid;
687 130 : attrnum = curattnum;
688 : }
689 65864 : else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
690 : curattnum == attrnum))
691 : {
692 : /*
693 : * ginInsertBAEntries can insert several datums per call, but only
694 : * for one heap tuple and one column. So call it at a boundary,
695 : * and reset ka.
696 : */
697 21870 : ginInsertBAEntries(accum, &heapptr, attrnum,
698 : ka->keys, ka->categories, ka->nvalues);
699 21870 : ka->nvalues = 0;
700 21870 : heapptr = itup->t_tid;
701 21870 : attrnum = curattnum;
702 : }
703 :
704 : /* Add key to KeyArray */
705 65994 : curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
706 65994 : addDatum(ka, curkey, curcategory);
707 : }
708 :
709 : /* Dump out all remaining keys */
710 130 : ginInsertBAEntries(accum, &heapptr, attrnum,
711 : ka->keys, ka->categories, ka->nvalues);
712 130 : }
713 :
714 : /*
715 : * Move tuples from pending pages into regular GIN structure.
716 : *
717 : * On first glance it looks completely not crash-safe. But if we crash
718 : * after posting entries to the main index and before removing them from the
719 : * pending list, it's okay because when we redo the posting later on, nothing
720 : * bad will happen.
721 : *
722 : * fill_fsm indicates that ginInsertCleanup should add deleted pages
723 : * to FSM otherwise caller is responsible to put deleted pages into
724 : * FSM.
725 : *
726 : * If stats isn't null, we count deleted pending pages into the counts.
727 : */
728 : void
729 7 : ginInsertCleanup(GinState *ginstate, bool full_clean,
730 : bool fill_fsm, IndexBulkDeleteResult *stats)
731 : {
732 7 : Relation index = ginstate->index;
733 : Buffer metabuffer,
734 : buffer;
735 : Page metapage,
736 : page;
737 : GinMetaPageData *metadata;
738 : MemoryContext opCtx,
739 : oldCtx;
740 : BuildAccumulator accum;
741 : KeyArray datums;
742 : BlockNumber blkno,
743 : blknoFinish;
744 7 : bool cleanupFinish = false;
745 7 : bool fsm_vac = false;
746 : Size workMemory;
747 7 : bool inVacuum = (stats == NULL); /* only the vacuum-style call paths pass no stats (see below) */
748 :
749 : /*
750 : * We would like to prevent concurrent cleanup processes. For that we
751 : * take an exclusive lock on the metapage via LockPage(). Nobody else
752 : * uses that lock on the metapage, so concurrent insertion into the
753 : * pending list remains possible.
754 : */
755 :
756 7 : if (inVacuum)
757 : {
758 : /*
759 : * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
760 : * and we would like to wait for any concurrent cleanup to finish.
761 : */
762 0 : LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
763 0 : workMemory =
764 0 : (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
765 : autovacuum_work_mem : maintenance_work_mem;
766 : }
767 : else
768 : {
769 : /*
770 : * We are called from regular insert and if we see concurrent cleanup
771 : * just exit in hope that concurrent process will clean up pending
772 : * list.
773 : */
774 7 : if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
775 5 : return;
776 7 : workMemory = work_mem;
777 : }
778 :
779 7 : metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
780 7 : LockBuffer(metabuffer, GIN_SHARE);
781 7 : metapage = BufferGetPage(metabuffer);
782 7 : metadata = GinPageGetMeta(metapage);
783 :
784 7 : if (metadata->head == InvalidBlockNumber)
785 : {
786 : /* Nothing to do */
787 5 : UnlockReleaseBuffer(metabuffer);
788 5 : UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
789 5 : return;
790 : }
791 :
792 : /*
793 : * Remember a tail page to prevent infinite cleanup if other backends add
794 : * new tuples faster than we can cleanup.
795 : */
796 2 : blknoFinish = metadata->tail;
797 :
798 : /*
799 : * Read and lock head of pending list
800 : */
801 2 : blkno = metadata->head;
802 2 : buffer = ReadBuffer(index, blkno);
803 2 : LockBuffer(buffer, GIN_SHARE);
804 2 : page = BufferGetPage(buffer);
805 :
806 2 : LockBuffer(metabuffer, GIN_UNLOCK); /* keep only the pin on the metapage from here on */
807 :
808 : /*
809 : * Initialize. All temporary space will be in opCtx
810 : */
811 2 : opCtx = AllocSetContextCreate(CurrentMemoryContext,
812 : "GIN insert cleanup temporary context",
813 : ALLOCSET_DEFAULT_SIZES);
814 :
815 2 : oldCtx = MemoryContextSwitchTo(opCtx);
816 :
817 2 : initKeyArray(&datums, 128);
818 2 : ginInitBA(&accum);
819 2 : accum.ginstate = ginstate;
820 :
821 : /*
822 : * At the top of this loop, we have pin and lock on the current page of
823 : * the pending list. However, we'll release that before exiting the loop.
824 : * Note we also have pin but not lock on the metapage.
825 : */
826 : for (;;)
827 : {
828 130 : Assert(!GinPageIsDeleted(page));
829 :
830 : /*
831 : * Have we reached the page that we remember was the tail when we
832 : * started our cleanup? If the caller asked us to clean up the whole
833 : * pending list, ignore the old tail; we will work until the list
834 : * becomes empty.
835 : */
836 130 : if (blkno == blknoFinish && full_clean == false)
837 0 : cleanupFinish = true;
838 :
839 : /*
840 : * read page's datums into accum
841 : */
842 130 : processPendingPage(&accum, &datums, page, FirstOffsetNumber);
843 :
844 130 : vacuum_delay_point();
845 :
846 : /*
847 : * Is it time to flush memory to disk? Flush if we are at the end of
848 : * the pending list, or if we have a full row and memory is getting
849 : * full.
850 : */
851 258 : if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
852 256 : (GinPageHasFullRow(page) &&
853 128 : (accum.allocatedMemory >= workMemory * 1024L)))
854 0 : {
855 : ItemPointerData *list;
856 : uint32 nlist;
857 : Datum key;
858 : GinNullCategory category;
859 : OffsetNumber maxoff,
860 : attnum;
861 :
862 : /*
863 : * Unlock current page to increase performance. Changes of page
864 : * will be checked later by comparing maxoff after completion of
865 : * memory flush.
866 : */
867 2 : maxoff = PageGetMaxOffsetNumber(page);
868 2 : LockBuffer(buffer, GIN_UNLOCK);
869 :
870 : /*
871 : * Moving collected data into regular structure can take
872 : * significant amount of time - so, run it without locking pending
873 : * list.
874 : */
875 2 : ginBeginBAScan(&accum);
876 21004 : while ((list = ginGetBAEntry(&accum,
877 : &attnum, &key, &category, &nlist)) != NULL)
878 : {
879 21000 : ginEntryInsert(ginstate, attnum, key, category,
880 : list, nlist, NULL);
881 21000 : vacuum_delay_point();
882 : }
883 :
884 : /*
885 : * Lock the whole list to remove pages
886 : */
887 2 : LockBuffer(metabuffer, GIN_EXCLUSIVE);
888 2 : LockBuffer(buffer, GIN_SHARE);
889 :
890 2 : Assert(!GinPageIsDeleted(page));
891 :
892 : /*
893 : * While we left the page unlocked, more stuff might have gotten
894 : * added to it. If so, process those entries immediately. There
895 : * shouldn't be very many, so we don't worry about the fact that
896 : * we're doing this with exclusive lock. Insertion algorithm
897 : * guarantees that inserted row(s) will not continue on next page.
898 : * NOTE: intentionally no vacuum_delay_point in this loop.
899 : */
900 2 : if (PageGetMaxOffsetNumber(page) != maxoff)
901 : {
902 0 : ginInitBA(&accum);
903 0 : processPendingPage(&accum, &datums, page, maxoff + 1);
904 :
905 0 : ginBeginBAScan(&accum);
906 0 : while ((list = ginGetBAEntry(&accum,
907 : &attnum, &key, &category, &nlist)) != NULL)
908 0 : ginEntryInsert(ginstate, attnum, key, category,
909 : list, nlist, NULL);
910 : }
911 :
912 : /*
913 : * Remember next page - it will become the new list head
914 : */
915 2 : blkno = GinPageGetOpaque(page)->rightlink;
916 2 : UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
917 : * locking */
918 :
919 : /*
920 : * remove read pages from pending list, at this point all content
921 : * of read pages is in regular structure
922 : */
923 2 : shiftList(index, metabuffer, blkno, fill_fsm, stats);
924 :
925 : /* At this point, some pending pages have been freed up */
926 2 : fsm_vac = true;
927 :
928 2 : Assert(blkno == metadata->head);
929 2 : LockBuffer(metabuffer, GIN_UNLOCK);
930 :
931 : /*
932 : * if we removed the whole pending list or we cleanup tail (which
933 : * we remembered on start our cleanup process) then just exit
934 : */
935 2 : if (blkno == InvalidBlockNumber || cleanupFinish)
936 : break;
937 :
938 : /*
939 : * release memory used so far and reinit state
940 : */
941 0 : MemoryContextReset(opCtx);
942 0 : initKeyArray(&datums, datums.maxvalues); /* datums arrays were freed by the context reset */
943 0 : ginInitBA(&accum);
944 : }
945 : else
946 : {
947 128 : blkno = GinPageGetOpaque(page)->rightlink;
948 128 : UnlockReleaseBuffer(buffer);
949 : }
950 :
951 : /*
952 : * Read next page in pending list
953 : */
954 128 : vacuum_delay_point();
955 128 : buffer = ReadBuffer(index, blkno);
956 128 : LockBuffer(buffer, GIN_SHARE);
957 128 : page = BufferGetPage(buffer);
958 128 : }
959 :
960 2 : UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
961 2 : ReleaseBuffer(metabuffer); /* pin held since function start; lock was dropped earlier */
962 :
963 : /*
964 : * As pending list pages can have a high churn rate, it is desirable to
965 : * recycle them immediately to the FreeSpace Map when ordinary backends
966 : * clean the list.
967 : */
968 4 : if (fsm_vac && fill_fsm)
969 1 : IndexFreeSpaceMapVacuum(index);
970 :
971 :
972 : /* Clean up temporary space */
973 2 : MemoryContextSwitchTo(oldCtx);
974 2 : MemoryContextDelete(opCtx);
975 : }
976 :
977 : /*
978 : * SQL-callable function to clean the insert pending list
979 : */
980 : Datum
981 2 : gin_clean_pending_list(PG_FUNCTION_ARGS)
982 : {
983 2 : Oid indexoid = PG_GETARG_OID(0);
984 2 : Relation indexRel = index_open(indexoid, AccessShareLock);
985 : IndexBulkDeleteResult stats;
986 : GinState ginstate;
987 :
988 2 : if (RecoveryInProgress())
989 0 : ereport(ERROR,
990 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
991 : errmsg("recovery is in progress"),
992 : errhint("GIN pending list cannot be cleaned up during recovery.")));
993 :
994 : /* Must be a GIN index */
995 4 : if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
996 2 : indexRel->rd_rel->relam != GIN_AM_OID)
997 0 : ereport(ERROR,
998 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
999 : errmsg("\"%s\" is not a GIN index",
1000 : RelationGetRelationName(indexRel))));
1001 :
1002 : /*
1003 : * Reject attempts to read non-local temporary relations; we would be
1004 : * likely to get wrong data since we have no visibility into the owning
1005 : * session's local buffers.
1006 : */
1007 2 : if (RELATION_IS_OTHER_TEMP(indexRel))
1008 0 : ereport(ERROR,
1009 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1010 : errmsg("cannot access temporary indexes of other sessions")));
1011 :
1012 : /* User must own the index (comparable to privileges needed for VACUUM) */
1013 2 : if (!pg_class_ownercheck(indexoid, GetUserId()))
1014 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1015 0 : RelationGetRelationName(indexRel));
1016 :
1017 2 : memset(&stats, 0, sizeof(stats));
1018 2 : initGinState(&ginstate, indexRel);
1019 2 : ginInsertCleanup(&ginstate, true, true, &stats); /* full_clean = true: drain the entire pending list */
1020 :
1021 2 : index_close(indexRel, AccessShareLock);
1022 :
1023 2 : PG_RETURN_INT64((int64) stats.pages_deleted);
1024 : }
|