granicus.if.org Git - postgresql/commitdiff
Split out XLog reading as an independent facility
author: Alvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 16 Jan 2013 19:12:53 +0000 (16:12 -0300)
committer: Alvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 16 Jan 2013 19:12:53 +0000 (16:12 -0300)
This new facility can not only be used by xlog.c to carry out crash
recovery, but also by external programs.  By supplying a function to
read XLog pages from somewhere, all the WAL reading can be used for
completely different purposes.

For the standard backend use, the behavior should be pretty much the
same as previously.  As for non-backend programs, a hypothetical
pg_xlogdump program is now closer to reality, but some more backend
support is still necessary.

This patch was originally submitted by Andres Freund in a different
form, but Heikki Linnakangas opted for and authored another design of
the concept.  Andres has advanced the patch since Heikki's initial
version.  Review and some (mostly cosmetic) changes by me.

src/backend/access/transam/Makefile
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogreader.c [new file with mode: 0644]
src/backend/nls.mk
src/include/access/xlogreader.h [new file with mode: 0644]

index 700cfd85c08e7b78255527bb7e9a91d12fc91166..eb6cfc5c44e9051fb968fdb5cd580200eab08a59 100644 (file)
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
        timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
-       xlogutils.o
+       xlogreader.o xlogutils.o
 
 include $(top_srcdir)/src/backend/common.mk
 
index 51a515a5552925a1acefac28aca0f5d1df523990..70cfabc23678737eddd66b0da5311359d6414420 100644 (file)
@@ -30,6 +30,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
@@ -548,7 +549,6 @@ static int  readFile = -1;
 static XLogSegNo readSegNo = 0;
 static uint32 readOff = 0;
 static uint32 readLen = 0;
-static bool    readFileHeaderValidated = false;
 static XLogSource readSource = 0;              /* XLOG_FROM_* code */
 
 /*
@@ -561,6 +561,13 @@ static XLogSource readSource = 0;          /* XLOG_FROM_* code */
 static XLogSource currentSource = 0;   /* XLOG_FROM_* code */
 static bool    lastSourceFailed = false;
 
+typedef struct XLogPageReadPrivate
+{
+       int                     emode;
+       bool            fetching_ckpt;  /* are we fetching a checkpoint record? */
+       bool            randAccess;
+} XLogPageReadPrivate;
+
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
@@ -572,18 +579,9 @@ static bool        lastSourceFailed = false;
 static TimestampTz XLogReceiptTime = 0;
 static XLogSource XLogReceiptSource = 0;       /* XLOG_FROM_* code */
 
-/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
-static char *readBuf = NULL;
-
-/* Buffer for current ReadRecord result (expandable) */
-static char *readRecordBuf = NULL;
-static uint32 readRecordBufSize = 0;
-
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;  /* start of last record read */
 static XLogRecPtr EndRecPtr;   /* end+1 of last record read */
-static TimeLineID lastPageTLI = 0;
-static TimeLineID lastSegmentTLI = 0;
 
 static XLogRecPtr minRecoveryPoint;            /* local copy of
                                                                                 * ControlFile->minRecoveryPoint */
@@ -627,8 +625,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                         int source, bool notexistOk);
 static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
-static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
-                        bool randAccess);
+static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+                                int reqLen, char *readBuf, TimeLineID *readTLI);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                                        bool fetching_ckpt);
 static int     emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
@@ -639,12 +637,11 @@ static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+                  int emode, bool fetching_ckpt);
 static void CheckRecoveryConsistency(void);
-static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly);
-static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
-                                         int emode, bool randAccess);
-static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
+static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
+                                        XLogRecPtr RecPtr, int whichChkpt);
 static bool rescanLatestTimeLine(void);
 static void WriteControlFile(void);
 static void ReadControlFile(void);
@@ -2652,9 +2649,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                if (source != XLOG_FROM_STREAM)
                        XLogReceiptTime = GetCurrentTimestamp();
 
