]> granicus.if.org Git - postgresql/commitdiff
Get rid of the post-recovery cleanup step of GIN page splits.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 27 Nov 2013 17:21:23 +0000 (19:21 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 27 Nov 2013 17:21:23 +0000 (19:21 +0200)
Replace it with an approach similar to what GiST uses: when a page is split,
the left sibling is marked with a flag indicating that the parent hasn't been
updated yet. When the parent is updated, the flag is cleared. If an insertion
steps on a page with the flag set, it will finish split before proceeding
with the insertion.

The post-recovery cleanup mechanism was never totally reliable, as insertion
to the parent could fail e.g because of running out of memory or disk space,
leaving the tree in an inconsistent state.

This also divides the responsibility of WAL-logging more clearly between
the generic ginbtree.c code, and the parts specific to entry and posting
trees. There is now a common WAL record format for insertions and deletions,
which is written by ginbtree.c, followed by tree-specific payload, which is
returned by the placetopage- and split- callbacks.

src/backend/access/gin/ginbtree.c
src/backend/access/gin/gindatapage.c
src/backend/access/gin/ginentrypage.c
src/backend/access/gin/ginxlog.c
src/backend/access/rmgrdesc/gindesc.c
src/include/access/gin.h
src/include/access/gin_private.h
src/include/access/rmgrlist.h
src/include/access/xlog_internal.h

index f3c0dfd128f3531633cb89040deb822d6673eabe..7b9b85dc04c353a3f952fe19cc751e2a3fea4861 100644 (file)
 #include "miscadmin.h"
 #include "utils/rel.h"
 
+static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
+static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
+                          void *insertdata, BlockNumber updateblkno,
+                          Buffer childbuf, GinStatsData *buildStats);
+static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
+                          bool freestack, GinStatsData *buildStats);
+
 /*
  * Lock buffer by needed method for search.
  */
@@ -85,6 +92,13 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
 
                access = ginTraverseLock(stack->buffer, searchMode);
 
+               /*
+                * If we're going to modify the tree, finish any incomplete splits we
+                * encounter on the way.
+                */
+               if (!searchMode && GinPageIsIncompleteSplit(page))
+                       ginFinishSplit(btree, stack, false, NULL);
+
                /*
                 * ok, page is correctly locked, we should check to move right ..,
                 * root never has a right link, so small optimization
@@ -101,6 +115,9 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
                        stack->buffer = ginStepRight(stack->buffer, btree->index, access);
                        stack->blkno = rightlink;
                        page = BufferGetPage(stack->buffer);
+
+                       if (!searchMode && GinPageIsIncompleteSplit(page))
+                               ginFinishSplit(btree, stack, false, NULL);
                }
 
                if (GinPageIsLeaf(page))        /* we found, return locked page */
@@ -187,7 +204,7 @@ freeGinBtreeStack(GinBtreeStack *stack)
  * child's offset in stack->parent. The root page is never released, to
  * to prevent conflict with vacuum process.
  */
-void
+static void
 ginFindParents(GinBtree btree, GinBtreeStack *stack)
 {
        Page            page;
@@ -195,58 +212,55 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
        BlockNumber blkno,
                                leftmostBlkno;
        OffsetNumber offset;
-       GinBtreeStack *root = stack->parent;
+       GinBtreeStack *root;
        GinBtreeStack *ptr;
 
-       if (!root)
+       /*
+        * Unwind the stack all the way up to the root, leaving only the root
+        * item.
+        *
+        * Be careful not to release the pin on the root page! The pin on root
+        * page is required to lock out concurrent vacuums on the tree.
+        */
+       root = stack->parent;
+       while (root->parent)
        {
-               /* XLog mode... */
-               root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
-               root->blkno = btree->rootBlkno;
-               root->buffer = ReadBuffer(btree->index, btree->rootBlkno);
-               LockBuffer(root->buffer, GIN_EXCLUSIVE);
-               root->parent = NULL;
+               ReleaseBuffer(root->buffer);
+               root = root->parent;
        }
-       else
-       {
-               /*
-                * find root, we should not release root page until update is
-                * finished!!
-                */
-               while (root->parent)
-               {
-                       ReleaseBuffer(root->buffer);
-                       root = root->parent;
-               }
 
-               Assert(root->blkno == btree->rootBlkno);
-               Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
-               LockBuffer(root->buffer, GIN_EXCLUSIVE);
-       }
+       Assert(root->blkno == btree->rootBlkno);
+       Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
        root->off = InvalidOffsetNumber;
 
-       page = BufferGetPage(root->buffer);
-       Assert(!GinPageIsLeaf(page));
-
-       /* check trivial case */
-       if ((root->off = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) != InvalidOffsetNumber)
-       {
-               stack->parent = root;
-               return;
-       }
+       blkno = root->blkno;
+       buffer = root->buffer;
+       offset = InvalidOffsetNumber;
 
-       leftmostBlkno = blkno = btree->getLeftMostChild(btree, page);
-       LockBuffer(root->buffer, GIN_UNLOCK);
-       Assert(blkno != InvalidBlockNumber);
+       ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
 
        for (;;)
        {
-               buffer = ReadBuffer(btree->index, blkno);
                LockBuffer(buffer, GIN_EXCLUSIVE);
                page = BufferGetPage(buffer);
                if (GinPageIsLeaf(page))
                        elog(ERROR, "Lost path");
 
+               if (GinPageIsIncompleteSplit(page))
+               {
+                       Assert(blkno != btree->rootBlkno);
+                       ptr->blkno = blkno;
+                       ptr->buffer = buffer;
+                       /*
+                        * parent may be wrong, but if so, the ginFinishSplit call will
+                        * recurse to call ginFindParents again to fix it.
+                        */
+                       ptr->parent = root;
+                       ptr->off = InvalidOffsetNumber;
+
+                       ginFinishSplit(btree, ptr, false, NULL);
+               }
+
                leftmostBlkno = btree->getLeftMostChild(btree, page);
 
                while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
@@ -259,11 +273,22 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
                        }
                        buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
                        page = BufferGetPage(buffer);
+
+                       /* finish any incomplete splits, as above */
+                       if (GinPageIsIncompleteSplit(page))
+                       {
+                               Assert(blkno != btree->rootBlkno);
+                               ptr->blkno = blkno;
+                               ptr->buffer = buffer;
+                               ptr->parent = root;
+                               ptr->off = InvalidOffsetNumber;
+
+                               ginFinishSplit(btree, ptr, false, NULL);
+                       }
                }
 
                if (blkno != InvalidBlockNumber)
                {
-                       ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
                        ptr->blkno = blkno;
                        ptr->buffer = buffer;
                        ptr->parent = root; /* it may be wrong, but in next call we will
@@ -273,7 +298,9 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
                        return;
                }
 
+               /* Descend down to next level */
                blkno = leftmostBlkno;
+               buffer = ReadBuffer(btree->index, blkno);
        }
 }
 
@@ -284,19 +311,38 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
  * the parent needs to be updated. (a root split returns true as it doesn't
  * need any further action by the caller to complete)
  *
- * When inserting a downlink to a internal page, the existing item at the
- * given location is updated to point to 'updateblkno'.
+ * When inserting a downlink to a internal page, 'childbuf' contains the
+ * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
+ * atomically with the insert. Also, the existing item at the given location
+ * is updated to point to 'updateblkno'.
  *
  * stack->buffer is locked on entry, and is kept locked.
  */
 static bool
 ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
                           void *insertdata, BlockNumber updateblkno,
