granicus.if.org Git - postgresql/commitdiff
Improve gist XLOG code to follow the coding rules needed to prevent
author: Tom Lane <tgl@sss.pgh.pa.us>
Thu, 30 Mar 2006 23:03:10 +0000 (23:03 +0000)
committer: Tom Lane <tgl@sss.pgh.pa.us>
Thu, 30 Mar 2006 23:03:10 +0000 (23:03 +0000)
torn-page problems.  This introduces some issues of its own, mainly
that there are now some critical sections of unreasonably broad scope,
but it's a step forward anyway.  Further cleanup will require some
code refactoring that I'd prefer to get Oleg and Teodor involved in.

src/backend/access/gist/gist.c
src/backend/access/gist/gistvacuum.c
src/backend/access/gist/gistxlog.c
src/include/access/gist_private.h

index de880831bf47c7ec62ca99141197a1375700e5e2..d997db37efbd031e16552824a047ce87b65c0a05 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.129 2006/03/05 15:58:20 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.130 2006/03/30 23:03:09 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -90,6 +90,7 @@ gistbuild(PG_FUNCTION_ARGS)
        double          reltuples;
        GISTBuildState buildstate;
        Buffer          buffer;
+       Page            page;
 
        /*
         * We expect to be called exactly once for any index relation. If that's
@@ -104,33 +105,33 @@ gistbuild(PG_FUNCTION_ARGS)
 
        /* initialize the root page */
        buffer = gistNewBuffer(index);
+       Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
+       page = BufferGetPage(buffer);
+
+       START_CRIT_SECTION();
+
        GISTInitBuffer(buffer, F_LEAF);
        if (!index->rd_istemp)
        {
                XLogRecPtr      recptr;
                XLogRecData rdata;
-               Page            page;
 
-               rdata.buffer = InvalidBuffer;
                rdata.data = (char *) &(index->rd_node);
                rdata.len = sizeof(RelFileNode);
+               rdata.buffer = InvalidBuffer;
                rdata.next = NULL;
 
-               page = BufferGetPage(buffer);
-
-               START_CRIT_SECTION();
-
                recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
                PageSetLSN(page, recptr);
                PageSetTLI(page, ThisTimeLineID);
-
-               END_CRIT_SECTION();
        }
        else
-               PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp);
+               PageSetLSN(page, XLogRecPtrForTemp);
        LockBuffer(buffer, GIST_UNLOCK);
        WriteBuffer(buffer);
 
+       END_CRIT_SECTION();
+
        /* build the index */
        buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
        buildstate.indtuples = 0;
@@ -305,6 +306,15 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
        bool            is_splitted = false;
        bool            is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
 
+       /*
+        * XXX this code really ought to work by locking, but not modifying,
+        * all the buffers it needs; then starting a critical section; then
+        * modifying the buffers in an already-determined way and writing an
+        * XLOG record to reflect that.  Since it doesn't, we've got to put
+        * a critical section around the entire process, which is horrible
+        * from a robustness point of view.
+        */
+       START_CRIT_SECTION();
 
        if (!is_leaf)
 
@@ -312,6 +322,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                 * This node's key has been modified, either because a child split
                 * occurred or because we needed to adjust our key for an insert in a
                 * child node. Therefore, remove the old version of this node's key.
+                *
+                * Note: for WAL replay, in the non-split case we handle this by
+                * setting up a one-element todelete array; in the split case, it's
+                * handled implicitly because the tuple vector passed to gistSplit
+                * won't include this tuple.
                 */
 
                PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
@@ -336,9 +351,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                        XLogRecData *rdata;
 
                        rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
-                                                                  &(state->key), dist);
-
-                       START_CRIT_SECTION();
+                                                                  is_leaf, &(state->key), dist);
 
                        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
                        ptr = dist;
@@ -348,8 +361,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                                PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
                                ptr = ptr->next;
                        }
-
-                       END_CRIT_SECTION();
                }
                else
                {
@@ -410,7 +421,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                        else
                                ourpage = dist;
 
-
                        /* now gets all needed data, and sets nsn's */
                        page = (Page) BufferGetPage(ourpage->buffer);
                        opaque = GistPageGetOpaque(page);
@@ -437,8 +447,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                                WriteBuffer(ptr->buffer);
                                ptr = ptr->next;
                        }
