From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Mon, 11 Oct 2010 23:04:37 +0000 (-0400)
Subject: Fix assorted bugs in GIN's WAL replay logic.
X-Git-Tag: REL9_1_ALPHA2~101
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=4016bdef8aded77b4903c457050622a5a1815c16;p=postgresql

Fix assorted bugs in GIN's WAL replay logic.

The original coding was quite sloppy about handling the case where
XLogReadBuffer fails (because the page has since been deleted).  This
would result in either "bad buffer id: 0" or an Assert failure during
replay, if indeed the page were no longer there.  In a couple of places
it also neglected to check whether the change had already been applied,
which would probably result in corrupted index contents.  I believe that
bug #5703 is an instance of the first problem.  These issues could show up
without replication, but only if you were unfortunate enough to crash
between modification of a GIN index and the next checkpoint.

Back-patch to 8.2, which is as far back as GIN has WAL support.
---

diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index 7a581334f1..75997d9534 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -77,11 +77,13 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 
 	MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
 	Assert(BufferIsValid(MetaBuffer));
+	page = (Page) BufferGetPage(MetaBuffer);
+
 	GinInitMetabuffer(MetaBuffer);
 
-	page = (Page) BufferGetPage(MetaBuffer);
 	PageSetLSN(page, lsn);
 	PageSetTLI(page, ThisTimeLineID);
+	MarkBufferDirty(MetaBuffer);
 
 	RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
 	Assert(BufferIsValid(RootBuffer));
@@ -91,11 +93,10 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 
 	PageSetLSN(page, lsn);
 	PageSetTLI(page, ThisTimeLineID);
-
-	MarkBufferDirty(MetaBuffer);
-	UnlockReleaseBuffer(MetaBuffer);
 	MarkBufferDirty(RootBuffer);
+
 	UnlockReleaseBuffer(RootBuffer);
+	UnlockReleaseBuffer(MetaBuffer);
 }
 
 static void
@@ -128,21 +129,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 	Buffer		buffer;
 	Page		page;
 
+	/* first, forget any incomplete split this insertion completes */
+	if (data->isData)
+	{
+		Assert(data->isDelete == FALSE);
+		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+		{
+			PostingItem *pitem;
+
+			pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+			forgetIncompleteSplit(data->node,
+								  PostingItemGetBlockNumber(pitem),
+								  data->updateBlkno);
+		}
+
+	}
+	else
+	{
+		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+		{
+			IndexTuple	itup;
+
+			itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+			forgetIncompleteSplit(data->node,
+								  GinItemPointerGetBlockNumber(&itup->t_tid),
+								  data->updateBlkno);
+		}
+	}
+
 	/* nothing else to do if page was backed up */
 	if (record->xl_info & XLR_BKP_BLOCK_1)
 		return;
 
 	buffer = XLogReadBuffer(data->node, data->blkno, false);
-	Assert(BufferIsValid(buffer));
+	if (!BufferIsValid(buffer))
+		return;					/* page was deleted, nothing to do */
 	page = (Page) BufferGetPage(buffer);
 
-	if (data->isData)
+	if (!XLByteLE(lsn, PageGetLSN(page)))
 	{
-		Assert(data->isDelete == FALSE);
-		Assert(GinPageIsData(page));
-
-		if (!XLByteLE(lsn, PageGetLSN(page)))
+		if (data->isData)
 		{
+			Assert(GinPageIsData(page));
+
 			if (data->isLeaf)
 			{
 				OffsetNumber i;
@@ -172,23 +201,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 				GinDataPageAddItem(page, pitem, data->offset);
 			}
 		}
-
-		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+		else
 		{
-			PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+			IndexTuple	itup;
 
-			forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
-		}
+			Assert(!GinPageIsData(page));
 
-	}
-	else
-	{
-		IndexTuple	itup;
-
-		Assert(!GinPageIsData(page));
-
-		if (!XLByteLE(lsn, PageGetLSN(page)))
-		{
 			if (data->updateBlkno != InvalidBlockNumber)
 			{
 				/* update link to right page after split */
@@ -212,20 +230,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 				  data->node.spcNode, data->node.dbNode, data->node.relNode);
 		}
 
-		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
-		{
-			itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-			forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
-		}
-	}
-
-	if (!XLByteLE(lsn, PageGetLSN(page)))
-	{
 		PageSetLSN(page, lsn);
 		PageSetTLI(page, ThisTimeLineID);
 
 		MarkBufferDirty(buffer);
 	}