-                          GinStatsData *buildStats)
+                          Buffer childbuf, GinStatsData *buildStats)
 {
        Page            page = BufferGetPage(stack->buffer);
-       XLogRecData *rdata;
+       XLogRecData *payloadrdata;
        bool            fit;
+       uint16          xlflags = 0;
+       Page            childpage = NULL;
+
+       if (GinPageIsData(page))
+               xlflags |= GIN_INSERT_ISDATA;
+       if (GinPageIsLeaf(page))
+       {
+               xlflags |= GIN_INSERT_ISLEAF;
+               Assert(!BufferIsValid(childbuf));
+               Assert(updateblkno == InvalidBlockNumber);
+       }
+       else
+       {
+               Assert(BufferIsValid(childbuf));
+               Assert(updateblkno != InvalidBlockNumber);
+               childpage = BufferGetPage(childbuf);
+       }
 
        /*
         * Try to put the incoming tuple on the page. If it doesn't fit,
@@ -306,17 +352,63 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
        START_CRIT_SECTION();
        fit = btree->placeToPage(btree, stack->buffer, stack->off,
                                                         insertdata, updateblkno,
-                                                        &rdata);
+                                                        &payloadrdata);
        if (fit)
        {
                MarkBufferDirty(stack->buffer);
 
+               /* An insert to an internal page finishes the split of the child. */
+               if (childbuf != InvalidBuffer)
+               {
+                       GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
+                       MarkBufferDirty(childbuf);
+               }
+
                if (RelationNeedsWAL(btree->index))
                {
                        XLogRecPtr      recptr;
+                       XLogRecData rdata[3];
+                       ginxlogInsert xlrec;
+                       BlockIdData     childblknos[2];
+
+                       xlrec.node = btree->index->rd_node;
+                       xlrec.blkno = BufferGetBlockNumber(stack->buffer);
+                       xlrec.offset = stack->off;
+                       xlrec.flags = xlflags;
+
+                       rdata[0].buffer = InvalidBuffer;
+                       rdata[0].data = (char *) &xlrec;
+                       rdata[0].len = sizeof(ginxlogInsert);
+
+                       /*
+                        * Log information about child if this was an insertion of a
+                        * downlink.
+                        */
+                       if (childbuf != InvalidBuffer)
+                       {
+                               rdata[0].next = &rdata[1];
+
+                               BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
+                               BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
+
+                               rdata[1].buffer = InvalidBuffer;
+                               rdata[1].data = (char *) childblknos;
+                               rdata[1].len = sizeof(BlockIdData) * 2;
+                               rdata[1].next = &rdata[2];
+
+                               rdata[2].buffer = childbuf;
+                               rdata[2].buffer_std = false;
+                               rdata[2].data = NULL;
+                               rdata[2].len = 0;
+                               rdata[2].next = payloadrdata;
+                       }
+                       else
+                               rdata[0].next = payloadrdata;
 
                        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
                        PageSetLSN(page, recptr);
+                       if (childbuf != InvalidBuffer)
+                               PageSetLSN(childpage, recptr);
                }
 
                END_CRIT_SECTION();
@@ -329,13 +421,16 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
                Buffer          rbuffer;
                Page            newlpage;
                BlockNumber savedRightLink;
-               GinBtreeStack *parent;
-               Page            lpage,
-                                       rpage;
+               Page            rpage;
+               XLogRecData rdata[2];
+               ginxlogSplit data;
+               Buffer          lbuffer = InvalidBuffer;
+               Page            newrootpg = NULL;
 
                END_CRIT_SECTION();
 
                rbuffer = GinNewBuffer(btree->index);
+
                /* During index build, count the new page */
                if (buildStats)
                {
@@ -353,21 +448,50 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
                 */
                newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
                                                                        insertdata, updateblkno,
-                                                                       &rdata);
+                                                                       &payloadrdata);
 
-               ((ginxlogSplit *) (rdata->data))->rootBlkno = btree->rootBlkno;
+               data.node = btree->index->rd_node;
+               data.rblkno = BufferGetBlockNumber(rbuffer);
+               data.flags = xlflags;
+               if (childbuf != InvalidBuffer)
+               {
+                       Page childpage = BufferGetPage(childbuf);
+                       GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
 
-               parent = stack->parent;
+                       data.leftChildBlkno = BufferGetBlockNumber(childbuf);
+                       data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
+               }
+               else
+                       data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
+
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].data = (char *) &data;
+               rdata[0].len = sizeof(ginxlogSplit);
+
+               if (childbuf != InvalidBuffer)
+               {
+                       rdata[0].next = &rdata[1];
+
+                       rdata[1].buffer = childbuf;
+                       rdata[1].buffer_std = false;
+                       rdata[1].data = NULL;
+                       rdata[1].len = 0;
+                       rdata[1].next = payloadrdata;
+               }
+               else
+                       rdata[0].next = payloadrdata;
+
+               rpage = BufferGetPage(rbuffer);
 
-               if (parent == NULL)
+               if (stack->parent == NULL)
                {
                        /*
                         * split root, so we need to allocate new left page and place
                         * pointer on root to left and right page
                         */
-                       Buffer          lbuffer = GinNewBuffer(btree->index);
+                       lbuffer = GinNewBuffer(btree->index);
 
-                       /* During index build, count the new page */
+                       /* During index build, count the newly-added root page */
                        if (buildStats)
                        {
                                if (btree->isData)
@@ -376,82 +500,97 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
                                        buildStats->nEntryPages++;
                        }
 
-                       ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
-                       ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
-
-                       lpage = BufferGetPage(lbuffer);
-                       rpage = BufferGetPage(rbuffer);
+                       /*
+                        * root never has a right-link, so we borrow the rrlink field to
+                        * store the root block number.
+                        */
+                       data.rrlink = BufferGetBlockNumber(stack->buffer);
+                       data.lblkno = BufferGetBlockNumber(lbuffer);
+                       data.flags |= GIN_SPLIT_ROOT;
 
                        GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
                        GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
-                       ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
-
-                       START_CRIT_SECTION();
-
-                       GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
-                       PageRestoreTempPage(newlpage, lpage);
-                       btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
-
-                       MarkBufferDirty(rbuffer);
-                       MarkBufferDirty(lbuffer);
-                       MarkBufferDirty(stack->buffer);
-
-                       if (RelationNeedsWAL(btree->index))
-                       {
-                               XLogRecPtr      recptr;
-
-                               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
-                               PageSetLSN(page, recptr);
-                               PageSetLSN(lpage, recptr);
-                               PageSetLSN(rpage, recptr);
-                       }
-
-                       UnlockReleaseBuffer(rbuffer);
-                       UnlockReleaseBuffer(lbuffer);
-                       END_CRIT_SECTION();
 
-                       /* During index build, count the newly-added root page */
-                       if (buildStats)
-                       {
-                               if (btree->isData)
-                                       buildStats->nDataPages++;
-                               else
-                                       buildStats->nEntryPages++;
-                       }
+                       /*
+                        * Construct a new root page containing downlinks to the new left
+                        * and right pages. (do this in a temporary copy first rather
+                        * than overwriting the original page directly, so that we can still
+                        * abort gracefully if this fails.)
+                        */
+                       newrootpg = PageGetTempPage(rpage);
+                       GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF, BLCKSZ);
 
-                       return true;
+                       btree->fillRoot(btree, newrootpg,
+                                                       BufferGetBlockNumber(lbuffer), newlpage,
+                                                       BufferGetBlockNumber(rbuffer), rpage);
                }
                else
                {
                        /* split non-root page */
-                       ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
-                       ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
-
-                       lpage = BufferGetPage(stack->buffer);
-                       rpage = BufferGetPage(rbuffer);
+                       data.rrlink = savedRightLink;
+                       data.lblkno = BufferGetBlockNumber(stack->buffer);
 
                        GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+                       GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
                        GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+               }
+
+               /*
+                * Ok, we have the new contents of the left page in a temporary copy
+                * now (newlpage), and the newly-allocated right block has been filled
+                * in. The original page is still unchanged.
+                *
+                * If this is a root split, we also have a temporary page containing
+                * the new contents of the root. Copy the new left page to a
+                * newly-allocated block, and initialize the (original) root page the
+                * new copy. Otherwise, copy over the temporary copy of the new left
+                * page over the old left page.
+                */
 