+
+                       WriteNoReleaseBuffer(state->stack->buffer);
                }
-               WriteNoReleaseBuffer(state->stack->buffer);
+
+               END_CRIT_SECTION();
        }
        else
        {
@@ -451,7 +464,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                if (!state->r->rd_istemp)
                {
                        OffsetNumber noffs = 0,
-                                               offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)];
+                                               offs[1];
                        XLogRecPtr      recptr;
                        XLogRecData *rdata;
 
@@ -462,17 +475,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                                noffs = 1;
                        }
 
-                       rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
-                                                        offs, noffs, false, state->itup, state->ituplen,
+                       rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
+                                                                       offs, noffs, false,
+                                                                       state->itup, state->ituplen,
                                                                        &(state->key));
 
-                       START_CRIT_SECTION();
-
-                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
                        PageSetLSN(state->stack->page, recptr);
                        PageSetTLI(state->stack->page, ThisTimeLineID);
-
-                       END_CRIT_SECTION();
                }
                else
                        PageSetLSN(state->stack->page, XLogRecPtrForTemp);
@@ -481,6 +491,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                        state->needInsertComplete = false;
                WriteNoReleaseBuffer(state->stack->buffer);
 
+               END_CRIT_SECTION();
+
                if (!is_leaf)                   /* small optimization: inform scan ablout
                                                                 * deleting... */
                        gistadjscans(state->r, GISTOP_DEL, state->stack->blkno,
@@ -636,30 +648,14 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
 }
 
 /*
- * Should have the same interface as XLogReadBuffer
- */
-static Buffer
-gistReadAndLockBuffer(Relation r, BlockNumber blkno)
-{
-       Buffer          buffer = ReadBuffer(r, blkno);
-
-       LockBuffer(buffer, GIST_SHARE);
-       return buffer;
-}
-
-/*
- * Traverse the tree to find path from root page.
+ * Traverse the tree to find path from root page to specified "child" block.
  *
  * returns from the begining of closest parent;
  *
- * Function is used in both regular and recovery mode, so must work with
- * different read functions (gistReadAndLockBuffer and XLogReadBuffer)
- *
  * To prevent deadlocks, this should lock only one page simultaneously.
  */
 GISTInsertStack *
-gistFindPath(Relation r, BlockNumber child,
-                        Buffer (*myReadBuffer) (Relation, BlockNumber))
+gistFindPath(Relation r, BlockNumber child)
 {
        Page            page;
        Buffer          buffer;
@@ -677,7 +673,8 @@ gistFindPath(Relation r, BlockNumber child,
 
        while (top && top->blkno != child)
        {
-               buffer = myReadBuffer(r, top->blkno);   /* locks buffer */
+               buffer = ReadBuffer(r, top->blkno);
+               LockBuffer(buffer, GIST_SHARE);
                gistcheckpage(r, buffer);
                page = (Page) BufferGetPage(buffer);
 
@@ -833,7 +830,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
                }
 
                /* ok, find new path */
-               ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer);
+               ptr = parent = gistFindPath(r, child->blkno);
                Assert(ptr != NULL);
 
                /* read all buffers as expected by caller */
@@ -1192,27 +1189,31 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
 
        Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
        page = BufferGetPage(buffer);
-       GISTInitBuffer(buffer, 0);
 
+       START_CRIT_SECTION();
+
+       GISTInitBuffer(buffer, 0);      /* XXX not F_LEAF? */
        gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
+
        if (!r->rd_istemp)
        {
                XLogRecPtr      recptr;
                XLogRecData *rdata;
 
-               rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
-                                                               NULL, 0, false, itup, len, key);
-
-               START_CRIT_SECTION();
+               rdata = formUpdateRdata(r->rd_node, buffer,
+                                                               NULL, 0, false,
+                                                               itup, len, key);
 
                recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
                PageSetLSN(page, recptr);
                PageSetTLI(page, ThisTimeLineID);
-
-               END_CRIT_SECTION();
        }
        else
                PageSetLSN(page, XLogRecPtrForTemp);
+
+       WriteNoReleaseBuffer(buffer);
+
+       END_CRIT_SECTION();
 }
 
 void
