granicus.if.org Git - postgresql/commitdiff
Refactor the code implementing standby-mode logic.
author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 3 Dec 2012 10:32:44 +0000 (12:32 +0200)
committer: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 3 Dec 2012 10:32:44 +0000 (12:32 +0200)
It is now easier to see that it's a state machine, making the code easier
to understand overall.

src/backend/access/transam/xlog.c

index 9208bc21d462d51bd975b560a28e27f467fab39e..c8ac97fbf7fac6cddfd7019e19d252da285c501e 100644 (file)
@@ -506,12 +506,18 @@ static XLogwrtResult LogwrtResult = {0, 0};
 
 /*
  * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.  These are chosen so that they can be OR'd together
- * in a bitmask state variable.
+ * to attempt to get one.
  */
-#define XLOG_FROM_ARCHIVE              (1<<0)  /* Restored using restore_command */
-#define XLOG_FROM_PG_XLOG              (1<<1)  /* Existing file in pg_xlog */
-#define XLOG_FROM_STREAM               (1<<2)  /* Streamed from master */
+typedef enum
+{
+       XLOG_FROM_ANY = 0,              /* request to read WAL from any source */
+       XLOG_FROM_ARCHIVE,              /* restored using restore_command */
+       XLOG_FROM_PG_XLOG,              /* existing file in pg_xlog */
+       XLOG_FROM_STREAM,               /* streamed from master */
+} XLogSource;
+
+/* human-readable names for XLogSources, for debugging output */
+static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" };
 
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
@@ -536,22 +542,28 @@ static XLogSegNo readSegNo = 0;
 static uint32 readOff = 0;
 static uint32 readLen = 0;
 static bool    readFileHeaderValidated = false;
-static int     readSource = 0;         /* XLOG_FROM_* code */
+static XLogSource readSource = 0;              /* XLOG_FROM_* code */
 
 /*
- * Keeps track of which sources we've tried to read the current WAL
- * record from and failed.
+ * Keeps track of which source we're currently reading from. This is
+ * different from readSource in that this is always set, even when we don't
+ * currently have a WAL file open. If lastSourceFailed is set, our last
+ * attempt to read from currentSource failed, and we should try another source
+ * next.
  */
-static int     failedSources = 0;      /* OR of XLOG_FROM_* codes */
+static XLogSource currentSource = 0;   /* XLOG_FROM_* code */
+static bool    lastSourceFailed = false;
 
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
  * readSource, but readSource gets reset to zero when we don't have data
- * to process right now.)
+ * to process right now.  It is also different from currentSource, which
+ * also changes when we try to read from a source and fail, while
+ * XLogReceiptSource tracks where we last successfully read some WAL.)
  */
 static TimestampTz XLogReceiptTime = 0;
-static int     XLogReceiptSource = 0;          /* XLOG_FROM_* code */
+static XLogSource XLogReceiptSource = 0;       /* XLOG_FROM_* code */
 
 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
 static char *readBuf = NULL;
@@ -605,7 +617,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
                                           bool use_lock);
 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                         int source, bool notexistOk);
-static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
+static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
 static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
                         bool randAccess);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
@@ -2551,7 +2563,7 @@ XLogFileOpen(XLogSegNo segno)
 /*
  * Open a logfile segment for reading (during recovery).
  *
- * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
+ * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
  * Otherwise, it's assumed to be already available in pg_xlog.
  */
 static int
@@ -2697,7 +2709,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
  * This version searches for the segment with any TLI listed in expectedTLIs.
  */
 static int
-XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
+XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
 {
        char            path[MAXPGPATH];
        ListCell   *cell;
@@ -2720,7 +2732,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
                if (tli < curFileTLI)
                        break;                          /* don't bother looking at too-old TLIs */
 
-               if (sources & XLOG_FROM_ARCHIVE)
+               if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
                {
                        fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
                        if (fd != -1)
@@ -2730,7 +2742,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
                        }
                }
 
-               if (sources & XLOG_FROM_PG_XLOG)
+               if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)
                {
                        fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
                        if (fd != -1)
@@ -3332,7 +3344,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
        }
 
        /* This is the first try to read this page. */
-       failedSources = 0;
+       lastSourceFailed = false;
 retry:
        /* Read the page containing the record */
        if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
@@ -3545,7 +3557,7 @@ retry:
        return record;
 
 next_record_is_invalid:
