]> granicus.if.org Git - postgresql/commitdiff
Fix a couple of error-handling bugs in the xlogreader patch.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 17 Jan 2013 17:05:19 +0000 (19:05 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 17 Jan 2013 17:27:04 +0000 (19:27 +0200)
XLogReadRecord should reset its state on every error, to make sure it
re-reads the page on next call. It was inconsistent in that some errors did
that, but some did not.

In ReadRecord(), don't give up on an error if we're in standby mode. The
loop was set up to retry, but the checks within the loop broke out of the
loop on any error.

Andres Freund, with some tweaking by me.

src/backend/access/transam/xlog.c
src/backend/access/transam/xlogreader.c

index 70cfabc23678737eddd66b0da5311359d6414420..ac2b26b49822561fe10da71b6a2e0daf1a7e803d 100644 (file)
@@ -3180,7 +3180,8 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
  * try to read a record just after the last one previously read.
  *
  * If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG)
+ * (emode must be either PANIC, LOG). In standby mode, retries until a valid
+ * record is available.
  *
  * The record is copied into readRecordBuf, so that on successful return,
  * the returned record pointer always points there.
@@ -3209,12 +3210,6 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
                EndRecPtr = xlogreader->EndRecPtr;
                if (record == NULL)
                {
-                       /* not all failures fill errormsg; report those that do */
-                       if (errormsg && errormsg[0] != '\0')
-                               ereport(emode_for_corrupt_record(emode,
-                                                                                                RecPtr ? RecPtr : EndRecPtr),
-                                               (errmsg_internal("%s", errormsg) /* already translated */));
-
                        lastSourceFailed = true;
 
                        if (readFile >= 0)
@@ -3222,7 +3217,20 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
                                close(readFile);
                                readFile = -1;
                        }
-                       break;
+
+                       /*
+                        * We only end up here without a message when XLogPageRead() failed
+                        * - in that case we already logged something.
+                        * In StandbyMode that only happens if we have been triggered, so
+                        * we shouldn't loop anymore in that case.
+                        */
+                       if (errormsg)
+                               ereport(emode_for_corrupt_record(emode,
+                                                                                                RecPtr ? RecPtr : EndRecPtr),
+                                               (errmsg_internal("%s", errormsg) /* already translated */));
+
+                       /* Give up, or retry if we're in standby mode. */
+                       continue;
                }
 
                /*
@@ -3234,6 +3242,8 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
                        XLogSegNo segno;
                        int32 offset;
 
+                       lastSourceFailed = true;
+
                        XLByteToSeg(xlogreader->latestPagePtr, segno);
                        offset = xlogreader->latestPagePtr % XLogSegSize;
                        XLogFileName(fname, xlogreader->readPageTLI, segno);
@@ -3243,9 +3253,10 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
                                                        xlogreader->latestPageTLI,
                                                        fname,
                                                        offset)));
-                       return false;
+                       record = NULL;
+                       continue;
                }
-       } while (StandbyMode && record == NULL);
+       } while (StandbyMode && record == NULL && !CheckForStandbyTrigger());
 
        return record;
 }
index ff871a3412f7eced59022f83b942cd54687499f8..75164f686e54a3b9fbdfff08c2f0d4562aeed5d9 100644 (file)
@@ -222,11 +222,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
        readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogRecord);
 
        if (readOff < 0)
-       {
-               if (state->errormsg_buf[0] != '\0')
-                       *errormsg = state->errormsg_buf;
-               return NULL;
-       }
+               goto err;
 
        /*
         * ReadPageInternal always returns at least the page header, so we can
@@ -246,8 +242,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
        {
                report_invalid_record(state, "invalid record offset at %X/%X",
                                                          (uint32) (RecPtr >> 32), (uint32) RecPtr);
-               *errormsg = state->errormsg_buf;
-               return NULL;
+               goto err;
        }
 
        if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
@@ -255,8 +250,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
        {
                report_invalid_record(state, "contrecord is requested by %X/%X",
                                                          (uint32) (RecPtr >> 32), (uint32) RecPtr);
-               *errormsg = state->errormsg_buf;
-               return NULL;
+               goto err;
        }
 
        /* ReadPageInternal has verified the page header */
@@ -270,11 +264,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
                                                           targetPagePtr,
                                                  Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
        if (readOff < 0)
-       {
-               if (state->errormsg_buf[0] != '\0')
-                       *errormsg = state->errormsg_buf;
-               return NULL;
-       }
+               goto err;
 
        /*
         * Read the record length.
@@ -300,11 +290,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
        {
                if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
                                                                   randAccess))
-               {
-                       if (state->errormsg_buf[0] != '\0')
-                               *errormsg = state->errormsg_buf;
-                       return NULL;
-               }
+                       goto err;
                gotheader = true;
        }
        else
@@ -314,8 +300,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
                {
                        report_invalid_record(state, "invalid record length at %X/%X",
                                                                  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-                       *errormsg = state->errormsg_buf;
-                       return NULL;
+                       goto err;
                }
                gotheader = false;
        }
@@ -330,8 +315,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
                report_invalid_record(state, "record length %u at %X/%X too long",
                                                          total_len,
                                                          (uint32) (RecPtr >> 32), (uint32) RecPtr);
-               *errormsg = state->errormsg_buf;
-               return NULL;
+               goto err;
        }
 
        len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;