-               /* The file header needs to be validated on first access */
-               readFileHeaderValidated = false;
-
                return fd;
        }
        if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -2709,7 +2703,8 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
 
                if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
                {
-                       fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
+                       fd = XLogFileRead(segno, emode, tli,
+                                                         XLOG_FROM_ARCHIVE, true);
                        if (fd != -1)
                        {
                                elog(DEBUG1, "got WAL segment from archive");
@@ -2721,7 +2716,8 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
 
                if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)
                {
-                       fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
+                       fd = XLogFileRead(segno, emode, tli,
+                                                         XLOG_FROM_PG_XLOG, true);
                        if (fd != -1)
                        {
                                if (!expectedTLEs)
@@ -3177,102 +3173,6 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
        return InvalidBuffer;           /* keep compiler quiet */
 }
 
-/*
- * CRC-check an XLOG record.  We do not believe the contents of an XLOG
- * record (other than to the minimal extent of computing the amount of
- * data to read in) until we've checked the CRCs.
- *
- * We assume all of the record (that is, xl_tot_len bytes) has been read
- * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
- * record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
- */
-static bool
-RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
-{
-       pg_crc32        crc;
-       int                     i;
-       uint32          len = record->xl_len;
-       BkpBlock        bkpb;
-       char       *blk;
-       size_t          remaining = record->xl_tot_len;
-
-       /* First the rmgr data */
-       if (remaining < SizeOfXLogRecord + len)
-       {
-               /* ValidXLogRecordHeader() should've caught this already... */
-               ereport(emode_for_corrupt_record(emode, recptr),
-                               (errmsg("invalid record length at %X/%X",
-                                               (uint32) (recptr >> 32), (uint32) recptr)));
-               return false;
-       }
-       remaining -= SizeOfXLogRecord + len;
-       INIT_CRC32(crc);
-       COMP_CRC32(crc, XLogRecGetData(record), len);
-
-       /* Add in the backup blocks, if any */
-       blk = (char *) XLogRecGetData(record) + len;
-       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-       {
-               uint32          blen;
-
-               if (!(record->xl_info & XLR_BKP_BLOCK(i)))
-                       continue;
-
-               if (remaining < sizeof(BkpBlock))
-               {
-                       ereport(emode_for_corrupt_record(emode, recptr),
-                                       (errmsg("invalid backup block size in record at %X/%X",
-                                                       (uint32) (recptr >> 32), (uint32) recptr)));
-                       return false;
-               }
-               memcpy(&bkpb, blk, sizeof(BkpBlock));
-
-               if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
-               {
-                       ereport(emode_for_corrupt_record(emode, recptr),
-                                       (errmsg("incorrect hole size in record at %X/%X",
-                                                       (uint32) (recptr >> 32), (uint32) recptr)));
-                       return false;
-               }
-               blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
-               if (remaining < blen)
-               {
-                       ereport(emode_for_corrupt_record(emode, recptr),
-                                       (errmsg("invalid backup block size in record at %X/%X",
-                                                       (uint32) (recptr >> 32), (uint32) recptr)));
-                       return false;
-               }
-               remaining -= blen;
-               COMP_CRC32(crc, blk, blen);
-               blk += blen;
-       }
-
-       /* Check that xl_tot_len agrees with our calculation */
-       if (remaining != 0)
-       {
-               ereport(emode_for_corrupt_record(emode, recptr),
-                               (errmsg("incorrect total length in record at %X/%X",
-                                               (uint32) (recptr >> 32), (uint32) recptr)));
-               return false;
-       }
-
-       /* Finally include the record header */
-       COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
-       FIN_CRC32(crc);
-
-       if (!EQ_CRC32(record->xl_crc, crc))
-       {
-               ereport(emode_for_corrupt_record(emode, recptr),
-               (errmsg("incorrect resource manager data checksum in record at %X/%X",
-                               (uint32) (recptr >> 32), (uint32) recptr)));
-               return false;
-       }
-
-       return true;
-}
-
 /*
  * Attempt to read an XLOG record.
  *
@@ -3286,511 +3186,68 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
  * the returned record pointer always points there.
  */
 static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
+ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+                  bool fetching_ckpt)
 {
        XLogRecord *record;
-       XLogRecPtr      tmpRecPtr = EndRecPtr;
-       bool            randAccess = false;
-       uint32          len,
-                               total_len;
-       uint32          targetRecOff;
-       uint32          pageHeaderSize;
-       bool            gotheader;
-
-       if (readBuf == NULL)
-       {
-               /*
-                * First time through, permanently allocate readBuf.  We do it this
-                * way, rather than just making a static array, for two reasons: (1)
-                * no need to waste the storage in most instantiations of the backend;
-                * (2) a static char array isn't guaranteed to have any particular
-                * alignment, whereas malloc() will provide MAXALIGN'd storage.
-                */
-               readBuf = (char *) malloc(XLOG_BLCKSZ);
-               Assert(readBuf != NULL);
-       }
-
-       if (RecPtr == NULL)
-       {
-               RecPtr = &tmpRecPtr;
+       XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
 
-               /*
-                * RecPtr is pointing to end+1 of the previous WAL record.  If
-                * we're at a page boundary, no more records can fit on the current
-                * page. We must skip over the page header, but we can't do that
-                * until we've read in the page, since the header size is variable.
-                */
-       }
-       else
-       {
-               /*
-                * In this case, the passed-in record pointer should already be
-                * pointing to a valid record starting position.
-                */
-               if (!XRecOffIsValid(*RecPtr))
-                       ereport(PANIC,
-                                       (errmsg("invalid record offset at %X/%X",
-                                                       (uint32) (*RecPtr >> 32), (uint32) *RecPtr)));
-
-               /*
-                * Since we are going to a random position in WAL, forget any prior
-                * state about what timeline we were in, and allow it to be any
-                * timeline in expectedTLEs.  We also set a flag to allow curFileTLI
-                * to go backwards (but we can't reset that variable right here, since
-                * we might not change files at all).
-                */
-               /* see comment in ValidXLogPageHeader */
-               lastPageTLI = lastSegmentTLI = 0;
-               randAccess = true;              /* allow curFileTLI to go backwards too */
-       }
+       /* Pass through parameters to XLogPageRead */
+       private->fetching_ckpt = fetching_ckpt;
+       private->emode = emode;
+       private->randAccess = (RecPtr != InvalidXLogRecPtr);
 
        /* This is the first try to read this page. */
        lastSourceFailed = false;
-retry:
-       /* Read the page containing the record */
-       if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
-               return NULL;
 
-       pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-       targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
-       if (targetRecOff == 0)
-       {
-               /*
-                * At page start, so skip over page header.  The Assert checks that
-                * we're not scribbling on caller's record pointer; it's OK because we
-                * can only get here in the continuing-from-prev-record case, since
-                * XRecOffIsValid rejected the zero-page-offset case otherwise.
-                */
-               Assert(RecPtr == &tmpRecPtr);
-               (*RecPtr) += pageHeaderSize;
-               targetRecOff = pageHeaderSize;
-       }
-       else if (targetRecOff < pageHeaderSize)
+       do
        {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("invalid record offset at %X/%X",
-                                               (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               goto next_record_is_invalid;
-       }
-       if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
-               targetRecOff == pageHeaderSize)
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("contrecord is requested by %X/%X",
-                                               (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               goto next_record_is_invalid;
-       }
+               char   *errormsg;
 
-       /*
-        * Read the record length.
-        *
-        * NB: Even though we use an XLogRecord pointer here, the whole record
-        * header might not fit on this page. xl_tot_len is the first field of
-        * the struct, so it must be on this page (the records are MAXALIGNed),
-        * but we cannot access any other fields until we've verified that we
-        * got the whole header.
-        */
-       record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ);
-       total_len = record->xl_tot_len;
-
-       /*
-        * If the whole record header is on this page, validate it immediately.
-        * Otherwise do just a basic sanity check on xl_tot_len, and validate the
-        * rest of the header after reading it from the next page.  The xl_tot_len
-        * check is necessary here to ensure that we enter the "Need to reassemble
-        * record" code path below; otherwise we might fail to apply
-        * ValidXLogRecordHeader at all.
-        */
-       if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
-       {
-               if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
-                       goto next_record_is_invalid;
-               gotheader = true;
-       }
-       else
-       {
-               if (total_len < SizeOfXLogRecord)
+               record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+               ReadRecPtr = xlogreader->ReadRecPtr;
+               EndRecPtr = xlogreader->EndRecPtr;
+               if (record == NULL)
                {
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("invalid record length at %X/%X",
-                                                       (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       goto next_record_is_invalid;
-               }
-               gotheader = false;
-       }
+                       /* not all failures fill errormsg; report those that do */
+                       if (errormsg && errormsg[0] != '\0')
+                               ereport(emode_for_corrupt_record(emode,
+                                                                                                RecPtr ? RecPtr : EndRecPtr),
+                                               (errmsg_internal("%s", errormsg) /* already translated */));
 
-       /*
-        * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
-        * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
-        * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
-        * enough for all "normal" records, but very large commit or abort records
-        * might need more space.)
-        */
-       if (total_len > readRecordBufSize)
-       {
-               uint32          newSize = total_len;
-
-               newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
-               newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
-               if (readRecordBuf)
-                       free(readRecordBuf);
-               readRecordBuf = (char *) malloc(newSize);
-               if (!readRecordBuf)
-               {
-                       readRecordBufSize = 0;
-                       /* We treat this as a "bogus data" condition */
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("record length %u at %X/%X too long",
-                                                       total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       goto next_record_is_invalid;
-               }
-               readRecordBufSize = newSize;
-       }
+                       lastSourceFailed = true;
 
-       len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ;
-       if (total_len > len)
-       {
-               /* Need to reassemble record */
-               char       *contrecord;
-               XLogPageHeader pageHeader;
-               XLogRecPtr      pagelsn;
-               char       *buffer;
-               uint32          gotlen;
-
-               /* Initialize pagelsn to the beginning of the page this record is on */
-               pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-
-               /* Copy the first fragment of the record from the first page. */
-               memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len);
-               buffer = readRecordBuf + len;
-               gotlen = len;
-
-               do
-               {
-                       /* Calculate pointer to beginning of next page */
-                       pagelsn += XLOG_BLCKSZ;
-                       /* Wait for the next page to become available */
-                       if (!XLogPageRead(&pagelsn, emode, false, false))
-                               return NULL;
-
-                       /* Check that the continuation on next page looks valid */
-                       pageHeader = (XLogPageHeader) readBuf;
-                       if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
-                       {
-                               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                               (errmsg("there is no contrecord flag in log segment %s, offset %u",
-                                                               XLogFileNameP(curFileTLI, readSegNo),
-                                                               readOff)));
-                               goto next_record_is_invalid;
-                       }
-                       /*
-                        * Cross-check that xlp_rem_len agrees with how much of the record
-                        * we expect there to be left.
-                        */
-                       if (pageHeader->xlp_rem_len == 0 ||
-                               total_len != (pageHeader->xlp_rem_len + gotlen))
+                       if (readFile >= 0)
                        {
-                               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                               (errmsg("invalid contrecord length %u in log segment %s, offset %u",
-                                                               pageHeader->xlp_rem_len,
-                                                               XLogFileNameP(curFileTLI, readSegNo),
-                                                               readOff)));
-                               goto next_record_is_invalid;
+                               close(readFile);
+                               readFile = -1;
                        }
-
-                       /* Append the continuation from this page to the buffer */
-                       pageHeaderSize = XLogPageHeaderSize(pageHeader);
-                       contrecord = (char *) readBuf + pageHeaderSize;
-                       len = XLOG_BLCKSZ - pageHeaderSize;
-                       if (pageHeader->xlp_rem_len < len)
-                               len = pageHeader->xlp_rem_len;
-                       memcpy(buffer, (char *) contrecord, len);
-                       buffer += len;
-                       gotlen += len;
-
-                       /* If we just reassembled the record header, validate it. */
-                       if (!gotheader)
-                       {
-                               record = (XLogRecord *) readRecordBuf;
-                               if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
-                                       goto next_record_is_invalid;
-                               gotheader = true;
-                       }
-               } while (pageHeader->xlp_rem_len > len);
-
-               record = (XLogRecord *) readRecordBuf;
-               if (!RecordIsValid(record, *RecPtr, emode))
-                       goto next_record_is_invalid;
-               pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-               XLogSegNoOffsetToRecPtr(
-                       readSegNo,
-                       readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
-                       EndRecPtr);
-               ReadRecPtr = *RecPtr;
-       }
-       else
-       {
-               /* Record does not cross a page boundary */
-               if (!RecordIsValid(record, *RecPtr, emode))
-                       goto next_record_is_invalid;
-               EndRecPtr = *RecPtr + MAXALIGN(total_len);
-
-               ReadRecPtr = *RecPtr;
-               memcpy(readRecordBuf, record, total_len);
-       }
-
-       /*
-        * Special processing if it's an XLOG SWITCH record
-        */
-       if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
-       {
-               /* Pretend it extends to end of segment */
-               EndRecPtr += XLogSegSize - 1;
-               EndRecPtr -= EndRecPtr % XLogSegSize;
-
-               /*
-                * Pretend that readBuf contains the last page of the segment. This is
-                * just to avoid Assert failure in StartupXLOG if XLOG ends with this
-                * segment.
-                */
-               readOff = XLogSegSize - XLOG_BLCKSZ;
-       }
-       return record;
-
-next_record_is_invalid:
-       lastSourceFailed = true;
-
-       if (readFile >= 0)
-       {
-               close(readFile);
-               readFile = -1;
-       }
-
-       /* In standby-mode, keep trying */
-       if (StandbyMode)
-               goto retry;
-       else
-               return NULL;
-}
-
-/*
- * Check whether the xlog header of a page just read in looks valid.
- *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord. It's not intended for use from anywhere else.
- */
-static bool
-ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
-{
-       XLogRecPtr      recaddr;
-
-       XLogSegNoOffsetToRecPtr(readSegNo, readOff, recaddr);
-
-       if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("invalid magic number %04X in log segment %s, offset %u",
-                                               hdr->xlp_magic,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
-       if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("invalid info bits %04X in log segment %s, offset %u",
-                                               hdr->xlp_info,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
-       if (hdr->xlp_info & XLP_LONG_HEADER)
-       {
-               XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
-
-               if (longhdr->xlp_sysid != ControlFile->system_identifier)
-               {
-                       char            fhdrident_str[32];
-                       char            sysident_str[32];
-
-                       /*
-                        * Format sysids separately to keep platform-dependent format code
-                        * out of the translatable message string.
-                        */
-                       snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
-                                        longhdr->xlp_sysid);
-                       snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
-                                        ControlFile->system_identifier);
-                       ereport(emode_for_corrupt_record(emode, recaddr),
-                                       (errmsg("WAL file is from different database system"),
-                                        errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.",
-                                                          fhdrident_str, sysident_str)));
-                       return false;
-               }
-               if (longhdr->xlp_seg_size != XLogSegSize)
-               {
-                       ereport(emode_for_corrupt_record(emode, recaddr),
-                                       (errmsg("WAL file is from different database system"),
-                                        errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
-                       return false;
-               }
-               if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
-               {
-                       ereport(emode_for_corrupt_record(emode, recaddr),
-                                       (errmsg("WAL file is from different database system"),
-                                        errdetail("Incorrect XLOG_BLCKSZ in page header.")));
-                       return false;
+                       break;
                }
-       }
-       else if (readOff == 0)
-       {
-               /* hmm, first page of file doesn't have a long header? */
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("invalid info bits %04X in log segment %s, offset %u",
-                                               hdr->xlp_info,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
-
-       if (hdr->xlp_pageaddr != recaddr)
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("unexpected pageaddr %X/%X in log segment %s, offset %u",
-                                               (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
 
-       /*
-        * Check page TLI is one of the expected values.
-        */
-       if (!tliInHistory(hdr->xlp_tli, expectedTLEs))
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
-                                               hdr->xlp_tli,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
-
-       /*
-        * Since child timelines are always assigned a TLI greater than their
-        * immediate parent's TLI, we should never see TLI go backwards across
-        * successive pages of a consistent WAL sequence.
-        *
-        * Of course this check should only be applied when advancing sequentially
-        * across pages; therefore ReadRecord resets lastPageTLI and
-        * lastSegmentTLI to zero when going to a random page.
-        *
-        * Sometimes we re-open a segment that's already been partially replayed.
-        * In that case we cannot perform the normal TLI check: if there is a
-        * timeline switch within the segment, the first page has a smaller TLI
-        * than later pages following the timeline switch, and we might've read
-        * them already. As a weaker test, we still check that it's not smaller
-        * than the TLI we last saw at the beginning of a segment. Pass
-        * segmentonly = true when re-validating the first page like that, and the
-        * page you're actually interested in comes later.
-        */
-       if (hdr->xlp_tli < (segmentonly ? lastSegmentTLI : lastPageTLI))
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
-                                               hdr->xlp_tli,
-                                               segmentonly ? lastSegmentTLI : lastPageTLI,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
-       lastPageTLI = hdr->xlp_tli;
-       if (readOff == 0)
-               lastSegmentTLI = hdr->xlp_tli;
-
-       return true;
-}
-
-/*
- * Validate an XLOG record header.
- *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord. It's not intended for use from anywhere else.
- */
-static bool
-ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
-                                         bool randAccess)
-{
-       /*
-        * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
-        * required.
-        */
-       if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
-       {
-               if (record->xl_len != 0)
-               {
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("invalid xlog switch record at %X/%X",
-                                                       (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       return false;
-               }
-       }
-       else if (record->xl_len == 0)
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("record with zero length at %X/%X",
-                                               (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               return false;
-       }
-       if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
-               record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
-               XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("invalid record length at %X/%X",
-                                               (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               return false;
-       }
-       if (record->xl_rmid > RM_MAX_ID)
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("invalid resource manager ID %u at %X/%X",
-                                               record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               return false;
-       }
-       if (randAccess)
-       {
                /*
-                * We can't exactly verify the prev-link, but surely it should be less
-                * than the record's own address.
+                * Check page TLI is one of the expected values.
                 */
-               if (!(record->xl_prev < *RecPtr))
+               if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs))
                {
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("record with incorrect prev-link %X/%X at %X/%X",
-                                                       (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
-                                                       (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
+                       char            fname[MAXFNAMELEN];
+                       XLogSegNo segno;
+                       int32 offset;
+
+                       XLByteToSeg(xlogreader->latestPagePtr, segno);
+                       offset = xlogreader->latestPagePtr % XLogSegSize;
+                       XLogFileName(fname, xlogreader->readPageTLI, segno);
+                       ereport(emode_for_corrupt_record(emode,
+                                                                                        RecPtr ? RecPtr : EndRecPtr),
+                                       (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
+                                                       xlogreader->latestPageTLI,
+                                                       fname,
+                                                       offset)));
                        return false;
                }
-       }
-       else
-       {
-               /*
-                * Record's prev-link should exactly match our previous location. This
-                * check guards against torn WAL pages where a stale but valid-looking
-                * WAL record starts on a sector boundary.
-                */
-               if (record->xl_prev != ReadRecPtr)
-               {
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("record with incorrect prev-link %X/%X at %X/%X",
-                                                       (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
-                                                       (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       return false;
-               }
-       }
+       } while (StandbyMode && record == NULL);
 
-       return true;
+       return record;
 }
 
 /*
@@ -5235,6 +4692,8 @@ StartupXLOG(void)
        bool            backupEndRequired = false;
        bool            backupFromStandby = false;
        DBState         dbstate_at_startup;
+       XLogReaderState *xlogreader;
+       XLogPageReadPrivate private;
 
        /*
         * Read control file and check XLOG status looks valid.
@@ -5351,6 +4810,16 @@ StartupXLOG(void)
        if (StandbyMode)
                OwnLatch(&XLogCtl->recoveryWakeupLatch);
 
+       /* Set up XLOG reader facility */
+       MemSet(&private, 0, sizeof(XLogPageReadPrivate));
+       xlogreader = XLogReaderAllocate(&XLogPageRead, &private);
+       if (!xlogreader)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory"),
+                                errdetail("Failed while allocating an XLog reading processor")));
+       xlogreader->system_identifier = ControlFile->system_identifier;
+
        if (read_backup_label(&checkPointLoc, &backupEndRequired,
                                                  &backupFromStandby))
        {
@@ -5358,7 +4827,7 @@ StartupXLOG(void)
                 * When a backup_label file is present, we want to roll forward from
                 * the checkpoint it identifies, rather than using pg_control.
                 */
-               record = ReadCheckpointRecord(checkPointLoc, 0);
+               record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0);
                if (record != NULL)
                {
                        memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
@@ -5376,7 +4845,7 @@ StartupXLOG(void)
                         */
                        if (checkPoint.redo < checkPointLoc)
                        {
-                               if (!ReadRecord(&(checkPoint.redo), LOG, false))
+                               if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
                                        ereport(FATAL,
                                                        (errmsg("could not find redo location referenced by checkpoint record"),
                                                         errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
@@ -5400,7 +4869,7 @@ StartupXLOG(void)
                 */
                checkPointLoc = ControlFile->checkPoint;
                RedoStartLSN = ControlFile->checkPointCopy.redo;
-               record = ReadCheckpointRecord(checkPointLoc, 1);
+               record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1);
                if (record != NULL)
                {
                        ereport(DEBUG1,
@@ -5419,7 +4888,7 @@ StartupXLOG(void)
                else
                {
                        checkPointLoc = ControlFile->prevCheckPoint;
-                       record = ReadCheckpointRecord(checkPointLoc, 2);
+                       record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2);
                        if (record != NULL)
                        {
                                ereport(LOG,
@@ -5777,12 +5246,12 @@ StartupXLOG(void)
                if (checkPoint.redo < RecPtr)
                {
                        /* back up to find the record */
-                       record = ReadRecord(&(checkPoint.redo), PANIC, false);
+                       record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
                }
                else
                {
                        /* just have to read next record after CheckPoint */
-                       record = ReadRecord(NULL, LOG, false);
+                       record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
                }
 
                if (record != NULL)
@@ -5963,7 +5432,7 @@ StartupXLOG(void)
                                        break;
 
                                /* Else, try to fetch the next WAL record */
-                               record = ReadRecord(NULL, LOG, false);
+                               record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
                        } while (record != NULL);
 
                        /*
@@ -6013,7 +5482,7 @@ StartupXLOG(void)
         * Re-fetch the last valid or last applied record, so we can identify the
         * exact endpoint of what we consider the valid portion of WAL.
         */
-       record = ReadRecord(&LastRec, PANIC, false);
+       record = ReadRecord(xlogreader, LastRec, PANIC, false);
        EndOfLog = EndRecPtr;
        XLByteToPrevSeg(EndOfLog, endLogSegNo);
 
@@ -6117,7 +5586,7 @@ StartupXLOG(void)
         * we will use that below.)
         */
        if (InArchiveRecovery)
-               exitArchiveRecovery(curFileTLI, endLogSegNo);
+               exitArchiveRecovery(xlogreader->readPageTLI, endLogSegNo);
 
        /*
         * Prepare to write WAL starting at EndOfLog position, and init xlog
@@ -6136,8 +5605,15 @@ StartupXLOG(void)
         * record spans, not the one it starts in.      The last block is indeed the
         * one we want to use.
         */
-       Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
-       memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
+       if (EndOfLog % XLOG_BLCKSZ == 0)
+       {
+               memset(Insert->currpage, 0, XLOG_BLCKSZ);
+       }
+       else
+       {
+               Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
+               memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
+       }
        Insert->currpos = (char *) Insert->currpage +
                (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
 
@@ -6288,23 +5764,13 @@ StartupXLOG(void)
        if (standbyState != STANDBY_DISABLED)
                ShutdownRecoveryTransactionEnvironment();
 
-       /* Shut down readFile facility, free space */
+       /* Shut down xlogreader */
        if (readFile >= 0)
        {
                close(readFile);
                readFile = -1;
        }
-       if (readBuf)
-       {
-               free(readBuf);
-               readBuf = NULL;
-       }
-       if (readRecordBuf)
-       {
-               free(readRecordBuf);
-               readRecordBuf = NULL;
-               readRecordBufSize = 0;
-       }
+       XLogReaderFree(xlogreader);
 
        /*
         * If any of the critical GUCs have changed, log them before we allow
@@ -6554,7 +6020,8 @@ LocalSetXLogInsertAllowed(void)
  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
  */
 static XLogRecord *
-ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
+ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+                                        int whichChkpt)
 {
        XLogRecord *record;
 
@@ -6578,7 +6045,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
                return NULL;
        }
 
-       record = ReadRecord(&RecPtr, LOG, true);
+       record = ReadRecord(xlogreader, RecPtr, LOG, true);
 
        if (record == NULL)
        {
@@ -9313,7 +8780,9 @@ CancelBackup(void)
 
 /*
  * Read the XLOG page containing RecPtr into readBuf (if not read already).
- * Returns true if the page is read successfully.
+ * Returns number of bytes read, if the page is read successfully, or -1
+ * in case of errors.  When errors occur, they are ereport'ed, but only
+ * if they have not been previously reported.
  *
  * This is responsible for restoring files from archive as needed, as well
  * as for waiting for the requested WAL record to arrive in standby mode.
@@ -9332,28 +8801,24 @@ CancelBackup(void)
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
  */
-static bool
-XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
-                        bool randAccess)
+static int
+XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+                        char *readBuf, TimeLineID *readTLI)
 {
+       XLogPageReadPrivate *private =
+               (XLogPageReadPrivate *) xlogreader->private_data;
+       int                     emode = private->emode;
        uint32          targetPageOff;
-       uint32          targetRecOff;
-       XLogSegNo       targetSegNo;
-
-       XLByteToSeg(*RecPtr, targetSegNo);
-       targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-       targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
+       XLogSegNo       targetSegNo PG_USED_FOR_ASSERTS_ONLY;
 
-       /* Fast exit if we have read the record in the current buffer already */
-       if (!lastSourceFailed && targetSegNo == readSegNo &&
-               targetPageOff == readOff && targetRecOff < readLen)
-               return true;
+       XLByteToSeg(targetPagePtr, targetSegNo);
+       targetPageOff = targetPagePtr % XLogSegSize;
 
        /*
         * See if we need to switch to a new segment because the requested record
         * is not in the currently open one.
         */
-       if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo))
+       if (readFile >= 0 && !XLByteInSeg(targetPagePtr, readSegNo))
        {
                /*
                 * Request a restartpoint if we've replayed too much xlog since the
@@ -9374,39 +8839,34 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
                readSource = 0;
        }
 
-       XLByteToSeg(*RecPtr, readSegNo);
+       XLByteToSeg(targetPagePtr, readSegNo);
 
 retry:
        /* See if we need to retrieve more data */
        if (readFile < 0 ||
-               (readSource == XLOG_FROM_STREAM && receivedUpto <= *RecPtr))
+               (readSource == XLOG_FROM_STREAM &&
+                receivedUpto <= targetPagePtr + reqLen))
        {
                if (StandbyMode)
                {
-                       if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
-                                                                                        fetching_ckpt))
+                       if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
+                                                                                        private->randAccess,
+                                                                                        private->fetching_ckpt))
                                goto triggered;
                }
-               else
+               /* In archive or crash recovery. */
+               else if (readFile < 0)
                {
-                       /* In archive or crash recovery. */
-                       if (readFile < 0)
-                       {
-                               int                     source;
+                       int source;
 
-                               /* Reset curFileTLI if random fetch. */
-                               if (randAccess)
-                                       curFileTLI = 0;
-
-                               if (InArchiveRecovery)
-                                       source = XLOG_FROM_ANY;
-                               else
-                                       source = XLOG_FROM_PG_XLOG;
+                       if (InArchiveRecovery)
+                               source = XLOG_FROM_ANY;
+                       else
+                               source = XLOG_FROM_PG_XLOG;
 
-                               readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
-                               if (readFile < 0)
-                                       return false;
-                       }
+                       readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
+                       if (readFile < 0)
+                               return -1;
                }
        }
 
@@ -9424,72 +8884,46 @@ retry:
         */
        if (readSource == XLOG_FROM_STREAM)
        {
-               if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
-               {
+               if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
                        readLen = XLOG_BLCKSZ;
-               }
                else
                        readLen = receivedUpto % XLogSegSize - targetPageOff;
        }
        else
                readLen = XLOG_BLCKSZ;
 
-       if (!readFileHeaderValidated && targetPageOff != 0)
-       {
-               /*
-                * Whenever switching to a new WAL segment, we read the first page of
-                * the file and validate its header, even if that's not where the
-                * target record is.  This is so that we can check the additional
-                * identification info that is present in the first page's "long"
-                * header.
-                */
-               readOff = 0;
-               if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-               {
-                       char fname[MAXFNAMELEN];
-                       XLogFileName(fname, curFileTLI, readSegNo);
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errcode_for_file_access(),
-                                        errmsg("could not read from log segment %s, offset %u: %m",
-                                                       fname, readOff)));
-                       goto next_record_is_invalid;
-               }
-               if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, true))
-                       goto next_record_is_invalid;
-       }
-
        /* Read the requested page */
        readOff = targetPageOff;
        if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
        {
                char fname[MAXFNAMELEN];
+
                XLogFileName(fname, curFileTLI, readSegNo);
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
+               ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                                (errcode_for_file_access(),
                 errmsg("could not seek in log segment %s to offset %u: %m",
-                               fname, readOff)));
+                                               fname, readOff)));
                goto next_record_is_invalid;
        }
+
        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
        {
                char fname[MAXFNAMELEN];
+
                XLogFileName(fname, curFileTLI, readSegNo);
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
+               ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                                (errcode_for_file_access(),
                 errmsg("could not read from log segment %s, offset %u: %m",
-                               fname, readOff)));
+                                               fname, readOff)));
                goto next_record_is_invalid;
        }
