static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
static void CheckRecoveryConsistency(void);
-static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
+static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode);
+static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
+ int emode, bool randAccess);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
static List *readTimeLineHistory(TimeLineID targetTLI);
static bool existsTimeLineHistory(TimeLineID probeTLI);
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecord *record;
XLogRecPtr RecPtr;
XLogRecPtr WriteRqst;
uint32 freespace;
XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
+ XLogRecData hdr_rdt;
pg_crc32 rdata_crc;
uint32 len,
write_len;
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
uint8 info_orig = info;
+ static XLogRecord *rechdr;
+
+ if (rechdr == NULL)
+ {
+ rechdr = malloc(SizeOfXLogRecord);
+ if (rechdr == NULL)
+ elog(ERROR, "out of memory");
+ MemSet(rechdr, 0, SizeOfXLogRecord);
+ }
/* cross-check on whether we should be here or not */
if (!XLogInsertAllowed())
for (rdt = rdata; rdt != NULL; rdt = rdt->next)
COMP_CRC32(rdata_crc, rdt->data, rdt->len);
+ /*
+ * Construct record header (prev-link and CRC are filled in later), and
+ * make that the first chunk in the chain.
+ */
+ rechdr->xl_xid = GetCurrentTransactionIdIfAny();
+ rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
+ rechdr->xl_len = len; /* doesn't include backup blocks */
+ rechdr->xl_info = info;
+ rechdr->xl_rmid = rmid;
+
+ hdr_rdt.next = rdata;
+ hdr_rdt.data = (char *) rechdr;
+ hdr_rdt.len = SizeOfXLogRecord;
+
+ write_len += SizeOfXLogRecord;
+
START_CRIT_SECTION();
/* Now wait to get insert lock */
}
/*
- * If there isn't enough space on the current XLOG page for a record
- * header, advance to the next page (leaving the unused space as zeroes).
+ * If the current page is completely full, the record goes to the next
+ * page, right after the page header.
*/
updrqst = false;
freespace = INSERT_FREESPACE(Insert);
- if (freespace < SizeOfXLogRecord)
+ if (freespace == 0)
{
updrqst = AdvanceXLInsertBuffer(false);
freespace = INSERT_FREESPACE(Insert);
return RecPtr;
}
- /* Insert record header */
-
- record = (XLogRecord *) Insert->currpos;
- record->xl_prev = Insert->PrevRecord;
- record->xl_xid = GetCurrentTransactionIdIfAny();
- record->xl_tot_len = SizeOfXLogRecord + write_len;
- record->xl_len = len; /* doesn't include backup blocks */
- record->xl_info = info;
- record->xl_rmid = rmid;
+ /* Finish the record header */
+ rechdr->xl_prev = Insert->PrevRecord;
/* Now we can finish computing the record's CRC */
- COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
- SizeOfXLogRecord - sizeof(pg_crc32));
+ COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32(rdata_crc);
- record->xl_crc = rdata_crc;
+ rechdr->xl_crc = rdata_crc;
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
initStringInfo(&buf);
appendStringInfo(&buf, "INSERT @ %X/%X: ",
RecPtr.xlogid, RecPtr.xrecoff);
- xlog_outrec(&buf, record);
+ xlog_outrec(&buf, rechdr);
if (rdata->data != NULL)
{
appendStringInfo(&buf, " - ");
- RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
+ RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
}
elog(LOG, "%s", buf.data);
pfree(buf.data);
ProcLastRecPtr = RecPtr;
Insert->PrevRecord = RecPtr;
- Insert->currpos += SizeOfXLogRecord;
- freespace -= SizeOfXLogRecord;
-
/*
* Append the data, including backup blocks if any
*/
+ rdata = &hdr_rdt;
while (write_len)
{
while (rdata->data == NULL)
/* normal case, ie not xlog switch */
/* Need to update shared LogwrtRqst if some block was filled up */
- if (freespace < SizeOfXLogRecord)
+ if (freespace == 0)
{
/* curridx is filled and available for writing out */
updrqst = true;
XLogCtlInsert *Insert = &XLogCtl->Insert;
uint32 freespace = INSERT_FREESPACE(Insert);
- if (freespace < SizeOfXLogRecord) /* buffer is full */
+ if (freespace == 0) /* buffer is full */
WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
else
{
}
/* Finally include the record header */
- COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
- SizeOfXLogRecord - sizeof(pg_crc32));
+ COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
FIN_CRC32(crc);
if (!EQ_CRC32(record->xl_crc, crc))
ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
{
XLogRecord *record;
- char *buffer;
XLogRecPtr tmpRecPtr = EndRecPtr;
bool randAccess = false;
uint32 len,
total_len;
uint32 targetRecOff;
uint32 pageHeaderSize;
+ bool gotheader;
if (readBuf == NULL)
{
RecPtr = &tmpRecPtr;
/*
- * RecPtr is pointing to end+1 of the previous WAL record. We must
- * advance it if necessary to where the next record starts. First,
- * align to next page if no more records can fit on the current page.
- */
- if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
- NextLogPage(*RecPtr);
-
- /*
- * If at page start, we must skip over the page header. But we can't
- * do that until we've read in the page, since the header size is
- * variable.
+ * RecPtr is pointing to end+1 of the previous WAL record. If
+ * we're at a page boundary, no more records can fit on the current
+ * page. We must skip over the page header, but we can't do that
+ * until we've read in the page, since the header size is variable.
*/
}
else
* to go backwards (but we can't reset that variable right here, since
* we might not change files at all).
*/
- lastPageTLI = 0; /* see comment in ValidXLOGHeader */
+ lastPageTLI = 0; /* see comment in ValidXLogPageHeader */
randAccess = true; /* allow curFileTLI to go backwards too */
}
RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
- record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
/*
- * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
- * required.
+ * NB: Even though we use an XLogRecord pointer here, the whole record
+ * header might not fit on this page. xl_tot_len is the first field in
+ * struct, so it must be on this page, but we cannot safely access any
+ * other fields yet.
*/
- if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
- {
- if (record->xl_len != 0)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid xlog switch record at %X/%X",
- RecPtr->xlogid, RecPtr->xrecoff)));
- goto next_record_is_invalid;
- }
- }
- else if (record->xl_len == 0)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with zero length at %X/%X",
- RecPtr->xlogid, RecPtr->xrecoff)));
- goto next_record_is_invalid;
- }
- if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
- record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
- XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid record length at %X/%X",
- RecPtr->xlogid, RecPtr->xrecoff)));
- goto next_record_is_invalid;
- }
- if (record->xl_rmid > RM_MAX_ID)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid resource manager ID %u at %X/%X",
- record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
- goto next_record_is_invalid;
- }
- if (randAccess)
- {
- /*
- * We can't exactly verify the prev-link, but surely it should be less
- * than the record's own address.
- */
- if (!XLByteLT(record->xl_prev, *RecPtr))
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with incorrect prev-link %X/%X at %X/%X",
- record->xl_prev.xlogid, record->xl_prev.xrecoff,
- RecPtr->xlogid, RecPtr->xrecoff)));
- goto next_record_is_invalid;
- }
- }
- else
- {
- /*
- * Record's prev-link should exactly match our previous location. This
- * check guards against torn WAL pages where a stale but valid-looking
- * WAL record starts on a sector boundary.
- */
- if (!XLByteEQ(record->xl_prev, ReadRecPtr))
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with incorrect prev-link %X/%X at %X/%X",
- record->xl_prev.xlogid, record->xl_prev.xrecoff,
- RecPtr->xlogid, RecPtr->xrecoff)));
- goto next_record_is_invalid;
- }
- }
+ record = (XLogRecord *) (readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
+ total_len = record->xl_tot_len;
/*
* Allocate or enlarge readRecordBuf as needed. To avoid useless small
* enough for all "normal" records, but very large commit or abort records
* might need more space.)
*/
- total_len = record->xl_tot_len;
if (total_len > readRecordBufSize)
{
uint32 newSize = total_len;
readRecordBufSize = newSize;
}
- buffer = readRecordBuf;
+ /*
+ * If we got the whole header already, validate it immediately. Otherwise
+ * we validate it after reading the rest of the header from the next page.
+ */
+ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+ {
+ if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
+ goto next_record_is_invalid;
+ gotheader = true;
+ }
+ else
+ gotheader = false;
+
len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
if (total_len > len)
{
char *contrecord;
XLogPageHeader pageHeader;
XLogRecPtr pagelsn;
- uint32 gotlen = len;
+ char *buffer;
+ uint32 gotlen;
/* Initialize pagelsn to the beginning of the page this record is on */
pagelsn = *RecPtr;
pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
- memcpy(buffer, record, len);
- record = (XLogRecord *) buffer;
- buffer += len;
- for (;;)
+ /* Copy the first fragment of the record from the first page. */
+ memcpy(readRecordBuf, readBuf + RecPtr->xrecoff % XLOG_BLCKSZ, len);
+ buffer = readRecordBuf + len;
+ gotlen = len;
+
+ do
{
/* Calculate pointer to beginning of next page */
XLByteAdvance(pagelsn, XLOG_BLCKSZ);
if (!XLogPageRead(&pagelsn, emode, false, false))
return NULL;
- /* Check that the continuation record looks valid */
- if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
+ /* Check that the continuation on next page looks valid */
+ pageHeader = (XLogPageHeader) readBuf;
+ if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("there is no contrecord flag in log segment %s, offset %u",
readOff)));
goto next_record_is_invalid;
}
- pageHeader = (XLogPageHeader) readBuf;
- pageHeaderSize = XLogPageHeaderSize(pageHeader);
- contrecord = (char *) readBuf + pageHeaderSize;
+ /*
+ * Cross-check that xlp_rem_len agrees with how much of the record
+ * we expect there to be left.
+ */
if (pageHeader->xlp_rem_len == 0 ||
total_len != (pageHeader->xlp_rem_len + gotlen))
{
- char fname[MAXFNAMELEN];
- XLogFileName(fname, curFileTLI, readSegNo);
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("invalid contrecord length %u in log segment %s, offset %u",
pageHeader->xlp_rem_len,
readOff)));
goto next_record_is_invalid;
}
+
+ /* Append the continuation from this page to the buffer */
+ pageHeaderSize = XLogPageHeaderSize(pageHeader);
+ contrecord = (char *) readBuf + pageHeaderSize;
len = XLOG_BLCKSZ - pageHeaderSize;
- if (pageHeader->xlp_rem_len > len)
+ if (pageHeader->xlp_rem_len < len)
+ len = pageHeader->xlp_rem_len;
+ memcpy(buffer, (char *) contrecord, len);
+ buffer += len;
+ gotlen += len;
+
+ /* If we just reassembled the record header, validate it. */
+ if (!gotheader)
{
- memcpy(buffer, (char *) contrecord, len);
- gotlen += len;
- buffer += len;
- continue;
+ record = (XLogRecord *) readRecordBuf;
+ if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
+ goto next_record_is_invalid;
+ gotheader = true;
}
- memcpy(buffer, (char *) contrecord, pageHeader->xlp_rem_len);
- break;
- }
+ } while (pageHeader->xlp_rem_len > len);
+
+ record = (XLogRecord *) readRecordBuf;
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
EndRecPtr);
ReadRecPtr = *RecPtr;
- /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
- return record;
}
+ else
+ {
+ /* Record does not cross a page boundary */
+ if (!RecordIsValid(record, *RecPtr, emode))
+ goto next_record_is_invalid;
+ EndRecPtr.xlogid = RecPtr->xlogid;
+ EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
- /* Record does not cross a page boundary */
- if (!RecordIsValid(record, *RecPtr, emode))
- goto next_record_is_invalid;
- EndRecPtr.xlogid = RecPtr->xlogid;
- EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
-
- ReadRecPtr = *RecPtr;
- memcpy(buffer, record, total_len);
+ ReadRecPtr = *RecPtr;
+ memcpy(readRecordBuf, record, total_len);
+ }
/*
* Special processing if it's an XLOG SWITCH record
*/
readOff = XLogSegSize - XLOG_BLCKSZ;
}
- return (XLogRecord *) buffer;
+ return record;
next_record_is_invalid:
failedSources |= readSource;
* ReadRecord. It's not intended for use from anywhere else.
*/
static bool
-ValidXLOGHeader(XLogPageHeader hdr, int emode)
+ValidXLogPageHeader(XLogPageHeader hdr, int emode)
{
XLogRecPtr recaddr;
return true;
}
+/*
+ * Validate an XLOG record header.
+ *
+ * This is just a convenience subroutine to avoid duplicated code in
+ * ReadRecord. It's not intended for use from anywhere else.
+ */
+static bool
+ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
+ bool randAccess)
+{
+ /*
+ * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
+ * required.
+ */
+ if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+ {
+ if (record->xl_len != 0)
+ {
+ ereport(emode_for_corrupt_record(emode, *RecPtr),
+ (errmsg("invalid xlog switch record at %X/%X",
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ return false;
+ }
+ }
+ else if (record->xl_len == 0)
+ {
+ ereport(emode_for_corrupt_record(emode, *RecPtr),
+ (errmsg("record with zero length at %X/%X",
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ return false;
+ }
+ if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
+ record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
+ XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+ {
+ ereport(emode_for_corrupt_record(emode, *RecPtr),
+ (errmsg("invalid record length at %X/%X",
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ return false;
+ }
+ if (record->xl_rmid > RM_MAX_ID)
+ {
+ ereport(emode_for_corrupt_record(emode, *RecPtr),
+ (errmsg("invalid resource manager ID %u at %X/%X",
+ record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
+ return false;
+ }
+ if (randAccess)
+ {
+ /*
+ * We can't exactly verify the prev-link, but surely it should be less
+ * than the record's own address.
+ */
+ if (!XLByteLT(record->xl_prev, *RecPtr))
+ {
+ ereport(emode_for_corrupt_record(emode, *RecPtr),
+ (errmsg("record with incorrect prev-link %X/%X at %X/%X",
+ record->xl_prev.xlogid, record->xl_prev.xrecoff,
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ return false;
+ }
+ }
+ else
+ {
+ /*
+ * Record's prev-link should exactly match our previous location. This
+ * check guards against torn WAL pages where a stale but valid-looking
+ * WAL record starts on a sector boundary.
+ */
+ if (!XLByteEQ(record->xl_prev, ReadRecPtr))
+ {
+ ereport(emode_for_corrupt_record(emode, *RecPtr),
+ (errmsg("record with incorrect prev-link %X/%X at %X/%X",
+ record->xl_prev.xlogid, record->xl_prev.xrecoff,
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ return false;
+ }
+ }
+
+ return true;
+}
+
/*
* Try to read a timeline's history file.
*
INIT_CRC32(crc);
COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
- COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
- SizeOfXLogRecord - sizeof(pg_crc32));
+ COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
FIN_CRC32(crc);
record->xl_crc = crc;
* checkpoint, even though physically before it. Got that?
*/
freespace = INSERT_FREESPACE(Insert);
- if (freespace < SizeOfXLogRecord)
+ if (freespace == 0)
{
(void) AdvanceXLInsertBuffer(false);
/* OK to ignore update return flag, since we will do flush anyway */
fname, readOff)));
goto next_record_is_invalid;
}
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+ if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
}
fname, readOff)));
goto next_record_is_invalid;
}
- if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+ if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
Assert(targetSegNo == readSegNo);