]> granicus.if.org Git - postgresql/commitdiff
Fix assorted bugs in GIN's WAL replay logic.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 11 Oct 2010 23:04:37 +0000 (19:04 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 11 Oct 2010 23:04:37 +0000 (19:04 -0400)
The original coding was quite sloppy about handling the case where
XLogReadBuffer fails (because the page has since been deleted).  This
would result in either "bad buffer id: 0" or an Assert failure during
replay, if indeed the page were no longer there.  In a couple of places
it also neglected to check whether the change had already been applied,
which would probably result in corrupted index contents.  I believe that
bug #5703 is an instance of the first problem.  These issues could show up
without replication, but only if you were unfortunate enough to crash
between modification of a GIN index and the next checkpoint.

Back-patch to 8.2, which is as far back as GIN has WAL support.

src/backend/access/gin/ginxlog.c

index 7a581334f1466a767950ff7e3a2c18047cdb5d9d..75997d9534b4e03558a21e9f1a9fd6534fa5cf81 100644 (file)
@@ -77,11 +77,13 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 
        MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
        Assert(BufferIsValid(MetaBuffer));
+       page = (Page) BufferGetPage(MetaBuffer);
+
        GinInitMetabuffer(MetaBuffer);
 
-       page = (Page) BufferGetPage(MetaBuffer);
        PageSetLSN(page, lsn);
        PageSetTLI(page, ThisTimeLineID);
+       MarkBufferDirty(MetaBuffer);
 
        RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
        Assert(BufferIsValid(RootBuffer));
@@ -91,11 +93,10 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 
        PageSetLSN(page, lsn);
        PageSetTLI(page, ThisTimeLineID);
-
-       MarkBufferDirty(MetaBuffer);
-       UnlockReleaseBuffer(MetaBuffer);
        MarkBufferDirty(RootBuffer);
+
        UnlockReleaseBuffer(RootBuffer);
+       UnlockReleaseBuffer(MetaBuffer);
 }
 
 static void
@@ -128,21 +129,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
        Buffer          buffer;
        Page            page;
 
+       /* first, forget any incomplete split this insertion completes */
+       if (data->isData)
+       {
+               Assert(data->isDelete == FALSE);
+               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+               {
+                       PostingItem *pitem;
+
+                       pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                       forgetIncompleteSplit(data->node,
+                                                                 PostingItemGetBlockNumber(pitem),
+                                                                 data->updateBlkno);
+               }
+
+       }
+       else
+       {
+               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+               {
+                       IndexTuple      itup;
+
+                       itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                       forgetIncompleteSplit(data->node,
+                                                                 GinItemPointerGetBlockNumber(&itup->t_tid),
+                                                                 data->updateBlkno);
+               }
+       }
+
        /* nothing else to do if page was backed up */
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
        buffer = XLogReadBuffer(data->node, data->blkno, false);
-       Assert(BufferIsValid(buffer));
+       if (!BufferIsValid(buffer))
+               return;                                 /* page was deleted, nothing to do */
        page = (Page) BufferGetPage(buffer);
 
-       if (data->isData)
+       if (!XLByteLE(lsn, PageGetLSN(page)))
        {
-               Assert(data->isDelete == FALSE);
-               Assert(GinPageIsData(page));
-
-               if (!XLByteLE(lsn, PageGetLSN(page)))
+               if (data->isData)
                {
+                       Assert(GinPageIsData(page));
+
                        if (data->isLeaf)
                        {
                                OffsetNumber i;
@@ -172,23 +201,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
                                GinDataPageAddItem(page, pitem, data->offset);
                        }
                }
-
-               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+               else
                {
-                       PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                       IndexTuple      itup;
 
-                       forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
-               }
+                       Assert(!GinPageIsData(page));
 
-       }
-       else
-       {
-               IndexTuple      itup;
-
-               Assert(!GinPageIsData(page));
-
-               if (!XLByteLE(lsn, PageGetLSN(page)))
-               {
                        if (data->updateBlkno != InvalidBlockNumber)
                        {
                                /* update link to right page after split */
@@ -212,20 +230,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
                                  data->node.spcNode, data->node.dbNode, data->node.relNode);
                }
 
-               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
-               {
-                       itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-                       forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
-               }
-       }
-
-       if (!XLByteLE(lsn, PageGetLSN(page)))
-       {
                PageSetLSN(page, lsn);
                PageSetTLI(page, ThisTimeLineID);
 
                MarkBufferDirty(buffer);
        }
