]> granicus.if.org Git - postgresql/commitdiff
Avoid tricky race condition recording XLOG_HINT
authorSimon Riggs <simon@2ndQuadrant.com>
Mon, 8 Apr 2013 07:52:39 +0000 (08:52 +0100)
committerSimon Riggs <simon@2ndQuadrant.com>
Mon, 8 Apr 2013 07:52:39 +0000 (08:52 +0100)
We copy the buffer before inserting an XLOG_HINT to avoid WAL CRC errors
caused by concurrent hint writes to buffer while share locked. To make this work
we refactor RestoreBackupBlock() to allow an XLOG_HINT to avoid the normal
path for backup blocks, which assumes the underlying buffer is exclusive locked.
Resulting code completely changes layout of XLOG_HINT WAL records, but
this isn't even beta code, so this is a low impact change.
In passing, avoid taking WALInsertLock for full page writes on checksummed
hints, remove related cruft from XLogInsert() and improve xlog_desc record for
XLOG_HINT.

Andres Freund

Bug report by Fujii Masao, testing by Jeff Janes and Jaime Casanova,
review by Jeff Davis and Simon Riggs. Applied with changes from review
and some comment editing.

src/backend/access/rmgrdesc/xlogdesc.c
src/backend/access/transam/xlog.c
src/backend/storage/buffer/bufmgr.c
src/include/catalog/catversion.h

index 52cf75973573f08d75bec227aa1eb30cadccc262..4c68b6ae0a39095939c0359316e7d44aa916a1dd 100644 (file)
@@ -17,6 +17,7 @@
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_control.h"
+#include "common/relpath.h"
 #include "utils/guc.h"
 #include "utils/timestamp.h"
 
@@ -83,7 +84,10 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
        }
        else if (info == XLOG_HINT)
        {
-               appendStringInfo(buf, "page hint");
+               BkpBlock *bkp = (BkpBlock *) rec;
+               appendStringInfo(buf, "page hint: %s block %u",
+                                                relpathperm(bkp->node, bkp->fork),
+                                                bkp->block);
        }
        else if (info == XLOG_BACKUP_END)
        {
index 3227d4c6006868087d6a6332a40954048833fb51..6736cfe749dbdee11cd9e4987bd67e5e1c56fd9b 100644 (file)
@@ -648,6 +648,8 @@ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 
 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
+static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
+                               char *blk, bool get_cleanup_lock, bool keep_buffer);
 static bool AdvanceXLInsertBuffer(bool new_segment);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
@@ -731,7 +733,6 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        bool            updrqst;
        bool            doPageWrites;
        bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
-       bool            isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT);
        uint8           info_orig = info;
        static XLogRecord *rechdr;
 
@@ -1001,18 +1002,6 @@ begin:;
                goto begin;
        }
 
-       /*
-        * If this is a hint record and we don't need a backup block then
-        * we have no more work to do and can exit quickly without inserting
-        * a WAL record at all. In that case return InvalidXLogRecPtr.
-        */
-       if (isHint && !(info & XLR_BKP_BLOCK_MASK))
-       {
-               LWLockRelease(WALInsertLock);
-               END_CRIT_SECTION();
-               return InvalidXLogRecPtr;
-       }
-
        /*
         * If the current page is completely full, the record goes to the next
         * page, right after the page header.
@@ -3158,8 +3147,6 @@ Buffer
 RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
                                   bool get_cleanup_lock, bool keep_buffer)
 {
-       Buffer          buffer;
-       Page            page;
        BkpBlock        bkpb;
        char       *blk;
        int                     i;
@@ -3177,42 +3164,8 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
                if (i == block_index)
                {
                        /* Found it, apply the update */
-                       buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
-                                                                                       RBM_ZERO);
-                       Assert(BufferIsValid(buffer));
-                       if (get_cleanup_lock)
-                               LockBufferForCleanup(buffer);
-                       else
-                               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-                       page = (Page) BufferGetPage(buffer);
-
-                       if (bkpb.hole_length == 0)
-                       {
-                               memcpy((char *) page, blk, BLCKSZ);
-                       }
-                       else
-                       {
-                               memcpy((char *) page, blk, bkpb.hole_offset);
-                               /* must zero-fill the hole */
-                               MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
-                               memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
-                                          blk + bkpb.hole_offset,
-                                          BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
-                       }
-
-                       /*
-                        * Any checksum set on this page will be invalid. We don't need
-                        * to reset it here since it will be set before being written.
-                        */
-
-                       PageSetLSN(page, lsn);
-                       MarkBufferDirty(buffer);
-
-                       if (!keep_buffer)
-                               UnlockReleaseBuffer(buffer);
-
-                       return buffer;
+                       return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
+                                                                                         keep_buffer);
                }
 
                blk += BLCKSZ - bkpb.hole_length;
