1 /*-------------------------------------------------------------------------
2  *
3  * ginfast.c
4  *        Fast insert routines for the Postgres inverted index access method.
5  *        Pending entries are stored in linear list of pages.  Later on
6  *        (typically during VACUUM), ginInsertCleanup() will be invoked to
7  *        transfer pending entries into the regular index structure.  This
8  *        wins because bulk insertion is much more efficient than retail.
9  *
10  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *                      src/backend/access/gin/ginfast.c
15  *
16  *-------------------------------------------------------------------------
17  */
18
19 #include "postgres.h"
20
21 #include "access/gin_private.h"
22 #include "access/xloginsert.h"
23 #include "access/xlog.h"
24 #include "commands/vacuum.h"
25 #include "catalog/pg_am.h"
26 #include "miscadmin.h"
27 #include "utils/memutils.h"
28 #include "utils/rel.h"
29 #include "utils/acl.h"
30 #include "postmaster/autovacuum.h"
31 #include "storage/indexfsm.h"
32 #include "storage/lmgr.h"
33
34 /* GUC parameter */
35 int                     gin_pending_list_limit = 0;
36
37 #define GIN_PAGE_FREESIZE \
38         ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
39
/*
 * Expansible accumulator for key datums (with their null categories)
 * collected from pending-list pages; see initKeyArray/addDatum below.
 */
