* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.361 2010/01/26 00:07:13 sriggs Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.362 2010/01/27 15:27:50 heikki Exp $
*
*-------------------------------------------------------------------------
*/
static XLogRecPtr LastRec;
-/*
- * Are we doing recovery from XLOG stream? If so, we recover without using
- * offline XLOG archives even though InArchiveRecovery==true. This flag is
- * used only in standby mode.
- */
-static bool InStreamingRecovery = false;
-
-/* The current log page is partially-filled, and so needs to be read again? */
-static bool needReread = false;
-
/*
* Local copy of SharedRecoveryInProgress variable. True actually means "not
* known, need to check the shared state".
* These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which
- * will be just past that page.
+ * will be just past that page. readLen indicates how much of the current
+ * page has been read into readBuf.
*/
static int readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static uint32 readOff = 0;
+static uint32 readLen = 0;
+/* Is the currently open segment being streamed from primary? */
+static bool readStreamed = false;
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL;
/* State information for XLOG reading */
static XLogRecPtr ReadRecPtr; /* start of last record read */
static XLogRecPtr EndRecPtr; /* end+1 of last record read */
-static XLogRecord *nextRecord = NULL;
static TimeLineID lastPageTLI = 0;
static XLogRecPtr minRecoveryPoint; /* local copy of
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock);
-static int XLogFileRead(uint32 log, uint32 seg, int emode);
+static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+ bool fromArchive, bool notexistOk);
+static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
+ bool fromArchive);
+static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+ bool randAccess);
static void XLogFileClose(void);
static bool RestoreArchivedFile(char *path, const char *xlogfname,
const char *recovername, off_t expectedSize);
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
+static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
static List *readTimeLineHistory(TimeLineID targetTLI);
static void WriteControlFile(void);
static void ReadControlFile(void);
static char *str_time(pg_time_t tnow);
+static bool CheckForStandbyTrigger(void);
#ifdef WAL_DEBUG
static void xlog_outrec(StringInfo buf, XLogRecord *record);
/*
* Open a logfile segment for reading (during recovery).
+ *
+ * If fromArchive is true, the segment is retrieved from archive, otherwise
+ * it's read from pg_xlog.
*/
static int
-XLogFileRead(uint32 log, uint32 seg, int emode)
+XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+ bool fromArchive, bool notfoundOk)
{
- char path[MAXPGPATH];
char xlogfname[MAXFNAMELEN];
char activitymsg[MAXFNAMELEN + 16];
- ListCell *cell;
+ char path[MAXPGPATH];
int fd;
- /*
- * Loop looking for a suitable timeline ID: we might need to read any of
- * the timelines listed in expectedTLIs.
- *
- * We expect curFileTLI on entry to be the TLI of the preceding file in
- * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
- * to go backwards; this prevents us from picking up the wrong file when a
- * parent timeline extends to higher segment numbers than the child we
- * want to read.
- */
- foreach(cell, expectedTLIs)
- {
- TimeLineID tli = (TimeLineID) lfirst_int(cell);
-
- if (tli < curFileTLI)
- break; /* don't bother looking at too-old TLIs */
-
XLogFileName(xlogfname, tli, log, seg);
- if (InArchiveRecovery && !InStreamingRecovery)
+ if (fromArchive)
{
/* Report recovery progress in PS display */
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
restoredFromArchive = RestoreArchivedFile(path, xlogfname,
"RECOVERYXLOG",
XLogSegSize);
+ if (!restoredFromArchive)
+ return -1;
}
else
+ {
XLogFilePath(path, tli, log, seg);
+ restoredFromArchive = false;
+ }
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd >= 0)
return fd;
}
- if (errno != ENOENT) /* unexpected failure? */
+ if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
path, log, seg)));
+ return -1;
+}
+
+/*
+ * Open a logfile segment for reading (during recovery).
+ *
+ * This version searches for the segment with any TLI listed in expectedTLIs.
+ * If not in StandbyMode and fromArchive is true, the segment is also
+ * searched in pg_xlog if not found in archive.
+ */
+static int
+XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
+{
+ char path[MAXPGPATH];
+ ListCell *cell;
+ int fd;
+
+ /*
+ * Loop looking for a suitable timeline ID: we might need to read any of
+ * the timelines listed in expectedTLIs.
+ *
+ * We expect curFileTLI on entry to be the TLI of the preceding file in
+ * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
+ * to go backwards; this prevents us from picking up the wrong file when a
+ * parent timeline extends to higher segment numbers than the child we
+ * want to read.
+ */
+ foreach(cell, expectedTLIs)
+ {
+ TimeLineID tli = (TimeLineID) lfirst_int(cell);
+
+ if (tli < curFileTLI)
+ break; /* don't bother looking at too-old TLIs */
+
+ fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
+ if (fd != -1)
+ return fd;
+
+ /*
+ * If not in StandbyMode, fall back to searching pg_xlog. In
+ * StandbyMode we're streaming segments from the primary to pg_xlog,
+ * and we mustn't confuse the (possibly partial) segments in pg_xlog
+ * with complete segments ready to be applied. We rather wait for
+ * the records to arrive through streaming.
+ */
+ if (!StandbyMode && fromArchive)
+ {
+ fd = XLogFileRead(log, seg, emode, tli, false, true);
+ if (fd != -1)
+ return fd;
+ }
}
/* Couldn't find it. For simplicity, complain about front timeline */
* different filename that can't be confused with regular XLOG
* files.
*/
- if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name))
+ if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
{
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
return true;
}
-/*
- * Attempt to fetch an XLOG record.
- *
- * If RecPtr is not NULL, try to fetch a record at that position. Otherwise
- * try to fetch a record just after the last one previously read.
- *
- * In standby mode, if we failed in reading a valid record and are not doing
- * recovery from XLOG stream yet, we ignore the failure and start walreceiver
- * process to fetch the record from the primary. Otherwise, returns NULL,
- * or fails if emode is PANIC. (emode must be either PANIC or LOG.)
- *
- * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
- * this case, if we have to start XLOG streaming, we use RedoStartLSN as the
- * streaming start position instead of RecPtr.
- *
- * The record is copied into readRecordBuf, so that on successful return,
- * the returned record pointer always points there.
- */
-static XLogRecord *
-FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
-{
- if (StandbyMode && !InStreamingRecovery)
- {
- XLogRecord *record;
- XLogRecPtr startlsn;
- bool haveNextRecord = (nextRecord != NULL);
-
- /* An invalid record is OK here, so we set emode to DEBUG2 */
- record = ReadRecord(RecPtr, DEBUG2);
- if (record != NULL)
- return record;
-
- /*
- * Start XLOG streaming if there is no more valid records available
- * in the archive.
- *
- * We need to calculate the start position of XLOG streaming. If we
- * read a record in the middle of a segment which doesn't exist in
- * pg_xlog, we use the start of the segment as the start position.
- * That prevents a broken segment (i.e., with no records in the
- * first half of a segment) from being created by XLOG streaming,
- * which might cause trouble later on if the segment is e.g
- * archived.
- */
- startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
- if (startlsn.xrecoff % XLogSegSize != 0)
- {
- char xlogpath[MAXPGPATH];
- struct stat stat_buf;
- uint32 log;
- uint32 seg;
-
- XLByteToSeg(startlsn, log, seg);
- XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
-
- if (stat(xlogpath, &stat_buf) != 0)
- startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
- }
- RequestXLogStreaming(startlsn, PrimaryConnInfo);
-
- /* Needs to read the current page again if the next record is in it */
- needReread = haveNextRecord;
- nextRecord = NULL;
-
- InStreamingRecovery = true;
- ereport(LOG,
- (errmsg("starting streaming recovery at %X/%X",
- startlsn.xlogid, startlsn.xrecoff)));
- }
-
- return ReadRecord(RecPtr, emode);
-}
-
/*
* Attempt to read an XLOG record.
*
* try to read a record just after the last one previously read.
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG or DEBUG2.)
+ * (emode must be either PANIC or LOG.)
*
* The record is copied into readRecordBuf, so that on successful return,
* the returned record pointer always points there.
*/
static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
+ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
{
XLogRecord *record;
char *buffer;
bool randAccess = false;
uint32 len,
total_len;
- uint32 targetPageOff;
uint32 targetRecOff;
uint32 pageHeaderSize;
- XLogRecPtr receivedUpto = {0,0};
- bool finished;
int emode;
/*
* should never hit the end of WAL because we wait for it to be streamed.
* Therefore treat any broken WAL as PANIC, instead of failing over.
*/
- if (InStreamingRecovery)
+ if (StandbyMode)
emode = PANIC;
else
emode = emode_arg;
if (RecPtr == NULL)
{
RecPtr = &tmpRecPtr;
- /* fast case if next record is on same page */
- if (nextRecord != NULL)
- {
- record = nextRecord;
- goto got_record;
- }
/*
- * Align old recptr to next page if the current page is filled and
- * doesn't need to be read again.
+ * Align recptr to next page if no more records can fit on the
+ * current page.
*/
- if (!needReread)
+ if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
+ {
NextLogPage(tmpRecPtr);
- /* We will account for page header size below */
+ /* We will account for page header size below */
+ }
}
else
{
randAccess = true; /* allow curFileTLI to go backwards too */
}
- if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
- {
- close(readFile);
- readFile = -1;
- }
-
- /* Is the target record ready yet? */
- if (InStreamingRecovery)
- {
- receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
- if (finished)
- {
- if (emode_arg == PANIC)
- ereport(PANIC,
- (errmsg("streaming recovery ended")));
- else
- return NULL;
- }
- }
-
- XLByteToSeg(*RecPtr, readId, readSeg);
- if (readFile < 0)
- {
- /* Now it's okay to reset curFileTLI if random fetch */
- if (randAccess)
- curFileTLI = 0;
-
- readFile = XLogFileRead(readId, readSeg, emode);
- if (readFile < 0)
- goto next_record_is_invalid;
-
- /*
- * Whenever switching to a new WAL segment, we read the first page of
- * the file and validate its header, even if that's not where the
- * target record is. This is so that we can check the additional
- * identification info that is present in the first page's "long"
- * header.
- */
- readOff = 0;
- if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
- {
- ereport(emode,
- (errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u: %m",
- readId, readSeg, readOff)));
- goto next_record_is_invalid;
- }
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
- goto next_record_is_invalid;
- }
+ /* Read the page containing the record */
+ if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
+ return NULL;
- targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
- if (readOff != targetPageOff || needReread)
- {
- readOff = targetPageOff;
- needReread = false;
- if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
- {
- ereport(emode,
- (errcode_for_file_access(),
- errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- readId, readSeg, readOff)));
- goto next_record_is_invalid;
- }
- if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
- {
- ereport(emode,
- (errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u: %m",
- readId, readSeg, readOff)));
- goto next_record_is_invalid;
- }
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
- goto next_record_is_invalid;
- }
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
if (targetRecOff == 0)
}
record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
-got_record:;
-
/*
* xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
* required.
}
buffer = readRecordBuf;
- nextRecord = NULL;
len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
if (total_len > len)
{
/* Need to reassemble record */
XLogContRecord *contrecord;
- XLogRecPtr nextpagelsn = *RecPtr;
+ XLogRecPtr pagelsn;
uint32 gotlen = len;
+ /* Initialize pagelsn to the beginning of the page this record is on */
+ pagelsn = *RecPtr;
+ pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+
memcpy(buffer, record, len);
record = (XLogRecord *) buffer;
buffer += len;
for (;;)
{
- /* Is the next page ready yet? */
- if (InStreamingRecovery)
+ /* Calculate pointer to beginning of next page */
+ pagelsn.xrecoff += XLOG_BLCKSZ;
+ if (pagelsn.xrecoff >= XLogFileSize)
{
- if (gotlen != len)
- nextpagelsn.xrecoff += XLOG_BLCKSZ;
- NextLogPage(nextpagelsn);
- receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
- if (finished)
- {
- if (emode_arg == PANIC)
- ereport(PANIC,
- (errmsg("streaming recovery ended")));
- else
- return NULL;
- }
+ (pagelsn.xlogid)++;
+ pagelsn.xrecoff = 0;
}
+ /* Wait for the next page to become available */
+ if (!XLogPageRead(&pagelsn, emode, false, false))
+ return NULL;
- readOff += XLOG_BLCKSZ;
- if (readOff >= XLogSegSize)
- {
- close(readFile);
- readFile = -1;
- NextLogSeg(readId, readSeg);
- readFile = XLogFileRead(readId, readSeg, emode);
- if (readFile < 0)
- goto next_record_is_invalid;
- readOff = 0;
- }
- if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
- {
- ereport(emode,
- (errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u: %m",
- readId, readSeg, readOff)));
- goto next_record_is_invalid;
- }
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
- goto next_record_is_invalid;
+ /* Check that the continuation record looks valid */
if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
ereport(emode,
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
- if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
- MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
- {
- nextRecord = (XLogRecord *) ((char *) contrecord +
- MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
- }
EndRecPtr.xlogid = readId;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
pageHeaderSize +
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
- /*
- * Check whether the current page needs to be read again. If there is no
- * unread record in the current page (nextRecord == NULL), obviously we
- * don't need to reread it. If we're not in streaming recovery mode yet,
- * partially-filled page doesn't need to be reread because it is the
- * last valid page.
- */
- if (nextRecord != NULL && InStreamingRecovery &&
- XLByteLE(receivedUpto, EndRecPtr))
- {
- nextRecord = NULL;
- needReread = true;
- }
-
ReadRecPtr = *RecPtr;
/* needn't worry about XLOG SWITCH, it can't cross page boundaries */
return record;
/* Record does not cross a page boundary */
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
- if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
- MAXALIGN(total_len))
- nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
EndRecPtr.xlogid = RecPtr->xlogid;
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
- /*
- * Check whether the current page needs to be read again. If there is no
- * unread record in the current page (nextRecord == NULL), obviously we
- * don't need to reread it. If we're not in streaming recovery mode yet,
- * partially-filled page doesn't need to be reread because it is the last
- * valid page.
- */
- if (nextRecord != NULL && InStreamingRecovery &&
- XLByteLE(receivedUpto, EndRecPtr))
- {
- nextRecord = NULL;
- needReread = true;
- }
-
ReadRecPtr = *RecPtr;
memcpy(buffer, record, total_len);
/* Pretend it extends to end of segment */
EndRecPtr.xrecoff += XLogSegSize - 1;
EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
- nextRecord = NULL; /* definitely not on same page */
- needReread = false;
/*
* Pretend that readBuf contains the last page of the segment. This is
close(readFile);
readFile = -1;
}
- nextRecord = NULL;
return NULL;
}
(errmsg("checkpoint record is at %X/%X",
checkPointLoc.xlogid, checkPointLoc.xrecoff)));
}
- else if (InStreamingRecovery)
+ else if (StandbyMode)
{
/*
* The last valid checkpoint record required for a streaming
if (XLByteLT(checkPoint.redo, RecPtr))
{
/* back up to find the record */
- record = FetchRecord(&(checkPoint.redo), PANIC, false);
+ record = ReadRecord(&(checkPoint.redo), PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
- record = FetchRecord(NULL, LOG, false);
+ record = ReadRecord(NULL, LOG, false);
}
if (record != NULL)
LastRec = ReadRecPtr;
- record = FetchRecord(NULL, LOG, false);
+ record = ReadRecord(NULL, LOG, false);
} while (record != NULL && recoveryContinue);
/*
/*
* We are now done reading the xlog from stream. Turn off streaming
- * recovery, and restart fetching the files (which would be required
- * at end of recovery, e.g., timeline history file) from archive.
+ * recovery to force fetching the files (which would be required
+ * at end of recovery, e.g., timeline history file) from archive or
+ * pg_xlog.
*/
- if (InStreamingRecovery)
- {
- /* We are no longer in streaming recovery state */
- InStreamingRecovery = false;
- ereport(LOG,
- (errmsg("streaming recovery complete")));
- }
+ StandbyMode = false;
/*
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
*/
- record = ReadRecord(&LastRec, PANIC);
+ record = ReadRecord(&LastRec, PANIC, false);
EndOfLog = EndRecPtr;
XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
return NULL;
}
- record = FetchRecord(&RecPtr, LOG, true);
+ record = ReadRecord(&RecPtr, LOG, true);
if (record == NULL)
{
}
LWLockRelease(ControlFileLock);
- /* Are we doing recovery from XLOG stream? */
- if (!InStreamingRecovery)
- InStreamingRecovery = WalRcvInProgress();
-
/*
* Delete old log files (those no longer needed even for previous
* checkpoint/restartpoint) to prevent the disk holding the xlog from
* streaming recovery we have to or the disk will eventually fill up from
* old log files streamed from master.
*/
- if (InStreamingRecovery && (_logId || _logSeg))
+ if (WalRcvInProgress() && (_logId || _logSeg))
{
XLogRecPtr endptr;
*/
if (shutdown_requested)
proc_exit(1);
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (IsUnderPostmaster && !PostmasterIsAlive(true))
+ exit(1);
}
/* Main entry point for startup process */
*/
proc_exit(0);
}
+
+/*
+ * Read the XLOG page containing RecPtr into readBuf (if not read already).
+ * Returns true if successful, false otherwise or fails if emode is PANIC.
+ *
+ * This is responsible for restoring files from archive as needed, as well
+ * as for waiting for the requested WAL record to arrive in standby mode.
+ *
+ * RecPtr is the location whose containing page is wanted. emode is the
+ * error level used when reporting seek/read failures here; it is also
+ * handed to ValidXLOGHeader and, outside standby mode, to
+ * XLogFileReadAnyTLI. If fetching_ckpt is true, a streaming request starts
+ * at RedoStartLSN rather than at RecPtr. If randAccess is true, curFileTLI
+ * is reset to 0 before XLogFileReadAnyTLI is called.
+ *
+ * On success, readId/readSeg/readOff identify the page held in readBuf and
+ * readLen says how much of that page has been read. On the
+ * next_record_is_invalid failure path the segment file is closed and
+ * readStreamed and readLen are reset.
+ */
+static bool
+XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+ bool randAccess)
+{
+ static XLogRecPtr receivedUpto = {0, 0};
+ bool switched_segment = false;
+ uint32 targetPageOff;
+ uint32 targetRecOff;
+ uint32 targetId;
+ uint32 targetSeg;
+
+ XLByteToSeg(*RecPtr, targetId, targetSeg);
+ targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+ targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
+
+ /*
+ * Fast exit if we have read the record in the current buffer already:
+ * same log id, segment and page offset, and the record's offset within
+ * the page lies below readLen.
+ */
+ if (targetId == readId && targetSeg == readSeg &&
+ targetPageOff == readOff && targetRecOff < readLen)
+ return true;
+
+ /*
+ * See if we need to switch to a new segment because the requested record
+ * is not in the currently open one.
+ */
+ if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
+ {
+ close(readFile);
+ readFile = -1;
+ }
+
+ XLByteToSeg(*RecPtr, readId, readSeg);
+
+ /*
+ * See if we need to retrieve more data: either no segment is open, or
+ * the open segment is being streamed and RecPtr is not yet below
+ * receivedUpto.
+ */
+ if (readFile < 0 ||
+ (readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
+ {
+ if (StandbyMode)
+ {
+ bool last_restore_failed = false;
+
+ /*
+ * In standby mode, wait for the requested record to become
+ * available, either via restore_command succeeding to restore
+ * the segment, or via walreceiver having streamed the record.
+ */
+ for (;;)
+ {
+ if (WalRcvInProgress())
+ {
+ /*
+ * While walreceiver is active, wait for new WAL to
+ * arrive from primary.
+ */
+ receivedUpto = GetWalRcvWriteRecPtr();
+ if (XLByteLT(*RecPtr, receivedUpto))
+ {
+ /*
+ * Great, streamed far enough. Open the file if it's
+ * not open already.
+ */
+ if (readFile < 0)
+ {
+ readFile =
+ XLogFileRead(readId, readSeg, PANIC,
+ recoveryTargetTLI, false, false);
+ switched_segment = true;
+ readStreamed = true;
+ }
+ break;
+ }
+
+ if (CheckForStandbyTrigger())
+ goto next_record_is_invalid;
+
+ /*
+ * When streaming is active, we want to react quickly when
+ * the next WAL record arrives, so sleep only a bit.
+ */
+ pg_usleep(100000L); /* 100ms */
+ }
+ else
+ {
+ /*
+ * Until walreceiver manages to reconnect, poll the
+ * archive.
+ */
+ if (readFile >= 0)
+ {
+ close(readFile);
+ readFile = -1;
+ }
+ /* Reset curFileTLI if random fetch. */
+ if (randAccess)
+ curFileTLI = 0;
+ readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
+ switched_segment = true;
+ readStreamed = false;
+ if (readFile != -1)
+ {
+ elog(DEBUG1, "got WAL segment from archive");
+ break;
+ }
+
+ /*
+ * If we succeeded restoring some segments from archive
+ * since the last connection attempt (or we haven't
+ * tried streaming yet), retry immediately. But if we
+ * haven't, assume the problem is persistent, so be
+ * less aggressive.
+ */
+ if (last_restore_failed)
+ {
+ /*
+ * Check to see if the trigger file exists. Note that
+ * we do this only after failure, so when you create
+ * the trigger file, we still finish replaying as much
+ * as we can before failover.
+ */
+ if (CheckForStandbyTrigger())
+ goto next_record_is_invalid;
+ pg_usleep(5000000L); /* 5 seconds */
+ }
+ last_restore_failed = true;
+
+ /*
+ * Nope, not found in archive. Try to stream it.
+ *
+ * If fetching_ckpt is TRUE, RecPtr points to the initial
+ * checkpoint location. In that case, we use RedoStartLSN
+ * as the streaming start position instead of RecPtr, so
+ * that when we later jump backwards to start redo at
+ * RedoStartLSN, we will have the logs streamed already.
+ */
+ RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr,
+ PrimaryConnInfo);
+ }
+
+ /*
+ * This possibly-long loop needs to handle interrupts of startup
+ * process.
+ */
+ HandleStartupProcInterrupts();
+ }
+ }
+ else
+ {
+ /* In archive or crash recovery. */
+ if (readFile < 0)
+ {
+ /* Reset curFileTLI if random fetch. */
+ if (randAccess)
+ curFileTLI = 0;
+ readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
+ InArchiveRecovery);
+ switched_segment = true;
+ readStreamed = false;
+ if (readFile < 0)
+ return false;
+ }
+ }
+ }
+
+ /*
+ * At this point, we have the right segment open and we know the
+ * requested record is in it.
+ */
+ Assert(readFile != -1);
+
+ /*
+ * If the current segment is being streamed from master, calculate
+ * how much of the current page we have received already. We know the
+ * requested record has been received, but this is for the benefit
+ * of future calls, to allow quick exit at the top of this function.
+ *
+ * If receivedUpto lies on a different page than RecPtr, the whole page
+ * counts as read; otherwise only the part up to receivedUpto does.
+ */
+ if (readStreamed)
+ {
+ if (RecPtr->xlogid != receivedUpto.xlogid ||
+ (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
+ {
+ readLen = XLOG_BLCKSZ;
+ }
+ else
+ readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff;
+ }
+ else
+ readLen = XLOG_BLCKSZ;
+
+ if (switched_segment && targetPageOff != 0)
+ {
+ /*
+ * Whenever switching to a new WAL segment, we read the first page of
+ * the file and validate its header, even if that's not where the
+ * target record is. This is so that we can check the additional
+ * identification info that is present in the first page's "long"
+ * header.
+ *
+ * No seek is done first: switched_segment is only set right after
+ * the segment was opened, so the file is presumably still positioned
+ * at offset 0 (NOTE(review): confirm for BasicOpenFile).
+ */
+ readOff = 0;
+ if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ {
+ ereport(emode,
+ (errcode_for_file_access(),
+ errmsg("could not read from log file %u, segment %u, offset %u: %m",
+ readId, readSeg, readOff)));
+ goto next_record_is_invalid;
+ }
+ if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+ goto next_record_is_invalid;
+ }
+
+ /* Read the requested page: seek to it, read it, validate its header */
+ readOff = targetPageOff;
+ if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
+ {
+ ereport(emode,
+ (errcode_for_file_access(),
+ errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+ readId, readSeg, readOff)));
+ goto next_record_is_invalid;
+ }
+ if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ {
+ ereport(emode,
+ (errcode_for_file_access(),
+ errmsg("could not read from log file %u, segment %u, offset %u: %m",
+ readId, readSeg, readOff)));
+ goto next_record_is_invalid;
+ }
+ if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+ goto next_record_is_invalid;
+
+ Assert(targetId == readId);
+ Assert(targetSeg == readSeg);
+ Assert(targetPageOff == readOff);
+ Assert(targetRecOff < readLen);
+
+ return true;
+
+next_record_is_invalid:
+ if (readFile >= 0)
+ close(readFile);
+ readFile = -1;
+ readStreamed = false;
+ readLen = 0;
+
+ return false;
+}
+
+/*
+ * Check to see if the trigger file exists. If it does, request postmaster
+ * to shut down walreceiver, wait for it to exit, remove the trigger
+ * file, and return true.
+ *
+ * Returns false without touching the filesystem when TriggerFile is NULL,
+ * and also when stat() on TriggerFile fails for any reason. When the file
+ * is found, a LOG message naming it is emitted before ShutdownWalRcv() is
+ * called. The result of unlink() is not checked.
+ */
+static bool
+CheckForStandbyTrigger(void)
+{
+ struct stat stat_buf;
+
+ if (TriggerFile == NULL)
+ return false;
+
+ if (stat(TriggerFile, &stat_buf) == 0)
+ {
+ ereport(LOG,
+ (errmsg("trigger file found: %s", TriggerFile)));
+ ShutdownWalRcv();
+ unlink(TriggerFile);
+ return true;
+ }
+ return false;
+}
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
*
*-------------------------------------------------------------------------
*/
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static void InitWalRcv(void);
-static void WalRcvKill(int code, Datum arg);
+static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
void
WalReceiverMain(void)
{
- sigjmp_buf local_sigjmp_buf;
- MemoryContext walrcv_context;
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL || walrcv_receive == NULL ||
- walrcv_disconnect == NULL)
- elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+ /*
+ * WalRcv should be set up already (if we are a backend, we inherit
+ * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+ */
+ Assert(walrcv != NULL);
+
+ /*
+ * Mark walreceiver as running in shared memory.
+ *
+ * Do this as early as possible, so that if we fail later on, we'll
+ * set state to STOPPED. If we die before this, the startup process
+ * will keep waiting for us to start up, until it times out.
+ */
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->pid == 0);
+ switch(walrcv->walRcvState)
+ {
+ case WALRCV_STOPPING:
+ /* If we've already been requested to stop, don't start up. */
+ walrcv->walRcvState = WALRCV_STOPPED;
+ /* fall through */
+
+ case WALRCV_STOPPED:
+ SpinLockRelease(&walrcv->mutex);
+ proc_exit(1);
+ break;
+
+ case WALRCV_STARTING:
+ /* The usual case */
+ break;
+
+ case WALRCV_RUNNING:
+ /* Shouldn't happen */
+ elog(PANIC, "walreceiver still running according to shared memory state");
+ }
+ /* Advertise our PID so that the startup process can kill us */
+ walrcv->pid = MyProcPid;
+ walrcv->walRcvState = WALRCV_RUNNING;
+
+ /* Fetch information required to start streaming */
+ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ startpoint = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
- /* Mark walreceiver in progress */
- InitWalRcv();
+ /* Arrange to clean up at walreceiver exit */
+ on_shmem_exit(WalRcvDie, 0);
/*
* If possible, make this process a group leader, so that the postmaster
/* We allow SIGQUIT (quickdie) at all times */
sigdelset(&BlockSig, SIGQUIT);
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+ if (walrcv_connect == NULL || walrcv_receive == NULL ||
+ walrcv_disconnect == NULL)
+ elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+
/*
* Create a resource owner to keep track of our resources (not clear that
* we need this, but may as well have one).
*/
CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
- /*
- * Create a memory context that we will do all our work in. We do this so
- * that we can reset the context during error recovery and thereby avoid
- * possible memory leaks.
- */
- walrcv_context = AllocSetContextCreate(TopMemoryContext,
- "Wal Receiver",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- MemoryContextSwitchTo(walrcv_context);
-
- /*
- * If an exception is encountered, processing resumes here.
- *
- * This code is heavily based on bgwriter.c, q.v.
- */
- if (sigsetjmp(local_sigjmp_buf, 1) != 0)
- {
- /* Since not using PG_TRY, must reset error stack by hand */
- error_context_stack = NULL;
-
- /* Reset WalRcvImmediateInterruptOK */
- DisableWalRcvImmediateExit();
-
- /* Prevent interrupts while cleaning up */
- HOLD_INTERRUPTS();
-
- /* Report the error to the server log */
- EmitErrorReport();
-
- /* Disconnect any previous connection. */
- EnableWalRcvImmediateExit();
- walrcv_disconnect();
- DisableWalRcvImmediateExit();
-
- /*
- * Now return to normal top-level context and clear ErrorContext for
- * next time.
- */
- MemoryContextSwitchTo(walrcv_context);
- FlushErrorState();
-
- /* Flush any leaked data in the top-level context */
- MemoryContextResetAndDeleteChildren(walrcv_context);
-
- /* Now we can allow interrupts again */
- RESUME_INTERRUPTS();
-
- /*
- * Sleep at least 1 second after any error. A write error is likely
- * to be repeated, and we don't want to be filling the error logs as
- * fast as we can.
- */
- pg_usleep(1000000L);
- }
-
- /* We can now handle ereport(ERROR) */
- PG_exception_stack = &local_sigjmp_buf;
-
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
- /* Fetch connection information from shared memory */
- SpinLockAcquire(&walrcv->mutex);
- strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
- startpoint = walrcv->receivedUpto;
- SpinLockRelease(&walrcv->mutex);
-
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
walrcv_connect(conninfo, startpoint);
}
}
-/* Advertise our pid in shared memory, so that startup process can kill us. */
-static void
-InitWalRcv(void)
-{
- /* use volatile pointer to prevent code rearrangement */
- volatile WalRcvData *walrcv = WalRcv;
-
- /*
- * WalRcv should be set up already (if we are a backend, we inherit
- * this by fork() or EXEC_BACKEND mechanism from the postmaster).
- */
- if (walrcv == NULL)
- elog(PANIC, "walreceiver control data uninitialized");
-
- /* If we've already been requested to stop, don't start up */
- SpinLockAcquire(&walrcv->mutex);
- Assert(walrcv->pid == 0);
- if (walrcv->walRcvState == WALRCV_STOPPED ||
- walrcv->walRcvState == WALRCV_STOPPING)
- {
- walrcv->walRcvState = WALRCV_STOPPED;
- SpinLockRelease(&walrcv->mutex);
- proc_exit(1);
- }
- walrcv->pid = MyProcPid;
- SpinLockRelease(&walrcv->mutex);
-
- /* Arrange to clean up at walreceiver exit */
- on_shmem_exit(WalRcvKill, 0);
-}
-
/*
- * Clear our pid from shared memory at exit.
+ * Mark us as STOPPED in shared memory at exit.
+ *
+ * Registered with on_shmem_exit() by WalReceiverMain. While holding the
+ * mutex, this asserts that the state is RUNNING or STOPPING, sets it to
+ * WALRCV_STOPPED and clears the advertised pid. After releasing the mutex
+ * it calls walrcv_disconnect(). The code and arg parameters are not used.
*/
static void
-WalRcvKill(int code, Datum arg)
+WalRcvDie(int code, Datum arg)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- bool stopped = false;
SpinLockAcquire(&walrcv->mutex);
- if (walrcv->walRcvState == WALRCV_STOPPING ||
- walrcv->walRcvState == WALRCV_STOPPED)
- {
- walrcv->walRcvState = WALRCV_STOPPED;
- stopped = true;
- elog(LOG, "walreceiver stopped");
- }
+ Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+ walrcv->walRcvState == WALRCV_STOPPING);
+ walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0;
SpinLockRelease(&walrcv->mutex);
+ /* Terminate the connection gracefully. */
walrcv_disconnect();
-
- /* If requested to stop, tell postmaster to not restart us. */
- if (stopped)
- SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
}
/* SIGHUP: set flag to re-read config file at next convenient time */
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $
*
*-------------------------------------------------------------------------
*/
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/time.h>
+#include <time.h>
#include <unistd.h>
#include <signal.h>
WalRcvData *WalRcv = NULL;
-static bool CheckForStandbyTrigger(void);
-static void ShutdownWalRcv(void);
+/*
+ * How long to wait for walreceiver to start up after requesting
+ * postmaster to launch it. In seconds.
+ */
+#define WALRCV_STARTUP_TIMEOUT 10
/* Report shared memory space needed by WalRcvShmemInit */
Size
/* Initialize the data structures */
MemSet(WalRcv, 0, WalRcvShmemSize());
- WalRcv->walRcvState = WALRCV_NOT_STARTED;
+ WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
}
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
WalRcvState state;
+ pg_time_t startTime;
SpinLockAcquire(&walrcv->mutex);
- state = walrcv->walRcvState;
- SpinLockRelease(&walrcv->mutex);
- if (state == WALRCV_RUNNING || state == WALRCV_STOPPING)
- return true;
- else
- return false;
-}
-
-/*
- * Wait for the XLOG record at given position to become available.
- *
- * 'recptr' indicates the byte position which caller wants to read the
- * XLOG record up to. The byte position actually written and flushed
- * by walreceiver is returned. It can be higher than the requested
- * location, and the caller can safely read up to that point without
- * calling WaitNextXLogAvailable() again.
- *
- * If WAL streaming is ended (because a trigger file is found), *finished
- * is set to true and function returns immediately. The returned position
- * can be lower than requested in that case.
- *
- * Called by the startup process during streaming recovery.
- */
-XLogRecPtr
-WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
-{
- static XLogRecPtr receivedUpto = {0, 0};
-
- *finished = false;
+ state = walrcv->walRcvState;
+ startTime = walrcv->startTime;
- /* Quick exit if already known available */
- if (XLByteLT(recptr, receivedUpto))
- return receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
- for (;;)
+ /*
+ * If it has taken too long for walreceiver to start up, give up.
+ * Setting the state to STOPPED ensures that if walreceiver later
+ * does start up after all, it will see that it's not supposed to be
+ * running and die without doing anything.
+ */
+ if (state == WALRCV_STARTING)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile WalRcvData *walrcv = WalRcv;
-
- /* Update local status */
- SpinLockAcquire(&walrcv->mutex);
- receivedUpto = walrcv->receivedUpto;
- SpinLockRelease(&walrcv->mutex);
+ pg_time_t now = (pg_time_t) time(NULL);
- /* If available already, leave here */
- if (XLByteLT(recptr, receivedUpto))
- return receivedUpto;
-
- /* Check to see if the trigger file exists */
- if (CheckForStandbyTrigger())
+ if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{
- *finished = true;
- return receivedUpto;
- }
+ SpinLockAcquire(&walrcv->mutex);
- pg_usleep(100000L); /* 100ms */
-
- /*
- * This possibly-long loop needs to handle interrupts of startup
- * process.
- */
- HandleStartupProcInterrupts();
+ if (walrcv->walRcvState == WALRCV_STARTING)
+ state = walrcv->walRcvState = WALRCV_STOPPED;
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (!PostmasterIsAlive(true))
- exit(1);
+ SpinLockRelease(&walrcv->mutex);
+ }
}
+
+ if (state != WALRCV_STOPPED)
+ return true;
+ else
+ return false;
}
/*
- * Stop walreceiver and wait for it to die.
+ * Stop walreceiver (if running) and wait for it to die.
*/
-static void
+void
ShutdownWalRcv(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- pid_t walrcvpid;
+ pid_t walrcvpid = 0;
/*
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
* restart itself.
*/
SpinLockAcquire(&walrcv->mutex);
- Assert(walrcv->walRcvState == WALRCV_RUNNING);
- walrcv->walRcvState = WALRCV_STOPPING;
- walrcvpid = walrcv->pid;
+ switch(walrcv->walRcvState)
+ {
+ case WALRCV_STOPPED:
+ break;
+ case WALRCV_STARTING:
+ walrcv->walRcvState = WALRCV_STOPPED;
+ break;
+
+ case WALRCV_RUNNING:
+ walrcv->walRcvState = WALRCV_STOPPING;
+ /* fall through */
+ case WALRCV_STOPPING:
+ walrcvpid = walrcv->pid;
+ break;
+ }
SpinLockRelease(&walrcv->mutex);
/*
- * Pid can be 0, if no walreceiver process is active right now.
- * Postmaster should restart it, and when it does, it will see the
- * STOPPING state.
+ * Signal walreceiver process if it was still running.
*/
if (walrcvpid != 0)
kill(walrcvpid, SIGTERM);
}
}
-/*
- * Check to see if the trigger file exists. If it does, request postmaster
- * to shut down walreceiver and wait for it to exit, and remove the trigger
- * file.
- */
-static bool
-CheckForStandbyTrigger(void)
-{
- struct stat stat_buf;
-
- if (TriggerFile == NULL)
- return false;
-
- if (stat(TriggerFile, &stat_buf) == 0)
- {
- ereport(LOG,
- (errmsg("trigger file found: %s", TriggerFile)));
- ShutdownWalRcv();
- unlink(TriggerFile);
- return true;
- }
- return false;
-}
-
/*
* Request postmaster to start walreceiver.
*
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
+ pg_time_t now = (pg_time_t) time(NULL);
- Assert(walrcv->walRcvState == WALRCV_NOT_STARTED);
+ /*
+ * We always start at the beginning of the segment.
+ * That prevents a broken segment (i.e., with no records in the
+ * first half of a segment) from being created by XLOG streaming,
+ * which might cause trouble later on if the segment is e.g.
+ * archived.
+ */
+ if (recptr.xrecoff % XLogSegSize != 0)
+ recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
+
+ /* It had better be stopped before we try to restart it */
+ Assert(walrcv->walRcvState == WALRCV_STOPPED);
- /* locking is just pro forma here; walreceiver isn't started yet */
SpinLockAcquire(&walrcv->mutex);
- walrcv->receivedUpto = recptr;
if (conninfo != NULL)
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
walrcv->conninfo[0] = '\0';
- walrcv->walRcvState = WALRCV_RUNNING;
+ walrcv->walRcvState = WALRCV_STARTING;
+ walrcv->startTime = now;
+
+ walrcv->receivedUpto = recptr;
SpinLockRelease(&walrcv->mutex);
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
return recptr;
}
+