]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/gin/ginentrypage.c
Update copyright for 2016
[postgresql] / src / backend / access / gin / ginentrypage.c
index a47e92785c2dc6f9d8cce621ab3951015ce088f6..251274564803e873927e52af4421b46139e93dce 100644 (file)
@@ -1,10 +1,10 @@
 /*-------------------------------------------------------------------------
  *
  * ginentrypage.c
- *       page utilities routines for the postgres inverted index access method.
+ *       routines for handling GIN entry tree pages.
  *
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
 
 #include "postgres.h"
 
-#include "access/gin.h"
-#include "storage/bufmgr.h"
+#include "access/gin_private.h"
+#include "access/xloginsert.h"
+#include "miscadmin.h"
 #include "utils/rel.h"
 
+static void entrySplitPage(GinBtree btree, Buffer origbuf,
+                          GinBtreeStack *stack,
+                          void *insertPayload,
+                          BlockNumber updateblkno,
+                          Page *newlpage, Page *newrpage);
+
 /*
  * Form a tuple for entry tree.
  *
  * If the tuple would be too big to be stored, function throws a suitable
  * error if errorTooBig is TRUE, or returns NULL if errorTooBig is FALSE.
  *
- * On leaf pages, Index tuple has non-traditional layout. Tuple may contain
- * posting list or root blocknumber of posting tree.
- * Macros: GinIsPostingTree(itup) / GinSetPostingTree(itup, blkno)
- * 1) Posting list
- *             - itup->t_info & INDEX_SIZE_MASK contains total size of tuple as usual
- *             - ItemPointerGetBlockNumber(&itup->t_tid) contains original
- *               size of tuple (without posting list).
- *               Macros: GinGetOrigSizePosting(itup) / GinSetOrigSizePosting(itup,n)
- *             - ItemPointerGetOffsetNumber(&itup->t_tid) contains number
- *               of elements in posting list (number of heap itempointers)
- *               Macros: GinGetNPosting(itup) / GinSetNPosting(itup,n)
- *             - After standard part of tuple there is a posting list, ie, array
- *               of heap itempointers
- *               Macros: GinGetPosting(itup)
- * 2) Posting tree
- *             - itup->t_info & INDEX_SIZE_MASK contains size of tuple as usual
- *             - ItemPointerGetBlockNumber(&itup->t_tid) contains block number of
- *               root of posting tree
- *             - ItemPointerGetOffsetNumber(&itup->t_tid) contains magic number
- *               GIN_TREE_POSTING, which distinguishes this from posting-list case
- *
- * Attributes of an index tuple are different for single and multicolumn index.
- * For single-column case, index tuple stores only value to be indexed.
- * For multicolumn case, it stores two attributes: column number of value
- * and value.
+ * See src/backend/access/gin/README for a description of the index tuple
+ * format that is being built here.  We build on the assumption that we
+ * are making a leaf-level key entry containing a posting list of nipd items.
+ * If the caller is actually trying to make a posting-tree entry, non-leaf
+ * entry, or pending-list entry, it should pass dataSize = 0 and then overwrite
+ * the t_tid fields as necessary.  In any case, 'data' can be NULL to skip
+ * filling in the posting list; the caller is responsible for filling it
+ * afterwards if data = NULL and nipd > 0.
  */
 IndexTuple