typedef struct KeyArray
{
	Datum	   *keys;			/* expansible array */
	GinNullCategory *categories;	/* another expansible array */
	int32		nvalues;		/* current number of valid entries */
	int32		maxvalues;		/* allocated size of arrays */
} KeyArray;
47
48
49 /*
50  * Build a pending-list page from the given array of tuples, and write it out.
51  *
52  * Returns amount of free space left on the page.
53  */
54 static int32
55 writeListPage(Relation index, Buffer buffer,
56                           IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
57 {
58         Page            page = BufferGetPage(buffer);
59         int32           i,
60                                 freesize,
61                                 size = 0;
62         OffsetNumber l,
63                                 off;
64         char       *workspace;
65         char       *ptr;
66
67         /* workspace could be a local array; we use palloc for alignment */
68         workspace = palloc(BLCKSZ);
69
70         START_CRIT_SECTION();
71
72         GinInitBuffer(buffer, GIN_LIST);
73
74         off = FirstOffsetNumber;
75         ptr = workspace;
76
77         for (i = 0; i < ntuples; i++)
78         {
79                 int                     this_size = IndexTupleSize(tuples[i]);
80
81                 memcpy(ptr, tuples[i], this_size);
82                 ptr += this_size;
83                 size += this_size;
84
85                 l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
86
87                 if (l == InvalidOffsetNumber)
88                         elog(ERROR, "failed to add item to index page in \"%s\"",
89                                  RelationGetRelationName(index));
90
91                 off++;
92         }
93
94         Assert(size <= BLCKSZ);         /* else we overran workspace */
95
96         GinPageGetOpaque(page)->rightlink = rightlink;
97
98         /*
99          * tail page may contain only whole row(s) or final part of row placed on
100          * previous pages (a "row" here meaning all the index tuples generated for
101          * one heap tuple)
102          */
103         if (rightlink == InvalidBlockNumber)
104         {
105                 GinPageSetFullRow(page);
106                 GinPageGetOpaque(page)->maxoff = 1;
107         }
108         else
109         {
110                 GinPageGetOpaque(page)->maxoff = 0;
111         }
112
113         MarkBufferDirty(buffer);
114
115         if (RelationNeedsWAL(index))
116         {
117                 ginxlogInsertListPage data;
118                 XLogRecPtr      recptr;
119
120                 data.rightlink = rightlink;
121                 data.ntuples = ntuples;
122
123                 XLogBeginInsert();
124                 XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
125
126                 XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
127                 XLogRegisterBufData(0, workspace, size);
128
129                 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
130                 PageSetLSN(page, recptr);
131         }
132
133         /* get free space before releasing buffer */
134         freesize = PageGetExactFreeSpace(page);
135
136         UnlockReleaseBuffer(buffer);
137
138         END_CRIT_SECTION();
139
140         pfree(workspace);
141
142         return freesize;
143 }
144
/*
 * Distribute the given tuples (all generated from one heap tuple) across as
 * many pending-list pages as needed, writing out each page as it fills, and
 * record the resulting list's head/tail/size information in *res.
 *
 * Caller is expected to have zeroed *res beforehand; we only assign and
 * increment its fields here.
 */
static void
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
			GinMetaPageData *res)
{
	Buffer		curBuffer = InvalidBuffer;
	Buffer		prevBuffer = InvalidBuffer;
	int			i,
				size = 0,
				tupsize;
	int			startTuple = 0;

	Assert(ntuples > 0);

	/*
	 * Split tuples into pages
	 */
	for (i = 0; i < ntuples; i++)
	{
		if (curBuffer == InvalidBuffer)
		{
			/* Start a new page; flush the previous one, if any */
			curBuffer = GinNewBuffer(index);

			if (prevBuffer != InvalidBuffer)
			{
				res->nPendingPages++;
				writeListPage(index, prevBuffer,
							  tuples + startTuple,
							  i - startTuple,
							  BufferGetBlockNumber(curBuffer));
			}
			else
			{
				/* First page of the sublist becomes its head */
				res->head = BufferGetBlockNumber(curBuffer);
			}

			prevBuffer = curBuffer;
			startTuple = i;
			size = 0;
		}

		tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);

		if (size + tupsize > GinListPageSize)
		{
			/*
			 * won't fit, force a new page and reprocess: back up i so this
			 * tuple is reconsidered as the first tuple of the next page
			 */
			i--;
			curBuffer = InvalidBuffer;
		}
		else
		{
			size += tupsize;
		}
	}

	/*
	 * Write last page
	 */
	res->tail = BufferGetBlockNumber(curBuffer);
	res->tailFreeSize = writeListPage(index, curBuffer,
									  tuples + startTuple,
									  ntuples - startTuple,
									  InvalidBlockNumber);
	res->nPendingPages++;
	/* that was only one heap tuple */
	res->nPendingHeapTuples = 1;
}
211
212 /*
213  * Write the index tuples contained in *collector into the index's
214  * pending list.
215  *
216  * Function guarantees that all these tuples will be inserted consecutively,
217  * preserving order
218  */
219 void
220 ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
221 {
222         Relation        index = ginstate->index;
223         Buffer          metabuffer;
224         Page            metapage;
225         GinMetaPageData *metadata = NULL;
226         Buffer          buffer = InvalidBuffer;
227         Page            page = NULL;
228         ginxlogUpdateMeta data;
229         bool            separateList = false;
230         bool            needCleanup = false;
231         int                     cleanupSize;
232         bool            needWal;
233
234         if (collector->ntuples == 0)
235                 return;
236
237         needWal = RelationNeedsWAL(index);
238
239         data.node = index->rd_node;
240         data.ntuples = 0;
241         data.newRightlink = data.prevTail = InvalidBlockNumber;
242
243         metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
244         metapage = BufferGetPage(metabuffer);
245
246         if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
247         {
248                 /*
249                  * Total size is greater than one page => make sublist
250                  */
251                 separateList = true;
252         }
253         else
254         {
255                 LockBuffer(metabuffer, GIN_EXCLUSIVE);
256                 metadata = GinPageGetMeta(metapage);
257
258                 if (metadata->head == InvalidBlockNumber ||
259                         collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
260                 {
261                         /*
262                          * Pending list is empty or total size is greater than freespace
263                          * on tail page => make sublist
264                          *
265                          * We unlock metabuffer to keep high concurrency
266                          */
267                         separateList = true;
268                         LockBuffer(metabuffer, GIN_UNLOCK);
269                 }
270         }
271
272         if (separateList)
273         {
274                 /*
275                  * We should make sublist separately and append it to the tail
276                  */
277                 GinMetaPageData sublist;
278
279                 memset(&sublist, 0, sizeof(GinMetaPageData));
280                 makeSublist(index, collector->tuples, collector->ntuples, &sublist);
281
282                 if (needWal)
283                         XLogBeginInsert();
284
285                 /*
286                  * metapage was unlocked, see above
287                  */
288                 LockBuffer(metabuffer, GIN_EXCLUSIVE);
289                 metadata = GinPageGetMeta(metapage);
290
291                 if (metadata->head == InvalidBlockNumber)
292                 {
293                         /*
294                          * Main list is empty, so just insert sublist as main list
295                          */
296                         START_CRIT_SECTION();
297
298                         metadata->head = sublist.head;
299                         metadata->tail = sublist.tail;
300                         metadata->tailFreeSize = sublist.tailFreeSize;
301
302                         metadata->nPendingPages = sublist.nPendingPages;
303                         metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
304                 }
305                 else
306                 {
307                         /*
308                          * Merge lists
309                          */
310                         data.prevTail = metadata->tail;
311                         data.newRightlink = sublist.head;
312
313                         buffer = ReadBuffer(index, metadata->tail);
314                         LockBuffer(buffer, GIN_EXCLUSIVE);
315                         page = BufferGetPage(buffer);
316
317                         Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
318
319                         START_CRIT_SECTION();
320
321                         GinPageGetOpaque(page)->rightlink = sublist.head;
322
323                         MarkBufferDirty(buffer);
324
325                         metadata->tail = sublist.tail;
326                         metadata->tailFreeSize = sublist.tailFreeSize;
327
328                         metadata->nPendingPages += sublist.nPendingPages;
329                         metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
330
331                         if (needWal)
332                                 XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
333                 }
334         }
335         else
336         {
337                 /*
338                  * Insert into tail page.  Metapage is already locked
339                  */
340                 OffsetNumber l,
341                                         off;
342                 int                     i,
343                                         tupsize;
344                 char       *ptr;
345                 char       *collectordata;
346
347                 buffer = ReadBuffer(index, metadata->tail);
348                 LockBuffer(buffer, GIN_EXCLUSIVE);
349                 page = BufferGetPage(buffer);
350
351                 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
352                         OffsetNumberNext(PageGetMaxOffsetNumber(page));
353
354                 collectordata = ptr = (char *) palloc(collector->sumsize);
355
356                 data.ntuples = collector->ntuples;
357
358                 if (needWal)
359                         XLogBeginInsert();
360
361                 START_CRIT_SECTION();
362
363                 /*
364                  * Increase counter of heap tuples
365                  */
366                 Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
367                 GinPageGetOpaque(page)->maxoff++;
368                 metadata->nPendingHeapTuples++;
369
370                 for (i = 0; i < collector->ntuples; i++)
371                 {
372                         tupsize = IndexTupleSize(collector->tuples[i]);
373                         l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
374
375                         if (l == InvalidOffsetNumber)
376                                 elog(ERROR, "failed to add item to index page in \"%s\"",
377                                          RelationGetRelationName(index));
378
379                         memcpy(ptr, collector->tuples[i], tupsize);
380                         ptr += tupsize;
381
382                         off++;
383                 }
384
385                 Assert((ptr - collectordata) <= collector->sumsize);
386                 if (needWal)
387                 {
388                         XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
389                         XLogRegisterBufData(1, collectordata, collector->sumsize);
390                 }
391
392                 metadata->tailFreeSize = PageGetExactFreeSpace(page);
393
394                 MarkBufferDirty(buffer);
395         }
396
397         /*
398          * Write metabuffer, make xlog entry
399          */
400         MarkBufferDirty(metabuffer);
401
402         if (needWal)
403         {
404                 XLogRecPtr      recptr;
405
406                 memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
407
408                 XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
409                 XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
410
411                 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
412                 PageSetLSN(metapage, recptr);
413
414                 if (buffer != InvalidBuffer)
415                 {
416                         PageSetLSN(page, recptr);
417                 }
418         }
419
420         if (buffer != InvalidBuffer)
421                 UnlockReleaseBuffer(buffer);
422
423         /*
424          * Force pending list cleanup when it becomes too long. And,
425          * ginInsertCleanup could take significant amount of time, so we prefer to
426          * call it when it can do all the work in a single collection cycle. In
427          * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
428          * while pending list is still small enough to fit into
429          * gin_pending_list_limit.
430          *
431          * ginInsertCleanup() should not be called inside our CRIT_SECTION.
432          */
433         cleanupSize = GinGetPendingListCleanupSize(index);
434         if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
435                 needCleanup = true;
436
437         UnlockReleaseBuffer(metabuffer);
438
439         END_CRIT_SECTION();
440
441         if (needCleanup)
442                 ginInsertCleanup(ginstate, false, true, NULL);
443 }
444
445 /*
446  * Create temporary index tuples for a single indexable item (one index column
447  * for the heap tuple specified by ht_ctid), and append them to the array
448  * in *collector.  They will subsequently be written out using
449  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
450  * temp tuples for a given heap tuple must be written in one call to
451  * ginHeapTupleFastInsert.
452  */
453 void
454 ginHeapTupleFastCollect(GinState *ginstate,
455                                                 GinTupleCollector *collector,
456                                                 OffsetNumber attnum, Datum value, bool isNull,
457                                                 ItemPointer ht_ctid)
458 {
459         Datum      *entries;
460         GinNullCategory *categories;
461         int32           i,
462                                 nentries;
463
464         /*
465          * Extract the key values that need to be inserted in the index
466          */
467         entries = ginExtractEntries(ginstate, attnum, value, isNull,
468                                                                 &nentries, &categories);
469
470         /*
471          * Allocate/reallocate memory for storing collected tuples
472          */
473         if (collector->tuples == NULL)
474         {
475                 collector->lentuples = nentries * ginstate->origTupdesc->natts;
476                 collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
477         }
478
479         while (collector->ntuples + nentries > collector->lentuples)
480         {
481                 collector->lentuples *= 2;
482                 collector->tuples = (IndexTuple *) repalloc(collector->tuples,
483                                                                   sizeof(IndexTuple) * collector->lentuples);
484         }
485
486         /*
487          * Build an index tuple for each key value, and add to array.  In pending
488          * tuples we just stick the heap TID into t_tid.
489          */
490         for (i = 0; i < nentries; i++)
491         {
492                 IndexTuple      itup;
493
494                 itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
495                                                         NULL, 0, 0, true);
496                 itup->t_tid = *ht_ctid;
497                 collector->tuples[collector->ntuples++] = itup;
498                 collector->sumsize += IndexTupleSize(itup);
499         }
500 }
501
502 /*
503  * Deletes pending list pages up to (not including) newHead page.
504  * If newHead == InvalidBlockNumber then function drops the whole list.
505  *
506  * metapage is pinned and exclusive-locked throughout this function.
507  */
508 static void
509 shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
510                   bool fill_fsm, IndexBulkDeleteResult *stats)
511 {
512         Page            metapage;
513         GinMetaPageData *metadata;
514         BlockNumber blknoToDelete;
515
516         metapage = BufferGetPage(metabuffer);
517         metadata = GinPageGetMeta(metapage);
518         blknoToDelete = metadata->head;
519
520         do
521         {
522                 Page            page;
523                 int                     i;
524                 int64           nDeletedHeapTuples = 0;
525                 ginxlogDeleteListPages data;
526                 Buffer          buffers[GIN_NDELETE_AT_ONCE];
527                 BlockNumber freespace[GIN_NDELETE_AT_ONCE];
528
529                 data.ndeleted = 0;
530                 while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
531                 {
532                         freespace[data.ndeleted] = blknoToDelete;
533                         buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
534                         LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
535                         page = BufferGetPage(buffers[data.ndeleted]);
536
537                         data.ndeleted++;
538
539                         Assert(!GinPageIsDeleted(page));
540
541                         nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
542                         blknoToDelete = GinPageGetOpaque(page)->rightlink;
543                 }
544
545                 if (stats)
546                         stats->pages_deleted += data.ndeleted;
547
548                 /*
549                  * This operation touches an unusually large number of pages, so
550                  * prepare the XLogInsert machinery for that before entering the
551                  * critical section.
552                  */
553                 if (RelationNeedsWAL(index))
554                         XLogEnsureRecordSpace(data.ndeleted, 0);
555
556                 START_CRIT_SECTION();
557
558                 metadata->head = blknoToDelete;
559
560                 Assert(metadata->nPendingPages >= data.ndeleted);
561                 metadata->nPendingPages -= data.ndeleted;
562                 Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
563                 metadata->nPendingHeapTuples -= nDeletedHeapTuples;
564
565                 if (blknoToDelete == InvalidBlockNumber)
566                 {
567                         metadata->tail = InvalidBlockNumber;
568                         metadata->tailFreeSize = 0;
569                         metadata->nPendingPages = 0;
570                         metadata->nPendingHeapTuples = 0;
571                 }
572
573                 MarkBufferDirty(metabuffer);
574
575                 for (i = 0; i < data.ndeleted; i++)
576                 {
577                         page = BufferGetPage(buffers[i]);
578                         GinPageGetOpaque(page)->flags = GIN_DELETED;
579                         MarkBufferDirty(buffers[i]);
580                 }
581
582                 if (RelationNeedsWAL(index))
583                 {
584                         XLogRecPtr      recptr;
585
586                         XLogBeginInsert();
587                         XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
588                         for (i = 0; i < data.ndeleted; i++)
589                                 XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
590
591                         memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
592
593                         XLogRegisterData((char *) &data,
594                                                          sizeof(ginxlogDeleteListPages));
595
596                         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
597                         PageSetLSN(metapage, recptr);
598
599                         for (i = 0; i < data.ndeleted; i++)
600                         {
601                                 page = BufferGetPage(buffers[i]);
602                                 PageSetLSN(page, recptr);
603                         }
604                 }
605
606                 for (i = 0; i < data.ndeleted; i++)
607                         UnlockReleaseBuffer(buffers[i]);
608
609                 END_CRIT_SECTION();
610
611                 for (i = 0; fill_fsm && i < data.ndeleted; i++)
612                         RecordFreeIndexPage(index, freespace[i]);
613
614         } while (blknoToDelete != newHead);
615 }
616
617 /* Initialize empty KeyArray */
618 static void
619 initKeyArray(KeyArray *keys, int32 maxvalues)
620 {
621         keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
622         keys->categories = (GinNullCategory *)
623                 palloc(sizeof(GinNullCategory) * maxvalues);
624         keys->nvalues = 0;
625         keys->maxvalues = maxvalues;
626 }
627
628 /* Add datum to KeyArray, resizing if needed */
629 static void
630 addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
631 {
632         if (keys->nvalues >= keys->maxvalues)
633         {
634                 keys->maxvalues *= 2;
635                 keys->keys = (Datum *)
636                         repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
637                 keys->categories = (GinNullCategory *)
638                         repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
639         }
640
641         keys->keys[keys->nvalues] = datum;
642         keys->categories[keys->nvalues] = category;
643         keys->nvalues++;
644 }
645
646 /*
647  * Collect data from a pending-list page in preparation for insertion into
648  * the main index.
649  *
650  * Go through all tuples >= startoff on page and collect values in accum
651  *
652  * Note that ka is just workspace --- it does not carry any state across
653  * calls.
654  */
655 static void
656 processPendingPage(BuildAccumulator *accum, KeyArray *ka,
657                                    Page page, OffsetNumber startoff)
658 {
659         ItemPointerData heapptr;
660         OffsetNumber i,
661                                 maxoff;
662         OffsetNumber attrnum;
663
664         /* reset *ka to empty */
665         ka->nvalues = 0;
666
667         maxoff = PageGetMaxOffsetNumber(page);
668         Assert(maxoff >= FirstOffsetNumber);
669         ItemPointerSetInvalid(&heapptr);
670         attrnum = 0;
671
672         for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
673         {
674                 IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
675                 OffsetNumber curattnum;
676                 Datum           curkey;
677                 GinNullCategory curcategory;
678
679                 /* Check for change of heap TID or attnum */
680                 curattnum = gintuple_get_attrnum(accum->ginstate, itup);
681
682                 if (!ItemPointerIsValid(&heapptr))
683                 {
684                         heapptr = itup->t_tid;
685                         attrnum = curattnum;
686                 }
687                 else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
688                                    curattnum == attrnum))
689                 {
690                         /*
691                          * ginInsertBAEntries can insert several datums per call, but only
692                          * for one heap tuple and one column.  So call it at a boundary,
693                          * and reset ka.
694                          */
695                         ginInsertBAEntries(accum, &heapptr, attrnum,
696                                                            ka->keys, ka->categories, ka->nvalues);
697                         ka->nvalues = 0;
698                         heapptr = itup->t_tid;
699                         attrnum = curattnum;
700                 }
701
702                 /* Add key to KeyArray */
703                 curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
704                 addDatum(ka, curkey, curcategory);
705         }
706
707         /* Dump out all remaining keys */
708         ginInsertBAEntries(accum, &heapptr, attrnum,
709                                            ka->keys, ka->categories, ka->nvalues);
710 }
711
712 /*
713  * Move tuples from pending pages into regular GIN structure.
714  *
715  * On first glance it looks completely not crash-safe. But if we crash
716  * after posting entries to the main index and before removing them from the
717  * pending list, it's okay because when we redo the posting later on, nothing
718  * bad will happen.
719  *
720  * fill_fsm indicates that ginInsertCleanup should add deleted pages
721  * to FSM otherwise caller is responsible to put deleted pages into
722  * FSM.
723  *
724  * If stats isn't null, we count deleted pending pages into the counts.
725  */
726 void
727 ginInsertCleanup(GinState *ginstate, bool full_clean,
728                                  bool fill_fsm, IndexBulkDeleteResult *stats)
729 {
730         Relation        index = ginstate->index;
731         Buffer          metabuffer,
732                                 buffer;
733         Page            metapage,
734                                 page;
735         GinMetaPageData *metadata;
736         MemoryContext opCtx,
737                                 oldCtx;
738         BuildAccumulator accum;
739         KeyArray        datums;
740         BlockNumber blkno,
741                                 blknoFinish;
742         bool            cleanupFinish = false;
743         bool            fsm_vac = false;
744         Size            workMemory;
745         bool            inVacuum = (stats == NULL);
746
747         /*
748          * We would like to prevent concurrent cleanup process. For that we will
749          * lock metapage in exclusive mode using LockPage() call. Nobody other
750          * will use that lock for metapage, so we keep possibility of concurrent
751          * insertion into pending list
752          */
753
754         if (inVacuum)
755         {
756                 /*
757                  * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
758                  * and we would like to wait concurrent cleanup to finish.
759                  */
760                 LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
761                 workMemory =
762                         (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
763                         autovacuum_work_mem : maintenance_work_mem;
764         }
765         else
766         {
767                 /*
768                  * We are called from regular insert and if we see concurrent cleanup
769                  * just exit in hope that concurrent process will clean up pending
770                  * list.
771                  */
772                 if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
773                         return;
774                 workMemory = work_mem;
775         }
776
777         metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
778         LockBuffer(metabuffer, GIN_SHARE);
779         metapage = BufferGetPage(metabuffer);
780         metadata = GinPageGetMeta(metapage);
781
782         if (metadata->head == InvalidBlockNumber)
783         {
784                 /* Nothing to do */
785                 UnlockReleaseBuffer(metabuffer);
786                 UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
787                 return;
788         }
789
790         /*
791          * Remember a tail page to prevent infinite cleanup if other backends add
792          * new tuples faster than we can cleanup.
793          */
794         blknoFinish = metadata->tail;
795
796         /*
797          * Read and lock head of pending list
798          */
799         blkno = metadata->head;
800         buffer = ReadBuffer(index, blkno);
801         LockBuffer(buffer, GIN_SHARE);
802         page = BufferGetPage(buffer);
803
804         LockBuffer(metabuffer, GIN_UNLOCK);
805
806         /*
807          * Initialize.  All temporary space will be in opCtx
808          */
809         opCtx = AllocSetContextCreate(CurrentMemoryContext,
810                                                                   "GIN insert cleanup temporary context",
811                                                                   ALLOCSET_DEFAULT_SIZES);
812
813         oldCtx = MemoryContextSwitchTo(opCtx);
814
815         initKeyArray(&datums, 128);
816         ginInitBA(&accum);
817         accum.ginstate = ginstate;
818
819         /*
820          * At the top of this loop, we have pin and lock on the current page of
821          * the pending list.  However, we'll release that before exiting the loop.
822          * Note we also have pin but not lock on the metapage.
823          */
824         for (;;)
825         {
826                 Assert(!GinPageIsDeleted(page));
827
828                 /*
829                  * Are we walk through the page which as we remember was a tail when
830                  * we start our cleanup?  But if caller asks us to clean up whole
831                  * pending list then ignore old tail, we will work until list becomes
832                  * empty.
833                  */
834                 if (blkno == blknoFinish && full_clean == false)
835                         cleanupFinish = true;
836
837                 /*
838                  * read page's datums into accum
839                  */
840                 processPendingPage(&accum, &datums, page, FirstOffsetNumber);
841
842                 vacuum_delay_point();
843
844                 /*
845                  * Is it time to flush memory to disk?  Flush if we are at the end of
846                  * the pending list, or if we have a full row and memory is getting
847                  * full.
848                  */
849                 if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
850                         (GinPageHasFullRow(page) &&
851                          (accum.allocatedMemory >= workMemory * 1024L)))
852                 {
853                         ItemPointerData *list;
854                         uint32          nlist;
855                         Datum           key;
856                         GinNullCategory category;
857                         OffsetNumber maxoff,
858                                                 attnum;
859
860                         /*
861                          * Unlock current page to increase performance. Changes of page
862                          * will be checked later by comparing maxoff after completion of
863                          * memory flush.
864                          */
865                         maxoff = PageGetMaxOffsetNumber(page);
866                         LockBuffer(buffer, GIN_UNLOCK);
867
868                         /*
869                          * Moving collected data into regular structure can take
870                          * significant amount of time - so, run it without locking pending
871                          * list.
872                          */
873                         ginBeginBAScan(&accum);
874                         while ((list = ginGetBAEntry(&accum,
875                                                                   &attnum, &key, &category, &nlist)) != NULL)
876                         {
877                                 ginEntryInsert(ginstate, attnum, key, category,
878                                                            list, nlist, NULL);
879                                 vacuum_delay_point();
880                         }
881
882                         /*
883                          * Lock the whole list to remove pages
884                          */
885                         LockBuffer(metabuffer, GIN_EXCLUSIVE);
886                         LockBuffer(buffer, GIN_SHARE);
887
888                         Assert(!GinPageIsDeleted(page));
889
890                         /*
891                          * While we left the page unlocked, more stuff might have gotten
892                          * added to it.  If so, process those entries immediately.  There
893                          * shouldn't be very many, so we don't worry about the fact that
894                          * we're doing this with exclusive lock. Insertion algorithm
895                          * guarantees that inserted row(s) will not continue on next page.
896                          * NOTE: intentionally no vacuum_delay_point in this loop.
897                          */
898                         if (PageGetMaxOffsetNumber(page) != maxoff)
899                         {
900                                 ginInitBA(&accum);
901                                 processPendingPage(&accum, &datums, page, maxoff + 1);
902
903                                 ginBeginBAScan(&accum);
904                                 while ((list = ginGetBAEntry(&accum,
905                                                                   &attnum, &key, &category, &nlist)) != NULL)
906                                         ginEntryInsert(ginstate, attnum, key, category,
907                                                                    list, nlist, NULL);
908                         }
909
910                         /*
911                          * Remember next page - it will become the new list head
912                          */
913                         blkno = GinPageGetOpaque(page)->rightlink;
914                         UnlockReleaseBuffer(buffer);            /* shiftList will do exclusive
915                                                                                                  * locking */
916
917                         /*
918                          * remove read pages from pending list, at this point all content
919                          * of read pages is in regular structure
920                          */
921                         shiftList(index, metabuffer, blkno, fill_fsm, stats);
922
923                         /* At this point, some pending pages have been freed up */
924                         fsm_vac = true;
925
926                         Assert(blkno == metadata->head);
927                         LockBuffer(metabuffer, GIN_UNLOCK);
928
929                         /*
930                          * if we removed the whole pending list or we cleanup tail (which
931                          * we remembered on start our cleanup process) then just exit
932                          */
933                         if (blkno == InvalidBlockNumber || cleanupFinish)
934                                 break;
935
936                         /*
937                          * release memory used so far and reinit state
938                          */
939                         MemoryContextReset(opCtx);
940                         initKeyArray(&datums, datums.maxvalues);
941                         ginInitBA(&accum);
942                 }
943                 else
944                 {
945                         blkno = GinPageGetOpaque(page)->rightlink;
946                         UnlockReleaseBuffer(buffer);
947                 }
948
949                 /*
950                  * Read next page in pending list
951                  */
952                 vacuum_delay_point();
953                 buffer = ReadBuffer(index, blkno);
954                 LockBuffer(buffer, GIN_SHARE);
955                 page = BufferGetPage(buffer);
956         }
957
958         UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
959         ReleaseBuffer(metabuffer);
960
961         /*
962          * As pending list pages can have a high churn rate, it is desirable to
963          * recycle them immediately to the FreeSpace Map when ordinary backends
964          * clean the list.
965          */
966         if (fsm_vac && fill_fsm)
967                 IndexFreeSpaceMapVacuum(index);
968
969
970         /* Clean up temporary space */
971         MemoryContextSwitchTo(oldCtx);
972         MemoryContextDelete(opCtx);
973 }
974
975 /*
976  * SQL-callable function to clean the insert pending list
977  */
978 Datum
979 gin_clean_pending_list(PG_FUNCTION_ARGS)
980 {
981         Oid                     indexoid = PG_GETARG_OID(0);
982         Relation        indexRel = index_open(indexoid, AccessShareLock);
983         IndexBulkDeleteResult stats;
984         GinState        ginstate;
985
986         if (RecoveryInProgress())
987                 ereport(ERROR,
988                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
989                                  errmsg("recovery is in progress"),
990                  errhint("GIN pending list cannot be cleaned up during recovery.")));
991
992         /* Must be a GIN index */
993         if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
994                 indexRel->rd_rel->relam != GIN_AM_OID)
995                 ereport(ERROR,
996                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
997                                  errmsg("\"%s\" is not a GIN index",
998                                                 RelationGetRelationName(indexRel))));
999
1000         /*
1001          * Reject attempts to read non-local temporary relations; we would be
1002          * likely to get wrong data since we have no visibility into the owning
1003          * session's local buffers.
1004          */
1005         if (RELATION_IS_OTHER_TEMP(indexRel))
1006                 ereport(ERROR,
1007                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1008                            errmsg("cannot access temporary indexes of other sessions")));
1009
1010         /* User must own the index (comparable to privileges needed for VACUUM) */
1011         if (!pg_class_ownercheck(indexoid, GetUserId()))
1012                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
1013                                            RelationGetRelationName(indexRel));
1014
1015         memset(&stats, 0, sizeof(stats));
1016         initGinState(&ginstate, indexRel);
1017         ginInsertCleanup(&ginstate, true, true, &stats);
1018
1019         index_close(indexRel, AccessShareLock);
1020
1021         PG_RETURN_INT64((int64) stats.pages_deleted);
1022 }