From 04eee1fa9ee80dabf7cf4b8b9106897272e9b291 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 20 Nov 2013 17:00:53 +0200 Subject: [PATCH] More GIN refactoring. Split off the portion of ginInsertValue that inserts the tuple to current level into a separate function, ginPlaceToPage. ginInsertValue's charter is now to recurse up the tree to insert the downlink, when a page split is required. This is in preparation for a patch to change the way incomplete splits are handled, which will need to do these operations separately. And IMHO makes the code more readable anyway. --- src/backend/access/gin/ginbtree.c | 276 ++++++++++++++++-------------- 1 file changed, 148 insertions(+), 128 deletions(-) diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c index c4801d5cd8..7248b06326 100644 --- a/src/backend/access/gin/ginbtree.c +++ b/src/backend/access/gin/ginbtree.c @@ -280,80 +280,115 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack, } /* - * Insert value (stored in GinBtree) to tree described by stack - * - * During an index build, buildStats is non-null and the counters - * it contains are incremented as needed. + * Returns true if the insertion is done, false if the page was split and + * downlink insertion is pending. * - * NB: the passed-in stack is freed, as though by freeGinBtreeStack. + * stack->buffer is locked on entry, and is kept locked. */ -void -ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) +static bool +ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack, + GinStatsData *buildStats) { - GinBtreeStack *parent; - BlockNumber rootBlkno; - Page page, - rpage, - lpage; + Page page = BufferGetPage(stack->buffer); + XLogRecData *rdata; + bool fit; - /* extract root BlockNumber from stack */ - Assert(stack != NULL); - parent = stack; - while (parent->parent) - parent = parent->parent; - rootBlkno = parent->blkno; - Assert(BlockNumberIsValid(rootBlkno)); + START_CRIT_SECTION(); + fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata); + if (fit) + { + MarkBufferDirty(stack->buffer); - /* this loop crawls up the stack until the insertion is complete */ - for (;;) + if (RelationNeedsWAL(btree->index)) + { + XLogRecPtr recptr; + + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata); + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + return true; + } + else { - XLogRecData *rdata; + /* Didn't fit, have to split */ + Buffer rbuffer; + Page newlpage; BlockNumber savedRightLink; - bool fit; + GinBtreeStack *parent; + Page lpage, + rpage; + + END_CRIT_SECTION(); + + rbuffer = GinNewBuffer(btree->index); - page = BufferGetPage(stack->buffer); savedRightLink = GinPageGetOpaque(page)->rightlink; - START_CRIT_SECTION(); - fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata); - if (fit) + /* + * newlpage is a pointer to memory page, it is not associated with + * a buffer. stack->buffer is not touched yet. + */ + newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata); + + ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno; + + /* During index build, count the newly-split page */ + if (buildStats) { + if (btree->isData) + buildStats->nDataPages++; + else + buildStats->nEntryPages++; + } + + parent = stack->parent; + + if (parent == NULL) + { + /* + * split root, so we need to allocate new left page and place + * pointer on root to left and right page + */ + Buffer lbuffer = GinNewBuffer(btree->index); + + ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE; + ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber; + + lpage = BufferGetPage(lbuffer); + rpage = BufferGetPage(rbuffer); + + GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber; + GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); + ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer); + + START_CRIT_SECTION(); + + GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF); + PageRestoreTempPage(newlpage, lpage); + btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer); + + MarkBufferDirty(rbuffer); + MarkBufferDirty(lbuffer); MarkBufferDirty(stack->buffer); if (RelationNeedsWAL(btree->index)) { XLogRecPtr recptr; - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata); + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); PageSetLSN(page, recptr); + PageSetLSN(lpage, recptr); + PageSetLSN(rpage, recptr); } - LockBuffer(stack->buffer, GIN_UNLOCK); - END_CRIT_SECTION(); - - freeGinBtreeStack(stack); - - return; - } - else - { - /* Didn't fit, have to split */ - Buffer rbuffer; - Page newlpage; - + UnlockReleaseBuffer(rbuffer); + UnlockReleaseBuffer(lbuffer); END_CRIT_SECTION(); - rbuffer = GinNewBuffer(btree->index); - - /* - * newlpage is a pointer to memory page, it is not associated with - * a buffer. stack->buffer is not touched yet. - */ - newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata); - - ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno; - - /* During index build, count the newly-split page */ + /* During index build, count the newly-added root page */ if (buildStats) { if (btree->isData) @@ -362,98 +397,83 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) buildStats->nEntryPages++; } - parent = stack->parent; - - if (parent == NULL) - { - /* - * split root, so we need to allocate new left page and place - * pointer on root to left and right page - */ - Buffer lbuffer = GinNewBuffer(btree->index); - - ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE; - ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber; - - page = BufferGetPage(stack->buffer); - lpage = BufferGetPage(lbuffer); - rpage = BufferGetPage(rbuffer); - - GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber; - GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); - ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer); - - START_CRIT_SECTION(); - - GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF); - PageRestoreTempPage(newlpage, lpage); - btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer); - - MarkBufferDirty(rbuffer); - MarkBufferDirty(lbuffer); - MarkBufferDirty(stack->buffer); + return true; + } + else + { + /* split non-root page */ + ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE; + ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink; - if (RelationNeedsWAL(btree->index)) - { - XLogRecPtr recptr; + lpage = BufferGetPage(stack->buffer); + rpage = BufferGetPage(rbuffer); - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); - PageSetLSN(page, recptr); - PageSetLSN(lpage, recptr); - PageSetLSN(rpage, recptr); - } + GinPageGetOpaque(rpage)->rightlink = savedRightLink; + GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); - UnlockReleaseBuffer(rbuffer); - UnlockReleaseBuffer(lbuffer); - LockBuffer(stack->buffer, GIN_UNLOCK); - END_CRIT_SECTION(); + START_CRIT_SECTION(); + PageRestoreTempPage(newlpage, lpage); - freeGinBtreeStack(stack); + MarkBufferDirty(rbuffer); + MarkBufferDirty(stack->buffer); - /* During index build, count the newly-added root page */ - if (buildStats) - { - if (btree->isData) - buildStats->nDataPages++; - else - buildStats->nEntryPages++; - } + if (RelationNeedsWAL(btree->index)) + { + XLogRecPtr recptr; - return; + recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); + PageSetLSN(lpage, recptr); + PageSetLSN(rpage, recptr); } - else - { - /* split non-root page */ - ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE; - ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink; + UnlockReleaseBuffer(rbuffer); + END_CRIT_SECTION(); - lpage = BufferGetPage(stack->buffer); - rpage = BufferGetPage(rbuffer); + return false; + } + } +} - GinPageGetOpaque(rpage)->rightlink = savedRightLink; - GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); +/* + * Insert value (stored in GinBtree) to tree described by stack + * + * During an index build, buildStats is non-null and the counters + * it contains are incremented as needed. + * + * NB: the passed-in stack is freed, as though by freeGinBtreeStack. + */ +void +ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) +{ + GinBtreeStack *parent; + BlockNumber rootBlkno; + Page page; + + /* extract root BlockNumber from stack */ + Assert(stack != NULL); + parent = stack; + while (parent->parent) + parent = parent->parent; + rootBlkno = parent->blkno; + Assert(BlockNumberIsValid(rootBlkno)); - START_CRIT_SECTION(); - PageRestoreTempPage(newlpage, lpage); + /* this loop crawls up the stack until the insertion is complete */ + for (;;) + { + bool done; - MarkBufferDirty(rbuffer); - MarkBufferDirty(stack->buffer); + done = ginPlaceToPage(btree, rootBlkno, stack, buildStats); - if (RelationNeedsWAL(btree->index)) - { - XLogRecPtr recptr; + /* just to be extra sure we don't delete anything by accident... */ + btree->isDelete = FALSE; - recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); - PageSetLSN(lpage, recptr); - PageSetLSN(rpage, recptr); - } - UnlockReleaseBuffer(rbuffer); - END_CRIT_SECTION(); - } + if (done) + { + LockBuffer(stack->buffer, GIN_UNLOCK); + freeGinBtreeStack(stack); + break; } btree->prepareDownlink(btree, stack->buffer); - btree->isDelete = FALSE; /* search parent to lock */ LockBuffer(parent->buffer, GIN_EXCLUSIVE); -- 2.40.0