+
 	UnlockReleaseBuffer(buffer);
 }
 
@@ -244,7 +254,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 	if (data->isData)
 		flags |= GIN_DATA;
 
-	lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit);
+	lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
 	Assert(BufferIsValid(lbuffer));
 	lpage = (Page) BufferGetPage(lbuffer);
 	GinInitBuffer(lbuffer, flags);
@@ -321,7 +331,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 
 	if (data->isRootSplit)
 	{
-		Buffer		rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false);
+		Buffer		rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
 		Page		rootPage = BufferGetPage(rootBuf);
 
 		GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
@@ -357,45 +367,49 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
 	Buffer		buffer;
 	Page		page;
 
-	/* nothing else to do if page was backed up (and no info to do it with) */
+	/* nothing to do if page was backed up (and no info to do it with) */
 	if (record->xl_info & XLR_BKP_BLOCK_1)
 		return;
 
 	buffer = XLogReadBuffer(data->node, data->blkno, false);
-	Assert(BufferIsValid(buffer));
+	if (!BufferIsValid(buffer))
+		return;
 	page = (Page) BufferGetPage(buffer);
 
-	if (GinPageIsData(page))
-	{
-		memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
-			   GinSizeOfItem(page) *data->nitem);
-		GinPageGetOpaque(page)->maxoff = data->nitem;
-	}
-	else
+	if (!XLByteLE(lsn, PageGetLSN(page)))
 	{
-		OffsetNumber i,
-				   *tod;
-		IndexTuple	itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+		if (GinPageIsData(page))
+		{
+			memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
+				   GinSizeOfItem(page) *data->nitem);
+			GinPageGetOpaque(page)->maxoff = data->nitem;
+		}
+		else
+		{
+			OffsetNumber i,
+				*tod;
+			IndexTuple	itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
 
-		tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
-		for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
-			tod[i - 1] = i;
+			tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
+			for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
+				tod[i - 1] = i;
 
-		PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+			PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
 
-		for (i = 0; i < data->nitem; i++)
-		{
-			if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
-				elog(ERROR, "failed to add item to index page in %u/%u/%u",
-				  data->node.spcNode, data->node.dbNode, data->node.relNode);
-			itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+			for (i = 0; i < data->nitem; i++)
+			{
+				if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+					elog(ERROR, "failed to add item to index page in %u/%u/%u",
+						 data->node.spcNode, data->node.dbNode, data->node.relNode);
+				itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+			}
 		}
-	}
 
-	PageSetLSN(page, lsn);
-	PageSetTLI(page, ThisTimeLineID);
+		PageSetLSN(page, lsn);
+		PageSetTLI(page, ThisTimeLineID);
+		MarkBufferDirty(buffer);
+	}
 
-	MarkBufferDirty(buffer);
 	UnlockReleaseBuffer(buffer);
 }
 
