#include "miscadmin.h"
#include "utils/rel.h"
+static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
+static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
+ void *insertdata, BlockNumber updateblkno,
+ Buffer childbuf, GinStatsData *buildStats);
+static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
+ bool freestack, GinStatsData *buildStats);
+
/*
* Lock buffer by needed method for search.
*/
access = ginTraverseLock(stack->buffer, searchMode);
+ /*
+ * If we're going to modify the tree, finish any incomplete splits we
+ * encounter on the way.
+ */
+ if (!searchMode && GinPageIsIncompleteSplit(page))
+ ginFinishSplit(btree, stack, false, NULL);
+
/*
* ok, page is correctly locked, we should check to move right ..,
* root never has a right link, so small optimization
stack->buffer = ginStepRight(stack->buffer, btree->index, access);
stack->blkno = rightlink;
page = BufferGetPage(stack->buffer);
+
+ if (!searchMode && GinPageIsIncompleteSplit(page))
+ ginFinishSplit(btree, stack, false, NULL);
}
if (GinPageIsLeaf(page)) /* we found, return locked page */
* child's offset in stack->parent. The root page is never released, to
* to prevent conflict with vacuum process.
*/
-void
+static void
ginFindParents(GinBtree btree, GinBtreeStack *stack)
{
Page page;
BlockNumber blkno,
leftmostBlkno;
OffsetNumber offset;
- GinBtreeStack *root = stack->parent;
+ GinBtreeStack *root;
GinBtreeStack *ptr;
- if (!root)
+ /*
+ * Unwind the stack all the way up to the root, leaving only the root
+ * item.
+ *
+ * Be careful not to release the pin on the root page! The pin on root
+ * page is required to lock out concurrent vacuums on the tree.
+ */
+ root = stack->parent;
+ while (root->parent)
{
- /* XLog mode... */
- root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
- root->blkno = btree->rootBlkno;
- root->buffer = ReadBuffer(btree->index, btree->rootBlkno);
- LockBuffer(root->buffer, GIN_EXCLUSIVE);
- root->parent = NULL;
+ ReleaseBuffer(root->buffer);
+ root = root->parent;
}
- else
- {
- /*
- * find root, we should not release root page until update is
- * finished!!
- */
- while (root->parent)
- {
- ReleaseBuffer(root->buffer);
- root = root->parent;
- }
- Assert(root->blkno == btree->rootBlkno);
- Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
- LockBuffer(root->buffer, GIN_EXCLUSIVE);
- }
+ Assert(root->blkno == btree->rootBlkno);
+ Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
root->off = InvalidOffsetNumber;
- page = BufferGetPage(root->buffer);
- Assert(!GinPageIsLeaf(page));
-
- /* check trivial case */
- if ((root->off = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) != InvalidOffsetNumber)
- {
- stack->parent = root;
- return;
- }
+ blkno = root->blkno;
+ buffer = root->buffer;
+ offset = InvalidOffsetNumber;
- leftmostBlkno = blkno = btree->getLeftMostChild(btree, page);
- LockBuffer(root->buffer, GIN_UNLOCK);
- Assert(blkno != InvalidBlockNumber);
+ ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
for (;;)
{
- buffer = ReadBuffer(btree->index, blkno);
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
if (GinPageIsLeaf(page))
elog(ERROR, "Lost path");
+ if (GinPageIsIncompleteSplit(page))
+ {
+ Assert(blkno != btree->rootBlkno);
+ ptr->blkno = blkno;
+ ptr->buffer = buffer;
+ /*
+ * parent may be wrong, but if so, the ginFinishSplit call will
+ * recurse to call ginFindParents again to fix it.
+ */
+ ptr->parent = root;
+ ptr->off = InvalidOffsetNumber;
+
+ ginFinishSplit(btree, ptr, false, NULL);
+ }
+
leftmostBlkno = btree->getLeftMostChild(btree, page);
while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
}
buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
+
+ /* finish any incomplete splits, as above */
+ if (GinPageIsIncompleteSplit(page))
+ {
+ Assert(blkno != btree->rootBlkno);
+ ptr->blkno = blkno;
+ ptr->buffer = buffer;
+ ptr->parent = root;
+ ptr->off = InvalidOffsetNumber;
+
+ ginFinishSplit(btree, ptr, false, NULL);
+ }
}
if (blkno != InvalidBlockNumber)
{
- ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
ptr->blkno = blkno;
ptr->buffer = buffer;
ptr->parent = root; /* it may be wrong, but in next call we will
return;
}
+ /* Descend down to next level */
blkno = leftmostBlkno;
+ buffer = ReadBuffer(btree->index, blkno);
}
}
* the parent needs to be updated. (a root split returns true as it doesn't
* need any further action by the caller to complete)
*
- * When inserting a downlink to a internal page, the existing item at the
- * given location is updated to point to 'updateblkno'.
+ * When inserting a downlink to a internal page, 'childbuf' contains the
+ * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
+ * atomically with the insert. Also, the existing item at the given location
+ * is updated to point to 'updateblkno'.
*
* stack->buffer is locked on entry, and is kept locked.
*/
static bool
ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
- GinStatsData *buildStats)
+ Buffer childbuf, GinStatsData *buildStats)
{
Page page = BufferGetPage(stack->buffer);
- XLogRecData *rdata;
+ XLogRecData *payloadrdata;
bool fit;
+ uint16 xlflags = 0;
+ Page childpage = NULL;
+
+ if (GinPageIsData(page))
+ xlflags |= GIN_INSERT_ISDATA;
+ if (GinPageIsLeaf(page))
+ {
+ xlflags |= GIN_INSERT_ISLEAF;
+ Assert(!BufferIsValid(childbuf));
+ Assert(updateblkno == InvalidBlockNumber);
+ }
+ else
+ {
+ Assert(BufferIsValid(childbuf));
+ Assert(updateblkno != InvalidBlockNumber);
+ childpage = BufferGetPage(childbuf);
+ }
/*
* Try to put the incoming tuple on the page. If it doesn't fit,
START_CRIT_SECTION();
fit = btree->placeToPage(btree, stack->buffer, stack->off,
insertdata, updateblkno,
- &rdata);
+ &payloadrdata);
if (fit)
{
MarkBufferDirty(stack->buffer);
+ /* An insert to an internal page finishes the split of the child. */
+ if (childbuf != InvalidBuffer)
+ {
+ GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
+ MarkBufferDirty(childbuf);
+ }
+
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
+ XLogRecData rdata[3];
+ ginxlogInsert xlrec;
+ BlockIdData childblknos[2];
+
+ xlrec.node = btree->index->rd_node;
+ xlrec.blkno = BufferGetBlockNumber(stack->buffer);
+ xlrec.offset = stack->off;
+ xlrec.flags = xlflags;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof(ginxlogInsert);
+
+ /*
+ * Log information about child if this was an insertion of a
+ * downlink.
+ */
+ if (childbuf != InvalidBuffer)
+ {
+ rdata[0].next = &rdata[1];
+
+ BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
+ BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) childblknos;
+ rdata[1].len = sizeof(BlockIdData) * 2;
+ rdata[1].next = &rdata[2];
+
+ rdata[2].buffer = childbuf;
+ rdata[2].buffer_std = false;
+ rdata[2].data = NULL;
+ rdata[2].len = 0;
+ rdata[2].next = payloadrdata;
+ }
+ else
+ rdata[0].next = payloadrdata;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
PageSetLSN(page, recptr);
+ if (childbuf != InvalidBuffer)
+ PageSetLSN(childpage, recptr);
}
END_CRIT_SECTION();
Buffer rbuffer;
Page newlpage;
BlockNumber savedRightLink;
- GinBtreeStack *parent;
- Page lpage,
- rpage;
+ Page rpage;
+ XLogRecData rdata[2];
+ ginxlogSplit data;
+ Buffer lbuffer = InvalidBuffer;
+ Page newrootpg = NULL;
END_CRIT_SECTION();
rbuffer = GinNewBuffer(btree->index);
+
/* During index build, count the new page */
if (buildStats)
{
*/
newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
insertdata, updateblkno,
- &rdata);
+ &payloadrdata);
- ((ginxlogSplit *) (rdata->data))->rootBlkno = btree->rootBlkno;
+ data.node = btree->index->rd_node;
+ data.rblkno = BufferGetBlockNumber(rbuffer);
+ data.flags = xlflags;
+ if (childbuf != InvalidBuffer)
+ {
+ Page childpage = BufferGetPage(childbuf);
+ GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
- parent = stack->parent;
+ data.leftChildBlkno = BufferGetBlockNumber(childbuf);
+ data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
+ }
+ else
+ data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &data;
+ rdata[0].len = sizeof(ginxlogSplit);
+
+ if (childbuf != InvalidBuffer)
+ {
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = childbuf;
+ rdata[1].buffer_std = false;
+ rdata[1].data = NULL;
+ rdata[1].len = 0;
+ rdata[1].next = payloadrdata;
+ }
+ else
+ rdata[0].next = payloadrdata;
+
+ rpage = BufferGetPage(rbuffer);
- if (parent == NULL)
+ if (stack->parent == NULL)
{
/*
* split root, so we need to allocate new left page and place
* pointer on root to left and right page
*/
- Buffer lbuffer = GinNewBuffer(btree->index);
+ lbuffer = GinNewBuffer(btree->index);
- /* During index build, count the new page */
+ /* During index build, count the newly-added root page */
if (buildStats)
{
if (btree->isData)
buildStats->nEntryPages++;
}
- ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
- ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
-
- lpage = BufferGetPage(lbuffer);
- rpage = BufferGetPage(rbuffer);
+ /*
+ * root never has a right-link, so we borrow the rrlink field to
+ * store the root block number.
+ */
+ data.rrlink = BufferGetBlockNumber(stack->buffer);
+ data.lblkno = BufferGetBlockNumber(lbuffer);
+ data.flags |= GIN_SPLIT_ROOT;
GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
- ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
-
- START_CRIT_SECTION();
-
- GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
- PageRestoreTempPage(newlpage, lpage);
- btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
-
- MarkBufferDirty(rbuffer);
- MarkBufferDirty(lbuffer);
- MarkBufferDirty(stack->buffer);
-
- if (RelationNeedsWAL(btree->index))
- {
- XLogRecPtr recptr;
-
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
- PageSetLSN(page, recptr);
- PageSetLSN(lpage, recptr);
- PageSetLSN(rpage, recptr);
- }
-
- UnlockReleaseBuffer(rbuffer);
- UnlockReleaseBuffer(lbuffer);
- END_CRIT_SECTION();
- /* During index build, count the newly-added root page */
- if (buildStats)
- {
- if (btree->isData)
- buildStats->nDataPages++;
- else
- buildStats->nEntryPages++;
- }
+ /*
+ * Construct a new root page containing downlinks to the new left
+ * and right pages. (do this in a temporary copy first rather
+ * than overwriting the original page directly, so that we can still
+ * abort gracefully if this fails.)
+ */
+ newrootpg = PageGetTempPage(rpage);
+ GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF, BLCKSZ);
- return true;
+ btree->fillRoot(btree, newrootpg,
+ BufferGetBlockNumber(lbuffer), newlpage,
+ BufferGetBlockNumber(rbuffer), rpage);
}
else
{
/* split non-root page */
- ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
- ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
-
- lpage = BufferGetPage(stack->buffer);
- rpage = BufferGetPage(rbuffer);
+ data.rrlink = savedRightLink;
+ data.lblkno = BufferGetBlockNumber(stack->buffer);
GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+ GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+ }
+
+ /*
+ * Ok, we have the new contents of the left page in a temporary copy
+ * now (newlpage), and the newly-allocated right block has been filled
+ * in. The original page is still unchanged.
+ *
+ * If this is a root split, we also have a temporary page containing
+ * the new contents of the root. Copy the new left page to a
+ * newly-allocated block, and initialize the (original) root page the
+ * new copy. Otherwise, copy over the temporary copy of the new left
+ * page over the old left page.
+ */
- START_CRIT_SECTION();
- PageRestoreTempPage(newlpage, lpage);
+ START_CRIT_SECTION();
- MarkBufferDirty(rbuffer);
- MarkBufferDirty(stack->buffer);
+ MarkBufferDirty(rbuffer);
- if (RelationNeedsWAL(btree->index))
- {
- XLogRecPtr recptr;
+ if (stack->parent == NULL)
+ {
+ PageRestoreTempPage(newlpage, BufferGetPage(lbuffer));
+ MarkBufferDirty(lbuffer);
+ newlpage = newrootpg;
+ }
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
- PageSetLSN(lpage, recptr);
- PageSetLSN(rpage, recptr);
- }
- UnlockReleaseBuffer(rbuffer);
- END_CRIT_SECTION();
+ PageRestoreTempPage(newlpage, BufferGetPage(stack->buffer));
+ MarkBufferDirty(stack->buffer);
- return false;
+ /* write WAL record */
+ if (RelationNeedsWAL(btree->index))
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+ PageSetLSN(BufferGetPage(stack->buffer), recptr);
+ PageSetLSN(rpage, recptr);
+ if (stack->parent == NULL)
+ PageSetLSN(BufferGetPage(lbuffer), recptr);
}
+ END_CRIT_SECTION();
+
+ /*
+ * We can release the lock on the right page now, but keep the
+ * original buffer locked.
+ */
+ UnlockReleaseBuffer(rbuffer);
+ if (stack->parent == NULL)
+ UnlockReleaseBuffer(lbuffer);
+
+ /*
+ * If we split the root, we're done. Otherwise the split is not
+ * complete until the downlink for the new page has been inserted to
+ * the parent.
+ */
+ if (stack->parent == NULL)
+ return true;
+ else
+ return false;
}
}
*
* On entry, stack->buffer is exclusively locked.
*
- * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ * If freestack is true, all the buffers are released and unlocked as we
+ * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
+ * locked, and stack is unmodified, except for possibly moving right to find
+ * the correct parent of page.
*/
-void
-ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+static void
+ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
+ GinStatsData *buildStats)
{
Page page;
bool done;
+ bool first = true;
+
+ /*
+ * freestack == false when we encounter an incompletely split page during a
+ * scan, while freestack == true is used in the normal scenario that a
+ * split is finished right after the initial insert.
+ */
+ if (!freestack)
+ elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
+ stack->blkno, RelationGetRelationName(btree->index));
/* this loop crawls up the stack until the insertion is complete */
do
void *insertdata;
BlockNumber updateblkno;
- insertdata = btree->prepareDownlink(btree, stack->buffer);
- updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
-
/* search parent to lock */
LockBuffer(parent->buffer, GIN_EXCLUSIVE);
+ /*
+ * If the parent page was incompletely split, finish that split first,
+ * then continue with the current one.
+ *
+ * Note: we have to finish *all* incomplete splits we encounter, even
+ * if we have to move right. Otherwise we might choose as the target
+ * a page that has no downlink in the parent, and splitting it further
+ * would fail.
+ */
+ if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
+ ginFinishSplit(btree, parent, false, buildStats);
+
/* move right if it's needed */
page = BufferGetPage(parent->buffer);
while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
{
- BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
-
- if (rightlink == InvalidBlockNumber)
+ if (GinPageRightMost(page))
{
/*
* rightmost page, but we don't find parent, we should use
}
parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
- parent->blkno = rightlink;
+ parent->blkno = BufferGetBlockNumber(parent->buffer);
page = BufferGetPage(parent->buffer);
- }
- /* release the child */
- UnlockReleaseBuffer(stack->buffer);
- pfree(stack);
- stack = parent;
+ if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
+ ginFinishSplit(btree, parent, false, buildStats);
+ }
- /* insert the downlink to parent */
- done = ginPlaceToPage(btree, stack,
+ /* insert the downlink */
+ insertdata = btree->prepareDownlink(btree, stack->buffer);
+ updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
+ done = ginPlaceToPage(btree, parent,
insertdata, updateblkno,
- buildStats);
+ stack->buffer, buildStats);
pfree(insertdata);
+
+ /*
+ * If the caller requested to free the stack, unlock and release the
+ * child buffer now. Otherwise keep it pinned and locked, but if we
+ * have to recurse up the tree, we can unlock the upper pages, only
+ * keeping the page at the bottom of the stack locked.
+ */
+ if (!first || freestack)
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+ if (freestack)
+ {
+ ReleaseBuffer(stack->buffer);
+ pfree(stack);
+ }
+ stack = parent;
+
+ first = false;
} while (!done);
+
+ /* unlock the parent */
LockBuffer(stack->buffer, GIN_UNLOCK);
- /* free the rest of the stack */
- freeGinBtreeStack(stack);
+ if (freestack)
+ freeGinBtreeStack(stack);
}
/*
{
bool done;
+ /* If the leaf page was incompletely split, finish the split first */
+ if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
+ ginFinishSplit(btree, stack, false, buildStats);
+
done = ginPlaceToPage(btree, stack,
insertdata, InvalidBlockNumber,
- buildStats);
+ InvalidBuffer, buildStats);
if (done)
{
LockBuffer(stack->buffer, GIN_UNLOCK);
freeGinBtreeStack(stack);
}
else
- ginFinishSplit(btree, stack, buildStats);
+ ginFinishSplit(btree, stack, true, buildStats);
}
OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
char *ptr;
+ Assert(ItemPointerIsValid(data));
Assert(GinPageIsLeaf(page));
if (offset == InvalidOffsetNumber)
OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
char *ptr;
+ Assert(PostingItemGetBlockNumber(data) != InvalidBlockNumber);
Assert(!GinPageIsLeaf(page));
if (offset == InvalidOffsetNumber)
XLogRecData **prdata)
{
Page page = BufferGetPage(buf);
- int cnt = 0;
-
/* these must be static so they can be returned to caller */
- static XLogRecData rdata[3];
- static ginxlogInsert data;
+ static XLogRecData rdata[2];
/* quick exit if it doesn't fit */
if (!dataIsEnoughSpace(btree, buf, off, insertdata))
PostingItemSetBlockNumber(pitem, updateblkno);
}
- data.updateBlkno = updateblkno;
- data.node = btree->index->rd_node;
- data.blkno = BufferGetBlockNumber(buf);
- data.offset = off;
- data.nitem = 1;
- data.isDelete = FALSE;
- data.isData = TRUE;
- data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
-
- /*
- * Prevent full page write if child's split occurs. That is needed to
- * remove incomplete splits while replaying WAL
- *
- * data.updateBlkno contains new block number (of newly created right
- * page) for recently splited page.
- */
- if (data.updateBlkno == InvalidBlockNumber)
- {
- rdata[0].buffer = buf;
- rdata[0].buffer_std = FALSE;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].next = &rdata[1];
- cnt++;
- }
-
- rdata[cnt].buffer = InvalidBuffer;
- rdata[cnt].data = (char *) &data;
- rdata[cnt].len = sizeof(ginxlogInsert);
- rdata[cnt].next = &rdata[cnt + 1];
- cnt++;
-
- rdata[cnt].buffer = InvalidBuffer;
- /* data and len filled in below */
- rdata[cnt].next = NULL;
-
if (GinPageIsLeaf(page))
{
GinBtreeDataLeafInsertData *items = insertdata;
+ static ginxlogInsertDataLeaf data;
uint32 savedPos = items->curitem;
if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
{
GinDataPageAddItemPointer(page, items->items + items->curitem, off);
items->curitem++;
+ data.nitem = 1;
}
- rdata[cnt].data = (char *) &items->items[savedPos];
- rdata[cnt].len = sizeof(ItemPointerData) * data.nitem;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &data;
+ rdata[0].len = offsetof(ginxlogInsertDataLeaf, items);
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) &items->items[savedPos];
+ rdata[1].len = sizeof(ItemPointerData) * data.nitem;
+ rdata[1].next = NULL;
}
else
{
GinDataPageAddPostingItem(page, pitem, off);
- rdata[cnt].data = (char *) pitem;
- rdata[cnt].len = sizeof(PostingItem);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) pitem;
+ rdata[0].len = sizeof(PostingItem);
+ rdata[0].next = NULL;
}
return true;
Size freeSpace;
/* these must be static so they can be returned to caller */
- static ginxlogSplit data;
- static XLogRecData rdata[4];
+ static ginxlogSplitData data;
+ static XLogRecData rdata[2];
static char vector[2 * BLCKSZ];
GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
if (isleaf && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
{
+ /* append new items to the end */
GinBtreeDataLeafInsertData *items = insertdata;
while (items->curitem < items->nitem &&
bound = GinDataPageGetRightBound(rpage);
*bound = oldbound;
- data.node = btree->index->rd_node;
- data.rootBlkno = InvalidBlockNumber;
- data.lblkno = BufferGetBlockNumber(lbuf);
- data.rblkno = BufferGetBlockNumber(rbuf);
data.separator = separator;
data.nitem = maxoff;
- data.isData = TRUE;
- data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
- data.isRootSplit = FALSE;
data.rightbound = oldbound;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogSplit);
+ rdata[0].len = sizeof(ginxlogSplitData);
rdata[0].next = &rdata[1];
rdata[1].buffer = InvalidBuffer;
rdata[1].data = vector;
- rdata[1].len = MAXALIGN(maxoff * sizeofitem);
+ rdata[1].len = maxoff * sizeofitem;
rdata[1].next = NULL;
return lpage;
* Also called from ginxlog, should not use btree
*/
void
-ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
+ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
{
- Page page = BufferGetPage(root),
- lpage = BufferGetPage(lbuf),
- rpage = BufferGetPage(rbuf);
PostingItem li,
ri;
li.key = *GinDataPageGetRightBound(lpage);
- PostingItemSetBlockNumber(&li, BufferGetBlockNumber(lbuf));
- GinDataPageAddPostingItem(page, &li, InvalidOffsetNumber);
+ PostingItemSetBlockNumber(&li, lblkno);
+ GinDataPageAddPostingItem(root, &li, InvalidOffsetNumber);
ri.key = *GinDataPageGetRightBound(rpage);
- PostingItemSetBlockNumber(&ri, BufferGetBlockNumber(rbuf));
- GinDataPageAddPostingItem(page, &ri, InvalidOffsetNumber);
+ PostingItemSetBlockNumber(&ri, rblkno);
+ GinDataPageAddPostingItem(root, &ri, InvalidOffsetNumber);
}
/*
/* these must be static so they can be returned to caller */
static XLogRecData rdata[3];
- static ginxlogInsert data;
+ static ginxlogInsertEntry data;
/* quick exit if it doesn't fit */
if (!entryIsEnoughSpace(btree, buf, off, insertData))
*prdata = rdata;
entryPreparePage(btree, page, off, insertData, updateblkno);
- data.updateBlkno = updateblkno;
placed = PageAddItem(page,
(Item) insertData->entry,
elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index));
- data.node = btree->index->rd_node;
- data.blkno = BufferGetBlockNumber(buf);
- data.offset = off;
- data.nitem = 1;
data.isDelete = insertData->isDelete;
- data.isData = false;
- data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
-
- /*
- * Prevent full page write if child's split occurs. That is needed to
- * remove incomplete splits while replaying WAL
- *
- * data.updateBlkno contains new block number (of newly created right
- * page) for recently splited page.
- */
- if (data.updateBlkno == InvalidBlockNumber)
- {
- rdata[0].buffer = buf;
- rdata[0].buffer_std = TRUE;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].next = &rdata[1];
- cnt++;
- }
rdata[cnt].buffer = InvalidBuffer;
rdata[cnt].data = (char *) &data;
- rdata[cnt].len = sizeof(ginxlogInsert);
+ rdata[cnt].len = offsetof(ginxlogInsertEntry, tuple);
rdata[cnt].next = &rdata[cnt + 1];
cnt++;
maxoff,
separator = InvalidOffsetNumber;
Size totalsize = 0;
+ Size tupstoresize;
Size lsize = 0,
size;
char *ptr;
/* these must be static so they can be returned to caller */
static XLogRecData rdata[2];
- static ginxlogSplit data;
+ static ginxlogSplitEntry data;
static char tupstore[2 * BLCKSZ];
*prdata = rdata;
- data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
- InvalidOffsetNumber : GinGetDownlink(insertData->entry);
- data.updateBlkno = updateblkno;
entryPreparePage(btree, lpage, off, insertData, updateblkno);
+ /*
+ * First, append all the existing tuples and the new tuple we're inserting
+ * one after another in a temporary workspace.
+ */
maxoff = PageGetMaxOffsetNumber(lpage);
ptr = tupstore;
-
for (i = FirstOffsetNumber; i <= maxoff; i++)
{
if (i == off)
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
+ tupstoresize = ptr - tupstore;
+ /*
+ * Initialize the left and right pages, and copy all the tuples back to
+ * them.
+ */
GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
ptr += MAXALIGN(IndexTupleSize(itup));
}
- data.node = btree->index->rd_node;
- data.rootBlkno = InvalidBlockNumber;
- data.lblkno = BufferGetBlockNumber(lbuf);
- data.rblkno = BufferGetBlockNumber(rbuf);
data.separator = separator;
data.nitem = maxoff;
- data.isData = FALSE;
- data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
- data.isRootSplit = FALSE;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogSplit);
+ rdata[0].len = sizeof(ginxlogSplitEntry);
rdata[0].next = &rdata[1];
rdata[1].buffer = InvalidBuffer;
rdata[1].data = tupstore;
- rdata[1].len = MAXALIGN(totalsize);
+ rdata[1].len = tupstoresize;
rdata[1].next = NULL;
return lpage;
* Also called from ginxlog, should not use btree
*/
void
-ginEntryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
+ginEntryFillRoot(GinBtree btree, Page root,
+ BlockNumber lblkno, Page lpage,
+ BlockNumber rblkno, Page rpage)
{
- Page page = BufferGetPage(root);
- Page lpage = BufferGetPage(lbuf);
- Page rpage = BufferGetPage(rbuf);
IndexTuple itup;
- itup = GinFormInteriorTuple(getRightMostTuple(lpage),
- lpage,
- BufferGetBlockNumber(lbuf));
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
+ if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index root page");
pfree(itup);
- itup = GinFormInteriorTuple(getRightMostTuple(rpage),
- rpage,
- BufferGetBlockNumber(rbuf));
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
+ if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index root page");
pfree(itup);
}
#include "utils/memutils.h"
static MemoryContext opCtx; /* working memory for operations */
-static MemoryContext topCtx;
-
-typedef struct ginIncompleteSplit
-{
- RelFileNode node;
- BlockNumber leftBlkno;
- BlockNumber rightBlkno;
- BlockNumber rootBlkno;
-} ginIncompleteSplit;
-
-static List *incomplete_splits;
static void
-pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
+ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno)
{
- ginIncompleteSplit *split;
-
- MemoryContextSwitchTo(topCtx);
-
- split = palloc(sizeof(ginIncompleteSplit));
-
- split->node = node;
- split->leftBlkno = leftBlkno;
- split->rightBlkno = rightBlkno;
- split->rootBlkno = rootBlkno;
-
- incomplete_splits = lappend(incomplete_splits, split);
-
- MemoryContextSwitchTo(opCtx);
-}
+ Buffer buffer;
+ Page page;
-static void
-forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
-{
- ListCell *l;
+ buffer = XLogReadBuffer(node, blkno, false);
+ if (!BufferIsValid(buffer))
+ return; /* page was deleted, nothing to do */
+ page = (Page) BufferGetPage(buffer);
- foreach(l, incomplete_splits)
+ if (lsn > PageGetLSN(page))
{
- ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
+ GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
- if (RelFileNodeEquals(node, split->node) &&
- leftBlkno == split->leftBlkno &&
- updateBlkno == split->rightBlkno)
- {
- incomplete_splits = list_delete_ptr(incomplete_splits, split);
- pfree(split);
- break;
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
}
+
+ UnlockReleaseBuffer(buffer);
}
static void
}
static void
-ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
+ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
+ void *rdata)
{
- ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
+ Page page = BufferGetPage(buffer);
+ ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
+ IndexTuple itup;
- /* first, forget any incomplete split this insertion completes */
- if (data->isData)
+ if (rightblkno != InvalidBlockNumber)
{
- Assert(data->isDelete == FALSE);
- if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
- {
- PostingItem *pitem;
+ /* update link to right page after split */
+ Assert(!GinPageIsLeaf(page));
+ Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
+ GinSetDownlink(itup, rightblkno);
+ }
- pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
- forgetIncompleteSplit(data->node,
- PostingItemGetBlockNumber(pitem),
- data->updateBlkno);
- }
+ if (data->isDelete)
+ {
+ Assert(GinPageIsLeaf(page));
+ Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
+ PageIndexTupleDelete(page, offset);
+ }
+
+ itup = &data->tuple;
+
+ if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
+ {
+ RelFileNode node;
+ ForkNumber forknum;
+ BlockNumber blknum;
+ BufferGetTag(buffer, &node, &forknum, &blknum);
+ elog(ERROR, "failed to add item to index page in %u/%u/%u",
+ node.spcNode, node.dbNode, node.relNode);
+ }
+}
+
+static void
+ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
+ void *rdata)
+{
+ Page page = BufferGetPage(buffer);
+
+ if (GinPageIsLeaf(page))
+ {
+ ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata;
+ ItemPointerData *items = data->items;
+ OffsetNumber i;
+
+ for (i = 0; i < data->nitem; i++)
+ GinDataPageAddItemPointer(page, &items[i], offset + i);
}
else
{
- if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
- {
- IndexTuple itup;
+ PostingItem *pitem = (PostingItem *) rdata;
+ PostingItem *oldpitem;
- itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
- forgetIncompleteSplit(data->node,
- GinGetDownlink(itup),
- data->updateBlkno);
- }
+ /* update link to right page after split */
+ oldpitem = GinDataPageGetPostingItem(page, offset);
+ PostingItemSetBlockNumber(oldpitem, rightblkno);
+
+ GinDataPageAddPostingItem(page, pitem, offset);
+ }
+}
+
+static void
+ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
+{
+ ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+ char *payload;
+ BlockNumber leftChildBlkno = InvalidBlockNumber;
+ BlockNumber rightChildBlkno = InvalidBlockNumber;
+ bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
+
+ payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
+
+ /*
+ * First clear incomplete-split flag on child page if this finishes
+ * a split.
+ */
+ if (!isLeaf)
+ {
+ leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
+ payload += sizeof(BlockIdData);
+ rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
+ payload += sizeof(BlockIdData);
+
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
+ ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno);
}
/* If we have a full-page image, restore it and we're done */
- if (record->xl_info & XLR_BKP_BLOCK(0))
+ if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1))
{
- (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ (void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false);
return;
}
if (lsn > PageGetLSN(page))
{
- if (data->isData)
+ /* How to insert the payload is tree-type specific */
+ if (data->flags & GIN_INSERT_ISDATA)
{
Assert(GinPageIsData(page));
-
- if (data->isLeaf)
- {
- OffsetNumber i;
- ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
- Assert(GinPageIsLeaf(page));
- Assert(data->updateBlkno == InvalidBlockNumber);
-
- for (i = 0; i < data->nitem; i++)
- GinDataPageAddItemPointer(page, &items[i], data->offset + i);
- }
- else
- {
- PostingItem *pitem;
-
- Assert(!GinPageIsLeaf(page));
-
- if (data->updateBlkno != InvalidBlockNumber)
- {
- /* update link to right page after split */
- pitem = GinDataPageGetPostingItem(page, data->offset);
- PostingItemSetBlockNumber(pitem, data->updateBlkno);
- }
-
- pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
- GinDataPageAddPostingItem(page, pitem, data->offset);
- }
+ ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload);
}
else
{
- IndexTuple itup;
-
Assert(!GinPageIsData(page));
+ ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload);
+ }
- if (data->updateBlkno != InvalidBlockNumber)
- {
- /* update link to right page after split */
- Assert(!GinPageIsLeaf(page));
- Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
- GinSetDownlink(itup, data->updateBlkno);
- }
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
- if (data->isDelete)
- {
- Assert(GinPageIsLeaf(page));
- Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
- PageIndexTupleDelete(page, data->offset);
- }
+ UnlockReleaseBuffer(buffer);
+}
- itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+static void
+ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
+{
+ ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata;
+ IndexTuple itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry));
+ OffsetNumber i;
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
- }
+ for (i = 0; i < data->separator; i++)
+ {
+ if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to gin index page");
+ itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+ }
- PageSetLSN(page, lsn);
+ for (i = data->separator; i < data->nitem; i++)
+ {
+ if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to gin index page");
+ itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+ }
+}
- MarkBufferDirty(buffer);
+static void
+ginRedoSplitData(Page lpage, Page rpage, void *rdata)
+{
+ ginxlogSplitData *data = (ginxlogSplitData *) rdata;
+ bool isleaf = GinPageIsLeaf(lpage);
+ char *ptr = (char *) rdata + sizeof(ginxlogSplitData);
+ OffsetNumber i;
+ ItemPointer bound;
+
+ if (isleaf)
+ {
+ ItemPointer items = (ItemPointer) ptr;
+ for (i = 0; i < data->separator; i++)
+ GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber);
+ for (i = data->separator; i < data->nitem; i++)
+ GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber);
+ }
+ else
+ {
+ PostingItem *items = (PostingItem *) ptr;
+ for (i = 0; i < data->separator; i++)
+ GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
+ for (i = data->separator; i < data->nitem; i++)
+ GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
}
- UnlockReleaseBuffer(buffer);
+ /* set up right key */
+ bound = GinDataPageGetRightBound(lpage);
+ if (isleaf)
+ *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
+ else
+ *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
+
+ bound = GinDataPageGetRightBound(rpage);
+ *bound = data->rightbound;
}
static void
Page lpage,
rpage;
uint32 flags = 0;
+ char *payload;
+ bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
+ bool isData = (data->flags & GIN_INSERT_ISDATA) != 0;
+ bool isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
+
+ payload = XLogRecGetData(record) + sizeof(ginxlogSplit);
+
+ /*
+ * First clear incomplete-split flag on child page if this finishes
+ * a split
+ */
+ if (!isLeaf)
+ {
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
+ else
+ ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno);
+ }
- if (data->isLeaf)
+ if (isLeaf)
flags |= GIN_LEAF;
- if (data->isData)
- flags |= GIN_DATA;
- /* Backup blocks are not used in split records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ if (isData)
+ flags |= GIN_DATA;
lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer));
GinInitBuffer(rbuffer, flags);
GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
- GinPageGetOpaque(rpage)->rightlink = data->rrlink;
-
- if (data->isData)
- {
- char *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
- Size sizeofitem = GinSizeOfDataPageItem(lpage);
- OffsetNumber i;
- ItemPointer bound;
-
- for (i = 0; i < data->separator; i++)
- {
- if (data->isLeaf)
- GinDataPageAddItemPointer(lpage, (ItemPointer) ptr, InvalidOffsetNumber);
- else
- GinDataPageAddPostingItem(lpage, (PostingItem *) ptr, InvalidOffsetNumber);
- ptr += sizeofitem;
- }
-
- for (i = data->separator; i < data->nitem; i++)
- {
- if (data->isLeaf)
- GinDataPageAddItemPointer(rpage, (ItemPointer) ptr, InvalidOffsetNumber);
- else
- GinDataPageAddPostingItem(rpage, (PostingItem *) ptr, InvalidOffsetNumber);
- ptr += sizeofitem;
- }
+ GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink;
- /* set up right key */
- bound = GinDataPageGetRightBound(lpage);
- if (data->isLeaf)
- *bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
- else
- *bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
-
- bound = GinDataPageGetRightBound(rpage);
- *bound = data->rightbound;
- }
+ /* Do the tree-type specific portion to restore the page contents */
+ if (isData)
+ ginRedoSplitData(lpage, rpage, payload);
else
- {
- IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
- OffsetNumber i;
-
- for (i = 0; i < data->separator; i++)
- {
- if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
- itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
- }
-
- for (i = data->separator; i < data->nitem; i++)
- {
- if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
- itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
- }
- }
+ ginRedoSplitEntry(lpage, rpage, payload);
PageSetLSN(rpage, lsn);
MarkBufferDirty(rbuffer);
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuffer);
- if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
- forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
-
- if (data->isRootSplit)
+ if (isRoot)
{
- Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
+ BlockNumber rootBlkno = data->rrlink;
+ Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
Page rootPage = BufferGetPage(rootBuf);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
- if (data->isData)
+ if (isData)
{
- Assert(data->rootBlkno != GIN_ROOT_BLKNO);
- ginDataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+ Assert(rootBlkno != GIN_ROOT_BLKNO);
+ ginDataFillRoot(NULL, BufferGetPage(rootBuf),
+ BufferGetBlockNumber(lbuffer),
+ BufferGetPage(lbuffer),
+ BufferGetBlockNumber(rbuffer),
+ BufferGetPage(rbuffer));
}
else
{
- Assert(data->rootBlkno == GIN_ROOT_BLKNO);
- ginEntryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
+ Assert(rootBlkno == GIN_ROOT_BLKNO);
+ ginEntryFillRoot(NULL, BufferGetPage(rootBuf),
+ BufferGetBlockNumber(lbuffer),
+ BufferGetPage(lbuffer),
+ BufferGetBlockNumber(rbuffer),
+ BufferGetPage(rbuffer));
}
PageSetLSN(rootPage, lsn);
MarkBufferDirty(rootBuf);
UnlockReleaseBuffer(rootBuf);
}
- else
- pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
UnlockReleaseBuffer(rbuffer);
UnlockReleaseBuffer(lbuffer);
gin_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ MemoryContext oldCtx;
/*
* GIN indexes do not require any conflict processing. NB: If we ever
* killed tuples outside VACUUM, we'll need to handle that here.
*/
- topCtx = MemoryContextSwitchTo(opCtx);
+ oldCtx = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIN_CREATE_INDEX:
default:
elog(PANIC, "gin_redo: unknown op code %u", info);
}
- MemoryContextSwitchTo(topCtx);
+ MemoryContextSwitchTo(oldCtx);
MemoryContextReset(opCtx);
}
void
gin_xlog_startup(void)
{
- incomplete_splits = NIL;
-
opCtx = AllocSetContextCreate(CurrentMemoryContext,
"GIN recovery temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
}
-static void
-ginContinueSplit(ginIncompleteSplit *split)
-{
- GinBtreeData btree;
- GinState ginstate;
- Relation reln;
- Buffer buffer;
- GinBtreeStack *stack;
-
- /*
- * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
- * split->leftBlkno, split->rightBlkno);
- */
- buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
-
- /*
- * Failure should be impossible here, because we wrote the page earlier.
- */
- if (!BufferIsValid(buffer))
- elog(PANIC, "ginContinueSplit: left block %u not found",
- split->leftBlkno);
-
- reln = CreateFakeRelcacheEntry(split->node);
-
- if (split->rootBlkno == GIN_ROOT_BLKNO)
- {
- MemSet(&ginstate, 0, sizeof(ginstate));
- ginstate.index = reln;
-
- ginPrepareEntryScan(&btree,
- InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY,
- &ginstate);
- }
- else
- {
- ginPrepareDataScan(&btree, reln, split->rootBlkno);
- }
-
- stack = palloc(sizeof(GinBtreeStack));
- stack->blkno = split->leftBlkno;
- stack->buffer = buffer;
- stack->off = InvalidOffsetNumber;
- stack->parent = NULL;
-
- ginFindParents(&btree, stack);
- LockBuffer(stack->parent->buffer, GIN_UNLOCK);
- ginFinishSplit(&btree, stack, NULL);
-
- /* buffer is released by ginFinishSplit */
-
- FreeFakeRelcacheEntry(reln);
-}
-
void
gin_xlog_cleanup(void)
{
- ListCell *l;
- MemoryContext topCtx;
-
- topCtx = MemoryContextSwitchTo(opCtx);
-
- foreach(l, incomplete_splits)
- {
- ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
-
- ginContinueSplit(split);
- MemoryContextReset(opCtx);
- }
-
- MemoryContextSwitchTo(topCtx);
MemoryContextDelete(opCtx);
- incomplete_splits = NIL;
-}
-
-bool
-gin_safe_restartpoint(void)
-{
- if (incomplete_splits)
- return false;
- return true;
}
desc_node(buf, ((ginxlogCreatePostingTree *) rec)->node, ((ginxlogCreatePostingTree *) rec)->blkno);
break;
case XLOG_GIN_INSERT:
- appendStringInfoString(buf, "Insert item, ");
- desc_node(buf, ((ginxlogInsert *) rec)->node, ((ginxlogInsert *) rec)->blkno);
- appendStringInfo(buf, " offset: %u nitem: %u isdata: %c isleaf %c isdelete %c updateBlkno:%u",
- ((ginxlogInsert *) rec)->offset,
- ((ginxlogInsert *) rec)->nitem,
- (((ginxlogInsert *) rec)->isData) ? 'T' : 'F',
- (((ginxlogInsert *) rec)->isLeaf) ? 'T' : 'F',
- (((ginxlogInsert *) rec)->isDelete) ? 'T' : 'F',
- ((ginxlogInsert *) rec)->updateBlkno);
+ {
+ ginxlogInsert *xlrec = (ginxlogInsert *) rec;
+ char *payload = rec + sizeof(ginxlogInsert);
+
+ appendStringInfoString(buf, "Insert item, ");
+ desc_node(buf, xlrec->node, xlrec->blkno);
+ appendStringInfo(buf, " offset: %u isdata: %c isleaf: %c",
+ xlrec->offset,
+ (xlrec->flags & GIN_INSERT_ISDATA) ? 'T' : 'F',
+ (xlrec->flags & GIN_INSERT_ISLEAF) ? 'T' : 'F');
+ if (!(xlrec->flags & GIN_INSERT_ISLEAF))
+ {
+ BlockNumber leftChildBlkno;
+ BlockNumber rightChildBlkno;
+
+ memcpy(&leftChildBlkno, payload, sizeof(BlockNumber));
+ payload += sizeof(BlockNumber);
+ memcpy(&rightChildBlkno, payload, sizeof(BlockNumber));
+ payload += sizeof(BlockNumber);
+ appendStringInfo(buf, " children: %u/%u",
+ leftChildBlkno, rightChildBlkno);
+ }
+ if (!(xlrec->flags & GIN_INSERT_ISDATA))
+ appendStringInfo(buf, " isdelete: %c",
+ (((ginxlogInsertEntry *) payload)->isDelete) ? 'T' : 'F');
+ else if (xlrec->flags & GIN_INSERT_ISLEAF)
+ appendStringInfo(buf, " nitem: %u",
+ (((ginxlogInsertDataLeaf *) payload)->nitem) ? 'T' : 'F');
+ else
+ appendStringInfo(buf, " pitem: %u-%u/%u",
+ PostingItemGetBlockNumber((PostingItem *) payload),
+ ItemPointerGetBlockNumber(&((PostingItem *) payload)->key),
+ ItemPointerGetOffsetNumber(&((PostingItem *) payload)->key));
+ }
break;
case XLOG_GIN_SPLIT:
appendStringInfoString(buf, "Page split, ");
desc_node(buf, ((ginxlogSplit *) rec)->node, ((ginxlogSplit *) rec)->lblkno);
- appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->isRootSplit) ? 'T' : 'F');
+ appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->flags & GIN_SPLIT_ROOT) ? 'T' : 'F');
break;
case XLOG_GIN_VACUUM_PAGE:
appendStringInfoString(buf, "Vacuum page, ");
extern void gin_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void gin_xlog_startup(void);
extern void gin_xlog_cleanup(void);
-extern bool gin_safe_restartpoint(void);
#endif /* GIN_H */
#define GIN_META (1 << 3)
#define GIN_LIST (1 << 4)
#define GIN_LIST_FULLROW (1 << 5) /* makes sense only on GIN_LIST page */
+#define GIN_INCOMPLETE_SPLIT (1 << 6) /* page was split, but parent not updated */
/* Page numbers of fixed-location pages */
#define GIN_METAPAGE_BLKNO (0)
#define GinPageIsDeleted(page) ( GinPageGetOpaque(page)->flags & GIN_DELETED)
#define GinPageSetDeleted(page) ( GinPageGetOpaque(page)->flags |= GIN_DELETED)
#define GinPageSetNonDeleted(page) ( GinPageGetOpaque(page)->flags &= ~GIN_DELETED)
+#define GinPageIsIncompleteSplit(page) ( GinPageGetOpaque(page)->flags & GIN_INCOMPLETE_SPLIT)
#define GinPageRightMost(page) ( GinPageGetOpaque(page)->rightlink == InvalidBlockNumber)
{
RelFileNode node;
BlockNumber blkno;
- BlockNumber updateBlkno;
+ uint16 flags; /* GIN_SPLIT_ISLEAF and/or GIN_SPLIT_ISDATA */
OffsetNumber offset;
- bool isDelete;
- bool isData;
- bool isLeaf;
- OffsetNumber nitem;
/*
- * follows: tuples or ItemPointerData or PostingItem or list of
- * ItemPointerData
+ * FOLLOWS:
+ *
+ * 1. if not leaf page, block numbers of the left and right child pages
+ * whose split this insertion finishes. As BlockIdData[2] (beware of adding
+ * fields before this that would make them not 16-bit aligned)
+ *
+ * 2. one of the following structs, depending on tree type.
+ *
+ * NB: the below structs are only 16-bit aligned when appended to a
+ * ginxlogInsert struct! Beware of adding fields to them that require
+ * stricter alignment.
*/
} ginxlogInsert;
+typedef struct
+{
+ bool isDelete;
+ IndexTupleData tuple; /* variable length */
+} ginxlogInsertEntry;
+
+typedef struct
+{
+ OffsetNumber nitem;
+ ItemPointerData items[1]; /* variable length */
+} ginxlogInsertDataLeaf;
+
+/* In an insert to an internal data page, the payload is a PostingItem */
+
+
#define XLOG_GIN_SPLIT 0x30
typedef struct ginxlogSplit
{
RelFileNode node;
BlockNumber lblkno;
- BlockNumber rootBlkno;
BlockNumber rblkno;
- BlockNumber rrlink;
+ BlockNumber rrlink; /* right link, or root's blocknumber if root split */
+ BlockNumber leftChildBlkno; /* valid on a non-leaf split */
+ BlockNumber rightChildBlkno;
+ uint16 flags;
+
+ /* follows: one of the following structs */
+} ginxlogSplit;
+
+/*
+ * Flags used in ginxlogInsert and ginxlogSplit records
+ */
+#define GIN_INSERT_ISDATA 0x01 /* for both insert and split records */
+#define GIN_INSERT_ISLEAF 0x02 /* .. */
+#define GIN_SPLIT_ROOT 0x04 /* only for split records */
+
+typedef struct
+{
OffsetNumber separator;
OffsetNumber nitem;
- bool isData;
- bool isLeaf;
- bool isRootSplit;
+ /* FOLLOWS: IndexTuples */
+} ginxlogSplitEntry;
- BlockNumber leftChildBlkno;
- BlockNumber updateBlkno;
+typedef struct
+{
+ OffsetNumber separator;
+ OffsetNumber nitem;
+ ItemPointerData rightbound;
- ItemPointerData rightbound; /* used only in posting tree */
- /* follows: list of tuple or ItemPointerData or PostingItem */
-} ginxlogSplit;
+ /* FOLLOWS: array of ItemPointers (for leaf) or PostingItems (non-leaf) */
+} ginxlogSplitData;
#define XLOG_GIN_VACUUM_PAGE 0x40
bool (*placeToPage) (GinBtree, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
Page (*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
void *(*prepareDownlink) (GinBtree, Buffer);
- void (*fillRoot) (GinBtree, Buffer, Buffer, Buffer);
+ void (*fillRoot) (GinBtree, Page, BlockNumber, Page, BlockNumber, Page);
bool isData;
extern void freeGinBtreeStack(GinBtreeStack *stack);
extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
void *insertdata, GinStatsData *buildStats);
-extern void ginFindParents(GinBtree btree, GinBtreeStack *stack);
-extern void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
- GinStatsData *buildStats);
/* ginentrypage.c */
extern IndexTuple GinFormTuple(GinState *ginstate,
extern void ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
Datum key, GinNullCategory category,
GinState *ginstate);
-extern void ginEntryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
+extern void ginEntryFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
/* gindatapage.c */
extern BlockNumber createPostingTree(Relation index,
ItemPointerData *items, uint32 nitem,
GinStatsData *buildStats);
extern GinBtreeStack *ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno);
-extern void ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
+extern void ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
/* ginscan.c */
PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, NULL, NULL, NULL)
PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint)
PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, NULL, NULL, NULL)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, NULL)
PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL)
PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, NULL, NULL, NULL)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_xlog_startup, spg_xlog_cleanup, NULL)
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD076 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD077 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{