-       if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, false))
-               goto next_record_is_invalid;
-
-       readFileHeaderValidated = true;
 
        Assert(targetSegNo == readSegNo);
        Assert(targetPageOff == readOff);
-       Assert(targetRecOff < readLen);
+       Assert(reqLen <= readLen);
 
-       return true;
+       *readTLI = curFileTLI;
+       return readLen;
 
 next_record_is_invalid:
        lastSourceFailed = true;
@@ -9504,7 +8938,7 @@ next_record_is_invalid:
        if (StandbyMode)
                goto retry;
        else
-               return false;
+               return -1;
 
 triggered:
        if (readFile >= 0)
@@ -9513,7 +8947,7 @@ triggered:
        readLen = 0;
        readSource = 0;
 
-       return false;
+       return -1;
 }
 
 /*
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
new file mode 100644 (file)
index 0000000..ff871a3
--- /dev/null
@@ -0,0 +1,1005 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.c
+ *             Generic XLog reading facility
+ *
+ * Portions Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *             src/backend/access/transam/xlogreader.c
+ *
+ * NOTES
+ *             See xlogreader.h for more notes on this facility.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "catalog/pg_control.h"
+
+static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
+
+static bool ValidXLogPageHeader(XLogReaderState *state, XLogRecPtr recptr,
+                                       XLogPageHeader hdr);
+static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
+                                XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
+static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
+                               XLogRecPtr recptr);
+static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
+                                int reqLen);
+static void
+report_invalid_record(XLogReaderState *state, const char *fmt,...)
+/* This extension allows gcc to check the format string for consistency with
+   the supplied arguments. */
+__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
+
+/* size of the buffer allocated for error message. */
+#define MAX_ERRORMSG_LEN 1000
+
+/*
+ * Format a description of what is wrong with the record currently being
+ * read and leave it in state->errormsg_buf.
+ *
+ * The format string is passed through _() before use.  The output is
+ * limited to MAX_ERRORMSG_LEN bytes, terminating NUL included; anything
+ * longer is truncated.
+ */
+static void
+report_invalid_record(XLogReaderState *state, const char *fmt,...)
+{
+       const char *translated = _(fmt);
+       va_list         ap;
+
+       va_start(ap, fmt);
+       vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, translated, ap);
+       va_end(ap);
+}
+
+/*
+ * Create a new XLogReader and prepare it for use.
+ *
+ * pagereadfunc is stored as the reader's read_page callback, and
+ * private_data is stored as-is in the reader's private_data field.
+ *
+ * Returns NULL if any of the required allocations fails; whatever had been
+ * allocated up to that point is released first.
+ */
+XLogReaderState *
+XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
+{
+       XLogReaderState *reader;
+
+       AssertArg(pagereadfunc != NULL);
+
+       reader = (XLogReaderState *) malloc(sizeof(XLogReaderState));
+       if (reader == NULL)
+               return NULL;
+
+       /*
+        * Start from an all-zeroes struct.  That takes care of
+        * system_identifier, ReadRecPtr, EndRecPtr, readSegNo, readOff, readLen
+        * and readPageTLI, and it leaves every buffer pointer NULL so that the
+        * failure path below can free them unconditionally.
+        */
+       MemSet(reader, 0, sizeof(XLogReaderState));
+       reader->read_page = pagereadfunc;
+       reader->private_data = private_data;
+
+       /*
+        * Permanently allocate readBuf.  It is malloc'd rather than being a
+        * static array for two reasons: (1) most instantiations of the backend
+        * never need the storage; (2) a static char array isn't guaranteed to
+        * have any particular alignment, whereas malloc() will provide
+        * MAXALIGN'd storage.
+        */
+       reader->readBuf = (char *) malloc(XLOG_BLCKSZ);
+       if (reader->readBuf == NULL)
+               goto fail;
+
+       /* Room for an error message of MAX_ERRORMSG_LEN bytes plus a NUL. */
+       reader->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1);
+       if (reader->errormsg_buf == NULL)
+               goto fail;
+       reader->errormsg_buf[0] = '\0';
+
+       /*
+        * Allocate an initial readRecordBuf of minimal size, which can later be
+        * enlarged if necessary.  On failure allocate_recordbuf leaves
+        * readRecordBuf NULL, so there is nothing extra to release.
+        */
+       if (!allocate_recordbuf(reader, 0))
+               goto fail;
+
+       return reader;
+
+fail:
+       free(reader->errormsg_buf);
+       free(reader->readBuf);
+       free(reader);
+       return NULL;
+}
+
+/*
+ * Release an XLogReader together with the buffers it owns.
+ *
+ * state is dereferenced unconditionally, so it must not be NULL.  The
+ * pointer is invalid once this returns.
+ */
+void
+XLogReaderFree(XLogReaderState *state)
+{
+       /* free(NULL) is a no-op, so readRecordBuf needs no explicit NULL test */
+       free(state->readRecordBuf);
+       free(state->readBuf);
+       free(state->errormsg_buf);
+       free(state);
+}
+
+/*
+ * Allocate readRecordBuf to fit a record of at least the given length.
+ * Returns true if successful.  Returns false if out of memory, or if the
+ * length is so large that the rounded-up size cannot be represented in a
+ * uint32.
+ *
+ * readRecordBufSize is set to the new buffer size.  Any previous buffer is
+ * freed rather than resized, so its contents are not preserved.  If malloc
+ * fails the reader is left with no record buffer and readRecordBufSize = 0;
+ * if the size is unrepresentable the existing buffer is left untouched.
+ *
+ * To avoid useless small increases, round its size up to a multiple of
+ * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
+ * with.  (That is enough for all "normal" records, but very large commit or
+ * abort records might need more space.)
+ */
+static bool
+allocate_recordbuf(XLogReaderState *state, uint32 reclength)
+{
+       uint32          newSize = reclength;
+
+       newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
+
+       /*
+        * The length may come straight from WAL data that has not been fully
+        * validated yet.  A value within one XLOG_BLCKSZ of the uint32 maximum
+        * makes the round-up above wrap around to a small number; without this
+        * check we would report success with a buffer much smaller than the
+        * caller asked for.
+        */
+       if (newSize < reclength)
+               return false;
+
+       newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
+
+       if (state->readRecordBuf)
+               free(state->readRecordBuf);
+       state->readRecordBuf = (char *) malloc(newSize);
+       if (!state->readRecordBuf)
+       {
+               state->readRecordBufSize = 0;
+               return false;
+       }
+
+       state->readRecordBufSize = newSize;
+       return true;
+}
+
+/*
+ * Attempt to read an XLOG record.
+ *
+ * If RecPtr is not InvalidXLogRecPtr, try to read a record at that position.
+ * Otherwise try to read a record just after the last one previously read.
+ *
+ * If the read_page callback fails to read the requested data, NULL is
+ * returned.  The callback is expected to have reported the error; *errormsg
+ * is set to NULL.
+ *
+ * If the reading fails for some other reason, NULL is also returned, and
+ * *errormsg is set to a string with details of the failure.
+ *
+ * The returned pointer (or *errormsg) points to an internal buffer that's
+ * valid until the next call to XLogReadRecord.
+ */
+XLogRecord *
+XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
+{
+       XLogRecord *record;
+       XLogRecPtr      targetPagePtr;
+       bool            randAccess = false;
+       uint32          len,
+                               total_len;
+       uint32          targetRecOff;
+       uint32          pageHeaderSize;
+       bool            gotheader;
+       int                     readOff;
+
+       randAccess = false;
+       /* reset error state */
+       *errormsg = NULL;
+       state->errormsg_buf[0] = '\0';
+
+       if (RecPtr == InvalidXLogRecPtr)
+       {
+               RecPtr = state->EndRecPtr;
+
+               if (state->ReadRecPtr == InvalidXLogRecPtr)
+                       randAccess = true;
+
+               /*
+                * RecPtr is pointing to end+1 of the previous WAL record.      If we're
+                * at a page boundary, no more records can fit on the current page. We
+                * must skip over the page header, but we can't do that until we've
+                * read in the page, since the header size is variable.
+                */
+       }
+       else
+       {
+               /*
+                * In this case, the passed-in record pointer should already be
+                * pointing to a valid record starting position.
+                */
+               Assert(XRecOffIsValid(RecPtr));
+               randAccess = true;              /* allow readPageTLI to go backwards too */
+       }
+
+       targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
+
+       /* Read the page containing the record into state->readBuf */
+       readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogRecord);
+
+       if (readOff < 0)
+       {
+               if (state->errormsg_buf[0] != '\0')
+                       *errormsg = state->errormsg_buf;
+               return NULL;
+       }
+
+       /*
+        * ReadPageInternal always returns at least the page header, so we can
+        * examine it now.
+        */
+       pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+       targetRecOff = RecPtr % XLOG_BLCKSZ;
+       if (targetRecOff == 0)
+       {
+               /*
+                * At page start, so skip over page header.
+                */
+               RecPtr += pageHeaderSize;
+               targetRecOff = pageHeaderSize;
+       }
+       else if (targetRecOff < pageHeaderSize)
+       {
+               report_invalid_record(state, "invalid record offset at %X/%X",
+                                                         (uint32) (RecPtr >> 32), (uint32) RecPtr);
+               *errormsg = state->errormsg_buf;
+               return NULL;
+       }
+
+       if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+               targetRecOff == pageHeaderSize)
+       {
+               report_invalid_record(state, "contrecord is requested by %X/%X",
+                                                         (uint32) (RecPtr >> 32), (uint32) RecPtr);
+               *errormsg = state->errormsg_buf;
+               return NULL;
+       }
+
+       /* ReadPageInternal has verified the page header */
+       Assert(pageHeaderSize <= readOff);
+
+       /*
+        * Ensure the whole record header or at least the part on this page is
+        * read.
+        */
+       readOff = ReadPageInternal(state,
+                                                          targetPagePtr,
+                                                 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
+       if (readOff < 0)
+       {
+               if (state->errormsg_buf[0] != '\0')
+                       *errormsg = state->errormsg_buf;
+               return NULL;
+       }
+
+       /*
+        * Read the record length.
+        *
+        * NB: Even though we use an XLogRecord pointer here, the whole record
+        * header might not fit on this page. xl_tot_len is the first field of the
+        * struct, so it must be on this page (the records are MAXALIGNed), but we
+        * cannot access any other fields until we've verified that we got the
+        * whole header.
+        */
+       record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
+       total_len = record->xl_tot_len;
+
+       /*
+        * If the whole record header is on this page, validate it immediately.
+        * Otherwise do just a basic sanity check on xl_tot_len, and validate the
+        * rest of the header after reading it from the next page.      The xl_tot_len
+        * check is necessary here to ensure that we enter the "Need to reassemble
+        * record" code path below; otherwise we might fail to apply
+        * ValidXLogRecordHeader at all.
+        */
+       if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+       {
+               if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
+                                                                  randAccess))
+               {
+                       if (state->errormsg_buf[0] != '\0')
+                               *errormsg = state->errormsg_buf;
+                       return NULL;
+               }
+               gotheader = true;
+       }
+       else
+       {
+               /* XXX: more validation should be done here */
+               if (total_len < SizeOfXLogRecord)
+               {
+                       report_invalid_record(state, "invalid record length at %X/%X",
+                                                                 (uint32) (RecPtr >> 32), (uint32) RecPtr);
+                       *errormsg = state->errormsg_buf;
+                       return NULL;
+               }
+               gotheader = false;
+       }
+
+       /*
+        * Enlarge readRecordBuf as needed.
+        */
+       if (total_len > state->readRecordBufSize &&
+               !allocate_recordbuf(state, total_len))
+       {
+               /* We treat this as a "bogus data" condition */
+               report_invalid_record(state, "record length %u at %X/%X too long",
+                                                         total_len,
+                                                         (uint32) (RecPtr >> 32), (uint32) RecPtr);
+               *errormsg = state->errormsg_buf;
+               return NULL;
+       }
+
+       len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
+       if (total_len > len)
+       {
+               /* Need to reassemble record */
+               char       *contdata;
+               XLogPageHeader pageHeader;
+               char       *buffer;
+               uint32          gotlen;
+
+               /* Copy the first fragment of the record from the first page. */
+               memcpy(state->readRecordBuf,
+                          state->readBuf + RecPtr % XLOG_BLCKSZ, len);
+               buffer = state->readRecordBuf + len;
+               gotlen = len;
+
+               do
+               {
+                       /* Calculate pointer to beginning of next page */
+                       targetPagePtr += XLOG_BLCKSZ;
+
+                       /* Wait for the next page to become available */
+                       readOff = ReadPageInternal(state, targetPagePtr,
+                                                                Min(total_len - gotlen + SizeOfXLogShortPHD,
+                                                                        XLOG_BLCKSZ));
+
+                       if (readOff < 0)
+                               goto err;
+
+                       Assert(SizeOfXLogShortPHD <= readOff);
+
+                       /* Check that the continuation on next page looks valid */
+                       pageHeader = (XLogPageHeader) state->readBuf;
+                       if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+                       {
+                               report_invalid_record(state,
+                                                                         "there is no contrecord flag at %X/%X",
+                                                                  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+                               goto err;
+                       }
+
+                       /*
+                        * Cross-check that xlp_rem_len agrees with how much of the record
+                        * we expect there to be left.
+                        */
+                       if (pageHeader->xlp_rem_len == 0 ||
+                               total_len != (pageHeader->xlp_rem_len + gotlen))
+                       {
+                               report_invalid_record(state,
+                                                                         "invalid contrecord length %u at %X/%X",
+                                                                         pageHeader->xlp_rem_len,
+                                                                  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+                               goto err;
+                       }
+
+                       /* Append the continuation from this page to the buffer */
+                       pageHeaderSize = XLogPageHeaderSize(pageHeader);
+
+                       if (readOff < pageHeaderSize)
+                               readOff = ReadPageInternal(state, targetPagePtr,
+                                                                                  pageHeaderSize);
+
+                       Assert(pageHeaderSize <= readOff);
+
+                       contdata = (char *) state->readBuf + pageHeaderSize;
+                       len = XLOG_BLCKSZ - pageHeaderSize;
+                       if (pageHeader->xlp_rem_len < len)
+                               len = pageHeader->xlp_rem_len;
+
+                       if (readOff < pageHeaderSize + len)
+                               readOff = ReadPageInternal(state, targetPagePtr,
+                                                                                  pageHeaderSize + len);
+
+                       memcpy(buffer, (char *) contdata, len);
+                       buffer += len;
+                       gotlen += len;
+
+                       /* If we just reassembled the record header, validate it. */
+                       if (!gotheader)
+                       {
+                               record = (XLogRecord *) state->readRecordBuf;
+                               if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
+                                                                                  record, randAccess))
+                                       goto err;
+                               gotheader = true;
+                       }
+               } while (gotlen < total_len);
+
+               Assert(gotheader);
+
+               record = (XLogRecord *) state->readRecordBuf;
+               if (!ValidXLogRecord(state, record, RecPtr))
+                       goto err;
+
+               pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+               state->ReadRecPtr = RecPtr;
+               state->EndRecPtr = targetPagePtr + pageHeaderSize
+                       + MAXALIGN(pageHeader->xlp_rem_len);
+       }
+       else
+       {
+               /* Wait for the record data to become available */
+               readOff = ReadPageInternal(state, targetPagePtr,
+                                                                Min(targetRecOff + total_len, XLOG_BLCKSZ));
+               if (readOff < 0)
+                       goto err;
+
+               /* Record does not cross a page boundary */
+               if (!ValidXLogRecord(state, record, RecPtr))
+                       goto err;
+
+               state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+
+               state->ReadRecPtr = RecPtr;
+               memcpy(state->readRecordBuf, record, total_len);
+       }
+
+       /*
+        * Special processing if it's an XLOG SWITCH record
+        */
+       if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+       {
+               /* Pretend it extends to end of segment */
+               state->EndRecPtr += XLogSegSize - 1;
+               state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
+       }
+
+       return record;
+
+err:
+
+       /*
+        * Invalidate the xlog page we've cached. We might read from a different
+        * source after failure.
+        */
+       state->readSegNo = 0;
+       state->readOff = 0;
+       state->readLen = 0;
+
+       if (state->errormsg_buf[0] != '\0')
+               *errormsg = state->errormsg_buf;
+
+       return NULL;
+}
+
+/*
+ * Read a single xlog page so that at least the first reqLen bytes of the page
+ * starting at pageptr are available in state->readBuf, via the read_page()
+ * callback.
+ *
+ * Returns the number of bytes available (as reported by the callback, or as
+ * remembered in the reader-local cache), or -1 if the required page cannot be
+ * read for some reason; errormsg_buf is set in that case (unless the error
+ * occurs in the read_page callback).
+ *
+ * We fetch the page from a reader-local cache if we know we have the required
+ * data and if there hasn't been any error since caching the data.  Every
+ * failure path resets the cache fields so a later call re-reads the page.
+ */
+static int
+ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
+{
+       int                     readLen;
+       uint32          targetPageOff;
+       XLogSegNo       targetSegNo;
+       XLogPageHeader hdr;
+
+       Assert((pageptr % XLOG_BLCKSZ) == 0);
+
+       XLByteToSeg(pageptr, targetSegNo);
+       targetPageOff = (pageptr % XLogSegSize);
+
+       /*
+        * Check whether we have all the requested data already.  A request for
+        * exactly the cached length is satisfied too.  The readLen > 0 test keeps
+        * the zeroed-out state left behind by the error path from ever being
+        * mistaken for a valid cached page.
+        */
+       if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
+               state->readLen > 0 && reqLen <= state->readLen)
+               return state->readLen;
+
+       /*
+        * Data is not in our buffer.
+        *
+        * Every time we actually read the page, even if we looked at parts of it
+        * before, we need to do verification as the read_page callback might now
+        * be rereading data from a different source.
+        *
+        * Whenever switching to a new WAL segment, we read the first page of the
+        * file and validate its header, even if that's not where the target
+        * record is.  This is so that we can check the additional identification
+        * info that is present in the first page's "long" header.
+        */
+       if (targetSegNo != state->readSegNo &&
+               targetPageOff != 0)
+       {
+               XLogRecPtr      targetSegmentPtr = pageptr - targetPageOff;
+
+               readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
+                                                                  state->readBuf, &state->readPageTLI);
+               if (readLen < 0)
+                       goto err;
+
+               /* we can be sure to have enough WAL available, we scrolled back */
+               Assert(readLen == XLOG_BLCKSZ);
+
+               hdr = (XLogPageHeader) state->readBuf;
+
+               if (!ValidXLogPageHeader(state, targetSegmentPtr, hdr))
+                       goto err;
+       }
+
+       /*
+        * First, read the requested data length, but at least a short page header
+        * so that we can validate it.
+        */
+       readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+                                                          state->readBuf, &state->readPageTLI);
+       if (readLen < 0)
+               goto err;
+
+       Assert(readLen <= XLOG_BLCKSZ);
+
+       /* Do we have enough data to check the header length? */
+       if (readLen <= SizeOfXLogShortPHD)
+               goto err;
+
+       Assert(readLen >= reqLen);
+
+       hdr = (XLogPageHeader) state->readBuf;
+
+       /* still not enough: the header is a long one, fetch all of it */
+       if (readLen < XLogPageHeaderSize(hdr))
+       {
+               readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
+                                                                  state->readBuf, &state->readPageTLI);
+               if (readLen < 0)
+                       goto err;
+       }
+
+       /*
+        * Now that we know we have the full header, validate it.
+        */
+       if (!ValidXLogPageHeader(state, pageptr, hdr))
+               goto err;
+
+       /* update cache information */
+       state->readSegNo = targetSegNo;
+       state->readOff = targetPageOff;
+       state->readLen = readLen;
+
+       return readLen;
+
+err:
+       /* forget whatever page we had cached; its contents can't be trusted */
+       state->readSegNo = 0;
+       state->readOff = 0;
+       state->readLen = 0;
+       return -1;
+}
+
+/*
+ * Validate an XLOG record header.
+ *
+ * Checks, in order: the xl_len rule for XLOG SWITCH versus ordinary records,
+ * the plausibility of xl_tot_len, the resource manager ID, and the prev-link.
+ * Returns true if the header passes every check; otherwise reports the
+ * problem through report_invalid_record() and returns false.
+ *
+ * This is just a convenience subroutine to avoid duplicated code in
+ * XLogReadRecord.  It's not intended for use from anywhere else.
+ */
+static bool
+ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
+                                         XLogRecPtr PrevRecPtr, XLogRecord *record,
+                                         bool randAccess)
+{
+       bool            is_switch;
+       bool            prev_ok;
+
+       is_switch = (record->xl_rmid == RM_XLOG_ID &&
+                                record->xl_info == XLOG_SWITCH);
+
+       /* An XLOG SWITCH record is required to carry no rmgr data ... */
+       if (is_switch && record->xl_len != 0)
+       {
+               report_invalid_record(state,
+                                                         "invalid xlog switch record at %X/%X",
+                                                         (uint32) (RecPtr >> 32), (uint32) RecPtr);
+               return false;
+       }
+
+       /* ... while for every other record xl_len == 0 is bad data. */
+       if (!is_switch && record->xl_len == 0)
+       {
+               report_invalid_record(state,
+                                                         "record with zero length at %X/%X",
+                                                         (uint32) (RecPtr >> 32), (uint32) RecPtr);
+               return false;
+       }
+
+       /*
+        * The total length must cover the header plus rmgr data, and cannot
+        * exceed that plus the maximum number of full-size backup blocks.
+        */
+       if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
+               record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
+               XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+       {
+               report_invalid_record(state,
+                                                         "invalid record length at %X/%X",
+                                                         (uint32) (RecPtr >> 32), (uint32) RecPtr);
+               return false;
+       }
+
+       if (record->xl_rmid > RM_MAX_ID)
+       {
+               report_invalid_record(state,
+                                                         "invalid resource manager ID %u at %X/%X",
+                                                         record->xl_rmid, (uint32) (RecPtr >> 32),
+                                                         (uint32) RecPtr);
+               return false;
+       }
+
+       /*
+        * With random access we can't exactly verify the prev-link, but surely it
+        * should be less than the record's own address.  Otherwise the prev-link
+        * should exactly match our previous location; that check guards against
+        * torn WAL pages where a stale but valid-looking WAL record starts on a
+        * sector boundary.
+        */
+       if (randAccess)
+               prev_ok = (record->xl_prev < RecPtr);
+       else
+               prev_ok = (record->xl_prev == PrevRecPtr);
+
+       if (!prev_ok)
+       {
+               report_invalid_record(state,
+                                                         "record with incorrect prev-link %X/%X at %X/%X",
+                                                         (uint32) (record->xl_prev >> 32),
+                                                         (uint32) record->xl_prev,
+                                                         (uint32) (RecPtr >> 32), (uint32) RecPtr);
+               return false;
+       }
+
+       return true;
+}
+
+
+/*
+ * CRC-check an XLOG record.  We do not believe the contents of an XLOG
+ * record (other than to the minimal extent of computing the amount of
+ * data to read in) until we've checked the CRCs.
+ *
+ * We assume all of the record (that is, xl_tot_len bytes) has been read
+ * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
+ * record's header, which means in particular that xl_tot_len is at least
+ * SizeOfXLogRecord, so it is safe to fetch xl_len.
+ *
+ * The CRC is accumulated over the rmgr data first, then each backup block
+ * present (header plus data), and finally the record header up to xl_crc.
+ * Returns true if the lengths add up and the CRC matches; otherwise reports
+ * the problem and returns false.
+ */
+static bool
+ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
+{
+       pg_crc32        crc;
+       int                     slot;
+       uint32          rmgr_len = record->xl_len;
+       BkpBlock        bkp_hdr;
+       char       *cursor = (char *) XLogRecGetData(record) + rmgr_len;
+       size_t          bytes_left = record->xl_tot_len;
+
+       INIT_CRC32(crc);
+
+       /* First the rmgr data */
+       if (bytes_left < SizeOfXLogRecord + rmgr_len)
+       {
+               /* ValidXLogRecordHeader() should've caught this already... */
+               report_invalid_record(state, "invalid record length at %X/%X",
+                                                         (uint32) (recptr >> 32), (uint32) recptr);
+               return false;
+       }
+       bytes_left -= SizeOfXLogRecord + rmgr_len;
+       COMP_CRC32(crc, XLogRecGetData(record), rmgr_len);
+
+       /* Add in the backup blocks, if any; cursor walks over them in order */
+       for (slot = 0; slot < XLR_MAX_BKP_BLOCKS; slot++)
+       {
+               uint32          bkp_size;
+
+               if (!(record->xl_info & XLR_BKP_BLOCK(slot)))
+                       continue;
+
+               /* There must be room for the block header before we copy it */
+               if (bytes_left < sizeof(BkpBlock))
+               {
+                       report_invalid_record(state,
+                                                                 "invalid backup block size in record at %X/%X",
+                                                                 (uint32) (recptr >> 32), (uint32) recptr);
+                       return false;
+               }
+               memcpy(&bkp_hdr, cursor, sizeof(BkpBlock));
+
+               if (bkp_hdr.hole_offset + bkp_hdr.hole_length > BLCKSZ)
+               {
+                       report_invalid_record(state,
+                                                                 "incorrect hole size in record at %X/%X",
+                                                                 (uint32) (recptr >> 32), (uint32) recptr);
+                       return false;
+               }
+
+               /* Stored size is the header plus the page minus its hole */
+               bkp_size = sizeof(BkpBlock) + BLCKSZ - bkp_hdr.hole_length;
+
+               if (bytes_left < bkp_size)
+               {
+                       report_invalid_record(state,
+                                                                 "invalid backup block size in record at %X/%X",
+                                                                 (uint32) (recptr >> 32), (uint32) recptr);
+                       return false;
+               }
+               bytes_left -= bkp_size;
+               COMP_CRC32(crc, cursor, bkp_size);
+               cursor += bkp_size;
+       }
+
+       /* Check that xl_tot_len agrees with our calculation */
+       if (bytes_left != 0)
+       {
+               report_invalid_record(state,
+                                                         "incorrect total length in record at %X/%X",
+                                                         (uint32) (recptr >> 32), (uint32) recptr);
+               return false;
+       }
+
+       /* Finally include the record header */
+       COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
+       FIN_CRC32(crc);
+
+       if (!EQ_CRC32(record->xl_crc, crc))
+       {
+               report_invalid_record(state,
+                                                         "incorrect resource manager data checksum in record at %X/%X",
+                                                         (uint32) (recptr >> 32), (uint32) recptr);
+               return false;
+       }
+
+       return true;
+}
+
+/*
+ * Validate a page header.
+ *
+ * recptr is the (page-aligned) address the page was read from.  Returns true
+ * if the header is acceptable, in which case the reader's latestPagePtr and
+ * latestPageTLI are updated; otherwise reports the problem through
+ * report_invalid_record() and returns false.
+ */
+static bool
+ValidXLogPageHeader(XLogReaderState *state, XLogRecPtr recptr,
+                                       XLogPageHeader hdr)
+{
+       XLogRecPtr      expected_addr;
+       XLogSegNo       segno;
+       int32           offset;
+       char            fname[MAXFNAMELEN];
+
+       Assert((recptr % XLOG_BLCKSZ) == 0);
+
+       XLByteToSeg(recptr, segno);
+       offset = recptr % XLogSegSize;
+
+       XLogSegNoOffsetToRecPtr(segno, offset, expected_addr);
+
+       if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
+       {
+               XLogFileName(fname, state->readPageTLI, segno);
+
+               report_invalid_record(state,
+                                                         "invalid magic number %04X in log segment %s, offset %u",
+                                                         hdr->xlp_magic,
+                                                         fname,
+                                                         offset);
+               return false;
+       }
+
+       if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
+       {
+               XLogFileName(fname, state->readPageTLI, segno);
+
+               report_invalid_record(state,
+                                                         "invalid info bits %04X in log segment %s, offset %u",
+                                                         hdr->xlp_info,
+                                                         fname,
+                                                         offset);
+               return false;
+       }
+
+       if (hdr->xlp_info & XLP_LONG_HEADER)
+       {
+               XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
+
+               /* The sysid is only compared when the reader was given one */
+               if (state->system_identifier &&
+                       longhdr->xlp_sysid != state->system_identifier)
+               {
+                       char            fhdrident_str[32];
+                       char            sysident_str[32];
+
+                       /*
+                        * Format sysids separately to keep platform-dependent format code
+                        * out of the translatable message string.
+                        */
+                       snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
+                                        longhdr->xlp_sysid);
+                       snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
+                                        state->system_identifier);
+                       report_invalid_record(state,
+                                                                 "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s.",
+                                                                 fhdrident_str, sysident_str);
+                       return false;
+               }
+
+               if (longhdr->xlp_seg_size != XLogSegSize)
+               {
+                       report_invalid_record(state,
+                                                                 "WAL file is from different database system: Incorrect XLOG_SEG_SIZE in page header.");
+                       return false;
+               }
+
+               if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
+               {
+                       report_invalid_record(state,
+                                                                 "WAL file is from different database system: Incorrect XLOG_BLCKSZ in page header.");
+                       return false;
+               }
+       }
+       else if (offset == 0)
+       {
+               XLogFileName(fname, state->readPageTLI, segno);
+
+               /* hmm, first page of file doesn't have a long header? */
+               report_invalid_record(state,
+                                                         "invalid info bits %04X in log segment %s, offset %u",
+                                                         hdr->xlp_info,
+                                                         fname,
+                                                         offset);
+               return false;
+       }
+
+       /* The address stamped into the page must be where we read it from */
+       if (hdr->xlp_pageaddr != expected_addr)
+       {
+               XLogFileName(fname, state->readPageTLI, segno);
+
+               report_invalid_record(state,
+                                                         "unexpected pageaddr %X/%X in log segment %s, offset %u",
+                                                         (uint32) (hdr->xlp_pageaddr >> 32),
+                                                         (uint32) hdr->xlp_pageaddr,
+                                                         fname,
+                                                         offset);
+               return false;
+       }
+
+       /*
+        * Since child timelines are always assigned a TLI greater than their
+        * immediate parent's TLI, we should never see TLI go backwards across
+        * successive pages of a consistent WAL sequence.
+        *
+        * Sometimes we re-read a segment that's already been (partially) read. So
+        * we only verify TLIs for pages that are later than the last remembered
+        * LSN.
+        */
+       if (recptr > state->latestPagePtr &&
+               hdr->xlp_tli < state->latestPageTLI)
+       {
+               XLogFileName(fname, state->readPageTLI, segno);
+
+               report_invalid_record(state,
+                                                         "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
+                                                         hdr->xlp_tli,
+                                                         state->latestPageTLI,
+                                                         fname,
+                                                         offset);
+               return false;
+       }
+
+       /* Remember this page as the latest one validated */
+       state->latestPagePtr = recptr;
+       state->latestPageTLI = hdr->xlp_tli;
+
+       return true;
+}
+
+#ifdef FRONTEND
+/*
+ * Functions that are currently not needed in the backend, but are better
+ * implemented inside xlogreader.c because of the internal facilities available
+ * here.
+ */
+
+/*
+ * Find the first record with an lsn >= RecPtr.
+ *
+ * Useful for checking whether RecPtr is a valid xlog address for reading, and
+ * to find the first valid address after some address when dumping records for
+ * debugging purposes.
+ *
+ * Returns the start address of the record found, or InvalidXLogRecPtr if no
+ * such record could be read.  On return the reader's ReadRecPtr and EndRecPtr
+ * are restored to their values at entry and the cached page is invalidated.
+ */
+XLogRecPtr
+XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
+{
+       XLogReaderState saved_state = *state;
+       XLogRecPtr      targetPagePtr;
+       XLogRecPtr      tmpRecPtr;
+       int                     targetRecOff;
+       XLogRecPtr      found = InvalidXLogRecPtr;
+       uint32          pageHeaderSize;
+       XLogPageHeader header;
+       XLogRecord *record;
+       int                     readLen;
+       char       *errormsg;
+
+       Assert(!XLogRecPtrIsInvalid(RecPtr));
+
+       /*
+        * Locate a record boundary at or before the first record we might
+        * return.  The page containing RecPtr may begin with the tail of a
+        * record started on an earlier page, and that tail may itself fill the
+        * entire page and spill onto following ones, so keep stepping forward a
+        * page at a time until the continuation data ends.
+        */
+       tmpRecPtr = RecPtr;
+       for (;;)
+       {
+               targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
+
+               /* scroll back to page boundary */
+               targetPagePtr = tmpRecPtr - targetRecOff;
+
+               /* Read the page containing the record */
+               readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
+               if (readLen < 0)
+                       goto err;
+
+               header = (XLogPageHeader) state->readBuf;
+
+               pageHeaderSize = XLogPageHeaderSize(header);
+
+               /* make sure we have enough data for the page header */
+               readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
+               if (readLen < 0)
+                       goto err;
+
+               if (!(header->xlp_info & XLP_FIRST_IS_CONTRECORD))
+               {
+                       /* the page starts with a fresh record */
+                       tmpRecPtr = targetPagePtr + pageHeaderSize;
+                       break;
+               }
+
+               /* skip over potential continuation data */
+               if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
+               {
+                       /*
+                        * The continuation occupies the rest of this page; no record
+                        * starts here, so look at the next page instead.
+                        */
+                       tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
+               }
+               else
+               {
+                       /* record headers are MAXALIGN'ed */
+                       tmpRecPtr = targetPagePtr + pageHeaderSize
+                               + MAXALIGN(header->xlp_rem_len);
+                       break;
+               }
+       }
+
+       /*
+        * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
+        * because either we're at the first record after the beginning of a page
+        * or we just jumped over the remaining data of a continuation.
+        */
+       while ((record = XLogReadRecord(state, tmpRecPtr, &errormsg)) != NULL)
+       {
+               /* continue after the record */
+               tmpRecPtr = InvalidXLogRecPtr;
+
+               /* past the record we've found, break out */
+               if (RecPtr <= state->ReadRecPtr)
+               {
+                       found = state->ReadRecPtr;
+                       goto out;
+               }
+       }
+
+err:
+out:
+       /* Reset state to what we had before finding the record */
+       state->readSegNo = 0;
+       state->readOff = 0;
+       state->readLen = 0;
+       state->ReadRecPtr = saved_state.ReadRecPtr;
+       state->EndRecPtr = saved_state.EndRecPtr;
+
+       return found;
+}
+
+#endif   /* FRONTEND */
index 30f6a2bf9f8ae746f654ea8141f01a04e633de26..c072de7fa7a0656bac6190224b2d5838efd1bdc2 100644 (file)
@@ -4,12 +4,13 @@ AVAIL_LANGUAGES  = de es fr ja pt_BR tr zh_CN zh_TW
 GETTEXT_FILES    = + gettext-files
 GETTEXT_TRIGGERS = $(BACKEND_COMMON_GETTEXT_TRIGGERS) \
     GUC_check_errmsg GUC_check_errdetail GUC_check_errhint \
