/*-------------------------------------------------------------------------
*
* ginentrypage.c
- * page utilities routines for the postgres inverted index access method.
+ * routines for handling GIN entry tree pages.
*
*
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
#include "postgres.h"
-#include "access/gin.h"
-#include "storage/bufmgr.h"
+#include "access/gin_private.h"
+#include "access/xloginsert.h"
+#include "miscadmin.h"
#include "utils/rel.h"
+static void entrySplitPage(GinBtree btree, Buffer origbuf,
+ GinBtreeStack *stack,
+ void *insertPayload,
+ BlockNumber updateblkno,
+ Page *newlpage, Page *newrpage);
+
/*
* Form a tuple for entry tree.
*
* If the tuple would be too big to be stored, function throws a suitable
* error if errorTooBig is TRUE, or returns NULL if errorTooBig is FALSE.
*
- * On leaf pages, Index tuple has non-traditional layout. Tuple may contain
- * posting list or root blocknumber of posting tree.
- * Macros: GinIsPostingTree(itup) / GinSetPostingTree(itup, blkno)
- * 1) Posting list
- * - itup->t_info & INDEX_SIZE_MASK contains total size of tuple as usual
- * - ItemPointerGetBlockNumber(&itup->t_tid) contains original
- * size of tuple (without posting list).
- * Macros: GinGetOrigSizePosting(itup) / GinSetOrigSizePosting(itup,n)
- * - ItemPointerGetOffsetNumber(&itup->t_tid) contains number
- * of elements in posting list (number of heap itempointers)
- * Macros: GinGetNPosting(itup) / GinSetNPosting(itup,n)
- * - After standard part of tuple there is a posting list, ie, array
- * of heap itempointers
- * Macros: GinGetPosting(itup)
- * 2) Posting tree
- * - itup->t_info & INDEX_SIZE_MASK contains size of tuple as usual
- * - ItemPointerGetBlockNumber(&itup->t_tid) contains block number of
- * root of posting tree
- * - ItemPointerGetOffsetNumber(&itup->t_tid) contains magic number
- * GIN_TREE_POSTING, which distinguishes this from posting-list case
- *
- * Attributes of an index tuple are different for single and multicolumn index.
- * For single-column case, index tuple stores only value to be indexed.
- * For multicolumn case, it stores two attributes: column number of value
- * and value.
+ * See src/backend/access/gin/README for a description of the index tuple
+ * format that is being built here. We build on the assumption that we
+ * are making a leaf-level key entry containing a posting list of nipd items.
+ * If the caller is actually trying to make a posting-tree entry, non-leaf
+ * entry, or pending-list entry, it should pass dataSize = 0 and then overwrite
+ * the t_tid fields as necessary. In any case, 'data' can be NULL to skip
+ * filling in the posting list; the caller is responsible for filling it
+ * afterwards if data = NULL and nipd > 0.
*/
IndexTuple
-GinFormTuple(Relation index, GinState *ginstate,
- OffsetNumber attnum, Datum key,
- ItemPointerData *ipd, uint32 nipd, bool errorTooBig)
+GinFormTuple(GinState *ginstate,
+ OffsetNumber attnum, Datum key, GinNullCategory category,
+ Pointer data, Size dataSize, int nipd,
+ bool errorTooBig)
{
- bool isnull[2] = {FALSE, FALSE};
+ Datum datums[2];
+ bool isnull[2];
IndexTuple itup;
uint32 newsize;
+ /* Build the basic tuple: optional column number, plus key datum */
if (ginstate->oneCol)
- itup = index_form_tuple(ginstate->origTupdesc, &key, isnull);
+ {
+ datums[0] = key;
+ isnull[0] = (category != GIN_CAT_NORM_KEY);
+ }
else
{
- Datum datums[2];
-
datums[0] = UInt16GetDatum(attnum);
+ isnull[0] = false;
datums[1] = key;
- itup = index_form_tuple(ginstate->tupdesc[attnum - 1], datums, isnull);
+ isnull[1] = (category != GIN_CAT_NORM_KEY);
}
- GinSetOrigSizePosting(itup, IndexTupleSize(itup));
+ itup = index_form_tuple(ginstate->tupdesc[attnum - 1], datums, isnull);
+
+ /*
+ * Determine and store offset to the posting list, making sure there is
+ * room for the category byte if needed.
+ *
+ * Note: because index_form_tuple MAXALIGNs the tuple size, there may well
+ * be some wasted pad space. Is it worth recomputing the data length to
+ * prevent that? That would also allow us to Assert that the real data
+ * doesn't overlap the GinNullCategory byte, which this code currently
+ * takes on faith.
+ */
+ newsize = IndexTupleSize(itup);
- if (nipd > 0)
+ if (IndexTupleHasNulls(itup))
{
- newsize = MAXALIGN(SHORTALIGN(IndexTupleSize(itup)) + sizeof(ItemPointerData) * nipd);
- if (newsize > Min(INDEX_SIZE_MASK, GinMaxItemSize))
- {
- if (errorTooBig)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
- (unsigned long) newsize,
- (unsigned long) Min(INDEX_SIZE_MASK,
- GinMaxItemSize),
- RelationGetRelationName(index))));
- return NULL;
- }
+ uint32 minsize;
- itup = repalloc(itup, newsize);
+ Assert(category != GIN_CAT_NORM_KEY);
+ minsize = GinCategoryOffset(itup, ginstate) + sizeof(GinNullCategory);
+ newsize = Max(newsize, minsize);
+ }
- /* set new size */
- itup->t_info &= ~INDEX_SIZE_MASK;
- itup->t_info |= newsize;
+ newsize = SHORTALIGN(newsize);
+
+ GinSetPostingOffset(itup, newsize);
+ GinSetNPosting(itup, nipd);
- if (ipd)
- memcpy(GinGetPosting(itup), ipd, sizeof(ItemPointerData) * nipd);
- GinSetNPosting(itup, nipd);
+ /*
+ * Add space needed for posting list, if any. Then check that the tuple
+ * won't be too big to store.
+ */
+ newsize += dataSize;
+
+ newsize = MAXALIGN(newsize);
+
+ if (newsize > GinMaxItemSize)
+ {
+ if (errorTooBig)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+ (Size) newsize, (Size) GinMaxItemSize,
+ RelationGetRelationName(ginstate->index))));
+ pfree(itup);
+ return NULL;
}
- else
+
+ /*
+ * Resize tuple if needed
+ */
+ if (newsize != IndexTupleSize(itup))
{
+ itup = repalloc(itup, newsize);
+
/*
- * Gin tuple without any ItemPointers should be large enough to keep
- * one ItemPointer, to prevent inconsistency between
- * ginHeapTupleFastCollect and ginEntryInsert called by
- * ginHeapTupleInsert. ginHeapTupleFastCollect forms tuple without
- * extra pointer to heap, but ginEntryInsert (called for pending list
- * cleanup during vacuum) will form the same tuple with one
- * ItemPointer.
+ * PostgreSQL 9.3 and earlier did not clear this new space, so we
+ * might find uninitialized padding when reading tuples from disk.
*/
- newsize = MAXALIGN(SHORTALIGN(IndexTupleSize(itup)) + sizeof(ItemPointerData));
- if (newsize > Min(INDEX_SIZE_MASK, GinMaxItemSize))
- {
- if (errorTooBig)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
- (unsigned long) newsize,
- (unsigned long) Min(INDEX_SIZE_MASK,
- GinMaxItemSize),
- RelationGetRelationName(index))));
- return NULL;
- }
+ memset((char *) itup + IndexTupleSize(itup),
+ 0, newsize - IndexTupleSize(itup));
+ /* set new size in tuple header */
+ itup->t_info &= ~INDEX_SIZE_MASK;
+ itup->t_info |= newsize;
+ }
+
+ /*
+ * Copy in the posting list, if provided
+ */
+ if (data)
+ {
+ char *ptr = GinGetPosting(itup);
- GinSetNPosting(itup, 0);
+ memcpy(ptr, data, dataSize);
+ }
+
+ /*
+ * Insert category byte, if needed
+ */
+ if (category != GIN_CAT_NORM_KEY)
+ {
+ Assert(IndexTupleHasNulls(itup));
+ GinSetNullCategory(itup, ginstate, category);
}
return itup;
}
/*
- * Sometimes we reduce the number of posting list items in a tuple after
- * having built it with GinFormTuple. This function adjusts the size
- * fields to match.
+ * Read item pointers from leaf entry tuple.
+ *
+ * Returns a palloc'd array of ItemPointers. The number of items is returned
+ * in *nitems.
*/
-void
-GinShortenTuple(IndexTuple itup, uint32 nipd)
+ItemPointer
+ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup,
+ int *nitems)
{
- uint32 newsize;
+ Pointer ptr = GinGetPosting(itup);
+ int nipd = GinGetNPosting(itup);
+ ItemPointer ipd;
+ int ndecoded;
- Assert(nipd <= GinGetNPosting(itup));
+ if (GinItupIsCompressed(itup))
+ {
+ if (nipd > 0)
+ {
+ ipd = ginPostingListDecode((GinPostingList *) ptr, &ndecoded);
+ if (nipd != ndecoded)
+ elog(ERROR, "number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded",
+ nipd, ndecoded);
+ }
+ else
+ {
+ ipd = palloc(0);
+ }
+ }
+ else
+ {
+ ipd = (ItemPointer) palloc(sizeof(ItemPointerData) * nipd);
+ memcpy(ipd, ptr, sizeof(ItemPointerData) * nipd);
+ }
+ *nitems = nipd;
+ return ipd;
+}
- newsize = MAXALIGN(SHORTALIGN(GinGetOrigSizePosting(itup)) + sizeof(ItemPointerData) * nipd);
+/*
+ * Form a non-leaf entry tuple by copying the key data from the given tuple,
+ * which can be either a leaf or non-leaf entry tuple.
+ *
+ * Any posting list in the source tuple is not copied. The specified child
+ * block number is inserted into t_tid.
+ */
+static IndexTuple
+GinFormInteriorTuple(IndexTuple itup, Page page, BlockNumber childblk)
+{
+ IndexTuple nitup;
- Assert(newsize <= (itup->t_info & INDEX_SIZE_MASK));
+ if (GinPageIsLeaf(page) && !GinIsPostingTree(itup))
+ {
+ /* Tuple contains a posting list, just copy stuff before that */
+ uint32 origsize = GinGetPostingOffset(itup);
- itup->t_info &= ~INDEX_SIZE_MASK;
- itup->t_info |= newsize;
+ origsize = MAXALIGN(origsize);
+ nitup = (IndexTuple) palloc(origsize);
+ memcpy(nitup, itup, origsize);
+ /* ... be sure to fix the size header field ... */
+ nitup->t_info &= ~INDEX_SIZE_MASK;
+ nitup->t_info |= origsize;
+ }
+ else
+ {
+ /* Copy the tuple as-is */
+ nitup = (IndexTuple) palloc(IndexTupleSize(itup));
+ memcpy(nitup, itup, IndexTupleSize(itup));
+ }
- GinSetNPosting(itup, nipd);
+ /* Now insert the correct downlink */
+ GinSetDownlink(nitup, childblk);
+
+ return nitup;
}
/*
* Entry tree is a "static", ie tuple never deletes from it,
- * so we don't use right bound, we use rightest key instead.
+ * so we don't use right bound, we use rightmost key instead.
*/
static IndexTuple
getRightMostTuple(Page page)
entryIsMoveRight(GinBtree btree, Page page)
{
IndexTuple itup;
+ OffsetNumber attnum;
+ Datum key;
+ GinNullCategory category;
if (GinPageRightMost(page))
return FALSE;
itup = getRightMostTuple(page);
+ attnum = gintuple_get_attrnum(btree->ginstate, itup);
+ key = gintuple_get_key(btree->ginstate, itup, &category);
- if (compareAttEntries(btree->ginstate,
- btree->entryAttnum, btree->entryValue,
- gintuple_get_attrnum(btree->ginstate, itup),
- gin_index_getattr(btree->ginstate, itup)) > 0)
+ if (ginCompareAttEntries(btree->ginstate,
+ btree->entryAttnum, btree->entryKey, btree->entryCategory,
+ attnum, key, category) > 0)
return TRUE;
return FALSE;
/*
* Find correct tuple in non-leaf page. It supposed that
- * page correctly choosen and searching value SHOULD be on page
+ * page correctly chosen and searching value SHOULD be on page
*/
static BlockNumber
entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
{
stack->off = FirstOffsetNumber;
stack->predictNumber *= PageGetMaxOffsetNumber(page);
- return btree->getLeftMostPage(btree, page);
+ return btree->getLeftMostChild(btree, page);
}
low = FirstOffsetNumber;
OffsetNumber mid = low + ((high - low) / 2);
if (mid == maxoff && GinPageRightMost(page))
+ {
/* Right infinity */
result = -1;
+ }
else
{
+ OffsetNumber attnum;
+ Datum key;
+ GinNullCategory category;
+
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
- result = compareAttEntries(btree->ginstate,
- btree->entryAttnum, btree->entryValue,
- gintuple_get_attrnum(btree->ginstate, itup),
- gin_index_getattr(btree->ginstate, itup));
+ attnum = gintuple_get_attrnum(btree->ginstate, itup);
+ key = gintuple_get_key(btree->ginstate, itup, &category);
+ result = ginCompareAttEntries(btree->ginstate,
+ btree->entryAttnum,
+ btree->entryKey,
+ btree->entryCategory,
+ attnum, key, category);
}
if (result == 0)
{
stack->off = mid;
- Assert(GinItemPointerGetBlockNumber(&(itup)->t_tid) != GIN_ROOT_BLKNO);
- return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+ Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
+ return GinGetDownlink(itup);
}
else if (result > 0)
low = mid + 1;
stack->off = high;
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, high));
- Assert(GinItemPointerGetBlockNumber(&(itup)->t_tid) != GIN_ROOT_BLKNO);
- return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+ Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
+ return GinGetDownlink(itup);
}
/*
* Searches correct position for value on leaf page.
- * Page should be corrrectly choosen.
+ * Page should be correctly chosen.
* Returns true if value found on page.
*/
static bool
Page page = BufferGetPage(stack->buffer);
OffsetNumber low,
high;
- IndexTuple itup;
Assert(GinPageIsLeaf(page));
Assert(!GinPageIsData(page));
while (high > low)
{
OffsetNumber mid = low + ((high - low) / 2);
+ IndexTuple itup;
+ OffsetNumber attnum;
+ Datum key;
+ GinNullCategory category;
int result;
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
- result = compareAttEntries(btree->ginstate,
- btree->entryAttnum, btree->entryValue,
- gintuple_get_attrnum(btree->ginstate, itup),
- gin_index_getattr(btree->ginstate, itup));
+ attnum = gintuple_get_attrnum(btree->ginstate, itup);
+ key = gintuple_get_key(btree->ginstate, itup, &category);
+ result = ginCompareAttEntries(btree->ginstate,
+ btree->entryAttnum,
+ btree->entryKey,
+ btree->entryCategory,
+ attnum, key, category);
if (result == 0)
{
stack->off = mid;
if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
{
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, storedOff));
- if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
+ if (GinGetDownlink(itup) == blkno)
return storedOff;
/*
for (i = storedOff + 1; i <= maxoff; i++)
{
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
- if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
+ if (GinGetDownlink(itup) == blkno)
return i;
}
maxoff = storedOff - 1;
for (i = FirstOffsetNumber; i <= maxoff; i++)
{
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
- if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
+ if (GinGetDownlink(itup) == blkno)
return i;
}
Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
- return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+ return GinGetDownlink(itup);
}
static bool
-entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
+entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off,
+ GinBtreeEntryInsertData *insertData)
{
- Size itupsz = 0;
+ Size releasedsz = 0;
+ Size addedsz;
Page page = BufferGetPage(buf);
- Assert(btree->entry);
+ Assert(insertData->entry);
Assert(!GinPageIsData(page));
- if (btree->isDelete)
+ if (insertData->isDelete)
{
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
- itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
+ releasedsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
}
- if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
+ addedsz = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);
+
+ if (PageGetFreeSpace(page) + releasedsz >= addedsz)
return true;
return false;
}
/*
- * Delete tuple on leaf page if tuples was existed and we
+ * Delete tuple on leaf page if tuples existed and we
* should update it, update old child blkno to new right page
- * if child split is occured
+ * if child split occurred
*/
-static BlockNumber
-entryPreparePage(GinBtree btree, Page page, OffsetNumber off)
+static void
+entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
+ GinBtreeEntryInsertData *insertData, BlockNumber updateblkno)
{
- BlockNumber ret = InvalidBlockNumber;
-
- Assert(btree->entry);
+ Assert(insertData->entry);
Assert(!GinPageIsData(page));
- if (btree->isDelete)
+ if (insertData->isDelete)
{
Assert(GinPageIsLeaf(page));
PageIndexTupleDelete(page, off);
}
- if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
+ if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber)
{
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
- ItemPointerSet(&itup->t_tid, btree->rightblkno, InvalidOffsetNumber);
- ret = btree->rightblkno;
+ GinSetDownlink(itup, updateblkno);
}
-
- btree->rightblkno = InvalidBlockNumber;
-
- return ret;
}
/*
* Place tuple on page and fills WAL record
+ *
+ * If the tuple doesn't fit, returns false without modifying the page.
+ *
+ * On insertion to an internal node, in addition to inserting the given item,
+ * the downlink of the existing item at 'off' is updated to point to
+ * 'updateblkno'.
+ *
+ * On INSERTED, registers the buffer as buffer ID 0, with data.
+ * On SPLIT, returns rdata that represents the split pages in *prdata.
*/
-static void
-entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
+static GinPlaceToPageRC
+entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
+ void *insertPayload, BlockNumber updateblkno,
+ Page *newlpage, Page *newrpage)
{
+ GinBtreeEntryInsertData *insertData = insertPayload;
Page page = BufferGetPage(buf);
- static XLogRecData rdata[3];
+ OffsetNumber off = stack->off;
OffsetNumber placed;
- static ginxlogInsert data;
- int cnt = 0;
- *prdata = rdata;
- data.updateBlkno = entryPreparePage(btree, page, off);
+ /* this must be static so it can be returned to caller. */
+ static ginxlogInsertEntry data;
- placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
- if (placed != off)
- elog(ERROR, "failed to add item to index page in \"%s\"",
- RelationGetRelationName(btree->index));
-
- data.node = btree->index->rd_node;
- data.blkno = BufferGetBlockNumber(buf);
- data.offset = off;
- data.nitem = 1;
- data.isDelete = btree->isDelete;
- data.isData = false;
- data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
-
- /*
- * Prevent full page write if child's split occurs. That is needed to
- * remove incomplete splits while replaying WAL
- *
- * data.updateBlkno contains new block number (of newly created right
- * page) for recently splited page.
- */
- if (data.updateBlkno == InvalidBlockNumber)
+ /* quick exit if it doesn't fit */
+ if (!entryIsEnoughSpace(btree, buf, off, insertData))
{
- rdata[0].buffer = buf;
- rdata[0].buffer_std = TRUE;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].next = &rdata[1];
- cnt++;
+ entrySplitPage(btree, buf, stack, insertPayload, updateblkno,
+ newlpage, newrpage);
+ return SPLIT;
}
- rdata[cnt].buffer = InvalidBuffer;
- rdata[cnt].data = (char *) &data;
- rdata[cnt].len = sizeof(ginxlogInsert);
- rdata[cnt].next = &rdata[cnt + 1];
- cnt++;
-
- rdata[cnt].buffer = InvalidBuffer;
- rdata[cnt].data = (char *) btree->entry;
- rdata[cnt].len = IndexTupleSize(btree->entry);
- rdata[cnt].next = NULL;
+ START_CRIT_SECTION();
- btree->entry = NULL;
-}
+ entryPreparePage(btree, page, off, insertData, updateblkno);
-/*
- * Returns new tuple with copied value from source tuple.
- * New tuple will not store posting list
- */
-static IndexTuple
-copyIndexTuple(IndexTuple itup, Page page)
-{
- IndexTuple nitup;
+ placed = PageAddItem(page,
+ (Item) insertData->entry,
+ IndexTupleSize(insertData->entry),
+ off, false, false);
+ if (placed != off)
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(btree->index));
- if (GinPageIsLeaf(page) && !GinIsPostingTree(itup))
+ if (RelationNeedsWAL(btree->index))
{
- nitup = (IndexTuple) palloc(MAXALIGN(GinGetOrigSizePosting(itup)));
- memcpy(nitup, itup, GinGetOrigSizePosting(itup));
- nitup->t_info &= ~INDEX_SIZE_MASK;
- nitup->t_info |= GinGetOrigSizePosting(itup);
- }
- else
- {
- nitup = (IndexTuple) palloc(MAXALIGN(IndexTupleSize(itup)));
- memcpy(nitup, itup, IndexTupleSize(itup));
+ data.isDelete = insertData->isDelete;
+ data.offset = off;
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) &data,
+ offsetof(ginxlogInsertEntry, tuple));
+ XLogRegisterBufData(0, (char *) insertData->entry,
+ IndexTupleSize(insertData->entry));
}
- return nitup;
+ return INSERTED;
}
/*
* Place tuple and split page, original buffer(lbuf) leaves untouched,
- * returns shadow page of lbuf filled new data.
+ * returns shadow pages filled with new data.
* Tuples are distributed between pages by equal size on its, not
* an equal number!
*/
-static Page
-entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
+static void
+entrySplitPage(GinBtree btree, Buffer origbuf,
+ GinBtreeStack *stack,
+ void *insertPayload,
+ BlockNumber updateblkno,
+ Page *newlpage, Page *newrpage)
{
- static XLogRecData rdata[2];
+ GinBtreeEntryInsertData *insertData = insertPayload;
+ OffsetNumber off = stack->off;
OffsetNumber i,
maxoff,
separator = InvalidOffsetNumber;
Size totalsize = 0;
Size lsize = 0,
size;
- static char tupstore[2 * BLCKSZ];
char *ptr;
- IndexTuple itup,
- leftrightmost = NULL;
- static ginxlogSplit data;
+ IndexTuple itup;
Page page;
- Page lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
- Page rpage = BufferGetPage(rbuf);
+ Page lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
+ Page rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
Size pageSize = PageGetPageSize(lpage);
+ char tupstore[2 * BLCKSZ];
- *prdata = rdata;
- data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
- InvalidOffsetNumber : GinItemPointerGetBlockNumber(&(btree->entry->t_tid));
- data.updateBlkno = entryPreparePage(btree, lpage, off);
+ entryPreparePage(btree, lpage, off, insertData, updateblkno);
+ /*
+ * First, append all the existing tuples and the new tuple we're inserting
+ * one after another in a temporary workspace.
+ */
maxoff = PageGetMaxOffsetNumber(lpage);
ptr = tupstore;
-
for (i = FirstOffsetNumber; i <= maxoff; i++)
{
if (i == off)
{
- size = MAXALIGN(IndexTupleSize(btree->entry));
- memcpy(ptr, btree->entry, size);
+ size = MAXALIGN(IndexTupleSize(insertData->entry));
+ memcpy(ptr, insertData->entry, size);
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
if (off == maxoff + 1)
{
- size = MAXALIGN(IndexTupleSize(btree->entry));
- memcpy(ptr, btree->entry, size);
+ size = MAXALIGN(IndexTupleSize(insertData->entry));
+ memcpy(ptr, insertData->entry, size);
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
+ /*
+ * Initialize the left and right pages, and copy all the tuples back to
+ * them.
+ */
GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
}
else
{
- leftrightmost = itup;
lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
}
ptr += MAXALIGN(IndexTupleSize(itup));
}
- btree->entry = copyIndexTuple(leftrightmost, lpage);
- ItemPointerSet(&(btree->entry)->t_tid, BufferGetBlockNumber(lbuf), InvalidOffsetNumber);
-
- btree->rightblkno = BufferGetBlockNumber(rbuf);
-
- data.node = btree->index->rd_node;
- data.rootBlkno = InvalidBlockNumber;
- data.lblkno = BufferGetBlockNumber(lbuf);
- data.rblkno = BufferGetBlockNumber(rbuf);
- data.separator = separator;
- data.nitem = maxoff;
- data.isData = FALSE;
- data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
- data.isRootSplit = FALSE;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogSplit);
- rdata[0].next = &rdata[1];
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = tupstore;
- rdata[1].len = MAXALIGN(totalsize);
- rdata[1].next = NULL;
-
- return lpage;
+ *newlpage = lpage;
+ *newrpage = rpage;
}
/*
- * return newly allocated rightmost tuple
+ * Construct insertion payload for inserting the downlink for given buffer.
*/
-IndexTuple
-ginPageGetLinkItup(Buffer buf)
+static void *
+entryPrepareDownlink(GinBtree btree, Buffer lbuf)
{
- IndexTuple itup,
- nitup;
- Page page = BufferGetPage(buf);
+ GinBtreeEntryInsertData *insertData;
+ Page lpage = BufferGetPage(lbuf);
+ BlockNumber lblkno = BufferGetBlockNumber(lbuf);
+ IndexTuple itup;
- itup = getRightMostTuple(page);
- nitup = copyIndexTuple(itup, page);
- ItemPointerSet(&nitup->t_tid, BufferGetBlockNumber(buf), InvalidOffsetNumber);
+ itup = getRightMostTuple(lpage);
- return nitup;
+ insertData = palloc(sizeof(GinBtreeEntryInsertData));
+ insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
+ insertData->isDelete = false;
+
+ return insertData;
}
/*
* Also called from ginxlog, should not use btree
*/
void
-entryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
+ginEntryFillRoot(GinBtree btree, Page root,
+ BlockNumber lblkno, Page lpage,
+ BlockNumber rblkno, Page rpage)
{
- Page page;
IndexTuple itup;
- page = BufferGetPage(root);
-
- itup = ginPageGetLinkItup(lbuf);
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
+ if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index root page");
pfree(itup);
- itup = ginPageGetLinkItup(rbuf);
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
+ if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index root page");
pfree(itup);
}
+/*
+ * Set up GinBtree for entry page access
+ *
+ * Note: during WAL recovery, there may be no valid data in ginstate
+ * other than a faked-up Relation pointer; the key datum is bogus too.
+ */
void
-prepareEntryScan(GinBtree btree, Relation index, OffsetNumber attnum, Datum value, GinState *ginstate)
+ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
+ Datum key, GinNullCategory category,
+ GinState *ginstate)
{
memset(btree, 0, sizeof(GinBtreeData));
- btree->index = index;
+ btree->index = ginstate->index;
+ btree->rootBlkno = GIN_ROOT_BLKNO;
btree->ginstate = ginstate;
btree->findChildPage = entryLocateEntry;
+ btree->getLeftMostChild = entryGetLeftMostPage;
btree->isMoveRight = entryIsMoveRight;
btree->findItem = entryLocateLeafEntry;
btree->findChildPtr = entryFindChildPtr;
- btree->getLeftMostPage = entryGetLeftMostPage;
- btree->isEnoughSpace = entryIsEnoughSpace;
btree->placeToPage = entryPlaceToPage;
- btree->splitPage = entrySplitPage;
- btree->fillRoot = entryFillRoot;
+ btree->fillRoot = ginEntryFillRoot;
+ btree->prepareDownlink = entryPrepareDownlink;
btree->isData = FALSE;
- btree->searchMode = FALSE;
btree->fullScan = FALSE;
btree->isBuild = FALSE;
btree->entryAttnum = attnum;
- btree->entryValue = value;
- btree->isDelete = FALSE;
+ btree->entryKey = key;
+ btree->entryCategory = category;
}