]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/gin/ginentrypage.c
Update copyright for 2016
[postgresql] / src / backend / access / gin / ginentrypage.c
index 9749a1be78669a15b259ea083211014b75b883c6..251274564803e873927e52af4421b46139e93dce 100644 (file)
@@ -1,10 +1,10 @@
 /*-------------------------------------------------------------------------
  *
  * ginentrypage.c
- *       page utilities routines for the postgres inverted index access method.
+ *       routines for handling GIN entry tree pages.
  *
  *
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
 #include "postgres.h"
 
 #include "access/gin_private.h"
-#include "storage/bufmgr.h"
+#include "access/xloginsert.h"
+#include "miscadmin.h"
 #include "utils/rel.h"
 
+static void entrySplitPage(GinBtree btree, Buffer origbuf,
+                          GinBtreeStack *stack,
+                          void *insertPayload,
+                          BlockNumber updateblkno,
+                          Page *newlpage, Page *newrpage);
+
 /*
  * Form a tuple for entry tree.
  *
  * format that is being built here.  We build on the assumption that we
  * are making a leaf-level key entry containing a posting list of nipd items.
  * If the caller is actually trying to make a posting-tree entry, non-leaf
- * entry, or pending-list entry, it should pass nipd = 0 and then overwrite
- * the t_tid fields as necessary.  In any case, ipd can be NULL to skip
- * copying any itempointers into the posting list; the caller is responsible
- * for filling the posting list afterwards, if ipd = NULL and nipd > 0.
+ * entry, or pending-list entry, it should pass dataSize = 0 and then overwrite
+ * the t_tid fields as necessary.  In any case, 'data' can be NULL to skip
+ * filling in the posting list; the caller is responsible for filling it
+ * afterwards if data = NULL and nipd > 0.
  */
 IndexTuple
 GinFormTuple(GinState *ginstate,
                         OffsetNumber attnum, Datum key, GinNullCategory category,
-                        ItemPointerData *ipd, uint32 nipd,
+                        Pointer data, Size dataSize, int nipd,
                         bool errorTooBig)
 {
        Datum           datums[2];
@@ -84,25 +91,24 @@ GinFormTuple(GinState *ginstate,
        newsize = SHORTALIGN(newsize);
 
        GinSetPostingOffset(itup, newsize);
-
        GinSetNPosting(itup, nipd);
 
        /*
         * Add space needed for posting list, if any.  Then check that the tuple
         * won't be too big to store.
         */
-       newsize += sizeof(ItemPointerData) * nipd;
+       newsize += dataSize;
+
        newsize = MAXALIGN(newsize);
-       if (newsize > Min(INDEX_SIZE_MASK, GinMaxItemSize))
+
+       if (newsize > GinMaxItemSize)
        {
                if (errorTooBig)
                        ereport(ERROR,
                                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                                        errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
-                                                       (unsigned long) newsize,
-                                                       (unsigned long) Min(INDEX_SIZE_MASK,
-                                                                                               GinMaxItemSize),
-                                                       RelationGetRelationName(ginstate->index))));
+                       errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+                                  (Size) newsize, (Size) GinMaxItemSize,
+                                  RelationGetRelationName(ginstate->index))));
                pfree(itup);
                return NULL;
        }