-       failedSources |= readSource;
+       lastSourceFailed = true;
 
        if (readFile >= 0)
        {
@@ -9162,7 +9174,7 @@ CancelBackup(void)
  * In standby mode, if after a successful return of XLogPageRead() the
  * caller finds the record it's interested in to be broken, it should
  * ereport the error with the level determined by
- * emode_for_corrupt_record(), and then set "failedSources |= readSource"
+ * emode_for_corrupt_record(), and then set lastSourceFailed
  * and call XLogPageRead() again with the same arguments. This lets
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
@@ -9180,7 +9192,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
        targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
 
        /* Fast exit if we have read the record in the current buffer already */
-       if (failedSources == 0 && targetSegNo == readSegNo &&
+       if (!lastSourceFailed && targetSegNo == readSegNo &&
                targetPageOff == readOff && targetRecOff < readLen)
                return true;
 
@@ -9227,17 +9239,18 @@ retry:
                        /* In archive or crash recovery. */
                        if (readFile < 0)
                        {
-                               int                     sources;
+                               int                     source;
 
                                /* Reset curFileTLI if random fetch. */
                                if (randAccess)
                                        curFileTLI = 0;
 
-                               sources = XLOG_FROM_PG_XLOG;
                                if (InArchiveRecovery)
-                                       sources |= XLOG_FROM_ARCHIVE;
+                                       source = XLOG_FROM_ANY;
+                               else
+                                       source = XLOG_FROM_PG_XLOG;
 
-                               readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
+                               readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
                                if (readFile < 0)
                                        return false;
                        }
@@ -9326,7 +9339,7 @@ retry:
        return true;
 
 next_record_is_invalid:
-       failedSources |= readSource;
+       lastSourceFailed = true;
 
        if (readFile >= 0)
                close(readFile);
@@ -9366,185 +9379,289 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                                        bool fetching_ckpt)
 {
        static pg_time_t last_fail_time = 0;
+       pg_time_t now;
+
+       /*-------
+        * Standby mode is implemented by a state machine:
+        *
+        * 1. Read from archive (XLOG_FROM_ARCHIVE)
+        * 2. Read from pg_xlog (XLOG_FROM_PG_XLOG)
+        * 3. Check trigger file
+        * 4. Read from primary server via walreceiver (XLOG_FROM_STREAM)
+        * 5. Rescan timelines
+        * 6. Sleep 5 seconds, and loop back to 1.
+        *
+        * Failure to read from the current source advances the state machine to
+        * the next state. In addition, successfully reading a file from pg_xlog
+        * moves the state machine from state 2 back to state 1 (we always prefer
+        * files in the archive over files in pg_xlog).
+        *
+        * 'currentSource' indicates the current state. There are no currentSource
+        * values for "check trigger", "rescan timelines", and "sleep" states,
+        * those actions are taken when reading from the previous source fails, as
+        * part of advancing to the next state.
+        *-------
+        */
+       if (currentSource == 0)
+               currentSource = XLOG_FROM_ARCHIVE;
 
        for (;;)
        {
-               if (WalRcvInProgress())
+               int             oldSource = currentSource;
+
+               /*
+                * First check if we failed to read from the current source, and
+                * advance the state machine if so. The failure to read might've
+                * happened outside this function, e.g when a CRC check fails on a
+                * record, or within this loop.
+                */
+               if (lastSourceFailed)
                {
-                       bool            havedata;
 
-                       /*
-                        * If we find an invalid record in the WAL streamed from master,
-                        * something is seriously wrong. There's little chance that the
-                        * problem will just go away, but PANIC is not good for
-                        * availability either, especially in hot standby mode.
-                        * Disconnect, and retry from archive/pg_xlog again. The WAL in
-                        * the archive should be identical to what was streamed, so it's
-                        * unlikely that it helps, but one can hope...
-                        */
-                       if (failedSources & XLOG_FROM_STREAM)
+                       switch (currentSource)
                        {
-                               ShutdownWalRcv();
-                               continue;
-                       }
+                               case XLOG_FROM_ARCHIVE:
+                                       currentSource = XLOG_FROM_PG_XLOG;
+                                       break;
 
-                       /*
-                        * Walreceiver is active, so see if new data has arrived.
-                        *
-                        * We only advance XLogReceiptTime when we obtain fresh WAL from
-                        * walreceiver and observe that we had already processed
-                        * everything before the most recent "chunk" that it flushed to
-                        * disk.  In steady state where we are keeping up with the
-                        * incoming data, XLogReceiptTime will be updated on each cycle.
-                        * When we are behind, XLogReceiptTime will not advance, so the
-                        * grace time allotted to conflicting queries will decrease.
-                        */
-                       if (XLByteLT(RecPtr, receivedUpto))
-                               havedata = true;
-                       else
-                       {
-                               XLogRecPtr      latestChunkStart;
+                               case XLOG_FROM_PG_XLOG:
+                                       /*
+                                        * Check to see if the trigger file exists. Note that we do
+                                        * this only after failure, so when you create the trigger
+                                        * file, we still finish replaying as much as we can from
+                                        * archive and pg_xlog before failover.
+                                        */
+                                       if (CheckForStandbyTrigger())
+                                               return false;
 
-                               receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-                               if (XLByteLT(RecPtr, receivedUpto))
-                               {
-                                       havedata = true;
-                                       if (!XLByteLT(RecPtr, latestChunkStart))
+                                       /*
+                                        * If primary_conninfo is set, launch walreceiver to try to
+                                        * stream the missing WAL.
+                                        *
+                                        * If fetching_ckpt is TRUE, RecPtr points to the initial
+                                        * checkpoint location. In that case, we use RedoStartLSN
+                                        * as the streaming start position instead of RecPtr, so
+                                        * that when we later jump backwards to start redo at
+                                        * RedoStartLSN, we will have the logs streamed already.
+                                        */
+                                       if (PrimaryConnInfo)
                                        {
-                                               XLogReceiptTime = GetCurrentTimestamp();
-                                               SetCurrentChunkStartTime(XLogReceiptTime);
+                                               XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+
+                                               RequestXLogStreaming(ptr, PrimaryConnInfo);
                                        }
-                               }
-                               else
-                                       havedata = false;
-                       }
-                       if (havedata)
-                       {
-                               /*
-                                * Great, streamed far enough.  Open the file if it's not open
-                                * already.  Use XLOG_FROM_STREAM so that source info is set
-                                * correctly and XLogReceiptTime isn't changed.
-                                */
-                               if (readFile < 0)
-                               {
-                                       readFile = XLogFileRead(readSegNo, PANIC,
-                                                                                       recoveryTargetTLI,
-                                                                                       XLOG_FROM_STREAM, false);
-                                       Assert(readFile >= 0);
-                               }
-                               else
-                               {
-                                       /* just make sure source info is correct... */
-                                       readSource = XLOG_FROM_STREAM;
-                                       XLogReceiptSource = XLOG_FROM_STREAM;
-                               }
-                               break;
-                       }
+                                       /*
+                                        * Move to XLOG_FROM_STREAM state in either case. We'll get
+                                        * immediate failure if we didn't launch walreceiver, and
+                                        * move on to the next state.
+                                        */
+                                       currentSource = XLOG_FROM_STREAM;
+                                       break;
 
-                       /*
-                        * Data not here yet, so check for trigger then sleep for five
-                        * seconds like in the WAL file polling case below.
-                        */
-                       if (CheckForStandbyTrigger())
-                               return false;
+                               case XLOG_FROM_STREAM:
+                                       /*
+                                        * Failure while streaming. Most likely, we got here because
+                                        * streaming replication was terminated, or promotion was
+                                        * triggered. But we also get here if we find an invalid
+                                        * record in the WAL streamed from master, in which case
+                                        * something is seriously wrong. There's little chance that
+                                        * the problem will just go away, but PANIC is not good for
+                                        * availability either, especially in hot standby mode. So,
+                                        * we treat that the same as disconnection, and retry from
+                                        * archive/pg_xlog again. The WAL in the archive should be
+                                        * identical to what was streamed, so it's unlikely that it
+                                        * helps, but one can hope...
+                                        */
+                                       /*
+                                        * Before we leave XLOG_FROM_STREAM state, make sure that
+                                        * walreceiver is not running, so that it won't overwrite
+                                        * any WAL that we restore from archive.
+                                        */
+                                       if (WalRcvInProgress())
+                                               ShutdownWalRcv();
 
-                       /*
-                        * Wait for more WAL to arrive, or timeout to be reached
-                        */
-                       WaitLatch(&XLogCtl->recoveryWakeupLatch,
-                                         WL_LATCH_SET | WL_TIMEOUT,
-                                         5000L);
-                       ResetLatch(&XLogCtl->recoveryWakeupLatch);
+                                       /*
+                                        * Before we sleep, re-scan for possible new timelines if
+                                        * we were requested to recover to the latest timeline.
+                                        */
+                                       if (recoveryTargetIsLatest)
+                                       {
+                                               if (rescanLatestTimeLine())
+                                               {
+                                                       currentSource = XLOG_FROM_ARCHIVE;
+                                                       break;
+                                               }
+                                       }
+
+                                       /*
+                                        * XLOG_FROM_STREAM is the last state in our state machine,
+                                        * so we've exhausted all the options for obtaining the
+                                        * requested WAL. We're going to loop back and retry from
+                                        * the archive, but if it hasn't been long since last
+                                        * attempt, sleep 5 seconds to avoid busy-waiting.
+                                        */
+                                       now = (pg_time_t) time(NULL);
+                                       if ((now - last_fail_time) < 5)
+                                       {
+                                               pg_usleep(1000000L * (5 - (now - last_fail_time)));
+                                               now = (pg_time_t) time(NULL);
+                                       }
+                                       last_fail_time = now;
+                                       currentSource = XLOG_FROM_ARCHIVE;
+                                       break;
+
+                               default:
+                                       elog(ERROR, "unexpected WAL source %d", currentSource);
+                       }
                }
-               else
+               else if (currentSource == XLOG_FROM_PG_XLOG)
                {
                        /*
-                        * WAL receiver is not active. Poll the archive.
+                        * We just successfully read a file in pg_xlog. We prefer files
+                        * in the archive over ones in pg_xlog, so try the next file
+                        * again from the archive first.
                         */
-                       int                     sources;
-                       pg_time_t       now;
+                       currentSource = XLOG_FROM_ARCHIVE;
+               }
 
-                       if (readFile >= 0)
-                       {
-                               close(readFile);
-                               readFile = -1;
-                       }
-                       /* Reset curFileTLI if random fetch. */
-                       if (randAccess)
-                               curFileTLI = 0;
+               if (currentSource != oldSource)
+                       elog(LOG, "switched WAL source from %s to %s after %s",
+                                xlogSourceNames[oldSource], xlogSourceNames[currentSource],
+                                lastSourceFailed ? "failure" : "success");
+
+               /*
+                * We've now handled possible failure. Try to read from the chosen
+                * source.
+                */
+               lastSourceFailed = false;
+
+               switch (currentSource)
+               {
+                       case XLOG_FROM_ARCHIVE:
+                       case XLOG_FROM_PG_XLOG:
+                               /* Close any old file we might have open. */
+                               if (readFile >= 0)
+                               {
+                                       close(readFile);
+                                       readFile = -1;
+                               }
+                               /* Reset curFileTLI if random fetch. */
+                               if (randAccess)
+                                       curFileTLI = 0;
 
-                       /*
-                        * Try to restore the file from archive, or read an existing file
-                        * from pg_xlog.
-                        */
-                       sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
-                       if (!(sources & ~failedSources))
-                       {
                                /*
-                                * We've exhausted all options for retrieving the file. Retry.
+                                * Try to restore the file from archive, or read an existing
+                                * file from pg_xlog.
                                 */
-                               failedSources = 0;
+                               readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, currentSource);
+                               if (readFile >= 0)
+                                       return true;    /* success! */
 
                                /*
-                                * Before we sleep, re-scan for possible new timelines if we
-                                * were requested to recover to the latest timeline.
+                                * Nope, not found in archive or pg_xlog.
                                 */
-                               if (recoveryTargetIsLatest)
-                               {
-                                       if (rescanLatestTimeLine())
-                                               continue;
-                               }
+                               lastSourceFailed = true;
+                               break;
+
+                       case XLOG_FROM_STREAM:
+                       {
+                               bool            havedata;
 
                                /*
-                                * If it hasn't been long since last attempt, sleep to avoid
-                                * busy-waiting.
+                                * Check if WAL receiver is still active.
                                 */
-                               now = (pg_time_t) time(NULL);
-                               if ((now - last_fail_time) < 5)
+                               if (!WalRcvInProgress())
                                {
-                                       pg_usleep(1000000L * (5 - (now - last_fail_time)));
-                                       now = (pg_time_t) time(NULL);
+                                       lastSourceFailed = true;
+                                       break;
                                }
-                               last_fail_time = now;
 
                                /*
-                                * If primary_conninfo is set, launch walreceiver to try to
-                                * stream the missing WAL, before retrying to restore from
-                                * archive/pg_xlog.
+                                * Walreceiver is active, so see if new data has arrived.
                                 *
-                                * If fetching_ckpt is TRUE, RecPtr points to the initial
-                                * checkpoint location. In that case, we use RedoStartLSN as
-                                * the streaming start position instead of RecPtr, so that
-                                * when we later jump backwards to start redo at RedoStartLSN,
-                                * we will have the logs streamed already.
+                                * We only advance XLogReceiptTime when we obtain fresh WAL
+                                * from walreceiver and observe that we had already processed
+                                * everything before the most recent "chunk" that it flushed to
+                                * disk.  In steady state where we are keeping up with the
+                                * incoming data, XLogReceiptTime will be updated on each cycle.
+                                * When we are behind, XLogReceiptTime will not advance, so the
+                                * grace time allotted to conflicting queries will decrease.
                                 */
-                               if (PrimaryConnInfo)
+                               if (XLByteLT(RecPtr, receivedUpto))
+                                       havedata = true;
+                               else
                                {
-                                       XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+                                       XLogRecPtr      latestChunkStart;
 
-                                       RequestXLogStreaming(ptr, PrimaryConnInfo);
-                                       continue;
+                                       receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+                                       if (XLByteLT(RecPtr, receivedUpto))
+                                       {
+                                               havedata = true;
+                                               if (!XLByteLT(RecPtr, latestChunkStart))
+                                               {
+                                                       XLogReceiptTime = GetCurrentTimestamp();
+                                                       SetCurrentChunkStartTime(XLogReceiptTime);
+                                               }
+                                       }
+                                       else
+                                               havedata = false;
+                               }
+                               if (havedata)
+                               {
+                                       /*
+                                        * Great, streamed far enough.  Open the file if it's not
+                                        * open already.  Use XLOG_FROM_STREAM so that source info
+                                        * is set correctly and XLogReceiptTime isn't changed.
+                                        */
+                                       if (readFile < 0)
+                                       {
+                                               readFile = XLogFileRead(readSegNo, PANIC,
+                                                                                               recoveryTargetTLI,
+                                                                                               XLOG_FROM_STREAM, false);
+                                               Assert(readFile >= 0);
+                                       }
+                                       else
+                                       {
+                                               /* just make sure source info is correct... */
+                                               readSource = XLOG_FROM_STREAM;
+                                               XLogReceiptSource = XLOG_FROM_STREAM;
+                                               return true;
+                                       }
+                                       break;
                                }
-                       }
-                       /* Don't try to read from a source that just failed */
-                       sources &= ~failedSources;
-                       readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
-                       if (readFile >= 0)
-                               break;
 
-                       /*
-                        * Nope, not found in archive and/or pg_xlog.
-                        */
-                       failedSources |= sources;
+                               /*
+                                * Data not here yet. Check for trigger, then wait for
+                                * walreceiver to wake us up when new WAL arrives.
+                                */
+                               if (CheckForStandbyTrigger())
+                               {
+                                       /*
+                                        * Note that we don't "return false" immediately here.
+                                        * After being triggered, we still want to replay all the
+                                        * WAL that was already streamed. It's in pg_xlog now, so
+                                        * we just treat this as a failure, and the state machine
+                                        * will move on to replay the streamed WAL from pg_xlog,
+                                        * and then recheck the trigger and exit replay.
+                                        */
+                                       lastSourceFailed = true;
+                                       break;
+                               }
 
-                       /*
-                        * Check to see if the trigger file exists. Note that we do this
-                        * only after failure, so when you create the trigger file, we
-                        * still finish replaying as much as we can from archive and
-                        * pg_xlog before failover.
-                        */
-                       if (CheckForStandbyTrigger())
-                               return false;
+                               /*
+                                * Wait for more WAL to arrive. Time out after 5 seconds, like
+                                * when polling the archive, to react to a trigger file
+                                * promptly.
+                                */
+                               WaitLatch(&XLogCtl->recoveryWakeupLatch,
+                                                 WL_LATCH_SET | WL_TIMEOUT,
+                                                 5000L);
+                               ResetLatch(&XLogCtl->recoveryWakeupLatch);
+                               break;
+                       }
+
+                       default:
+                               elog(ERROR, "unexpected WAL source %d", currentSource);
                }
 
                /*
@@ -9554,7 +9671,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                HandleStartupProcInterrupts();
        }
 
-       return true;
+       return false;   /* not reached */
 }
 
 /*