/*-------------------------------------------------------------------------
*
* ginentrypage.c
- * page utilities routines for the postgres inverted index access method.
+ * routines for handling GIN entry tree pages.
*
*
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
#include "postgres.h"
#include "access/gin_private.h"
-#include "storage/bufmgr.h"
+#include "access/xloginsert.h"
+#include "miscadmin.h"
#include "utils/rel.h"
+static void entrySplitPage(GinBtree btree, Buffer origbuf,
+ GinBtreeStack *stack,
+ void *insertPayload,
+ BlockNumber updateblkno,
+ Page *newlpage, Page *newrpage);
+
/*
* Form a tuple for entry tree.
*
* format that is being built here. We build on the assumption that we
* are making a leaf-level key entry containing a posting list of nipd items.
* If the caller is actually trying to make a posting-tree entry, non-leaf
- * entry, or pending-list entry, it should pass nipd = 0 and then overwrite
- * the t_tid fields as necessary. In any case, ipd can be NULL to skip
- * copying any itempointers into the posting list; the caller is responsible
- * for filling the posting list afterwards, if ipd = NULL and nipd > 0.
+ * entry, or pending-list entry, it should pass dataSize = 0 and then overwrite
+ * the t_tid fields as necessary. In any case, 'data' can be NULL to skip
+ * filling in the posting list; the caller is responsible for filling it
+ * afterwards if data = NULL and nipd > 0.
*/
IndexTuple
GinFormTuple(GinState *ginstate,
OffsetNumber attnum, Datum key, GinNullCategory category,
- ItemPointerData *ipd, uint32 nipd,
+ Pointer data, Size dataSize, int nipd,
bool errorTooBig)
{
Datum datums[2];
newsize = SHORTALIGN(newsize);
GinSetPostingOffset(itup, newsize);
-
GinSetNPosting(itup, nipd);
/*
* Add space needed for posting list, if any. Then check that the tuple
* won't be too big to store.
*/
- newsize += sizeof(ItemPointerData) * nipd;
+ newsize += dataSize;
+
newsize = MAXALIGN(newsize);
- if (newsize > Min(INDEX_SIZE_MASK, GinMaxItemSize))
+
+ if (newsize > GinMaxItemSize)
{
if (errorTooBig)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
- (unsigned long) newsize,
- (unsigned long) Min(INDEX_SIZE_MASK,
- GinMaxItemSize),
- RelationGetRelationName(ginstate->index))));
+ errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+ (Size) newsize, (Size) GinMaxItemSize,
+ RelationGetRelationName(ginstate->index))));
pfree(itup);
return NULL;
}
{
itup = repalloc(itup, newsize);
+ /*
+ * PostgreSQL 9.3 and earlier did not clear this new space, so we
+ * might find uninitialized padding when reading tuples from disk.
+ */
+ memset((char *) itup + IndexTupleSize(itup),
+ 0, newsize - IndexTupleSize(itup));
/* set new size in tuple header */
itup->t_info &= ~INDEX_SIZE_MASK;
itup->t_info |= newsize;
}
+ /*
+ * Copy in the posting list, if provided
+ */
+ if (data)
+ {
+ char *ptr = GinGetPosting(itup);
+
+ memcpy(ptr, data, dataSize);
+ }
+
/*
* Insert category byte, if needed
*/
Assert(IndexTupleHasNulls(itup));
GinSetNullCategory(itup, ginstate, category);
}
-
- /*
- * Copy in the posting list, if provided
- */
- if (ipd)
- memcpy(GinGetPosting(itup), ipd, sizeof(ItemPointerData) * nipd);
-
return itup;
}
/*
- * Sometimes we reduce the number of posting list items in a tuple after
- * having built it with GinFormTuple. This function adjusts the size
- * fields to match.
+ * Read item pointers from leaf entry tuple.
+ *
+ * Returns a palloc'd array of ItemPointers. The number of items is returned
+ * in *nitems.
*/
-void
-GinShortenTuple(IndexTuple itup, uint32 nipd)
+ItemPointer
+ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup,
+ int *nitems)
{
- uint32 newsize;
-
- Assert(nipd <= GinGetNPosting(itup));
-
- newsize = GinGetPostingOffset(itup) + sizeof(ItemPointerData) * nipd;
- newsize = MAXALIGN(newsize);
-
- Assert(newsize <= (itup->t_info & INDEX_SIZE_MASK));
+ Pointer ptr = GinGetPosting(itup);
+ int nipd = GinGetNPosting(itup);
+ ItemPointer ipd;
+ int ndecoded;
- itup->t_info &= ~INDEX_SIZE_MASK;
- itup->t_info |= newsize;
-
- GinSetNPosting(itup, nipd);
+ if (GinItupIsCompressed(itup))
+ {
+ if (nipd > 0)
+ {
+ ipd = ginPostingListDecode((GinPostingList *) ptr, &ndecoded);
+ if (nipd != ndecoded)
+ elog(ERROR, "number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded",
+ nipd, ndecoded);
+ }
+ else
+ {
+ ipd = palloc(0);
+ }
+ }
+ else
+ {
+ ipd = (ItemPointer) palloc(sizeof(ItemPointerData) * nipd);
+ memcpy(ipd, ptr, sizeof(ItemPointerData) * nipd);
+ }
+ *nitems = nipd;
+ return ipd;
}
/*
key = gintuple_get_key(btree->ginstate, itup, &category);
if (ginCompareAttEntries(btree->ginstate,
- btree->entryAttnum, btree->entryKey, btree->entryCategory,
+ btree->entryAttnum, btree->entryKey, btree->entryCategory,
attnum, key, category) > 0)
return TRUE;
{
stack->off = FirstOffsetNumber;
stack->predictNumber *= PageGetMaxOffsetNumber(page);
- return btree->getLeftMostPage(btree, page);
+ return btree->getLeftMostChild(btree, page);
}
low = FirstOffsetNumber;
}
static bool
-entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
+entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off,
+ GinBtreeEntryInsertData *insertData)
{
- Size itupsz = 0;
+ Size releasedsz = 0;
+ Size addedsz;
Page page = BufferGetPage(buf);
- Assert(btree->entry);
+ Assert(insertData->entry);
Assert(!GinPageIsData(page));
- if (btree->isDelete)
+ if (insertData->isDelete)
{
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
- itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
+ releasedsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
}
- if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
+ addedsz = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);
+
+ if (PageGetFreeSpace(page) + releasedsz >= addedsz)
return true;
return false;
}
/*
- * Delete tuple on leaf page if tuples was existed and we
+ * Delete tuple on leaf page if tuples existed and we
* should update it, update old child blkno to new right page
- * if child split is occured
+ * if child split occurred
*/
-static BlockNumber
-entryPreparePage(GinBtree btree, Page page, OffsetNumber off)
+static void
+entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
+ GinBtreeEntryInsertData *insertData, BlockNumber updateblkno)
{
- BlockNumber ret = InvalidBlockNumber;
-
- Assert(btree->entry);
+ Assert(insertData->entry);
Assert(!GinPageIsData(page));
- if (btree->isDelete)
+ if (insertData->isDelete)
{
Assert(GinPageIsLeaf(page));
PageIndexTupleDelete(page, off);
}
- if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
+ if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber)
{
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
- GinSetDownlink(itup, btree->rightblkno);
- ret = btree->rightblkno;
+ GinSetDownlink(itup, updateblkno);
}
-
- btree->rightblkno = InvalidBlockNumber;
-
- return ret;
}
/*
* Place tuple on page and fills WAL record
+ *
+ * If the tuple doesn't fit, returns false without modifying the page.
+ *
+ * On insertion to an internal node, in addition to inserting the given item,
+ * the downlink of the existing item at 'off' is updated to point to
+ * 'updateblkno'.
+ *
+ * On INSERTED, registers the buffer as buffer ID 0, with data.
+ * On SPLIT, returns rdata that represents the split pages in *prdata.
*/
-static void
-entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
+static GinPlaceToPageRC
+entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
+ void *insertPayload, BlockNumber updateblkno,
+ Page *newlpage, Page *newrpage)
{
+ GinBtreeEntryInsertData *insertData = insertPayload;
Page page = BufferGetPage(buf);
+ OffsetNumber off = stack->off;
OffsetNumber placed;
- int cnt = 0;
- /* these must be static so they can be returned to caller */
- static XLogRecData rdata[3];
- static ginxlogInsert data;
- *prdata = rdata;
- data.updateBlkno = entryPreparePage(btree, page, off);
+ /* this must be static so it can be returned to caller. */
+ static ginxlogInsertEntry data;
+
+ /* quick exit if it doesn't fit */
+ if (!entryIsEnoughSpace(btree, buf, off, insertData))
+ {
+ entrySplitPage(btree, buf, stack, insertPayload, updateblkno,
+ newlpage, newrpage);
+ return SPLIT;
+ }
+
+ START_CRIT_SECTION();
+
+ entryPreparePage(btree, page, off, insertData, updateblkno);
- placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
+ placed = PageAddItem(page,
+ (Item) insertData->entry,
+ IndexTupleSize(insertData->entry),
+ off, false, false);
if (placed != off)
elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index));
- data.node = btree->index->rd_node;
- data.blkno = BufferGetBlockNumber(buf);
- data.offset = off;
- data.nitem = 1;
- data.isDelete = btree->isDelete;
- data.isData = false;
- data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
-
- /*
- * Prevent full page write if child's split occurs. That is needed to
- * remove incomplete splits while replaying WAL
- *
- * data.updateBlkno contains new block number (of newly created right
- * page) for recently splited page.
- */
- if (data.updateBlkno == InvalidBlockNumber)
+ if (RelationNeedsWAL(btree->index))
{
- rdata[0].buffer = buf;
- rdata[0].buffer_std = TRUE;
- rdata[0].data = NULL;
- rdata[0].len = 0;
- rdata[0].next = &rdata[1];
- cnt++;
+ data.isDelete = insertData->isDelete;
+ data.offset = off;
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) &data,
+ offsetof(ginxlogInsertEntry, tuple));
+ XLogRegisterBufData(0, (char *) insertData->entry,
+ IndexTupleSize(insertData->entry));
}
- rdata[cnt].buffer = InvalidBuffer;
- rdata[cnt].data = (char *) &data;
- rdata[cnt].len = sizeof(ginxlogInsert);
- rdata[cnt].next = &rdata[cnt + 1];
- cnt++;
-
- rdata[cnt].buffer = InvalidBuffer;
- rdata[cnt].data = (char *) btree->entry;
- rdata[cnt].len = IndexTupleSize(btree->entry);
- rdata[cnt].next = NULL;
-
- btree->entry = NULL;
+ return INSERTED;
}
/*
* Place tuple and split page, original buffer(lbuf) leaves untouched,
- * returns shadow page of lbuf filled new data.
+ * returns shadow pages filled with new data.
* Tuples are distributed between pages by equal size on its, not
* an equal number!
*/
-static Page
-entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
+static void
+entrySplitPage(GinBtree btree, Buffer origbuf,
+ GinBtreeStack *stack,
+ void *insertPayload,
+ BlockNumber updateblkno,
+ Page *newlpage, Page *newrpage)
{
+ GinBtreeEntryInsertData *insertData = insertPayload;
+ OffsetNumber off = stack->off;
OffsetNumber i,
maxoff,
separator = InvalidOffsetNumber;
Size lsize = 0,
size;
char *ptr;
- IndexTuple itup,
- leftrightmost = NULL;
+ IndexTuple itup;
Page page;
- Page lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
- Page rpage = BufferGetPage(rbuf);
+ Page lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
+ Page rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
Size pageSize = PageGetPageSize(lpage);
- /* these must be static so they can be returned to caller */
- static XLogRecData rdata[2];
- static ginxlogSplit data;
- static char tupstore[2 * BLCKSZ];
+ char tupstore[2 * BLCKSZ];
- *prdata = rdata;
- data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
- InvalidOffsetNumber : GinGetDownlink(btree->entry);
- data.updateBlkno = entryPreparePage(btree, lpage, off);
+ entryPreparePage(btree, lpage, off, insertData, updateblkno);
+ /*
+ * First, append all the existing tuples and the new tuple we're inserting
+ * one after another in a temporary workspace.
+ */
maxoff = PageGetMaxOffsetNumber(lpage);
ptr = tupstore;
-
for (i = FirstOffsetNumber; i <= maxoff; i++)
{
if (i == off)
{
- size = MAXALIGN(IndexTupleSize(btree->entry));
- memcpy(ptr, btree->entry, size);
+ size = MAXALIGN(IndexTupleSize(insertData->entry));
+ memcpy(ptr, insertData->entry, size);
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
if (off == maxoff + 1)
{
- size = MAXALIGN(IndexTupleSize(btree->entry));
- memcpy(ptr, btree->entry, size);
+ size = MAXALIGN(IndexTupleSize(insertData->entry));
+ memcpy(ptr, insertData->entry, size);
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
+ /*
+ * Initialize the left and right pages, and copy all the tuples back to
+ * them.
+ */
GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
}
else
{
- leftrightmost = itup;
lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
}
ptr += MAXALIGN(IndexTupleSize(itup));
}
- btree->entry = GinFormInteriorTuple(leftrightmost, lpage,
- BufferGetBlockNumber(lbuf));
-
- btree->rightblkno = BufferGetBlockNumber(rbuf);
-
- data.node = btree->index->rd_node;
- data.rootBlkno = InvalidBlockNumber;
- data.lblkno = BufferGetBlockNumber(lbuf);
- data.rblkno = BufferGetBlockNumber(rbuf);
- data.separator = separator;
- data.nitem = maxoff;
- data.isData = FALSE;
- data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
- data.isRootSplit = FALSE;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &data;
- rdata[0].len = sizeof(ginxlogSplit);
- rdata[0].next = &rdata[1];
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = tupstore;
- rdata[1].len = MAXALIGN(totalsize);
- rdata[1].next = NULL;
-
- return lpage;
+ *newlpage = lpage;
+ *newrpage = rpage;
}
/*
- * return newly allocated rightmost tuple
+ * Construct insertion payload for inserting the downlink for given buffer.
*/
-IndexTuple
-ginPageGetLinkItup(Buffer buf)
+static void *
+entryPrepareDownlink(GinBtree btree, Buffer lbuf)
{
- IndexTuple itup,
- nitup;
- Page page = BufferGetPage(buf);
+ GinBtreeEntryInsertData *insertData;
+ Page lpage = BufferGetPage(lbuf);
+ BlockNumber lblkno = BufferGetBlockNumber(lbuf);
+ IndexTuple itup;
- itup = getRightMostTuple(page);
- nitup = GinFormInteriorTuple(itup, page, BufferGetBlockNumber(buf));
+ itup = getRightMostTuple(lpage);
- return nitup;
+ insertData = palloc(sizeof(GinBtreeEntryInsertData));
+ insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
+ insertData->isDelete = false;
+
+ return insertData;
}
/*
* Also called from ginxlog, should not use btree
*/
void
-ginEntryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
+ginEntryFillRoot(GinBtree btree, Page root,
+ BlockNumber lblkno, Page lpage,
+ BlockNumber rblkno, Page rpage)
{
- Page page;
IndexTuple itup;
- page = BufferGetPage(root);
-
- itup = ginPageGetLinkItup(lbuf);
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
+ if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index root page");
pfree(itup);
- itup = ginPageGetLinkItup(rbuf);
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
+ if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index root page");
pfree(itup);
}
memset(btree, 0, sizeof(GinBtreeData));
btree->index = ginstate->index;
+ btree->rootBlkno = GIN_ROOT_BLKNO;
btree->ginstate = ginstate;
btree->findChildPage = entryLocateEntry;
+ btree->getLeftMostChild = entryGetLeftMostPage;
btree->isMoveRight = entryIsMoveRight;
btree->findItem = entryLocateLeafEntry;
btree->findChildPtr = entryFindChildPtr;
- btree->getLeftMostPage = entryGetLeftMostPage;
- btree->isEnoughSpace = entryIsEnoughSpace;
btree->placeToPage = entryPlaceToPage;
- btree->splitPage = entrySplitPage;
btree->fillRoot = ginEntryFillRoot;
+ btree->prepareDownlink = entryPrepareDownlink;
btree->isData = FALSE;
- btree->searchMode = FALSE;
btree->fullScan = FALSE;
btree->isBuild = FALSE;
btree->entryAttnum = attnum;
btree->entryKey = key;
btree->entryCategory = category;
- btree->isDelete = FALSE;
}