]> granicus.if.org Git - postgresql/commitdiff
More GIN refactoring.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 20 Nov 2013 15:00:53 +0000 (17:00 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 20 Nov 2013 15:01:33 +0000 (17:01 +0200)
Split off the portion of ginInsertValue that inserts the tuple to current
level into a separate function, ginPlaceToPage. ginInsertValue's charter
is now to recurse up the tree to insert the downlink, when a page split is
required.

This is in preparation for a patch to change the way incomplete splits are
handled, which will need to do these operations separately. And IMHO makes
the code more readable anyway.

src/backend/access/gin/ginbtree.c

index c4801d5cd8937fbb3977733ab756427e5b5fe847..7248b06326fdb279d7cf7ce28369e472c73a17d7 100644 (file)
@@ -280,80 +280,115 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
 }
 
 /*
- * Insert value (stored in GinBtree) to tree described by stack
- *
- * During an index build, buildStats is non-null and the counters
- * it contains are incremented as needed.
+ * Returns true if the insertion is done, false if the page was split and
+ * downlink insertion is pending.
  *
- * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ * stack->buffer is locked on entry, and is kept locked.
  */
-void
-ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+static bool
+ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
+                          GinStatsData *buildStats)
 {
-       GinBtreeStack *parent;
-       BlockNumber rootBlkno;
-       Page            page,
-                               rpage,
-                               lpage;
+       Page            page = BufferGetPage(stack->buffer);
+       XLogRecData *rdata;
+       bool            fit;
 
-       /* extract root BlockNumber from stack */
-       Assert(stack != NULL);
-       parent = stack;
-       while (parent->parent)
-               parent = parent->parent;
-       rootBlkno = parent->blkno;
-       Assert(BlockNumberIsValid(rootBlkno));
+       START_CRIT_SECTION();
+       fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
+       if (fit)
+       {
+               MarkBufferDirty(stack->buffer);
 
-       /* this loop crawls up the stack until the insertion is complete */
-       for (;;)
+               if (RelationNeedsWAL(btree->index))
+               {
+                       XLogRecPtr      recptr;
+
+                       recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+                       PageSetLSN(page, recptr);
+               }
+
+               END_CRIT_SECTION();
+
+               return true;
+       }
+       else
        {
-               XLogRecData *rdata;
+               /* Didn't fit, have to split */
+               Buffer          rbuffer;
+               Page            newlpage;
                BlockNumber savedRightLink;
-               bool            fit;
+               GinBtreeStack *parent;
+               Page            lpage,
+                                       rpage;
+
+               END_CRIT_SECTION();
+
+               rbuffer = GinNewBuffer(btree->index);
 
-               page = BufferGetPage(stack->buffer);
                savedRightLink = GinPageGetOpaque(page)->rightlink;
 
-               START_CRIT_SECTION();
-               fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
-               if (fit)
+               /*
+                * newlpage is a pointer to memory page, it is not associated with
+                * a buffer. stack->buffer is not touched yet.
+                */
+               newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
+
+               ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
+
+               /* During index build, count the newly-split page */
+               if (buildStats)
                {
+                       if (btree->isData)
+                               buildStats->nDataPages++;
+                       else
+                               buildStats->nEntryPages++;
+               }
+
+               parent = stack->parent;
+
+               if (parent == NULL)
+               {
+                       /*
+                        * split root, so we need to allocate new left page and place
+                        * pointer on root to left and right page
+                        */
+                       Buffer          lbuffer = GinNewBuffer(btree->index);
+
+                       ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
+                       ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
+
+                       lpage = BufferGetPage(lbuffer);
+                       rpage = BufferGetPage(rbuffer);
+
+                       GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
+                       GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+                       ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
+
+                       START_CRIT_SECTION();
+
+                       GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
+                       PageRestoreTempPage(newlpage, lpage);
+                       btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
+
+                       MarkBufferDirty(rbuffer);
+                       MarkBufferDirty(lbuffer);
                        MarkBufferDirty(stack->buffer);
 
                        if (RelationNeedsWAL(btree->index))
                        {
                                XLogRecPtr      recptr;
 
-                               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+                               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
                                PageSetLSN(page, recptr);
+                               PageSetLSN(lpage, recptr);
+                               PageSetLSN(rpage, recptr);
                        }
 
-                       LockBuffer(stack->buffer, GIN_UNLOCK);
-                       END_CRIT_SECTION();
-
-                       freeGinBtreeStack(stack);
-
-                       return;
-               }
-               else
-               {
-                       /* Didn't fit, have to split */
-                       Buffer          rbuffer;
-                       Page            newlpage;
-
+                       UnlockReleaseBuffer(rbuffer);
+                       UnlockReleaseBuffer(lbuffer);
                        END_CRIT_SECTION();
 
-                       rbuffer = GinNewBuffer(btree->index);
-
-                       /*
-                        * newlpage is a pointer to memory page, it is not associated with
-                        * a buffer. stack->buffer is not touched yet.
-                        */
-                       newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
-
-                       ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
-
-                       /* During index build, count the newly-split page */
+                       /* During index build, count the newly-added root page */
                        if (buildStats)
                        {
                                if (btree->isData)
@@ -362,98 +397,83 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
                                        buildStats->nEntryPages++;
                        }
 
-                       parent = stack->parent;
-
-                       if (parent == NULL)
-                       {
-                               /*
-                                * split root, so we need to allocate new left page and place
-                                * pointer on root to left and right page
-                                */
-                               Buffer          lbuffer = GinNewBuffer(btree->index);
-
-                               ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
-                               ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
-
-                               page = BufferGetPage(stack->buffer);
-                               lpage = BufferGetPage(lbuffer);
-                               rpage = BufferGetPage(rbuffer);
-
-                               GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
-                               GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
-                               ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
-
-                               START_CRIT_SECTION();
-
-                               GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
-                               PageRestoreTempPage(newlpage, lpage);
-                               btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
-
-                               MarkBufferDirty(rbuffer);
-                               MarkBufferDirty(lbuffer);
-                               MarkBufferDirty(stack->buffer);
+                       return true;
+               }
+               else
+               {
+                       /* split non-root page */
+                       ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
+                       ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
 
-                               if (RelationNeedsWAL(btree->index))
-                               {
-                                       XLogRecPtr      recptr;
+                       lpage = BufferGetPage(stack->buffer);
+                       rpage = BufferGetPage(rbuffer);
 
-                                       recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
-                                       PageSetLSN(page, recptr);
-                                       PageSetLSN(lpage, recptr);
-                                       PageSetLSN(rpage, recptr);
-                               }
+                       GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+                       GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
 
-                               UnlockReleaseBuffer(rbuffer);
-                               UnlockReleaseBuffer(lbuffer);
-                               LockBuffer(stack->buffer, GIN_UNLOCK);
-                               END_CRIT_SECTION();
+                       START_CRIT_SECTION();
+                       PageRestoreTempPage(newlpage, lpage);
 
-                               freeGinBtreeStack(stack);
+                       MarkBufferDirty(rbuffer);
+                       MarkBufferDirty(stack->buffer);
 
-                               /* During index build, count the newly-added root page */
-                               if (buildStats)
-                               {
-                                       if (btree->isData)
-                                               buildStats->nDataPages++;
-                                       else
-                                               buildStats->nEntryPages++;
-                               }
+                       if (RelationNeedsWAL(btree->index))
+                       {
+                               XLogRecPtr      recptr;
 
-                               return;
+                               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+                               PageSetLSN(lpage, recptr);
+                               PageSetLSN(rpage, recptr);
                        }
-                       else
-                       {
-                               /* split non-root page */
-                               ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
-                               ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
+                       UnlockReleaseBuffer(rbuffer);
+                       END_CRIT_SECTION();
 
-                               lpage = BufferGetPage(stack->buffer);
-                               rpage = BufferGetPage(rbuffer);
+                       return false;
+               }
+       }
+}
 
-                               GinPageGetOpaque(rpage)->rightlink = savedRightLink;
-                               GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+/*
+ * Insert value (stored in GinBtree) to tree described by stack
+ *
+ * During an index build, buildStats is non-null and the counters
+ * it contains are incremented as needed.
+ *
+ * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ */
+void
+ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+{
+       GinBtreeStack *parent;
+       BlockNumber rootBlkno;
+       Page            page;
+
+       /* extract root BlockNumber from stack */
+       Assert(stack != NULL);
+       parent = stack;
+       while (parent->parent)
+               parent = parent->parent;
+       rootBlkno = parent->blkno;
+       Assert(BlockNumberIsValid(rootBlkno));
 
-                               START_CRIT_SECTION();
-                               PageRestoreTempPage(newlpage, lpage);
+       /* this loop crawls up the stack until the insertion is complete */
+       for (;;)
+       {
+               bool done;
 
-                               MarkBufferDirty(rbuffer);
-                               MarkBufferDirty(stack->buffer);
+               done = ginPlaceToPage(btree, rootBlkno, stack, buildStats);
 
-                               if (RelationNeedsWAL(btree->index))
-                               {
-                                       XLogRecPtr      recptr;
+               /* just to be extra sure we don't delete anything by accident... */
+               btree->isDelete = FALSE;
 
-                                       recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
-                                       PageSetLSN(lpage, recptr);
-                                       PageSetLSN(rpage, recptr);
-                               }
-                               UnlockReleaseBuffer(rbuffer);
-                               END_CRIT_SECTION();
-                       }
+               if (done)
+               {
+                       LockBuffer(stack->buffer, GIN_UNLOCK);
+                       freeGinBtreeStack(stack);
+                       break;
                }
 
                btree->prepareDownlink(btree, stack->buffer);
-               btree->isDelete = FALSE;
 
                /* search parent to lock */
                LockBuffer(parent->buffer, GIN_EXCLUSIVE);