1 /*-------------------------------------------------------------------------
4 * Fast insert routines for the Postgres inverted index access method.
5 * Pending entries are stored in linear list of pages. Later on
6 * (typically during VACUUM), ginInsertCleanup() will be invoked to
7 * transfer pending entries into the regular index structure. This
8 * wins because bulk insertion is much more efficient than retail.
10 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
11 * Portions Copyright (c) 1994, Regents of the University of California
14 * src/backend/access/gin/ginfast.c
16 *-------------------------------------------------------------------------
21 #include "access/gin_private.h"
22 #include "access/xloginsert.h"
23 #include "access/xlog.h"
24 #include "commands/vacuum.h"
25 #include "catalog/pg_am.h"
26 #include "miscadmin.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29 #include "utils/acl.h"
30 #include "storage/indexfsm.h"
/*
 * GUC variable: upper bound on the pending-list size. Presumably expressed in
 * kilobytes — it is multiplied by 1024L where GinGetPendingListCleanupSize()
 * is consulted in ginHeapTupleFastInsert(); confirm against guc.c.
 */
33 int gin_pending_list_limit = 0;
/*
 * Usable payload space on one pending-list page: the block minus the page
 * header and the GIN-specific opaque area (both MAXALIGN'd).
 */
35 #define GIN_PAGE_FREESIZE \
36 ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
/*
 * KeyArray: a pair of parallel, growable arrays holding key datums and their
 * null categories. Used as scratch space by processPendingPage() while
 * batching keys for ginInsertBAEntries(); grown by addDatum().
 * NOTE(review): the struct's brace lines are not visible in this view.
 */