index 664ba47e40dbe1fd1518bb1f1df4655c8da3fbd3..e7925c2c15138994e3b289d0722361546a0b5a4f 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.16 2006/03/05 15:58:20 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.17 2006/03/30 23:03:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -80,6 +80,12 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
        page = (Page) BufferGetPage(buffer);
        maxoff = PageGetMaxOffsetNumber(page);
 
+       /*
+        * XXX need to reduce scope of changes to page so we can make this
+        * critical section less extensive
+        */
+       START_CRIT_SECTION();
+
        if (GistPageIsLeaf(page))
        {
                if (GistTuplesDeleted(page))
@@ -188,11 +194,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                                        ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
 
                                        rdata = formSplitRdata(gv->index->rd_node, blkno,
-                                                                                  &key, dist);
+                                                                                  false, &key, dist);
                                        xlinfo = rdata->data;
 
-                                       START_CRIT_SECTION();
-
                                        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
                                        ptr = dist;
                                        while (ptr)
@@ -202,7 +206,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                                                ptr = ptr->next;
                                        }
 
-                                       END_CRIT_SECTION();
                                        pfree(xlinfo);
                                        pfree(rdata);
                                }
@@ -235,8 +238,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                                        oldCtx = MemoryContextSwitchTo(gv->opCtx);
                                        gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
                                        MemoryContextSwitchTo(oldCtx);
-
-                                       WriteNoReleaseBuffer(buffer);
                                }
 
                                needwrite = false;
@@ -302,15 +303,14 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                        XLogRecPtr      recptr;
                        char       *xlinfo;
 
-                       rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete,
-                                                                       res.emptypage, addon, curlenaddon, NULL);
+                       rdata = formUpdateRdata(gv->index->rd_node, buffer,
+                                                                       todelete, ntodelete, res.emptypage,
+                                                                       addon, curlenaddon, NULL);
                        xlinfo = rdata->data;
 
-                       START_CRIT_SECTION();
-                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
                        PageSetLSN(page, recptr);
                        PageSetTLI(page, ThisTimeLineID);
-                       END_CRIT_SECTION();
 
                        pfree(xlinfo);
                        pfree(rdata);
@@ -322,6 +322,8 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
        else
                ReleaseBuffer(buffer);
 
+       END_CRIT_SECTION();
+
        if (ncompleted && !gv->index->rd_istemp)
                gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
 
@@ -579,6 +581,17 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                         */
                        pushStackIfSplited(page, stack);
 
+                       /*
+                        * Remove deletable tuples from page
+                        *
+                        * XXX try to make this critical section shorter.  Could do it
+                        * by separating the callback loop from the actual tuple deletion,
+                        * but that would affect the definition of the todelete[] array
+                        * passed into the WAL record (because the indexes would all be
+                        * pre-deletion).
+                        */
+                       START_CRIT_SECTION();
+
                        maxoff = PageGetMaxOffsetNumber(page);
 
                        for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
@@ -608,17 +621,17 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                                {
                                        XLogRecData *rdata;
                                        XLogRecPtr      recptr;
-                                       gistxlogEntryUpdate *xlinfo;
+                                       gistxlogPageUpdate *xlinfo;
 
-                                       rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
-                                                                                       false, NULL, 0, NULL);
-                                       xlinfo = (gistxlogEntryUpdate *) rdata->data;
+                                       rdata = formUpdateRdata(rel->rd_node, buffer,
+                                                                                       todelete, ntodelete, false,
+                                                                                       NULL, 0,
+                                                                                       NULL);
+                                       xlinfo = (gistxlogPageUpdate *) rdata->data;
 
-                                       START_CRIT_SECTION();
-                                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+                                       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
                                        PageSetLSN(page, recptr);
                                        PageSetTLI(page, ThisTimeLineID);
-                                       END_CRIT_SECTION();
 
                                        pfree(xlinfo);
                                        pfree(rdata);
@@ -627,6 +640,8 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                                        PageSetLSN(page, XLogRecPtrForTemp);
                                WriteNoReleaseBuffer(buffer);
                        }