-GinFormTuple(Relation index, GinState *ginstate,
-                        OffsetNumber attnum, Datum key,
-                        ItemPointerData *ipd, uint32 nipd, bool errorTooBig)
+GinFormTuple(GinState *ginstate,
+                        OffsetNumber attnum, Datum key, GinNullCategory category,
+                        Pointer data, Size dataSize, int nipd,
+                        bool errorTooBig)
 {
-       bool            isnull[2] = {FALSE, FALSE};
+       Datum           datums[2];
+       bool            isnull[2];
        IndexTuple      itup;
        uint32          newsize;
 
+       /* Build the basic tuple: optional column number, plus key datum */
        if (ginstate->oneCol)
-               itup = index_form_tuple(ginstate->origTupdesc, &key, isnull);
+       {
+               datums[0] = key;
+               isnull[0] = (category != GIN_CAT_NORM_KEY);
+       }
        else
        {
-               Datum           datums[2];
-
                datums[0] = UInt16GetDatum(attnum);
+               isnull[0] = false;
                datums[1] = key;
-               itup = index_form_tuple(ginstate->tupdesc[attnum - 1], datums, isnull);
+               isnull[1] = (category != GIN_CAT_NORM_KEY);
        }
 
-       GinSetOrigSizePosting(itup, IndexTupleSize(itup));
+       itup = index_form_tuple(ginstate->tupdesc[attnum - 1], datums, isnull);
+
+       /*
+        * Determine and store offset to the posting list, making sure there is
+        * room for the category byte if needed.
+        *
+        * Note: because index_form_tuple MAXALIGNs the tuple size, there may well
+        * be some wasted pad space.  Is it worth recomputing the data length to
+        * prevent that?  That would also allow us to Assert that the real data
+        * doesn't overlap the GinNullCategory byte, which this code currently
+        * takes on faith.
+        */
+       newsize = IndexTupleSize(itup);
 
-       if (nipd > 0)
+       if (IndexTupleHasNulls(itup))
        {
-               newsize = MAXALIGN(SHORTALIGN(IndexTupleSize(itup)) + sizeof(ItemPointerData) * nipd);
-               if (newsize > Min(INDEX_SIZE_MASK, GinMaxItemSize))
-               {
-                       if (errorTooBig)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                                                errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
-                                                               (unsigned long) newsize,
-                                                               (unsigned long) Min(INDEX_SIZE_MASK,
-                                                                                                       GinMaxItemSize),
-                                                               RelationGetRelationName(index))));
-                       return NULL;
-               }
+               uint32          minsize;
 
-               itup = repalloc(itup, newsize);
+               Assert(category != GIN_CAT_NORM_KEY);
+               minsize = GinCategoryOffset(itup, ginstate) + sizeof(GinNullCategory);
+               newsize = Max(newsize, minsize);
+       }
 
-               /* set new size */
-               itup->t_info &= ~INDEX_SIZE_MASK;
-               itup->t_info |= newsize;
+       newsize = SHORTALIGN(newsize);
+
+       GinSetPostingOffset(itup, newsize);
+       GinSetNPosting(itup, nipd);
 
-               if (ipd)
-                       memcpy(GinGetPosting(itup), ipd, sizeof(ItemPointerData) * nipd);
-               GinSetNPosting(itup, nipd);
+       /*
+        * Add space needed for posting list, if any.  Then check that the tuple
+        * won't be too big to store.
+        */
+       newsize += dataSize;
+
+       newsize = MAXALIGN(newsize);
+
+       if (newsize > GinMaxItemSize)
+       {
+               if (errorTooBig)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                       errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+                                  (Size) newsize, (Size) GinMaxItemSize,
+                                  RelationGetRelationName(ginstate->index))));
+               pfree(itup);
+               return NULL;
        }