-                       START_CRIT_SECTION();
-                       PageRestoreTempPage(newlpage, lpage);
+               START_CRIT_SECTION();
 
-                       MarkBufferDirty(rbuffer);
-                       MarkBufferDirty(stack->buffer);
+               MarkBufferDirty(rbuffer);
 
-                       if (RelationNeedsWAL(btree->index))
-                       {
-                               XLogRecPtr      recptr;
+               if (stack->parent == NULL)
+               {
+                       PageRestoreTempPage(newlpage, BufferGetPage(lbuffer));
+                       MarkBufferDirty(lbuffer);
+                       newlpage = newrootpg;
+               }
 
-                               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
-                               PageSetLSN(lpage, recptr);
-                               PageSetLSN(rpage, recptr);
-                       }
-                       UnlockReleaseBuffer(rbuffer);
-                       END_CRIT_SECTION();
+               PageRestoreTempPage(newlpage, BufferGetPage(stack->buffer));
+               MarkBufferDirty(stack->buffer);
 
-                       return false;
+               /* write WAL record */
+               if (RelationNeedsWAL(btree->index))
+               {
+                       XLogRecPtr      recptr;
+
+                       recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+                       PageSetLSN(BufferGetPage(stack->buffer), recptr);
+                       PageSetLSN(rpage, recptr);
+                       if (stack->parent == NULL)
+                               PageSetLSN(BufferGetPage(lbuffer), recptr);
                }
+               END_CRIT_SECTION();
+
+               /*
+                * We can release the lock on the right page now, but keep the
+                * original buffer locked.
+                */
+               UnlockReleaseBuffer(rbuffer);
+               if (stack->parent == NULL)
+                       UnlockReleaseBuffer(lbuffer);
+
+               /*
+                * If we split the root, we're done. Otherwise the split is not
+                * complete until the downlink for the new page has been inserted to
+                * the parent.
+                */
+               if (stack->parent == NULL)
+                       return true;
+               else
+                       return false;
        }
 }
 
@@ -460,13 +599,27 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
  *
  * On entry, stack->buffer is exclusively locked.
  *
- * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ * If freestack is true, all the buffers are released and unlocked as we
+ * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
+ * locked, and stack is unmodified, except for possibly moving right to find
+ * the correct parent of page.
  */
-void
-ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+static void
+ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
+                          GinStatsData *buildStats)
 {
        Page            page;
        bool            done;
+       bool            first = true;
+
+       /*
+        * freestack == false when we encounter an incompletely split page during a
+        * scan, while freestack == true is used in the normal scenario that a
+        * split is finished right after the initial insert.
+        */
+       if (!freestack)
+               elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
+                        stack->blkno, RelationGetRelationName(btree->index));
 
        /* this loop crawls up the stack until the insertion is complete */
        do
@@ -475,19 +628,26 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
                void       *insertdata;
                BlockNumber updateblkno;
 
-               insertdata = btree->prepareDownlink(btree, stack->buffer);
-               updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
-
                /* search parent to lock */
                LockBuffer(parent->buffer, GIN_EXCLUSIVE);
 
+               /*
+                * If the parent page was incompletely split, finish that split first,
+                * then continue with the current one.
+                *
+                * Note: we have to finish *all* incomplete splits we encounter, even
+                * if we have to move right. Otherwise we might choose as the target
+                * a page that has no downlink in the parent, and splitting it further
+                * would fail.
+                */
+               if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
+                       ginFinishSplit(btree, parent, false, buildStats);
+
                /* move right if it's needed */
                page = BufferGetPage(parent->buffer);
                while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
                {
-                       BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
-
-                       if (rightlink == InvalidBlockNumber)
+                       if (GinPageRightMost(page))
                        {
                                /*
                                 * rightmost page, but we don't find parent, we should use
@@ -501,25 +661,44 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
                        }
 
                        parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
-                       parent->blkno = rightlink;
+                       parent->blkno = BufferGetBlockNumber(parent->buffer);
                        page = BufferGetPage(parent->buffer);
-               }
 
-               /* release the child */
-               UnlockReleaseBuffer(stack->buffer);
-               pfree(stack);
-               stack = parent;
+                       if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
+                               ginFinishSplit(btree, parent, false, buildStats);
+               }
 
-               /* insert the downlink to parent */
-               done = ginPlaceToPage(btree, stack,
+               /* insert the downlink */
+               insertdata = btree->prepareDownlink(btree, stack->buffer);
+               updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
+               done = ginPlaceToPage(btree, parent,
                                                          insertdata, updateblkno,
-                                                         buildStats);
+                                                         stack->buffer, buildStats);
                pfree(insertdata);
+
+               /*
+                * If the caller requested to free the stack, unlock and release the
+                * child buffer now. Otherwise keep it pinned and locked, but if we
+                * have to recurse up the tree, we can unlock the upper pages, only
+                * keeping the page at the bottom of the stack locked.
+                */
+               if (!first || freestack)
+                       LockBuffer(stack->buffer, GIN_UNLOCK);
+               if (freestack)
+               {
+                       ReleaseBuffer(stack->buffer);
+                       pfree(stack);
+               }
+               stack = parent;
+
+               first = false;
        } while (!done);
+
+       /* unlock the parent */
        LockBuffer(stack->buffer, GIN_UNLOCK);
 
-       /* free the rest of the stack */
-       freeGinBtreeStack(stack);
+       if (freestack)
+               freeGinBtreeStack(stack);
 }
 
 /*
@@ -540,14 +719,18 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
 {
        bool            done;
 
+       /* If the leaf page was incompletely split, finish the split first */
+       if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
+               ginFinishSplit(btree, stack, false, buildStats);
+
        done = ginPlaceToPage(btree, stack,
                                                  insertdata, InvalidBlockNumber,
-                                                 buildStats);
+                                                 InvalidBuffer, buildStats);
        if (done)
        {
                LockBuffer(stack->buffer, GIN_UNLOCK);
                freeGinBtreeStack(stack);
        }
        else
-               ginFinishSplit(btree, stack, buildStats);
+               ginFinishSplit(btree, stack, true, buildStats);
 }
index 14e5e8b61f48aa299b0a56f31a6ece9bd42e9c0c..5221f5e92f00564e8f1ad189351f037b72e797b6 100644 (file)
@@ -227,6 +227,7 @@ GinDataPageAddItemPointer(Page page, ItemPointer data, OffsetNumber offset)
        OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
        char       *ptr;
 
+       Assert(ItemPointerIsValid(data));
        Assert(GinPageIsLeaf(page));
 
        if (offset == InvalidOffsetNumber)
@@ -255,6 +256,7 @@ GinDataPageAddPostingItem(Page page, PostingItem *data, OffsetNumber offset)
        OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
        char       *ptr;
 
+       Assert(PostingItemGetBlockNumber(data) != InvalidBlockNumber);
        Assert(!GinPageIsLeaf(page));
 
        if (offset == InvalidOffsetNumber)
@@ -338,11 +340,8 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
                                XLogRecData **prdata)
 {
        Page            page = BufferGetPage(buf);
-       int                     cnt = 0;
-
        /* these must be static so they can be returned to caller */
-       static XLogRecData rdata[3];
-       static ginxlogInsert data;
+       static XLogRecData rdata[2];
 
        /* quick exit if it doesn't fit */
        if (!dataIsEnoughSpace(btree, buf, off, insertdata))
@@ -359,45 +358,10 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
                PostingItemSetBlockNumber(pitem, updateblkno);
        }
 
