* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.149 2004/07/19 14:34:39 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.150 2004/07/21 22:31:20 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "access/clog.h"
#include "access/subtrans.h"
-#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
+#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
+#include "miscadmin.h"
#include "postmaster/bgwriter.h"
#include "storage/bufpage.h"
#include "storage/fd.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/relcache.h"
-#include "miscadmin.h"
/*
/*
- * ThisStartUpID will be same in all backends --- it identifies current
- * instance of the database system.
+ * ThisTimeLineID will be same in all backends --- it identifies current
+ * WAL timeline for the database system.
*/
-StartUpID ThisStartUpID = 0;
+TimeLineID ThisTimeLineID = 0;
/* Are we doing recovery from XLOG? */
bool InRecovery = false;
/* Are we recovering using offline XLOG archives? */
static bool InArchiveRecovery = false;
-/* Was the last file restored from archive, or local? */
+/* Was the last xlog file restored from archive, or local? */
static bool restoredFromArchive = false;
-static char recoveryRestoreCommand[MAXPGPATH];
+/* options taken from recovery.conf */
+static char *recoveryRestoreCommand = NULL;
static bool recoveryTarget = false;
static bool recoveryTargetExact = false;
static bool recoveryTargetInclusive = true;
static TransactionId recoveryTargetXid;
static time_t recoveryTargetTime;
+/* if recoveryStopsHere returns true, it saves actual stop xid/time here */
+static TransactionId recoveryStopXid;
+static time_t recoveryStopTime;
+static bool recoveryStopAfter;
+
+/*
+ * During normal operation, the only timeline we care about is ThisTimeLineID.
+ * During recovery, however, things are more complicated. To simplify life
+ * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
+ * scan through the WAL history (that is, it is the line that was active when
+ * the currently-scanned WAL record was generated). We also need these
+ * timeline values:
+ *
+ * recoveryTargetTLI: the desired timeline that we want to end in.
+ *
+ * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
+ * its known parents, newest first (so recoveryTargetTLI is always the
+ * first list member). Only these TLIs are expected to be seen in the WAL
+ * segments we read, and indeed only these TLIs will be considered as
+ * candidate WAL files to open at all.
+ *
+ * curFileTLI: the TLI appearing in the name of the current input WAL file.
+ * (This is not necessarily the same as ThisTimeLineID, because we could
+ * be scanning data that was copied from an ancestor timeline when the current
+ * file was created.) During a sequential scan we do not allow this value
+ * to decrease.
+ */
+static TimeLineID recoveryTargetTLI;
+static List *expectedTLIs;
+static TimeLineID curFileTLI;
+
/*
* MyLastRecPtr points to the start of the last XLOG record inserted by the
* current transaction. If MyLastRecPtr.xrecoff == 0, then the current
*
*----------
*/
+
typedef struct XLogwrtRqst
{
XLogRecPtr Write; /* last byte + 1 to write out */
XLogRecPtr Flush; /* last byte + 1 to flush */
} XLogwrtRqst;
+typedef struct XLogwrtResult
+{
+ XLogRecPtr Write; /* last byte + 1 written out */
+ XLogRecPtr Flush; /* last byte + 1 flushed */
+} XLogwrtResult;
+
/*
* Shared state data for XLogInsert.
*/
XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */
uint32 XLogCacheByte; /* # bytes in xlog buffers */
uint32 XLogCacheBlck; /* highest allocated xlog buffer index */
- StartUpID ThisStartUpID;
+ TimeLineID ThisTimeLineID;
slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */
} XLogCtlData;
XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
)
-
-/* Increment an xlogid/segment pair */
-#define NextLogSeg(logId, logSeg) \
- do { \
- if ((logSeg) >= XLogSegsPerFile-1) \
- { \
- (logId)++; \
- (logSeg) = 0; \
- } \
- else \
- (logSeg)++; \
- } while (0)
-
-/* Decrement an xlogid/segment pair (assume it's not 0,0) */
-#define PrevLogSeg(logId, logSeg) \
- do { \
- if (logSeg) \
- (logSeg)--; \
- else \
- { \
- (logId)--; \
- (logSeg) = XLogSegsPerFile-1; \
- } \
- } while (0)
-
-/*
- * Compute ID and segment from an XLogRecPtr.
- *
- * For XLByteToSeg, do the computation at face value. For XLByteToPrevSeg,
- * a boundary byte is taken to be in the previous segment. This is suitable
- * for deciding which segment to write given a pointer to a record end,
- * for example. (We can assume xrecoff is not zero, since no valid recptr
- * can have that.)
- */
-#define XLByteToSeg(xlrp, logId, logSeg) \
- ( logId = (xlrp).xlogid, \
- logSeg = (xlrp).xrecoff / XLogSegSize \
- )
-#define XLByteToPrevSeg(xlrp, logId, logSeg) \
- ( logId = (xlrp).xlogid, \
- logSeg = ((xlrp).xrecoff - 1) / XLogSegSize \
- )
-
-/*
- * Is an XLogRecPtr within a particular XLOG segment?
- *
- * For XLByteInSeg, do the computation at face value. For XLByteInPrevSeg,
- * a boundary byte is taken to be in the previous segment.
- */
-#define XLByteInSeg(xlrp, logId, logSeg) \
- ((xlrp).xlogid == (logId) && \
- (xlrp).xrecoff / XLogSegSize == (logSeg))
-
-#define XLByteInPrevSeg(xlrp, logId, logSeg) \
- ((xlrp).xlogid == (logId) && \
- ((xlrp).xrecoff - 1) / XLogSegSize == (logSeg))
-
-
#define PrevBufIdx(idx) \
(((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
#define NextBufIdx(idx) \
(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
-#define XRecOffIsValid(xrecoff) \
- ((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \
- (BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord)
-
-/*
- * These macros encapsulate knowledge about the exact layout of XLog file
- * names as well as archive-status file names.
- */
-#define MAXFNAMELEN 32
-
-#define XLogFileName(fname, log, seg) \
- snprintf(fname, MAXFNAMELEN, "%08X%08X", log, seg)
-
-#define XLogFilePath(path, log, seg) \
- snprintf(path, MAXPGPATH, "%s/%08X%08X", XLogDir, log, seg)
-
-#define StatusFilePath(path, xlog, suffix) \
- snprintf(path, MAXPGPATH, "%s/archive_status/%s%s", XLogDir, xlog, suffix)
-
-/*
- * _INTL_MAXLOGRECSZ: max space needed for a record including header and
- * any backup-block data.
- */
-#define _INTL_MAXLOGRECSZ (SizeOfXLogRecord + MAXLOGRECSZ + \
- XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
-
/* File path names */
-static char XLogDir[MAXPGPATH];
+char XLogDir[MAXPGPATH];
static char ControlFilePath[MAXPGPATH];
/*
static XLogRecPtr ReadRecPtr;
static XLogRecPtr EndRecPtr;
static XLogRecord *nextRecord = NULL;
-static StartUpID lastReadSUI;
+static TimeLineID lastPageTLI = 0;
static bool InRedo = false;
+
static void XLogArchiveNotify(const char *xlog);
static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
static bool XLogArchiveIsDone(const char *xlog);
static void XLogArchiveCleanup(const char *xlog);
static void readRecoveryCommandFile(void);
-static void exitArchiveRecovery(uint32 endLogId, uint32 endLogSeg,
- uint32 xrecoff);
+static void exitArchiveRecovery(TimeLineID endTLI,
+ uint32 endLogId, uint32 endLogSeg);
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static bool AdvanceXLInsertBuffer(void);
-static bool WasteXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
bool find_free, int max_advance,
bool use_lock);
-static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
-static void RestoreArchivedXLog(char *path, uint32 log, uint32 seg);
+static int XLogFileOpen(uint32 log, uint32 seg);
+static int XLogFileRead(uint32 log, uint32 seg, int emode);
+static bool RestoreArchivedFile(char *path, const char *xlogfname,
+ const char *recovername);
static void PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
-static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
+static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
int whichChkpt,
char *buffer);
+static List *readTimeLineHistory(TimeLineID targetTLI);
+static bool existsTimeLineHistory(TimeLineID probeTLI);
+static TimeLineID findNewestTimeLine(TimeLineID startTLI);
+static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
+ TimeLineID endTLI,
+ uint32 endLogId, uint32 endLogSeg);
static void WriteControlFile(void);
static void ReadControlFile(void);
static char *str_time(time_t tnow);
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
RecPtr.xlogid = 0;
- RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */
+ RecPtr.xrecoff = SizeOfXLogLongPHD; /* start of 1st chkpt record */
return (RecPtr);
}
}
/*
- * Determine exactly where we will place the new XLOG record. If there
- * isn't enough space on the current XLOG page for a record header,
- * advance to the next page (leaving the unused space as zeroes).
- * If there isn't enough space in the current XLOG segment for the whole
- * record, advance to the next segment (inserting wasted-space records).
- * This avoids needing a continuation record at the start of a segment
- * file, which would conflict with placing a FILE_HEADER record there.
- * We assume that no XLOG record can be larger than a segment file...
+ * If there isn't enough space on the current XLOG page for a record
+ * header, advance to the next page (leaving the unused space as zeroes).
*/
-
updrqst = false;
freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
freespace = INSERT_FREESPACE(Insert);
}
- if (freespace < (uint32) (SizeOfXLogRecord + write_len))
- {
- /* Doesn't fit on this page, so check for overrunning the file */
- uint32 avail;
-
- /* First figure the space available in remaining pages of file */
- avail = XLogSegSize - BLCKSZ -
- (Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize);
- avail /= BLCKSZ; /* convert to pages, then usable bytes */
- avail *= (BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord);
- avail += freespace; /* add in the current page too */
- if (avail < (uint32) (SizeOfXLogRecord + write_len))
- {
- /* It overruns the file, so waste the rest of the file... */
- do {
- updrqst = WasteXLInsertBuffer();
- } while ((Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize) != 0);
- freespace = INSERT_FREESPACE(Insert);
- }
- }
-
curridx = Insert->curridx;
record = (XLogRecord *) Insert->currpos;
/* Use next buffer */
updrqst = AdvanceXLInsertBuffer();
curridx = Insert->curridx;
- /* This assert checks we did not insert a file header record */
- Assert(INSERT_FREESPACE(Insert) == BLCKSZ - SizeOfXLogPHD);
/* Insert cont-record header */
Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
contrecord = (XLogContRecord *) Insert->currpos;
contrecord->xl_rem_len = write_len;
Insert->currpos += SizeOfXLogContRecord;
- freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
+ freespace = INSERT_FREESPACE(Insert);
}
/* Ensure next record will be properly aligned */
* Create an archive notification file
*
* The name of the notification file is the message that will be picked up
- * by the archiver, e.g. we write 00000001000000C6.ready
- * and the archiver then knows to archive XLogDir/00000001000000C6,
- * then when complete, rename it to 00000001000000C6.done
+ * by the archiver, e.g. we write 0000000100000001000000C6.ready
+ * and the archiver then knows to archive XLogDir/0000000100000001000000C6,
+ * then when complete, rename it to 0000000100000001000000C6.done
*/
static void
XLogArchiveNotify(const char *xlog)
{
char xlog[MAXFNAMELEN];
- XLogFileName(xlog, log, seg);
+ XLogFileName(xlog, ThisTimeLineID, log, seg);
XLogArchiveNotify(xlog);
}
/*
* XLogArchiveCleanup
*
- * Cleanup an archive notification file for a particular xlog segment
+ * Cleanup archive notification file(s) for a particular xlog segment
*/
static void
XLogArchiveCleanup(const char *xlog)
{
char archiveStatusPath[MAXPGPATH];
+ /* Remove the .done file */
StatusFilePath(archiveStatusPath, xlog, ".done");
unlink(archiveStatusPath);
/* should we complain about failure? */
+
+ /* Remove the .ready file if present --- normally it shouldn't be */
+ StatusFilePath(archiveStatusPath, xlog, ".ready");
+ unlink(archiveStatusPath);
+ /* should we complain about failure? */
}
/*
NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
Insert->curridx = nextidx;
Insert->currpage = NewPage;
- Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;
+ Insert->currpos = ((char *) NewPage) + SizeOfXLogShortPHD;
/*
* Be sure to re-zero the buffer so that bytes beyond what we've
*/
NewPage->xlp_magic = XLOG_PAGE_MAGIC;
/* NewPage->xlp_info = 0; */ /* done by memset */
- NewPage->xlp_sui = ThisStartUpID;
+ NewPage->xlp_tli = ThisTimeLineID;
NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
/*
- * If first page of an XLOG segment file, add a FILE_HEADER record.
+ * If first page of an XLOG segment file, make it a long header.
*/
if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
{
- XLogRecPtr RecPtr;
- XLogRecord *record;
- XLogFileHeaderData *fhdr;
- crc64 crc;
+ XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
- record = (XLogRecord *) Insert->currpos;
- record->xl_prev = Insert->PrevRecord;
- record->xl_xact_prev.xlogid = 0;
- record->xl_xact_prev.xrecoff = 0;
- record->xl_xid = InvalidTransactionId;
- record->xl_len = SizeOfXLogFHD;
- record->xl_info = XLOG_FILE_HEADER;
- record->xl_rmid = RM_XLOG_ID;
- fhdr = (XLogFileHeaderData *) XLogRecGetData(record);
- fhdr->xlfhd_sysid = ControlFile->system_identifier;
- fhdr->xlfhd_xlogid = NewPage->xlp_pageaddr.xlogid;
- fhdr->xlfhd_segno = NewPage->xlp_pageaddr.xrecoff / XLogSegSize;
- fhdr->xlfhd_seg_size = XLogSegSize;
-
- INIT_CRC64(crc);
- COMP_CRC64(crc, fhdr, SizeOfXLogFHD);
- COMP_CRC64(crc, (char *) record + sizeof(crc64),
- SizeOfXLogRecord - sizeof(crc64));
- FIN_CRC64(crc);
- record->xl_crc = crc;
-
- /* Compute record's XLOG location */
- INSERT_RECPTR(RecPtr, Insert, nextidx);
-
- /* Record begin of record in appropriate places */
- Insert->PrevRecord = RecPtr;
-
- Insert->currpos += SizeOfXLogRecord + SizeOfXLogFHD;
+ NewLongPage->xlp_sysid = ControlFile->system_identifier;
+ NewLongPage->xlp_seg_size = XLogSegSize;
+ NewPage->xlp_info |= XLP_LONG_HEADER;
+ Insert->currpos = ((char *) NewPage) + SizeOfXLogLongPHD;
}
return update_needed;
}
-/*
- * Fill the remainder of the current XLOG page with an XLOG_WASTED_SPACE
- * record, and advance to the next page. This has the same calling and
- * result conditions as AdvanceXLInsertBuffer, except that
- * AdvanceXLInsertBuffer expects the current page to be already filled.
- */
-static bool
-WasteXLInsertBuffer(void)
-{
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecord *record;
- XLogRecPtr RecPtr;
- uint32 freespace;
- uint16 curridx;
- crc64 rdata_crc;
-
- freespace = INSERT_FREESPACE(Insert);
- Assert(freespace >= SizeOfXLogRecord);
- freespace -= SizeOfXLogRecord;
-
- curridx = Insert->curridx;
- record = (XLogRecord *) Insert->currpos;
-
- record->xl_prev = Insert->PrevRecord;
- record->xl_xact_prev.xlogid = 0;
- record->xl_xact_prev.xrecoff = 0;
-
- record->xl_xid = InvalidTransactionId;
- record->xl_len = freespace;
- record->xl_info = XLOG_WASTED_SPACE;
- record->xl_rmid = RM_XLOG_ID;
-
- INIT_CRC64(rdata_crc);
- COMP_CRC64(rdata_crc, XLogRecGetData(record), freespace);
- COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),
- SizeOfXLogRecord - sizeof(crc64));
- FIN_CRC64(rdata_crc);
- record->xl_crc = rdata_crc;
-
- /* Compute record's XLOG location */
- INSERT_RECPTR(RecPtr, Insert, curridx);
-
- /* Record begin of record in appropriate places */
- Insert->PrevRecord = RecPtr;
-
- /* We needn't bother to advance Insert->currpos */
-
- return AdvanceXLInsertBuffer();
-}
-
/*
* Write and/or fsync the log at least as far as WriteRqst indicates.
*
if (openLogFile < 0)
{
XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
- openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
+ openLogFile = XLogFileOpen(openLogId, openLogSeg);
openLogOff = 0;
}
if (openLogFile < 0)
{
XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
- openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
+ openLogFile = XLogFileOpen(openLogId, openLogSeg);
openLogOff = 0;
}
issue_xlog_fsync();
int fd;
int nbytes;
- XLogFilePath(path, log, seg);
+ XLogFilePath(path, ThisTimeLineID, log, seg);
/*
* Try to use existent file (checkpoint maker may have created it
return (fd);
}
+/*
+ * Create a new XLOG file segment by copying a pre-existing one.
+ *
+ * log, seg: identify segment to be created.
+ *
+ * srcTLI, srclog, srcseg: identify segment to be copied (could be from
+ * a different timeline)
+ *
+ * Currently this is only used during recovery, and so there are no locking
+ * considerations. But we should be just as tense as XLogFileInit to avoid
+ * emplacing a bogus file.
+ */
+static void
+XLogFileCopy(uint32 log, uint32 seg,
+ TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
+{
+ char path[MAXPGPATH];
+ char tmppath[MAXPGPATH];
+ char buffer[BLCKSZ];
+ int srcfd;
+ int fd;
+ int nbytes;
+
+ /*
+ * Open the source file
+ */
+ XLogFilePath(path, srcTLI, srclog, srcseg);
+ srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+ if (srcfd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", path)));
+
+ /*
+ * Copy into a temp file name.
+ */
+ snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d",
+ XLogDir, (int) getpid());
+
+ unlink(tmppath);
+
+ /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
+ fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", tmppath)));
+
+ /*
+ * Do the data copying.
+ */
+ for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
+ {
+ errno = 0;
+ if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
+ {
+ if (errno != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m", path)));
+ else
+ ereport(PANIC,
+ (errmsg("insufficient data in file \"%s\"", path)));
+ }
+ errno = 0;
+ if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
+ {
+ int save_errno = errno;
+
+ /*
+ * If we fail to make the file, delete it to release disk
+ * space
+ */
+ unlink(tmppath);
+ /* if write didn't set errno, assume problem is no disk space */
+ errno = save_errno ? save_errno : ENOSPC;
+
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", tmppath)));
+ }
+ }
+
+ if (pg_fsync(fd) != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m", tmppath)));
+
+ if (close(fd))
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m", tmppath)));
+
+ close(srcfd);
+
+ /*
+ * Now move the segment into place with its final name.
+ */
+ if (!InstallXLogFileSegment(log, seg, tmppath, false, 0, false))
+ elog(PANIC, "InstallXLogFileSegment should not have failed");
+}
+
/*
* Install a new XLOG segment file as a current or future log segment.
*
char path[MAXPGPATH];
struct stat stat_buf;
- XLogFilePath(path, log, seg);
+ XLogFilePath(path, ThisTimeLineID, log, seg);
/*
* We want to be sure that only one process does this at a time.
return false;
}
NextLogSeg(log, seg);
- XLogFilePath(path, log, seg);
+ XLogFilePath(path, ThisTimeLineID, log, seg);
}
}
}
/*
- * Open a pre-existing logfile segment.
+ * Open a pre-existing logfile segment for writing.
*/
static int
-XLogFileOpen(uint32 log, uint32 seg, bool econt)
+XLogFileOpen(uint32 log, uint32 seg)
{
char path[MAXPGPATH];
int fd;
- if (InArchiveRecovery)
- RestoreArchivedXLog(path, log, seg);
- else
- XLogFilePath(path, log, seg);
+ XLogFilePath(path, ThisTimeLineID, log, seg);
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
S_IRUSR | S_IWUSR);
if (fd < 0)
- {
- if (econt && errno == ENOENT)
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
- path, log, seg)));
- return (fd);
- }
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
path, log, seg)));
- }
+
+ return fd;
+}
+
+/*
+ * Open a logfile segment for reading (during recovery).
+ */
+static int
+XLogFileRead(uint32 log, uint32 seg, int emode)
+{
+ char path[MAXPGPATH];
+ char xlogfname[MAXFNAMELEN];
+ ListCell *cell;
+ int fd;
/*
- * XXX this is a pretty horrid hack. Remove after implementing timelines.
- *
- * if we switched back to local xlogs after having been
- * restoring from archive, we need to make sure that the
- * local files don't get removed by end-of-recovery checkpoint
- * in case we need to re-run the recovery
+ * Loop looking for a suitable timeline ID: we might need to
+ * read any of the timelines listed in expectedTLIs.
*
- * we want to copy these away as soon as possible, so set
- * the archive status flag to .ready for them
- * in case admin isn't cautious enough to have done this anyway
- *
- * XXX this is completely broken, because there is no guarantee this file
- * is actually complete and ready to be archived. Also, what if there's
- * a .done file for them?
+ * We expect curFileTLI on entry to be the TLI of the preceding file
+ * in sequence, or 0 if there was no predecessor. We do not allow
+ * curFileTLI to go backwards; this prevents us from picking up the
+ * wrong file when a parent timeline extends to higher segment numbers
+ * than the child we want to read.
*/
- if (InArchiveRecovery && !restoredFromArchive)
- XLogArchiveNotifySeg(log, seg);
+ foreach(cell, expectedTLIs)
+ {
+ TimeLineID tli = (TimeLineID) lfirst_int(cell);
- return (fd);
+ if (tli < curFileTLI)
+ break; /* don't bother looking at too-old TLIs */
+
+ if (InArchiveRecovery)
+ {
+ XLogFileName(xlogfname, tli, log, seg);
+ restoredFromArchive = RestoreArchivedFile(path, xlogfname,
+ "RECOVERYXLOG");
+ }
+ else
+ XLogFilePath(path, tli, log, seg);
+
+ fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+ if (fd >= 0)
+ {
+ /* Success! */
+ curFileTLI = tli;
+ return fd;
+ }
+ if (errno != ENOENT) /* unexpected failure? */
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
+ path, log, seg)));
+ }
+
+ /* Couldn't find it. For simplicity, complain about front timeline */
+ XLogFilePath(path, recoveryTargetTLI, log, seg);
+ errno = ENOENT;
+ ereport(emode,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
+ path, log, seg)));
+ return -1;
}
/*
- * Get next logfile segment when using off-line archive for recovery
- *
- * Attempt to retrieve the specified segment from off-line archival storage.
+ * Attempt to retrieve the specified file from off-line archival storage.
* If successful, fill "path" with its complete path (note that this will be
- * a temp file name that doesn't follow the normal naming convention).
+ * a temp file name that doesn't follow the normal naming convention), and
+ * return TRUE.
*
- * If not successful, fill "path" with the name of the normal on-line segment
- * file (which may or may not actually exist, but we'll try to use it).
+ * If not successful, fill "path" with the name of the normal on-line file
+ * (which may or may not actually exist, but we'll try to use it), and return
+ * FALSE.
*/
-static void
-RestoreArchivedXLog(char *path, uint32 log, uint32 seg)
+static bool
+RestoreArchivedFile(char *path, const char *xlogfname,
+ const char *recovername)
{
- char xlogfname[MAXFNAMELEN];
char xlogpath[MAXPGPATH];
char xlogRestoreCmd[MAXPGPATH];
char *dp;
* The copy-from-archive filename is always the same, ensuring that we
* don't run out of disk space on long recoveries.
*/
- XLogFileName(xlogfname, log, seg);
- snprintf(xlogpath, MAXPGPATH, "%s/RECOVERYXLOG", XLogDir);
+ snprintf(xlogpath, MAXPGPATH, "%s/%s", XLogDir, recovername);
/*
- * Make sure there is no existing RECOVERYXLOG file.
+ * Make sure there is no existing file named recovername.
*/
if (stat(xlogpath, &stat_buf) != 0)
{
(errmsg("restored log file \"%s\" from archive",
xlogfname)));
strcpy(path, xlogpath);
- restoredFromArchive = true;
- return;
+ return true;
}
if (errno != ENOENT)
ereport(FATAL,
* In many recovery scenarios we expect this to fail also, but
* if so that just means we've reached the end of WAL.
*/
- XLogFilePath(path, log, seg);
- restoredFromArchive = false;
+ snprintf(path, MAXPGPATH, "%s/%s", XLogDir, xlogfname);
+ return false;
}
/*
errmsg("could not open transaction log directory \"%s\": %m",
XLogDir)));
- XLogFileName(lastoff, log, seg);
+ XLogFileName(lastoff, ThisTimeLineID, log, seg);
errno = 0;
while ((xlde = readdir(xldir)) != NULL)
{
/*
- * use the alphanumeric sorting property of the filenames to decide
- * which ones are earlier than the lastoff segment
+ * We ignore the timeline part of the XLOG segment identifiers in
+ * deciding whether a segment is still needed. This ensures that
+ * we won't prematurely remove a segment from a parent timeline.
+ * We could probably be a little more proactive about removing
+ * segments of non-parent timelines, but that would be a whole lot
+ * more complicated.
+ *
+ * We use the alphanumeric sorting property of the filenames to decide
+ * which ones are earlier than the lastoff segment.
*/
- if (strlen(xlde->d_name) == 16 &&
- strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
- strcmp(xlde->d_name, lastoff) <= 0)
+ if (strlen(xlde->d_name) == 24 &&
+ strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
+ strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
{
bool recycle;
page = (Page) BufferGetPage(buffer);
memcpy((char *) page, blk, BLCKSZ);
PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
+ PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}
{
XLogRecord *record;
XLogRecPtr tmpRecPtr = EndRecPtr;
+ bool randAccess = false;
uint32 len,
total_len;
uint32 targetPageOff;
+ uint32 targetRecOff;
+ uint32 pageHeaderSize;
unsigned i;
- bool nextmode = false;
if (readBuf == NULL)
{
if (RecPtr == NULL)
{
RecPtr = &tmpRecPtr;
- nextmode = true;
/* fast case if next record is on same page */
if (nextRecord != NULL)
{
(tmpRecPtr.xlogid)++;
tmpRecPtr.xrecoff = 0;
}
- tmpRecPtr.xrecoff += SizeOfXLogPHD;
+ /* We will account for page header size below */
+ }
+ else
+ {
+ if (!XRecOffIsValid(RecPtr->xrecoff))
+ ereport(PANIC,
+ (errmsg("invalid record offset at %X/%X",
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ /*
+ * Since we are going to a random position in WAL, forget any
+ * prior state about what timeline we were in, and allow it
+ * to be any timeline in expectedTLIs. We also set a flag to
+ * allow curFileTLI to go backwards (but we can't reset that
+ * variable right here, since we might not change files at all).
+ */
+ lastPageTLI = 0; /* see comment in ValidXLOGHeader */
+ randAccess = true; /* allow curFileTLI to go backwards too */
}
- else if (!XRecOffIsValid(RecPtr->xrecoff))
- ereport(PANIC,
- (errmsg("invalid record offset at %X/%X",
- RecPtr->xlogid, RecPtr->xrecoff)));
if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
{
XLByteToSeg(*RecPtr, readId, readSeg);
if (readFile < 0)
{
- readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
+ /* Now it's okay to reset curFileTLI if random fetch */
+ if (randAccess)
+ curFileTLI = 0;
+
+ readFile = XLogFileRead(readId, readSeg, emode);
if (readFile < 0)
goto next_record_is_invalid;
readOff = (uint32) (-1); /* force read to occur below */
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, nextmode))
+ if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
}
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
+ targetRecOff = RecPtr->xrecoff % BLCKSZ;
+ if (targetRecOff == 0)
+ {
+ /*
+ * Can only get here in the continuing-from-prev-page case, because
+ * XRecOffIsValid eliminated the zero-page-offset case otherwise.
+ * Need to skip over the new page's header.
+ */
+ tmpRecPtr.xrecoff += pageHeaderSize;
+ targetRecOff = pageHeaderSize;
+ }
+ else if (targetRecOff < pageHeaderSize)
+ {
+ ereport(emode,
+ (errmsg("invalid record offset at %X/%X",
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ goto next_record_is_invalid;
+ }
if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
- RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
+ targetRecOff == pageHeaderSize)
{
ereport(emode,
(errmsg("contrecord is requested by %X/%X",
close(readFile);
readFile = -1;
NextLogSeg(readId, readSeg);
- readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
+ readFile = XLogFileRead(readId, readSeg, emode);
if (readFile < 0)
goto next_record_is_invalid;
readOff = 0;
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, true))
+ if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
- contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD);
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
+ contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
if (contrecord->xl_rem_len == 0 ||
total_len != (contrecord->xl_rem_len + gotlen))
{
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
- len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
+ len = BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
if (contrecord->xl_rem_len > len)
{
memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
}
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
- if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD +
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
+ if (BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len))
{
nextRecord = (XLogRecord *) ((char *) contrecord +
}
EndRecPtr.xlogid = readId;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
- SizeOfXLogPHD + SizeOfXLogContRecord +
+ pageHeaderSize + SizeOfXLogContRecord +
MAXALIGN(contrecord->xl_rem_len);
ReadRecPtr = *RecPtr;
return record;
* ReadRecord. It's not intended for use from anywhere else.
*/
static bool
-ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
+ValidXLOGHeader(XLogPageHeader hdr, int emode)
{
XLogRecPtr recaddr;
hdr->xlp_info, readId, readSeg, readOff)));
return false;
}
- recaddr.xlogid = readId;
- recaddr.xrecoff = readSeg * XLogSegSize + readOff;
- if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
+ if (hdr->xlp_info & XLP_LONG_HEADER)
{
- ereport(emode,
- (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
- hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
- readId, readSeg, readOff)));
- return false;
- }
+ XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
- /*
- * We disbelieve a SUI less than the previous page's SUI, or more than
- * a few counts greater. In theory as many as 512 shutdown checkpoint
- * records could appear on a 32K-sized xlog page, so that's the most
- * differential there could legitimately be.
- *
- * Note this check can only be applied when we are reading the next page
- * in sequence, so ReadRecord passes a flag indicating whether to
- * check.
- */
- if (checkSUI)
- {
- if (hdr->xlp_sui < lastReadSUI ||
- hdr->xlp_sui > lastReadSUI + 512)
+ if (longhdr->xlp_sysid != ControlFile->system_identifier)
{
- ereport(emode,
- /* translator: SUI = startup id */
- (errmsg("out-of-sequence SUI %u (after %u) in log file %u, segment %u, offset %u",
- hdr->xlp_sui, lastReadSUI,
- readId, readSeg, readOff)));
- return false;
- }
- }
- lastReadSUI = hdr->xlp_sui;
- return true;
-}
+ char fhdrident_str[32];
+ char sysident_str[32];
-/*
- * I/O routines for pg_control
+ /*
+ * Format sysids separately to keep platform-dependent format
+ * code out of the translatable message string.
+ */
+ snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
+ longhdr->xlp_sysid);
+ snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
+ ControlFile->system_identifier);
+ ereport(emode,
+ (errmsg("WAL file is from different system"),
+ errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
+ fhdrident_str, sysident_str)));
+ return false;
+ }
+ if (longhdr->xlp_seg_size != XLogSegSize)
+ {
+ ereport(emode,
+ (errmsg("WAL file is from different system"),
+ errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
+ return false;
+ }
+ }
+ recaddr.xlogid = readId;
+ recaddr.xrecoff = readSeg * XLogSegSize + readOff;
+ if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
+ {
+ ereport(emode,
+ (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
+ hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
+ readId, readSeg, readOff)));
+ return false;
+ }
+
+ /*
+ * Check page TLI is one of the expected values.
+ */
+ if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
+ {
+ ereport(emode,
+ (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
+ hdr->xlp_tli,
+ readId, readSeg, readOff)));
+ return false;
+ }
+
+ /*
+ * Since child timelines are always assigned a TLI greater than their
+ * immediate parent's TLI, we should never see TLI go backwards across
+ * successive pages of a consistent WAL sequence.
+ *
+ * Of course this check should only be applied when advancing sequentially
+ * across pages; therefore ReadRecord resets lastPageTLI to zero when
+ * going to a random page.
+ */
+ if (hdr->xlp_tli < lastPageTLI)
+ {
+ ereport(emode,
+ (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
+ hdr->xlp_tli, lastPageTLI,
+ readId, readSeg, readOff)));
+ return false;
+ }
+ lastPageTLI = hdr->xlp_tli;
+ return true;
+}
+
+/*
+ * Try to read a timeline's history file.
+ *
+ * If successful, return the list of component TLIs (the given TLI followed by
+ * its ancestor TLIs). If we can't find the history file, assume that the
+ * timeline has no parents, and return a list of just the specified timeline
+ * ID.
+ */
+static List *
+readTimeLineHistory(TimeLineID targetTLI)
+{
+ List *result;
+ char path[MAXPGPATH];
+ char histfname[MAXFNAMELEN];
+ char fline[MAXPGPATH];
+ FILE *fd;
+
+ if (InArchiveRecovery)
+ {
+ TLHistoryFileName(histfname, targetTLI);
+ RestoreArchivedFile(path, histfname, "RECOVERYHISTORY");
+ }
+ else
+ TLHistoryFilePath(path, targetTLI);
+
+ fd = AllocateFile(path, "r");
+ if (fd == NULL)
+ {
+ if (errno != ENOENT)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not open \"%s\": %m", path)));
+ /* Not there, so assume no parents */
+ return list_make1_int((int) targetTLI);
+ }
+
+ result = NIL;
+
+ /*
+ * Parse the file...
+ */
+ while (fgets(fline, MAXPGPATH, fd) != NULL)
+ {
+ /* skip leading whitespace and check for # comment */
+ char *ptr;
+ char *endptr;
+ TimeLineID tli;
+
+ for (ptr = fline; *ptr; ptr++)
+ {
+ if (!isspace((unsigned char) *ptr))
+ break;
+ }
+ if (*ptr == '\0' || *ptr == '#')
+ continue;
+
+ /* expect a numeric timeline ID as first field of line */
+ tli = (TimeLineID) strtoul(ptr, &endptr, 0);
+ if (endptr == ptr)
+ ereport(FATAL,
+ (errmsg("syntax error in history file: %s", fline),
+ errhint("Expected a numeric timeline ID.")));
+
+ if (result &&
+ tli <= (TimeLineID) linitial_int(result))
+ ereport(FATAL,
+ (errmsg("invalid data in history file: %s", fline),
+ errhint("Timeline IDs must be in increasing sequence.")));
+
+ /* Build list with newest item first */
+ result = lcons_int((int) tli, result);
+
+ /* we ignore the remainder of each line */
+ }
+
+ FreeFile(fd);
+
+ if (result &&
+ targetTLI <= (TimeLineID) linitial_int(result))
+ ereport(FATAL,
+ (errmsg("invalid data in history file \"%s\"", path),
+ errhint("Timeline IDs must be less than child timeline's ID.")));
+
+ result = lcons_int((int) targetTLI, result);
+
+ ereport(DEBUG3,
+ (errmsg_internal("history of timeline %u is %s",
+ targetTLI, nodeToString(result))));
+
+ return result;
+}
+
+/*
+ * Probe whether a timeline history file exists for the given timeline ID
+ */
+static bool
+existsTimeLineHistory(TimeLineID probeTLI)
+{
+ char path[MAXPGPATH];
+ char histfname[MAXFNAMELEN];
+ FILE *fd;
+
+ if (InArchiveRecovery)
+ {
+ TLHistoryFileName(histfname, probeTLI);
+ RestoreArchivedFile(path, histfname, "RECOVERYHISTORY");
+ }
+ else
+ TLHistoryFilePath(path, probeTLI);
+
+ fd = AllocateFile(path, "r");
+ if (fd != NULL)
+ {
+ FreeFile(fd);
+ return true;
+ }
+ else
+ {
+ if (errno != ENOENT)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not open \"%s\": %m", path)));
+ return false;
+ }
+}
+
+/*
+ * Find the newest existing timeline, assuming that startTLI exists.
+ *
+ * Note: while this is somewhat heuristic, it does positively guarantee
+ * that (result + 1) is not a known timeline, and therefore it should
+ * be safe to assign that ID to a new timeline.
+ */
+static TimeLineID
+findNewestTimeLine(TimeLineID startTLI)
+{
+ TimeLineID newestTLI;
+ TimeLineID probeTLI;
+
+ /*
+ * The algorithm is just to probe for the existence of timeline history
+ * files. XXX is it useful to allow gaps in the sequence?
+ */
+ newestTLI = startTLI;
+
+ for (probeTLI = startTLI + 1; ; probeTLI++)
+ {
+ if (existsTimeLineHistory(probeTLI))
+ {
+ newestTLI = probeTLI; /* probeTLI exists */
+ }
+ else
+ {
+ /* doesn't exist, assume we're done */
+ break;
+ }
+ }
+
+ return newestTLI;
+}
+
+/*
+ * Create a new timeline history file.
+ *
+ * newTLI: ID of the new timeline
+ * parentTLI: ID of its immediate parent
+ * endTLI et al: ID of the last used WAL file, for annotation purposes
+ *
+ * Currently this is only used during recovery, and so there are no locking
+ * considerations. But we should be just as tense as XLogFileInit to avoid
+ * emplacing a bogus file.
+ */
+static void
+writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
+ TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
+{
+ char path[MAXPGPATH];
+ char tmppath[MAXPGPATH];
+ char histfname[MAXFNAMELEN];
+ char xlogfname[MAXFNAMELEN];
+ char buffer[BLCKSZ];
+ int srcfd;
+ int fd;
+ int nbytes;
+
+ Assert(newTLI > parentTLI); /* else bad selection of newTLI */
+
+ /*
+ * Write into a temp file name.
+ */
+ snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d",
+ XLogDir, (int) getpid());
+
+ unlink(tmppath);
+
+ /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
+ fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", tmppath)));
+
+ /*
+ * If a history file exists for the parent, copy it verbatim
+ */
+ if (InArchiveRecovery)
+ {
+ TLHistoryFileName(histfname, parentTLI);
+ RestoreArchivedFile(path, histfname, "RECOVERYHISTORY");
+ }
+ else
+ TLHistoryFilePath(path, parentTLI);
+
+ srcfd = BasicOpenFile(path, O_RDONLY, 0);
+ if (srcfd < 0)
+ {
+ if (errno != ENOENT)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg("could not open \"%s\": %m", path)));
+ /* Not there, so assume parent has no parents */
+ }
+ else
+ {
+ for (;;)
+ {
+ errno = 0;
+ nbytes = (int) read(srcfd, buffer, sizeof(buffer));
+ if (nbytes < 0 || errno != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m", path)));
+ if (nbytes == 0)
+ break;
+ errno = 0;
+ if ((int) write(fd, buffer, nbytes) != nbytes)
+ {
+ int save_errno = errno;
+
+ /*
+ * If we fail to make the file, delete it to release disk
+ * space
+ */
+ unlink(tmppath);
+ /* if write didn't set errno, assume problem is no disk space */
+ errno = save_errno ? save_errno : ENOSPC;
+
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", tmppath)));
+ }
+ }
+ close(srcfd);
+ }
+
+ /*
+ * Append one line with the details of this timeline split.
+ *
+ * If we did have a parent file, insert an extra newline just in case
+ * the parent file failed to end with one.
+ */
+ XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
+
+ snprintf(buffer, sizeof(buffer),
+ "%s%u\t%s\t%s transaction %u at %s\n",
+ (srcfd < 0) ? "" : "\n",
+ parentTLI,
+ xlogfname,
+ recoveryStopAfter ? "after" : "before",
+ recoveryStopXid,
+ str_time(recoveryStopTime));
+
+ nbytes = strlen(buffer);
+ errno = 0;
+ if ((int) write(fd, buffer, nbytes) != nbytes)
+ {
+ int save_errno = errno;
+
+ /*
+ * If we fail to make the file, delete it to release disk
+ * space
+ */
+ unlink(tmppath);
+ /* if write didn't set errno, assume problem is no disk space */
+ errno = save_errno ? save_errno : ENOSPC;
+
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", tmppath)));
+ }
+
+ if (pg_fsync(fd) != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m", tmppath)));
+
+ if (close(fd))
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m", tmppath)));
+
+
+ /*
+ * Now move the completed history file into place with its final name.
+ */
+ TLHistoryFilePath(path, newTLI);
+
+ /*
+ * Prefer link() to rename() here just to be really sure that we don't
+ * overwrite an existing logfile. However, there shouldn't be one, so
+ * rename() is an acceptable substitute except for the truly paranoid.
+ */
+#if HAVE_WORKING_LINK
+ if (link(tmppath, path) < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not link file \"%s\" to \"%s\": %m",
+ tmppath, path)));
+ unlink(tmppath);
+#else
+ if (rename(tmppath, path) < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ tmppath, path)));
+#endif
+
+ /* The history file can be archived immediately. */
+ TLHistoryFileName(histfname, newTLI);
+ XLogArchiveNotify(histfname);
+}
+
+/*
+ * I/O routines for pg_control
*
* *ControlFile is a buffer in shared memory that holds an image of the
* contents of pg_control. WriteControlFile() initializes pg_control
CheckPoint checkPoint;
char *buffer;
XLogPageHeader page;
+ XLogLongPageHeader longpage;
XLogRecord *record;
- XLogFileHeaderData *fhdr;
bool use_existent;
uint64 sysidentifier;
struct timeval tv;
sysidentifier = ((uint64) tv.tv_sec) << 32;
sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
+ /* First timeline ID is always 1 */
+ ThisTimeLineID = 1;
+
/* Use malloc() to ensure buffer is MAXALIGNED */
buffer = (char *) malloc(BLCKSZ);
page = (XLogPageHeader) buffer;
/* Set up information for the initial checkpoint record */
checkPoint.redo.xlogid = 0;
- checkPoint.redo.xrecoff = SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD;
+ checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
checkPoint.undo = checkPoint.redo;
- checkPoint.ThisStartUpID = 0;
+ checkPoint.ThisTimeLineID = ThisTimeLineID;
checkPoint.nextXid = FirstNormalTransactionId;
checkPoint.nextOid = BootstrapObjectIdData;
checkPoint.time = time(NULL);
/* Set up the XLOG page header */
page->xlp_magic = XLOG_PAGE_MAGIC;
- page->xlp_info = 0;
- page->xlp_sui = checkPoint.ThisStartUpID;
+ page->xlp_info = XLP_LONG_HEADER;
+ page->xlp_tli = ThisTimeLineID;
page->xlp_pageaddr.xlogid = 0;
page->xlp_pageaddr.xrecoff = 0;
-
- /* Insert the file header record */
- record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
- record->xl_prev.xlogid = 0;
- record->xl_prev.xrecoff = 0;
- record->xl_xact_prev.xlogid = 0;
- record->xl_xact_prev.xrecoff = 0;
- record->xl_xid = InvalidTransactionId;
- record->xl_len = SizeOfXLogFHD;
- record->xl_info = XLOG_FILE_HEADER;
- record->xl_rmid = RM_XLOG_ID;
- fhdr = (XLogFileHeaderData *) XLogRecGetData(record);
- fhdr->xlfhd_sysid = sysidentifier;
- fhdr->xlfhd_xlogid = 0;
- fhdr->xlfhd_segno = 0;
- fhdr->xlfhd_seg_size = XLogSegSize;
-
- INIT_CRC64(crc);
- COMP_CRC64(crc, fhdr, SizeOfXLogFHD);
- COMP_CRC64(crc, (char *) record + sizeof(crc64),
- SizeOfXLogRecord - sizeof(crc64));
- FIN_CRC64(crc);
- record->xl_crc = crc;
+ longpage = (XLogLongPageHeader) page;
+ longpage->xlp_sysid = sysidentifier;
+ longpage->xlp_seg_size = XLogSegSize;
/* Insert the initial checkpoint record */
- record = (XLogRecord *) ((char *) page + SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD);
+ record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
record->xl_prev.xlogid = 0;
- record->xl_prev.xrecoff = SizeOfXLogPHD;
+ record->xl_prev.xrecoff = 0;
record->xl_xact_prev.xlogid = 0;
record->xl_xact_prev.xrecoff = 0;
record->xl_xid = InvalidTransactionId;
use_existent = false;
openLogFile = XLogFileInit(0, 0, &use_existent, false);
- /* Write the first page with the initial records */
+ /* Write the first page with the initial record */
errno = 0;
if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
{
char recoveryCommandFile[MAXPGPATH];
FILE *fd;
char cmdline[MAXPGPATH];
+ TimeLineID rtli = 0;
+ bool rtliGiven = false;
bool syntaxError = false;
snprintf(recoveryCommandFile, MAXPGPATH, "%s/recovery.conf", DataDir);
}
if (strcmp(tok1,"restore_command") == 0) {
- StrNCpy(recoveryRestoreCommand, tok2, MAXPGPATH);
+ recoveryRestoreCommand = pstrdup(tok2);
ereport(LOG,
(errmsg("restore_command = \"%s\"",
recoveryRestoreCommand)));
}
+ else if (strcmp(tok1,"recovery_target_timeline") == 0) {
+ rtliGiven = true;
+ if (strcmp(tok2, "latest") == 0)
+ rtli = 0;
+ else
+ {
+ errno = 0;
+ rtli = (TimeLineID) strtoul(tok2, NULL, 0);
+ if (errno == EINVAL || errno == ERANGE)
+ ereport(FATAL,
+ (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
+ tok2)));
+ }
+ if (rtli)
+ ereport(LOG,
+ (errmsg("recovery_target_timeline = %u", rtli)));
+ else
+ ereport(LOG,
+ (errmsg("recovery_target_timeline = latest")));
+ }
else if (strcmp(tok1,"recovery_target_xid") == 0) {
errno = 0;
recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
errhint("Lines should have the format parameter = 'value'.")));
/* Check that required parameters were supplied */
- if (recoveryRestoreCommand[0] == '\0')
+ if (recoveryRestoreCommand == NULL)
ereport(FATAL,
(errmsg("recovery command file \"%s\" did not specify restore_command",
recoveryCommandFile)));
+ /* Enable fetching from archive recovery area */
+ InArchiveRecovery = true;
+
/*
- * clearly indicate our state
+ * If user specified recovery_target_timeline, validate it or compute the
+ * "latest" value. We can't do this until after we've gotten the restore
+ * command and set InArchiveRecovery, because we need to fetch timeline
+ * history files from the archive.
*/
- InArchiveRecovery = true;
+ if (rtliGiven)
+ {
+ if (rtli)
+ {
+ /* Timeline 1 does not have a history file, all else should */
+ if (rtli != 1 && !existsTimeLineHistory(rtli))
+ ereport(FATAL,
+ (errmsg("recovery_target_timeline %u does not exist",
+ rtli)));
+ recoveryTargetTLI = rtli;
+ }
+ else
+ {
+ /* We start the "latest" search from pg_control's timeline */
+ recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
+ }
+ }
}
/*
* Exit archive-recovery state
*/
static void
-exitArchiveRecovery(uint32 endLogId, uint32 endLogSeg, uint32 xrecoff)
+exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
{
char recoveryPath[MAXPGPATH];
char xlogpath[MAXPGPATH];
char recoveryCommandDone[MAXPGPATH];
/*
- * Disable fetches from archive, so we can use XLogFileOpen below.
+ * We are no longer in archive recovery state.
*/
InArchiveRecovery = false;
* more descriptive of what our current database state is, because that
* is what we replayed from.
*
- * XXX there ought to be a timeline increment somewhere around here.
+ * Note that if we are establishing a new timeline, ThisTimeLineID is
+ * already set to the new value, and so we will create a new file instead
+ * of overwriting any existing file.
*/
snprintf(recoveryPath, MAXPGPATH, "%s/RECOVERYXLOG", XLogDir);
- XLogFilePath(xlogpath, endLogId, endLogSeg);
+ XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
if (restoredFromArchive)
{
* RECOVERYXLOG laying about, get rid of it.
*/
unlink(recoveryPath); /* ignore any error */
+ /*
+ * If we are establishing a new timeline, we have to copy data
+ * from the last WAL segment of the old timeline to create a
+ * starting WAL segment for the new timeline.
+ */
+ if (endTLI != ThisTimeLineID)
+ XLogFileCopy(endLogId, endLogSeg,
+ endTLI, endLogId, endLogSeg);
}
/*
- * If we restored to a point-in-time, then the current WAL segment
- * probably contains records beyond the stop point. These represent an
- * extreme hazard: if we crash in the near future, the replay apparatus
- * will know no reason why it shouldn't replay them. Therefore,
- * explicitly zero out all the remaining pages of the segment. (We need
- * not worry about the partial page in which the last record ends, since
- * StartUpXlog will handle zeroing that. Also, there's nothing to do
- * if we are right at a segment boundary.)
- *
- * XXX segment files beyond thhe current one also represent a hazard
- * for the same reason. Need to invent timelines to fix this.
+ * Let's just make real sure there are not .ready or .done flags posted
+ * for the new segment.
*/
+ XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
+ XLogArchiveCleanup(xlogpath);
- /* align xrecoff to next page, then drop segment part */
- if (xrecoff % BLCKSZ != 0)
- xrecoff += (BLCKSZ - xrecoff % BLCKSZ);
- xrecoff %= XLogSegSize;
-
- if (recoveryTarget && xrecoff != 0)
- {
- int fd;
- char zbuffer[BLCKSZ];
-
- fd = XLogFileOpen(endLogId, endLogSeg, false);
- MemSet(zbuffer, 0, sizeof(zbuffer));
- if (lseek(fd, (off_t) xrecoff, SEEK_SET) < 0)
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not seek in file \"%s\": %m",
- xlogpath)));
- for (; xrecoff < XLogSegSize; xrecoff += sizeof(zbuffer))
- {
- errno = 0;
- if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
- {
- /* if write didn't set errno, assume problem is no disk space */
- if (errno == 0)
- errno = ENOSPC;
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not write to file \"%s\": %m", xlogpath)));
- }
- }
- if (pg_fsync(fd) != 0)
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not fsync file \"%s\": %m", xlogpath)));
- if (close(fd))
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not close file \"%s\": %m", xlogpath)));
- }
+ /* Get rid of any remaining recovered timeline-history file, too */
+ snprintf(recoveryPath, MAXPGPATH, "%s/RECOVERYHISTORY", XLogDir);
+ unlink(recoveryPath); /* ignore any error */
/*
* Rename the config file out of the way, so that we don't accidentally
*
* Returns TRUE if we are stopping, FALSE otherwise. On TRUE return,
* *includeThis is set TRUE if we should apply this record before stopping.
+ * Also, some information is saved in recoveryStopXid et al for use in
+ * annotating the new timeline's history file.
*/
static bool
recoveryStopsHere(XLogRecord *record, bool *includeThis)
if (stopsHere)
{
+ recoveryStopXid = record->xl_xid;
+ recoveryStopTime = recordXtime;
+ recoveryStopAfter = *includeThis;
+
if (record_info == XLOG_XACT_COMMIT)
{
- if (*includeThis)
+ if (recoveryStopAfter)
ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s",
- record->xl_xid, str_time(recordXtime))));
+ recoveryStopXid, str_time(recoveryStopTime))));
else
ereport(LOG,
(errmsg("recovery stopping before commit of transaction %u, time %s",
- record->xl_xid, str_time(recordXtime))));
+ recoveryStopXid, str_time(recoveryStopTime))));
}
else
{
- if (*includeThis)
+ if (recoveryStopAfter)
ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s",
- record->xl_xid, str_time(recordXtime))));
+ recoveryStopXid, str_time(recoveryStopTime))));
else
ereport(LOG,
(errmsg("recovery stopping before abort of transaction %u, time %s",
- record->xl_xid, str_time(recordXtime))));
+ recoveryStopXid, str_time(recoveryStopTime))));
}
}
XLogCtlInsert *Insert;
CheckPoint checkPoint;
bool wasShutdown;
+ bool needNewTimeLine = false;
XLogRecPtr RecPtr,
LastRec,
checkPointLoc,
pg_usleep(60000000L);
#endif
+ /*
+ * Initialize on the assumption we want to recover to the same timeline
+ * that's active according to pg_control.
+ */
+ recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
+
/*
* Check for recovery control file, and if so set up state for
* offline recovery
*/
readRecoveryCommandFile();
+ /* Now we can determine the list of expected TLIs */
+ expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
+
/*
* Get the last valid checkpoint record. If the latest one according
* to pg_control is broken, try the next-to-last one.
ShmemVariableCache->oidCount = 0;
/*
- * If it was a shutdown checkpoint, then any following WAL entries
- * were created under the next StartUpID; if it was a regular
- * checkpoint then any following WAL entries were created under the
- * same StartUpID. We must replay WAL entries using the same StartUpID
- * they were created under, so temporarily adopt that SUI (see also
- * xlog_redo()).
+ * We must replay WAL entries using the same TimeLineID they were created
+ * under, so temporarily adopt the TLI indicated by the checkpoint (see
+ * also xlog_redo()).
*/
- if (wasShutdown)
- ThisStartUpID = checkPoint.ThisStartUpID + 1;
- else
- ThisStartUpID = checkPoint.ThisStartUpID;
+ ThisTimeLineID = checkPoint.ThisTimeLineID;
RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
RmgrTable[rmid].rm_startup();
}
- /* Is REDO required ? */
+ /*
+ * Find the first record that logically follows the checkpoint ---
+ * it might physically precede it, though.
+ */
if (XLByteLT(checkPoint.redo, RecPtr))
+ {
+ /* back up to find the record */
record = ReadRecord(&(checkPoint.redo), PANIC, buffer);
+ }
else
{
- /* read past CheckPoint record */
+ /* just have to read next record after CheckPoint */
record = ReadRecord(NULL, LOG, buffer);
}
*/
if (recoveryStopsHere(record, &recoveryApply))
{
+ needNewTimeLine = true; /* see below */
recoveryContinue = false;
if (!recoveryApply)
break;
EndOfLog = EndRecPtr;
XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
+ /*
+ * Consider whether we need to assign a new timeline ID.
+ *
+ * If we stopped short of the end of WAL during recovery, then we
+ * are generating a new timeline and must assign it a unique new ID.
+ * Otherwise, we can just extend the timeline we were in when we
+ * ran out of WAL.
+ */
+ if (needNewTimeLine)
+ {
+ ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
+ ereport(LOG,
+ (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
+ writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
+ curFileTLI, endLogId, endLogSeg);
+ }
+
+ /* Save the selected TimeLineID in shared memory, too */
+ XLogCtl->ThisTimeLineID = ThisTimeLineID;
+
/*
* We are now done reading the old WAL. Turn off archive fetching
* if it was active, and make a writable copy of the last WAL segment.
* readBuf; we will use that below.)
*/
if (InArchiveRecovery)
- exitArchiveRecovery(endLogId, endLogSeg, EndOfLog.xrecoff);
+ exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
/*
* Prepare to write WAL starting at EndOfLog position, and init xlog
*/
openLogId = endLogId;
openLogSeg = endLogSeg;
- openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
+ openLogFile = XLogFileOpen(openLogId, openLogSeg);
openLogOff = 0;
ControlFile->logId = openLogId;
ControlFile->logSeg = openLogSeg + 1;
* XLogWrite()).
*
* Note: it might seem we should do AdvanceXLInsertBuffer() here, but
- * we can't since we haven't yet determined the correct StartUpID
- * to put into the new page's header. The first actual attempt to
- * insert a log record will advance the insert state.
+ * this is sufficient. The first actual attempt to insert a log
+ * record will advance the insert state.
*/
XLogCtl->Write.curridx = NextBufIdx(0);
}
RmgrTable[rmid].rm_cleanup();
}
- /*
- * At this point, ThisStartUpID is the largest SUI that we could
- * find evidence for in the WAL entries. But check it against
- * pg_control's latest checkpoint, to make sure that we can't
- * accidentally re-use an already-used SUI.
- */
- if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID)
- ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID;
-
/*
* Perform a new checkpoint to update our recovery activity to
* disk.
*
- * Note that we write a shutdown checkpoint. This is correct since
- * the records following it will use SUI one more than what is
- * shown in the checkpoint's ThisStartUpID.
+ * Note that we write a shutdown checkpoint rather than an on-line
+ * one. This is not particularly critical, but since we may be
+ * assigning a new TLI, using a shutdown checkpoint allows us to
+ * have the rule that TLI only changes in shutdown checkpoints,
+ * which allows some extra error checking in xlog_redo.
*
* In case we had to use the secondary checkpoint, make sure that it
* will still be shown as the secondary checkpoint after this
*/
XLogCloseRelationCache();
}
- else
- {
- /*
- * If we are not doing recovery, then we saw a checkpoint with
- * nothing after it, and we can safely use StartUpID equal to one
- * more than the checkpoint's SUI. But just for paranoia's sake,
- * check against pg_control too.
- */
- ThisStartUpID = checkPoint.ThisStartUpID;
- if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID)
- ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID;
- }
/*
* Preallocate additional log files, if wanted.
*/
PreallocXlogFiles(EndOfLog);
- /*
- * Advance StartUpID to one more than the highest value used
- * previously.
- */
- ThisStartUpID++;
- XLogCtl->ThisStartUpID = ThisStartUpID;
-
/*
* Okay, we're officially UP.
*/
/*
* This must be called during startup of a backend process, except that
* it need not be called in a standalone backend (which does StartupXLOG
- * instead). We need to initialize the local copies of ThisStartUpID and
+ * instead). We need to initialize the local copies of ThisTimeLineID and
* RedoRecPtr.
*
* Note: before Postgres 7.5, we went to some effort to keep the postmaster
- * process's copies of ThisStartUpID and RedoRecPtr valid too. This was
+ * process's copies of ThisTimeLineID and RedoRecPtr valid too. This was
* unnecessary however, since the postmaster itself never touches XLOG anyway.
*/
void
InitXLOGAccess(void)
{
- /* ThisStartUpID doesn't change so we need no lock to copy it */
- ThisStartUpID = XLogCtl->ThisStartUpID;
+ /* ThisTimeLineID doesn't change so we need no lock to copy it */
+ ThisTimeLineID = XLogCtl->ThisTimeLineID;
/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
(void) GetRedoRecPtr();
}
}
MemSet(&checkPoint, 0, sizeof(checkPoint));
- checkPoint.ThisStartUpID = ThisStartUpID;
+ checkPoint.ThisTimeLineID = ThisTimeLineID;
checkPoint.time = time(NULL);
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
- /* Any later WAL records should be run with shutdown SUI plus 1 */
- ThisStartUpID = checkPoint.ThisStartUpID + 1;
+ /*
+ * TLI may change in a shutdown checkpoint, but it shouldn't decrease
+ */
+ if (checkPoint.ThisTimeLineID != ThisTimeLineID)
+ {
+ if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
+ !list_member_int(expectedTLIs,
+ (int) checkPoint.ThisTimeLineID))
+ ereport(PANIC,
+ (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
+ checkPoint.ThisTimeLineID, ThisTimeLineID)));
+ /* Following WAL records should be run with new TLI */
+ ThisTimeLineID = checkPoint.ThisTimeLineID;
+ }
}
else if (info == XLOG_CHECKPOINT_ONLINE)
{
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
}
- /* Any later WAL records should be run with the then-active SUI */
- ThisStartUpID = checkPoint.ThisStartUpID;
- }
- else if (info == XLOG_FILE_HEADER)
- {
- XLogFileHeaderData fhdr;
-
- memcpy(&fhdr, XLogRecGetData(record), sizeof(XLogFileHeaderData));
- if (fhdr.xlfhd_sysid != ControlFile->system_identifier)
- {
- char fhdrident_str[32];
- char sysident_str[32];
-
- /*
- * Format sysids separately to keep platform-dependent format
- * code out of the translatable message string.
- */
- snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
- fhdr.xlfhd_sysid);
- snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
- ControlFile->system_identifier);
- ereport(PANIC,
- (errmsg("WAL file is from different system"),
- errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
- fhdrident_str, sysident_str)));
- }
- if (fhdr.xlfhd_seg_size != XLogSegSize)
+ /* TLI should not change in an on-line checkpoint */
+ if (checkPoint.ThisTimeLineID != ThisTimeLineID)
ereport(PANIC,
- (errmsg("WAL file is from different system"),
- errdetail("Incorrect XLOG_SEG_SIZE in file header.")));
- }
- else if (info == XLOG_WASTED_SPACE)
- {
- /* ignore */
+ (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
+ checkPoint.ThisTimeLineID, ThisTimeLineID)));
}
}
CheckPoint *checkpoint = (CheckPoint *) rec;
sprintf(buf + strlen(buf), "checkpoint: redo %X/%X; undo %X/%X; "
- "sui %u; xid %u; oid %u; %s",
+ "tli %u; xid %u; oid %u; %s",
checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
- checkpoint->ThisStartUpID, checkpoint->nextXid,
+ checkpoint->ThisTimeLineID, checkpoint->nextXid,
checkpoint->nextOid,
(info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
}
memcpy(&nextOid, rec, sizeof(Oid));
sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
}
- else if (info == XLOG_FILE_HEADER)
- {
- XLogFileHeaderData *fhdr = (XLogFileHeaderData *) rec;
-
- sprintf(buf + strlen(buf),
- "file header: sysid " UINT64_FORMAT "; "
- "xlogid %X segno %X; seg_size %X",
- fhdr->xlfhd_sysid,
- fhdr->xlfhd_xlogid,
- fhdr->xlfhd_segno,
- fhdr->xlfhd_seg_size);
- }
- else if (info == XLOG_WASTED_SPACE)
- {
- strcat(buf, "wasted space");
- }
else
strcat(buf, "UNKNOWN");
}