-       else
+
+       /*
+        * Resize tuple if needed
+        */
+       if (newsize != IndexTupleSize(itup))
        {
+               itup = repalloc(itup, newsize);
+
                /*
-                * Gin tuple without any ItemPointers should be large enough to keep
-                * one ItemPointer, to prevent inconsistency between
-                * ginHeapTupleFastCollect and ginEntryInsert called by
-                * ginHeapTupleInsert.  ginHeapTupleFastCollect forms tuple without
-                * extra pointer to heap, but ginEntryInsert (called for pending list
-                * cleanup during vacuum) will form the same tuple with one
-                * ItemPointer.
+                * PostgreSQL 9.3 and earlier did not clear this new space, so we
+                * might find uninitialized padding when reading tuples from disk.
                 */
-               newsize = MAXALIGN(SHORTALIGN(IndexTupleSize(itup)) + sizeof(ItemPointerData));
-               if (newsize > Min(INDEX_SIZE_MASK, GinMaxItemSize))
-               {
-                       if (errorTooBig)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                                                errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
-                                                               (unsigned long) newsize,
-                                                               (unsigned long) Min(INDEX_SIZE_MASK,
-                                                                                                       GinMaxItemSize),
-                                                               RelationGetRelationName(index))));
-                       return NULL;
-               }
+               memset((char *) itup + IndexTupleSize(itup),
+                          0, newsize - IndexTupleSize(itup));
+               /* set new size in tuple header */
+               itup->t_info &= ~INDEX_SIZE_MASK;
+               itup->t_info |= newsize;
+       }
+
+       /*
+        * Copy in the posting list, if provided
+        */
+       if (data)
+       {
+               char       *ptr = GinGetPosting(itup);
 
-               GinSetNPosting(itup, 0);
+               memcpy(ptr, data, dataSize);
+       }
+
+       /*
+        * Insert category byte, if needed
+        */
+       if (category != GIN_CAT_NORM_KEY)
+       {
+               Assert(IndexTupleHasNulls(itup));
+               GinSetNullCategory(itup, ginstate, category);
        }
        return itup;
 }
 
 /*
- * Sometimes we reduce the number of posting list items in a tuple after
- * having built it with GinFormTuple.  This function adjusts the size
- * fields to match.
+ * Read item pointers from leaf entry tuple.
+ *
+ * Returns a palloc'd array of ItemPointers. The number of items is returned
+ * in *nitems.
  */
-void
-GinShortenTuple(IndexTuple itup, uint32 nipd)
+ItemPointer
+ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup,
+                        int *nitems)
 {
-       uint32          newsize;
+       Pointer         ptr = GinGetPosting(itup);
+       int                     nipd = GinGetNPosting(itup);
+       ItemPointer ipd;
+       int                     ndecoded;
 
-       Assert(nipd <= GinGetNPosting(itup));
+       if (GinItupIsCompressed(itup))
+       {
+               if (nipd > 0)
+               {
+                       ipd = ginPostingListDecode((GinPostingList *) ptr, &ndecoded);
+                       if (nipd != ndecoded)
+                               elog(ERROR, "number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded",
+                                        nipd, ndecoded);
+               }
+               else
+               {
+                       ipd = palloc(0);
+               }
+       }
+       else
+       {
+               ipd = (ItemPointer) palloc(sizeof(ItemPointerData) * nipd);
+               memcpy(ipd, ptr, sizeof(ItemPointerData) * nipd);
+       }
+       *nitems = nipd;
+       return ipd;
+}
 
-       newsize = MAXALIGN(SHORTALIGN(GinGetOrigSizePosting(itup)) + sizeof(ItemPointerData) * nipd);
+/*
+ * Form a non-leaf entry tuple by copying the key data from the given tuple,
+ * which can be either a leaf or non-leaf entry tuple.
+ *
+ * Any posting list in the source tuple is not copied.  The specified child
+ * block number is inserted into t_tid.
+ */
+static IndexTuple
+GinFormInteriorTuple(IndexTuple itup, Page page, BlockNumber childblk)
+{
+       IndexTuple      nitup;
 
-       Assert(newsize <= (itup->t_info & INDEX_SIZE_MASK));
+       if (GinPageIsLeaf(page) && !GinIsPostingTree(itup))
+       {
+               /* Tuple contains a posting list, just copy stuff before that */
+               uint32          origsize = GinGetPostingOffset(itup);
 
-       itup->t_info &= ~INDEX_SIZE_MASK;
-       itup->t_info |= newsize;
+               origsize = MAXALIGN(origsize);
+               nitup = (IndexTuple) palloc(origsize);
+               memcpy(nitup, itup, origsize);
+               /* ... be sure to fix the size header field ... */
+               nitup->t_info &= ~INDEX_SIZE_MASK;
+               nitup->t_info |= origsize;
+       }
+       else
+       {
+               /* Copy the tuple as-is */
+               nitup = (IndexTuple) palloc(IndexTupleSize(itup));
+               memcpy(nitup, itup, IndexTupleSize(itup));
+       }
 
-       GinSetNPosting(itup, nipd);
+       /* Now insert the correct downlink */
+       GinSetDownlink(nitup, childblk);
+
+       return nitup;
 }
 
 /*
  * Entry tree is a "static", ie tuple never deletes from it,
- * so we don't use right bound, we use rightest key instead.
+ * so we don't use right bound, we use rightmost key instead.
  */
 static IndexTuple
 getRightMostTuple(Page page)