38 typedef struct KeyArray
40 Datum *keys; /* expansible array */
41 GinNullCategory *categories; /* another expansible array */
42 int32 nvalues; /* current number of valid entries */
43 int32 maxvalues; /* allocated size of arrays */
48 * Build a pending-list page from the given array of tuples, and write it out.
50 * Returns amount of free space left on the page.
53 writeListPage(Relation index, Buffer buffer,
54 IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
56 Page page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
65 /* workspace could be a local array; we use palloc for alignment */
66 workspace = palloc(BLCKSZ);
/* Reinitialize the buffer as an empty GIN pending-list page */
70 GinInitBuffer(buffer, GIN_LIST);
72 off = FirstOffsetNumber;
/*
 * Copy each tuple both onto the page and into the workspace buffer; the
 * workspace copy becomes the WAL payload below.
 */
75 for (i = 0; i < ntuples; i++)
77 int this_size = IndexTupleSize(tuples[i]);
79 memcpy(ptr, tuples[i], this_size);
83 l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
85 if (l == InvalidOffsetNumber)
86 elog(ERROR, "failed to add item to index page in \"%s\"",
87 RelationGetRelationName(index));
92 Assert(size <= BLCKSZ); /* else we overran workspace */
94 GinPageGetOpaque(page)->rightlink = rightlink;
97 * tail page may contain only whole row(s) or final part of row placed on
98 * previous pages (a "row" here meaning all the index tuples generated for
101 if (rightlink == InvalidBlockNumber)
103 GinPageSetFullRow(page);
104 GinPageGetOpaque(page)->maxoff = 1;
108 GinPageGetOpaque(page)->maxoff = 0;
111 MarkBufferDirty(buffer);
/* Emit a WAL record describing the whole new page, if WAL-logged */
113 if (RelationNeedsWAL(index))
115 ginxlogInsertListPage data;
118 data.rightlink = rightlink;
119 data.ntuples = ntuples;
122 XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
/* Page is fully re-created at redo, hence REGBUF_WILL_INIT */
124 XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
125 XLogRegisterBufData(0, workspace, size);
127 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
128 PageSetLSN(page, recptr);
131 /* get free space before releasing buffer */
132 freesize = PageGetExactFreeSpace(page);
134 UnlockReleaseBuffer(buffer);
/*
 * makeSublist: distribute the given tuples across a chain of newly allocated
 * pending-list pages (via writeListPage), filling *res with the sublist's
 * head/tail block numbers, tail free space, and pending page/heap-tuple
 * counts. All tuples are assumed to belong to one heap tuple (see the
 * nPendingHeapTuples = 1 assignment at the end).
 */
144 makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
145 GinMetaPageData *res)
147 Buffer curBuffer = InvalidBuffer;
148 Buffer prevBuffer = InvalidBuffer;
157 * Split tuples into pages
159 for (i = 0; i < ntuples; i++)
161 if (curBuffer == InvalidBuffer)
163 curBuffer = GinNewBuffer(index);
/* Flush the previous page, chaining its rightlink to the new page */
165 if (prevBuffer != InvalidBuffer)
167 res->nPendingPages++;
168 writeListPage(index, prevBuffer,
171 BufferGetBlockNumber(curBuffer));
/* First page allocated is the sublist head */
175 res->head = BufferGetBlockNumber(curBuffer);
178 prevBuffer = curBuffer;
/* Account for the tuple plus its line pointer */
183 tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
185 if (size + tupsize > GinListPageSize)
187 /* won't fit, force a new page and reprocess */
189 curBuffer = InvalidBuffer;
/* Write the final (tail) page; remember its free space for the caller */
200 res->tail = BufferGetBlockNumber(curBuffer);
201 res->tailFreeSize = writeListPage(index, curBuffer,
203 ntuples - startTuple,
205 res->nPendingPages++;
206 /* that was only one heap tuple */
207 res->nPendingHeapTuples = 1;
211 * Write the index tuples contained in *collector into the index's
214 * Function guarantees that all these tuples will be inserted consecutively,
218 ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
220 Relation index = ginstate->index;
223 GinMetaPageData *metadata = NULL;
224 Buffer buffer = InvalidBuffer;
226 ginxlogUpdateMeta data;
227 bool separateList = false;
228 bool needCleanup = false;
/* Nothing collected => nothing to do */
232 if (collector->ntuples == 0)
235 needWal = RelationNeedsWAL(index);
237 data.node = index->rd_node;
239 data.newRightlink = data.prevTail = InvalidBlockNumber;
241 metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
242 metapage = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
/*
 * Decide between the two insertion strategies: build a separate sublist of
 * new pages, or append directly into the current tail page.
 */
244 if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
247 * Total size is greater than one page => make sublist
253 LockBuffer(metabuffer, GIN_EXCLUSIVE);
254 metadata = GinPageGetMeta(metapage);
256 if (metadata->head == InvalidBlockNumber ||
257 collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
260 * Pending list is empty or total size is greater than freespace
261 * on tail page => make sublist
263 * We unlock metabuffer to keep high concurrency
266 LockBuffer(metabuffer, GIN_UNLOCK);
273 * We should make sublist separately and append it to the tail
275 GinMetaPageData sublist;
277 memset(&sublist, 0, sizeof(GinMetaPageData));
/* Build the new pages without holding the metapage lock */
278 makeSublist(index, collector->tuples, collector->ntuples, &sublist);
284 * metapage was unlocked, see above
286 LockBuffer(metabuffer, GIN_EXCLUSIVE);
/* NOTE(review): metadata pointer is re-fetched after relocking — keep it so */
287 metadata = GinPageGetMeta(metapage);
289 if (metadata->head == InvalidBlockNumber)
292 * Main list is empty, so just insert sublist as main list
294 START_CRIT_SECTION();
296 metadata->head = sublist.head;
297 metadata->tail = sublist.tail;
298 metadata->tailFreeSize = sublist.tailFreeSize;
300 metadata->nPendingPages = sublist.nPendingPages;
301 metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
/* Otherwise: splice the sublist onto the existing tail page */
308 data.prevTail = metadata->tail;
309 data.newRightlink = sublist.head;
311 buffer = ReadBuffer(index, metadata->tail);
312 LockBuffer(buffer, GIN_EXCLUSIVE);
313 page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
315 Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
317 START_CRIT_SECTION();
319 GinPageGetOpaque(page)->rightlink = sublist.head;
321 MarkBufferDirty(buffer);
323 metadata->tail = sublist.tail;
324 metadata->tailFreeSize = sublist.tailFreeSize;
326 metadata->nPendingPages += sublist.nPendingPages;
327 metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
/* Old tail page is registered as buffer 1 in the WAL record */
330 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
336 * Insert into tail page. Metapage is already locked
345 buffer = ReadBuffer(index, metadata->tail);
346 LockBuffer(buffer, GIN_EXCLUSIVE);
347 page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
/* Append after the page's current last item (or at the start if empty) */
349 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
350 OffsetNumberNext(PageGetMaxOffsetNumber(page));
/* Flat copy of all tuples, used as the WAL data payload */
352 collectordata = ptr = (char *) palloc(collector->sumsize);
354 data.ntuples = collector->ntuples;
359 START_CRIT_SECTION();
362 * Increase counter of heap tuples
364 Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
365 GinPageGetOpaque(page)->maxoff++;
366 metadata->nPendingHeapTuples++;
368 for (i = 0; i < collector->ntuples; i++)
370 tupsize = IndexTupleSize(collector->tuples[i]);
371 l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
373 if (l == InvalidOffsetNumber)
374 elog(ERROR, "failed to add item to index page in \"%s\"",
375 RelationGetRelationName(index));
377 memcpy(ptr, collector->tuples[i], tupsize);
383 Assert((ptr - collectordata) <= collector->sumsize);
386 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
387 XLogRegisterBufData(1, collectordata, collector->sumsize);
390 metadata->tailFreeSize = PageGetExactFreeSpace(page);
392 MarkBufferDirty(buffer);
396 * Write metabuffer, make xlog entry
398 MarkBufferDirty(metabuffer);
/* Metapage image travels inside the record; buffer 0 is re-inited at redo */
404 memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
406 XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
407 XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
409 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
410 PageSetLSN(metapage, recptr);
412 if (buffer != InvalidBuffer)
414 PageSetLSN(page, recptr);
418 if (buffer != InvalidBuffer)
419 UnlockReleaseBuffer(buffer);
422 * Force pending list cleanup when it becomes too long. And,
423 * ginInsertCleanup could take significant amount of time, so we prefer to
424 * call it when it can do all the work in a single collection cycle. In
425 * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
426 * while pending list is still small enough to fit into
427 * gin_pending_list_limit.
429 * ginInsertCleanup() should not be called inside our CRIT_SECTION.
431 cleanupSize = GinGetPendingListCleanupSize(index);
432 if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
435 UnlockReleaseBuffer(metabuffer);
/* fill_fsm = true: an ordinary backend wants freed pages recycled at once */
440 ginInsertCleanup(ginstate, true, NULL);
444 * Create temporary index tuples for a single indexable item (one index column
445 * for the heap tuple specified by ht_ctid), and append them to the array
446 * in *collector. They will subsequently be written out using
447 * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
448 * temp tuples for a given heap tuple must be written in one call to
449 * ginHeapTupleFastInsert.
452 ginHeapTupleFastCollect(GinState *ginstate,
453 GinTupleCollector *collector,
454 OffsetNumber attnum, Datum value, bool isNull,
458 GinNullCategory *categories;
463 * Extract the key values that need to be inserted in the index
465 entries = ginExtractEntries(ginstate, attnum, value, isNull,
466 &nentries, &categories);
469 * Allocate/reallocate memory for storing collected tuples
471 if (collector->tuples == NULL)
/* Initial capacity scaled by the number of index columns */
473 collector->lentuples = nentries * ginstate->origTupdesc->natts;
474 collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
/* Grow geometrically until all new entries fit */
477 while (collector->ntuples + nentries > collector->lentuples)
479 collector->lentuples *= 2;
480 collector->tuples = (IndexTuple *) repalloc(collector->tuples,
481 sizeof(IndexTuple) * collector->lentuples);
485 * Build an index tuple for each key value, and add to array. In pending
486 * tuples we just stick the heap TID into t_tid.
488 for (i = 0; i < nentries; i++)
492 itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
494 itup->t_tid = *ht_ctid;
495 collector->tuples[collector->ntuples++] = itup;
/* Track total payload size so the caller can size pages/WAL data */
496 collector->sumsize += IndexTupleSize(itup);
501 * Deletes pending list pages up to (not including) newHead page.
502 * If newHead == InvalidBlockNumber then function drops the whole list.
504 * metapage is pinned and exclusive-locked throughout this function.
506 * Returns true if another cleanup process is running concurrently
507 * (if so, we can just abandon our own efforts)
510 shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
511 bool fill_fsm, IndexBulkDeleteResult *stats)
514 GinMetaPageData *metadata;
515 BlockNumber blknoToDelete;
517 metapage = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
518 metadata = GinPageGetMeta(metapage);
519 blknoToDelete = metadata->head;
/*
 * Outer do-loop: delete at most GIN_NDELETE_AT_ONCE pages per iteration,
 * so each WAL record stays bounded in the number of registered buffers.
 */
525 int64 nDeletedHeapTuples = 0;
526 ginxlogDeleteListPages data;
527 Buffer buffers[GIN_NDELETE_AT_ONCE];
528 BlockNumber freespace[GIN_NDELETE_AT_ONCE];
/* Gather and exclusive-lock the next batch of list pages */
531 while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
533 freespace[data.ndeleted] = blknoToDelete;
534 buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
535 LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
536 page = BufferGetPage(buffers[data.ndeleted], NULL, NULL, BGP_NO_SNAPSHOT_TEST);
540 if (GinPageIsDeleted(page))
542 /* concurrent cleanup process is detected */
543 for (i = 0; i < data.ndeleted; i++)
544 UnlockReleaseBuffer(buffers[i]);
/* maxoff on a list page counts heap tuples, per writeListPage() */
549 nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
550 blknoToDelete = GinPageGetOpaque(page)->rightlink;
554 stats->pages_deleted += data.ndeleted;
557 * This operation touches an unusually large number of pages, so
558 * prepare the XLogInsert machinery for that before entering the
561 if (RelationNeedsWAL(index))
562 XLogEnsureRecordSpace(data.ndeleted, 0);
564 START_CRIT_SECTION();
/* Advance the list head past the batch we are deleting */
566 metadata->head = blknoToDelete;
568 Assert(metadata->nPendingPages >= data.ndeleted);
569 metadata->nPendingPages -= data.ndeleted;
570 Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
571 metadata->nPendingHeapTuples -= nDeletedHeapTuples;
/* List now empty: reset all pending-list metadata */
573 if (blknoToDelete == InvalidBlockNumber)
575 metadata->tail = InvalidBlockNumber;
576 metadata->tailFreeSize = 0;
577 metadata->nPendingPages = 0;
578 metadata->nPendingHeapTuples = 0;
581 MarkBufferDirty(metabuffer);
583 for (i = 0; i < data.ndeleted; i++)
585 page = BufferGetPage(buffers[i], NULL, NULL, BGP_NO_SNAPSHOT_TEST);
586 GinPageGetOpaque(page)->flags = GIN_DELETED;
587 MarkBufferDirty(buffers[i]);
590 if (RelationNeedsWAL(index))
/* Metapage is buffer 0; each deleted page follows at slot i+1 */
595 XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
596 for (i = 0; i < data.ndeleted; i++)
597 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
599 memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
601 XLogRegisterData((char *) &data,
602 sizeof(ginxlogDeleteListPages));
604 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
605 PageSetLSN(metapage, recptr);
607 for (i = 0; i < data.ndeleted; i++)
609 page = BufferGetPage(buffers[i], NULL, NULL, BGP_NO_SNAPSHOT_TEST);
610 PageSetLSN(page, recptr);
614 for (i = 0; i < data.ndeleted; i++)
615 UnlockReleaseBuffer(buffers[i]);
/* Optionally return the freed pages to the free space map */
619 for (i = 0; fill_fsm && i < data.ndeleted; i++)
620 RecordFreeIndexPage(index, freespace[i]);
622 } while (blknoToDelete != newHead);
627 /* Initialize empty KeyArray */
/*
 * Allocate both parallel arrays at the given capacity.
 * NOTE(review): the line resetting keys->nvalues to 0 is not visible in this
 * view — confirm it is present, since callers rely on an empty array.
 */
629 initKeyArray(KeyArray *keys, int32 maxvalues)
631 keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
632 keys->categories = (GinNullCategory *)
633 palloc(sizeof(GinNullCategory) * maxvalues);
635 keys->maxvalues = maxvalues;
638 /* Add datum to KeyArray, resizing if needed */
640 addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
642 if (keys->nvalues >= keys->maxvalues)
/* Double capacity; both arrays must stay the same length */
644 keys->maxvalues *= 2;
645 keys->keys = (Datum *)
646 repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
647 keys->categories = (GinNullCategory *)
648 repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
/* Store datum and its category at the same index */
651 keys->keys[keys->nvalues] = datum;
652 keys->categories[keys->nvalues] = category;
657 * Collect data from a pending-list page in preparation for insertion into
660 * Go through all tuples >= startoff on page and collect values in accum
662 * Note that ka is just workspace --- it does not carry any state across
666 processPendingPage(BuildAccumulator *accum, KeyArray *ka,
667 Page page, OffsetNumber startoff)
669 ItemPointerData heapptr;
672 OffsetNumber attrnum;
674 /* reset *ka to empty */
677 maxoff = PageGetMaxOffsetNumber(page);
678 Assert(maxoff >= FirstOffsetNumber);
/* Invalid heapptr marks "no current (TID, attnum) group yet" */
679 ItemPointerSetInvalid(&heapptr);
682 for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
684 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
685 OffsetNumber curattnum;
687 GinNullCategory curcategory;
689 /* Check for change of heap TID or attnum */
690 curattnum = gintuple_get_attrnum(accum->ginstate, itup);
692 if (!ItemPointerIsValid(&heapptr))
/* First tuple on the page: start the first group */
694 heapptr = itup->t_tid;
697 else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
698 curattnum == attrnum))
701 * ginInsertBAEntries can insert several datums per call, but only
702 * for one heap tuple and one column. So call it at a boundary,
705 ginInsertBAEntries(accum, &heapptr, attrnum,
706 ka->keys, ka->categories, ka->nvalues);
708 heapptr = itup->t_tid;
712 /* Add key to KeyArray */
713 curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
714 addDatum(ka, curkey, curcategory);
717 /* Dump out all remaining keys */
718 ginInsertBAEntries(accum, &heapptr, attrnum,
719 ka->keys, ka->categories, ka->nvalues);
723 * Move tuples from pending pages into regular GIN structure.
725 * This can be called concurrently by multiple backends, so it must cope.
726 * On first glance it looks completely not concurrent-safe and not crash-safe
727 * either. The reason it's okay is that multiple insertion of the same entry
728 * is detected and treated as a no-op by gininsert.c. If we crash after
729 * posting entries to the main index and before removing them from the
730 * pending list, it's okay because when we redo the posting later on, nothing
731 * bad will happen. Likewise, if two backends simultaneously try to post
732 * a pending entry into the main index, one will succeed and one will do
733 * nothing. We try to notice when someone else is a little bit ahead of
734 * us in the process, but that's just to avoid wasting cycles. Only the
735 * action of removing a page from the pending list really needs exclusive
738 * fill_fsm indicates that ginInsertCleanup should add deleted pages
739 * to FSM otherwise caller is responsible to put deleted pages into
742 * If stats isn't null, we count deleted pending pages into the counts.
745 ginInsertCleanup(GinState *ginstate,
746 bool fill_fsm, IndexBulkDeleteResult *stats)
748 Relation index = ginstate->index;
753 GinMetaPageData *metadata;
756 BuildAccumulator accum;
759 bool fsm_vac = false;
761 metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
762 LockBuffer(metabuffer, GIN_SHARE);
763 metapage = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
764 metadata = GinPageGetMeta(metapage);
/* Empty pending list: nothing to clean up */
766 if (metadata->head == InvalidBlockNumber)
769 UnlockReleaseBuffer(metabuffer);
774 * Read and lock head of pending list
776 blkno = metadata->head;
777 buffer = ReadBuffer(index, blkno);
778 LockBuffer(buffer, GIN_SHARE);
779 page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
/* Keep the metapage pinned but drop its lock while we scan the list */
781 LockBuffer(metabuffer, GIN_UNLOCK);
784 * Initialize. All temporary space will be in opCtx
786 opCtx = AllocSetContextCreate(CurrentMemoryContext,
787 "GIN insert cleanup temporary context",
788 ALLOCSET_DEFAULT_MINSIZE,
789 ALLOCSET_DEFAULT_INITSIZE,
790 ALLOCSET_DEFAULT_MAXSIZE);
792 oldCtx = MemoryContextSwitchTo(opCtx);
794 initKeyArray(&datums, 128);
796 accum.ginstate = ginstate;
799 * At the top of this loop, we have pin and lock on the current page of
800 * the pending list. However, we'll release that before exiting the loop.
801 * Note we also have pin but not lock on the metapage.
805 if (GinPageIsDeleted(page))
807 /* another cleanup process is running concurrently */
808 UnlockReleaseBuffer(buffer);
814 * read page's datums into accum
816 processPendingPage(&accum, &datums, page, FirstOffsetNumber);
818 vacuum_delay_point();
821 * Is it time to flush memory to disk? Flush if we are at the end of
822 * the pending list, or if we have a full row and memory is getting
825 * XXX using up maintenance_work_mem here is probably unreasonably
826 * much, since vacuum might already be using that much.
828 if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
829 (GinPageHasFullRow(page) &&
830 (accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)))
832 ItemPointerData *list;
835 GinNullCategory category;
840 * Unlock current page to increase performance. Changes of page
841 * will be checked later by comparing maxoff after completion of
844 maxoff = PageGetMaxOffsetNumber(page);
845 LockBuffer(buffer, GIN_UNLOCK);
848 * Moving collected data into regular structure can take
849 * significant amount of time - so, run it without locking pending
852 ginBeginBAScan(&accum);
853 while ((list = ginGetBAEntry(&accum,
854 &attnum, &key, &category, &nlist)) != NULL)
856 ginEntryInsert(ginstate, attnum, key, category,
858 vacuum_delay_point();
862 * Lock the whole list to remove pages
864 LockBuffer(metabuffer, GIN_EXCLUSIVE);
865 LockBuffer(buffer, GIN_SHARE);
867 if (GinPageIsDeleted(page))
869 /* another cleanup process is running concurrently */
870 UnlockReleaseBuffer(buffer);
871 LockBuffer(metabuffer, GIN_UNLOCK);
877 * While we left the page unlocked, more stuff might have gotten
878 * added to it. If so, process those entries immediately. There
879 * shouldn't be very many, so we don't worry about the fact that
880 * we're doing this with exclusive lock. Insertion algorithm
881 * guarantees that inserted row(s) will not continue on next page.
882 * NOTE: intentionally no vacuum_delay_point in this loop.
884 if (PageGetMaxOffsetNumber(page) != maxoff)
887 processPendingPage(&accum, &datums, page, maxoff + 1);
889 ginBeginBAScan(&accum);
890 while ((list = ginGetBAEntry(&accum,
891 &attnum, &key, &category, &nlist)) != NULL)
892 ginEntryInsert(ginstate, attnum, key, category,
897 * Remember next page - it will become the new list head
899 blkno = GinPageGetOpaque(page)->rightlink;
900 UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
904 * remove read pages from pending list, at this point all
905 * content of read pages is in regular structure
907 if (shiftList(index, metabuffer, blkno, fill_fsm, stats))
909 /* another cleanup process is running concurrently */
910 LockBuffer(metabuffer, GIN_UNLOCK);
915 /* At this point, some pending pages have been freed up */
918 Assert(blkno == metadata->head);
919 LockBuffer(metabuffer, GIN_UNLOCK);
922 * if we removed the whole pending list just exit
924 if (blkno == InvalidBlockNumber)
928 * release memory used so far and reinit state
930 MemoryContextReset(opCtx);
931 initKeyArray(&datums, datums.maxvalues);
/* Not flushing yet: just follow the rightlink to the next list page */
936 blkno = GinPageGetOpaque(page)->rightlink;
937 UnlockReleaseBuffer(buffer);
941 * Read next page in pending list
943 vacuum_delay_point();
944 buffer = ReadBuffer(index, blkno);
945 LockBuffer(buffer, GIN_SHARE);
946 page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
949 ReleaseBuffer(metabuffer);
952 * As pending list pages can have a high churn rate, it is
953 * desirable to recycle them immediately to the FreeSpace Map when
954 * ordinary backends clean the list.
956 if (fsm_vac && fill_fsm)
957 IndexFreeSpaceMapVacuum(index);
960 /* Clean up temporary space */
961 MemoryContextSwitchTo(oldCtx);
962 MemoryContextDelete(opCtx);
966 * SQL-callable function to clean the insert pending list
/*
 * Validates the argument, runs ginInsertCleanup with fill_fsm = true, and
 * returns the number of pending-list pages deleted (int64).
 */
969 gin_clean_pending_list(PG_FUNCTION_ARGS)
971 Oid indexoid = PG_GETARG_OID(0);
972 Relation indexRel = index_open(indexoid, AccessShareLock);
973 IndexBulkDeleteResult stats;
/* Cleanup writes WAL, so it cannot run on a standby */
976 if (RecoveryInProgress())
978 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
979 errmsg("recovery is in progress"),
980 errhint("GIN pending list cannot be cleaned up during recovery.")));
982 /* Must be a GIN index */
983 if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
984 indexRel->rd_rel->relam != GIN_AM_OID)
986 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
987 errmsg("\"%s\" is not a GIN index",
988 RelationGetRelationName(indexRel))));
991 * Reject attempts to read non-local temporary relations; we would be
992 * likely to get wrong data since we have no visibility into the owning
993 * session's local buffers.
995 if (RELATION_IS_OTHER_TEMP(indexRel))
997 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
998 errmsg("cannot access temporary indexes of other sessions")));
1000 /* User must own the index (comparable to privileges needed for VACUUM) */
1001 if (!pg_class_ownercheck(indexoid, GetUserId()))
1002 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1003 RelationGetRelationName(indexRel));
1005 memset(&stats, 0, sizeof(stats));
1006 initGinState(&ginstate, indexRel);
1007 ginInsertCleanup(&ginstate, true, &stats);
1009 index_close(indexRel, AccessShareLock);
1011 PG_RETURN_INT64((int64) stats.pages_deleted);