]> granicus.if.org Git - postgresql/commitdiff
Fix assorted bugs in GIN's WAL replay logic.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 11 Oct 2010 23:04:59 +0000 (19:04 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 11 Oct 2010 23:04:59 +0000 (19:04 -0400)
The original coding was quite sloppy about handling the case where
XLogReadBuffer fails (because the page has since been deleted).  This
would result in either "bad buffer id: 0" or an Assert failure during
replay, if indeed the page were no longer there.  In a couple of places
it also neglected to check whether the change had already been applied,
which would probably result in corrupted index contents.  I believe that
bug #5703 is an instance of the first problem.  These issues could show up
without replication, but only if you were unfortunate enough to crash
between modification of a GIN index and the next checkpoint.

Back-patch to 8.2, which is as far back as GIN has WAL support.

src/backend/access/gin/ginxlog.c

index 37d04cfb7e99433ee8b5f708c50b05842e7415bb..b7b89e4e49d943fd7387bfc07618a0442a391d32 100644 (file)
@@ -121,22 +121,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
        Buffer          buffer;
        Page            page;
 
+       /* first, forget any incomplete split this insertion completes */
+       if (data->isData)
+       {
+               Assert(data->isDelete == FALSE);
+               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+               {
+                       PostingItem *pitem;
+
+                       pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                       forgetIncompleteSplit(data->node,
+                                                                 PostingItemGetBlockNumber(pitem),
+                                                                 data->updateBlkno);
+               }
+       }
+       else
+       {
+               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+               {
+                       IndexTuple      itup;
+
+                       itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+                       forgetIncompleteSplit(data->node,
+                                                                 GinItemPointerGetBlockNumber(&itup->t_tid),
+                                                                 data->updateBlkno);
+               }
+       }
+
        /* nothing else to do if page was backed up */
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
        reln = XLogOpenRelation(data->node);
        buffer = XLogReadBuffer(reln, data->blkno, false);
-       Assert(BufferIsValid(buffer));
+       if (!BufferIsValid(buffer))
+               return;
        page = (Page) BufferGetPage(buffer);
 
-       if (data->isData)
+       if (!XLByteLE(lsn, PageGetLSN(page)))
        {
-               Assert(data->isDelete == FALSE);
-               Assert(GinPageIsData(page));
-
-               if (!XLByteLE(lsn, PageGetLSN(page)))
+               if (data->isData)
                {
+                       Assert(GinPageIsData(page));
+
                        if (data->isLeaf)
                        {
                                OffsetNumber i;
@@ -166,23 +193,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
                                GinDataPageAddItem(page, pitem, data->offset);
                        }
                }
-
-               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+               else
                {
-                       PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
-                       forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
-               }
+                       IndexTuple      itup;
 
-       }
-       else
-       {
-               IndexTuple      itup;
+                       Assert(!GinPageIsData(page));
 
-               Assert(!GinPageIsData(page));
-
-               if (!XLByteLE(lsn, PageGetLSN(page)))
-               {
                        if (data->updateBlkno != InvalidBlockNumber)
                        {
                                /* update link to right page after split */
@@ -203,23 +219,15 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 
                        if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
                                elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                                 data->node.spcNode, data->node.dbNode, data->node.relNode);
+                                        data->node.spcNode, data->node.dbNode, data->node.relNode);
                }
 
-               if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
-               {
-                       itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-                       forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
-               }
-       }
-
-       if (!XLByteLE(lsn, PageGetLSN(page)))
-       {
                PageSetLSN(page, lsn);
                PageSetTLI(page, ThisTimeLineID);
 
                MarkBufferDirty(buffer);
        }
+
        UnlockReleaseBuffer(buffer);
 }
 
@@ -241,7 +249,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
        if (data->isData)
                flags |= GIN_DATA;
 
-       lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
+       lbuffer = XLogReadBuffer(reln, data->lblkno, true);
        Assert(BufferIsValid(lbuffer));
        lpage = (Page) BufferGetPage(lbuffer);
        GinInitBuffer(lbuffer, flags);