@@ -114,11 +120,27 @@ GinFormTuple(GinState *ginstate,
        {
                itup = repalloc(itup, newsize);
 
+               /*
+                * PostgreSQL 9.3 and earlier did not clear this new space, so we
+                * might find uninitialized padding when reading tuples from disk.
+                */
+               memset((char *) itup + IndexTupleSize(itup),
+                          0, newsize - IndexTupleSize(itup));
                /* set new size in tuple header */
                itup->t_info &= ~INDEX_SIZE_MASK;
                itup->t_info |= newsize;
        }
 
+       /*
+        * Copy in the posting list, if provided
+        */
+       if (data)
+       {
+               char       *ptr = GinGetPosting(itup);
+
+               memcpy(ptr, data, dataSize);
+       }
+
        /*
         * Insert category byte, if needed
         */
@@ -127,37 +149,45 @@ GinFormTuple(GinState *ginstate,
                Assert(IndexTupleHasNulls(itup));
                GinSetNullCategory(itup, ginstate, category);
        }
-
-       /*
-        * Copy in the posting list, if provided
-        */
-       if (ipd)
-               memcpy(GinGetPosting(itup), ipd, sizeof(ItemPointerData) * nipd);
-
        return itup;
 }
 
 /*
- * Sometimes we reduce the number of posting list items in a tuple after
- * having built it with GinFormTuple.  This function adjusts the size
- * fields to match.
+ * Read item pointers from leaf entry tuple.
+ *
+ * Returns a palloc'd array of ItemPointers. The number of items is returned
+ * in *nitems.
  */
-void
-GinShortenTuple(IndexTuple itup, uint32 nipd)
+ItemPointer
+ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup,
+                        int *nitems)
 {
-       uint32          newsize;
-
-       Assert(nipd <= GinGetNPosting(itup));
-
-       newsize = GinGetPostingOffset(itup) + sizeof(ItemPointerData) * nipd;
-       newsize = MAXALIGN(newsize);
-
-       Assert(newsize <= (itup->t_info & INDEX_SIZE_MASK));
+       Pointer         ptr = GinGetPosting(itup);
+       int                     nipd = GinGetNPosting(itup);
+       ItemPointer ipd;
+       int                     ndecoded;
 
-       itup->t_info &= ~INDEX_SIZE_MASK;
-       itup->t_info |= newsize;
-
-       GinSetNPosting(itup, nipd);
+       if (GinItupIsCompressed(itup))
+       {
+               if (nipd > 0)
+               {
+                       ipd = ginPostingListDecode((GinPostingList *) ptr, &ndecoded);
+                       if (nipd != ndecoded)
+                               elog(ERROR, "number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded",
+                                        nipd, ndecoded);
+               }
+               else
+               {
+                       ipd = palloc(0);
+               }
+       }
+       else
+       {
+               ipd = (ItemPointer) palloc(sizeof(ItemPointerData) * nipd);
+               memcpy(ipd, ptr, sizeof(ItemPointerData) * nipd);
+       }
+       *nitems = nipd;
+       return ipd;
 }
 
 /*
@@ -225,7 +255,7 @@ entryIsMoveRight(GinBtree btree, Page page)
        key = gintuple_get_key(btree->ginstate, itup, &category);
 
        if (ginCompareAttEntries(btree->ginstate,
-                                                        btree->entryAttnum, btree->entryKey, btree->entryCategory,
+                                  btree->entryAttnum, btree->entryKey, btree->entryCategory,
                                                         attnum, key, category) > 0)
                return TRUE;
 
@@ -253,7 +283,7 @@ entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
        {
                stack->off = FirstOffsetNumber;
                stack->predictNumber *= PageGetMaxOffsetNumber(page);
-               return btree->getLeftMostPage(btree, page);
+               return btree->getLeftMostChild(btree, page);
        }
 
        low = FirstOffsetNumber;
@@ -426,128 +456,133 @@ entryGetLeftMostPage(GinBtree btree, Page page)
 }
 
 static bool
-entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
+entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off,
+                                  GinBtreeEntryInsertData *insertData)
 {
-       Size            itupsz = 0;
+       Size            releasedsz = 0;
+       Size            addedsz;
        Page            page = BufferGetPage(buf);
 
-       Assert(btree->entry);
+       Assert(insertData->entry);
        Assert(!GinPageIsData(page));
 
-       if (btree->isDelete)
+       if (insertData->isDelete)
        {
                IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
 
-               itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
+               releasedsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
        }
 
-       if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
+       addedsz = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);
+
+       if (PageGetFreeSpace(page) + releasedsz >= addedsz)
                return true;
 
        return false;
 }
 
 /*
- * Delete tuple on leaf page if tuples was existed and we
+ * Delete tuple on leaf page if tuples existed and we
  * should update it, update old child blkno to new right page
- * if child split is occured
+ * if child split occurred
  */