-       data.updateBlkno = updateblkno;
-       data.node = btree->index->rd_node;
-       data.blkno = BufferGetBlockNumber(buf);
-       data.offset = off;
-       data.nitem = 1;
-       data.isDelete = FALSE;
-       data.isData = TRUE;
-       data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
-
-       /*
-        * Prevent full page write if child's split occurs. That is needed to
-        * remove incomplete splits while replaying WAL
-        *
-        * data.updateBlkno contains new block number (of newly created right
-        * page) for recently splited page.
-        */
-       if (data.updateBlkno == InvalidBlockNumber)
-       {
-               rdata[0].buffer = buf;
-               rdata[0].buffer_std = FALSE;
-               rdata[0].data = NULL;
-               rdata[0].len = 0;
-               rdata[0].next = &rdata[1];
-               cnt++;
-       }
-
-       rdata[cnt].buffer = InvalidBuffer;
-       rdata[cnt].data = (char *) &data;
-       rdata[cnt].len = sizeof(ginxlogInsert);
-       rdata[cnt].next = &rdata[cnt + 1];
-       cnt++;
-
-       rdata[cnt].buffer = InvalidBuffer;
-       /* data and len filled in below */
-       rdata[cnt].next = NULL;
-
        if (GinPageIsLeaf(page))
        {
                GinBtreeDataLeafInsertData *items = insertdata;
+               static ginxlogInsertDataLeaf data;
                uint32          savedPos = items->curitem;
 
                if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
@@ -415,10 +379,18 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
                {
                        GinDataPageAddItemPointer(page, items->items + items->curitem, off);
                        items->curitem++;
+                       data.nitem = 1;
                }
 
-               rdata[cnt].data = (char *) &items->items[savedPos];
-               rdata[cnt].len = sizeof(ItemPointerData) * data.nitem;
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].data = (char *) &data;
+               rdata[0].len = offsetof(ginxlogInsertDataLeaf, items);
+               rdata[0].next = &rdata[1];
+
+               rdata[1].buffer = InvalidBuffer;
+               rdata[1].data = (char *) &items->items[savedPos];
+               rdata[1].len = sizeof(ItemPointerData) * data.nitem;
+               rdata[1].next = NULL;
        }
        else
        {
@@ -426,8 +398,10 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
 
                GinDataPageAddPostingItem(page, pitem, off);
 
-               rdata[cnt].data = (char *) pitem;
-               rdata[cnt].len = sizeof(PostingItem);
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].data = (char *) pitem;
+               rdata[0].len = sizeof(PostingItem);
+               rdata[0].next = NULL;
        }
 
        return true;
@@ -456,8 +430,8 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
        Size            freeSpace;
 
        /* these must be static so they can be returned to caller */
-       static ginxlogSplit data;
-       static XLogRecData rdata[4];
+       static ginxlogSplitData data;
+       static XLogRecData rdata[2];
        static char vector[2 * BLCKSZ];
 
        GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