+
+                       END_CRIT_SECTION();
                }
                else
                {
index 9a15061484f9d7cd17286e38f11f7b6cba3dbb8a..12a521c75c96c0b01e1ef4e8afa9f1d6ecb329ba 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.12 2006/03/29 21:17:36 tgl Exp $
+ *                      $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.13 2006/03/30 23:03:10 tgl Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 typedef struct
 {
-       gistxlogEntryUpdate *data;
+       gistxlogPageUpdate *data;
        int                     len;
        IndexTuple *itup;
        OffsetNumber *todelete;
-} EntryUpdateRecord;
+} PageUpdateRecord;
 
 typedef struct
 {
@@ -58,16 +58,15 @@ typedef struct gistIncompleteInsert
 } gistIncompleteInsert;
 
 
-MemoryContext opCtx;
-MemoryContext insertCtx;
+static MemoryContext opCtx;            /* working memory for operations */
+static MemoryContext insertCtx;        /* holds incomplete_inserts list */
 static List *incomplete_inserts;
 
 
-#define ItemPointerEQ( a, b )  \
-       ( \
-       ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
-       ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
-               )
+#define ItemPointerEQ(a, b)    \
+       ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
+         ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
+
 
 static void
 pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
@@ -101,7 +100,13 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
        }
        Assert(ninsert->lenblk > 0);
 
-       incomplete_inserts = lappend(incomplete_inserts, ninsert);
+       /*
+        * Stick the new incomplete insert onto the front of the list, not the
+        * back.  This is so that gist_xlog_cleanup will process incompletions
+        * in last-in-first-out order.
+        */
+       incomplete_inserts = lcons(ninsert, incomplete_inserts);
+
        MemoryContextSwitchTo(oldCxt);
 }
 
@@ -116,10 +121,9 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
 
                if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
                {
-
                        /* found */
-                       pfree(insert->blkno);
                        incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
+                       pfree(insert->blkno);
                        pfree(insert);
                        break;
                }
@@ -127,25 +131,25 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
 }
 
 static void
-decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
+decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
 {
        char       *begin = XLogRecGetData(record),
                           *ptr;
        int                     i = 0,
                                addpath = 0;
 
-       decoded->data = (gistxlogEntryUpdate *) begin;
+       decoded->data = (gistxlogPageUpdate *) begin;
 
        if (decoded->data->ntodelete)
        {
-               decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogEntryUpdate) + addpath);
+               decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
                addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
        }
        else
                decoded->todelete = NULL;
 
        decoded->len = 0;
-       ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
+       ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
        while (ptr - begin < record->xl_len)
        {
                decoded->len++;
@@ -154,7 +158,7 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
 
        decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
 
-       ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
+       ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
        while (ptr - begin < record->xl_len)
        {
                decoded->itup[i] = (IndexTuple) ptr;
@@ -167,38 +171,30 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
  * redo any page update (except page split)
  */
 static void
-gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
+gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
 {
-       EntryUpdateRecord xlrec;
+       PageUpdateRecord xlrec;
        Relation        reln;
        Buffer          buffer;
        Page            page;
 
-       decodeEntryUpdateRecord(&xlrec, record);
+       /* nothing to do if whole page was backed up (and no info to do it with) */
+       if (record->xl_info & XLR_BKP_BLOCK_1)
+               return;
+
+       decodePageUpdateRecord(&xlrec, record);
 
        reln = XLogOpenRelation(xlrec.data->node);
        buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
        if (!BufferIsValid(buffer))
-               elog(PANIC, "block %u unfound", xlrec.data->blkno);
+               return;
        page = (Page) BufferGetPage(buffer);
 
-       if (isnewroot)
+       if (XLByteLE(lsn, PageGetLSN(page)))
        {
-               if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)))
-               {
-                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-                       ReleaseBuffer(buffer);
-                       return;
-               }
-       }
-       else
-       {
-               if (XLByteLE(lsn, PageGetLSN(page)))
-               {
-                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-                       ReleaseBuffer(buffer);
-                       return;
-               }
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buffer);
+               return;
        }
 
        if (xlrec.data->isemptypage)
@@ -237,9 +233,9 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
                        GistClearTuplesDeleted(page);
        }
 
+       GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
        PageSetLSN(page, lsn);
        PageSetTLI(page, ThisTimeLineID);