-static BlockNumber
-entryPreparePage(GinBtree btree, Page page, OffsetNumber off)
+static void
+entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
+                                GinBtreeEntryInsertData *insertData, BlockNumber updateblkno)
 {
-       BlockNumber ret = InvalidBlockNumber;
-
-       Assert(btree->entry);
+       Assert(insertData->entry);
        Assert(!GinPageIsData(page));
 
-       if (btree->isDelete)
+       if (insertData->isDelete)
        {
                Assert(GinPageIsLeaf(page));
                PageIndexTupleDelete(page, off);
        }
 
-       if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
+       if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber)
        {
                IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
 
-               GinSetDownlink(itup, btree->rightblkno);
-               ret = btree->rightblkno;
+               GinSetDownlink(itup, updateblkno);
        }
-
-       btree->rightblkno = InvalidBlockNumber;
-
-       return ret;
 }
 
 /*
  * Place tuple on page and fills WAL record
+ *
+ * If the tuple doesn't fit, returns false without modifying the page.
+ *
+ * On insertion to an internal node, in addition to inserting the given item,
+ * the downlink of the existing item at 'off' is updated to point to
+ * 'updateblkno'.
+ *
+ * On INSERTED, registers the buffer as buffer ID 0, with data.
+ * On SPLIT, returns rdata that represents the split pages in *prdata.
  */
-static void
-entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
+static GinPlaceToPageRC
+entryPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
+                                void *insertPayload, BlockNumber updateblkno,
+                                Page *newlpage, Page *newrpage)
 {
+       GinBtreeEntryInsertData *insertData = insertPayload;
        Page            page = BufferGetPage(buf);
+       OffsetNumber off = stack->off;
        OffsetNumber placed;
-       int                     cnt = 0;
-       /* these must be static so they can be returned to caller */
-       static XLogRecData rdata[3];
-       static ginxlogInsert data;
 
-       *prdata = rdata;
-       data.updateBlkno = entryPreparePage(btree, page, off);
+       /* this must be static so it can be returned to caller. */
+       static ginxlogInsertEntry data;
+
+       /* quick exit if it doesn't fit */
+       if (!entryIsEnoughSpace(btree, buf, off, insertData))
+       {
+               entrySplitPage(btree, buf, stack, insertPayload, updateblkno,
+                                          newlpage, newrpage);
+               return SPLIT;
+       }
+
+       START_CRIT_SECTION();
+
+       entryPreparePage(btree, page, off, insertData, updateblkno);
 
-       placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
+       placed = PageAddItem(page,
+                                                (Item) insertData->entry,
+                                                IndexTupleSize(insertData->entry),
+                                                off, false, false);
        if (placed != off)
                elog(ERROR, "failed to add item to index page in \"%s\"",
                         RelationGetRelationName(btree->index));
 
-       data.node = btree->index->rd_node;
-       data.blkno = BufferGetBlockNumber(buf);
-       data.offset = off;
-       data.nitem = 1;
-       data.isDelete = btree->isDelete;
-       data.isData = false;
-       data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
-
-       /*
-        * Prevent full page write if child's split occurs. That is needed to
-        * remove incomplete splits while replaying WAL
-        *
-        * data.updateBlkno contains new block number (of newly created right
-        * page) for recently splited page.
-        */
-       if (data.updateBlkno == InvalidBlockNumber)
+       if (RelationNeedsWAL(btree->index))
        {
-               rdata[0].buffer = buf;
-               rdata[0].buffer_std = TRUE;
-               rdata[0].data = NULL;
-               rdata[0].len = 0;
-               rdata[0].next = &rdata[1];
-               cnt++;
+               data.isDelete = insertData->isDelete;
+               data.offset = off;
+
+               XLogBeginInsert();
+               XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+               XLogRegisterBufData(0, (char *) &data,
+                                                       offsetof(ginxlogInsertEntry, tuple));
+               XLogRegisterBufData(0, (char *) insertData->entry,
+                                                       IndexTupleSize(insertData->entry));
        }
 
-       rdata[cnt].buffer = InvalidBuffer;
-       rdata[cnt].data = (char *) &data;
-       rdata[cnt].len = sizeof(ginxlogInsert);
-       rdata[cnt].next = &rdata[cnt + 1];
-       cnt++;
-
-       rdata[cnt].buffer = InvalidBuffer;
-       rdata[cnt].data = (char *) btree->entry;
-       rdata[cnt].len = IndexTupleSize(btree->entry);
-       rdata[cnt].next = NULL;
-
-       btree->entry = NULL;
+       return INSERTED;
 }
 
 /*
  * Place tuple and split page, original buffer(lbuf) leaves untouched,
- * returns shadow page of lbuf filled new data.
+ * returns shadow pages filled with new data.
  * Tuples are distributed between pages by equal size on its, not
  * an equal number!
  */