@@ -488,6 +462,7 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
 
        if (isleaf && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
        {
+               /* append new items to the end */
                GinBtreeDataLeafInsertData *items = insertdata;
 
                while (items->curitem < items->nitem &&
@@ -566,25 +541,18 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
        bound = GinDataPageGetRightBound(rpage);
        *bound = oldbound;
 
-       data.node = btree->index->rd_node;
-       data.rootBlkno = InvalidBlockNumber;
-       data.lblkno = BufferGetBlockNumber(lbuf);
-       data.rblkno = BufferGetBlockNumber(rbuf);
        data.separator = separator;
        data.nitem = maxoff;
-       data.isData = TRUE;
-       data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
-       data.isRootSplit = FALSE;
        data.rightbound = oldbound;
 
        rdata[0].buffer = InvalidBuffer;
        rdata[0].data = (char *) &data;
-       rdata[0].len = sizeof(ginxlogSplit);
+       rdata[0].len = sizeof(ginxlogSplitData);
        rdata[0].next = &rdata[1];
 
        rdata[1].buffer = InvalidBuffer;
        rdata[1].data = vector;
-       rdata[1].len = MAXALIGN(maxoff * sizeofitem);
+       rdata[1].len = maxoff * sizeofitem;
        rdata[1].next = NULL;
 
        return lpage;
@@ -610,21 +578,18 @@ dataPrepareDownlink(GinBtree btree, Buffer lbuf)
  * Also called from ginxlog, should not use btree
  */
 void
-ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
+ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
 {
-       Page            page = BufferGetPage(root),
-                               lpage = BufferGetPage(lbuf),
-                               rpage = BufferGetPage(rbuf);
        PostingItem li,
                                ri;
 
        li.key = *GinDataPageGetRightBound(lpage);
-       PostingItemSetBlockNumber(&li, BufferGetBlockNumber(lbuf));
-       GinDataPageAddPostingItem(page, &li, InvalidOffsetNumber);
+       PostingItemSetBlockNumber(&li, lblkno);
+       GinDataPageAddPostingItem(root, &li, InvalidOffsetNumber);
 
        ri.key = *GinDataPageGetRightBound(rpage);
-       PostingItemSetBlockNumber(&ri, BufferGetBlockNumber(rbuf));
-       GinDataPageAddPostingItem(page, &ri, InvalidOffsetNumber);
+       PostingItemSetBlockNumber(&ri, rblkno);
+       GinDataPageAddPostingItem(root, &ri, InvalidOffsetNumber);
 }
 
 /*
index 1949f24b977125255498d459b8978d8c5ff66fc9..89cde4aec0107664e46c0c4b917d1cbe727b8ad5 100644 (file)
@@ -504,7 +504,7 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
 
        /* these must be static so they can be returned to caller */
        static XLogRecData rdata[3];
-       static ginxlogInsert data;
+       static ginxlogInsertEntry data;
 
        /* quick exit if it doesn't fit */
        if (!entryIsEnoughSpace(btree, buf, off, insertData))
@@ -512,7 +512,6 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
 
        *prdata = rdata;
        entryPreparePage(btree, page, off, insertData, updateblkno);
-       data.updateBlkno = updateblkno;
 
        placed = PageAddItem(page,
                                                 (Item) insertData->entry,
@@ -522,34 +521,11 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
                elog(ERROR, "failed to add item to index page in \"%s\"",
                         RelationGetRelationName(btree->index));
 
-       data.node = btree->index->rd_node;
-       data.blkno = BufferGetBlockNumber(buf);
-       data.offset = off;
-       data.nitem = 1;
        data.isDelete = insertData->isDelete;
-       data.isData = false;
-       data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
-
-       /*
-        * Prevent full page write if child's split occurs. That is needed to
-        * remove incomplete splits while replaying WAL
-        *
-        * data.updateBlkno contains new block number (of newly created right
-        * page) for recently splited page.
-        */
-       if (data.updateBlkno == InvalidBlockNumber)
-       {
-               rdata[0].buffer = buf;
-               rdata[0].buffer_std = TRUE;
-               rdata[0].data = NULL;
-               rdata[0].len = 0;
-               rdata[0].next = &rdata[1];
-               cnt++;
-       }
 
        rdata[cnt].buffer = InvalidBuffer;
        rdata[cnt].data = (char *) &data;
-       rdata[cnt].len = sizeof(ginxlogInsert);
+       rdata[cnt].len = offsetof(ginxlogInsertEntry, tuple);
        rdata[cnt].next = &rdata[cnt + 1];
        cnt++;
 
@@ -577,6 +553,7 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
                                maxoff,
                                separator = InvalidOffsetNumber;
        Size            totalsize = 0;
+       Size            tupstoresize;
        Size            lsize = 0,
                                size;
        char       *ptr;
@@ -588,18 +565,18 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
 
        /* these must be static so they can be returned to caller */
        static XLogRecData rdata[2];
-       static ginxlogSplit data;
+       static ginxlogSplitEntry data;
        static char tupstore[2 * BLCKSZ];
 
        *prdata = rdata;
-       data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
-               InvalidOffsetNumber : GinGetDownlink(insertData->entry);
-       data.updateBlkno = updateblkno;
        entryPreparePage(btree, lpage, off, insertData, updateblkno);
 
+       /*
+        * First, append all the existing tuples and the new tuple we're inserting
+        * one after another in a temporary workspace.
+        */
        maxoff = PageGetMaxOffsetNumber(lpage);
        ptr = tupstore;
-
        for (i = FirstOffsetNumber; i <= maxoff; i++)
        {
                if (i == off)
@@ -624,7 +601,12 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
                ptr += size;
                totalsize += size + sizeof(ItemIdData);
        }
+       tupstoresize = ptr - tupstore;
 
+       /*
+        * Initialize the left and right pages, and copy all the tuples back to
+        * them.
+        */
        GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
        GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
 
@@ -654,24 +636,17 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
                ptr += MAXALIGN(IndexTupleSize(itup));
        }
 
-       data.node = btree->index->rd_node;
-       data.rootBlkno = InvalidBlockNumber;
-       data.lblkno = BufferGetBlockNumber(lbuf);
-       data.rblkno = BufferGetBlockNumber(rbuf);
        data.separator = separator;
        data.nitem = maxoff;
-       data.isData = FALSE;
-       data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
-       data.isRootSplit = FALSE;
 
        rdata[0].buffer = InvalidBuffer;
        rdata[0].data = (char *) &data;
-       rdata[0].len = sizeof(ginxlogSplit);
+       rdata[0].len = sizeof(ginxlogSplitEntry);
        rdata[0].next = &rdata[1];
 
        rdata[1].buffer = InvalidBuffer;
        rdata[1].data = tupstore;
-       rdata[1].len = MAXALIGN(totalsize);
+       rdata[1].len = tupstoresize;
        rdata[1].next = NULL;
 
        return lpage;
@@ -702,24 +677,19 @@ entryPrepareDownlink(GinBtree btree, Buffer lbuf)
  * Also called from ginxlog, should not use btree
  */
 void
-ginEntryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
+ginEntryFillRoot(GinBtree btree, Page root,
+                                BlockNumber lblkno, Page lpage,
+                                BlockNumber rblkno, Page rpage)
 {
-       Page            page = BufferGetPage(root);
-       Page            lpage = BufferGetPage(lbuf);
-       Page            rpage = BufferGetPage(rbuf);
        IndexTuple      itup;
 
-       itup = GinFormInteriorTuple(getRightMostTuple(lpage),
-                                                               lpage,
-                                                               BufferGetBlockNumber(lbuf));
-       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+       itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
+       if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index root page");
        pfree(itup);
 
-       itup = GinFormInteriorTuple(getRightMostTuple(rpage),
-                                                               rpage,
-                                                               BufferGetBlockNumber(rbuf));
-       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+       itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
+       if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index root page");
        pfree(itup);
 }
index ca7bee1f3408b8f4443d154d8dec0b5f783232b0..4e43ae9feb156ca56d813fefd61f068784eb7727 100644 (file)
 #include "utils/memutils.h"
 
 static MemoryContext opCtx;            /* working memory for operations */
-static MemoryContext topCtx;
-
-typedef struct ginIncompleteSplit
-{
-       RelFileNode node;
-       BlockNumber leftBlkno;
-       BlockNumber rightBlkno;
-       BlockNumber rootBlkno;
-} ginIncompleteSplit;
-
-static List *incomplete_splits;
 
 static void
-pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
+ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno)
 {
-       ginIncompleteSplit *split;
-
-       MemoryContextSwitchTo(topCtx);
-
-       split = palloc(sizeof(ginIncompleteSplit));
-
-       split->node = node;
-       split->leftBlkno = leftBlkno;
-       split->rightBlkno = rightBlkno;
-       split->rootBlkno = rootBlkno;
-
-       incomplete_splits = lappend(incomplete_splits, split);
-
-       MemoryContextSwitchTo(opCtx);
-}
+       Buffer          buffer;
+       Page            page;
 
-static void
-forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
-{
-       ListCell   *l;
+       buffer = XLogReadBuffer(node, blkno, false);
+       if (!BufferIsValid(buffer))
+               return;                                 /* page was deleted, nothing to do */
+       page = (Page) BufferGetPage(buffer);
 
-       foreach(l, incomplete_splits)
+       if (lsn > PageGetLSN(page))
        {
-               ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
+               GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
 
-               if (RelFileNodeEquals(node, split->node) &&
-                       leftBlkno == split->leftBlkno &&
-                       updateBlkno == split->rightBlkno)
-               {
-                       incomplete_splits = list_delete_ptr(incomplete_splits, split);
-                       pfree(split);
-                       break;
-               }
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
        }
+
+       UnlockReleaseBuffer(buffer);
 }
 
 static void
@@ -128,44 +100,105 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
 }
 
 static void
-ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
+ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
+                                  void *rdata)
 {
-       ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
-       Buffer          buffer;
-       Page            page;
+       Page            page = BufferGetPage(buffer);
+       ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
+       IndexTuple      itup;
 
-       /* first, forget any incomplete split this insertion completes */
-       if (data->isData)
+       if (rightblkno != InvalidBlockNumber)
        {
-               Assert(data->isDelete == FALSE);
-               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
-               {
-                       PostingItem *pitem;
+               /* update link to right page after split */
+               Assert(!GinPageIsLeaf(page));
+               Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
+               itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
+               GinSetDownlink(itup, rightblkno);
+       }
 
-                       pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-                       forgetIncompleteSplit(data->node,
-                                                                 PostingItemGetBlockNumber(pitem),
-                                                                 data->updateBlkno);
-               }
+       if (data->isDelete)
+       {
+               Assert(GinPageIsLeaf(page));
+               Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
+               PageIndexTupleDelete(page, offset);
+       }
+
+       itup = &data->tuple;
+
+       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
+       {
+               RelFileNode node;
+               ForkNumber forknum;
+               BlockNumber blknum;
 
+               BufferGetTag(buffer, &node, &forknum, &blknum);
+               elog(ERROR, "failed to add item to index page in %u/%u/%u",
+                        node.spcNode, node.dbNode, node.relNode);
+       }
+}
+
+static void
+ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
+                                 void *rdata)
+{
+       Page            page = BufferGetPage(buffer);
+
+       if (GinPageIsLeaf(page))
+       {
+               ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata;
+               ItemPointerData *items = data->items;
+               OffsetNumber i;
+
+               for (i = 0; i < data->nitem; i++)
+                       GinDataPageAddItemPointer(page, &items[i], offset + i);
        }
        else
        {
-               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
-               {
-                       IndexTuple      itup;
+               PostingItem *pitem = (PostingItem *) rdata;
+               PostingItem *oldpitem;
 
-                       itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-                       forgetIncompleteSplit(data->node,
-                                                                 GinGetDownlink(itup),
-                                                                 data->updateBlkno);
-               }
+               /* update link to right page after split */
+               oldpitem = GinDataPageGetPostingItem(page, offset);
+               PostingItemSetBlockNumber(oldpitem, rightblkno);
+
+               GinDataPageAddPostingItem(page, pitem, offset);
+       }
+}
+
+static void
+ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
+{
+       ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
+       Buffer          buffer;
+       Page            page;
+       char       *payload;
+       BlockNumber leftChildBlkno = InvalidBlockNumber;
+       BlockNumber rightChildBlkno = InvalidBlockNumber;
+       bool            isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
+
+       payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
+
+       /*
+        * First clear incomplete-split flag on child page if this finishes
+        * a split.
+        */
+       if (!isLeaf)
+       {
+               leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
+               payload += sizeof(BlockIdData);
+               rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
+               payload += sizeof(BlockIdData);
+
+               if (record->xl_info & XLR_BKP_BLOCK(0))
+                       (void) RestoreBackupBlock(lsn, record, 0, false, false);
+               else
+                       ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno);
        }
 
        /* If we have a full-page image, restore it and we're done */