@@ -318,7 +326,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 
        if (data->isRootSplit)
        {
-               Buffer          rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
+               Buffer          rootBuf = XLogReadBuffer(reln, data->rootBlkno, true);
                Page            rootPage = BufferGetPage(rootBuf);
 
                GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
@@ -355,46 +363,51 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
        Buffer          buffer;
        Page            page;
 
-       /* nothing else to do if page was backed up (and no info to do it with) */
+       /* nothing to do if page was backed up (and no info to do it with) */
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
        reln = XLogOpenRelation(data->node);
        buffer = XLogReadBuffer(reln, data->blkno, false);
-       Assert(BufferIsValid(buffer));
+       if (!BufferIsValid(buffer))
+               return;
        page = (Page) BufferGetPage(buffer);
 
-       if (GinPageIsData(page))
-       {
-               memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
-                          GinSizeOfItem(page) *data->nitem);
-               GinPageGetOpaque(page)->maxoff = data->nitem;
-       }
-       else
+       if (!XLByteLE(lsn, PageGetLSN(page)))
        {
-               OffsetNumber i,
-                                  *tod;
-               IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+               if (GinPageIsData(page))
+               {
+                       memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
+                                  GinSizeOfItem(page) *data->nitem);
+                       GinPageGetOpaque(page)->maxoff = data->nitem;
+               }
+               else
+               {
+                       OffsetNumber i,
+                               *tod;
+                       IndexTuple      itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
 
-               tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
-               for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
-                       tod[i - 1] = i;
+                       tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
+                       for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
+                               tod[i - 1] = i;
 
-               PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+                       PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
 
-               for (i = 0; i < data->nitem; i++)
-               {
-                       if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
-                               elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                                 data->node.spcNode, data->node.dbNode, data->node.relNode);
-                       itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+                       for (i = 0; i < data->nitem; i++)
+                       {
+                               if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+                                       elog(ERROR, "failed to add item to index page in %u/%u/%u",
+                                                data->node.spcNode, data->node.dbNode, data->node.relNode);
+                               itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+                       }
                }
-       }
 
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+
+               MarkBufferDirty(buffer);
+       }
 
-       MarkBufferDirty(buffer);
        UnlockReleaseBuffer(buffer);
 }
 
@@ -411,38 +424,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
        if (!(record->xl_info & XLR_BKP_BLOCK_1))
        {
                buffer = XLogReadBuffer(reln, data->blkno, false);
-               page = BufferGetPage(buffer);
-               Assert(GinPageIsData(page));
-               GinPageGetOpaque(page)->flags = GIN_DELETED;
-               PageSetLSN(page, lsn);
-               PageSetTLI(page, ThisTimeLineID);
-               MarkBufferDirty(buffer);
-               UnlockReleaseBuffer(buffer);
+               if (BufferIsValid(buffer))
+               {
+                       page = BufferGetPage(buffer);
+                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               Assert(GinPageIsData(page));
+                               GinPageGetOpaque(page)->flags = GIN_DELETED;
+                               PageSetLSN(page, lsn);
+                               PageSetTLI(page, ThisTimeLineID);
+                               MarkBufferDirty(buffer);
+                       }
+                       UnlockReleaseBuffer(buffer);
+               }
        }
 
        if (!(record->xl_info & XLR_BKP_BLOCK_2))
        {
                buffer = XLogReadBuffer(reln, data->parentBlkno, false);
-               page = BufferGetPage(buffer);
-               Assert(GinPageIsData(page));
-               Assert(!GinPageIsLeaf(page));
-               PageDeletePostingItem(page, data->parentOffset);
-               PageSetLSN(page, lsn);
-               PageSetTLI(page, ThisTimeLineID);
-               MarkBufferDirty(buffer);
-               UnlockReleaseBuffer(buffer);
+               if (BufferIsValid(buffer))
+               {
+                       page = BufferGetPage(buffer);
+                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               Assert(GinPageIsData(page));
+                               Assert(!GinPageIsLeaf(page));
+                               PageDeletePostingItem(page, data->parentOffset);
+                               PageSetLSN(page, lsn);
+                               PageSetTLI(page, ThisTimeLineID);
+                               MarkBufferDirty(buffer);
+                       }
+                       UnlockReleaseBuffer(buffer);
+               }
        }
 
        if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
        {
                buffer = XLogReadBuffer(reln, data->leftBlkno, false);
-               page = BufferGetPage(buffer);
-               Assert(GinPageIsData(page));
-               GinPageGetOpaque(page)->rightlink = data->rightLink;
-               PageSetLSN(page, lsn);
-               PageSetTLI(page, ThisTimeLineID);
-               MarkBufferDirty(buffer);
-               UnlockReleaseBuffer(buffer);
+               if (BufferIsValid(buffer))
+               {
+                       page = BufferGetPage(buffer);
+                       if (!XLByteLE(lsn, PageGetLSN(page)))
+                       {
+                               Assert(GinPageIsData(page));
+                               GinPageGetOpaque(page)->rightlink = data->rightLink;
+                               PageSetLSN(page, lsn);
+                               PageSetTLI(page, ThisTimeLineID);
+                               MarkBufferDirty(buffer);
+                       }
+                       UnlockReleaseBuffer(buffer);
+               }
        }
 }
 
@@ -560,6 +591,13 @@ ginContinueSplit(ginIncompleteSplit *split)
 
        buffer = XLogReadBuffer(reln, split->leftBlkno, false);
 
+       /*
+        * Failure should be impossible here, because we wrote the page earlier.
+        */
+       if (!BufferIsValid(buffer))
+               elog(PANIC, "ginContinueSplit: left block %u not found",
+                        split->leftBlkno);
+
        if (split->rootBlkno == GIN_ROOT_BLKNO)
        {
                prepareEntryScan(&btree, reln, (Datum) 0, NULL);