-static Page
-entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
+static void
+entrySplitPage(GinBtree btree, Buffer origbuf,
+                          GinBtreeStack *stack,
+                          void *insertPayload,
+                          BlockNumber updateblkno,
+                          Page *newlpage, Page *newrpage)
 {
+       GinBtreeEntryInsertData *insertData = insertPayload;
+       OffsetNumber off = stack->off;
        OffsetNumber i,
                                maxoff,
                                separator = InvalidOffsetNumber;
@@ -555,31 +590,27 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
        Size            lsize = 0,
                                size;
        char       *ptr;
-       IndexTuple      itup,
-                               leftrightmost = NULL;
+       IndexTuple      itup;
        Page            page;
-       Page            lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
-       Page            rpage = BufferGetPage(rbuf);
+       Page            lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
+       Page            rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
        Size            pageSize = PageGetPageSize(lpage);
-       /* these must be static so they can be returned to caller */
-       static XLogRecData rdata[2];
-       static ginxlogSplit data;
-       static char tupstore[2 * BLCKSZ];
+       char            tupstore[2 * BLCKSZ];
 
-       *prdata = rdata;
-       data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
-               InvalidOffsetNumber : GinGetDownlink(btree->entry);
-       data.updateBlkno = entryPreparePage(btree, lpage, off);
+       entryPreparePage(btree, lpage, off, insertData, updateblkno);
 
+       /*
+        * First, append all the existing tuples and the new tuple we're inserting
+        * one after another in a temporary workspace.
+        */
        maxoff = PageGetMaxOffsetNumber(lpage);
        ptr = tupstore;
-
        for (i = FirstOffsetNumber; i <= maxoff; i++)
        {
                if (i == off)
                {
-                       size = MAXALIGN(IndexTupleSize(btree->entry));
-                       memcpy(ptr, btree->entry, size);
+                       size = MAXALIGN(IndexTupleSize(insertData->entry));
+                       memcpy(ptr, insertData->entry, size);
                        ptr += size;
                        totalsize += size + sizeof(ItemIdData);
                }
@@ -593,12 +624,16 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
 
        if (off == maxoff + 1)
        {
-               size = MAXALIGN(IndexTupleSize(btree->entry));
-               memcpy(ptr, btree->entry, size);
+               size = MAXALIGN(IndexTupleSize(insertData->entry));
+               memcpy(ptr, insertData->entry, size);
                ptr += size;
                totalsize += size + sizeof(ItemIdData);
        }
 
+       /*
+        * Initialize the left and right pages, and copy all the tuples back to
+        * them.
+        */
        GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
        GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
 
@@ -619,7 +654,6 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
                }
                else
                {
-                       leftrightmost = itup;
                        lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
                }
 
@@ -629,48 +663,28 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
                ptr += MAXALIGN(IndexTupleSize(itup));
        }
 