-       if (record->xl_info & XLR_BKP_BLOCK(0))
+       if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1))
        {
-               (void) RestoreBackupBlock(lsn, record, 0, false, false);
+               (void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false);
                return;
        }
 
@@ -176,74 +209,82 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 
        if (lsn > PageGetLSN(page))
        {
-               if (data->isData)
+               /* How to insert the payload is tree-type specific */
+               if (data->flags & GIN_INSERT_ISDATA)
                {
                        Assert(GinPageIsData(page));
-
-                       if (data->isLeaf)
-                       {
-                               OffsetNumber i;
-                               ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
-                               Assert(GinPageIsLeaf(page));
-                               Assert(data->updateBlkno == InvalidBlockNumber);
-
-                               for (i = 0; i < data->nitem; i++)
-                                       GinDataPageAddItemPointer(page, &items[i], data->offset + i);
-                       }
-                       else
-                       {
-                               PostingItem *pitem;
-
-                               Assert(!GinPageIsLeaf(page));
-
-                               if (data->updateBlkno != InvalidBlockNumber)
-                               {
-                                       /* update link to right page after split */
-                                       pitem = GinDataPageGetPostingItem(page, data->offset);
-                                       PostingItemSetBlockNumber(pitem, data->updateBlkno);
-                               }
-
-                               pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
-                               GinDataPageAddPostingItem(page, pitem, data->offset);
-                       }
+                       ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload);
                }
                else
                {
-                       IndexTuple      itup;
-
                        Assert(!GinPageIsData(page));
+                       ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload);
+               }
 
-                       if (data->updateBlkno != InvalidBlockNumber)
-                       {
-                               /* update link to right page after split */
-                               Assert(!GinPageIsLeaf(page));
-                               Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
-                               itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
-                               GinSetDownlink(itup, data->updateBlkno);
-                       }
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
+       }
 
-                       if (data->isDelete)
-                       {
-                               Assert(GinPageIsLeaf(page));
-                               Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
-                               PageIndexTupleDelete(page, data->offset);
-                       }
+       UnlockReleaseBuffer(buffer);
+}
 
-                       itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+static void
+ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
+{
+       ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata;
+       IndexTuple      itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry));
+       OffsetNumber i;
 
-                       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
-                               elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                                 data->node.spcNode, data->node.dbNode, data->node.relNode);
-               }
+       for (i = 0; i < data->separator; i++)
+       {
+               if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+                       elog(ERROR, "failed to add item to gin index page");
+               itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+       }
 
-               PageSetLSN(page, lsn);
+       for (i = data->separator; i < data->nitem; i++)
+       {
+               if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+                       elog(ERROR, "failed to add item to gin index page");
+               itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+       }
+}
 
-               MarkBufferDirty(buffer);
+static void
+ginRedoSplitData(Page lpage, Page rpage, void *rdata)
+{
+       ginxlogSplitData *data = (ginxlogSplitData *) rdata;
+       bool            isleaf = GinPageIsLeaf(lpage);
+       char       *ptr = (char *) rdata + sizeof(ginxlogSplitData);
+       OffsetNumber i;
+       ItemPointer bound;
+
+       if (isleaf)
+       {
+               ItemPointer items = (ItemPointer) ptr;
+               for (i = 0; i < data->separator; i++)
+                       GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber);
+               for (i = data->separator; i < data->nitem; i++)
+                       GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber);
+       }
+       else
+       {
+               PostingItem *items = (PostingItem *) ptr;
+               for (i = 0; i < data->separator; i++)
+                       GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
+               for (i = data->separator; i < data->nitem; i++)
+                       GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
        }
 
-       UnlockReleaseBuffer(buffer);
+       /* set up right key */
+       bound = GinDataPageGetRightBound(lpage);
+       if (isleaf)
+               *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
+       else
+               *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
+
+       bound = GinDataPageGetRightBound(rpage);
+       *bound = data->rightbound;
 }
 
 static void
@@ -255,14 +296,30 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
        Page            lpage,
                                rpage;
        uint32          flags = 0;
+       char       *payload;
+       bool            isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
+       bool            isData = (data->flags & GIN_INSERT_ISDATA) != 0;
+       bool            isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
+
+       payload = XLogRecGetData(record) + sizeof(ginxlogSplit);
+
+       /*
+        * First clear incomplete-split flag on child page if this finishes
+        * a split
+        */
+       if (!isLeaf)
+       {
+               if (record->xl_info & XLR_BKP_BLOCK(0))
+                       (void) RestoreBackupBlock(lsn, record, 0, false, false);
+               else
+                       ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno);
+       }
 
-       if (data->isLeaf)
+       if (isLeaf)
                flags |= GIN_LEAF;
-       if (data->isData)
-               flags |= GIN_DATA;
 
-       /* Backup blocks are not used in split records */
-       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+       if (isData)
+               flags |= GIN_DATA;
 
        lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
        Assert(BufferIsValid(lbuffer));
@@ -275,64 +332,13 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
        GinInitBuffer(rbuffer, flags);
 
        GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
-       GinPageGetOpaque(rpage)->rightlink = data->rrlink;
-
-       if (data->isData)
-       {
-               char       *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
-               Size            sizeofitem = GinSizeOfDataPageItem(lpage);
-               OffsetNumber i;
-               ItemPointer bound;
-
-               for (i = 0; i < data->separator; i++)
-               {
-                       if (data->isLeaf)
-                               GinDataPageAddItemPointer(lpage, (ItemPointer) ptr, InvalidOffsetNumber);
-                       else
-                               GinDataPageAddPostingItem(lpage, (PostingItem *) ptr, InvalidOffsetNumber);
-                       ptr += sizeofitem;
-               }
-
-               for (i = data->separator; i < data->nitem; i++)
-               {
-                       if (data->isLeaf)
-                               GinDataPageAddItemPointer(rpage, (ItemPointer) ptr, InvalidOffsetNumber);
-                       else
-                               GinDataPageAddPostingItem(rpage, (PostingItem *) ptr, InvalidOffsetNumber);
-                       ptr += sizeofitem;
-               }
+       GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink;
 
-               /* set up right key */
-               bound = GinDataPageGetRightBound(lpage);
-               if (data->isLeaf)
-                       *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
-               else
-                       *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
-
-               bound = GinDataPageGetRightBound(rpage);
-               *bound = data->rightbound;
-       }
+       /* Do the tree-type specific portion to restore the page contents */
+       if (isData)
+               ginRedoSplitData(lpage, rpage, payload);
        else
-       {
-               IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
-               OffsetNumber i;
-
-               for (i = 0; i < data->separator; i++)
-               {
-                       if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
-                               elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                                 data->node.spcNode, data->node.dbNode, data->node.relNode);
-                       itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
-               }
-
-               for (i = data->separator; i < data->nitem; i++)
-               {
-                       if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
-                               elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                                 data->node.spcNode, data->node.dbNode, data->node.relNode);
-                       itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
-               }
-       }
+               ginRedoSplitEntry(lpage, rpage, payload);
 
        PageSetLSN(rpage, lsn);
        MarkBufferDirty(rbuffer);
@@ -340,25 +346,31 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
        PageSetLSN(lpage, lsn);
        MarkBufferDirty(lbuffer);
 