@@ -166,16 +243,20 @@ static bool
 entryIsMoveRight(GinBtree btree, Page page)
 {
        IndexTuple      itup;
+       OffsetNumber attnum;
+       Datum           key;
+       GinNullCategory category;
 
        if (GinPageRightMost(page))
                return FALSE;
 
        itup = getRightMostTuple(page);
+       attnum = gintuple_get_attrnum(btree->ginstate, itup);
+       key = gintuple_get_key(btree->ginstate, itup, &category);
 
-       if (compareAttEntries(btree->ginstate,
-                                                 btree->entryAttnum, btree->entryValue,
-                                                 gintuple_get_attrnum(btree->ginstate, itup),
-                                                 gin_index_getattr(btree->ginstate, itup)) > 0)
+       if (ginCompareAttEntries(btree->ginstate,
+                                  btree->entryAttnum, btree->entryKey, btree->entryCategory,
+                                                        attnum, key, category) > 0)
                return TRUE;
 
        return FALSE;
@@ -183,7 +264,7 @@ entryIsMoveRight(GinBtree btree, Page page)
 
 /*
  * Find correct tuple in non-leaf page. It supposed that
- * page correctly choosen and searching value SHOULD be on page
+ * page correctly chosen and searching value SHOULD be on page
  */
 static BlockNumber
 entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
@@ -202,7 +283,7 @@ entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
        {
                stack->off = FirstOffsetNumber;
                stack->predictNumber *= PageGetMaxOffsetNumber(page);
-               return btree->getLeftMostPage(btree, page);
+               return btree->getLeftMostChild(btree, page);
        }
 
        low = FirstOffsetNumber;
@@ -216,22 +297,31 @@ entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
                OffsetNumber mid = low + ((high - low) / 2);
 
                if (mid == maxoff && GinPageRightMost(page))
+               {
                        /* Right infinity */
                        result = -1;
+               }
                else
                {
+                       OffsetNumber attnum;
+                       Datum           key;
+                       GinNullCategory category;
+
                        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
-                       result = compareAttEntries(btree->ginstate,
-                                                                          btree->entryAttnum, btree->entryValue,
-                                                                gintuple_get_attrnum(btree->ginstate, itup),
-                                                                  gin_index_getattr(btree->ginstate, itup));
+                       attnum = gintuple_get_attrnum(btree->ginstate, itup);
+                       key = gintuple_get_key(btree->ginstate, itup, &category);
+                       result = ginCompareAttEntries(btree->ginstate,
+                                                                                 btree->entryAttnum,
+                                                                                 btree->entryKey,
+                                                                                 btree->entryCategory,
+                                                                                 attnum, key, category);
                }
 
                if (result == 0)
                {
                        stack->off = mid;
-                       Assert(GinItemPointerGetBlockNumber(&(itup)->t_tid) != GIN_ROOT_BLKNO);
-                       return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+                       Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
+                       return GinGetDownlink(itup);
                }
                else if (result > 0)
                        low = mid + 1;
@@ -243,13 +333,13 @@ entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
 
        stack->off = high;
        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, high));
-       Assert(GinItemPointerGetBlockNumber(&(itup)->t_tid) != GIN_ROOT_BLKNO);
-       return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+       Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
+       return GinGetDownlink(itup);
 }
 
 /*
  * Searches correct position for value on leaf page.
- * Page should be corrrectly choosen.
+ * Page should be correctly chosen.
  * Returns true if value found on page.
  */
 static bool
@@ -258,7 +348,6 @@ entryLocateLeafEntry(GinBtree btree, GinBtreeStack *stack)
        Page            page = BufferGetPage(stack->buffer);
        OffsetNumber low,
                                high;
-       IndexTuple      itup;
 
        Assert(GinPageIsLeaf(page));
        Assert(!GinPageIsData(page));