@@ -409,38 +423,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
 	if (!(record->xl_info & XLR_BKP_BLOCK_1))
 	{
 		buffer = XLogReadBuffer(data->node, data->blkno, false);
-		page = BufferGetPage(buffer);
-		Assert(GinPageIsData(page));
-		GinPageGetOpaque(page)->flags = GIN_DELETED;
-		PageSetLSN(page, lsn);
-		PageSetTLI(page, ThisTimeLineID);
-		MarkBufferDirty(buffer);
-		UnlockReleaseBuffer(buffer);
+		if (BufferIsValid(buffer))
+		{
+			page = BufferGetPage(buffer);
+			if (!XLByteLE(lsn, PageGetLSN(page)))
+			{
+				Assert(GinPageIsData(page));
+				GinPageGetOpaque(page)->flags = GIN_DELETED;
+				PageSetLSN(page, lsn);
+				PageSetTLI(page, ThisTimeLineID);
+				MarkBufferDirty(buffer);
+			}
+			UnlockReleaseBuffer(buffer);
+		}
 	}
 
 	if (!(record->xl_info & XLR_BKP_BLOCK_2))
 	{
 		buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
-		page = BufferGetPage(buffer);
-		Assert(GinPageIsData(page));
-		Assert(!GinPageIsLeaf(page));
-		PageDeletePostingItem(page, data->parentOffset);
-		PageSetLSN(page, lsn);
-		PageSetTLI(page, ThisTimeLineID);
-		MarkBufferDirty(buffer);
-		UnlockReleaseBuffer(buffer);
+		if (BufferIsValid(buffer))
+		{
+			page = BufferGetPage(buffer);
+			if (!XLByteLE(lsn, PageGetLSN(page)))
+			{
+				Assert(GinPageIsData(page));
+				Assert(!GinPageIsLeaf(page));
+				PageDeletePostingItem(page, data->parentOffset);
+				PageSetLSN(page, lsn);
+				PageSetTLI(page, ThisTimeLineID);
+				MarkBufferDirty(buffer);
+			}
+			UnlockReleaseBuffer(buffer);
+		}
 	}
 
 	if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
 	{
 		buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
-		page = BufferGetPage(buffer);
-		Assert(GinPageIsData(page));
-		GinPageGetOpaque(page)->rightlink = data->rightLink;
-		PageSetLSN(page, lsn);
-		PageSetTLI(page, ThisTimeLineID);
-		MarkBufferDirty(buffer);
-		UnlockReleaseBuffer(buffer);
+		if (BufferIsValid(buffer))
+		{
+			page = BufferGetPage(buffer);
+			if (!XLByteLE(lsn, PageGetLSN(page)))
+			{
+				Assert(GinPageIsData(page));
+				GinPageGetOpaque(page)->rightlink = data->rightLink;
+				PageSetLSN(page, lsn);
+				PageSetTLI(page, ThisTimeLineID);
+				MarkBufferDirty(buffer);
+			}
+			UnlockReleaseBuffer(buffer);
+		}
 	}
 }
 
@@ -450,8 +482,11 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
 	ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
 	Buffer		metabuffer;
 	Page		metapage;
+	Buffer		buffer;
 
 	metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
+	if (!BufferIsValid(metabuffer))
+		elog(PANIC, "GIN metapage disappeared");
 	metapage = BufferGetPage(metabuffer);
 
 	if (!XLByteLE(lsn, PageGetLSN(metapage)))
@@ -469,40 +504,43 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
 		 */
 		if (!(record->xl_info & XLR_BKP_BLOCK_1))
 		{
-			Buffer		buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
-			Page		page = BufferGetPage(buffer);
-
-			if (!XLByteLE(lsn, PageGetLSN(page)))
+			buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
+			if (BufferIsValid(buffer))
 			{
-				OffsetNumber l,
-							off = (PageIsEmpty(page)) ? FirstOffsetNumber :
-				OffsetNumberNext(PageGetMaxOffsetNumber(page));
-				int			i,
-							tupsize;
-				IndexTuple	tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
-
-				for (i = 0; i < data->ntuples; i++)
+				Page		page = BufferGetPage(buffer);
+
+				if (!XLByteLE(lsn, PageGetLSN(page)))
 				{
-					tupsize = IndexTupleSize(tuples);
+					OffsetNumber l,
+						off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+						OffsetNumberNext(PageGetMaxOffsetNumber(page));
+					int			i,
+						tupsize;
+					IndexTuple	tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
 
-					l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
+					for (i = 0; i < data->ntuples; i++)
+					{
+						tupsize = IndexTupleSize(tuples);
 
-					if (l == InvalidOffsetNumber)
-						elog(ERROR, "failed to add item to index page");
+						l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
 
-					tuples = (IndexTuple) (((char *) tuples) + tupsize);
-				}
+						if (l == InvalidOffsetNumber)
+							elog(ERROR, "failed to add item to index page");
 
-				/*
-				 * Increase counter of heap tuples
-				 */
-				GinPageGetOpaque(page)->maxoff++;
+						tuples = (IndexTuple) (((char *) tuples) + tupsize);
+					}
 
-				PageSetLSN(page, lsn);
-				PageSetTLI(page, ThisTimeLineID);
-				MarkBufferDirty(buffer);
+					/*
+					 * Increase counter of heap tuples
+					 */
+					GinPageGetOpaque(page)->maxoff++;
+
+					PageSetLSN(page, lsn);
+					PageSetTLI(page, ThisTimeLineID);
+					MarkBufferDirty(buffer);
+				}
+				UnlockReleaseBuffer(buffer);
 			}