-       btree->entry = GinFormInteriorTuple(leftrightmost, lpage,
-                                                                               BufferGetBlockNumber(lbuf));
-
-       btree->rightblkno = BufferGetBlockNumber(rbuf);
-
-       data.node = btree->index->rd_node;
-       data.rootBlkno = InvalidBlockNumber;
-       data.lblkno = BufferGetBlockNumber(lbuf);
-       data.rblkno = BufferGetBlockNumber(rbuf);
-       data.separator = separator;
-       data.nitem = maxoff;
-       data.isData = FALSE;
-       data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
-       data.isRootSplit = FALSE;
-
-       rdata[0].buffer = InvalidBuffer;
-       rdata[0].data = (char *) &data;
-       rdata[0].len = sizeof(ginxlogSplit);
-       rdata[0].next = &rdata[1];
-
-       rdata[1].buffer = InvalidBuffer;
-       rdata[1].data = tupstore;
-       rdata[1].len = MAXALIGN(totalsize);
-       rdata[1].next = NULL;
-
-       return lpage;
+       *newlpage = lpage;
+       *newrpage = rpage;
 }
 
 /*
- * return newly allocated rightmost tuple
+ * Construct insertion payload for inserting the downlink for given buffer.
  */
-IndexTuple
-ginPageGetLinkItup(Buffer buf)
+static void *
+entryPrepareDownlink(GinBtree btree, Buffer lbuf)
 {
-       IndexTuple      itup,
-                               nitup;
-       Page            page = BufferGetPage(buf);
+       GinBtreeEntryInsertData *insertData;
+       Page            lpage = BufferGetPage(lbuf);
+       BlockNumber lblkno = BufferGetBlockNumber(lbuf);
+       IndexTuple      itup;
 
-       itup = getRightMostTuple(page);
-       nitup = GinFormInteriorTuple(itup, page, BufferGetBlockNumber(buf));
+       itup = getRightMostTuple(lpage);
 
-       return nitup;
+       insertData = palloc(sizeof(GinBtreeEntryInsertData));
+       insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
+       insertData->isDelete = false;
+
+       return insertData;
 }
 
 /*
@@ -678,20 +692,19 @@ ginPageGetLinkItup(Buffer buf)
  * Also called from ginxlog, should not use btree
  */
 void
-ginEntryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
+ginEntryFillRoot(GinBtree btree, Page root,
+                                BlockNumber lblkno, Page lpage,
+                                BlockNumber rblkno, Page rpage)
 {
-       Page            page;
        IndexTuple      itup;
 
-       page = BufferGetPage(root);
-
-       itup = ginPageGetLinkItup(lbuf);
-       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+       itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
+       if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index root page");
        pfree(itup);
 
-       itup = ginPageGetLinkItup(rbuf);
-       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+       itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
+       if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index root page");
        pfree(itup);
 }
@@ -710,25 +723,23 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
        memset(btree, 0, sizeof(GinBtreeData));
 
        btree->index = ginstate->index;
+       btree->rootBlkno = GIN_ROOT_BLKNO;
        btree->ginstate = ginstate;
 
        btree->findChildPage = entryLocateEntry;
+       btree->getLeftMostChild = entryGetLeftMostPage;
        btree->isMoveRight = entryIsMoveRight;
        btree->findItem = entryLocateLeafEntry;
        btree->findChildPtr = entryFindChildPtr;
-       btree->getLeftMostPage = entryGetLeftMostPage;
-       btree->isEnoughSpace = entryIsEnoughSpace;
        btree->placeToPage = entryPlaceToPage;
-       btree->splitPage = entrySplitPage;
        btree->fillRoot = ginEntryFillRoot;
+       btree->prepareDownlink = entryPrepareDownlink;
 
        btree->isData = FALSE;
-       btree->searchMode = FALSE;
        btree->fullScan = FALSE;
        btree->isBuild = FALSE;
 
        btree->entryAttnum = attnum;
        btree->entryKey = key;
        btree->entryCategory = category;
-       btree->isDelete = FALSE;
 }