+
        UnlockReleaseBuffer(buffer);
 }
 
@@ -244,7 +254,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
        if (data->isData)
                flags |= GIN_DATA;
 
-       lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit);
+       lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
        Assert(BufferIsValid(lbuffer));
        lpage = (Page) BufferGetPage(lbuffer);
        GinInitBuffer(lbuffer, flags);
@@ -321,7 +331,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 
        if (data->isRootSplit)
        {
-               Buffer          rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false);
+               Buffer          rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
                Page            rootPage = BufferGetPage(rootBuf);
 
                GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
@@ -357,45 +367,49 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
        Buffer          buffer;
        Page            page;
 
-       /* nothing else to do if page was backed up (and no info to do it with) */
+       /* nothing to do if page was backed up (and no info to do it with) */
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
        buffer = XLogReadBuffer(data->node, data->blkno, false);
-       Assert(BufferIsValid(buffer));
+       if (!BufferIsValid(buffer))
+               return;
        page = (Page) BufferGetPage(buffer);
 
-       if (GinPageIsData(page))
-       {
-               memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
-                          GinSizeOfItem(page) *data->nitem);
-               GinPageGetOpaque(page)->maxoff = data->nitem;
-       }
-       else
+       if (!XLByteLE(lsn, PageGetLSN(page)))
        {
-               OffsetNumber i,
-                                  *tod;
-               IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+               if (GinPageIsData(page))
+               {
+                       memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
+                                  GinSizeOfItem(page) *data->nitem);
+                       GinPageGetOpaque(page)->maxoff = data->nitem;
+               }
+               else
+               {
+                       OffsetNumber i,
+                               *tod;
+                       IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
 
-               tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
-               for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
-                       tod[i - 1] = i;
+                       tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
+                       for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
+                               tod[i - 1] = i;
 
-               PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+                       PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
 
-               for (i = 0; i < data->nitem; i++)
-               {
-                       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
-                               elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                                 data->node.spcNode, data->node.dbNode, data->node.relNode);
-                       itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+                       for (i = 0; i < data->nitem; i++)
+                       {
+                               if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+                                       elog(ERROR, "failed to add item to index page in %u/%u/%u",
+                                                data->node.spcNode, data->node.dbNode, data->node.relNode);
+                               itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+                       }
                }
-       }
 
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+       }
 
-       MarkBufferDirty(buffer);
        UnlockReleaseBuffer(buffer);
 }
 