-       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
-               forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
-
-       if (data->isRootSplit)
+       if (isRoot)
        {
-               Buffer          rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
+               BlockNumber     rootBlkno = data->rrlink;
+               Buffer          rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
                Page            rootPage = BufferGetPage(rootBuf);
 
                GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
 
-               if (data->isData)
+               if (isData)
                {
-                       Assert(data->rootBlkno != GIN_ROOT_BLKNO);
-                       ginDataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+                       Assert(rootBlkno != GIN_ROOT_BLKNO);
+                       ginDataFillRoot(NULL, BufferGetPage(rootBuf),
+                                                       BufferGetBlockNumber(lbuffer),
+                                                       BufferGetPage(lbuffer),
+                                                       BufferGetBlockNumber(rbuffer),
+                                                       BufferGetPage(rbuffer));
                }
                else
                {
-                       Assert(data->rootBlkno == GIN_ROOT_BLKNO);
-                       ginEntryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+                       Assert(rootBlkno == GIN_ROOT_BLKNO);
+                       ginEntryFillRoot(NULL, BufferGetPage(rootBuf),
+                                                        BufferGetBlockNumber(lbuffer),
+                                                        BufferGetPage(lbuffer),
+                                                        BufferGetBlockNumber(rbuffer),
+                                                        BufferGetPage(rbuffer));
                }
 
                PageSetLSN(rootPage, lsn);
@@ -366,8 +378,6 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
                MarkBufferDirty(rootBuf);
                UnlockReleaseBuffer(rootBuf);
        }
-       else
-               pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
 
        UnlockReleaseBuffer(rbuffer);
        UnlockReleaseBuffer(lbuffer);
@@ -711,6 +721,7 @@ void
 gin_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
+       MemoryContext oldCtx;
 
        /*
         * GIN indexes do not require any conflict processing. NB: If we ever
@@ -718,7 +729,7 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
         * killed tuples outside VACUUM, we'll need to handle that here.
         */
 
-       topCtx = MemoryContextSwitchTo(opCtx);
+       oldCtx = MemoryContextSwitchTo(opCtx);
        switch (info)
        {
                case XLOG_GIN_CREATE_INDEX:
@@ -751,15 +762,13 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
                default:
                        elog(PANIC, "gin_redo: unknown op code %u", info);
        }
-       MemoryContextSwitchTo(topCtx);
+       MemoryContextSwitchTo(oldCtx);
        MemoryContextReset(opCtx);
 }
 
 void
 gin_xlog_startup(void)
 {
-       incomplete_splits = NIL;
-
        opCtx = AllocSetContextCreate(CurrentMemoryContext,
                                                                  "GIN recovery temporary context",
                                                                  ALLOCSET_DEFAULT_MINSIZE,
@@ -767,84 +776,8 @@ gin_xlog_startup(void)
                                                                  ALLOCSET_DEFAULT_MAXSIZE);
 }
 
-static void
-ginContinueSplit(ginIncompleteSplit *split)
-{
-       GinBtreeData btree;
-       GinState        ginstate;
-       Relation        reln;
-       Buffer          buffer;
-       GinBtreeStack *stack;
-
-       /*
-        * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
-        * split->leftBlkno, split->rightBlkno);
-        */
-       buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
-
-       /*
-        * Failure should be impossible here, because we wrote the page earlier.
-        */
-       if (!BufferIsValid(buffer))
-               elog(PANIC, "ginContinueSplit: left block %u not found",
-                        split->leftBlkno);
-
-       reln = CreateFakeRelcacheEntry(split->node);
-
-       if (split->rootBlkno == GIN_ROOT_BLKNO)
-       {
-               MemSet(&ginstate, 0, sizeof(ginstate));
-               ginstate.index = reln;
-
-               ginPrepareEntryScan(&btree,
-                                                       InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY,
-                                                       &ginstate);
-       }
-       else
-       {
-               ginPrepareDataScan(&btree, reln, split->rootBlkno);
-       }
-
-       stack = palloc(sizeof(GinBtreeStack));
-       stack->blkno = split->leftBlkno;
-       stack->buffer = buffer;
-       stack->off = InvalidOffsetNumber;
-       stack->parent = NULL;
-
-       ginFindParents(&btree, stack);
-       LockBuffer(stack->parent->buffer, GIN_UNLOCK);
-       ginFinishSplit(&btree, stack, NULL);
-
-       /* buffer is released by ginFinishSplit */
-
-       FreeFakeRelcacheEntry(reln);
-}
-
 void
 gin_xlog_cleanup(void)
 {
-       ListCell   *l;
-       MemoryContext topCtx;
-
-       topCtx = MemoryContextSwitchTo(opCtx);
-
-       foreach(l, incomplete_splits)
-       {
-               ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
-
-               ginContinueSplit(split);
-               MemoryContextReset(opCtx);
-       }
-
-       MemoryContextSwitchTo(topCtx);
        MemoryContextDelete(opCtx);
-       incomplete_splits = NIL;
-}
-
-bool
-gin_safe_restartpoint(void)
-{
-       if (incomplete_splits)
-               return false;
-       return true;
 }
index 391f75fa4fb38a4cda7d413af2f20496fdf0bc94..72d60bf8d32f9ea3930ac099046db00df3a731ad 100644 (file)
@@ -41,20 +41,45 @@ gin_desc(StringInfo buf, uint8 xl_info, char *rec)
                        desc_node(buf, ((ginxlogCreatePostingTree *) rec)->node, ((ginxlogCreatePostingTree *) rec)->blkno);
                        break;
                case XLOG_GIN_INSERT:
-                       appendStringInfoString(buf, "Insert item, ");
-                       desc_node(buf, ((ginxlogInsert *) rec)->node, ((ginxlogInsert *) rec)->blkno);
-                       appendStringInfo(buf, " offset: %u nitem: %u isdata: %c isleaf %c isdelete %c updateBlkno:%u",
-                                                        ((ginxlogInsert *) rec)->offset,
-                                                        ((ginxlogInsert *) rec)->nitem,
-                                                        (((ginxlogInsert *) rec)->isData) ? 'T' : 'F',
-                                                        (((ginxlogInsert *) rec)->isLeaf) ? 'T' : 'F',
-                                                        (((ginxlogInsert *) rec)->isDelete) ? 'T' : 'F',
-                                                        ((ginxlogInsert *) rec)->updateBlkno);
+                       {
+                               ginxlogInsert *xlrec = (ginxlogInsert *) rec;
+                               char    *payload = rec + sizeof(ginxlogInsert);
+
+                               appendStringInfoString(buf, "Insert item, ");
+                               desc_node(buf, xlrec->node, xlrec->blkno);
+                               appendStringInfo(buf, " offset: %u isdata: %c isleaf: %c",
+                                                                xlrec->offset,
+                                                                (xlrec->flags & GIN_INSERT_ISDATA) ? 'T' : 'F',
+                                                                (xlrec->flags & GIN_INSERT_ISLEAF) ? 'T' : 'F');
+                               if (!(xlrec->flags & GIN_INSERT_ISLEAF))
+                               {
+                                       BlockNumber leftChildBlkno;
+                                       BlockNumber rightChildBlkno;
+
+                                       memcpy(&leftChildBlkno, payload, sizeof(BlockNumber));
+                                       payload += sizeof(BlockNumber);
+                                       memcpy(&rightChildBlkno, payload, sizeof(BlockNumber));
+                                       payload += sizeof(BlockNumber);
+                                       appendStringInfo(buf, " children: %u/%u",
+                                                                        leftChildBlkno, rightChildBlkno);
+                               }
+                               if (!(xlrec->flags & GIN_INSERT_ISDATA))
+                                       appendStringInfo(buf, " isdelete: %c",
+                                                                        (((ginxlogInsertEntry *) payload)->isDelete) ? 'T' : 'F');
+                               else if (xlrec->flags & GIN_INSERT_ISLEAF)
+                                       appendStringInfo(buf, " nitem: %u",
+                                                                        (((ginxlogInsertDataLeaf *) payload)->nitem) ? 'T' : 'F');
+                               else
+                                       appendStringInfo(buf, " pitem: %u-%u/%u",
+                                                                        PostingItemGetBlockNumber((PostingItem *) payload),
+                                                                        ItemPointerGetBlockNumber(&((PostingItem *) payload)->key),
+                                                                        ItemPointerGetOffsetNumber(&((PostingItem *) payload)->key));
+                       }
                        break;
                case XLOG_GIN_SPLIT:
                        appendStringInfoString(buf, "Page split, ");
                        desc_node(buf, ((ginxlogSplit *) rec)->node, ((ginxlogSplit *) rec)->lblkno);