-       GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        WriteBuffer(buffer);
 
@@ -294,38 +290,21 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
        Buffer          buffer;
        Page            page;
        int                     i;
-       int                     flags = 0;
+       int                     flags;
 
        decodePageSplitRecord(&xlrec, record);
        reln = XLogOpenRelation(xlrec.data->node);
-
-       /* first of all wee need get F_LEAF flag from original page */
-       buffer = XLogReadBuffer(reln, xlrec.data->origblkno, false);
-       if (!BufferIsValid(buffer))
-               elog(PANIC, "block %u unfound", xlrec.data->origblkno);
-       page = (Page) BufferGetPage(buffer);
-       flags = (GistPageIsLeaf(page)) ? F_LEAF : 0;
-       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-       ReleaseBuffer(buffer);
+       flags = xlrec.data->origleaf ? F_LEAF : 0;
 
        /* loop around all pages */
        for (i = 0; i < xlrec.data->npage; i++)
        {
                NewPage    *newpage = xlrec.page + i;
-               bool            isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;
 
-               buffer = XLogReadBuffer(reln, newpage->header->blkno, !isorigpage);
-               if (!BufferIsValid(buffer))
-                       elog(PANIC, "block %u unfound", newpage->header->blkno);
+               buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
+               Assert(BufferIsValid(buffer));
                page = (Page) BufferGetPage(buffer);
 
-               if (XLByteLE(lsn, PageGetLSN(page)))
-               {
-                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-                       ReleaseBuffer(buffer);
-                       continue;
-               }
-
                /* ok, clear buffer */
                GISTInitBuffer(buffer, flags);
 
@@ -399,12 +378,11 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
        oldCxt = MemoryContextSwitchTo(opCtx);
        switch (info)
        {
-               case XLOG_GIST_ENTRY_UPDATE:
-               case XLOG_GIST_ENTRY_DELETE:
-                       gistRedoEntryUpdateRecord(lsn, record, false);
+               case XLOG_GIST_PAGE_UPDATE:
+                       gistRedoPageUpdateRecord(lsn, record, false);
                        break;
                case XLOG_GIST_NEW_ROOT:
-                       gistRedoEntryUpdateRecord(lsn, record, true);
+                       gistRedoPageUpdateRecord(lsn, record, true);
                        break;
                case XLOG_GIST_PAGE_SPLIT:
                        gistRedoPageSplitRecord(lsn, record);
@@ -433,7 +411,7 @@ out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
 }
 
 static void
-out_gistxlogEntryUpdate(StringInfo buf, gistxlogEntryUpdate *xlrec)
+out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
 {
        out_target(buf, xlrec->node, xlrec->key);
        appendStringInfo(buf, "; block number %u", xlrec->blkno);
@@ -455,17 +433,13 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
 
        switch (info)
        {
-               case XLOG_GIST_ENTRY_UPDATE:
-                       appendStringInfo(buf, "entry_update: ");
-                       out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
-                       break;
-               case XLOG_GIST_ENTRY_DELETE:
-                       appendStringInfo(buf, "entry_delete: ");
-                       out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
+               case XLOG_GIST_PAGE_UPDATE:
+                       appendStringInfo(buf, "page_update: ");
+                       out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
                        break;
                case XLOG_GIST_NEW_ROOT:
                        appendStringInfo(buf, "new_root: ");
-                       out_target(buf, ((gistxlogEntryUpdate *) rec)->node, ((gistxlogEntryUpdate *) rec)->key);
+                       out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
                        break;
                case XLOG_GIST_PAGE_SPLIT:
                        out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
@@ -506,60 +480,47 @@ gist_form_invalid_tuple(BlockNumber blkno)
        return tuple;
 }
 
-static Buffer
-gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno)
-{
-       Buffer          buffer = XLogReadBuffer(r, blkno, false);
-
-       if (!BufferIsValid(buffer))
-               elog(PANIC, "block %u unfound", blkno);
-
-       return buffer;
-}
-
 
 static void
-gixtxlogFindPath(Relation index, gistIncompleteInsert *insert)
+gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
 {
        GISTInsertStack *top;
 
        insert->pathlen = 0;
        insert->path = NULL;
 
-       if ((top = gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL)
+       if ((top = gistFindPath(index, insert->origblkno)) != NULL)
        {
                int                     i;
-               GISTInsertStack *ptr = top;
+               GISTInsertStack *ptr;
 
-               while (ptr)
-               {
+               for (ptr = top; ptr; ptr = ptr->parent)
                        insert->pathlen++;
-                       ptr = ptr->parent;
-               }
 
                insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
 
                i = 0;
-               ptr = top;
-               while (ptr)
-               {
-                       insert->path[i] = ptr->blkno;
-                       i++;
-                       ptr = ptr->parent;
-               }
+               for (ptr = top; ptr; ptr = ptr->parent)
+                       insert->path[i++] = ptr->blkno;
        }
        else
                elog(LOG, "lost parent for block %u", insert->origblkno);
 }
 
 /*
- * Continue insert after crash. In normal situation, there isn't any incomplete
- * inserts, but if it might be after crash, WAL may has not a record of completetion.
+ * Continue insert after crash.  In normal situations, there aren't any
+ * incomplete inserts, but if a crash occurs partway through an insertion
+ * sequence, we'll need to finish making the index valid at the end of WAL
+ * replay.
+ *
+ * Note that we assume the index is now in a valid state, except for the
+ * unfinished insertion.  In particular it's safe to invoke gistFindPath();
+ * there shouldn't be any garbage pages for it to run into.
  *
  * Although stored LSN in gistIncompleteInsert is a LSN of child page,
  * we can compare it with LSN of parent, because parent is always locked
  * while we change child page (look at gistmakedeal). So if parent's LSN is
- * lesser than stored lsn then changes in parent doesn't do yet.
+ * less than stored lsn then changes in parent aren't done yet.
  */
 static void
 gistContinueInsert(gistIncompleteInsert *insert)
@@ -602,6 +563,12 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                WriteBuffer(buffer);
+
+               /*
+                * XXX fall out to avoid making LOG message at bottom of routine.
+                * I think the logic for when to emit that message is all wrong...
+                */
+               return;
        }
        else
        {
@@ -610,7 +577,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
                int                     numbuffer;
 
                /* construct path */
-               gixtxlogFindPath(index, insert);
+               gistxlogFindPath(index, insert);
 
                Assert(insert->pathlen > 0);
 
@@ -625,9 +592,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
                                                childfound = 0;
 
                        numbuffer = 1;
-                       buffers[numbuffer - 1] = XLogReadBuffer(index, insert->path[i], false);
-                       if (!BufferIsValid(buffers[numbuffer - 1]))
-                               elog(PANIC, "block %u unfound", insert->path[i]);
+                       buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]);
+                       LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE);
                        pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
 
                        if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
