From e015c3e51f76a05cc026c8323c51a373172adaa3 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Mon, 20 Jul 2015 16:02:28 +0300
Subject: [PATCH] Sanity-check that a page zeroed by redo routine is marked
 with WILL_INIT.

There was already a sanity-check in the other direction: if a page was
marked with WILL_INIT, it had to be initialized by the redo routine. It's
not strictly necessary for correctness that a page is marked with WILL_INIT
if it's going to be initialized at redo, but it's a missed optimization if
nothing else.

Fix a few instances of this issue in SP-GiST, where a block in WAL record
was not marked with WILL_INIT, but was in fact always initialized at redo.
We were creating a full-page image of the page unnecessarily in those
cases.

Backpatch to 9.5, where the new WILL_INIT flag was added.
---
 src/backend/access/spgist/spgdoinsert.c | 31 ++++++++++++++++++++-----
 src/backend/access/transam/xlogutils.c  | 19 ++++++++++-----
 2 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index b3fda13fa0..db5d962b88 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -291,12 +291,16 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
 	if (RelationNeedsWAL(index))
 	{
 		XLogRecPtr	recptr;
+		int			flags;
 
 		XLogBeginInsert();
 		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 		XLogRegisterData((char *) leafTuple, leafTuple->size);
 
-		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
+		flags = REGBUF_STANDARD;
+		if (xlrec.newPage)
+			flags |= REGBUF_WILL_INIT;
+		XLogRegisterBuffer(0, current->buffer, flags);
 		if (xlrec.offnumParent != InvalidOffsetNumber)
 			XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
 
@@ -1348,12 +1352,16 @@ doPickSplit(Relation index, SpGistState *state,
 		XLogRegisterData((char *) innerTuple, innerTuple->size);
 		XLogRegisterData(leafdata, leafptr - leafdata);
 
-		flags = REGBUF_STANDARD;
-		if (xlrec.initSrc)
-			flags |= REGBUF_WILL_INIT;
+		/* Old leaf page */
 		if (BufferIsValid(saveCurrent.buffer))
+		{
+			flags = REGBUF_STANDARD;
+			if (xlrec.initSrc)
+				flags |= REGBUF_WILL_INIT;
 			XLogRegisterBuffer(0, saveCurrent.buffer, flags);
+		}
 
+		/* New leaf page */
 		if (BufferIsValid(newLeafBuffer))
 		{
 			flags = REGBUF_STANDARD;
@@ -1361,7 +1369,14 @@ doPickSplit(Relation index, SpGistState *state,
 				flags |= REGBUF_WILL_INIT;
 			XLogRegisterBuffer(1, newLeafBuffer, flags);
 		}
-		XLogRegisterBuffer(2, current->buffer, REGBUF_STANDARD);
+
+		/* Inner page */
+		flags = REGBUF_STANDARD;
+		if (xlrec.initInner)
+			flags |= REGBUF_WILL_INIT;
+		XLogRegisterBuffer(2, current->buffer, flags);
+
+		/* Parent page, if different from inner page */
 		if (parent->buffer != InvalidBuffer)
 		{
 			if (parent->buffer != current->buffer)
@@ -1631,13 +1646,17 @@ spgAddNodeAction(Relation index, SpGistState *state,
 		if (RelationNeedsWAL(index))
 		{
 			XLogRecPtr	recptr;
+			int			flags;
 
 			XLogBeginInsert();
 
 			/* orig page */
 			XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
 			/* new page */
-			XLogRegisterBuffer(1, current->buffer, REGBUF_STANDARD);
+			flags = REGBUF_STANDARD;
+			if (xlrec.newPage)
+				flags |= REGBUF_WILL_INIT;
+			XLogRegisterBuffer(1, current->buffer, flags);
 			/* parent page (if different from orig and new) */
 			if (xlrec.parentBlk == 2)
 				XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index fa98b8294e..a5003c3b92 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -328,6 +328,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 	ForkNumber	forknum;
 	BlockNumber blkno;
 	Page		page;
+	bool		zeromode;
+	bool		willinit;
 
 	if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
 	{
@@ -335,6 +337,17 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 		elog(PANIC, "failed to locate backup block with ID %d", block_id);
 	}
 
+	/*
+	 * Make sure that if the block is marked with WILL_INIT, the caller is
+	 * going to initialize it. And vice versa.
+	 */
+	zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+	willinit = (record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0;
+	if (willinit && !zeromode)
+		elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
+	if (!willinit && zeromode)
+		elog(PANIC, "block to be initialized in redo routine must be marked with WILL_INIT flag in the WAL record");
+
 	/* If it's a full-page image, restore it. */
 	if (XLogRecHasBlockImage(record, block_id))
 	{
@@ -359,12 +372,6 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 	}
 	else
 	{
-		if ((record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0 &&
-			mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
-		{
-			elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
-		}
-
 		*buf = XLogReadBufferExtended(rnode, forknum, blkno, mode);
 		if (BufferIsValid(*buf))
 		{
-- 
2.40.0