@@ -3223,6 +3176,56 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
        return InvalidBuffer;           /* keep compiler quiet */
 }
 
+/*
+ * Workhorse for RestoreBackupBlock usable without an xlog record
+ *
+ * Restores a full-page image from BkpBlock and a data pointer.
+ */
+static Buffer
+RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
+                                                  bool get_cleanup_lock, bool keep_buffer)
+{
+       Buffer          buffer;
+       Page            page;
+
+       buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
+                                                                       RBM_ZERO);
+       Assert(BufferIsValid(buffer));
+       if (get_cleanup_lock)
+               LockBufferForCleanup(buffer);
+       else
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+       page = (Page) BufferGetPage(buffer);
+
+       if (bkpb.hole_length == 0)
+       {
+               memcpy((char *) page, blk, BLCKSZ);
+       }
+       else
+       {
+               memcpy((char *) page, blk, bkpb.hole_offset);
+               /* must zero-fill the hole */
+               MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
+               memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
+                          blk + bkpb.hole_offset,
+                          BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
+       }
+
+       /*
+        * The checksum value on this page is currently invalid. We don't
+        * need to reset it here since it will be set before being written.
+        */
+
+       PageSetLSN(page, lsn);
+       MarkBufferDirty(buffer);
+
+       if (!keep_buffer)
+               UnlockReleaseBuffer(buffer);
+
+       return buffer;
+}
+
 /*
  * Attempt to read an XLOG record.
  *
@@ -7635,45 +7638,86 @@ XLogRestorePoint(const char *rpName)
  * Write a backup block if needed when we are setting a hint. Note that
  * this may be called for a variety of page types, not just heaps.
  *
- * Deciding the "if needed" part is delicate and requires us to either
- * grab WALInsertLock or check the info_lck spinlock. If we check the
- * spinlock and it says Yes then we will need to get WALInsertLock as well,
- * so the design choice here is to just go straight for the WALInsertLock
- * and trust that calls to this function are minimised elsewhere.
- *
  * Callable while holding just share lock on the buffer content.
  *
- * Possible that multiple concurrent backends could attempt to write
- * WAL records. In that case, more than one backup block may be recorded
- * though that isn't important to the outcome and the backup blocks are
- * likely to be identical anyway.
+ * We can't use the plain backup block mechanism since that relies on the
+ * Buffer being exclusively locked. Since some modifications (setting LSN, hint
+ * bits) are allowed in a sharelocked buffer that can lead to wal checksum
+ * failures. So instead we copy the page and insert the copied data as normal
+ * record data.
+ *
+ * We only need to do something if page has not yet been full page written in
+ * this checkpoint round. The LSN of the inserted wal record is returned if we
+ * had to write, InvalidXLogRecPtr otherwise.
+ *
+ * It is possible that multiple concurrent backends could attempt to write WAL
+ * records. In that case, multiple copies of the same block would be recorded
+ * in separate WAL records by different backends, though that is still OK from
+ * a correctness perspective.
+ *
+ * Note that this only works for buffers that fit the standard page model,
+ * i.e. those for which buffer_std == true
  */