@@ -661,10 +627,9 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
                        if (gistnospace(pages[numbuffer - 1], itup, lenitup))
                        {
-                               /* no space left on page, so we should split */
-                               buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true);
-                               if (!BufferIsValid(buffers[numbuffer]))
-                                       elog(PANIC, "could not obtain new block");
+                               /* no space left on page, so we must split */
+                               buffers[numbuffer] = ReadBuffer(index, P_NEW);
+                               LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
                                GISTInitBuffer(buffers[numbuffer], 0);
                                pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
                                gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
@@ -678,7 +643,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
                                         * we split root, just copy tuples from old root to new
                                         * page
                                         */
-                                       parentitup = gistextractbuffer(buffers[numbuffer - 1], &pituplen);
+                                       parentitup = gistextractbuffer(buffers[numbuffer - 1],
+                                                                                                  &pituplen);
 
                                        /* sanity check */
                                        if (i + 1 != insert->pathlen)
@@ -686,9 +652,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
                                                         RelationGetRelationName(index));
 
                                        /* fill new page */
-                                       buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true);
-                                       if (!BufferIsValid(buffers[numbuffer]))
-                                               elog(PANIC, "could not obtain new block");
+                                       buffers[numbuffer] = ReadBuffer(index, P_NEW);
+                                       LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
                                        GISTInitBuffer(buffers[numbuffer], 0);
                                        pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
                                        gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