@@ -409,38 +423,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
        if (!(record->xl_info & XLR_BKP_BLOCK_1))
        {
                buffer = XLogReadBuffer(data->node, data->blkno, false);
-               page = BufferGetPage(buffer);
-               Assert(GinPageIsData(page));
-               GinPageGetOpaque(page)->flags = GIN_DELETED;
-               PageSetLSN(page, lsn);
-               PageSetTLI(page, ThisTimeLineID);
-               MarkBufferDirty(buffer);
-               UnlockReleaseBuffer(buffer);
+               if (BufferIsValid(buffer))
+               {
+                       page = BufferGetPage(buffer);
+                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               Assert(GinPageIsData(page));
+                               GinPageGetOpaque(page)->flags = GIN_DELETED;
+                               PageSetLSN(page, lsn);
+                               PageSetTLI(page, ThisTimeLineID);
+                               MarkBufferDirty(buffer);
+                       }
+                       UnlockReleaseBuffer(buffer);
+               }
        }
 
        if (!(record->xl_info & XLR_BKP_BLOCK_2))
        {
                buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
-               page = BufferGetPage(buffer);
-               Assert(GinPageIsData(page));
-               Assert(!GinPageIsLeaf(page));
-               PageDeletePostingItem(page, data->parentOffset);
-               PageSetLSN(page, lsn);
-               PageSetTLI(page, ThisTimeLineID);
-               MarkBufferDirty(buffer);
-               UnlockReleaseBuffer(buffer);
+               if (BufferIsValid(buffer))
+               {
+                       page = BufferGetPage(buffer);
+                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               Assert(GinPageIsData(page));
+                               Assert(!GinPageIsLeaf(page));
+                               PageDeletePostingItem(page, data->parentOffset);
+                               PageSetLSN(page, lsn);
+                               PageSetTLI(page, ThisTimeLineID);
+                               MarkBufferDirty(buffer);
+                       }
+                       UnlockReleaseBuffer(buffer);
+               }
        }
 
        if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
        {
                buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
-               page = BufferGetPage(buffer);
-               Assert(GinPageIsData(page));
-               GinPageGetOpaque(page)->rightlink = data->rightLink;
-               PageSetLSN(page, lsn);
-               PageSetTLI(page, ThisTimeLineID);
-               MarkBufferDirty(buffer);
-               UnlockReleaseBuffer(buffer);
+               if (BufferIsValid(buffer))
+               {
+                       page = BufferGetPage(buffer);
+                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               Assert(GinPageIsData(page));
+                               GinPageGetOpaque(page)->rightlink = data->rightLink;
+                               PageSetLSN(page, lsn);
+                               PageSetTLI(page, ThisTimeLineID);
+                               MarkBufferDirty(buffer);
+                       }
+                       UnlockReleaseBuffer(buffer);
+               }
        }
 }
 
@@ -450,8 +482,11 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
        ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
        Buffer          metabuffer;
        Page            metapage;
+       Buffer          buffer;
 
        metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
+       if (!BufferIsValid(metabuffer))
+               elog(PANIC, "GIN metapage disappeared");
        metapage = BufferGetPage(metabuffer);
 
        if (!XLByteLE(lsn, PageGetLSN(metapage)))
@@ -469,40 +504,43 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
                 */
                if (!(record->xl_info & XLR_BKP_BLOCK_1))
                {
-                       Buffer          buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
-                       Page            page = BufferGetPage(buffer);
-
-                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
+                       if (BufferIsValid(buffer))
                        {
-                               OffsetNumber l,
-                                                       off = (PageIsEmpty(page)) ? FirstOffsetNumber :
-                               OffsetNumberNext(PageGetMaxOffsetNumber(page));
-                               int                     i,
-                                                       tupsize;
-                               IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
-
-                               for (i = 0; i < data->ntuples; i++)
+                               Page            page = BufferGetPage(buffer);
+
+                               if (!XLByteLE(lsn, PageGetLSN(page)))
                                {
-                                       tupsize = IndexTupleSize(tuples);
+                                       OffsetNumber l,
+                                               off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+                                               OffsetNumberNext(PageGetMaxOffsetNumber(page));
+                                       int                     i,
+                                               tupsize;
+                                       IndexTuple      tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
 
-                                       l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
+                                       for (i = 0; i < data->ntuples; i++)
+                                       {
+                                               tupsize = IndexTupleSize(tuples);
 
-                                       if (l == InvalidOffsetNumber)
-                                               elog(ERROR, "failed to add item to index page");
+                                               l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
 
-                                       tuples = (IndexTuple) (((char *) tuples) + tupsize);
-                               }
+                                               if (l == InvalidOffsetNumber)
+                                                       elog(ERROR, "failed to add item to index page");
 
-                               /*
-                                * Increase counter of heap tuples
-                                */
-                               GinPageGetOpaque(page)->maxoff++;
+                                               tuples = (IndexTuple) (((char *) tuples) + tupsize);
+                                       }
 
-                               PageSetLSN(page, lsn);
-                               PageSetTLI(page, ThisTimeLineID);
-                               MarkBufferDirty(buffer);
+                                       /*
+                                        * Increase counter of heap tuples
+                                        */
+                                       GinPageGetOpaque(page)->maxoff++;
+
+                                       PageSetLSN(page, lsn);
+                                       PageSetTLI(page, ThisTimeLineID);
+                                       MarkBufferDirty(buffer);
+                               }
+                               UnlockReleaseBuffer(buffer);
                        }
