/*-------------------------------------------------------------------------
 *
 * ginfast.c
 *        Fast insert routines for the Postgres inverted index access method.
 *        Pending entries are stored in linear list of pages.  Later on
 *        (typically during VACUUM), ginInsertCleanup() will be invoked to
 *        transfer pending entries into the regular index structure.  This
 *        wins because bulk insertion is much more efficient than retail.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *                      src/backend/access/gin/ginfast.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/gin_private.h"
#include "access/xloginsert.h"
#include "access/xlog.h"
#include "commands/vacuum.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/acl.h"
#include "storage/indexfsm.h"

/* GUC parameter */
int                     gin_pending_list_limit = 0;

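/*
 * Usable space on a pending-list page: the block size less the page header
 * and the GIN opaque area.
 */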
#define GIN_PAGE_FREESIZE \
        ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )

typedef struct KeyArray
{
        Datum      *keys;                       /* expansible array */
        GinNullCategory *categories;    /* another expansible array */
        int32           nvalues;                /* current number of valid entries */
        int32           maxvalues;              /* allocated size of arrays */
} KeyArray;


/*
 * Build a pending-list page from the given array of tuples, and write it out.
 *
 * Returns amount of free space left on the page.
 */
static int32
writeListPage(Relation index, Buffer buffer,
                          IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
        Page            page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
        int32           i,
                                freesize,
                                size = 0;
        OffsetNumber l,
                                off;
        char       *workspace;
        char       *ptr;

        /* workspace could be a local array; we use palloc for alignment */
        workspace = palloc(BLCKSZ);

        START_CRIT_SECTION();

        GinInitBuffer(buffer, GIN_LIST);

        off = FirstOffsetNumber;
        ptr = workspace;

        for (i = 0; i < ntuples; i++)
        {
                int                     this_size = IndexTupleSize(tuples[i]);

                memcpy(ptr, tuples[i], this_size);
                ptr += this_size;
                size += this_size;

                l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);

                if (l == InvalidOffsetNumber)
                        elog(ERROR, "failed to add item to index page in \"%s\"",
                                 RelationGetRelationName(index));

                off++;
        }

        Assert(size <= BLCKSZ);         /* else we overran workspace */

        GinPageGetOpaque(page)->rightlink = rightlink;

        /*
         * tail page may contain only whole row(s) or final part of row placed on
         * previous pages (a "row" here meaning all the index tuples generated for
         * one heap tuple)
         */
        if (rightlink == InvalidBlockNumber)
        {
                GinPageSetFullRow(page);
                GinPageGetOpaque(page)->maxoff = 1;
        }
        else
        {
                GinPageGetOpaque(page)->maxoff = 0;
        }

        MarkBufferDirty(buffer);

        if (RelationNeedsWAL(index))
        {
                ginxlogInsertListPage data;
                XLogRecPtr      recptr;

                data.rightlink = rightlink;
                data.ntuples = ntuples;

                XLogBeginInsert();
                XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));

                XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
                XLogRegisterBufData(0, workspace, size);

                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
                PageSetLSN(page, recptr);
        }

        /* get free space before releasing buffer */
        freesize = PageGetExactFreeSpace(page);

        UnlockReleaseBuffer(buffer);

        END_CRIT_SECTION();

        pfree(workspace);

        return freesize;
}

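/*
 * Write the given tuples (all generated from a single heap tuple) into a
 * chain of freshly allocated pending-list pages, and report the chain's
 * head, tail, tail free space and page count in *res.  *res must be
 * zero-initialized by the caller.
 */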
static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
                        GinMetaPageData *res)
{
        Buffer          curBuffer = InvalidBuffer;
        Buffer          prevBuffer = InvalidBuffer;
        int                     i,
                                size = 0,
                                tupsize;
        int                     startTuple = 0;

        Assert(ntuples > 0);

        /*
         * Split tuples into pages
         */
        for (i = 0; i < ntuples; i++)
        {
                if (curBuffer == InvalidBuffer)
                {
                        curBuffer = GinNewBuffer(index);

                        if (prevBuffer != InvalidBuffer)
                        {
                                res->nPendingPages++;
                                writeListPage(index, prevBuffer,
                                                          tuples + startTuple,
                                                          i - startTuple,
                                                          BufferGetBlockNumber(curBuffer));
                        }
                        else
                        {
                                res->head = BufferGetBlockNumber(curBuffer);
                        }

                        prevBuffer = curBuffer;
                        startTuple = i;
                        size = 0;
                }

                tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

                if (size + tupsize > GinListPageSize)
                {
                        /* won't fit, force a new page and reprocess */
                        i--;
                        curBuffer = InvalidBuffer;
                }
                else
                {
                        size += tupsize;
                }
        }

        /*
         * Write last page
         */
        res->tail = BufferGetBlockNumber(curBuffer);
        res->tailFreeSize = writeListPage(index, curBuffer,
                                                                          tuples + startTuple,
                                                                          ntuples - startTuple,
                                                                          InvalidBlockNumber);
        res->nPendingPages++;
        /* that was only one heap tuple */
        res->nPendingHeapTuples = 1;
}

/*
 * Write the index tuples contained in *collector into the index's
 * pending list.
 *
 * Function guarantees that all these tuples will be inserted consecutively,
 * preserving order
 */
void
ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
{
        Relation        index = ginstate->index;
        Buffer          metabuffer;
        Page            metapage;
        GinMetaPageData *metadata = NULL;
        Buffer          buffer = InvalidBuffer;
        Page            page = NULL;
        ginxlogUpdateMeta data;
        bool            separateList = false;
        bool            needCleanup = false;
        int                     cleanupSize;
        bool            needWal;

        if (collector->ntuples == 0)
                return;

        needWal = RelationNeedsWAL(index);

        data.node = index->rd_node;
        data.ntuples = 0;
        data.newRightlink = data.prevTail = InvalidBlockNumber;

        metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
        metapage = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);

        if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
        {
                /*
                 * Total size is greater than one page => make sublist
                 */
                separateList = true;
        }
        else
        {
                LockBuffer(metabuffer, GIN_EXCLUSIVE);
                metadata = GinPageGetMeta(metapage);

                if (metadata->head == InvalidBlockNumber ||
                        collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
                {
                        /*
                         * Pending list is empty or total size is greater than freespace
                         * on tail page => make sublist
                         *
                         * We unlock metabuffer to keep high concurrency
                         */
                        separateList = true;
                        LockBuffer(metabuffer, GIN_UNLOCK);
                }
        }

        if (separateList)
        {
                /*
                 * We should make sublist separately and append it to the tail
                 */
                GinMetaPageData sublist;

                memset(&sublist, 0, sizeof(GinMetaPageData));
                makeSublist(index, collector->tuples, collector->ntuples, &sublist);

                if (needWal)
                        XLogBeginInsert();

                /*
                 * metapage was unlocked, see above
                 */
                LockBuffer(metabuffer, GIN_EXCLUSIVE);
                metadata = GinPageGetMeta(metapage);

                if (metadata->head == InvalidBlockNumber)
                {
                        /*
                         * Main list is empty, so just insert sublist as main list
                         */
                        START_CRIT_SECTION();

                        metadata->head = sublist.head;
                        metadata->tail = sublist.tail;
                        metadata->tailFreeSize = sublist.tailFreeSize;

                        metadata->nPendingPages = sublist.nPendingPages;
                        metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
                }
                else
                {
                        /*
                         * Merge lists
                         */
                        data.prevTail = metadata->tail;
                        data.newRightlink = sublist.head;

                        buffer = ReadBuffer(index, metadata->tail);
                        LockBuffer(buffer, GIN_EXCLUSIVE);
                        page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);

                        Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);

                        START_CRIT_SECTION();

                        GinPageGetOpaque(page)->rightlink = sublist.head;

                        MarkBufferDirty(buffer);

                        metadata->tail = sublist.tail;
                        metadata->tailFreeSize = sublist.tailFreeSize;

                        metadata->nPendingPages += sublist.nPendingPages;
                        metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;

                        if (needWal)
                                XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
                }
        }
        else
        {
                /*
                 * Insert into tail page.  Metapage is already locked
                 */
                OffsetNumber l,
                                        off;
                int                     i,
                                        tupsize;
                char       *ptr;
                char       *collectordata;

                buffer = ReadBuffer(index, metadata->tail);
                LockBuffer(buffer, GIN_EXCLUSIVE);
                page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);

                off = (PageIsEmpty(page)) ? FirstOffsetNumber :
                        OffsetNumberNext(PageGetMaxOffsetNumber(page));

                collectordata = ptr = (char *) palloc(collector->sumsize);

                data.ntuples = collector->ntuples;

                if (needWal)
                        XLogBeginInsert();

                START_CRIT_SECTION();

                /*
                 * Increase counter of heap tuples
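                 * (on pending-list pages, the opaque maxoff field counts complete
                 * heap rows stored on the page, not item offsets)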
                 */
                Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
                GinPageGetOpaque(page)->maxoff++;
                metadata->nPendingHeapTuples++;

                for (i = 0; i < collector->ntuples; i++)
                {
                        tupsize = IndexTupleSize(collector->tuples[i]);
                        l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);

                        if (l == InvalidOffsetNumber)
                                elog(ERROR, "failed to add item to index page in \"%s\"",
                                         RelationGetRelationName(index));

                        memcpy(ptr, collector->tuples[i], tupsize);
                        ptr += tupsize;

                        off++;
                }

                Assert((ptr - collectordata) <= collector->sumsize);
                if (needWal)
                {
                        XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
                        XLogRegisterBufData(1, collectordata, collector->sumsize);
                }

                metadata->tailFreeSize = PageGetExactFreeSpace(page);

                MarkBufferDirty(buffer);
        }

        /*
         * Write metabuffer, make xlog entry
         */
        MarkBufferDirty(metabuffer);

        if (needWal)
        {
                XLogRecPtr      recptr;

                memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

                XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
                XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));

                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
                PageSetLSN(metapage, recptr);

                if (buffer != InvalidBuffer)
                {
                        PageSetLSN(page, recptr);
                }
        }

        if (buffer != InvalidBuffer)
                UnlockReleaseBuffer(buffer);

        /*
         * Force pending list cleanup when it becomes too long. And,
         * ginInsertCleanup could take significant amount of time, so we prefer to
         * call it when it can do all the work in a single collection cycle. In
         * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
         * while pending list is still small enough to fit into
         * gin_pending_list_limit.
         *
         * ginInsertCleanup() should not be called inside our CRIT_SECTION.
         */
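        /*
         * cleanupSize is in kilobytes.  As an illustration: with the default
         * 8 kB block size and the default gin_pending_list_limit of 4 MB, the
         * threshold below amounts to roughly 500 pending pages.
         */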
        cleanupSize = GinGetPendingListCleanupSize(index);
        if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
                needCleanup = true;

        UnlockReleaseBuffer(metabuffer);

        END_CRIT_SECTION();

        if (needCleanup)
                ginInsertCleanup(ginstate, true, NULL);
}

/*
 * Create temporary index tuples for a single indexable item (one index column
 * for the heap tuple specified by ht_ctid), and append them to the array
 * in *collector.  They will subsequently be written out using
 * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
 * temp tuples for a given heap tuple must be written in one call to
 * ginHeapTupleFastInsert.
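 *
 * Roughly, a caller is expected to do something like the following sketch
 * (the variable names here are illustrative only):
 *
 *              GinTupleCollector collector;
 *
 *              memset(&collector, 0, sizeof(GinTupleCollector));
 *              for (i = 0; i < ginstate->origTupdesc->natts; i++)
 *                      ginHeapTupleFastCollect(ginstate, &collector,
 *                                                                      (OffsetNumber) (i + 1),
 *                                                                      values[i], isnull[i], ht_ctid);
 *              ginHeapTupleFastInsert(ginstate, &collector);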
 */
void
ginHeapTupleFastCollect(GinState *ginstate,
                                                GinTupleCollector *collector,
                                                OffsetNumber attnum, Datum value, bool isNull,
                                                ItemPointer ht_ctid)
{
        Datum      *entries;
        GinNullCategory *categories;
        int32           i,
                                nentries;

        /*
         * Extract the key values that need to be inserted in the index
         */
        entries = ginExtractEntries(ginstate, attnum, value, isNull,
                                                                &nentries, &categories);

        /*
         * Allocate/reallocate memory for storing collected tuples
         */
        if (collector->tuples == NULL)
        {
                collector->lentuples = nentries * ginstate->origTupdesc->natts;
                collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
        }

        while (collector->ntuples + nentries > collector->lentuples)
        {
                collector->lentuples *= 2;
                collector->tuples = (IndexTuple *) repalloc(collector->tuples,
                                                                  sizeof(IndexTuple) * collector->lentuples);
        }

        /*
         * Build an index tuple for each key value, and add to array.  In pending
         * tuples we just stick the heap TID into t_tid.
         */
        for (i = 0; i < nentries; i++)
        {
                IndexTuple      itup;

                itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
                                                        NULL, 0, 0, true);
                itup->t_tid = *ht_ctid;
                collector->tuples[collector->ntuples++] = itup;
                collector->sumsize += IndexTupleSize(itup);
        }
}

/*
 * Deletes pending list pages up to (not including) newHead page.
 * If newHead == InvalidBlockNumber then function drops the whole list.
 *
 * metapage is pinned and exclusive-locked throughout this function.
 *
 * Returns true if another cleanup process is running concurrently
 * (if so, we can just abandon our own efforts)
 */
static bool
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
                  bool fill_fsm, IndexBulkDeleteResult *stats)
{
        Page            metapage;
        GinMetaPageData *metadata;
        BlockNumber blknoToDelete;

        metapage = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
        metadata = GinPageGetMeta(metapage);
        blknoToDelete = metadata->head;

        do
        {
                Page            page;
                int                     i;
                int64           nDeletedHeapTuples = 0;
                ginxlogDeleteListPages data;
                Buffer          buffers[GIN_NDELETE_AT_ONCE];
                BlockNumber     freespace[GIN_NDELETE_AT_ONCE];

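                /*
                 * Unlink and delete at most GIN_NDELETE_AT_ONCE pages per outer-loop
                 * iteration, so that each WAL record and critical section stays
                 * bounded; freespace[] remembers the deleted block numbers so they
                 * can be handed to the FSM once the critical section is over.
                 */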
                data.ndeleted = 0;
                while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
                {
                        freespace[data.ndeleted] = blknoToDelete;
                        buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
                        LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
                        page = BufferGetPage(buffers[data.ndeleted], NULL, NULL, BGP_NO_SNAPSHOT_TEST);

                        data.ndeleted++;

                        if (GinPageIsDeleted(page))
                        {
                                /* concurrent cleanup process is detected */
                                for (i = 0; i < data.ndeleted; i++)
                                        UnlockReleaseBuffer(buffers[i]);

                                return true;
                        }

                        nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
                        blknoToDelete = GinPageGetOpaque(page)->rightlink;
                }

                if (stats)
                        stats->pages_deleted += data.ndeleted;

                /*
                 * This operation touches an unusually large number of pages, so
                 * prepare the XLogInsert machinery for that before entering the
                 * critical section.
                 */
                if (RelationNeedsWAL(index))
                        XLogEnsureRecordSpace(data.ndeleted, 0);

                START_CRIT_SECTION();

                metadata->head = blknoToDelete;

                Assert(metadata->nPendingPages >= data.ndeleted);
                metadata->nPendingPages -= data.ndeleted;
                Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
                metadata->nPendingHeapTuples -= nDeletedHeapTuples;

                if (blknoToDelete == InvalidBlockNumber)
                {
                        metadata->tail = InvalidBlockNumber;
                        metadata->tailFreeSize = 0;
                        metadata->nPendingPages = 0;
                        metadata->nPendingHeapTuples = 0;
                }

                MarkBufferDirty(metabuffer);

                for (i = 0; i < data.ndeleted; i++)
                {
                        page = BufferGetPage(buffers[i], NULL, NULL, BGP_NO_SNAPSHOT_TEST);
                        GinPageGetOpaque(page)->flags = GIN_DELETED;
                        MarkBufferDirty(buffers[i]);
                }

                if (RelationNeedsWAL(index))
                {
                        XLogRecPtr      recptr;

                        XLogBeginInsert();
                        XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
                        for (i = 0; i < data.ndeleted; i++)
                                XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);

                        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));

                        XLogRegisterData((char *) &data,
                                                         sizeof(ginxlogDeleteListPages));

                        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
                        PageSetLSN(metapage, recptr);

                        for (i = 0; i < data.ndeleted; i++)
                        {
                                page = BufferGetPage(buffers[i], NULL, NULL, BGP_NO_SNAPSHOT_TEST);
                                PageSetLSN(page, recptr);
                        }
                }

                for (i = 0; i < data.ndeleted; i++)
                        UnlockReleaseBuffer(buffers[i]);

                END_CRIT_SECTION();

                for (i = 0; fill_fsm && i < data.ndeleted; i++)
                        RecordFreeIndexPage(index, freespace[i]);

        } while (blknoToDelete != newHead);

        return false;
}

/* Initialize empty KeyArray */
static void
initKeyArray(KeyArray *keys, int32 maxvalues)
{
        keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
        keys->categories = (GinNullCategory *)
                palloc(sizeof(GinNullCategory) * maxvalues);
        keys->nvalues = 0;
        keys->maxvalues = maxvalues;
}

/* Add datum to KeyArray, resizing if needed */
static void
addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
{
        if (keys->nvalues >= keys->maxvalues)
        {
                keys->maxvalues *= 2;
                keys->keys = (Datum *)
                        repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
                keys->categories = (GinNullCategory *)
                        repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
        }

        keys->keys[keys->nvalues] = datum;
        keys->categories[keys->nvalues] = category;
        keys->nvalues++;
}

/*
 * Collect data from a pending-list page in preparation for insertion into
 * the main index.
 *
 * Go through all tuples >= startoff on page and collect values in accum
 *
 * Note that ka is just workspace --- it does not carry any state across
 * calls.
 */
static void
processPendingPage(BuildAccumulator *accum, KeyArray *ka,
                                   Page page, OffsetNumber startoff)
{
        ItemPointerData heapptr;
        OffsetNumber i,
                                maxoff;
        OffsetNumber attrnum;

        /* reset *ka to empty */
        ka->nvalues = 0;

        maxoff = PageGetMaxOffsetNumber(page);
        Assert(maxoff >= FirstOffsetNumber);
        ItemPointerSetInvalid(&heapptr);
        attrnum = 0;

        for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
        {
                IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
                OffsetNumber curattnum;
                Datum           curkey;
                GinNullCategory curcategory;

                /* Check for change of heap TID or attnum */
                curattnum = gintuple_get_attrnum(accum->ginstate, itup);

                if (!ItemPointerIsValid(&heapptr))
                {
                        heapptr = itup->t_tid;
                        attrnum = curattnum;
                }
                else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
                                   curattnum == attrnum))
                {
                        /*
                         * ginInsertBAEntries can insert several datums per call, but only
                         * for one heap tuple and one column.  So call it at a boundary,
                         * and reset ka.
                         */
                        ginInsertBAEntries(accum, &heapptr, attrnum,
                                                           ka->keys, ka->categories, ka->nvalues);
                        ka->nvalues = 0;
                        heapptr = itup->t_tid;
                        attrnum = curattnum;
                }

                /* Add key to KeyArray */
                curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
                addDatum(ka, curkey, curcategory);
        }

        /* Dump out all remaining keys */
        ginInsertBAEntries(accum, &heapptr, attrnum,
                                           ka->keys, ka->categories, ka->nvalues);
}

/*
 * Move tuples from pending pages into regular GIN structure.
 *
 * This can be called concurrently by multiple backends, so it must cope.
 * On first glance it looks completely not concurrent-safe and not crash-safe
 * either.  The reason it's okay is that multiple insertion of the same entry
 * is detected and treated as a no-op by gininsert.c.  If we crash after
 * posting entries to the main index and before removing them from the
 * pending list, it's okay because when we redo the posting later on, nothing
 * bad will happen.  Likewise, if two backends simultaneously try to post
 * a pending entry into the main index, one will succeed and one will do
 * nothing.  We try to notice when someone else is a little bit ahead of
 * us in the process, but that's just to avoid wasting cycles.  Only the
 * action of removing a page from the pending list really needs exclusive
 * lock.
 *
 * fill_fsm indicates that ginInsertCleanup should add deleted pages
 * to FSM otherwise caller is responsible to put deleted pages into
 * FSM.
 *
 * If stats isn't null, we count deleted pending pages into the counts.
 */
void
ginInsertCleanup(GinState *ginstate,
                                 bool fill_fsm, IndexBulkDeleteResult *stats)
{
        Relation        index = ginstate->index;
        Buffer          metabuffer,
                                buffer;
        Page            metapage,
                                page;
        GinMetaPageData *metadata;
        MemoryContext opCtx,
                                oldCtx;
        BuildAccumulator accum;
        KeyArray        datums;
        BlockNumber blkno;
        bool            fsm_vac = false;

        metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
        LockBuffer(metabuffer, GIN_SHARE);
        metapage = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
        metadata = GinPageGetMeta(metapage);

        if (metadata->head == InvalidBlockNumber)
        {
                /* Nothing to do */
                UnlockReleaseBuffer(metabuffer);
                return;
        }

        /*
         * Read and lock head of pending list
         */
        blkno = metadata->head;
        buffer = ReadBuffer(index, blkno);
        LockBuffer(buffer, GIN_SHARE);
        page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);

        LockBuffer(metabuffer, GIN_UNLOCK);

        /*
         * Initialize.  All temporary space will be in opCtx
         */
        opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                                                  "GIN insert cleanup temporary context",
                                                                  ALLOCSET_DEFAULT_MINSIZE,
                                                                  ALLOCSET_DEFAULT_INITSIZE,
                                                                  ALLOCSET_DEFAULT_MAXSIZE);

        oldCtx = MemoryContextSwitchTo(opCtx);

        initKeyArray(&datums, 128);
        ginInitBA(&accum);
        accum.ginstate = ginstate;

        /*
         * At the top of this loop, we have pin and lock on the current page of
         * the pending list.  However, we'll release that before exiting the loop.
         * Note we also have pin but not lock on the metapage.
         */
        for (;;)
        {
                if (GinPageIsDeleted(page))
                {
                        /* another cleanup process is running concurrently */
                        UnlockReleaseBuffer(buffer);
                        fsm_vac = false;
                        break;
                }

                /*
                 * read page's datums into accum
                 */
                processPendingPage(&accum, &datums, page, FirstOffsetNumber);

                vacuum_delay_point();

                /*
                 * Is it time to flush memory to disk?  Flush if we are at the end of
                 * the pending list, or if we have a full row and memory is getting
                 * full.
                 *
                 * XXX using up maintenance_work_mem here is probably unreasonably
                 * much, since vacuum might already be using that much.
                 */
                if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
                        (GinPageHasFullRow(page) &&
                         (accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)))
                {
                        ItemPointerData *list;
                        uint32          nlist;
                        Datum           key;
                        GinNullCategory category;
                        OffsetNumber maxoff,
                                                attnum;

                        /*
                         * Unlock current page to increase performance. Changes of page
                         * will be checked later by comparing maxoff after completion of
                         * memory flush.
                         */
                        maxoff = PageGetMaxOffsetNumber(page);
                        LockBuffer(buffer, GIN_UNLOCK);

                        /*
                         * Moving collected data into regular structure can take
                         * significant amount of time - so, run it without locking pending
                         * list.
                         */
                        ginBeginBAScan(&accum);
                        while ((list = ginGetBAEntry(&accum,
                                                                  &attnum, &key, &category, &nlist)) != NULL)
                        {
                                ginEntryInsert(ginstate, attnum, key, category,
                                                           list, nlist, NULL);
                                vacuum_delay_point();
                        }

                        /*
                         * Lock the whole list to remove pages
                         */
                        LockBuffer(metabuffer, GIN_EXCLUSIVE);
                        LockBuffer(buffer, GIN_SHARE);

                        if (GinPageIsDeleted(page))
                        {
                                /* another cleanup process is running concurrently */
                                UnlockReleaseBuffer(buffer);
                                LockBuffer(metabuffer, GIN_UNLOCK);
                                fsm_vac = false;
                                break;
                        }

                        /*
                         * While we left the page unlocked, more stuff might have gotten
                         * added to it.  If so, process those entries immediately.  There
                         * shouldn't be very many, so we don't worry about the fact that
                         * we're doing this with exclusive lock. Insertion algorithm
                         * guarantees that inserted row(s) will not continue on next page.
                         * NOTE: intentionally no vacuum_delay_point in this loop.
                         */
                        if (PageGetMaxOffsetNumber(page) != maxoff)
                        {
                                ginInitBA(&accum);
                                processPendingPage(&accum, &datums, page, maxoff + 1);

                                ginBeginBAScan(&accum);
                                while ((list = ginGetBAEntry(&accum,
                                                                  &attnum, &key, &category, &nlist)) != NULL)
                                        ginEntryInsert(ginstate, attnum, key, category,
                                                                   list, nlist, NULL);
                        }

                        /*
                         * Remember next page - it will become the new list head
                         */
                        blkno = GinPageGetOpaque(page)->rightlink;
                        UnlockReleaseBuffer(buffer);            /* shiftList will do exclusive
                                                                                                 * locking */

                        /*
                         * remove read pages from pending list, at this point all
                         * content of read pages is in regular structure
                         */
                        if (shiftList(index, metabuffer, blkno, fill_fsm, stats))
                        {
                                /* another cleanup process is running concurrently */
                                LockBuffer(metabuffer, GIN_UNLOCK);
                                fsm_vac = false;
                                break;
                        }

                        /* At this point, some pending pages have been freed up */
                        fsm_vac = true;

                        Assert(blkno == metadata->head);
                        LockBuffer(metabuffer, GIN_UNLOCK);

                        /*
                         * if we removed the whole pending list just exit
                         */
                        if (blkno == InvalidBlockNumber)
                                break;

                        /*
                         * release memory used so far and reinit state
                         */
                        MemoryContextReset(opCtx);
                        initKeyArray(&datums, datums.maxvalues);
                        ginInitBA(&accum);
                }
                else
                {
                        blkno = GinPageGetOpaque(page)->rightlink;
                        UnlockReleaseBuffer(buffer);
                }

                /*
                 * Read next page in pending list
                 */
                vacuum_delay_point();
                buffer = ReadBuffer(index, blkno);
                LockBuffer(buffer, GIN_SHARE);
                page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
        }

        ReleaseBuffer(metabuffer);

        /*
         * As pending list pages can have a high churn rate, it is
         * desirable to recycle them immediately to the FreeSpace Map when
         * ordinary backends clean the list.
         */
        if (fsm_vac && fill_fsm)
                IndexFreeSpaceMapVacuum(index);


        /* Clean up temporary space */
        MemoryContextSwitchTo(oldCtx);
        MemoryContextDelete(opCtx);
}

/*
 * SQL-callable function to clean the insert pending list
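 *
 * Usage:       SELECT gin_clean_pending_list('some_gin_index'::regclass);
 *
 * The return value is the number of pending-list pages deleted.  (The index
 * name above is just an illustration.)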
 */
Datum
gin_clean_pending_list(PG_FUNCTION_ARGS)
{
        Oid                     indexoid = PG_GETARG_OID(0);
        Relation        indexRel = index_open(indexoid, AccessShareLock);
        IndexBulkDeleteResult stats;
        GinState        ginstate;

        if (RecoveryInProgress())
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("recovery is in progress"),
                                 errhint("GIN pending list cannot be cleaned up during recovery.")));

        /* Must be a GIN index */
        if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
                indexRel->rd_rel->relam != GIN_AM_OID)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("\"%s\" is not a GIN index",
                                                RelationGetRelationName(indexRel))));

        /*
         * Reject attempts to read non-local temporary relations; we would be
         * likely to get wrong data since we have no visibility into the owning
         * session's local buffers.
         */
        if (RELATION_IS_OTHER_TEMP(indexRel))
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                           errmsg("cannot access temporary indexes of other sessions")));

        /* User must own the index (comparable to privileges needed for VACUUM) */
        if (!pg_class_ownercheck(indexoid, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
                                           RelationGetRelationName(indexRel));

        memset(&stats, 0, sizeof(stats));
        initGinState(&ginstate, indexRel);
        ginInsertCleanup(&ginstate, true, &stats);

        index_close(indexRel, AccessShareLock);

        PG_RETURN_INT64((int64) stats.pages_deleted);
}