]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/gindatapage.c
76e0cb3e19f7bdea54edb9e468efea1d902b8940
[postgresql] / src / backend / access / gin / gindatapage.c
1 /*-------------------------------------------------------------------------
2  *
3  * gindatapage.c
4  *        routines for handling GIN posting tree pages.
5  *
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/gin/gindatapage.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/gin_private.h"
18 #include "lib/ilist.h"
19 #include "miscadmin.h"
20 #include "utils/memutils.h"
21 #include "utils/rel.h"
22
23 /*
24  * Min, Max and Target size of posting lists stored on leaf pages, in bytes.
25  *
26  * The code can deal with any size, but random access is more efficient when
27  * a number of smaller lists are stored, rather than one big list. If a
28  * posting list would become larger than Max size as a result of insertions,
29  * it is split into two. If a posting list would be smaller than minimum
30  * size, it is merged with the next posting list.
31  */
32 #define GinPostingListSegmentMaxSize 384
33 #define GinPostingListSegmentTargetSize 256
34 #define GinPostingListSegmentMinSize 128
35
36 /*
37  * At least this many items fit in a GinPostingListSegmentMaxSize-bytes
38  * long segment. This is used when estimating how much space is required
39  * for N items, at minimum.
40  */
41 #define MinTuplesPerSegment ((GinPostingListSegmentMaxSize - 2) / 6)
42
43 /*
44  * A working struct for manipulating a posting tree leaf page.
45  */
46 typedef struct
47 {
48         dlist_head      segments;               /* a list of leafSegmentInfos */
49
50         /*
51          * The following fields represent how the segments are split across pages,
52          * if a page split is required. Filled in by leafRepackItems.
53          */
54         dlist_node *lastleft;           /* last segment on left page */
55         int                     lsize;                  /* total size on left page */
56         int                     rsize;                  /* total size on right page */
57
58         bool            oldformat;              /* page is in pre-9.4 format on disk */
59 } disassembledLeaf;
60
61 typedef struct
62 {
63         dlist_node      node;                   /* linked list pointers */
64
65         /*-------------
66          * 'action' indicates the status of this in-memory segment, compared to
67          * what's on disk. It is one of the GIN_SEGMENT_* action codes:
68          *
69          * UNMODIFIED   no changes
70          * DELETE               the segment is to be removed. 'seg' and 'items' are
71          *                              ignored
72          * INSERT               this is a completely new segment
73          * REPLACE              this replaces an existing segment with new content
74          * ADDITEMS             like REPLACE, but no items have been removed, and we track
75          *                              in detail what items have been added to this segment, in
76          *                              'modifieditems'
77          *-------------
78          */
79         char            action;
80
81         ItemPointerData *modifieditems;
82         int                     nmodifieditems;
83
84         /*
85          * The following fields represent the items in this segment. If 'items' is
86          * not NULL, it contains a palloc'd array of the itemsin this segment. If
87          * 'seg' is not NULL, it contains the items in an already-compressed
88          * format. It can point to an on-disk page (!modified), or a palloc'd
89          * segment in memory. If both are set, they must represent the same items.
90          */
91         GinPostingList *seg;
92         ItemPointer items;
93         int                     nitems;                 /* # of items in 'items', if items != NULL */
94 } leafSegmentInfo;
95
96 static ItemPointer dataLeafPageGetUncompressed(Page page, int *nitems);
97 static void dataSplitPageInternal(GinBtree btree, Buffer origbuf,
98                                           GinBtreeStack *stack,
99                                           void *insertdata, BlockNumber updateblkno,
100                                           XLogRecData **prdata, Page *newlpage, Page *newrpage);
101
102 static disassembledLeaf *disassembleLeaf(Page page);
103 static bool leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining);
104 static bool addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems,
105                            int nNewItems);
106
107 static XLogRecData *constructLeafRecompressWALData(Buffer buf,
108                                                            disassembledLeaf *leaf);
109 static void dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf);
110 static void dataPlaceToPageLeafSplit(Buffer buf,
111                                                  disassembledLeaf *leaf,
112                                                  ItemPointerData lbound, ItemPointerData rbound,
113                                                  XLogRecData **prdata, Page lpage, Page rpage);
114
115 /*
116  * Read TIDs from leaf data page to single uncompressed array. The TIDs are
117  * returned in ascending order.
118  *
119  * advancePast is a hint, indicating that the caller is only interested in
120  * TIDs > advancePast. To return all items, use ItemPointerSetMin.
121  *
122  * Note: This function can still return items smaller than advancePast that
123  * are in the same posting list as the items of interest, so the caller must
124  * still check all the returned items. But passing it allows this function to
125  * skip whole posting lists.
126  */
127 ItemPointer
128 GinDataLeafPageGetItems(Page page, int *nitems, ItemPointerData advancePast)
129 {
130         ItemPointer result;
131
132         if (GinPageIsCompressed(page))
133         {
134                 GinPostingList *seg = GinDataLeafPageGetPostingList(page);
135                 Size            len = GinDataLeafPageGetPostingListSize(page);
136                 Pointer         endptr = ((Pointer) seg) + len;
137                 GinPostingList *next;
138
139                 /* Skip to the segment containing advancePast+1 */
140                 if (ItemPointerIsValid(&advancePast))
141                 {
142                         next = GinNextPostingListSegment(seg);
143                         while ((Pointer) next < endptr &&
144                                    ginCompareItemPointers(&next->first, &advancePast) <= 0)
145                         {
146                                 seg = next;
147                                 next = GinNextPostingListSegment(seg);
148                         }
149                         len = endptr - (Pointer) seg;
150                 }
151
152                 if (len > 0)
153                         result = ginPostingListDecodeAllSegments(seg, len, nitems);
154                 else
155                 {
156                         result = NULL;
157                         *nitems = 0;
158                 }
159         }
160         else
161         {
162                 ItemPointer tmp = dataLeafPageGetUncompressed(page, nitems);
163
164                 result = palloc((*nitems) * sizeof(ItemPointerData));
165                 memcpy(result, tmp, (*nitems) * sizeof(ItemPointerData));
166         }
167
168         return result;
169 }
170
171 /*
172  * Places all TIDs from leaf data page to bitmap.
173  */
174 int
175 GinDataLeafPageGetItemsToTbm(Page page, TIDBitmap *tbm)
176 {
177         ItemPointer uncompressed;
178         int                     nitems;
179
180         if (GinPageIsCompressed(page))
181         {
182                 GinPostingList *segment = GinDataLeafPageGetPostingList(page);
183                 Size            len = GinDataLeafPageGetPostingListSize(page);
184
185                 nitems = ginPostingListDecodeAllSegmentsToTbm(segment, len, tbm);
186         }
187         else
188         {
189                 uncompressed = dataLeafPageGetUncompressed(page, &nitems);
190
191                 if (nitems > 0)
192                         tbm_add_tuples(tbm, uncompressed, nitems, false);
193         }
194
195         return nitems;
196 }
197
198 /*
199  * Get pointer to the uncompressed array of items on a pre-9.4 format
200  * uncompressed leaf page. The number of items in the array is returned in
201  * *nitems.
202  */
203 static ItemPointer
204 dataLeafPageGetUncompressed(Page page, int *nitems)
205 {
206         ItemPointer items;
207
208         Assert(!GinPageIsCompressed(page));
209
210         /*
211          * In the old pre-9.4 page format, the whole page content is used for
212          * uncompressed items, and the number of items is stored in 'maxoff'
213          */
214         items = (ItemPointer) GinDataPageGetData(page);
215         *nitems = GinPageGetOpaque(page)->maxoff;
216
217         return items;
218 }
219
220 /*
221  * Check if we should follow the right link to find the item we're searching
222  * for.
223  *
224  * Compares inserting item pointer with the right bound of the current page.
225  */
226 static bool
227 dataIsMoveRight(GinBtree btree, Page page)
228 {
229         ItemPointer iptr = GinDataPageGetRightBound(page);
230
231         if (GinPageRightMost(page))
232                 return FALSE;
233
234         return (ginCompareItemPointers(&btree->itemptr, iptr) > 0) ? TRUE : FALSE;
235 }
236
237 /*
238  * Find correct PostingItem in non-leaf page. It is assumed that this is
239  * the correct page, and the searched value SHOULD be on the page.
240  */
241 static BlockNumber
242 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
243 {
244         OffsetNumber low,
245                                 high,
246                                 maxoff;
247         PostingItem *pitem = NULL;
248         int                     result;
249         Page            page = BufferGetPage(stack->buffer);
250
251         Assert(!GinPageIsLeaf(page));
252         Assert(GinPageIsData(page));
253
254         if (btree->fullScan)
255         {
256                 stack->off = FirstOffsetNumber;
257                 stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
258                 return btree->getLeftMostChild(btree, page);
259         }
260
261         low = FirstOffsetNumber;
262         maxoff = high = GinPageGetOpaque(page)->maxoff;
263         Assert(high >= low);
264
265         high++;
266
267         while (high > low)
268         {
269                 OffsetNumber mid = low + ((high - low) / 2);
270
271                 pitem = GinDataPageGetPostingItem(page, mid);
272
273                 if (mid == maxoff)
274                 {
275                         /*
276                          * Right infinity, page already correctly chosen with a help of
277                          * dataIsMoveRight
278                          */
279                         result = -1;
280                 }
281                 else
282                 {
283                         pitem = GinDataPageGetPostingItem(page, mid);
284                         result = ginCompareItemPointers(&btree->itemptr, &(pitem->key));
285                 }
286
287                 if (result == 0)
288                 {
289                         stack->off = mid;
290                         return PostingItemGetBlockNumber(pitem);
291                 }
292                 else if (result > 0)
293                         low = mid + 1;
294                 else
295                         high = mid;
296         }
297
298         Assert(high >= FirstOffsetNumber && high <= maxoff);
299
300         stack->off = high;
301         pitem = GinDataPageGetPostingItem(page, high);
302         return PostingItemGetBlockNumber(pitem);
303 }
304
305 /*
306  * Find link to blkno on non-leaf page, returns offset of PostingItem
307  */
308 static OffsetNumber
309 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
310 {
311         OffsetNumber i,
312                                 maxoff = GinPageGetOpaque(page)->maxoff;
313         PostingItem *pitem;
314
315         Assert(!GinPageIsLeaf(page));
316         Assert(GinPageIsData(page));
317
318         /* if page isn't changed, we return storedOff */
319         if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
320         {
321                 pitem = GinDataPageGetPostingItem(page, storedOff);
322                 if (PostingItemGetBlockNumber(pitem) == blkno)
323                         return storedOff;
324
325                 /*
326                  * we hope, that needed pointer goes to right. It's true if there
327                  * wasn't a deletion
328                  */
329                 for (i = storedOff + 1; i <= maxoff; i++)
330                 {
331                         pitem = GinDataPageGetPostingItem(page, i);
332                         if (PostingItemGetBlockNumber(pitem) == blkno)
333                                 return i;
334                 }
335
336                 maxoff = storedOff - 1;
337         }
338
339         /* last chance */
340         for (i = FirstOffsetNumber; i <= maxoff; i++)
341         {
342                 pitem = GinDataPageGetPostingItem(page, i);
343                 if (PostingItemGetBlockNumber(pitem) == blkno)
344                         return i;
345         }
346
347         return InvalidOffsetNumber;
348 }
349
350 /*
351  * Return blkno of leftmost child
352  */
353 static BlockNumber
354 dataGetLeftMostPage(GinBtree btree, Page page)
355 {
356         PostingItem *pitem;
357
358         Assert(!GinPageIsLeaf(page));
359         Assert(GinPageIsData(page));
360         Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
361
362         pitem = GinDataPageGetPostingItem(page, FirstOffsetNumber);
363         return PostingItemGetBlockNumber(pitem);
364 }
365
366 /*
367  * Add PostingItem to a non-leaf page.
368  */
369 void
370 GinDataPageAddPostingItem(Page page, PostingItem *data, OffsetNumber offset)
371 {
372         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
373         char       *ptr;
374
375         Assert(PostingItemGetBlockNumber(data) != InvalidBlockNumber);
376         Assert(!GinPageIsLeaf(page));
377
378         if (offset == InvalidOffsetNumber)
379         {
380                 ptr = (char *) GinDataPageGetPostingItem(page, maxoff + 1);
381         }
382         else
383         {
384                 ptr = (char *) GinDataPageGetPostingItem(page, offset);
385                 if (offset != maxoff + 1)
386                         memmove(ptr + sizeof(PostingItem),
387                                         ptr,
388                                         (maxoff - offset + 1) *sizeof(PostingItem));
389         }
390         memcpy(ptr, data, sizeof(PostingItem));
391
392         maxoff++;
393         GinPageGetOpaque(page)->maxoff = maxoff;
394
395         /*
396          * Also set pd_lower to the end of the posting items, to follow the
397          * "standard" page layout, so that we can squeeze out the unused space
398          * from full-page images.
399          */
400         GinDataPageSetDataSize(page, maxoff * sizeof(PostingItem));
401 }
402
403 /*
404  * Delete posting item from non-leaf page
405  */
406 void
407 GinPageDeletePostingItem(Page page, OffsetNumber offset)
408 {
409         OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
410
411         Assert(!GinPageIsLeaf(page));
412         Assert(offset >= FirstOffsetNumber && offset <= maxoff);
413
414         if (offset != maxoff)
415                 memmove(GinDataPageGetPostingItem(page, offset),
416                                 GinDataPageGetPostingItem(page, offset + 1),
417                                 sizeof(PostingItem) * (maxoff - offset));
418
419         maxoff--;
420         GinPageGetOpaque(page)->maxoff = maxoff;
421
422         GinDataPageSetDataSize(page, maxoff * sizeof(PostingItem));
423 }
424
425 /*
426  * Places keys to leaf data page and fills WAL record.
427  */
428 static GinPlaceToPageRC
429 dataPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
430                                         void *insertdata, XLogRecData **prdata,
431                                         Page *newlpage, Page *newrpage)
432 {
433         GinBtreeDataLeafInsertData *items = insertdata;
434         ItemPointer newItems = &items->items[items->curitem];
435         int                     maxitems = items->nitem - items->curitem;
436         Page            page = BufferGetPage(buf);
437         int                     i;
438         ItemPointerData rbound;
439         ItemPointerData lbound;
440         bool            needsplit;
441         bool            append;
442         int                     segsize;
443         Size            freespace;
444         MemoryContext tmpCxt;
445         MemoryContext oldCxt;
446         disassembledLeaf *leaf;
447         leafSegmentInfo *lastleftinfo;
448         ItemPointerData maxOldItem;
449         ItemPointerData remaining;
450
451         Assert(GinPageIsData(page));
452
453         rbound = *GinDataPageGetRightBound(page);
454
455         /*
456          * Count how many of the new items belong to this page.
457          */
458         if (!GinPageRightMost(page))
459         {
460                 for (i = 0; i < maxitems; i++)
461                 {
462                         if (ginCompareItemPointers(&newItems[i], &rbound) > 0)
463                         {
464                                 /*
465                                  * This needs to go to some other location in the tree. (The
466                                  * caller should've chosen the insert location so that at
467                                  * least the first item goes here.)
468                                  */
469                                 Assert(i > 0);
470                                 break;
471                         }
472                 }
473                 maxitems = i;
474         }
475
476         /*
477          * The following operations do quite a lot of small memory allocations,
478          * create a temporary memory context so that we don't need to keep track
479          * of them individually.
480          */
481         tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
482                                                                    "Gin split temporary context",
483                                                                    ALLOCSET_DEFAULT_MINSIZE,
484                                                                    ALLOCSET_DEFAULT_INITSIZE,
485                                                                    ALLOCSET_DEFAULT_MAXSIZE);
486         oldCxt = MemoryContextSwitchTo(tmpCxt);
487
488         leaf = disassembleLeaf(page);
489
490         /*
491          * Are we appending to the end of the page? IOW, are all the new items
492          * larger than any of the existing items.
493          */
494         if (!dlist_is_empty(&leaf->segments))
495         {
496                 lastleftinfo = dlist_container(leafSegmentInfo, node,
497                                                                            dlist_tail_node(&leaf->segments));
498                 if (!lastleftinfo->items)
499                         lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
500                                                                                                            &lastleftinfo->nitems);
501                 maxOldItem = lastleftinfo->items[lastleftinfo->nitems - 1];
502                 if (ginCompareItemPointers(&newItems[0], &maxOldItem) >= 0)
503                         append = true;
504                 else
505                         append = false;
506         }
507         else
508         {
509                 ItemPointerSetMin(&maxOldItem);
510                 append = true;
511         }
512
513         /*
514          * If we're appending to the end of the page, we will append as many items
515          * as we can fit (after splitting), and stop when the pages becomes full.
516          * Otherwise we have to limit the number of new items to insert, because
517          * once we start packing we can't just stop when we run out of space,
518          * because we must make sure that all the old items still fit.
519          */
520         if (GinPageIsCompressed(page))
521                 freespace = GinDataLeafPageGetFreeSpace(page);
522         else
523                 freespace = 0;
524         if (append)
525         {
526                 /*
527                  * Even when appending, trying to append more items than will fit is
528                  * not completely free, because we will merge the new items and old
529                  * items into an array below. In the best case, every new item fits in
530                  * a single byte, and we can use all the free space on the old page as
531                  * well as the new page. For simplicity, ignore segment overhead etc.
532                  */
533                 maxitems = Min(maxitems, freespace + GinDataPageMaxDataSize);
534         }
535         else
536         {
537                 /*
538                  * Calculate a conservative estimate of how many new items we can fit
539                  * on the two pages after splitting.
540                  *
541                  * We can use any remaining free space on the old page to store full
542                  * segments, as well as the new page. Each full-sized segment can hold
543                  * at least MinTuplesPerSegment items
544                  */
545                 int                     nnewsegments;
546
547                 nnewsegments = freespace / GinPostingListSegmentMaxSize;
548                 nnewsegments += GinDataPageMaxDataSize / GinPostingListSegmentMaxSize;
549                 maxitems = Min(maxitems, nnewsegments * MinTuplesPerSegment);
550         }
551
552         /* Add the new items to the segments */
553         if (!addItemsToLeaf(leaf, newItems, maxitems))
554         {
555                 /* all items were duplicates, we have nothing to do */
556                 items->curitem += maxitems;
557
558                 MemoryContextSwitchTo(oldCxt);
559                 MemoryContextDelete(tmpCxt);
560
561                 return UNMODIFIED;
562         }
563
564         /*
565          * Pack the items back to compressed segments, ready for writing to disk.
566          */
567         needsplit = leafRepackItems(leaf, &remaining);
568
569         /*
570          * Did all the new items fit?
571          *
572          * If we're appending, it's OK if they didn't. But as a sanity check,
573          * verify that all the old items fit.
574          */
575         if (ItemPointerIsValid(&remaining))
576         {
577                 if (!append || ItemPointerCompare(&maxOldItem, &remaining) >= 0)
578                         elog(ERROR, "could not split GIN page; all old items didn't fit");
579
580                 /* Count how many of the new items did fit. */
581                 for (i = 0; i < maxitems; i++)
582                 {
583                         if (ginCompareItemPointers(&newItems[i], &remaining) >= 0)
584                                 break;
585                 }
586                 if (i == 0)
587                         elog(ERROR, "could not split GIN page; no new items fit");
588                 maxitems = i;
589         }
590
591         if (!needsplit)
592         {
593                 /*
594                  * Great, all the items fit on a single page. Construct a WAL record
595                  * describing the changes we made, and write the segments back to the
596                  * page.
597                  *
598                  * Once we start modifying the page, there's no turning back. The
599                  * caller is responsible for calling END_CRIT_SECTION() after writing
600                  * the WAL record.
601                  */
602                 MemoryContextSwitchTo(oldCxt);
603                 if (RelationNeedsWAL(btree->index))
604                         *prdata = constructLeafRecompressWALData(buf, leaf);
605                 else
606                         *prdata = NULL;
607                 START_CRIT_SECTION();
608                 dataPlaceToPageLeafRecompress(buf, leaf);
609
610                 if (append)
611                         elog(DEBUG2, "appended %d new items to block %u; %d bytes (%d to go)",
612                                  maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
613                                  items->nitem - items->curitem - maxitems);
614                 else
615                         elog(DEBUG2, "inserted %d new items to block %u; %d bytes (%d to go)",
616                                  maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize,
617                                  items->nitem - items->curitem - maxitems);
618         }
619         else
620         {
621                 /*
622                  * Had to split.
623                  *
624                  * We already divided the segments between the left and the right
625                  * page. The left page was filled as full as possible, and the rest
626                  * overflowed to the right page. When building a new index, that's
627                  * good, because the table is scanned from beginning to end and there
628                  * won't be any more insertions to the left page during the build.
629                  * This packs the index as tight as possible. But otherwise, split
630                  * 50/50, by moving segments from the left page to the right page
631                  * until they're balanced.
632                  *
633                  * As a further heuristic, when appending items to the end of the
634                  * page, split 75/25, one the assumption that subsequent insertions
635                  * will probably also go to the end. This packs the index somewhat
636                  * tighter when appending to a table, which is very common.
637                  */
638                 if (!btree->isBuild)
639                 {
640                         while (dlist_has_prev(&leaf->segments, leaf->lastleft))
641                         {
642                                 lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
643
644                                 /* ignore deleted segments */
645                                 if (lastleftinfo->action != GIN_SEGMENT_DELETE)
646                                 {
647                                         segsize = SizeOfGinPostingList(lastleftinfo->seg);
648                                         if (append)
649                                         {
650                                                 if ((leaf->lsize - segsize) - (leaf->lsize - segsize) < BLCKSZ / 4)
651                                                         break;
652                                         }
653                                         else
654                                         {
655                                                 if ((leaf->lsize - segsize) - (leaf->rsize + segsize) < 0)
656                                                         break;
657                                         }
658
659                                         leaf->lsize -= segsize;
660                                         leaf->rsize += segsize;
661                                 }
662                                 leaf->lastleft = dlist_prev_node(&leaf->segments, leaf->lastleft);
663                         }
664                 }
665                 Assert(leaf->lsize <= GinDataPageMaxDataSize);
666                 Assert(leaf->rsize <= GinDataPageMaxDataSize);
667
668                 /*
669                  * Fetch the max item in the left page's last segment; it becomes the
670                  * right bound of the page.
671                  */
672                 lastleftinfo = dlist_container(leafSegmentInfo, node, leaf->lastleft);
673                 if (!lastleftinfo->items)
674                         lastleftinfo->items = ginPostingListDecode(lastleftinfo->seg,
675                                                                                                            &lastleftinfo->nitems);
676                 lbound = lastleftinfo->items[lastleftinfo->nitems - 1];
677
678                 *newlpage = MemoryContextAlloc(oldCxt, BLCKSZ);
679                 *newrpage = MemoryContextAlloc(oldCxt, BLCKSZ);
680
681                 dataPlaceToPageLeafSplit(buf, leaf, lbound, rbound,
682                                                                  prdata, *newlpage, *newrpage);
683
684                 Assert(GinPageRightMost(page) ||
685                            ginCompareItemPointers(GinDataPageGetRightBound(*newlpage),
686                                                                    GinDataPageGetRightBound(*newrpage)) < 0);
687
688                 if (append)
689                         elog(DEBUG2, "appended %d items to block %u; split %d/%d (%d to go)",
690                                  maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
691                                  items->nitem - items->curitem - maxitems);
692                 else
693                         elog(DEBUG2, "inserted %d items to block %u; split %d/%d (%d to go)",
694                                  maxitems, BufferGetBlockNumber(buf), (int) leaf->lsize, (int) leaf->rsize,
695                                  items->nitem - items->curitem - maxitems);
696         }
697
698         MemoryContextSwitchTo(oldCxt);
699         MemoryContextDelete(tmpCxt);
700
701         items->curitem += maxitems;
702
703         return needsplit ? SPLIT : INSERTED;
704 }
705
706 /*
707  * Vacuum a posting tree leaf page.
708  */
709 void
710 ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState *gvs)
711 {
712         Page            page = BufferGetPage(buffer);
713         disassembledLeaf *leaf;
714         bool            removedsomething = false;
715         dlist_iter      iter;
716
717         leaf = disassembleLeaf(page);
718
719         /* Vacuum each segment. */
720         dlist_foreach(iter, &leaf->segments)
721         {
722                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
723                 int                     oldsegsize;
724                 ItemPointer cleaned;
725                 int                     ncleaned;
726
727                 if (!seginfo->items)
728                         seginfo->items = ginPostingListDecode(seginfo->seg,
729                                                                                                   &seginfo->nitems);
730                 if (seginfo->seg)
731                         oldsegsize = SizeOfGinPostingList(seginfo->seg);
732                 else
733                         oldsegsize = GinDataPageMaxDataSize;
734
735                 cleaned = ginVacuumItemPointers(gvs,
736                                                                                 seginfo->items,
737                                                                                 seginfo->nitems,
738                                                                                 &ncleaned);
739                 pfree(seginfo->items);
740                 seginfo->items = NULL;
741                 seginfo->nitems = 0;
742                 if (cleaned)
743                 {
744                         if (ncleaned > 0)
745                         {
746                                 int                     npacked;
747
748                                 seginfo->seg = ginCompressPostingList(cleaned,
749                                                                                                           ncleaned,
750                                                                                                           oldsegsize,
751                                                                                                           &npacked);
752                                 /* Removing an item never increases the size of the segment */
753                                 if (npacked != ncleaned)
754                                         elog(ERROR, "could not fit vacuumed posting list");
755                                 seginfo->action = GIN_SEGMENT_REPLACE;
756                         }
757                         else
758                         {
759                                 seginfo->seg = NULL;
760                                 seginfo->items = NULL;
761                                 seginfo->action = GIN_SEGMENT_DELETE;
762                         }
763                         seginfo->nitems = ncleaned;
764
765                         removedsomething = true;
766                 }
767         }
768
769         /*
770          * If we removed any items, reconstruct the page from the pieces.
771          *
772          * We don't try to re-encode the segments here, even though some of them
773          * might be really small now that we've removed some items from them. It
774          * seems like a waste of effort, as there isn't really any benefit from
775          * larger segments per se; larger segments only help to pack more items in
776          * the same space. We might as well delay doing that until the next
777          * insertion, which will need to re-encode at least part of the page
778          * anyway.
779          *
780          * Also note if the page was in uncompressed, pre-9.4 format before, it is
781          * now represented as one huge segment that contains all the items. It
782          * might make sense to split that, to speed up random access, but we don't
783          * bother. You'll have to REINDEX anyway if you want the full gain of the
784          * new tighter index format.
785          */
786         if (removedsomething)
787         {
788                 XLogRecData *payloadrdata = NULL;
789                 bool            modified;
790
791                 /*
792                  * Make sure we have a palloc'd copy of all segments, after the first
793                  * segment that is modified. (dataPlaceToPageLeafRecompress requires
794                  * this).
795                  */
796                 modified = false;
797                 dlist_foreach(iter, &leaf->segments)
798                 {
799                         leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
800                                                                                                            iter.cur);
801
802                         if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
803                                 modified = true;
804                         if (modified && seginfo->action != GIN_SEGMENT_DELETE)
805                         {
806                                 int                     segsize = SizeOfGinPostingList(seginfo->seg);
807                                 GinPostingList *tmp = (GinPostingList *) palloc(segsize);
808
809                                 memcpy(tmp, seginfo->seg, segsize);
810                                 seginfo->seg = tmp;
811                         }
812                 }
813
814                 if (RelationNeedsWAL(indexrel))
815                         payloadrdata = constructLeafRecompressWALData(buffer, leaf);
816                 START_CRIT_SECTION();
817                 dataPlaceToPageLeafRecompress(buffer, leaf);
818
819                 MarkBufferDirty(buffer);
820
821                 if (RelationNeedsWAL(indexrel))
822                 {
823                         XLogRecPtr      recptr;
824                         XLogRecData rdata;
825                         ginxlogVacuumDataLeafPage xlrec;
826
827                         xlrec.node = indexrel->rd_node;
828                         xlrec.blkno = BufferGetBlockNumber(buffer);
829
830                         rdata.buffer = InvalidBuffer;
831                         rdata.data = (char *) &xlrec;
832                         rdata.len = offsetof(ginxlogVacuumDataLeafPage, data);
833                         rdata.next = payloadrdata;
834
835                         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE, &rdata);
836                         PageSetLSN(page, recptr);
837                 }
838
839                 END_CRIT_SECTION();
840         }
841 }
842
843 /*
844  * Construct a ginxlogRecompressDataLeaf record representing the changes
845  * in *leaf.
846  */
847 static XLogRecData *
848 constructLeafRecompressWALData(Buffer buf, disassembledLeaf *leaf)
849 {
850         int                     nmodified = 0;
851         char       *walbufbegin;
852         char       *walbufend;
853         XLogRecData *rdata;
854         dlist_iter      iter;
855         int                     segno;
856         ginxlogRecompressDataLeaf *recompress_xlog;
857
858         /* Count the modified segments */
859         dlist_foreach(iter, &leaf->segments)
860         {
861                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
862                                                                                                    iter.cur);
863
864                 if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
865                         nmodified++;
866         }
867
868         walbufbegin = palloc(
869                                                  sizeof(ginxlogRecompressDataLeaf) +
870                                                  BLCKSZ +               /* max size needed to hold the segment
871                                                                                  * data */
872                                                  nmodified * 2 +                /* (segno + action) per action */
873                                                  sizeof(XLogRecData));
874         walbufend = walbufbegin;
875
876         recompress_xlog = (ginxlogRecompressDataLeaf *) walbufend;
877         walbufend += sizeof(ginxlogRecompressDataLeaf);
878
879         recompress_xlog->nactions = nmodified;
880
881         segno = 0;
882         dlist_foreach(iter, &leaf->segments)
883         {
884                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
885                                                                                                    iter.cur);
886                 int                     segsize = 0;
887                 int                     datalen;
888                 uint8           action = seginfo->action;
889
890                 if (action == GIN_SEGMENT_UNMODIFIED)
891                 {
892                         segno++;
893                         continue;
894                 }
895
896                 if (action != GIN_SEGMENT_DELETE)
897                         segsize = SizeOfGinPostingList(seginfo->seg);
898
899                 /*
900                  * If storing the uncompressed list of added item pointers would take
901                  * more space than storing the compressed segment as is, do that
902                  * instead.
903                  */
904                 if (action == GIN_SEGMENT_ADDITEMS &&
905                         seginfo->nmodifieditems * sizeof(ItemPointerData) > segsize)
906                 {
907                         action = GIN_SEGMENT_REPLACE;
908                 }
909
910                 *((uint8 *) (walbufend++)) = segno;
911                 *(walbufend++) = action;
912
913                 switch (action)
914                 {
915                         case GIN_SEGMENT_DELETE:
916                                 datalen = 0;
917                                 break;
918
919                         case GIN_SEGMENT_ADDITEMS:
920                                 datalen = seginfo->nmodifieditems * sizeof(ItemPointerData);
921                                 memcpy(walbufend, &seginfo->nmodifieditems, sizeof(uint16));
922                                 memcpy(walbufend + sizeof(uint16), seginfo->modifieditems, datalen);
923                                 datalen += sizeof(uint16);
924                                 break;
925
926                         case GIN_SEGMENT_INSERT:
927                         case GIN_SEGMENT_REPLACE:
928                                 datalen = SHORTALIGN(segsize);
929                                 memcpy(walbufend, seginfo->seg, segsize);
930                                 break;
931
932                         default:
933                                 elog(ERROR, "unexpected GIN leaf action %d", action);
934                 }
935                 walbufend += datalen;
936
937                 if (action != GIN_SEGMENT_INSERT)
938                         segno++;
939         }
940
941         rdata = (XLogRecData *) MAXALIGN(walbufend);
942         rdata->buffer = buf;
943         rdata->buffer_std = TRUE;
944         rdata->data = walbufbegin;
945         rdata->len = walbufend - walbufbegin;
946         rdata->next = NULL;
947
948         return rdata;
949 }
950
951 /*
952  * Assemble a disassembled posting tree leaf page back to a buffer.
953  *
954  * *prdata is filled with WAL information about this operation. The caller
955  * is responsible for inserting to the WAL, along with any other information
956  * about the operation that triggered this recompression.
957  *
958  * NOTE: The segment pointers must not point directly to the same buffer,
959  * except for segments that have not been modified and whose preceding
960  * segments have not been modified either.
961  */
962 static void
963 dataPlaceToPageLeafRecompress(Buffer buf, disassembledLeaf *leaf)
964 {
965         Page            page = BufferGetPage(buf);
966         char       *ptr;
967         int                     newsize;
968         bool            modified = false;
969         dlist_iter      iter;
970         int                     segsize;
971
972         /*
973          * If the page was in pre-9.4 format before, convert the header, and force
974          * all segments to be copied to the page whether they were modified or
975          * not.
976          */
977         if (!GinPageIsCompressed(page))
978         {
979                 Assert(leaf->oldformat);
980                 GinPageSetCompressed(page);
981                 GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
982                 modified = true;
983         }
984
985         ptr = (char *) GinDataLeafPageGetPostingList(page);
986         newsize = 0;
987         dlist_foreach(iter, &leaf->segments)
988         {
989                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node, iter.cur);
990
991                 if (seginfo->action != GIN_SEGMENT_UNMODIFIED)
992                         modified = true;
993
994                 if (seginfo->action != GIN_SEGMENT_DELETE)
995                 {
996                         segsize = SizeOfGinPostingList(seginfo->seg);
997
998                         if (modified)
999                                 memcpy(ptr, seginfo->seg, segsize);
1000
1001                         ptr += segsize;
1002                         newsize += segsize;
1003                 }
1004         }
1005
1006         Assert(newsize <= GinDataPageMaxDataSize);
1007         GinDataPageSetDataSize(page, newsize);
1008 }
1009
1010 /*
1011  * Like dataPlaceToPageLeafRecompress, but writes the disassembled leaf
1012  * segments to two pages instead of one.
1013  *
1014  * This is different from the non-split cases in that this does not modify
1015  * the original page directly, but to temporary in-memory copies of the new
1016  * left and right pages.
1017  */
1018 static void
1019 dataPlaceToPageLeafSplit(Buffer buf, disassembledLeaf *leaf,
1020                                                  ItemPointerData lbound, ItemPointerData rbound,
1021                                                  XLogRecData **prdata, Page lpage, Page rpage)
1022 {
1023         char       *ptr;
1024         int                     segsize;
1025         int                     lsize;
1026         int                     rsize;
1027         dlist_node *node;
1028         dlist_node *firstright;
1029         leafSegmentInfo *seginfo;
1030
1031         /* these must be static so they can be returned to caller */
1032         static ginxlogSplitDataLeaf split_xlog;
1033         static XLogRecData rdata[3];
1034
1035         /* Initialize temporary pages to hold the new left and right pages */
1036         GinInitPage(lpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1037         GinInitPage(rpage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1038
1039         /*
1040          * Copy the segments that go to the left page.
1041          *
1042          * XXX: We should skip copying the unmodified part of the left page, like
1043          * we do when recompressing.
1044          */
1045         lsize = 0;
1046         ptr = (char *) GinDataLeafPageGetPostingList(lpage);
1047         firstright = dlist_next_node(&leaf->segments, leaf->lastleft);
1048         for (node = dlist_head_node(&leaf->segments);
1049                  node != firstright;
1050                  node = dlist_next_node(&leaf->segments, node))
1051         {
1052                 seginfo = dlist_container(leafSegmentInfo, node, node);
1053
1054                 if (seginfo->action != GIN_SEGMENT_DELETE)
1055                 {
1056                         segsize = SizeOfGinPostingList(seginfo->seg);
1057                         memcpy(ptr, seginfo->seg, segsize);
1058                         ptr += segsize;
1059                         lsize += segsize;
1060                 }
1061         }
1062         Assert(lsize == leaf->lsize);
1063         GinDataPageSetDataSize(lpage, lsize);
1064         *GinDataPageGetRightBound(lpage) = lbound;
1065
1066         /* Copy the segments that go to the right page */
1067         ptr = (char *) GinDataLeafPageGetPostingList(rpage);
1068         rsize = 0;
1069         for (node = firstright;
1070                  ;
1071                  node = dlist_next_node(&leaf->segments, node))
1072         {
1073                 seginfo = dlist_container(leafSegmentInfo, node, node);
1074
1075                 if (seginfo->action != GIN_SEGMENT_DELETE)
1076                 {
1077                         segsize = SizeOfGinPostingList(seginfo->seg);
1078                         memcpy(ptr, seginfo->seg, segsize);
1079                         ptr += segsize;
1080                         rsize += segsize;
1081                 }
1082
1083                 if (!dlist_has_next(&leaf->segments, node))
1084                         break;
1085         }
1086         Assert(rsize == leaf->rsize);
1087         GinDataPageSetDataSize(rpage, rsize);
1088         *GinDataPageGetRightBound(rpage) = rbound;
1089
1090         /* Create WAL record */
1091         split_xlog.lsize = lsize;
1092         split_xlog.rsize = rsize;
1093         split_xlog.lrightbound = lbound;
1094         split_xlog.rrightbound = rbound;
1095
1096         rdata[0].buffer = InvalidBuffer;
1097         rdata[0].data = (char *) &split_xlog;
1098         rdata[0].len = sizeof(ginxlogSplitDataLeaf);
1099         rdata[0].next = &rdata[1];
1100
1101         rdata[1].buffer = InvalidBuffer;
1102         rdata[1].data = (char *) GinDataLeafPageGetPostingList(lpage);
1103         rdata[1].len = lsize;
1104         rdata[1].next = &rdata[2];
1105
1106         rdata[2].buffer = InvalidBuffer;
1107         rdata[2].data = (char *) GinDataLeafPageGetPostingList(rpage);
1108         rdata[2].len = rsize;
1109         rdata[2].next = NULL;
1110
1111         *prdata = rdata;
1112 }
1113
1114 /*
1115  * Place a PostingItem to page, and fill a WAL record.
1116  *
1117  * If the item doesn't fit, returns false without modifying the page.
1118  *
1119  * In addition to inserting the given item, the downlink of the existing item
1120  * at 'off' is updated to point to 'updateblkno'.
1121  */
1122 static GinPlaceToPageRC
1123 dataPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1124                                                 void *insertdata, BlockNumber updateblkno,
1125                                                 XLogRecData **prdata, Page *newlpage, Page *newrpage)
1126 {
1127         Page            page = BufferGetPage(buf);
1128         OffsetNumber off = stack->off;
1129         PostingItem *pitem;
1130
1131         /* these must be static so they can be returned to caller */
1132         static XLogRecData rdata;
1133         static ginxlogInsertDataInternal data;
1134
1135         /* split if we have to */
1136         if (GinNonLeafDataPageGetFreeSpace(page) < sizeof(PostingItem))
1137         {
1138                 dataSplitPageInternal(btree, buf, stack, insertdata, updateblkno,
1139                                                           prdata, newlpage, newrpage);
1140                 return SPLIT;
1141         }
1142
1143         *prdata = &rdata;
1144         Assert(GinPageIsData(page));
1145
1146         START_CRIT_SECTION();
1147
1148         /* Update existing downlink to point to next page (on internal page) */
1149         pitem = GinDataPageGetPostingItem(page, off);
1150         PostingItemSetBlockNumber(pitem, updateblkno);
1151
1152         /* Add new item */
1153         pitem = (PostingItem *) insertdata;
1154         GinDataPageAddPostingItem(page, pitem, off);
1155
1156         data.offset = off;
1157         data.newitem = *pitem;
1158
1159         rdata.buffer = buf;
1160         rdata.buffer_std = TRUE;
1161         rdata.data = (char *) &data;
1162         rdata.len = sizeof(ginxlogInsertDataInternal);
1163         rdata.next = NULL;
1164
1165         return INSERTED;
1166 }
1167
1168 /*
1169  * Places an item (or items) to a posting tree. Calls relevant function of
1170  * internal of leaf page because they are handled very differently.
1171  */
1172 static GinPlaceToPageRC
1173 dataPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
1174                                 void *insertdata, BlockNumber updateblkno,
1175                                 XLogRecData **prdata,
1176                                 Page *newlpage, Page *newrpage)
1177 {
1178         Page            page = BufferGetPage(buf);
1179
1180         Assert(GinPageIsData(page));
1181
1182         if (GinPageIsLeaf(page))
1183                 return dataPlaceToPageLeaf(btree, buf, stack, insertdata,
1184                                                                    prdata, newlpage, newrpage);
1185         else
1186                 return dataPlaceToPageInternal(btree, buf, stack,
1187                                                                            insertdata, updateblkno,
1188                                                                            prdata, newlpage, newrpage);
1189 }
1190
1191 /*
1192  * Split page and fill WAL record. Returns a new temp buffer filled with data
1193  * that should go to the left page. The original buffer is left untouched.
1194  */
1195 static void
1196 dataSplitPageInternal(GinBtree btree, Buffer origbuf,
1197                                           GinBtreeStack *stack,
1198                                           void *insertdata, BlockNumber updateblkno,
1199                                           XLogRecData **prdata, Page *newlpage, Page *newrpage)
1200 {
1201         Page            oldpage = BufferGetPage(origbuf);
1202         OffsetNumber off = stack->off;
1203         int                     nitems = GinPageGetOpaque(oldpage)->maxoff;
1204         int                     nleftitems;
1205         int                     nrightitems;
1206         Size            pageSize = PageGetPageSize(oldpage);
1207         ItemPointerData oldbound = *GinDataPageGetRightBound(oldpage);
1208         ItemPointer bound;
1209         Page            lpage;
1210         Page            rpage;
1211         OffsetNumber separator;
1212
1213         /* these must be static so they can be returned to caller */
1214         static ginxlogSplitDataInternal data;
1215         static XLogRecData rdata[4];
1216         static PostingItem allitems[(BLCKSZ / sizeof(PostingItem)) + 1];
1217
1218         lpage = PageGetTempPage(oldpage);
1219         rpage = PageGetTempPage(oldpage);
1220         GinInitPage(lpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1221         GinInitPage(rpage, GinPageGetOpaque(oldpage)->flags, pageSize);
1222
1223         *prdata = rdata;
1224
1225         /*
1226          * First construct a new list of PostingItems, which includes all the old
1227          * items, and the new item.
1228          */
1229         memcpy(allitems, GinDataPageGetPostingItem(oldpage, FirstOffsetNumber),
1230                    (off - 1) * sizeof(PostingItem));
1231
1232         allitems[off - 1] = *((PostingItem *) insertdata);
1233         memcpy(&allitems[off], GinDataPageGetPostingItem(oldpage, off),
1234                    (nitems - (off - 1)) * sizeof(PostingItem));
1235         nitems++;
1236
1237         /* Update existing downlink to point to next page */
1238         PostingItemSetBlockNumber(&allitems[off], updateblkno);
1239
1240         /*
1241          * When creating a new index, fit as many tuples as possible on the left
1242          * page, on the assumption that the table is scanned from beginning to
1243          * end. This packs the index as tight as possible.
1244          */
1245         if (btree->isBuild && GinPageRightMost(oldpage))
1246                 separator = GinNonLeafDataPageGetFreeSpace(rpage) / sizeof(PostingItem);
1247         else
1248                 separator = nitems / 2;
1249         nleftitems = separator;
1250         nrightitems = nitems - separator;
1251
1252         memcpy(GinDataPageGetPostingItem(lpage, FirstOffsetNumber),
1253                    allitems,
1254                    nleftitems * sizeof(PostingItem));
1255         GinPageGetOpaque(lpage)->maxoff = nleftitems;
1256         memcpy(GinDataPageGetPostingItem(rpage, FirstOffsetNumber),
1257                    &allitems[separator],
1258                    nrightitems * sizeof(PostingItem));
1259         GinPageGetOpaque(rpage)->maxoff = nrightitems;
1260
1261         /*
1262          * Also set pd_lower for both pages, like GinDataPageAddPostingItem does.
1263          */
1264         GinDataPageSetDataSize(lpage, nleftitems * sizeof(PostingItem));
1265         GinDataPageSetDataSize(rpage, nrightitems * sizeof(PostingItem));
1266
1267         /* set up right bound for left page */
1268         bound = GinDataPageGetRightBound(lpage);
1269         *bound = GinDataPageGetPostingItem(lpage, nleftitems)->key;
1270
1271         /* set up right bound for right page */
1272         *GinDataPageGetRightBound(rpage) = oldbound;
1273
1274         data.separator = separator;
1275         data.nitem = nitems;
1276         data.rightbound = oldbound;
1277
1278         rdata[0].buffer = InvalidBuffer;
1279         rdata[0].data = (char *) &data;
1280         rdata[0].len = sizeof(ginxlogSplitDataInternal);
1281         rdata[0].next = &rdata[1];
1282
1283         rdata[1].buffer = InvalidBuffer;
1284         rdata[1].data = (char *) allitems;
1285         rdata[1].len = nitems * sizeof(PostingItem);
1286         rdata[1].next = NULL;
1287
1288         *newlpage = lpage;
1289         *newrpage = rpage;
1290 }
1291
1292 /*
1293  * Construct insertion payload for inserting the downlink for given buffer.
1294  */
1295 static void *
1296 dataPrepareDownlink(GinBtree btree, Buffer lbuf)
1297 {
1298         PostingItem *pitem = palloc(sizeof(PostingItem));
1299         Page            lpage = BufferGetPage(lbuf);
1300
1301         PostingItemSetBlockNumber(pitem, BufferGetBlockNumber(lbuf));
1302         pitem->key = *GinDataPageGetRightBound(lpage);
1303
1304         return pitem;
1305 }
1306
1307 /*
1308  * Fills new root by right bound values from child.
1309  * Also called from ginxlog, should not use btree
1310  */
1311 void
1312 ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
1313 {
1314         PostingItem li,
1315                                 ri;
1316
1317         li.key = *GinDataPageGetRightBound(lpage);
1318         PostingItemSetBlockNumber(&li, lblkno);
1319         GinDataPageAddPostingItem(root, &li, InvalidOffsetNumber);
1320
1321         ri.key = *GinDataPageGetRightBound(rpage);
1322         PostingItemSetBlockNumber(&ri, rblkno);
1323         GinDataPageAddPostingItem(root, &ri, InvalidOffsetNumber);
1324 }
1325
1326
1327 /*** Functions to work with disassembled leaf pages ***/
1328
1329 /*
1330  * Disassemble page into a disassembledLeaf struct.
1331  */
1332 static disassembledLeaf *
1333 disassembleLeaf(Page page)
1334 {
1335         disassembledLeaf *leaf;
1336         GinPostingList *seg;
1337         Pointer         segbegin;
1338         Pointer         segend;
1339
1340         leaf = palloc0(sizeof(disassembledLeaf));
1341         dlist_init(&leaf->segments);
1342
1343         if (GinPageIsCompressed(page))
1344         {
1345                 /*
1346                  * Create a leafSegment entry for each segment.
1347                  */
1348                 seg = GinDataLeafPageGetPostingList(page);
1349                 segbegin = (Pointer) seg;
1350                 segend = segbegin + GinDataLeafPageGetPostingListSize(page);
1351                 while ((Pointer) seg < segend)
1352                 {
1353                         leafSegmentInfo *seginfo = palloc(sizeof(leafSegmentInfo));
1354
1355                         seginfo->action = GIN_SEGMENT_UNMODIFIED;
1356                         seginfo->seg = seg;
1357                         seginfo->items = NULL;
1358                         seginfo->nitems = 0;
1359                         dlist_push_tail(&leaf->segments, &seginfo->node);
1360
1361                         seg = GinNextPostingListSegment(seg);
1362                 }
1363                 leaf->oldformat = false;
1364         }
1365         else
1366         {
1367                 /*
1368                  * A pre-9.4 format uncompressed page is represented by a single
1369                  * segment, with an array of items.
1370                  */
1371                 ItemPointer uncompressed;
1372                 int                     nuncompressed;
1373                 leafSegmentInfo *seginfo;
1374
1375                 uncompressed = dataLeafPageGetUncompressed(page, &nuncompressed);
1376
1377                 seginfo = palloc(sizeof(leafSegmentInfo));
1378
1379                 seginfo->action = GIN_SEGMENT_REPLACE;
1380                 seginfo->seg = NULL;
1381                 seginfo->items = palloc(nuncompressed * sizeof(ItemPointerData));
1382                 memcpy(seginfo->items, uncompressed, nuncompressed * sizeof(ItemPointerData));
1383                 seginfo->nitems = nuncompressed;
1384
1385                 dlist_push_tail(&leaf->segments, &seginfo->node);
1386
1387                 leaf->oldformat = true;
1388         }
1389
1390         return leaf;
1391 }
1392
1393 /*
1394  * Distribute newItems to the segments.
1395  *
1396  * Any segments that acquire new items are decoded, and the new items are
1397  * merged with the old items.
1398  *
1399  * Returns true if any new items were added. False means they were all
1400  * duplicates of existing items on the page.
1401  */
1402 static bool
1403 addItemsToLeaf(disassembledLeaf *leaf, ItemPointer newItems, int nNewItems)
1404 {
1405         dlist_iter      iter;
1406         ItemPointer nextnew = newItems;
1407         int                     newleft = nNewItems;
1408         bool            modified = false;
1409         leafSegmentInfo *newseg;
1410
1411         /*
1412          * If the page is completely empty, just construct one new segment to hold
1413          * all the new items.
1414          */
1415         if (dlist_is_empty(&leaf->segments))
1416         {
1417                 newseg = palloc(sizeof(leafSegmentInfo));
1418                 newseg->seg = NULL;
1419                 newseg->items = newItems;
1420                 newseg->nitems = nNewItems;
1421                 newseg->action = GIN_SEGMENT_INSERT;
1422                 dlist_push_tail(&leaf->segments, &newseg->node);
1423                 return true;
1424         }
1425
1426         dlist_foreach(iter, &leaf->segments)
1427         {
1428                 leafSegmentInfo *cur = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node, iter.cur);
1429                 int                     nthis;
1430                 ItemPointer tmpitems;
1431                 int                     ntmpitems;
1432
1433                 /*
1434                  * How many of the new items fall into this segment?
1435                  */
1436                 if (!dlist_has_next(&leaf->segments, iter.cur))
1437                         nthis = newleft;
1438                 else
1439                 {
1440                         leafSegmentInfo *next;
1441                         ItemPointerData next_first;
1442
1443                         next = (leafSegmentInfo *) dlist_container(leafSegmentInfo, node,
1444                                                                  dlist_next_node(&leaf->segments, iter.cur));
1445                         if (next->items)
1446                                 next_first = next->items[0];
1447                         else
1448                         {
1449                                 Assert(next->seg != NULL);
1450                                 next_first = next->seg->first;
1451                         }
1452
1453                         nthis = 0;
1454                         while (nthis < newleft && ginCompareItemPointers(&nextnew[nthis], &next_first) < 0)
1455                                 nthis++;
1456                 }
1457                 if (nthis == 0)
1458                         continue;
1459
1460                 /* Merge the new items with the existing items. */
1461                 if (!cur->items)
1462                         cur->items = ginPostingListDecode(cur->seg, &cur->nitems);
1463
1464                 /*
1465                  * Fast path for the important special case that we're appending to
1466                  * the end of the page: don't let the last segment on the page grow
1467                  * larger than the target, create a new segment before that happens.
1468                  */
1469                 if (!dlist_has_next(&leaf->segments, iter.cur) &&
1470                         ginCompareItemPointers(&cur->items[cur->nitems - 1], &nextnew[0]) < 0 &&
1471                         cur->seg != NULL &&
1472                         SizeOfGinPostingList(cur->seg) >= GinPostingListSegmentTargetSize)
1473                 {
1474                         newseg = palloc(sizeof(leafSegmentInfo));
1475                         newseg->seg = NULL;
1476                         newseg->items = nextnew;
1477                         newseg->nitems = nthis;
1478                         newseg->action = GIN_SEGMENT_INSERT;
1479                         dlist_push_tail(&leaf->segments, &newseg->node);
1480                         modified = true;
1481                         break;
1482                 }
1483
1484                 tmpitems = ginMergeItemPointers(cur->items, cur->nitems,
1485                                                                                 nextnew, nthis,
1486                                                                                 &ntmpitems);
1487                 if (ntmpitems != cur->nitems)
1488                 {
1489                         /*
1490                          * If there are no duplicates, track the added items so that we
1491                          * can emit a compact ADDITEMS WAL record later on. (it doesn't
1492                          * seem worth re-checking which items were duplicates, if there
1493                          * were any)
1494                          */
1495                         if (ntmpitems == nthis + cur->nitems &&
1496                                 cur->action == GIN_SEGMENT_UNMODIFIED)
1497                         {
1498                                 cur->action = GIN_SEGMENT_ADDITEMS;
1499                                 cur->modifieditems = nextnew;
1500                                 cur->nmodifieditems = nthis;
1501                         }
1502                         else
1503                                 cur->action = GIN_SEGMENT_REPLACE;
1504
1505                         cur->items = tmpitems;
1506                         cur->nitems = ntmpitems;
1507                         cur->seg = NULL;
1508                         modified = true;
1509                 }
1510
1511                 nextnew += nthis;
1512                 newleft -= nthis;
1513                 if (newleft == 0)
1514                         break;
1515         }
1516
1517         return modified;
1518 }
1519
1520 /*
1521  * Recompresses all segments that have been modified.
1522  *
1523  * If not all the items fit on two pages (ie. after split), we store as
1524  * many items as fit, and set *remaining to the first item that didn't fit.
1525  * If all items fit, *remaining is set to invalid.
1526  *
1527  * Returns true if the page has to be split.
1528  */
1529 static bool
1530 leafRepackItems(disassembledLeaf *leaf, ItemPointer remaining)
1531 {
1532         int                     pgused = 0;
1533         bool            needsplit = false;
1534         dlist_iter      iter;
1535         int                     segsize;
1536         leafSegmentInfo *nextseg;
1537         int                     npacked;
1538         bool            modified;
1539         dlist_node *cur_node;
1540         dlist_node *next_node;
1541
1542         ItemPointerSetInvalid(remaining);
1543
1544         /*
1545          * cannot use dlist_foreach_modify here because we insert adjacent items
1546          * while iterating.
1547          */
1548         for (cur_node = dlist_head_node(&leaf->segments);
1549                  cur_node != NULL;
1550                  cur_node = next_node)
1551         {
1552                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
1553                                                                                                    cur_node);
1554
1555                 if (dlist_has_next(&leaf->segments, cur_node))
1556                         next_node = dlist_next_node(&leaf->segments, cur_node);
1557                 else
1558                         next_node = NULL;
1559
1560                 /* Compress the posting list, if necessary */
1561                 if (seginfo->action != GIN_SEGMENT_DELETE)
1562                 {
1563                         if (seginfo->seg == NULL)
1564                         {
1565                                 if (seginfo->nitems > GinPostingListSegmentMaxSize)
1566                                         npacked = 0;    /* no chance that it would fit. */
1567                                 else
1568                                 {
1569                                         seginfo->seg = ginCompressPostingList(seginfo->items,
1570                                                                                                                   seginfo->nitems,
1571                                                                                                 GinPostingListSegmentMaxSize,
1572                                                                                                                   &npacked);
1573                                 }
1574                                 if (npacked != seginfo->nitems)
1575                                 {
1576                                         /*
1577                                          * Too large. Compress again to the target size, and
1578                                          * create a new segment to represent the remaining items.
1579                                          * The new segment is inserted after this one, so it will
1580                                          * be processed in the next iteration of this loop.
1581                                          */
1582                                         if (seginfo->seg)
1583                                                 pfree(seginfo->seg);
1584                                         seginfo->seg = ginCompressPostingList(seginfo->items,
1585                                                                                                                   seginfo->nitems,
1586                                                                                          GinPostingListSegmentTargetSize,
1587                                                                                                                   &npacked);
1588                                         if (seginfo->action != GIN_SEGMENT_INSERT)
1589                                                 seginfo->action = GIN_SEGMENT_REPLACE;
1590
1591                                         nextseg = palloc(sizeof(leafSegmentInfo));
1592                                         nextseg->action = GIN_SEGMENT_INSERT;
1593                                         nextseg->seg = NULL;
1594                                         nextseg->items = &seginfo->items[npacked];
1595                                         nextseg->nitems = seginfo->nitems - npacked;
1596                                         next_node = &nextseg->node;
1597                                         dlist_insert_after(cur_node, next_node);
1598                                 }
1599                         }
1600
1601                         /*
1602                          * If the segment is very small, merge it with the next segment.
1603                          */
1604                         if (SizeOfGinPostingList(seginfo->seg) < GinPostingListSegmentMinSize && next_node)
1605                         {
1606                                 int                     nmerged;
1607
1608                                 nextseg = dlist_container(leafSegmentInfo, node, next_node);
1609
1610                                 if (seginfo->items == NULL)
1611                                         seginfo->items = ginPostingListDecode(seginfo->seg,
1612                                                                                                                   &seginfo->nitems);
1613                                 if (nextseg->items == NULL)
1614                                         nextseg->items = ginPostingListDecode(nextseg->seg,
1615                                                                                                                   &nextseg->nitems);
1616                                 nextseg->items =
1617                                         ginMergeItemPointers(seginfo->items, seginfo->nitems,
1618                                                                                  nextseg->items, nextseg->nitems,
1619                                                                                  &nmerged);
1620                                 Assert(nmerged == seginfo->nitems + nextseg->nitems);
1621                                 nextseg->nitems = nmerged;
1622                                 nextseg->seg = NULL;
1623
1624                                 nextseg->action = GIN_SEGMENT_REPLACE;
1625                                 nextseg->modifieditems = NULL;
1626                                 nextseg->nmodifieditems = 0;
1627
1628                                 if (seginfo->action == GIN_SEGMENT_INSERT)
1629                                 {
1630                                         dlist_delete(cur_node);
1631                                         continue;
1632                                 }
1633                                 else
1634                                 {
1635                                         seginfo->action = GIN_SEGMENT_DELETE;
1636                                         seginfo->seg = NULL;
1637                                 }
1638                         }
1639
1640                         seginfo->items = NULL;
1641                         seginfo->nitems = 0;
1642                 }
1643
1644                 if (seginfo->action == GIN_SEGMENT_DELETE)
1645                         continue;
1646
1647                 /*
1648                  * OK, we now have a compressed version of this segment ready for
1649                  * copying to the page. Did we exceed the size that fits on one page?
1650                  */
1651                 segsize = SizeOfGinPostingList(seginfo->seg);
1652                 if (pgused + segsize > GinDataPageMaxDataSize)
1653                 {
1654                         if (!needsplit)
1655                         {
1656                                 /* switch to right page */
1657                                 Assert(pgused > 0);
1658                                 leaf->lastleft = dlist_prev_node(&leaf->segments, cur_node);
1659                                 needsplit = true;
1660                                 leaf->lsize = pgused;
1661                                 pgused = 0;
1662                         }
1663                         else
1664                         {
1665                                 /*
1666                                  * Filled both pages. The last segment we constructed did not
1667                                  * fit.
1668                                  */
1669                                 *remaining = seginfo->seg->first;
1670
1671                                 /*
1672                                  * remove all segments that did not fit from the list.
1673                                  */
1674                                 while (dlist_has_next(&leaf->segments, cur_node))
1675                                         dlist_delete(dlist_next_node(&leaf->segments, cur_node));
1676                                 dlist_delete(cur_node);
1677                                 break;
1678                         }
1679                 }
1680
1681                 pgused += segsize;
1682         }
1683
1684         if (!needsplit)
1685         {
1686                 leaf->lsize = pgused;
1687                 leaf->rsize = 0;
1688         }
1689         else
1690                 leaf->rsize = pgused;
1691
1692         Assert(leaf->lsize <= GinDataPageMaxDataSize);
1693         Assert(leaf->rsize <= GinDataPageMaxDataSize);
1694
1695         /*
1696          * Make a palloc'd copy of every segment after the first modified one,
1697          * because as we start copying items to the original page, we might
1698          * overwrite an existing segment.
1699          */
1700         modified = false;
1701         dlist_foreach(iter, &leaf->segments)
1702         {
1703                 leafSegmentInfo *seginfo = dlist_container(leafSegmentInfo, node,
1704                                                                                                    iter.cur);
1705
1706                 if (!modified && seginfo->action != GIN_SEGMENT_UNMODIFIED)
1707                 {
1708                         modified = true;
1709                 }
1710                 else if (modified && seginfo->action == GIN_SEGMENT_UNMODIFIED)
1711                 {
1712                         GinPostingList *tmp;
1713
1714                         segsize = SizeOfGinPostingList(seginfo->seg);
1715                         tmp = palloc(segsize);
1716                         memcpy(tmp, seginfo->seg, segsize);
1717                         seginfo->seg = tmp;
1718                 }
1719         }
1720
1721         return needsplit;
1722 }
1723
1724
1725 /*** Functions that are exported to the rest of the GIN code ***/
1726
1727 /*
1728  * Creates new posting tree containing the given TIDs. Returns the page
1729  * number of the root of the new posting tree.
1730  *
1731  * items[] must be in sorted order with no duplicates.
1732  */
1733 BlockNumber
1734 createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
1735                                   GinStatsData *buildStats)
1736 {
1737         BlockNumber blkno;
1738         Buffer          buffer;
1739         Page            tmppage;
1740         Page            page;
1741         Pointer         ptr;
1742         int                     nrootitems;
1743         int                     rootsize;
1744
1745         /* Construct the new root page in memory first. */
1746         tmppage = (Page) palloc(BLCKSZ);
1747         GinInitPage(tmppage, GIN_DATA | GIN_LEAF | GIN_COMPRESSED, BLCKSZ);
1748         GinPageGetOpaque(tmppage)->rightlink = InvalidBlockNumber;
1749
1750         /*
1751          * Write as many of the items to the root page as fit. In segments of max
1752          * GinPostingListSegmentMaxSize bytes each.
1753          */
1754         nrootitems = 0;
1755         rootsize = 0;
1756         ptr = (Pointer) GinDataLeafPageGetPostingList(tmppage);
1757         while (nrootitems < nitems)
1758         {
1759                 GinPostingList *segment;
1760                 int                     npacked;
1761                 int                     segsize;
1762
1763                 segment = ginCompressPostingList(&items[nrootitems],
1764                                                                                  nitems - nrootitems,
1765                                                                                  GinPostingListSegmentMaxSize,
1766                                                                                  &npacked);
1767                 segsize = SizeOfGinPostingList(segment);
1768                 if (rootsize + segsize > GinDataPageMaxDataSize)
1769                         break;
1770
1771                 memcpy(ptr, segment, segsize);
1772                 ptr += segsize;
1773                 rootsize += segsize;
1774                 nrootitems += npacked;
1775                 pfree(segment);
1776         }
1777         GinDataPageSetDataSize(tmppage, rootsize);
1778
1779         /*
1780          * All set. Get a new physical page, and copy the in-memory page to it.
1781          */
1782         buffer = GinNewBuffer(index);
1783         page = BufferGetPage(buffer);
1784         blkno = BufferGetBlockNumber(buffer);
1785
1786         START_CRIT_SECTION();
1787
1788         PageRestoreTempPage(tmppage, page);
1789         MarkBufferDirty(buffer);
1790
1791         if (RelationNeedsWAL(index))
1792         {
1793                 XLogRecPtr      recptr;
1794                 XLogRecData rdata[2];
1795                 ginxlogCreatePostingTree data;
1796
1797                 data.node = index->rd_node;
1798                 data.blkno = blkno;
1799                 data.size = rootsize;
1800
1801                 rdata[0].buffer = InvalidBuffer;
1802                 rdata[0].data = (char *) &data;
1803                 rdata[0].len = sizeof(ginxlogCreatePostingTree);
1804                 rdata[0].next = &rdata[1];
1805
1806                 rdata[1].buffer = InvalidBuffer;
1807                 rdata[1].data = (char *) GinDataLeafPageGetPostingList(page);
1808                 rdata[1].len = rootsize;
1809                 rdata[1].next = NULL;
1810
1811                 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE, rdata);
1812                 PageSetLSN(page, recptr);
1813         }
1814
1815         UnlockReleaseBuffer(buffer);
1816
1817         END_CRIT_SECTION();
1818
1819         /* During index build, count the newly-added data page */
1820         if (buildStats)
1821                 buildStats->nDataPages++;
1822
1823         elog(DEBUG2, "created GIN posting tree with %d items", nrootitems);
1824
1825         /*
1826          * Add any remaining TIDs to the newly-created posting tree.
1827          */
1828         if (nitems > nrootitems)
1829         {
1830                 ginInsertItemPointers(index, blkno,
1831                                                           items + nrootitems,
1832                                                           nitems - nrootitems,
1833                                                           buildStats);
1834         }
1835
1836         return blkno;
1837 }
1838
1839 void
1840 ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno)
1841 {
1842         memset(btree, 0, sizeof(GinBtreeData));
1843
1844         btree->index = index;
1845         btree->rootBlkno = rootBlkno;
1846
1847         btree->findChildPage = dataLocateItem;
1848         btree->getLeftMostChild = dataGetLeftMostPage;
1849         btree->isMoveRight = dataIsMoveRight;
1850         btree->findItem = NULL;
1851         btree->findChildPtr = dataFindChildPtr;
1852         btree->placeToPage = dataPlaceToPage;
1853         btree->fillRoot = ginDataFillRoot;
1854         btree->prepareDownlink = dataPrepareDownlink;
1855
1856         btree->isData = TRUE;
1857         btree->fullScan = FALSE;
1858         btree->isBuild = FALSE;
1859 }
1860
1861 /*
1862  * Inserts array of item pointers, may execute several tree scan (very rare)
1863  */
1864 void
1865 ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
1866                                           ItemPointerData *items, uint32 nitem,
1867                                           GinStatsData *buildStats)
1868 {
1869         GinBtreeData btree;
1870         GinBtreeDataLeafInsertData insertdata;
1871         GinBtreeStack *stack;
1872
1873         ginPrepareDataScan(&btree, index, rootBlkno);
1874         btree.isBuild = (buildStats != NULL);
1875         insertdata.items = items;
1876         insertdata.nitem = nitem;
1877         insertdata.curitem = 0;
1878
1879         while (insertdata.curitem < insertdata.nitem)
1880         {
1881                 /* search for the leaf page where the first item should go to */
1882                 btree.itemptr = insertdata.items[insertdata.curitem];
1883                 stack = ginFindLeafPage(&btree, false);
1884
1885                 ginInsertValue(&btree, stack, &insertdata, buildStats);
1886         }
1887 }
1888
1889 /*
1890  * Starts a new scan on a posting tree.
1891  */
1892 GinBtreeStack *
1893 ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
1894 {
1895         GinBtreeStack *stack;
1896
1897         ginPrepareDataScan(btree, index, rootBlkno);
1898
1899         btree->fullScan = TRUE;
1900
1901         stack = ginFindLeafPage(btree, TRUE);
1902
1903         return stack;
1904 }