@@ -283,13 +372,20 @@ entryLocateLeafEntry(GinBtree btree, GinBtreeStack *stack)
        while (high > low)
        {
                OffsetNumber mid = low + ((high - low) / 2);
+               IndexTuple      itup;
+               OffsetNumber attnum;
+               Datum           key;
+               GinNullCategory category;
                int                     result;
 
                itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
-               result = compareAttEntries(btree->ginstate,
-                                                                  btree->entryAttnum, btree->entryValue,
-                                                                gintuple_get_attrnum(btree->ginstate, itup),
-                                                                  gin_index_getattr(btree->ginstate, itup));
+               attnum = gintuple_get_attrnum(btree->ginstate, itup);
+               key = gintuple_get_key(btree->ginstate, itup, &category);
+               result = ginCompareAttEntries(btree->ginstate,
+                                                                         btree->entryAttnum,
+                                                                         btree->entryKey,
+                                                                         btree->entryCategory,
+                                                                         attnum, key, category);
                if (result == 0)
                {
                        stack->off = mid;
@@ -319,7 +415,7 @@ entryFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber sto
        if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
        {
                itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, storedOff));
-               if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
+               if (GinGetDownlink(itup) == blkno)
                        return storedOff;
 
                /*
@@ -329,7 +425,7 @@ entryFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber sto
                for (i = storedOff + 1; i <= maxoff; i++)
                {
                        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-                       if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
+                       if (GinGetDownlink(itup) == blkno)
                                return i;
                }
                maxoff = storedOff - 1;
@@ -339,7 +435,7 @@ entryFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber sto
        for (i = FirstOffsetNumber; i <= maxoff; i++)
        {
                itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-               if (GinItemPointerGetBlockNumber(&(itup)->t_tid) == blkno)
+               if (GinGetDownlink(itup) == blkno)
                        return i;
        }
 
@@ -356,187 +452,165 @@ entryGetLeftMostPage(GinBtree btree, Page page)
        Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
 
        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
-       return GinItemPointerGetBlockNumber(&(itup)->t_tid);
+       return GinGetDownlink(itup);
 }
 
 static bool
-entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
+entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off,
+                                  GinBtreeEntryInsertData *insertData)
 {
-       Size            itupsz = 0;
+       Size            releasedsz = 0;
+       Size            addedsz;
        Page            page = BufferGetPage(buf);
 
-       Assert(btree->entry);
+       Assert(insertData->entry);
        Assert(!GinPageIsData(page));
 
-       if (btree->isDelete)
+       if (insertData->isDelete)
        {
                IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
 
-               itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
+               releasedsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
        }
 
-       if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
+       addedsz = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);
+
+       if (PageGetFreeSpace(page) + releasedsz >= addedsz)
                return true;
 
        return false;
 }
 
 /*
- * Delete tuple on leaf page if tuples was existed and we
+ * Delete tuple on leaf page if tuples existed and we
  * should update it, update old child blkno to new right page
- * if child split is occured
+ * if child split occurred
  */
-static BlockNumber
-entryPreparePage(GinBtree btree, Page page, OffsetNumber off)
+static void
+entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
+                                GinBtreeEntryInsertData *insertData, BlockNumber updateblkno)
 {
-       BlockNumber ret = InvalidBlockNumber;
-
-       Assert(btree->entry);
+       Assert(insertData->entry);
        Assert(!GinPageIsData(page));
 
-       if (btree->isDelete)
+       if (insertData->isDelete)
        {
                Assert(GinPageIsLeaf(page));
                PageIndexTupleDelete(page, off);
        }
 
-       if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
+       if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber)
        {
                IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
 
-               ItemPointerSet(&itup->t_tid, btree->rightblkno, InvalidOffsetNumber);
-               ret = btree->rightblkno;
+               GinSetDownlink(itup, updateblkno);
        }
