]> granicus.if.org Git - postgresql/blob - src/backend/access/gin/ginbtree.c
Reduce pinning and buffer content locking for btree scans.
[postgresql] / src / backend / access / gin / ginbtree.c
1 /*-------------------------------------------------------------------------
2  *
3  * ginbtree.c
4  *        page utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *                      src/backend/access/gin/ginbtree.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/gin_private.h"
18 #include "access/xloginsert.h"
19 #include "miscadmin.h"
20 #include "utils/rel.h"
21
22 static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
23 static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
24                            void *insertdata, BlockNumber updateblkno,
25                            Buffer childbuf, GinStatsData *buildStats);
26 static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
27                            bool freestack, GinStatsData *buildStats);
28
29 /*
30  * Lock buffer by needed method for search.
31  */
32 static int
33 ginTraverseLock(Buffer buffer, bool searchMode)
34 {
35         Page            page;
36         int                     access = GIN_SHARE;
37
38         LockBuffer(buffer, GIN_SHARE);
39         page = BufferGetPage(buffer);
40         if (GinPageIsLeaf(page))
41         {
42                 if (searchMode == FALSE)
43                 {
44                         /* we should relock our page */
45                         LockBuffer(buffer, GIN_UNLOCK);
46                         LockBuffer(buffer, GIN_EXCLUSIVE);
47
48                         /* But root can become non-leaf during relock */
49                         if (!GinPageIsLeaf(page))
50                         {
51                                 /* restore old lock type (very rare) */
52                                 LockBuffer(buffer, GIN_UNLOCK);
53                                 LockBuffer(buffer, GIN_SHARE);
54                         }
55                         else
56                                 access = GIN_EXCLUSIVE;
57                 }
58         }
59
60         return access;
61 }
62
63 /*
64  * Descend the tree to the leaf page that contains or would contain the key
65  * we're searching for. The key should already be filled in 'btree', in
66  * tree-type specific manner. If btree->fullScan is true, descends to the
67  * leftmost leaf page.
68  *
69  * If 'searchmode' is false, on return stack->buffer is exclusively locked,
70  * and the stack represents the full path to the root. Otherwise stack->buffer
71  * is share-locked, and stack->parent is NULL.
72  */
73 GinBtreeStack *
74 ginFindLeafPage(GinBtree btree, bool searchMode)
75 {
76         GinBtreeStack *stack;
77
78         stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
79         stack->blkno = btree->rootBlkno;
80         stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
81         stack->parent = NULL;
82         stack->predictNumber = 1;
83
84         for (;;)
85         {
86                 Page            page;
87                 BlockNumber child;
88                 int                     access;
89
90                 stack->off = InvalidOffsetNumber;
91
92                 page = BufferGetPage(stack->buffer);
93
94                 access = ginTraverseLock(stack->buffer, searchMode);
95
96                 /*
97                  * If we're going to modify the tree, finish any incomplete splits we
98                  * encounter on the way.
99                  */
100                 if (!searchMode && GinPageIsIncompleteSplit(page))
101                         ginFinishSplit(btree, stack, false, NULL);
102
103                 /*
104                  * ok, page is correctly locked, we should check to move right ..,
105                  * root never has a right link, so small optimization
106                  */
107                 while (btree->fullScan == FALSE && stack->blkno != btree->rootBlkno &&
108                            btree->isMoveRight(btree, page))
109                 {
110                         BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
111
112                         if (rightlink == InvalidBlockNumber)
113                                 /* rightmost page */
114                                 break;
115
116                         stack->buffer = ginStepRight(stack->buffer, btree->index, access);
117                         stack->blkno = rightlink;
118                         page = BufferGetPage(stack->buffer);
119
120                         if (!searchMode && GinPageIsIncompleteSplit(page))
121                                 ginFinishSplit(btree, stack, false, NULL);
122                 }
123
124                 if (GinPageIsLeaf(page))        /* we found, return locked page */
125                         return stack;
126
127                 /* now we have correct buffer, try to find child */
128                 child = btree->findChildPage(btree, stack);
129
130                 LockBuffer(stack->buffer, GIN_UNLOCK);
131                 Assert(child != InvalidBlockNumber);
132                 Assert(stack->blkno != child);
133
134                 if (searchMode)
135                 {
136                         /* in search mode we may forget path to leaf */
137                         stack->blkno = child;
138                         stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
139                 }
140                 else
141                 {
142                         GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
143
144                         ptr->parent = stack;
145                         stack = ptr;
146                         stack->blkno = child;
147                         stack->buffer = ReadBuffer(btree->index, stack->blkno);
148                         stack->predictNumber = 1;
149                 }
150         }
151 }
152
153 /*
154  * Step right from current page.
155  *
156  * The next page is locked first, before releasing the current page. This is
157  * crucial to protect from concurrent page deletion (see comment in
158  * ginDeletePage).
159  */
160 Buffer
161 ginStepRight(Buffer buffer, Relation index, int lockmode)
162 {
163         Buffer          nextbuffer;
164         Page            page = BufferGetPage(buffer);
165         bool            isLeaf = GinPageIsLeaf(page);
166         bool            isData = GinPageIsData(page);
167         BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
168
169         nextbuffer = ReadBuffer(index, blkno);
170         LockBuffer(nextbuffer, lockmode);
171         UnlockReleaseBuffer(buffer);
172
173         /* Sanity check that the page we stepped to is of similar kind. */
174         page = BufferGetPage(nextbuffer);
175         if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page))
176                 elog(ERROR, "right sibling of GIN page is of different type");
177
178         /*
179          * Given the proper lock sequence above, we should never land on a deleted
180          * page.
181          */
182         if (GinPageIsDeleted(page))
183                 elog(ERROR, "right sibling of GIN page was deleted");
184
185         return nextbuffer;
186 }
187
188 void
189 freeGinBtreeStack(GinBtreeStack *stack)
190 {
191         while (stack)
192         {
193                 GinBtreeStack *tmp = stack->parent;
194
195                 if (stack->buffer != InvalidBuffer)
196                         ReleaseBuffer(stack->buffer);
197
198                 pfree(stack);
199                 stack = tmp;
200         }
201 }
202
203 /*
204  * Try to find parent for current stack position. Returns correct parent and
205  * child's offset in stack->parent. The root page is never released, to
206  * to prevent conflict with vacuum process.
207  */
208 static void
209 ginFindParents(GinBtree btree, GinBtreeStack *stack)
210 {
211         Page            page;
212         Buffer          buffer;
213         BlockNumber blkno,
214                                 leftmostBlkno;
215         OffsetNumber offset;
216         GinBtreeStack *root;
217         GinBtreeStack *ptr;
218
219         /*
220          * Unwind the stack all the way up to the root, leaving only the root
221          * item.
222          *
223          * Be careful not to release the pin on the root page! The pin on root
224          * page is required to lock out concurrent vacuums on the tree.
225          */
226         root = stack->parent;
227         while (root->parent)
228         {
229                 ReleaseBuffer(root->buffer);
230                 root = root->parent;
231         }
232
233         Assert(root->blkno == btree->rootBlkno);
234         Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
235         root->off = InvalidOffsetNumber;
236
237         blkno = root->blkno;
238         buffer = root->buffer;
239         offset = InvalidOffsetNumber;
240
241         ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
242
243         for (;;)
244         {
245                 LockBuffer(buffer, GIN_EXCLUSIVE);
246                 page = BufferGetPage(buffer);
247                 if (GinPageIsLeaf(page))
248                         elog(ERROR, "Lost path");
249
250                 if (GinPageIsIncompleteSplit(page))
251                 {
252                         Assert(blkno != btree->rootBlkno);
253                         ptr->blkno = blkno;
254                         ptr->buffer = buffer;
255
256                         /*
257                          * parent may be wrong, but if so, the ginFinishSplit call will
258                          * recurse to call ginFindParents again to fix it.
259                          */
260                         ptr->parent = root;
261                         ptr->off = InvalidOffsetNumber;
262
263                         ginFinishSplit(btree, ptr, false, NULL);
264                 }
265
266                 leftmostBlkno = btree->getLeftMostChild(btree, page);
267
268                 while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
269                 {
270                         blkno = GinPageGetOpaque(page)->rightlink;
271                         if (blkno == InvalidBlockNumber)
272                         {
273                                 UnlockReleaseBuffer(buffer);
274                                 break;
275                         }
276                         buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
277                         page = BufferGetPage(buffer);
278
279                         /* finish any incomplete splits, as above */
280                         if (GinPageIsIncompleteSplit(page))
281                         {
282                                 Assert(blkno != btree->rootBlkno);
283                                 ptr->blkno = blkno;
284                                 ptr->buffer = buffer;
285                                 ptr->parent = root;
286                                 ptr->off = InvalidOffsetNumber;
287
288                                 ginFinishSplit(btree, ptr, false, NULL);
289                         }
290                 }
291
292                 if (blkno != InvalidBlockNumber)
293                 {
294                         ptr->blkno = blkno;
295                         ptr->buffer = buffer;
296                         ptr->parent = root; /* it may be wrong, but in next call we will
297                                                                  * correct */
298                         ptr->off = offset;
299                         stack->parent = ptr;
300                         return;
301                 }
302
303                 /* Descend down to next level */
304                 blkno = leftmostBlkno;
305                 buffer = ReadBuffer(btree->index, blkno);
306         }
307 }
308
309 /*
310  * Insert a new item to a page.
311  *
312  * Returns true if the insertion was finished. On false, the page was split and
313  * the parent needs to be updated. (a root split returns true as it doesn't
314  * need any further action by the caller to complete)
315  *
316  * When inserting a downlink to a internal page, 'childbuf' contains the
317  * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
318  * atomically with the insert. Also, the existing item at the given location
319  * is updated to point to 'updateblkno'.
320  *
321  * stack->buffer is locked on entry, and is kept locked.
322  */
323 static bool
324 ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
325                            void *insertdata, BlockNumber updateblkno,
326                            Buffer childbuf, GinStatsData *buildStats)
327 {
328         Page            page = BufferGetPage(stack->buffer);
329         GinPlaceToPageRC rc;
330         uint16          xlflags = 0;
331         Page            childpage = NULL;
332         Page            newlpage = NULL,
333                                 newrpage = NULL;
334
335         if (GinPageIsData(page))
336                 xlflags |= GIN_INSERT_ISDATA;
337         if (GinPageIsLeaf(page))
338         {
339                 xlflags |= GIN_INSERT_ISLEAF;
340                 Assert(!BufferIsValid(childbuf));
341                 Assert(updateblkno == InvalidBlockNumber);
342         }
343         else
344         {
345                 Assert(BufferIsValid(childbuf));
346                 Assert(updateblkno != InvalidBlockNumber);
347                 childpage = BufferGetPage(childbuf);
348         }
349
350         /*
351          * Try to put the incoming tuple on the page. placeToPage will decide if
352          * the page needs to be split.
353          *
354          * WAL-logging this operation is a bit funny:
355          *
356          * We're responsible for calling XLogBeginInsert() and XLogInsert().
357          * XLogBeginInsert() must be called before placeToPage, because
358          * placeToPage can register some data to the WAL record.
359          *
360          * If placeToPage returns INSERTED, placeToPage has already called
361          * START_CRIT_SECTION(), and we're responsible for calling
362          * END_CRIT_SECTION. When it returns INSERTED, it is also responsible for
363          * registering any data required to replay the operation with
364          * XLogRegisterData(0, ...). It may only add data to block index 0; the
365          * main data of the WAL record is reserved for this function.
366          *
367          * If placeToPage returns SPLIT, we're wholly responsible for WAL logging.
368          * Splits happen infrequently, so we just make a full-page image of all
369          * the pages involved.
370          */
371
372         if (RelationNeedsWAL(btree->index))
373                 XLogBeginInsert();
374
375         rc = btree->placeToPage(btree, stack->buffer, stack,
376                                                         insertdata, updateblkno,
377                                                         &newlpage, &newrpage);
378         if (rc == UNMODIFIED)
379         {
380                 XLogResetInsertion();
381                 return true;
382         }
383         else if (rc == INSERTED)
384         {
385                 /* placeToPage did START_CRIT_SECTION() */
386                 MarkBufferDirty(stack->buffer);
387
388                 /* An insert to an internal page finishes the split of the child. */
389                 if (childbuf != InvalidBuffer)
390                 {
391                         GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
392                         MarkBufferDirty(childbuf);
393                 }
394
395                 if (RelationNeedsWAL(btree->index))
396                 {
397                         XLogRecPtr      recptr;
398                         ginxlogInsert xlrec;
399                         BlockIdData childblknos[2];
400
401                         /*
402                          * placetopage already registered stack->buffer as block 0.
403                          */
404                         xlrec.flags = xlflags;
405
406                         if (childbuf != InvalidBuffer)
407                                 XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
408
409                         XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
410
411                         /*
412                          * Log information about child if this was an insertion of a
413                          * downlink.
414                          */
415                         if (childbuf != InvalidBuffer)
416                         {
417                                 BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
418                                 BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
419                                 XLogRegisterData((char *) childblknos,
420                                                                  sizeof(BlockIdData) * 2);
421                         }
422
423                         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
424                         PageSetLSN(page, recptr);
425                         if (childbuf != InvalidBuffer)
426                                 PageSetLSN(childpage, recptr);
427                 }
428
429                 END_CRIT_SECTION();
430
431                 return true;
432         }
433         else if (rc == SPLIT)
434         {
435                 /* Didn't fit, had to split */
436                 Buffer          rbuffer;
437                 BlockNumber savedRightLink;
438                 ginxlogSplit data;
439                 Buffer          lbuffer = InvalidBuffer;
440                 Page            newrootpg = NULL;
441
442                 rbuffer = GinNewBuffer(btree->index);
443
444                 /* During index build, count the new page */
445                 if (buildStats)
446                 {
447                         if (btree->isData)
448                                 buildStats->nDataPages++;
449                         else
450                                 buildStats->nEntryPages++;
451                 }
452
453                 savedRightLink = GinPageGetOpaque(page)->rightlink;
454
455                 /*
456                  * newlpage and newrpage are pointers to memory pages, not associated
457                  * with buffers. stack->buffer is not touched yet.
458                  */
459
460                 data.node = btree->index->rd_node;
461                 data.flags = xlflags;
462                 if (childbuf != InvalidBuffer)
463                 {
464                         Page            childpage = BufferGetPage(childbuf);
465
466                         GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
467
468                         data.leftChildBlkno = BufferGetBlockNumber(childbuf);
469                         data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
470                 }
471                 else
472                         data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
473
474                 if (stack->parent == NULL)
475                 {
476                         /*
477                          * split root, so we need to allocate new left page and place
478                          * pointer on root to left and right page
479                          */
480                         lbuffer = GinNewBuffer(btree->index);
481
482                         /* During index build, count the newly-added root page */
483                         if (buildStats)
484                         {
485                                 if (btree->isData)
486                                         buildStats->nDataPages++;
487                                 else
488                                         buildStats->nEntryPages++;
489                         }
490
491                         data.rrlink = InvalidBlockNumber;
492                         data.flags |= GIN_SPLIT_ROOT;
493
494                         GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
495                         GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
496
497                         /*
498                          * Construct a new root page containing downlinks to the new left
499                          * and right pages. (do this in a temporary copy first rather than
500                          * overwriting the original page directly, so that we can still
501                          * abort gracefully if this fails.)
502                          */
503                         newrootpg = PageGetTempPage(newrpage);
504                         GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
505
506                         btree->fillRoot(btree, newrootpg,
507                                                         BufferGetBlockNumber(lbuffer), newlpage,
508                                                         BufferGetBlockNumber(rbuffer), newrpage);
509                 }
510                 else
511                 {
512                         /* split non-root page */
513                         data.rrlink = savedRightLink;
514
515                         GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
516                         GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
517                         GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
518                 }
519
520                 /*
521                  * Ok, we have the new contents of the left page in a temporary copy
522                  * now (newlpage), and the newly-allocated right block has been filled
523                  * in. The original page is still unchanged.
524                  *
525                  * If this is a root split, we also have a temporary page containing
526                  * the new contents of the root. Copy the new left page to a
527                  * newly-allocated block, and initialize the (original) root page the
528                  * new copy. Otherwise, copy over the temporary copy of the new left
529                  * page over the old left page.
530                  */
531
532                 START_CRIT_SECTION();
533
534                 MarkBufferDirty(rbuffer);
535                 MarkBufferDirty(stack->buffer);
536                 if (BufferIsValid(childbuf))
537                         MarkBufferDirty(childbuf);
538
539                 /*
540                  * Restore the temporary copies over the real buffers. But don't free
541                  * the temporary copies yet, WAL record data points to them.
542                  */
543                 if (stack->parent == NULL)
544                 {
545                         MarkBufferDirty(lbuffer);
546                         memcpy(BufferGetPage(stack->buffer), newrootpg, BLCKSZ);
547                         memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
548                         memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
549                 }
550                 else
551                 {
552                         memcpy(BufferGetPage(stack->buffer), newlpage, BLCKSZ);
553                         memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
554                 }
555
556                 /* write WAL record */
557                 if (RelationNeedsWAL(btree->index))
558                 {
559                         XLogRecPtr      recptr;
560
561                         /*
562                          * We just take full page images of all the split pages. Splits
563                          * are uncommon enough that it's not worth complicating the code
564                          * to be more efficient.
565                          */
566                         if (stack->parent == NULL)
567                         {
568                                 XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
569                                 XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
570                                 XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
571                         }
572                         else
573                         {
574                                 XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
575                                 XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
576                         }
577                         if (BufferIsValid(childbuf))
578                                 XLogRegisterBuffer(3, childbuf, 0);
579
580                         XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
581
582                         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
583                         PageSetLSN(BufferGetPage(stack->buffer), recptr);
584                         PageSetLSN(BufferGetPage(rbuffer), recptr);
585                         if (stack->parent == NULL)
586                                 PageSetLSN(BufferGetPage(lbuffer), recptr);
587                         if (BufferIsValid(childbuf))
588                                 PageSetLSN(childpage, recptr);
589                 }
590                 END_CRIT_SECTION();
591
592                 /*
593                  * We can release the lock on the right page now, but keep the
594                  * original buffer locked.
595                  */
596                 UnlockReleaseBuffer(rbuffer);
597                 if (stack->parent == NULL)
598                         UnlockReleaseBuffer(lbuffer);
599
600                 pfree(newlpage);
601                 pfree(newrpage);
602                 if (newrootpg)
603                         pfree(newrootpg);
604
605                 /*
606                  * If we split the root, we're done. Otherwise the split is not
607                  * complete until the downlink for the new page has been inserted to
608                  * the parent.
609                  */
610                 if (stack->parent == NULL)
611                         return true;
612                 else
613                         return false;
614         }
615         else
616         {
617                 elog(ERROR, "unknown return code from GIN placeToPage method: %d", rc);
618                 return false;                   /* keep compiler quiet */
619         }
620 }
621
/*
 * Finish a split by inserting the downlink for the new page to parent.
 *
 * 'stack' identifies the page that was split (the left half); its right
 * sibling is the page that still lacks a downlink.  stack->parent must not
 * be NULL.  'buildStats' is passed through to ginPlaceToPage.
 *
 * On entry, stack->buffer is exclusively locked.
 *
 * If freestack is true, all the buffers are released and unlocked as we
 * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
 * locked, and stack is unmodified, except for possibly moving right to find
 * the correct parent of page.
 */