@@ -748,16 +713,10 @@ void
 gist_xlog_cleanup(void)
 {
        ListCell   *l;
-       List       *reverse = NIL;
-       MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
-
-       /* we should call gistContinueInsert in reverse order */
+       MemoryContext oldCxt;
 
+       oldCxt = MemoryContextSwitchTo(opCtx);
        foreach(l, incomplete_inserts)
-               reverse = lappend(reverse, lfirst(l));
-
-       MemoryContextSwitchTo(opCtx);
-       foreach(l, reverse)
        {
                gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
 
@@ -772,10 +731,9 @@ gist_xlog_cleanup(void)
 
 
 XLogRecData *
-formSplitRdata(RelFileNode node, BlockNumber blkno,
+formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
                           ItemPointer key, SplitedPageLayout *dist)
 {
-
        XLogRecData *rdata;
        gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
        SplitedPageLayout *ptr;
@@ -793,6 +751,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
 
        xlrec->node = node;
        xlrec->origblkno = blkno;
+       xlrec->origleaf = page_is_leaf;
        xlrec->npage = (uint16) npage;
        if (key)
                xlrec->key = *key;
@@ -825,68 +784,64 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
        return rdata;
 }
 
-
+/*
+ * Construct the rdata array for an XLOG record describing a page update
+ * (deletion and/or insertion of tuples on a single index page).
+ *
+ * Note that both the todelete array and the tuples are marked as belonging
+ * to the target buffer; they need not be stored in XLOG if XLogInsert decides
+ * to log the whole buffer contents instead.  Also, we take care that there's
+ * at least one rdata item referencing the buffer, even when ntodelete and
+ * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
+ */
 XLogRecData *
-formUpdateRdata(RelFileNode node, BlockNumber blkno,
+formUpdateRdata(RelFileNode node, Buffer buffer,
                                OffsetNumber *todelete, int ntodelete, bool emptypage,
                                IndexTuple *itup, int ituplen, ItemPointer key)
 {
        XLogRecData *rdata;
-       gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate *) palloc(sizeof(gistxlogEntryUpdate));
+       gistxlogPageUpdate *xlrec;
+       int                     cur,
+                               i;
+
+       /* ugly wart in API: emptypage causes us to ignore other inputs */
+       if (emptypage)
+               ntodelete = ituplen = 0;
+
+       rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
+       xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
 
        xlrec->node = node;
-       xlrec->blkno = blkno;
+       xlrec->blkno = BufferGetBlockNumber(buffer);
+       xlrec->ntodelete = ntodelete;
+       xlrec->isemptypage = emptypage;
        if (key)
                xlrec->key = *key;
        else
                ItemPointerSetInvalid(&(xlrec->key));
 
-       if (emptypage)
-       {
-               xlrec->isemptypage = true;
-               xlrec->ntodelete = 0;
-
-               rdata = (XLogRecData *) palloc(sizeof(XLogRecData));
-               rdata->buffer = InvalidBuffer;
-               rdata->data = (char *) xlrec;
-               rdata->len = sizeof(gistxlogEntryUpdate);
-               rdata->next = NULL;
-       }
-       else
-       {
-               int                     cur = 1,
-                                       i;
-
-               xlrec->isemptypage = false;
-               xlrec->ntodelete = ntodelete;
-
-               rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
-
-               rdata->buffer = InvalidBuffer;
-               rdata->data = (char *) xlrec;
-               rdata->len = sizeof(gistxlogEntryUpdate);
-               rdata->next = NULL;
+       rdata[0].data = (char *) xlrec;
+       rdata[0].len = sizeof(gistxlogPageUpdate);
+       rdata[0].buffer = InvalidBuffer;
+       rdata[0].next = &(rdata[1]);
 
-               if (ntodelete)
-               {
-                       rdata[cur - 1].next = &(rdata[cur]);
-                       rdata[cur].buffer = InvalidBuffer;
-                       rdata[cur].data = (char *) todelete;
-                       rdata[cur].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
-                       rdata[cur].next = NULL;
-                       cur++;
-               }
+       rdata[1].data = (char *) todelete;
+       rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+       rdata[1].buffer = buffer;
+       rdata[1].buffer_std = true;
+       rdata[1].next = NULL;
 
-               /* new tuples */
-               for (i = 0; i < ituplen; i++)
-               {
-                       rdata[cur].buffer = InvalidBuffer;
-                       rdata[cur].data = (char *) (itup[i]);
-                       rdata[cur].len = IndexTupleSize(itup[i]);
-                       rdata[cur].next = NULL;
-                       rdata[cur - 1].next = &(rdata[cur]);
-                       cur++;
-               }
+       /* new tuples */
+       cur = 2;
+       for (i = 0; i < ituplen; i++)
+       {
+               rdata[cur - 1].next = &(rdata[cur]);
+               rdata[cur].data = (char *) (itup[i]);
+               rdata[cur].len = IndexTupleSize(itup[i]);
+               rdata[cur].buffer = buffer;
+               rdata[cur].buffer_std = true;
+               rdata[cur].next = NULL;
+               cur++;
        }
 
        return rdata;
index 3b072da637615628767306badc61d7023d112562..1bfc90abbcedf69641e0d7bd00d4ce1dcd6ee736 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.11 2006/03/24 04:32:13 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -80,11 +80,13 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
 /* XLog stuff */
 extern const XLogRecPtr XLogRecPtrForTemp;
 
-#define XLOG_GIST_ENTRY_UPDATE 0x00
-#define XLOG_GIST_ENTRY_DELETE 0x10
-#define XLOG_GIST_NEW_ROOT     0x20
+#define XLOG_GIST_PAGE_UPDATE          0x00
+#define XLOG_GIST_NEW_ROOT                     0x20
+#define XLOG_GIST_PAGE_SPLIT           0x30
+#define XLOG_GIST_INSERT_COMPLETE      0x40
+#define XLOG_GIST_CREATE_INDEX         0x50
 
-typedef struct gistxlogEntryUpdate
+typedef struct gistxlogPageUpdate
 {
        RelFileNode node;
        BlockNumber blkno;
@@ -100,17 +102,16 @@ typedef struct gistxlogEntryUpdate
        /*
         * follow: 1. todelete OffsetNumbers 2. tuples to insert
         */
-} gistxlogEntryUpdate;
-
-#define XLOG_GIST_PAGE_SPLIT   0x30
+} gistxlogPageUpdate;
 
 typedef struct gistxlogPageSplit
 {
        RelFileNode node;
        BlockNumber origblkno;          /* splitted page */
+       bool            origleaf;               /* was the split page a leaf page? */
        uint16          npage;
 
-       /* see comments on gistxlogEntryUpdate */
+       /* see comments on gistxlogPageUpdate */
        ItemPointerData key;
 
        /*
@@ -118,22 +119,19 @@ typedef struct gistxlogPageSplit
         */
 } gistxlogPageSplit;
 
-#define XLOG_GIST_INSERT_COMPLETE  0x40
-
 typedef struct gistxlogPage
 {
        BlockNumber blkno;
-       int                     num;
+       int                     num;                    /* number of index tuples following */
 } gistxlogPage;
 
-#define XLOG_GIST_CREATE_INDEX 0x50
-
 typedef struct gistxlogInsertComplete
 {
        RelFileNode node;
        /* follows ItemPointerData key to clean */
 } gistxlogInsertComplete;
 
+
 /* SplitedPageLayout - gistSplit function result */
 typedef struct SplitedPageLayout
 {
@@ -239,8 +237,7 @@ extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, It
 extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
                  int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
 
-extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child,
-                        Buffer (*myReadBuffer) (Relation, BlockNumber));
+extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child);
 
 /* gistxlog.c */
 extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
@@ -249,11 +246,12 @@ extern void gist_xlog_startup(void);
 extern void gist_xlog_cleanup(void);
 extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
 
-extern XLogRecData *formUpdateRdata(RelFileNode node, BlockNumber blkno,
+extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
                                OffsetNumber *todelete, int ntodelete, bool emptypage,
                                IndexTuple *itup, int ituplen, ItemPointer key);
 
-extern XLogRecData *formSplitRdata(RelFileNode node, BlockNumber blkno,
+extern XLogRecData *formSplitRdata(RelFileNode node,
+                          BlockNumber blkno, bool page_is_leaf,
                           ItemPointer key, SplitedPageLayout *dist);
 
 extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);