-
-       btree->rightblkno = InvalidBlockNumber;
-
-       return ret;
 }
 
 /*
  * Place tuple on page and fills WAL record
+ *
+ * If the tuple doesn't fit, returns false without modifying the page.
+ *
+ * On insertion to an internal node, in addition to inserting the given item,
+ * the downlink of the existing item at 'off' is updated to point to
+ * 'updateblkno'.
+ *
+ * On INSERTED, registers the buffer as buffer ID 0, with data.
+ * On SPLIT, returns rdata that represents the split pages in *prdata.
  */
-static void
-entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
+static GinPlaceToPageRC
+entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
+                                void *insertPayload, BlockNumber updateblkno,
+                                Page *newlpage, Page *newrpage)
 {
+       GinBtreeEntryInsertData *insertData = insertPayload;
        Page            page = BufferGetPage(buf);
-       static XLogRecData rdata[3];
+       OffsetNumber off = stack->off;
        OffsetNumber placed;
-       static ginxlogInsert data;
-       int                     cnt = 0;
 
-       *prdata = rdata;
-       data.updateBlkno = entryPreparePage(btree, page, off);
+       /* this must be static so it can be returned to caller. */
+       static ginxlogInsertEntry data;
 
-       placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
-       if (placed != off)
-               elog(ERROR, "failed to add item to index page in \"%s\"",
-                        RelationGetRelationName(btree->index));
-
-       data.node = btree->index->rd_node;
-       data.blkno = BufferGetBlockNumber(buf);
-       data.offset = off;
-       data.nitem = 1;
-       data.isDelete = btree->isDelete;
-       data.isData = false;
-       data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
-
-       /*
-        * Prevent full page write if child's split occurs. That is needed to
-        * remove incomplete splits while replaying WAL
-        *
-        * data.updateBlkno contains new block number (of newly created right
-        * page) for recently splited page.
-        */
-       if (data.updateBlkno == InvalidBlockNumber)
+       /* quick exit if it doesn't fit */
+       if (!entryIsEnoughSpace(btree, buf, off, insertData))
        {
-               rdata[0].buffer = buf;
-               rdata[0].buffer_std = TRUE;
-               rdata[0].data = NULL;
-               rdata[0].len = 0;
-               rdata[0].next = &rdata[1];
-               cnt++;
+               entrySplitPage(btree, buf, stack, insertPayload, updateblkno,
+                                          newlpage, newrpage);
+               return SPLIT;
        }
 
-       rdata[cnt].buffer = InvalidBuffer;
-       rdata[cnt].data = (char *) &data;
-       rdata[cnt].len = sizeof(ginxlogInsert);
-       rdata[cnt].next = &rdata[cnt + 1];
-       cnt++;
-
-       rdata[cnt].buffer = InvalidBuffer;
-       rdata[cnt].data = (char *) btree->entry;
-       rdata[cnt].len = IndexTupleSize(btree->entry);
-       rdata[cnt].next = NULL;
+       START_CRIT_SECTION();
 
-       btree->entry = NULL;
-}
+       entryPreparePage(btree, page, off, insertData, updateblkno);
 
-/*
- * Returns new tuple with copied value from source tuple.
- * New tuple will not store posting list
- */
-static IndexTuple
-copyIndexTuple(IndexTuple itup, Page page)
-{
-       IndexTuple      nitup;
+       placed = PageAddItem(page,
+                                                (Item) insertData->entry,
+                                                IndexTupleSize(insertData->entry),
+                                                off, false, false);
+       if (placed != off)
+               elog(ERROR, "failed to add item to index page in \"%s\"",
+                        RelationGetRelationName(btree->index));
 
-       if (GinPageIsLeaf(page) && !GinIsPostingTree(itup))
+       if (RelationNeedsWAL(btree->index))
        {
-               nitup = (IndexTuple) palloc(MAXALIGN(GinGetOrigSizePosting(itup)));
-               memcpy(nitup, itup, GinGetOrigSizePosting(itup));
-               nitup->t_info &= ~INDEX_SIZE_MASK;
-               nitup->t_info |= GinGetOrigSizePosting(itup);
-       }
-       else
-       {
-               nitup = (IndexTuple) palloc(MAXALIGN(IndexTupleSize(itup)));
-               memcpy(nitup, itup, IndexTupleSize(itup));
+               data.isDelete = insertData->isDelete;
+               data.offset = off;
+
+               XLogBeginInsert();
+               XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+               XLogRegisterBufData(0, (char *) &data,
+                                                       offsetof(ginxlogInsertEntry, tuple));
+               XLogRegisterBufData(0, (char *) insertData->entry,
+                                                       IndexTupleSize(insertData->entry));
        }
 
-       return nitup;
+       return INSERTED;
 }
 
 /*
  * Place tuple and split page, original buffer(lbuf) leaves untouched,
- * returns shadow page of lbuf filled new data.
+ * returns shadow pages filled with new data.
  * Tuples are distributed between pages by equal size on its, not
  * an equal number!
  */
