]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/gindatapage.c
Reset unused fields in GIN data leaf page footer.
[postgresql] / src / backend / access / gin / gindatapage.c
1 /*-------------------------------------------------------------------------
2  *
3  * gindatapage.c
4  *        routines for handling GIN posting tree pages.
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/gin/gindatapage.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/gin_private.h"
18 #include "access/heapam_xlog.h"
19 #include "lib/ilist.h"
20 #include "miscadmin.h"
21 #include "utils/memutils.h"
22 #include "utils/rel.h"
23
/*
 * Upper limit, in bytes, for one posting list segment stored on a leaf
 * page. Any size would work, but random access is cheaper when the page
 * holds several small segments instead of a single big list.
 */
#define GinPostingListSegmentMaxSize 256

/*
 * When new items are inserted into a page, existing segments that are
 * smaller than this are recompressed.
 */
#define GinPostingListSegmentMinSize 192

/*
 * Lower bound on the number of items that fit into a segment of
 * GinPostingListSegmentMaxSize bytes. Used to estimate the minimum amount
 * of space that N items will need.
 */
#define MinTuplesPerSegment ((GinPostingListSegmentMaxSize - 2) / 6)
43
44 /*
45  * A working struct for manipulating a posting tree leaf page.
46  */
47 typedef struct
48 {
49         dlist_head      segments;               /* a list of leafSegmentInfos */
50
51         /*
52          * The following fields represent how the segments are split across
53          * pages, if a page split is required. Filled in by leafRepackItems.
54          */
55         dlist_node *lastleft;           /* last segment on left page */
56         int                     lsize;                  /* total size on left page */
57         int                     rsize;                  /* total size on right page */
58 } disassembledLeaf;
59
60 typedef struct
61 {
62         dlist_node      node;           /* linked list pointers */
63
64         /*
65          * The following fields represent the items in this segment.
66          * If 'items' is not NULL, it contains a palloc'd array of the items
67          * in this segment. If 'seg' is not NULL, it contains the items in an
68          * already-compressed format. It can point to an on-disk page (!modified),
69          * or a palloc'd segment in memory. If both are set, they must represent
70          * the same items.
71          */
72         GinPostingList *seg;
73         ItemPointer items;
74         int                     nitems;                 /* # of items in 'items', if items != NULL */
75
76         bool            modified;               /* is this segment on page already? */
77 } leafSegmentInfo;
78
79 static ItemPointer dataLeafPageGetUncompressed(Page page, int *nitems);
80 static void dataSplitPageInternal(GinBtree btree, Buffer origbuf,
81                                           GinBtreeStack *stack,
82                                           void *insertdata, BlockNumber updateblkno,
83                                           XLogRecData **prdata, Page *newlpage, Page *newrpage);
84
85 static disassembledLeaf *disassembleLeaf(Page page);
86 static bool leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining);
87 static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems, int nNewItems);
88
89
90 static void dataPlaceToPageLeafRecompress(Buffer buf,
91                                                           disassembledLeaf *leaf,
92                                                           XLogRecData **prdata);
93 static void dataPlaceToPageLeafSplit(Buffer buf,
94                                                  disassembledLeaf *leaf,
95                                                  ItemPointerData lbound, ItemPointerData rbound,
96                                                  XLogRecData **prdata, Page lpage, Page rpage);
97
98 /*
99  * Read all TIDs from leaf data page to single uncompressed array.
100  */
101 ItemPointer
102 GinDataLeafPageGetItems(Page page, int *nitems)
103 {
104         ItemPointer result;
105
106         if (GinPageIsCompressed(page))
107         {
108                 GinPostingList *ptr = GinDataLeafPageGetPostingList(page);
109                 Size            len = GinDataLeafPageGetPostingListSize(page);
110
111                 result = ginPostingListDecodeAllSegments(ptr, len, nitems);
112         }
113         else
114         {
115                 ItemPointer tmp = dataLeafPageGetUncompressed(page, nitems);
116
117                 result = palloc((*nitems) * sizeof(ItemPointerData));
118                 memcpy(result, tmp, (*nitems) * sizeof(ItemPointerData));
119         }
120
121         return result;
122 }
123
124 /*
125  * Places all TIDs from leaf data page to bitmap.
126  */
127 int
128 GinDataLeafPageGetItemsToTbm(Page page, TIDBitmap *tbm)
129 {
130         ItemPointer uncompressed;
131         int                     nitems;
132
133         if (GinPageIsCompressed(page))
134         {
135                 GinPostingList *segment = GinDataLeafPageGetPostingList(page);
136                 Size            len = GinDataLeafPageGetPostingListSize(page);
137
138                 nitems = ginPostingListDecodeAllSegmentsToTbm(segment, len, tbm);
139         }
140         else
141         {
142                 uncompressed = dataLeafPageGetUncompressed(page, &nitems);
143
144                 if (nitems > 0)
145                         tbm_add_tuples(tbm, uncompressed, nitems, false);
146         }
147
148         return nitems;
149 }
150
151 /*
152  * Get pointer to the uncompressed array of items on a pre-9.4 format
153  * uncompressed leaf page. The number of items in the array is returned in
154  * *nitems.
155  */
156 static ItemPointer
157 dataLeafPageGetUncompressed(Page page, int *nitems)
158 {
159         ItemPointer items;
160
161         Assert(!GinPageIsCompressed(page));
162
163         /*
164          * In the old pre-9.4 page format, the whole page content is used for
165          * uncompressed items, and the number of items is stored in 'maxoff'
166          */
167         items = (ItemPointer) GinDataPageGetData(page);
168         *nitems = GinPageGetOpaque(page)->maxoff;
169
170         return items;
171 }
172
173 /*
174  * Check if we should follow the right link to find the item we're searching
175  * for.
176  *
177  * Compares inserting item pointer with the right bound of the current page.
178  */
179 static bool
180 dataIsMoveRight(GinBtree btree, Page page)
181 {
182         ItemPointer iptr = GinDataPageGetRightBound(page);
183
184         if (GinPageRightMost(page))
185                 return FALSE;
186
187         return (ginCompareItemPointers(&btree->itemptr, iptr) > 0) ? TRUE : FALSE;
188 }
189
190 /*
191  * Find correct PostingItem in non-leaf page. It is assumed that this is
192  * the correct page, and the searched value SHOULD be on the page.
193  */
194 static BlockNumber
195 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
196 {
197         OffsetNumber low,
198                                 high,
199                                 maxoff;
200         PostingItem *pitem = NULL;
201         int                     result;
202         Page            page = BufferGetPage(stack->buffer);
203
204         Assert(!GinPageIsLeaf(page));
205         Assert(GinPageIsData(page));
206
207         if (btree->fullScan)
208         {
209                 stack->off = FirstOffsetNumber;
210                 stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
211                 return btree->getLeftMostChild(btree, page);
212         }
213
214         low = FirstOffsetNumber;
215         maxoff = high = GinPageGetOpaque(page)->maxoff;
216         Assert(high >= low);
217
218         high++;
219
220         while (high > low)
221         {
222                 OffsetNumber mid = low + ((high - low) / 2);
223
224                 pitem = GinDataPageGetPostingItem(page, mid);
225
226                 if (mid == maxoff)
227                 {
228                         /*
229                          * Right infinity, page already correctly chosen with a help of
230                          * dataIsMoveRight
231                          */
232                         result = -1;
233                 }
234                 else
235                 {
236                         pitem = GinDataPageGetPostingItem(page, mid);
237                         result = ginCompareItemPointers(&btree->itemptr, &(pitem->key));
238                 }
239
240                 if (result == 0)
241                 {
242                         stack->off = mid;
243                         return PostingItemGetBlockNumber(pitem);
244                 }
245                 else if (result > 0)
246                         low = mid + 1;
247                 else
248                         high = mid;
249         }
250
251         Assert(high >= FirstOffsetNumber && high <= maxoff);
252
253         stack->off = high;
254         pitem = GinDataPageGetPostingItem(page, high);
255         return PostingItemGetBlockNumber(pitem);
256 }
257
258 /*
259  * Find link to blkno on non-leaf page, returns offset of PostingItem
260  */
261 static OffsetNumber
262 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
263 {
264         OffsetNumber i,
265                                 maxoff = GinPageGetOpaque(page)->maxoff;
266         PostingItem *pitem;
267
268         Assert(!GinPageIsLeaf(page));
269         Assert(GinPageIsData(page));
270
271         /* if page isn't changed, we return storedOff */
272         if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
273         {
274                 pitem = GinDataPageGetPostingItem(page, storedOff);
275                 if (PostingItemGetBlockNumber(pitem) == blkno)
276                         return storedOff;
277
278                 /*
279                  * we hope, that needed pointer goes to right. It's true if there
280                  * wasn't a deletion
281                  */
282                 for (i = storedOff + 1; i <= maxoff; i++)
283                 {
284                         pitem = GinDataPageGetPostingItem(page, i);
285                         if (PostingItemGetBlockNumber(pitem) == blkno)
286                                 return i;
287                 }
288
289                 maxoff = storedOff - 1;
290         }
291
292         /* last chance */
293         for (i = FirstOffsetNumber; i <= maxoff; i++)
294         {
295                 pitem = GinDataPageGetPostingItem(page, i);
296                 if (PostingItemGetBlockNumber(pitem) == blkno)
297                         return i;
298         }
299
300         return InvalidOffsetNumber;
301 }
302
303 /*
304  * Return blkno of leftmost child
305  */
306 static BlockNumber
307 dataGetLeftMostPage(GinBtree btree, Page page)
308 {
309         PostingItem *pitem;
310
311         Assert(!GinPageIsLeaf(page));
312         Assert(GinPageIsData(page));
313         Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
314
315         pitem = GinDataPageGetPostingItem(page, FirstOffsetNumber);
316         return PostingItemGetBlockNumber(pitem);
317 }
318
319 /*
320  * Add PostingItem to a non-leaf page.
321  */
322 void
323 GinDataPageAddPostingItem(Page page, PostingItem *data, OffsetNumber offset)
324 {
325         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
326         char       *ptr;
327
328         Assert(PostingItemGetBlockNumber(data) != InvalidBlockNumber);
329         Assert(!GinPageIsLeaf(page));
330
331         if (offset == InvalidOffsetNumber)
332         {
333                 ptr = (char *) GinDataPageGetPostingItem(page, maxoff + 1);
334         }
335         else
336         {
337                 ptr = (char *) GinDataPageGetPostingItem(page, offset);
338                 if (offset != maxoff + 1)
339                         memmove(ptr + sizeof(PostingItem),
340                                         ptr,
341                                         (maxoff - offset + 1) * sizeof(PostingItem));
342         }
343         memcpy(ptr, data, sizeof(PostingItem));
344
345         GinPageGetOpaque(page)->maxoff++;
346 }
347
348 /*
349  * Delete posting item from non-leaf page
350  */
351 void
352 GinPageDeletePostingItem(Page page, OffsetNumber offset)
353 {
354         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
355
356         Assert(!GinPageIsLeaf(page));
357         Assert(offset >= FirstOffsetNumber && offset <= maxoff);
358
359         if (offset != maxoff)
360                 memmove(GinDataPageGetPostingItem(page, offset),
361                                 GinDataPageGetPostingItem(page, offset + 1),
362                                 sizeof(PostingItem) * (maxoff - offset));
363
364         GinPageGetOpaque(page)->maxoff--;
365 }
366
367 /*
368  * Places keys to leaf data page and fills WAL record.
369  */
370 static GinPlaceToPageRC
371 dataPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
372                                         void *insertdata, XLogRecData **prdata,
373                                         Page *newlpage, Page *newrpage)
374 {
375         GinBtreeDataLeafInsertData *items = insertdata;
376         ItemPointer newItems = &items->items[items->curitem];
377         int                     maxitems = items->nitem - items->curitem;
378         Page            page = BufferGetPage(buf);
379         int                     i;
380         ItemPointerData rbound;
381         ItemPointerData lbound;
382         bool            needsplit;
383         bool            append;
384         int                     segsize;
385         Size            freespace;
386         MemoryContext tmpCxt;
387         MemoryContext oldCxt;
388         disassembledLeaf *leaf;
389         leafSegmentInfo *lastleftinfo;
390         ItemPointerData maxOldItem;
391         ItemPointerData remaining;
392
393         Assert(GinPageIsData(page));
394
395         rbound  = *GinDataPageGetRightBound(page);
396
397         /*
398          * Count how many of the new items belong to this page.
399          */
400         if (!GinPageRightMost(page))
401         {
402                 for (i = 0; i < maxitems; i++)
403                 {
404                         if (ginCompareItemPointers(&newItems[i], &rbound) > 0)
405                         {
406                                 /*
407                                  * This needs to go to some other location in the tree. (The
408                                  * caller should've chosen the insert location so that at least
409                                  * the first item goes here.)
410                                  */
411                                 Assert(i > 0);
412                                 break;
413                         }
414                 }
415                 maxitems = i;
416         }
417
418         /*
419          * The following operations do quite a lot of small memory allocations,
420          * create a temporary memory context so that we don't need to keep track
421          * of them individually.
422          */
423         tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
424                                                                    "Gin split temporary context",
425                                                                    ALLOCSET_DEFAULT_MINSIZE,
426                                                                    ALLOCSET_DEFAULT_INITSIZE,
427                                                                    ALLOCSET_DEFAULT_MAXSIZE);
428         oldCxt = MemoryContextSwitchTo(tmpCxt);
429
430         leaf = disassembleLeaf(page);
431
432         /*
433          * Are we appending to the end of the page? IOW, are all the new items
434          * larger than any of the existing items.
435          */
436         if (!dlist_is_empty(&leaf->segments))
437         {
438                 lastleftinfo = dlist_container(leafSegmentInfo, node,
439                                                                            dlist_tail_node(&leaf->segments));
440                 if (!lastleftinfo->items)
441                         lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
442                                                                                                            &lastleftinfo->nitems);
443                 maxOldItem = lastleftinfo->items[lastleftinfo->nitems - 1];
444                 if (ginCompareItemPointers(&newItems[0], &maxOldItem) >= 0)
445                         append = true;
446                 else
447                         append = false;
448         }
449         else
450         {
451                 ItemPointerSetMin(&maxOldItem);
452                 append = true;
453         }
454
455         /*
456          * If we're appending to the end of the page, we will append as many items
457          * as we can fit (after splitting), and stop when the pages becomes full.
458          * Otherwise we have to limit the number of new items to insert, because
459          * once we start packing we can't just stop when we run out of space,
460          * because we must make sure that all the old items still fit.
461          */
462         if (GinPageIsCompressed(page))
463                 freespace = GinDataLeafPageGetFreeSpace(page);
464         else
465                 freespace = 0;
466         if (append)
467         {
468                 /*
469                  * Even when appending, trying to append more items than will fit is
470                  * not completely free, because we will merge the new items and old
471                  * items into an array below. In the best case, every new item fits in
472                  * a single byte, and we can use all the free space on the old page as
473                  * well as the new page. For simplicity, ignore segment overhead etc.
474                  */
475                 maxitems = Min(maxitems, freespace + GinDataLeafMaxContentSize);
476         }
477         else
478         {
479                 /*
480                  * Calculate a conservative estimate of how many new items we can fit
481                  * on the two pages after splitting.
482                  *
483                  * We can use any remaining free space on the old page to store full
484                  * segments, as well as the new page. Each full-sized segment can hold
485                  * at least MinTuplesPerSegment items
486                  */
487                 int                     nnewsegments;
488
489                 nnewsegments = freespace / GinPostingListSegmentMaxSize;
490                 nnewsegments += GinDataLeafMaxContentSize / GinPostingListSegmentMaxSize;
491                 maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
492         }
493
494         /* Add the new items to the segments */
495         if (!addItemsToLeaf(leaf, newItems, maxitems))
496         {
497                  /* all items were duplicates, we have nothing to do */
498                 items->curitem += maxitems;
499
500                 MemoryContextSwitchTo(oldCxt);
501                 MemoryContextDelete(tmpCxt);
502
503                 return UNMODIFIED;
504         }
505
506         /*
507          * Pack the items back to compressed segments, ready for writing to disk.
508          */
509         needsplit = leafRepackItems(leaf, &remaining);
510
511         /*
512          * Did all the new items fit?
513          *
514          * If we're appending, it's OK if they didn't. But as a sanity check,
515          * verify that all the old items fit.
516          */
517         if (ItemPointerIsValid(&remaining))
518         {
519                 if (!append || ItemPointerCompare(&maxOldItem, &remaining) >= 0)
520                         elog(ERROR, "could not split GIN page; all old items didn't fit");
521
522                 /* Count how many of the new items did fit. */
523                 for (i = 0; i < maxitems; i++)
524                 {
525                         if (ginCompareItemPointers(&newItems[i], &remaining) >= 0)
526                                 break;
527                 }
528                 if (i == 0)
529                         elog(ERROR, "could not split GIN page; no new items fit");
530                 maxitems = i;
531         }
532
533         if (!needsplit)
534         {
535                 /*
536                  * Great, all the items fit on a single page. Write the segments to
537                  * the page, and WAL-log appropriately.
538                  *
539                  * Once we start modifying the page, there's no turning back. The
540                  * caller is responsible for calling END_CRIT_SECTION() after writing
541                  * the WAL record.
542                  */
543                 START_CRIT_SECTION();
544                 dataPlaceToPageLeafRecompress(buf, leaf, prdata);
545
546                 if (append)
547                         elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
548                                  maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
549                                  items->nitem - items->curitem - maxitems);
550                 else
551                         elog(DEBUG2, "inserted %d new items to block %u; %d bytes (%d to go)",
552                                  maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
553                                  items->nitem - items->curitem - maxitems);
554         }
555         else
556         {
557                 /*
558                  * Had to split.
559                  *
560                  * We already divided the segments between the left and the right
561                  * page. The left page was filled as full as possible, and the rest
562                  * overflowed to the right page. When building a new index, that's
563                  * good, because the table is scanned from beginning to end and there
564                  * won't be any more insertions to the left page during the build.
565                  * This packs the index as tight as possible. But otherwise, split
566                  * 50/50, by moving segments from the left page to the right page
567                  * until they're balanced.
568                  *
569                  * As a further heuristic, when appending items to the end of the
570                  * page, split 75/25, one the assumption that subsequent insertions
571                  * will probably also go to the end. This packs the index somewhat
572                  * tighter when appending to a table, which is very common.
573                  */
574                 if (!btree->isBuild)
575                 {
576                         while (dlist_has_prev(&leaf->segments, leaf->lastleft))
577                         {
578                                 lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
579
580                                 segsize = SizeOfGinPostingList(lastleftinfo->seg);
581                                 if (append)
582                                 {
583                                         if ((leaf->lsize - segsize) - (leaf->lsize - segsize) < BLCKSZ / 4)
584                                                 break;
585                                 }
586                                 else
587                                 {
588                                         if ((leaf->lsize - segsize) - (leaf->rsize + segsize) < 0)
589                                                 break;
590                                 }
591
592                                  /* don't consider segments moved to right as unmodified */
593                                 lastleftinfo->modified = true;
594                                 leaf->lsize -= segsize;
595                                 leaf->rsize += segsize;
596                                 leaf->lastleft = dlist_prev_node(&leaf->segments, leaf->lastleft);
597                         }
598                 }
599                 Assert(leaf->lsize <= GinDataLeafMaxContentSize);
600                 Assert(leaf->rsize <= GinDataLeafMaxContentSize);
601
602                 /*
603                  * Fetch the max item in the left page's last segment; it becomes the
604                  * right bound of the page.
605                  */
606                 lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
607                 if (!lastleftinfo->items)
608                         lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
609                                                                                                            &lastleftinfo->nitems);
610                 lbound = lastleftinfo->items[lastleftinfo->nitems - 1];
611
612                 *newlpage = MemoryContextAlloc(oldCxt, BLCKSZ);
613                 *newrpage = MemoryContextAlloc(oldCxt, BLCKSZ);
614
615                 dataPlaceToPageLeafSplit(buf, leaf, lbound, rbound,
616                                                                  prdata, *newlpage, *newrpage);
617
618                 Assert(GinPageRightMost(page) ||
619                            ginCompareItemPointers(GinDataPageGetRightBound(*newlpage),
620                                                                           GinDataPageGetRightBound(*newrpage)) < 0);
621
622                 if (append)
623                         elog(DEBUG2, "appended %d items to block %u; split %d/%d (%d to go)",
624                                  maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
625                                  items->nitem - items->curitem - maxitems);
626                 else
627                         elog(DEBUG2, "inserted %d items to block %u; split %d/%d (%d to go)",
628                                  maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
629                                  items->nitem - items->curitem - maxitems);
630         }
631
632         MemoryContextSwitchTo(oldCxt);
633         MemoryContextDelete(tmpCxt);
634
635         items->curitem += maxitems;
636
637         return needsplit ? SPLIT : INSERTED;
638 }
639
640 /*
641  * Vacuum a posting tree leaf page.
642  */
643 void
644 ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs)
645 {
646         Page            page = BufferGetPage(buffer);
647         disassembledLeaf *leaf;
648         bool            removedsomething = false;
649         dlist_iter      iter;
650
651         leaf = disassembleLeaf(page);
652
653         /* Vacuum each segment. */
654         dlist_foreach(iter, &leaf->segments)
655         {
656                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
657                 int                     oldsegsize;
658                 ItemPointer cleaned;
659                 int                     ncleaned;
660
661                 if (!seginfo->items)
662                         seginfo->items = ginPostingListDecode(seginfo->seg,
663                                                                                                   &seginfo->nitems);
664                 if (seginfo->seg)
665                         oldsegsize = SizeOfGinPostingList(seginfo->seg);
666                 else
667                         oldsegsize = GinDataLeafMaxContentSize;
668
669                 cleaned = ginVacuumItemPointers(gvs,
670                                                                                 seginfo->items,
671                                                                                 seginfo->nitems,
672                                                                                 &ncleaned);
673                 pfree(seginfo->items);
674                 seginfo->items = NULL;
675                 seginfo->nitems = 0;
676                 if (cleaned)
677                 {
678                         if (ncleaned > 0)
679                         {
680                                 int                     npacked;
681
682                                 seginfo->seg = ginCompressPostingList(cleaned,
683                                                                                                           ncleaned,
684                                                                                                           oldsegsize,
685                                                                                                           &npacked);
686                                 /* Removing an item never increases the size of the segment */
687                                 if (npacked != ncleaned)
688                                         elog(ERROR, "could not fit vacuumed posting list");
689                         }
690                         else
691                         {
692                                 seginfo->seg = NULL;
693                                 seginfo->items = NULL;
694                         }
695                         seginfo->nitems = ncleaned;
696                         seginfo->modified = true;
697
698                         removedsomething = true;
699                 }
700         }
701
702         /*
703          * If we removed any items, reconstruct the page from the pieces.
704          *
705          * We don't try to re-encode the segments here, even though some of them
706          * might be really small, now that we've removed some items from them.
707          * It seems like a waste of effort, as there isn't really any benefit from
708          * larger segments per se; larger segments only help you to pack more
709          * items in the same space. We might as well delay doing that until the
710          * next insertion, which will need to re-encode at least part of the page
711          * anyway.
712          *
713          * Also note if the page was in uncompressed, pre-9.4 format before, it
714          * is now represented as one huge segment that contains all the items.
715          * It might make sense to split that, to speed up random access, but we
716          * don't bother. You'll have to REINDEX anyway if you want the full gain
717          * of the new tighter index format.
718          */
719         if (removedsomething)
720         {
721                 XLogRecData *payloadrdata;
722
723                 START_CRIT_SECTION();
724                 dataPlaceToPageLeafRecompress(buffer, leaf, &payloadrdata);
725
726                 MarkBufferDirty(buffer);
727
728                 if (RelationNeedsWAL(indexrel))
729                 {
730                         XLogRecPtr      recptr;
731                         XLogRecData rdata;
732                         ginxlogVacuumDataLeafPage xlrec;
733
734                         xlrec.node = indexrel->rd_node;
735                         xlrec.blkno = BufferGetBlockNumber(buffer);
736
737                         rdata.buffer = InvalidBuffer;
738                         rdata.data = (char *) &xlrec;
739                         rdata.len = offsetof(ginxlogVacuumDataLeafPage, data);
740                         rdata.next = payloadrdata;
741
742                         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE, &rdata);
743                         PageSetLSN(page, recptr);
744                 }
745
746                 END_CRIT_SECTION();
747         }
748 }
749
750 /*
751  * Assemble a disassembled posting tree leaf page back to a buffer.
752  *
753  * *prdata is filled with WAL information about this operation. The caller
754  * is responsible for inserting to the WAL, along with any other information
755  * about the operation that triggered this recompression.
756  *
757  * NOTE: The segment pointers can point directly to the same buffer, with
758  * the limitation that any earlier segment must not overlap with an original,
759  * later segment. In other words, some segments may point the original buffer
760  * as long as you don't make any segments larger. Currently, leafRepackItems
761  * satisies this rule because it rewrites all segments after the first
762  * modified one, and vacuum can only make segments shorter.
763  */
764 static void
765 dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf,
766                                                           XLogRecData **prdata)
767 {
768         Page            page = BufferGetPage(buf);
769         char       *ptr;
770         int                     newsize;
771         int                     unmodifiedsize;
772         bool            modified = false;
773         dlist_iter      iter;
774         /*
775          * these must be static so they can be returned to caller (no pallocs
776          * since we're in a critical section!)
777          */
778         static ginxlogRecompressDataLeaf recompress_xlog;
779         static XLogRecData rdata[2];
780
781         ptr = (char *) GinDataLeafPageGetPostingList(page);
782         newsize = 0;
783         unmodifiedsize = 0;
784         modified = false;
785         dlist_foreach(iter, &leaf->segments)
786         {
787                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
788                 int                     segsize;
789
790                 if (seginfo->modified)
791                         modified = true;
792
793                 /*
794                  * Nothing to do with empty segments, except keep track if they've been
795                  * modified
796                  */
797                 if (seginfo->seg == NULL)
798                 {
799                         Assert(seginfo->items == NULL);
800                         continue;
801                 }
802
803                 segsize = SizeOfGinPostingList(seginfo->seg);
804
805                 if (!modified)
806                         unmodifiedsize += segsize;
807                 else
808                 {
809                         /*
810                          * Use memmove rather than memcpy, in case the segment points
811                          * to the same buffer
812                          */
813                         memmove(ptr, seginfo->seg, segsize);
814                 }
815                 ptr += segsize;
816                 newsize += segsize;
817         }
818         Assert(newsize <= GinDataLeafMaxContentSize);
819         GinDataLeafPageSetPostingListSize(page, newsize);
820
821         /* Reset these in case the page was in pre-9.4 format before */
822         GinPageSetCompressed(page);
823         GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
824
825         /* Put WAL data */
826         recompress_xlog.length = (uint16) newsize;
827         recompress_xlog.unmodifiedsize = unmodifiedsize;
828
829         rdata[0].buffer = InvalidBuffer;
830         rdata[0].data = (char *) &recompress_xlog;
831         rdata[0].len = offsetof(ginxlogRecompressDataLeaf, newdata);
832         rdata[0].next = &rdata[1];
833
834         rdata[1].buffer = buf;
835         rdata[1].buffer_std = TRUE;
836         rdata[1].data = ((char *) GinDataLeafPageGetPostingList(page)) + unmodifiedsize;
837         rdata[1].len = newsize - unmodifiedsize;
838         rdata[1].next = NULL;
839
840         *prdata = rdata;
841 }
842
843 /*
844  * Like dataPlaceToPageLeafRecompress, but writes the disassembled leaf
845  * segments to two pages instead of one.
846  *
847  * This is different from the non-split cases in that this does not modify
848  * the original page directly, but to temporary in-memory copies of the new
849  * left and right pages.
850  */
851 static void
852 dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf,
853                                                  ItemPointerData lbound, ItemPointerData rbound,
854                                                  XLogRecData **prdata, Page lpage, Page rpage)
855 {
856         char       *ptr;
857         int                     segsize;
858         int                     lsize;
859         int                     rsize;
860         dlist_node *node;
861         dlist_node *firstright;
862         leafSegmentInfo *seginfo;
863         /* these must be static so they can be returned to caller */
864         static ginxlogSplitDataLeaf split_xlog;
865         static XLogRecData rdata[3];
866
867         /* Initialize temporary pages to hold the new left and right pages */
868         GinInitPage(lpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
869         GinInitPage(rpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
870
871         /*
872          * Copy the segments that go to the left page.
873          *
874          * XXX: We should skip copying the unmodified part of the left page, like
875          * we do when recompressing.
876          */
877         lsize = 0;
878         ptr = (char *) GinDataLeafPageGetPostingList(lpage);
879         firstright = dlist_next_node(&leaf->segments, leaf->lastleft);
880         for (node = dlist_head_node(&leaf->segments);
881                  node != firstright;
882                  node = dlist_next_node(&leaf->segments, node))
883         {
884                 seginfo = dlist_container(leafSegmentInfo, node, node);
885                 segsize = SizeOfGinPostingList(seginfo->seg);
886
887                 memcpy(ptr, seginfo->seg, segsize);
888                 ptr += segsize;
889                 lsize += segsize;
890         }
891         Assert(lsize == leaf->lsize);
892         GinDataLeafPageSetPostingListSize(lpage, lsize);
893         *GinDataPageGetRightBound(lpage) = lbound;
894
895         /* Copy the segments that go to the right page */
896         ptr = (char *) GinDataLeafPageGetPostingList(rpage);
897         rsize = 0;
898         for (node = firstright;
899                  ;
900                  node = dlist_next_node(&leaf->segments, node))
901         {
902                 seginfo = dlist_container(leafSegmentInfo, node, node);
903                 segsize = SizeOfGinPostingList(seginfo->seg);
904
905                 memcpy(ptr, seginfo->seg, segsize);
906                 ptr += segsize;
907                 rsize += segsize;
908
909                 if (!dlist_has_next(&leaf->segments, node))
910                         break;
911         }
912         Assert(rsize == leaf->rsize);
913         GinDataLeafPageSetPostingListSize(rpage, rsize);
914         *GinDataPageGetRightBound(rpage) = rbound;
915
916         /* Create WAL record */
917         split_xlog.lsize = lsize;
918         split_xlog.rsize = rsize;
919         split_xlog.lrightbound = lbound;
920         split_xlog.rrightbound = rbound;
921
922         rdata[0].buffer = InvalidBuffer;
923         rdata[0].data = (char *) &split_xlog;
924         rdata[0].len = sizeof(ginxlogSplitDataLeaf);
925         rdata[0].next = &rdata[1];
926
927         rdata[1].buffer = InvalidBuffer;
928         rdata[1].data = (char *) GinDataLeafPageGetPostingList(lpage);
929         rdata[1].len = lsize;
930         rdata[1].next = &rdata[2];
931
932         rdata[2].buffer = InvalidBuffer;
933         rdata[2].data = (char *) GinDataLeafPageGetPostingList(rpage);
934         rdata[2].len = rsize;
935         rdata[2].next = NULL;
936
937         *prdata = rdata;
938 }
939
940 /*
941  * Place a PostingItem to page, and fill a WAL record.
942  *
943  * If the item doesn't fit, returns false without modifying the page.
944  *
945  * In addition to inserting the given item, the downlink of the existing item
946  * at 'off' is updated to point to 'updateblkno'.
947  */
948 static GinPlaceToPageRC
949 dataPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
950                                                 void *insertdata, BlockNumber updateblkno,
951                                                 XLogRecData **prdata, Page *newlpage, Page *newrpage)
952 {
953         Page            page = BufferGetPage(buf);
954         OffsetNumber off = stack->off;
955         PostingItem *pitem;
956         /* these must be static so they can be returned to caller */
957         static XLogRecData rdata;
958         static ginxlogInsertDataInternal data;
959
960         /* split if we have to */
961         if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
962         {
963                 dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
964                                                           prdata, newlpage, newrpage);
965                 return SPLIT;
966         }
967
968         *prdata = &rdata;
969         Assert(GinPageIsData(page));
970
971         START_CRIT_SECTION();
972
973         /* Update existing downlink to point to next page (on internal page) */
974         pitem = GinDataPageGetPostingItem(page, off);
975         PostingItemSetBlockNumber(pitem, updateblkno);
976
977         /* Add new item */
978         pitem = (PostingItem *) insertdata;
979         GinDataPageAddPostingItem(page, pitem, off);
980
981         data.offset = off;
982         data.newitem = *pitem;
983
984         rdata.buffer = buf;
985         rdata.buffer_std = false;
986         rdata.data = (char *) &data;
987         rdata.len = sizeof(ginxlogInsertDataInternal);
988         rdata.next = NULL;
989
990         return INSERTED;
991 }
992
993 /*
994  * Places an item (or items) to a posting tree. Calls relevant function of
995  * internal of leaf page because they are handled very differently.
996  */
997 static GinPlaceToPageRC
998 dataPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
999                                 void *insertdata, BlockNumber updateblkno,
1000                                 XLogRecData **prdata,
1001                                 Page *newlpage, Page *newrpage)
1002 {
1003         Page            page = BufferGetPage(buf);
1004
1005         Assert(GinPageIsData(page));
1006
1007         if (GinPageIsLeaf(page))
1008                 return dataPlaceToPageLeaf(btree, buf, stack, insertdata,
1009                                                                    prdata, newlpage, newrpage);
1010         else
1011                 return dataPlaceToPageInternal(btree, buf, stack,
1012                                                                            insertdata, updateblkno,
1013                                                                            prdata, newlpage, newrpage);
1014 }
1015
1016 /*
1017  * Split page and fill WAL record. Returns a new temp buffer filled with data
1018  * that should go to the left page. The original buffer is left untouched.
1019  */
1020 static void
1021 dataSplitPageInternal(GinBtree btree, Buffer origbuf,
1022                                           GinBtreeStack *stack,
1023                                           void *insertdata, BlockNumber updateblkno,
1024                                           XLogRecData **prdata, Page *newlpage, Page *newrpage)
1025 {
1026         Page            oldpage = BufferGetPage(origbuf);
1027         OffsetNumber off = stack->off;
1028         int                     nitems = GinPageGetOpaque(oldpage)->maxoff;
1029         Size            pageSize = PageGetPageSize(oldpage);
1030         ItemPointerData oldbound = *GinDataPageGetRightBound(oldpage);
1031         ItemPointer     bound;
1032         Page            lpage;
1033         Page            rpage;
1034         OffsetNumber separator;
1035
1036         /* these must be static so they can be returned to caller */
1037         static ginxlogSplitDataInternal data;
1038         static XLogRecData rdata[4];
1039         static PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1];
1040
1041         lpage = PageGetTempPage(oldpage);
1042         rpage = PageGetTempPage(oldpage);
1043         GinInitPage(lpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1044         GinInitPage(rpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1045
1046         *prdata = rdata;
1047
1048         /*
1049          * First construct a new list of PostingItems, which includes all the
1050          * old items, and the new item.
1051          */
1052         memcpy(allitems, GinDataPageGetPostingItem(oldpage, FirstOffsetNumber),
1053                    (off - 1) * sizeof(PostingItem));
1054
1055         allitems[off - 1] = *((PostingItem *) insertdata);
1056         memcpy(&allitems[off], GinDataPageGetPostingItem(oldpage, off),
1057                    (nitems - (off - 1)) * sizeof(PostingItem));
1058         nitems++;
1059
1060         /* Update existing downlink to point to next page */
1061         PostingItemSetBlockNumber(&allitems[off], updateblkno);
1062
1063         /*
1064          * When creating a new index, fit as many tuples as possible on the left
1065          * page, on the assumption that the table is scanned from beginning to
1066          * end. This packs the index as tight as possible.
1067          */
1068         if (btree->isBuild && GinPageRightMost(oldpage))
1069                 separator = GinNonLeafDataPageGetFreeSpace(rpage) / sizeof(PostingItem);
1070         else
1071                 separator = nitems / 2;
1072
1073         memcpy(GinDataPageGetPostingItem(lpage, FirstOffsetNumber), allitems, separator * sizeof(PostingItem));
1074         GinPageGetOpaque(lpage)->maxoff = separator;
1075         memcpy(GinDataPageGetPostingItem(rpage, FirstOffsetNumber),
1076                  &allitems[separator], (nitems - separator) * sizeof(PostingItem));
1077         GinPageGetOpaque(rpage)->maxoff = nitems - separator;
1078
1079         /* set up right bound for left page */
1080         bound = GinDataPageGetRightBound(lpage);
1081         *bound = GinDataPageGetPostingItem(lpage,
1082                                                                   GinPageGetOpaque(lpage)->maxoff)->key;
1083
1084         /* set up right bound for right page */
1085         *GinDataPageGetRightBound(rpage) = oldbound;
1086
1087         data.separator = separator;
1088         data.nitem = nitems;
1089         data.rightbound = oldbound;
1090
1091         rdata[0].buffer = InvalidBuffer;
1092         rdata[0].data = (char *) &data;
1093         rdata[0].len = sizeof(ginxlogSplitDataInternal);
1094         rdata[0].next = &rdata[1];
1095
1096         rdata[1].buffer = InvalidBuffer;
1097         rdata[1].data = (char *) allitems;
1098         rdata[1].len = nitems * sizeof(PostingItem);
1099         rdata[1].next = NULL;
1100
1101         *newlpage = lpage;
1102         *newrpage = rpage;
1103 }
1104
1105 /*
1106  * Construct insertion payload for inserting the downlink for given buffer.
1107  */
1108 static void *
1109 dataPrepareDownlink(GinBtree btree, Buffer lbuf)
1110 {
1111         PostingItem *pitem = palloc(sizeof(PostingItem));
1112         Page            lpage = BufferGetPage(lbuf);
1113
1114         PostingItemSetBlockNumber(pitem, BufferGetBlockNumber(lbuf));
1115         pitem->key = *GinDataPageGetRightBound(lpage);
1116
1117         return pitem;
1118 }
1119
1120 /*
1121  * Fills new root by right bound values from child.
1122  * Also called from ginxlog, should not use btree
1123  */
1124 void
1125 ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
1126 {
1127         PostingItem li,
1128                                 ri;
1129
1130         li.key = *GinDataPageGetRightBound(lpage);
1131         PostingItemSetBlockNumber(&li, lblkno);
1132         GinDataPageAddPostingItem(root, &li, InvalidOffsetNumber);
1133
1134         ri.key = *GinDataPageGetRightBound(rpage);
1135         PostingItemSetBlockNumber(&ri, rblkno);
1136         GinDataPageAddPostingItem(root, &ri, InvalidOffsetNumber);
1137 }
1138
1139
1140 /*** Functions to work with disassembled leaf pages ***/
1141
1142 /*
1143  * Disassemble page into a disassembledLeaf struct.
1144  */
1145 static disassembledLeaf *
1146 disassembleLeaf(Page page)
1147 {
1148         disassembledLeaf *leaf;
1149         GinPostingList *seg;
1150         Pointer         segbegin;
1151         Pointer         segend;
1152
1153         leaf = palloc0(sizeof(disassembledLeaf));
1154         dlist_init(&leaf->segments);
1155
1156         if (GinPageIsCompressed(page))
1157         {
1158                 /*
1159                  * Create a leafSegment entry for each segment.
1160                  */
1161                 seg = GinDataLeafPageGetPostingList(page);
1162                 segbegin = (Pointer) seg;
1163                 segend = segbegin + GinDataLeafPageGetPostingListSize(page);
1164                 while ((Pointer) seg < segend)
1165                 {
1166                         leafSegmentInfo *seginfo = palloc(sizeof(leafSegmentInfo));
1167
1168                         seginfo->seg = seg;
1169                         seginfo->items = NULL;
1170                         seginfo->nitems = 0;
1171                         seginfo->modified = false;
1172                         dlist_push_tail(&leaf->segments, &seginfo->node);
1173
1174                         seg = GinNextPostingListSegment(seg);
1175                 }
1176         }
1177         else
1178         {
1179                 /*
1180                  * A pre-9.4 format uncompressed page is represented by a single
1181                  * segment, with an array of items.
1182                  */
1183                 ItemPointer uncompressed;
1184                 int                     nuncompressed;
1185                 leafSegmentInfo *seginfo;
1186
1187                 uncompressed = dataLeafPageGetUncompressed(page, &nuncompressed);
1188
1189                 seginfo = palloc(sizeof(leafSegmentInfo));
1190
1191                 seginfo->seg = NULL;
1192                 seginfo->items = palloc(nuncompressed * sizeof(ItemPointerData));
1193                 memcpy(seginfo->items, uncompressed, nuncompressed * sizeof(ItemPointerData));
1194                 seginfo->nitems = nuncompressed;
1195                 /* make sure we rewrite this to disk */
1196                 seginfo->modified = true;
1197
1198                 dlist_push_tail(&leaf->segments, &seginfo->node);
1199         }
1200
1201         return leaf;
1202 }
1203
1204 /*
1205  * Distribute newItems to the segments.
1206  *
1207  * Any segments that acquire new items are decoded, and the new items are
1208  * merged with the old items.
1209  *
1210  * Returns true if any new items were added. False means they were all
1211  * duplicates of existing items on the page.
1212  */
1213 static bool
1214 addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems, int nNewItems)
1215 {
1216         dlist_iter      iter;
1217         ItemPointer nextnew = newItems;
1218         int                     newleft = nNewItems;
1219         bool            modified = false;
1220
1221         /*
1222          * If the page is completely empty, just construct one new segment to
1223          * hold all the new items.
1224          */
1225         if (dlist_is_empty(&leaf->segments))
1226         {
1227                 /* create a new segment for the new entries */
1228                 leafSegmentInfo *seginfo = palloc(sizeof(leafSegmentInfo));
1229
1230                 seginfo->seg = NULL;
1231                 seginfo->items = newItems;
1232                 seginfo->nitems = nNewItems;
1233                 seginfo->modified = true;
1234                 dlist_push_tail(&leaf->segments, &seginfo->node);
1235                 return true;
1236         }
1237
1238         dlist_foreach(iter, &leaf->segments)
1239         {
1240                 leafSegmentInfo *cur = (leafSegmentInfo *)  dlist_container(leafSegmentInfo, node, iter.cur);
1241                 int                     nthis;
1242                 ItemPointer     tmpitems;
1243                 int                     ntmpitems;
1244
1245                 /*
1246                  * How many of the new items fall into this segment?
1247                  */
1248                 if (!dlist_has_next(&leaf->segments, iter.cur))
1249                         nthis = newleft;
1250                 else
1251                 {
1252                         leafSegmentInfo *next;
1253                         ItemPointerData next_first;
1254
1255                         next = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node,
1256                                                                    dlist_next_node(&leaf->segments, iter.cur));
1257                         if (next->items)
1258                                 next_first = next->items[0];
1259                         else
1260                         {
1261                                 Assert(next->seg != NULL);
1262                                 next_first = next->seg->first;
1263                         }
1264
1265                         nthis = 0;
1266                         while (nthis < newleft && ginCompareItemPointers(&nextnew[nthis], &next_first) < 0)
1267                                 nthis++;
1268                 }
1269                 if (nthis == 0)
1270                         continue;
1271
1272                 /* Merge the new items with the existing items. */
1273                 if (!cur->items)
1274                         cur->items = ginPostingListDecode(cur->seg, &cur->nitems);
1275
1276                 tmpitems = palloc((cur->nitems + nthis) * sizeof(ItemPointerData));
1277                 ntmpitems = ginMergeItemPointers(tmpitems,
1278                                                                                  cur->items, cur->nitems,
1279                                                                                  nextnew, nthis);
1280                 if (ntmpitems != cur->nitems)
1281                 {
1282                         cur->items = tmpitems;
1283                         cur->nitems = ntmpitems;
1284                         cur->seg = NULL;
1285                         modified = cur->modified = true;
1286                 }
1287
1288                 nextnew += nthis;
1289                 newleft -= nthis;
1290                 if (newleft == 0)
1291                         break;
1292         }
1293
1294         return modified;
1295 }
1296
1297 /*
1298  * Recompresses all segments that have been modified.
1299  *
1300  * If not all the items fit on two pages (ie. after split), we store as
1301  * many items as fit, and set *remaining to the first item that didn't fit.
1302  * If all items fit, *remaining is set to invalid.
1303  *
1304  * Returns true if the page has to be split.
1305  *
1306  * XXX: Actually, this re-encodes all segments after the first one that was
1307  * modified, to make sure the new segments are all more or less of equal
1308  * length. That's unnecessarily aggressive; if we've only added a single item
1309  * to one segment, for example, we could re-encode just that single segment,
1310  * as long as it's still smaller than, say, 2x the normal segment size.
1311  */
1312 static bool
1313 leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining)
1314 {
1315         dlist_iter      iter;
1316         ItemPointer allitems;
1317         int                     nallitems;
1318         int                     pgused = 0;
1319         bool            needsplit;
1320         int                     totalpacked;
1321         dlist_mutable_iter miter;
1322         dlist_head      recode_list;
1323         int                     nrecode;
1324         bool            recoding;
1325
1326         ItemPointerSetInvalid(remaining);
1327
1328         /*
1329          * Find the first segment that needs to be re-coded. Move all segments
1330          * that need recoding to separate list, and count the total number of
1331          * items in them. Also, add up the number of bytes in unmodified segments
1332          * (pgused).
1333          */
1334         dlist_init(&recode_list);
1335         recoding = false;
1336         nrecode = 0;
1337         pgused = 0;
1338         dlist_foreach_modify(miter, &leaf->segments)
1339         {
1340                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, miter.cur);
1341
1342                 /* If the segment was modified, re-encode it */
1343                 if (seginfo->modified || seginfo->seg == NULL)
1344                         recoding = true;
1345                 /*
1346                  * Also re-encode abnormally small or large segments. (Vacuum can
1347                  * leave behind small segments, and conversion from pre-9.4 format
1348                  * can leave behind large segments).
1349                  */
1350                 else if (SizeOfGinPostingList(seginfo->seg) < GinPostingListSegmentMinSize)
1351                         recoding = true;
1352                 else if (SizeOfGinPostingList(seginfo->seg) > GinPostingListSegmentMaxSize)
1353                         recoding = true;
1354
1355                 if (recoding)
1356                 {
1357                         if (!seginfo->items)
1358                                 seginfo->items = ginPostingListDecode(seginfo->seg,
1359                                                                                                           &seginfo->nitems);
1360                         nrecode += seginfo->nitems;
1361
1362                         dlist_delete(miter.cur);
1363                         dlist_push_tail(&recode_list, &seginfo->node);
1364                 }
1365                 else
1366                         pgused += SizeOfGinPostingList(seginfo->seg);
1367         }
1368
1369         if (nrecode == 0)
1370                 return false; /* nothing to do */
1371
1372         /*
1373          * Construct one big array of the items that need to be re-encoded.
1374          */
1375         allitems = palloc(nrecode * sizeof(ItemPointerData));
1376         nallitems = 0;
1377         dlist_foreach(iter, &recode_list)
1378         {
1379                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
1380                 memcpy(&allitems[nallitems], seginfo->items, seginfo->nitems * sizeof(ItemPointerData));
1381                 nallitems += seginfo->nitems;
1382         }
1383         Assert(nallitems == nrecode);
1384
1385         /*
1386          * Start packing the items into segments. Stop when we have consumed
1387          * enough space to fill both pages, or we run out of items.
1388          */
1389         totalpacked = 0;
1390         needsplit = false;
1391         while (totalpacked < nallitems)
1392         {
1393                 leafSegmentInfo *seginfo;
1394                 int                     npacked;
1395                 GinPostingList *seg;
1396                 int                     segsize;
1397
1398                 seg = ginCompressPostingList(&allitems[totalpacked],
1399                                                                          nallitems - totalpacked,
1400                                                                          GinPostingListSegmentMaxSize,
1401                                                                          &npacked);
1402                 segsize = SizeOfGinPostingList(seg);
1403                 if (pgused + segsize > GinDataLeafMaxContentSize)
1404                 {
1405                         if (!needsplit)
1406                         {
1407                                 /* switch to right page */
1408                                 Assert(pgused > 0);
1409                                 leaf->lastleft = dlist_tail_node(&leaf->segments);
1410                                 needsplit = true;
1411                                 leaf->lsize = pgused;
1412                                 pgused = 0;
1413                         }
1414                         else
1415                         {
1416                                 /* filled both pages */
1417                                 *remaining = allitems[totalpacked];
1418                                 break;
1419                         }
1420                 }
1421
1422                 seginfo = palloc(sizeof(leafSegmentInfo));
1423                 seginfo->seg = seg;
1424                 seginfo->items = &allitems[totalpacked];
1425                 seginfo->nitems = npacked;
1426                 seginfo->modified = true;
1427
1428                 dlist_push_tail(&leaf->segments, &seginfo->node);
1429
1430                 pgused += segsize;
1431                 totalpacked += npacked;
1432         }
1433
1434         if (!needsplit)
1435         {
1436                 leaf->lsize = pgused;
1437                 leaf->rsize = 0;
1438         }
1439         else
1440                 leaf->rsize = pgused;
1441
1442         Assert(leaf->lsize <= GinDataLeafMaxContentSize);
1443         Assert(leaf->rsize <= GinDataLeafMaxContentSize);
1444
1445         return needsplit;
1446 }
1447
1448
1449 /*** Functions that are exported to the rest of the GIN code ***/
1450
1451 /*
1452  * Creates new posting tree containing the given TIDs. Returns the page
1453  * number of the root of the new posting tree.
1454  *
1455  * items[] must be in sorted order with no duplicates.
1456  */
1457 BlockNumber
1458 createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
1459                                   GinStatsData *buildStats)
1460 {
1461         BlockNumber blkno;
1462         Buffer          buffer;
1463         Page            page;
1464         Pointer         ptr;
1465         int                     nrootitems;
1466         int                     rootsize;
1467
1468         /*
1469          * Create the root page.
1470          */
1471         buffer = GinNewBuffer(index);
1472         page = BufferGetPage(buffer);
1473         blkno = BufferGetBlockNumber(buffer);
1474
1475         START_CRIT_SECTION();
1476
1477         GinInitPage(page, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1478         GinPageGetOpaque(page)->rightlink = InvalidBlockNumber;
1479
1480         /*
1481          * Write as many of the items to the root page as fit. In segments
1482          * of max GinPostingListSegmentMaxSize bytes each.
1483          */
1484         nrootitems = 0;
1485         rootsize = 0;
1486         ptr = (Pointer) GinDataLeafPageGetPostingList(page);
1487         while (nrootitems < nitems)
1488         {
1489                 GinPostingList *segment;
1490                 int                     npacked;
1491                 int                     segsize;
1492
1493                 segment = ginCompressPostingList(&items[nrootitems],
1494                                                                                  nitems - nrootitems,
1495                                                                                  GinPostingListSegmentMaxSize,
1496                                                                                  &npacked);
1497                 segsize = SizeOfGinPostingList(segment);
1498                 if (rootsize + segsize > GinDataLeafMaxContentSize)
1499                         break;
1500
1501                 memcpy(ptr, segment, segsize);
1502                 ptr += segsize;
1503                 rootsize += segsize;
1504                 nrootitems += npacked;
1505                 pfree(segment);
1506         }
1507         GinDataLeafPageSetPostingListSize(page, rootsize);
1508         MarkBufferDirty(buffer);
1509
1510         elog(DEBUG2, "created GIN posting tree with %d items", nrootitems);
1511
1512         if (RelationNeedsWAL(index))
1513         {
1514                 XLogRecPtr      recptr;
1515                 XLogRecData rdata[2];
1516                 ginxlogCreatePostingTree data;
1517
1518                 data.node = index->rd_node;
1519                 data.blkno = blkno;
1520                 data.size = rootsize;
1521
1522                 rdata[0].buffer = InvalidBuffer;
1523                 rdata[0].data = (char *) &data;
1524                 rdata[0].len = sizeof(ginxlogCreatePostingTree);
1525                 rdata[0].next = &rdata[1];
1526
1527                 rdata[1].buffer = InvalidBuffer;
1528                 rdata[1].data = (char *) GinDataLeafPageGetPostingList(page);
1529                 rdata[1].len = rootsize;
1530                 rdata[1].next = NULL;
1531
1532                 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE, rdata);
1533                 PageSetLSN(page, recptr);
1534         }
1535
1536         UnlockReleaseBuffer(buffer);
1537
1538         END_CRIT_SECTION();
1539
1540         /* During index build, count the newly-added data page */
1541         if (buildStats)
1542                 buildStats->nDataPages++;
1543
1544         /*
1545          * Add any remaining TIDs to the newly-created posting tree.
1546          */
1547         if (nitems > nrootitems)
1548         {
1549                 ginInsertItemPointers(index, blkno,
1550                                                           items + nrootitems,
1551                                                           nitems - nrootitems,
1552                                                           buildStats);
1553         }
1554
1555         return blkno;
1556 }
1557
1558 void
1559 ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno)
1560 {
1561         memset(btree, 0, sizeof(GinBtreeData));
1562
1563         btree->index = index;
1564         btree->rootBlkno = rootBlkno;
1565
1566         btree->findChildPage = dataLocateItem;
1567         btree->getLeftMostChild = dataGetLeftMostPage;
1568         btree->isMoveRight = dataIsMoveRight;
1569         btree->findItem = NULL;
1570         btree->findChildPtr = dataFindChildPtr;
1571         btree->placeToPage = dataPlaceToPage;
1572         btree->fillRoot = ginDataFillRoot;
1573         btree->prepareDownlink = dataPrepareDownlink;
1574
1575         btree->isData = TRUE;
1576         btree->fullScan = FALSE;
1577         btree->isBuild = FALSE;
1578 }
1579
1580 /*
1581  * Inserts array of item pointers, may execute several tree scan (very rare)
1582  */
1583 void
1584 ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
1585                                           ItemPointerData *items, uint32 nitem,
1586                                           GinStatsData *buildStats)
1587 {
1588         GinBtreeData btree;
1589         GinBtreeDataLeafInsertData insertdata;
1590         GinBtreeStack *stack;
1591
1592         ginPrepareDataScan(&btree, index, rootBlkno);
1593         btree.isBuild = (buildStats != NULL);
1594         insertdata.items = items;
1595         insertdata.nitem = nitem;
1596         insertdata.curitem = 0;
1597
1598         while (insertdata.curitem < insertdata.nitem)
1599         {
1600                 /* search for the leaf page where the first item should go to */
1601                 btree.itemptr = insertdata.items[insertdata.curitem];
1602                 stack = ginFindLeafPage(&btree, false);
1603
1604                 ginInsertValue(&btree, stack, &insertdata, buildStats);
1605         }
1606 }
1607
1608 /*
1609  * Starts a new scan on a posting tree.
1610  */
1611 GinBtreeStack *
1612 ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno)
1613 {
1614         GinBtreeData btree;
1615         GinBtreeStack *stack;
1616
1617         ginPrepareDataScan(&btree, index, rootBlkno);
1618
1619         btree.fullScan = TRUE;
1620
1621         stack = ginFindLeafPage(&btree, TRUE);
1622
1623         return stack;
1624 }