-                       UnlockReleaseBuffer(buffer);
                }
        }
        else if (data->prevTail != InvalidBlockNumber)
@@ -510,19 +548,21 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
                /*
                 * New tail
                 */
-
-               Buffer          buffer = XLogReadBuffer(data->node, data->prevTail, false);
-               Page            page = BufferGetPage(buffer);
-
-               if (!XLByteLE(lsn, PageGetLSN(page)))
+               buffer = XLogReadBuffer(data->node, data->prevTail, false);
+               if (BufferIsValid(buffer))
                {
-                       GinPageGetOpaque(page)->rightlink = data->newRightlink;
+                       Page            page = BufferGetPage(buffer);
 
-                       PageSetLSN(page, lsn);
-                       PageSetTLI(page, ThisTimeLineID);
-                       MarkBufferDirty(buffer);
+                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               GinPageGetOpaque(page)->rightlink = data->newRightlink;
+
+                               PageSetLSN(page, lsn);
+                               PageSetTLI(page, ThisTimeLineID);
+                               MarkBufferDirty(buffer);
+                       }
+                       UnlockReleaseBuffer(buffer);
                }
-               UnlockReleaseBuffer(buffer);
        }
 
        UnlockReleaseBuffer(metabuffer);
@@ -544,6 +584,7 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
                return;
 
        buffer = XLogReadBuffer(data->node, data->blkno, true);
+       Assert(BufferIsValid(buffer));
        page = BufferGetPage(buffer);
 
        GinInitBuffer(buffer, GIN_LIST);
@@ -587,6 +628,8 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
        int                     i;
 
        metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
+       if (!BufferIsValid(metabuffer))
+               elog(PANIC, "GIN metapage disappeared");
        metapage = BufferGetPage(metabuffer);
 
        if (!XLByteLE(lsn, PageGetLSN(metapage)))
@@ -600,18 +643,22 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
        for (i = 0; i < data->ndeleted; i++)
        {
                Buffer          buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
-               Page            page = BufferGetPage(buffer);
 
-               if (!XLByteLE(lsn, PageGetLSN(page)))
+               if (BufferIsValid(buffer))
                {
-                       GinPageGetOpaque(page)->flags = GIN_DELETED;
+                       Page            page = BufferGetPage(buffer);
 
-                       PageSetLSN(page, lsn);
-                       PageSetTLI(page, ThisTimeLineID);
-                       MarkBufferDirty(buffer);
-               }
+                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               GinPageGetOpaque(page)->flags = GIN_DELETED;
 
-               UnlockReleaseBuffer(buffer);
+                               PageSetLSN(page, lsn);
+                               PageSetTLI(page, ThisTimeLineID);
+                               MarkBufferDirty(buffer);
+                       }
+
+                       UnlockReleaseBuffer(buffer);
+               }
        }
        UnlockReleaseBuffer(metabuffer);
 }
@@ -755,6 +802,13 @@ ginContinueSplit(ginIncompleteSplit *split)
         */
        buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
 
+       /*
+        * Failure should be impossible here, because we wrote the page earlier.
+        */
+       if (!BufferIsValid(buffer))
+               elog(PANIC, "ginContinueSplit: left block %u not found",
+                        split->leftBlkno);
+
        reln = CreateFakeRelcacheEntry(split->node);
 
        if (split->rootBlkno == GIN_ROOT_BLKNO)