-static Page
-entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
+static void
+entrySplitPage(GinBtree btree, Buffer origbuf,
+                          GinBtreeStack *stack,
+                          void *insertPayload,
+                          BlockNumber updateblkno,
+                          Page *newlpage, Page *newrpage)
 {
-       static XLogRecData rdata[2];
+       GinBtreeEntryInsertData *insertData = insertPayload;
+       OffsetNumber off = stack->off;
        OffsetNumber i,
                                maxoff,
                                separator = InvalidOffsetNumber;
        Size            totalsize = 0;
        Size            lsize = 0,
                                size;
-       static char tupstore[2 * BLCKSZ];
        char       *ptr;
-       IndexTuple      itup,
-                               leftrightmost = NULL;
-       static ginxlogSplit data;
+       IndexTuple      itup;
        Page            page;
-       Page            lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
-       Page            rpage = BufferGetPage(rbuf);
+       Page            lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
+       Page            rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
        Size            pageSize = PageGetPageSize(lpage);
+       char            tupstore[2 * BLCKSZ];
 
-       *prdata = rdata;
-       data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
-               InvalidOffsetNumber : GinItemPointerGetBlockNumber(&(btree->entry->t_tid));
-       data.updateBlkno = entryPreparePage(btree, lpage, off);
+       entryPreparePage(btree, lpage, off, insertData, updateblkno);
 
+       /*
+        * First, append all the existing tuples and the new tuple we're inserting
+        * one after another in a temporary workspace.
+        */
        maxoff = PageGetMaxOffsetNumber(lpage);
        ptr = tupstore;
-
        for (i = FirstOffsetNumber; i <= maxoff; i++)
        {
                if (i == off)
                {
-                       size = MAXALIGN(IndexTupleSize(btree->entry));
-                       memcpy(ptr, btree->entry, size);
+                       size = MAXALIGN(IndexTupleSize(insertData->entry));
+                       memcpy(ptr, insertData->entry, size);
                        ptr += size;
                        totalsize += size + sizeof(ItemIdData);
                }
@@ -550,12 +624,16 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
 
        if (off == maxoff + 1)
        {
-               size = MAXALIGN(IndexTupleSize(btree->entry));
-               memcpy(ptr, btree->entry, size);
+               size = MAXALIGN(IndexTupleSize(insertData->entry));
+               memcpy(ptr, insertData->entry, size);
                ptr += size;
                totalsize += size + sizeof(ItemIdData);
        }
 
+       /*
+        * Initialize the left and right pages, and copy all the tuples back to
+        * them.
+        */
        GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
        GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
 
@@ -576,7 +654,6 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
                }
                else
                {
-                       leftrightmost = itup;
                        lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
                }
 
@@ -586,49 +663,28 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
                ptr += MAXALIGN(IndexTupleSize(itup));
        }
 