-#define        XLOG_HINT_WATERMARK             13579
 XLogRecPtr
 XLogSaveBufferForHint(Buffer buffer)
 {
+       XLogRecPtr recptr = InvalidXLogRecPtr;
+       XLogRecPtr lsn;
+       XLogRecData rdata[2];
+       BkpBlock bkpb;
+
        /*
-        * Make an XLOG entry reporting the hint
+        * Ensure no checkpoint can change our view of RedoRecPtr.
         */
-       XLogRecData rdata[2];
-       int                     watermark = XLOG_HINT_WATERMARK;
+       Assert(MyPgXact->delayChkpt);
+
+       /*
+        * Update RedoRecPtr so XLogCheckBuffer can make the right decision
+        */
+       GetRedoRecPtr();
 
        /*
-        * Not allowed to have zero-length records, so use a small watermark
+        * Setup phony rdata element for use within XLogCheckBuffer only.
+        * We reuse and reset rdata for any actual WAL record insert.
         */
-       rdata[0].data = (char *) (&watermark);
-       rdata[0].len = sizeof(int);
-       rdata[0].buffer = InvalidBuffer;
-       rdata[0].buffer_std = false;
-       rdata[0].next = &(rdata[1]);
+       rdata[0].buffer = buffer;
+       rdata[0].buffer_std = true;
+
+       if (XLogCheckBuffer(rdata, true, &lsn, &bkpb))
+       {
+               char copied_buffer[BLCKSZ];
+               char *origdata = (char *) BufferGetBlock(buffer);
+
+               /*
+                * Copy buffer so we don't have to worry about concurrent hint bit or
+                * lsn updates. We assume pd_lower/upper cannot be changed without an
+                * exclusive lock, so the contents bkp are not racy.
+                */
+               memcpy(copied_buffer, origdata, bkpb.hole_offset);
+               memcpy(copied_buffer + bkpb.hole_offset,
+                               origdata + bkpb.hole_offset + bkpb.hole_length,
+                               BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
+
+               /*
+                * Header for backup block.
+                */
+               rdata[0].data = (char *) &bkpb;
+               rdata[0].len = sizeof(BkpBlock);
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].next = &(rdata[1]);
 
-       rdata[1].data = NULL;
-       rdata[1].len = 0;
-       rdata[1].buffer = buffer;
-       rdata[1].buffer_std = true;
-       rdata[1].next = NULL;
+               /*
+                * Save copy of the buffer.
+                */
+               rdata[1].data = copied_buffer;
+               rdata[1].len = BLCKSZ - bkpb.hole_length;
+               rdata[1].buffer = InvalidBuffer;
+               rdata[1].next = NULL;
 
-       return XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
+               recptr = XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
+       }
+
+       return recptr;
 }
 
 /*
@@ -7842,8 +7886,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
-       /* Backup blocks are not used in most xlog records */
-       Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
+       /* Backup blocks are not used by XLOG rmgr */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
        if (info == XLOG_NEXTOID)
        {
@@ -8038,31 +8082,27 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
        }
        else if (info == XLOG_HINT)
        {
-#ifdef USE_ASSERT_CHECKING
-               int     *watermark = (int *) XLogRecGetData(record);
-#endif
-
-               /* Check the watermark is correct for the hint record */
-               Assert(*watermark == XLOG_HINT_WATERMARK);
-
-               /* Backup blocks must be present for smgr hint records */
-               Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
+               char *data;
+               BkpBlock bkpb;
 
                /*
-                * Hint records have no information that needs to be replayed.
-                * The sole purpose of them is to ensure that a hint bit does
-                * not cause a checksum invalidation if a hint bit write should
-                * cause a torn page. So the body of the record is empty but
-                * there must be one backup block.
+                * Hint bit records contain a backup block stored "inline" in the normal
+                * data since the locking when writing hint records isn't sufficient to
+                * use the normal backup block mechanism, which assumes exclusive lock
+                * on the buffer supplied.
                 *
-                * Since the only change in the backup block is a hint bit,
-                * there is no confict with Hot Standby.
+                * Since the only change in these backup block are hint bits, there are
+                * no recovery conflicts generated.
                 *
                 * This also means there is no corresponding API call for this,
                 * so an smgr implementation has no need to implement anything.
                 * Which means nothing is needed in md.c etc
                 */
-               RestoreBackupBlock(lsn, record, 0, false, false);
+               data = XLogRecGetData(record);
+               memcpy(&bkpb, data, sizeof(BkpBlock));
+               data += sizeof(BkpBlock);
+
+               RestoreBackupBlockContents(lsn, bkpb, data, false, false);
        }
        else if (info == XLOG_BACKUP_END)
        {
index c2ef53f4617ec466d60e1d4033745de33021ebb3..1c414281ae50270a2c8ca2572b5af66f96d6549f 100644 (file)
@@ -2682,8 +2682,8 @@ MarkBufferDirtyHint(Buffer buffer)
                         * as long as we serialise it somehow we're OK. We choose to
                         * set LSN while holding the buffer header lock, which causes
                         * any reader of an LSN who holds only a share lock to also
-                        * obtain a buffer header lock before using PageGetLSN().
-                        * Fortunately, thats not too many places.
+                        * obtain a buffer header lock before using PageGetLSN(),
+                        * which is enforced in BufferGetLSNAtomic().
                         *
                         * If checksums are enabled, you might think we should reset the
                         * checksum here. That will happen when the page is written
index aa8b715a6519117428cebe49877395369dae6f10..39216c029fa5d90f892708b1237251027a475842 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201303291
+#define CATALOG_VERSION_NO     201304071
 
 #endif