static void
ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
			   GinStatsData *buildStats)
{
	Page		page;
	bool		done;
	bool		first = true;	/* true only on the first trip through the loop */

	/*
	 * freestack == false when we encounter an incompletely split page during
	 * a scan, while freestack == true is used in the normal scenario that a
	 * split is finished right after the initial insert.
	 */
	if (!freestack)
		elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
			 stack->blkno, RelationGetRelationName(btree->index));

	/* this loop crawls up the stack until the insertion is complete */
	do
	{
		GinBtreeStack *parent = stack->parent;
		void	   *insertdata;
		BlockNumber updateblkno;

		/* search parent to lock */
		LockBuffer(parent->buffer, GIN_EXCLUSIVE);

		/*
		 * If the parent page was incompletely split, finish that split first,
		 * then continue with the current one.
		 *
		 * Note: we have to finish *all* incomplete splits we encounter, even
		 * if we have to move right. Otherwise we might choose as the target a
		 * page that has no downlink in the parent, and splitting it further
		 * would fail.
		 */
		if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
			ginFinishSplit(btree, parent, false, buildStats);

		/* move right if it's needed */
		page = BufferGetPage(parent->buffer);
		while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
		{
			if (GinPageRightMost(page))
			{
				/*
				 * rightmost page, but we don't find parent, we should use
				 * plain search...
				 *
				 * ginFindParents replaces stack->parent with a new item
				 * whose buffer it has locked exclusively, so unlock the old
				 * parent first and pick up the new one afterwards.
				 */
				LockBuffer(parent->buffer, GIN_UNLOCK);
				ginFindParents(btree, stack);
				parent = stack->parent;
				Assert(parent != NULL);
				break;
			}

			/* step to the right sibling of the parent and retry there */
			parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
			parent->blkno = BufferGetBlockNumber(parent->buffer);
			page = BufferGetPage(parent->buffer);

			if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
				ginFinishSplit(btree, parent, false, buildStats);
		}

		/*
		 * insert the downlink
		 *
		 * The child's right sibling is passed as 'updateblkno', and the
		 * child buffer itself as 'childbuf', so that ginPlaceToPage can
		 * clear the child's incomplete-split flag together with the insert.
		 */
		insertdata = btree->prepareDownlink(btree, stack->buffer);
		updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
		done = ginPlaceToPage(btree, parent,
							  insertdata, updateblkno,
							  stack->buffer, buildStats);
		pfree(insertdata);

		/*
		 * If the caller requested to free the stack, unlock and release the
		 * child buffer now. Otherwise keep it pinned and locked, but if we
		 * have to recurse up the tree, we can unlock the upper pages, only
		 * keeping the page at the bottom of the stack locked.
		 */
		if (!first || freestack)
			LockBuffer(stack->buffer, GIN_UNLOCK);
		if (freestack)
		{
			ReleaseBuffer(stack->buffer);
			pfree(stack);
		}

		/* if the parent was split too, it becomes the child of the next round */
		stack = parent;

		first = false;
	} while (!done);

	/* unlock the parent */
	LockBuffer(stack->buffer, GIN_UNLOCK);

	/* release and free whatever remains of the stack above the last insert */
	if (freestack)
		freeGinBtreeStack(stack);
}
728
729 /*
730  * Insert a value to tree described by stack.
731  *
732  * The value to be inserted is given in 'insertdata'. Its format depends
733  * on whether this is an entry or data tree, ginInsertValue just passes it
734  * through to the tree-specific callback function.
735  *
736  * During an index build, buildStats is non-null and the counters it contains
737  * are incremented as needed.
738  *
739  * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
740  */
741 void
742 ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
743                            GinStatsData *buildStats)
744 {
745         bool            done;
746
747         /* If the leaf page was incompletely split, finish the split first */
748         if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
749                 ginFinishSplit(btree, stack, false, buildStats);
750
751         done = ginPlaceToPage(btree, stack,
752                                                   insertdata, InvalidBlockNumber,
753                                                   InvalidBuffer, buildStats);
754         if (done)
755         {
756                 LockBuffer(stack->buffer, GIN_UNLOCK);
757                 freeGinBtreeStack(stack);
758         }
759         else
760                 ginFinishSplit(btree, stack, true, buildStats);
761 }