-       btree->entry = copyIndexTuple(leftrightmost, lpage);
-       ItemPointerSet(&(btree->entry)->t_tid, BufferGetBlockNumber(lbuf), InvalidOffsetNumber);
-
-       btree->rightblkno = BufferGetBlockNumber(rbuf);
-
-       data.node = btree->index->rd_node;
-       data.rootBlkno = InvalidBlockNumber;
-       data.lblkno = BufferGetBlockNumber(lbuf);
-       data.rblkno = BufferGetBlockNumber(rbuf);
-       data.separator = separator;
-       data.nitem = maxoff;
-       data.isData = FALSE;
-       data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
-       data.isRootSplit = FALSE;
-
-       rdata[0].buffer = InvalidBuffer;
-       rdata[0].data = (char *) &data;
-       rdata[0].len = sizeof(ginxlogSplit);
-       rdata[0].next = &rdata[1];
-
-       rdata[1].buffer = InvalidBuffer;
-       rdata[1].data = tupstore;
-       rdata[1].len = MAXALIGN(totalsize);
-       rdata[1].next = NULL;
-
-       return lpage;
+       *newlpage = lpage;
+       *newrpage = rpage;
 }
 
 /*
- * return newly allocated rightmost tuple
+ * Construct insertion payload for inserting the downlink for given buffer.
  */
-IndexTuple
-ginPageGetLinkItup(Buffer buf)
+static void *
+entryPrepareDownlink(GinBtree btree, Buffer lbuf)
 {
-       IndexTuple      itup,
-                               nitup;
-       Page            page = BufferGetPage(buf);
+       GinBtreeEntryInsertData *insertData;
+       Page            lpage = BufferGetPage(lbuf);
+       BlockNumber lblkno = BufferGetBlockNumber(lbuf);
+       IndexTuple      itup;
 
-       itup = getRightMostTuple(page);
-       nitup = copyIndexTuple(itup, page);
-       ItemPointerSet(&nitup->t_tid, BufferGetBlockNumber(buf), InvalidOffsetNumber);
+       itup = getRightMostTuple(lpage);
 
-       return nitup;
+       insertData = palloc(sizeof(GinBtreeEntryInsertData));
+       insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
+       insertData->isDelete = false;
+
+       return insertData;
 }
 
 /*
@@ -636,48 +692,54 @@ ginPageGetLinkItup(Buffer buf)
  * Also called from ginxlog, should not use btree
  */
 void
-entryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
+ginEntryFillRoot(GinBtree btree, Page root,
+                                BlockNumber lblkno, Page lpage,
+                                BlockNumber rblkno, Page rpage)
 {
-       Page            page;
        IndexTuple      itup;
 
-       page = BufferGetPage(root);
-
-       itup = ginPageGetLinkItup(lbuf);
-       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+       itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
+       if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index root page");
        pfree(itup);
 
-       itup = ginPageGetLinkItup(rbuf);
-       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+       itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
+       if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index root page");
        pfree(itup);
 }
 
+/*
+ * Set up GinBtree for entry page access
+ *
+ * Note: during WAL recovery, there may be no valid data in ginstate
+ * other than a faked-up Relation pointer; the key datum is bogus too.
+ */
 void
-prepareEntryScan(GinBtree btree, Relation index, OffsetNumber attnum, Datum value, GinState *ginstate)
+ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
+                                       Datum key, GinNullCategory category,
+                                       GinState *ginstate)
 {
        memset(btree, 0, sizeof(GinBtreeData));
 
-       btree->index = index;
+       btree->index = ginstate->index;
+       btree->rootBlkno = GIN_ROOT_BLKNO;
        btree->ginstate = ginstate;
 
        btree->findChildPage = entryLocateEntry;
+       btree->getLeftMostChild = entryGetLeftMostPage;
        btree->isMoveRight = entryIsMoveRight;
        btree->findItem = entryLocateLeafEntry;
        btree->findChildPtr = entryFindChildPtr;
-       btree->getLeftMostPage = entryGetLeftMostPage;
-       btree->isEnoughSpace = entryIsEnoughSpace;
        btree->placeToPage = entryPlaceToPage;
-       btree->splitPage = entrySplitPage;
-       btree->fillRoot = entryFillRoot;
+       btree->fillRoot = ginEntryFillRoot;
+       btree->prepareDownlink = entryPrepareDownlink;
 
        btree->isData = FALSE;
-       btree->searchMode = FALSE;
        btree->fullScan = FALSE;
        btree->isBuild = FALSE;
 
        btree->entryAttnum = attnum;
-       btree->entryValue = value;
-       btree->isDelete = FALSE;
+       btree->entryKey = key;
+       btree->entryCategory = category;
 }