-			UnlockReleaseBuffer(buffer);
 		}
 	}
 	else if (data->prevTail != InvalidBlockNumber)
@@ -510,19 +548,21 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
 		/*
 		 * New tail
 		 */
-
-		Buffer		buffer = XLogReadBuffer(data->node, data->prevTail, false);
-		Page		page = BufferGetPage(buffer);
-
-		if (!XLByteLE(lsn, PageGetLSN(page)))
+		buffer = XLogReadBuffer(data->node, data->prevTail, false);
+		if (BufferIsValid(buffer))
 		{
-			GinPageGetOpaque(page)->rightlink = data->newRightlink;
+			Page		page = BufferGetPage(buffer);
 
-			PageSetLSN(page, lsn);
-			PageSetTLI(page, ThisTimeLineID);
-			MarkBufferDirty(buffer);
+			if (!XLByteLE(lsn, PageGetLSN(page)))
+			{
+				GinPageGetOpaque(page)->rightlink = data->newRightlink;
+
+				PageSetLSN(page, lsn);
+				PageSetTLI(page, ThisTimeLineID);
+				MarkBufferDirty(buffer);
+			}
+			UnlockReleaseBuffer(buffer);
 		}
-		UnlockReleaseBuffer(buffer);
 	}
 
 	UnlockReleaseBuffer(metabuffer);
@@ -544,6 +584,7 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
 		return;
 
 	buffer = XLogReadBuffer(data->node, data->blkno, true);
+	Assert(BufferIsValid(buffer));
 	page = BufferGetPage(buffer);
 
 	GinInitBuffer(buffer, GIN_LIST);
@@ -587,6 +628,8 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
 	int			i;
 
 	metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
+	if (!BufferIsValid(metabuffer))
+		elog(PANIC, "GIN metapage disappeared");
 	metapage = BufferGetPage(metabuffer);
 
 	if (!XLByteLE(lsn, PageGetLSN(metapage)))
@@ -600,18 +643,22 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
 	for (i = 0; i < data->ndeleted; i++)
 	{
 		Buffer		buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
-		Page		page = BufferGetPage(buffer);
 
-		if (!XLByteLE(lsn, PageGetLSN(page)))
+		if (BufferIsValid(buffer))
 		{
-			GinPageGetOpaque(page)->flags = GIN_DELETED;
+			Page		page = BufferGetPage(buffer);
 
-			PageSetLSN(page, lsn);
-			PageSetTLI(page, ThisTimeLineID);
-			MarkBufferDirty(buffer);
-		}
+			if (!XLByteLE(lsn, PageGetLSN(page)))
+			{
+				GinPageGetOpaque(page)->flags = GIN_DELETED;
 
-		UnlockReleaseBuffer(buffer);
+				PageSetLSN(page, lsn);
+				PageSetTLI(page, ThisTimeLineID);
+				MarkBufferDirty(buffer);
+			}
+
+			UnlockReleaseBuffer(buffer);
+		}
 	}
 	UnlockReleaseBuffer(metabuffer);
 }
@@ -755,6 +802,13 @@ ginContinueSplit(ginIncompleteSplit *split)
 	 */
 	buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
 
+	/*
+	 * Failure should be impossible here, because we wrote the page earlier.
+	 */
+	if (!BufferIsValid(buffer))
+		elog(PANIC, "ginContinueSplit: left block %u not found",
+			 split->leftBlkno);
+
 	reln = CreateFakeRelcacheEntry(split->node);
 
 	if (split->rootBlkno == GIN_ROOT_BLKNO)