-    write_stderr yyerror parser_yyerror
+    write_stderr yyerror parser_yyerror report_invalid_record
 GETTEXT_FLAGS    = $(BACKEND_COMMON_GETTEXT_FLAGS) \
     GUC_check_errmsg:1:c-format \
     GUC_check_errdetail:1:c-format \
     GUC_check_errhint:1:c-format \
-    write_stderr:1:c-format
+    write_stderr:1:c-format \
+    report_invalid_record:2:c-format
 
 gettext-files: distprep
        find $(srcdir)/ $(srcdir)/../port/ -name '*.c' -print | LC_ALL=C sort >$@
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
new file mode 100644 (file)
index 0000000..36907d6
--- /dev/null
@@ -0,0 +1,116 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.h
+ *             Definitions for the generic XLog reading facility
+ *
+ * Portions Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *             src/include/access/xlogreader.h
+ *
+ * NOTES
+ *             See the definition of the XLogReaderState struct for instructions on
+ *             how to use the XLogReader infrastructure.
+ *
+ *             The basic idea is to allocate an XLogReaderState via
+ *             XLogReaderAllocate(), and call XLogReadRecord() until it returns NULL.
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGREADER_H
+#define XLOGREADER_H
+
+#include "access/xlog_internal.h"
+
+typedef struct XLogReaderState XLogReaderState;
+
+/* Type of the read_page callback; see XLogReaderState for its contract */
+typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
+                                                                                  XLogRecPtr targetPagePtr,
+                                                                                  int reqLen,
+                                                                                  char *readBuf,
+                                                                                  TimeLineID *pageTLI);
+
+struct XLogReaderState
+{
+       /* ----------------------------------------
+        * Public parameters
+        * ----------------------------------------
+        */
+
+       /*
+        * Data input callback (mandatory).
+        *
+        * This callback shall read at least reqLen valid bytes of the xlog page
+        * starting at targetPagePtr, and store them in readBuf.  The callback
+        * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
+        * -1 on failure.  The callback shall sleep, if necessary, to wait for the
+        * requested bytes to become available.  The callback will not be invoked
+        * again for the same page unless more than the returned number of bytes
+        * are necessary.
+        *
+        * *pageTLI should be set to the TLI of the file the page was read from.
+        * It is currently used only for error reporting purposes, to reconstruct
+        * the name of the WAL file where an error occurred.
+        */
+       XLogPageReadCB read_page;
+
+       /*
+        * System identifier of the xlog files we're about to read.  Set to zero
+        * (the default value) if unknown or unimportant.
+        */
+       uint64          system_identifier;
+
+       /*
+        * Opaque data for callbacks to use.  Not used by XLogReader.
+        */
+       void       *private_data;
+
+       /*
+        * Start and end point of last record read.  EndRecPtr is also used as the
+        * position to read next when XLogReadRecord is passed InvalidXLogRecPtr.
+        */
+       XLogRecPtr      ReadRecPtr;             /* start of last record read */
+       XLogRecPtr      EndRecPtr;              /* end+1 of last record read */
+
+       /* ----------------------------------------
+        * Private/internal state
+        * ----------------------------------------
+        */
+
+       /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
+       char       *readBuf;
+
+       /* last read segment, segment offset, read length, TLI */
+       XLogSegNo       readSegNo;
+       uint32          readOff;
+       uint32          readLen;
+       TimeLineID      readPageTLI;
+
+       /* beginning of last page read, and its TLI */
+       XLogRecPtr      latestPagePtr;
+       TimeLineID      latestPageTLI;
+
+       /* Buffer for current ReadRecord result (expandable) */
+       char       *readRecordBuf;
+       uint32          readRecordBufSize;
+
+       /* Buffer to hold error message */
+       char       *errormsg_buf;
+};
+
+/* Get a new XLogReader; release it with XLogReaderFree() */
+extern XLogReaderState *XLogReaderAllocate(XLogPageReadCB pagereadfunc,
+                                  void *private_data);
+
+/* Free an XLogReader obtained from XLogReaderAllocate() */
+extern void XLogReaderFree(XLogReaderState *state);
+
+/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
+extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
+                          XLogRecPtr recptr, char **errormsg);
+
+#ifdef FRONTEND
+extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
+#endif   /* FRONTEND */
+
+#endif   /* XLOGREADER_H */