*
* NOTES
* See xlogreader.h for more notes on this facility.
+ *
+ * This file is compiled as both front-end and backend code, so it
+ * may not use ereport, server-defined static variables, etc.
*-------------------------------------------------------------------------
*/
-
#include "postgres.h"
#include "access/transam.h"
{
XLogRecord *record;
XLogRecPtr targetPagePtr;
- bool randAccess = false;
+ bool randAccess;
uint32 len,
total_len;
uint32 targetRecOff;
bool gotheader;
int readOff;
+ /*
+ * randAccess is true when the caller asked for a specific record; in that
+ * case we skip verifying the record's previous-record pointer. We only do
+ * that check when reading sequentially, which is what we initially assume.
+ */
+ randAccess = false;
+
/* reset error state */
*errormsg = NULL;
state->errormsg_buf[0] = '\0';
if (RecPtr == InvalidXLogRecPtr)
{
+ /* No explicit start point; read the record after the one we just read */
RecPtr = state->EndRecPtr;
if (state->ReadRecPtr == InvalidXLogRecPtr)
else
{
/*
+ * Caller supplied a position to start at.
+ *
* In this case, the passed-in record pointer should already be
* pointing to a valid record starting position.
*/
Assert(XRecOffIsValid(RecPtr));
- randAccess = true; /* allow readPageTLI to go backwards too */
+ randAccess = true;
}
state->currRecPtr = RecPtr;
/* XXX: more validation should be done here */
if (total_len < SizeOfXLogRecord)
{
- report_invalid_record(state, "invalid record length at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
+ /*
+ * SizeOfXLogRecord's type is platform-dependent (presumably size_t), so
+ * cast it to uint32 to match the %u conversion on every platform.
+ */
+ report_invalid_record(state,
+ "invalid record length at %X/%X: wanted %u, got %u",
+ (uint32) (RecPtr >> 32), (uint32) RecPtr,
+ (uint32) SizeOfXLogRecord, total_len);
goto err;
}
gotheader = false;
err:
/*
- * Invalidate the xlog page we've cached. We might read from a different
- * source after failure.
+ * Invalidate the read state. We might read from a different source after
+ * failure.
*/
- state->readSegNo = 0;
- state->readOff = 0;
- state->readLen = 0;
+ XLogReaderInvalReadState(state);
if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf;
if (!ValidXLogPageHeader(state, pageptr, hdr))
goto err;
- /* update cache information */
+ /* update read state information */
state->readSegNo = targetSegNo;
state->readOff = targetPageOff;
state->readLen = readLen;
return readLen;
err:
+ XLogReaderInvalReadState(state);
+ return -1;
+}
+
+/*
+ * Invalidate the xlogreader's read state to force a re-read.
+ *
+ * Zeroes readSegNo, readOff and readLen in 'state'. Nothing else in the
+ * reader state is touched; in particular ReadRecPtr and EndRecPtr keep
+ * their values, so callers that need those reset must do so themselves.
+ */
+void
+XLogReaderInvalReadState(XLogReaderState *state)
+{
state->readSegNo = 0;
state->readOff = 0;
state->readLen = 0;
- return -1;
}
/*
if (record->xl_tot_len < SizeOfXLogRecord)
{
+ /*
+ * SizeOfXLogRecord's type is platform-dependent (presumably size_t), so
+ * cast it to uint32 to match the %u conversion on every platform.
+ */
report_invalid_record(state,
- "invalid record length at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
+ "invalid record length at %X/%X: wanted %u, got %u",
+ (uint32) (RecPtr >> 32), (uint32) RecPtr,
+ (uint32) SizeOfXLogRecord, record->xl_tot_len);
return false;
}
if (record->xl_rmid > RM_MAX_ID)
err:
out:
/* Reset state to what we had before finding the record */
- state->readSegNo = 0;
- state->readOff = 0;
- state->readLen = 0;
state->ReadRecPtr = saved_state.ReadRecPtr;
state->EndRecPtr = saved_state.EndRecPtr;
+ XLogReaderInvalReadState(state);
return found;
}
#include <unistd.h>
-#include "miscadmin.h"
-
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
+#include "miscadmin.h"
#include "storage/smgr.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
}
/*
- * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
- * we currently don't have the infrastructure (elog!) to share it.
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * in timeline 'tli'.
+ *
+ * Opens, and keeps open, one WAL segment at a time, tracked by the static
+ * file descriptor 'sendFile'. This means if XLogRead is used once, there
+ * will always be one descriptor left open until the process ends, but never
+ * more than one.
+ *
+ * XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead
+ * in walsender.c but for small differences (such as lack of elog() in
+ * frontend). Probably these should be merged at some point.
*/
static void
XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
XLogRecPtr recptr;
Size nbytes;
+ /* state maintained across calls */
static int sendFile = -1;
static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
startoff = recptr % XLogSegSize;
+ /* Do we need to switch to a different xlog segment? */
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
char path[MAXPGPATH];
- /* Switch to another logfile segment */
if (sendFile >= 0)
close(sendFile);
* Public because it would likely be very helpful for someone writing another
* output method outside walsender, e.g. in a bgworker.
*
- * TODO: The walsender has it's own version of this, but it relies on the
+ * TODO: The walsender has its own version of this, but it relies on the
* walsender's latch being set whenever WAL is flushed. No such infrastructure
* exists for normal backends, so we have to do a check/sleep/repeat style of
* loop for now.
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
{
return read_local_xlog_page(state, targetPagePtr, reqLen,
- targetRecPtr, cur_page, pageTLI);
+ targetRecPtr, cur_page, pageTLI);
}
/*
PG_TRY();
{
+ /*
+ * Passing InvalidXLogRecPtr here causes replay to start at the slot's
+ * confirmed_flush.
+ */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
logical_read_local_xlog_page,
ctx->output_writer_private = p;
+ /*
+ * We start reading xlog from the restart lsn, even though in
+ * CreateDecodingContext we set the snapshot builder up using the
+ * slot's confirmed_flush. This means we might read xlog we don't
+ * actually decode rows from, but the snapshot builder might need it
+ * to get to a consistent point. The point we start returning data to
+ * *users* at is the confirmed_flush lsn set up in the decoding context.
+ */
startptr = MyReplicationSlot->data.restart_lsn;
CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
if (errm)
elog(ERROR, "%s", errm);
+ /*
+ * Now that we've set up the xlog reader state, subsequent calls
+ * pass InvalidXLogRecPtr to say "continue from last record"
+ */
startptr = InvalidXLogRecPtr;
/*
* ----------------------------------------
*/
- /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
+ /*
+ * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
+ * readLen bytes)
+ */
char *readBuf;
+ uint32 readLen;
- /* last read segment, segment offset, read length, TLI */
+ /* last read segment, segment offset, TLI for data currently in readBuf */
XLogSegNo readSegNo;
uint32 readOff;
- uint32 readLen;
TimeLineID readPageTLI;
- /* beginning of last page read, and its TLI */
+ /*
+ * beginning of prior page read, and its TLI. Doesn't necessarily
+ * correspond to what's in readBuf; used for timeline sanity checks.
+ */
XLogRecPtr latestPagePtr;
TimeLineID latestPageTLI;
extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
XLogRecPtr recptr, char **errormsg);
+/* Invalidate read state */
+extern void XLogReaderInvalReadState(XLogReaderState *state);
+
#ifdef FRONTEND
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
#endif /* FRONTEND */
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel);
-extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
+extern int read_local_xlog_page(XLogReaderState *state,
+ XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetRecPtr, char *cur_page,
+ TimeLineID *pageTLI);
#endif