-                       appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->isRootSplit) ? 'T' : 'F');
+                       appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->flags & GIN_SPLIT_ROOT) ? 'T' : 'F');
                        break;
                case XLOG_GIN_VACUUM_PAGE:
                        appendStringInfoString(buf, "Vacuum page, ");
index b6cb48da2878d77efcc3d8c43983c96588135600..7dcb0e0f20ba09432221466d2eb4d8cb0375a90a 100644 (file)
@@ -58,6 +58,5 @@ extern void gin_redo(XLogRecPtr lsn, XLogRecord *record);
 extern void gin_desc(StringInfo buf, uint8 xl_info, char *rec);
 extern void gin_xlog_startup(void);
 extern void gin_xlog_cleanup(void);
-extern bool gin_safe_restartpoint(void);
 
 #endif   /* GIN_H */
index bd407fe342a7177b9be00dcf0e072526ca40956a..714c8ca9841c5e96b506cf0bfbf981e9e27e2c67 100644 (file)
@@ -48,6 +48,7 @@ typedef GinPageOpaqueData *GinPageOpaque;
 #define GIN_META                 (1 << 3)
 #define GIN_LIST                 (1 << 4)
 #define GIN_LIST_FULLROW  (1 << 5)             /* makes sense only on GIN_LIST page */
+#define GIN_INCOMPLETE_SPLIT (1 << 6)  /* page was split, but parent not updated */
 
 /* Page numbers of fixed-location pages */
 #define GIN_METAPAGE_BLKNO     (0)
@@ -119,6 +120,7 @@ typedef struct GinMetaPageData
 #define GinPageIsDeleted(page) ( GinPageGetOpaque(page)->flags & GIN_DELETED)
 #define GinPageSetDeleted(page)    ( GinPageGetOpaque(page)->flags |= GIN_DELETED)
 #define GinPageSetNonDeleted(page) ( GinPageGetOpaque(page)->flags &= ~GIN_DELETED)
+#define GinPageIsIncompleteSplit(page) ( GinPageGetOpaque(page)->flags & GIN_INCOMPLETE_SPLIT)
 
 #define GinPageRightMost(page) ( GinPageGetOpaque(page)->rightlink == InvalidBlockNumber)
 
@@ -336,41 +338,77 @@ typedef struct ginxlogInsert
 {
        RelFileNode node;
        BlockNumber blkno;
-       BlockNumber updateBlkno;
+       uint16          flags;          /* GIN_SPLIT_ISLEAF and/or GIN_SPLIT_ISDATA */
        OffsetNumber offset;
-       bool            isDelete;
-       bool            isData;
-       bool            isLeaf;
-       OffsetNumber nitem;
 
        /*
-        * follows: tuples or ItemPointerData or PostingItem or list of
-        * ItemPointerData
+        * FOLLOWS:
+        *
+        * 1. if not leaf page, block numbers of the left and right child pages
+        * whose split this insertion finishes. As BlockIdData[2] (beware of adding
+        * fields before this that would make them not 16-bit aligned)
+        *
+        * 2. one of the following structs, depending on tree type.
+        *
+        * NB: the below structs are only 16-bit aligned when appended to a
+        * ginxlogInsert struct! Beware of adding fields to them that require
+        * stricter alignment.
         */
 } ginxlogInsert;
 
+typedef struct
+{
+       bool            isDelete;
+       IndexTupleData tuple;   /* variable length */
+} ginxlogInsertEntry;
+
+typedef struct
+{
+       OffsetNumber nitem;
+       ItemPointerData items[1]; /* variable length */
+} ginxlogInsertDataLeaf;
+
+/* In an insert to an internal data page, the payload is a PostingItem */
+
+
 #define XLOG_GIN_SPLIT 0x30
 
 typedef struct ginxlogSplit
 {
        RelFileNode node;
        BlockNumber lblkno;
-       BlockNumber rootBlkno;
        BlockNumber rblkno;
-       BlockNumber rrlink;
+       BlockNumber rrlink;                             /* right link, or root's blocknumber if root split */
+       BlockNumber     leftChildBlkno;         /* valid on a non-leaf split */
+       BlockNumber     rightChildBlkno;
+       uint16          flags;
+
+       /* follows: one of the following structs */
+} ginxlogSplit;
+
+/*
+ * Flags used in ginxlogInsert and ginxlogSplit records
+ */
+#define GIN_INSERT_ISDATA      0x01    /* for both insert and split records */
+#define GIN_INSERT_ISLEAF      0x02    /* .. */
+#define GIN_SPLIT_ROOT         0x04    /* only for split records */
+
+typedef struct
+{
        OffsetNumber separator;
        OffsetNumber nitem;
 
-       bool            isData;
-       bool            isLeaf;
-       bool            isRootSplit;
+       /* FOLLOWS: IndexTuples */
+} ginxlogSplitEntry;
 
-       BlockNumber leftChildBlkno;
-       BlockNumber updateBlkno;
+typedef struct
+{
+       OffsetNumber separator;
+       OffsetNumber nitem;
+       ItemPointerData rightbound;
 
-       ItemPointerData rightbound; /* used only in posting tree */
-       /* follows: list of tuple or ItemPointerData or PostingItem */
-} ginxlogSplit;
+       /* FOLLOWS: array of ItemPointers (for leaf) or PostingItems (non-leaf) */
+} ginxlogSplitData;
 
 #define XLOG_GIN_VACUUM_PAGE   0x40
 
@@ -488,7 +526,7 @@ typedef struct GinBtreeData
        bool            (*placeToPage) (GinBtree, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
        Page            (*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
        void       *(*prepareDownlink) (GinBtree, Buffer);
-       void            (*fillRoot) (GinBtree, Buffer, Buffer, Buffer);
+       void            (*fillRoot) (GinBtree, Page, BlockNumber, Page, BlockNumber, Page);
 
        bool            isData;
 
@@ -535,9 +573,6 @@ extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
 extern void freeGinBtreeStack(GinBtreeStack *stack);
 extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
                           void *insertdata, GinStatsData *buildStats);
-extern void ginFindParents(GinBtree btree, GinBtreeStack *stack);
-extern void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
-                          GinStatsData *buildStats);
 
 /* ginentrypage.c */
 extern IndexTuple GinFormTuple(GinState *ginstate,
@@ -547,7 +582,7 @@ extern void GinShortenTuple(IndexTuple itup, uint32 nipd);
 extern void ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
                                        Datum key, GinNullCategory category,
                                        GinState *ginstate);
-extern void ginEntryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
+extern void ginEntryFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
 
 /* gindatapage.c */
 extern BlockNumber createPostingTree(Relation index,
@@ -560,7 +595,7 @@ extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
                                          ItemPointerData *items, uint32 nitem,
                                          GinStatsData *buildStats);
 extern GinBtreeStack *ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno);
-extern void ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
+extern void ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
 extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
 
 /* ginscan.c */
index 7ad71b32e2c7fdd2391eea8a768ce943304474b2..166689db10209a0a913f5583976306801d08056b 100644 (file)
@@ -38,7 +38,7 @@ PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL)
 PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, NULL, NULL, NULL)
 PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint)
 PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, NULL, NULL, NULL)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, NULL)
 PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL)
 PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, NULL, NULL, NULL)
 PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_xlog_startup, spg_xlog_cleanup, NULL)
index c3e173106fc032479d103f94c6bf76df5b3be109..b6320eee3cf90f7abdbb35209df704b4506b06f8 100644 (file)
@@ -55,7 +55,7 @@ typedef struct BkpBlock
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD076 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD077 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {