/*
* Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one. These are chosen so that they can be OR'd together
- * in a bitmask state variable.
+ * to attempt to get one.
*/
-#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
-#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
-#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
+typedef enum
+{
+ XLOG_FROM_ANY = 0, /* request to read WAL from any source */
+ XLOG_FROM_ARCHIVE, /* restored using restore_command */
+ XLOG_FROM_PG_XLOG, /* existing file in pg_xlog */
+ XLOG_FROM_STREAM, /* streamed from master */
+} XLogSource;
+
+/* human-readable names for XLogSources, for debugging output */
+static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" };
/*
* openLogFile is -1 or a kernel FD for an open log file segment.
static uint32 readOff = 0;
static uint32 readLen = 0;
static bool readFileHeaderValidated = false;
-static int readSource = 0; /* XLOG_FROM_* code */
+static XLogSource readSource = 0; /* XLOG_FROM_* code */
/*
- * Keeps track of which sources we've tried to read the current WAL
- * record from and failed.
+ * Keeps track of which source we're currently reading from. This is
+ * different from readSource in that this is always set, even when we don't
+ * currently have a WAL file open. If lastSourceFailed is set, our last
+ * attempt to read from currentSource failed, and we should try another source
+ * next.
*/
-static int failedSources = 0; /* OR of XLOG_FROM_* codes */
+static XLogSource currentSource = 0; /* XLOG_FROM_* code */
+static bool lastSourceFailed = false;
/*
* These variables track when we last obtained some WAL data to process,
* and where we got it from. (XLogReceiptSource is initially the same as
* readSource, but readSource gets reset to zero when we don't have data
- * to process right now.)
+ * to process right now. It is also different from currentSource, which
+ * also changes when we try to read from a source and fail, while
+ * XLogReceiptSource tracks where we last successfully read some WAL.)
*/
static TimestampTz XLogReceiptTime = 0;
-static int XLogReceiptSource = 0; /* XLOG_FROM_* code */
+static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL;
bool use_lock);
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
int source, bool notexistOk);
-static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
+static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/*
* Open a logfile segment for reading (during recovery).
*
- * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
+ * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
* Otherwise, it's assumed to be already available in pg_xlog.
*/
static int
* This version searches for the segment with any TLI listed in expectedTLIs.
*/
static int
-XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
+XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
{
char path[MAXPGPATH];
ListCell *cell;
if (tli < curFileTLI)
break; /* don't bother looking at too-old TLIs */
- if (sources & XLOG_FROM_ARCHIVE)
+ if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
{
fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
if (fd != -1)
}
}
- if (sources & XLOG_FROM_PG_XLOG)
+ if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)
{
fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
if (fd != -1)
}
/* This is the first try to read this page. */
- failedSources = 0;
+ lastSourceFailed = false;
retry:
/* Read the page containing the record */
if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
return record;
next_record_is_invalid:
- failedSources |= readSource;
+ lastSourceFailed = true;
if (readFile >= 0)
{
* In standby mode, if after a successful return of XLogPageRead() the
* caller finds the record it's interested in to be broken, it should
* ereport the error with the level determined by
- * emode_for_corrupt_record(), and then set "failedSources |= readSource"
+ * emode_for_corrupt_record(), and then set lastSourceFailed
* and call XLogPageRead() again with the same arguments. This lets
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
/* Fast exit if we have read the record in the current buffer already */
- if (failedSources == 0 && targetSegNo == readSegNo &&
+ if (!lastSourceFailed && targetSegNo == readSegNo &&
targetPageOff == readOff && targetRecOff < readLen)
return true;
/* In archive or crash recovery. */
if (readFile < 0)
{
- int sources;
+ int source;
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
- sources = XLOG_FROM_PG_XLOG;
if (InArchiveRecovery)
- sources |= XLOG_FROM_ARCHIVE;
+ source = XLOG_FROM_ANY;
+ else
+ source = XLOG_FROM_PG_XLOG;
- readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
+ readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
if (readFile < 0)
return false;
}
return true;
next_record_is_invalid:
- failedSources |= readSource;
+ lastSourceFailed = true;
if (readFile >= 0)
close(readFile);
bool fetching_ckpt)
{
static pg_time_t last_fail_time = 0;
+ pg_time_t now;
+
+ /*-------
+ * Standby mode is implemented by a state machine:
+ *
+ * 1. Read from archive (XLOG_FROM_ARCHIVE)
+ * 2. Read from pg_xlog (XLOG_FROM_PG_XLOG)
+ * 3. Check trigger file
+ * 4. Read from primary server via walreceiver (XLOG_FROM_STREAM)
+ * 5. Rescan timelines
+ * 6. Sleep 5 seconds, and loop back to 1.
+ *
+ * Failure to read from the current source advances the state machine to
+ * the next state. In addition, successfully reading a file from pg_xlog
+ * moves the state machine from state 2 back to state 1 (we always prefer
+ * files in the archive over files in pg_xlog).
+ *
+ * 'currentSource' indicates the current state. There are no currentSource
+ * values for "check trigger", "rescan timelines", and "sleep" states,
+ * those actions are taken when reading from the previous source fails, as
+ * part of advancing to the next state.
+ *-------
+ */
+ if (currentSource == 0)
+ currentSource = XLOG_FROM_ARCHIVE;
for (;;)
{
- if (WalRcvInProgress())
+ int oldSource = currentSource;
+
+ /*
+ * First check if we failed to read from the current source, and
+ * advance the state machine if so. The failure to read might've
+ * happened outside this function, e.g when a CRC check fails on a
+ * record, or within this loop.
+ */
+ if (lastSourceFailed)
{
- bool havedata;
- /*
- * If we find an invalid record in the WAL streamed from master,
- * something is seriously wrong. There's little chance that the
- * problem will just go away, but PANIC is not good for
- * availability either, especially in hot standby mode.
- * Disconnect, and retry from archive/pg_xlog again. The WAL in
- * the archive should be identical to what was streamed, so it's
- * unlikely that it helps, but one can hope...
- */
- if (failedSources & XLOG_FROM_STREAM)
+ switch (currentSource)
{
- ShutdownWalRcv();
- continue;
- }
+ case XLOG_FROM_ARCHIVE:
+ currentSource = XLOG_FROM_PG_XLOG;
+ break;
- /*
- * Walreceiver is active, so see if new data has arrived.
- *
- * We only advance XLogReceiptTime when we obtain fresh WAL from
- * walreceiver and observe that we had already processed
- * everything before the most recent "chunk" that it flushed to
- * disk. In steady state where we are keeping up with the
- * incoming data, XLogReceiptTime will be updated on each cycle.
- * When we are behind, XLogReceiptTime will not advance, so the
- * grace time allotted to conflicting queries will decrease.
- */
- if (XLByteLT(RecPtr, receivedUpto))
- havedata = true;
- else
- {
- XLogRecPtr latestChunkStart;
+ case XLOG_FROM_PG_XLOG:
+ /*
+ * Check to see if the trigger file exists. Note that we do
+ * this only after failure, so when you create the trigger
+ * file, we still finish replaying as much as we can from
+ * archive and pg_xlog before failover.
+ */
+ if (CheckForStandbyTrigger())
+ return false;
- receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
- if (XLByteLT(RecPtr, receivedUpto))
- {
- havedata = true;
- if (!XLByteLT(RecPtr, latestChunkStart))
+ /*
+ * If primary_conninfo is set, launch walreceiver to try to
+ * stream the missing WAL.
+ *
+ * If fetching_ckpt is TRUE, RecPtr points to the initial
+ * checkpoint location. In that case, we use RedoStartLSN
+ * as the streaming start position instead of RecPtr, so
+ * that when we later jump backwards to start redo at
+ * RedoStartLSN, we will have the logs streamed already.
+ */
+ if (PrimaryConnInfo)
{
- XLogReceiptTime = GetCurrentTimestamp();
- SetCurrentChunkStartTime(XLogReceiptTime);
+ XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+
+ RequestXLogStreaming(ptr, PrimaryConnInfo);
}
- }
- else
- havedata = false;
- }
- if (havedata)
- {
- /*
- * Great, streamed far enough. Open the file if it's not open
- * already. Use XLOG_FROM_STREAM so that source info is set
- * correctly and XLogReceiptTime isn't changed.
- */
- if (readFile < 0)
- {
- readFile = XLogFileRead(readSegNo, PANIC,
- recoveryTargetTLI,
- XLOG_FROM_STREAM, false);
- Assert(readFile >= 0);
- }
- else
- {
- /* just make sure source info is correct... */
- readSource = XLOG_FROM_STREAM;
- XLogReceiptSource = XLOG_FROM_STREAM;
- }
- break;
- }
+ /*
+ * Move to XLOG_FROM_STREAM state in either case. We'll get
+ * immediate failure if we didn't launch walreceiver, and
+ * move on to the next state.
+ */
+ currentSource = XLOG_FROM_STREAM;
+ break;
- /*
- * Data not here yet, so check for trigger then sleep for five
- * seconds like in the WAL file polling case below.
- */
- if (CheckForStandbyTrigger())
- return false;
+ case XLOG_FROM_STREAM:
+ /*
+ * Failure while streaming. Most likely, we got here because
+ * streaming replication was terminated, or promotion was
+ * triggered. But we also get here if we find an invalid
+ * record in the WAL streamed from master, in which case
+ * something is seriously wrong. There's little chance that
+ * the problem will just go away, but PANIC is not good for
+ * availability either, especially in hot standby mode. So,
+ * we treat that the same as disconnection, and retry from
+ * archive/pg_xlog again. The WAL in the archive should be
+ * identical to what was streamed, so it's unlikely that it
+ * helps, but one can hope...
+ */
+ /*
+ * Before we leave XLOG_FROM_STREAM state, make sure that
+ * walreceiver is not running, so that it won't overwrite
+ * any WAL that we restore from archive.
+ */
+ if (WalRcvInProgress())
+ ShutdownWalRcv();
- /*
- * Wait for more WAL to arrive, or timeout to be reached
- */
- WaitLatch(&XLogCtl->recoveryWakeupLatch,
- WL_LATCH_SET | WL_TIMEOUT,
- 5000L);
- ResetLatch(&XLogCtl->recoveryWakeupLatch);
+ /*
+ * Before we sleep, re-scan for possible new timelines if
+ * we were requested to recover to the latest timeline.
+ */
+ if (recoveryTargetIsLatest)
+ {
+ if (rescanLatestTimeLine())
+ {
+ currentSource = XLOG_FROM_ARCHIVE;
+ break;
+ }
+ }
+
+ /*
+ * XLOG_FROM_STREAM is the last state in our state machine,
+ * so we've exhausted all the options for obtaining the
+ * requested WAL. We're going to loop back and retry from
+ * the archive, but if it hasn't been long since last
+ * attempt, sleep 5 seconds to avoid busy-waiting.
+ */
+ now = (pg_time_t) time(NULL);
+ if ((now - last_fail_time) < 5)
+ {
+ pg_usleep(1000000L * (5 - (now - last_fail_time)));
+ now = (pg_time_t) time(NULL);
+ }
+ last_fail_time = now;
+ currentSource = XLOG_FROM_ARCHIVE;
+ break;
+
+ default:
+ elog(ERROR, "unexpected WAL source %d", currentSource);
+ }
}
- else
+ else if (currentSource == XLOG_FROM_PG_XLOG)
{
/*
- * WAL receiver is not active. Poll the archive.
+ * We just successfully read a file in pg_xlog. We prefer files
+ * in the archive over ones in pg_xlog, so try the next file
+ * again from the archive first.
*/
- int sources;
- pg_time_t now;
+ currentSource = XLOG_FROM_ARCHIVE;
+ }
- if (readFile >= 0)
- {
- close(readFile);
- readFile = -1;
- }
- /* Reset curFileTLI if random fetch. */
- if (randAccess)
- curFileTLI = 0;
+ if (currentSource != oldSource)
+ elog(LOG, "switched WAL source from %s to %s after %s",
+ xlogSourceNames[oldSource], xlogSourceNames[currentSource],
+ lastSourceFailed ? "failure" : "success");
+
+ /*
+ * We've now handled possible failure. Try to read from the chosen
+ * source.
+ */
+ lastSourceFailed = false;
+
+ switch (currentSource)
+ {
+ case XLOG_FROM_ARCHIVE:
+ case XLOG_FROM_PG_XLOG:
+ /* Close any old file we might have open. */
+ if (readFile >= 0)
+ {
+ close(readFile);
+ readFile = -1;
+ }
+ /* Reset curFileTLI if random fetch. */
+ if (randAccess)
+ curFileTLI = 0;
- /*
- * Try to restore the file from archive, or read an existing file
- * from pg_xlog.
- */
- sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
- if (!(sources & ~failedSources))
- {
/*
- * We've exhausted all options for retrieving the file. Retry.
+ * Try to restore the file from archive, or read an existing
+ * file from pg_xlog.
*/
- failedSources = 0;
+ readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, currentSource);
+ if (readFile >= 0)
+ return true; /* success! */
/*
- * Before we sleep, re-scan for possible new timelines if we
- * were requested to recover to the latest timeline.
+ * Nope, not found in archive or pg_xlog.
*/
- if (recoveryTargetIsLatest)
- {
- if (rescanLatestTimeLine())
- continue;
- }
+ lastSourceFailed = true;
+ break;
+
+ case XLOG_FROM_STREAM:
+ {
+ bool havedata;
/*
- * If it hasn't been long since last attempt, sleep to avoid
- * busy-waiting.
+ * Check if WAL receiver is still active.
*/
- now = (pg_time_t) time(NULL);
- if ((now - last_fail_time) < 5)
+ if (!WalRcvInProgress())
{
- pg_usleep(1000000L * (5 - (now - last_fail_time)));
- now = (pg_time_t) time(NULL);
+ lastSourceFailed = true;
+ break;
}
- last_fail_time = now;
/*
- * If primary_conninfo is set, launch walreceiver to try to
- * stream the missing WAL, before retrying to restore from
- * archive/pg_xlog.
+ * Walreceiver is active, so see if new data has arrived.
*
- * If fetching_ckpt is TRUE, RecPtr points to the initial
- * checkpoint location. In that case, we use RedoStartLSN as
- * the streaming start position instead of RecPtr, so that
- * when we later jump backwards to start redo at RedoStartLSN,
- * we will have the logs streamed already.
+ * We only advance XLogReceiptTime when we obtain fresh WAL
+ * from walreceiver and observe that we had already processed
+ * everything before the most recent "chunk" that it flushed to
+ * disk. In steady state where we are keeping up with the
+ * incoming data, XLogReceiptTime will be updated on each cycle.
+ * When we are behind, XLogReceiptTime will not advance, so the
+ * grace time allotted to conflicting queries will decrease.
*/
- if (PrimaryConnInfo)
+ if (XLByteLT(RecPtr, receivedUpto))
+ havedata = true;
+ else
{
- XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+ XLogRecPtr latestChunkStart;
- RequestXLogStreaming(ptr, PrimaryConnInfo);
- continue;
+ receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+ if (XLByteLT(RecPtr, receivedUpto))
+ {
+ havedata = true;
+ if (!XLByteLT(RecPtr, latestChunkStart))
+ {
+ XLogReceiptTime = GetCurrentTimestamp();
+ SetCurrentChunkStartTime(XLogReceiptTime);
+ }
+ }
+ else
+ havedata = false;
+ }
+ if (havedata)
+ {
+ /*
+ * Great, streamed far enough. Open the file if it's not
+ * open already. Use XLOG_FROM_STREAM so that source info
+ * is set correctly and XLogReceiptTime isn't changed.
+ */
+ if (readFile < 0)
+ {
+ readFile = XLogFileRead(readSegNo, PANIC,
+ recoveryTargetTLI,
+ XLOG_FROM_STREAM, false);
+ Assert(readFile >= 0);
+ }
+ else
+ {
+ /* just make sure source info is correct... */
+ readSource = XLOG_FROM_STREAM;
+ XLogReceiptSource = XLOG_FROM_STREAM;
+ return true;
+ }
+ break;
}
- }
- /* Don't try to read from a source that just failed */
- sources &= ~failedSources;
- readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
- if (readFile >= 0)
- break;
- /*
- * Nope, not found in archive and/or pg_xlog.
- */
- failedSources |= sources;
+ /*
+ * Data not here yet. Check for trigger, then wait for
+ * walreceiver to wake us up when new WAL arrives.
+ */
+ if (CheckForStandbyTrigger())
+ {
+ /*
+ * Note that we don't "return false" immediately here.
+ * After being triggered, we still want to replay all the
+ * WAL that was already streamed. It's in pg_xlog now, so
+ * we just treat this as a failure, and the state machine
+ * will move on to replay the streamed WAL from pg_xlog,
+ * and then recheck the trigger and exit replay.
+ */
+ lastSourceFailed = true;
+ break;
+ }
- /*
- * Check to see if the trigger file exists. Note that we do this
- * only after failure, so when you create the trigger file, we
- * still finish replaying as much as we can from archive and
- * pg_xlog before failover.
- */
- if (CheckForStandbyTrigger())
- return false;
+ /*
+ * Wait for more WAL to arrive. Time out after 5 seconds, like
+ * when polling the archive, to react to a trigger file
+ * promptly.
+ */
+ WaitLatch(&XLogCtl->recoveryWakeupLatch,
+ WL_LATCH_SET | WL_TIMEOUT,
+ 5000L);
+ ResetLatch(&XLogCtl->recoveryWakeupLatch);
+ break;
+ }
+
+ default:
+ elog(ERROR, "unexpected WAL source %d", currentSource);
}
/*
HandleStartupProcInterrupts();
}
- return true;
+ return false; /* not reached */
}
/*