From 3b02ea4f0780ccce7dc116010201dad7ee50a401 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Wed, 30 Mar 2016 18:56:13 -0300 Subject: [PATCH] XLogReader general code cleanup Some minor tweaks and comment additions, for cleanliness sake and to avoid having the upcoming timeline-following patch be polluted with unrelated cleanup. Extracted from a larger patch by Craig Ringer, reviewed by Andres Freund, with some additions by myself. --- src/backend/access/transam/xlogreader.c | 54 +++++++++++++------ src/backend/access/transam/xlogutils.c | 21 +++++--- .../replication/logical/logicalfuncs.c | 18 ++++++- src/include/access/xlogreader.h | 17 ++++-- src/include/access/xlogutils.h | 6 ++- 5 files changed, 86 insertions(+), 30 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index fcb08720c0..018fdf3d34 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -10,9 +10,11 @@ * * NOTES * See xlogreader.h for more notes on this facility. + * + * This file is compiled as both front-end and backend code, so it + * may not use ereport, server-defined static variables, etc. *------------------------------------------------------------------------- */ - #include "postgres.h" #include "access/transam.h" @@ -192,7 +194,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) { XLogRecord *record; XLogRecPtr targetPagePtr; - bool randAccess = false; + bool randAccess; uint32 len, total_len; uint32 targetRecOff; @@ -200,6 +202,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) bool gotheader; int readOff; + /* + * randAccess indicates whether to verify the previous-record pointer of + * the record we're reading. We only do this if we're reading + * sequentially, which is what we initially assume. + */ + randAccess = false; + /* reset error state */ *errormsg = NULL; state->errormsg_buf[0] = '\0'; @@ -208,6 +217,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) if (RecPtr == InvalidXLogRecPtr) { + /* No explicit start point; read the record after the one we just read */ RecPtr = state->EndRecPtr; if (state->ReadRecPtr == InvalidXLogRecPtr) @@ -223,11 +233,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) else { /* + * Caller supplied a position to start at. + * * In this case, the passed-in record pointer should already be * pointing to a valid record starting position. */ Assert(XRecOffIsValid(RecPtr)); - randAccess = true; /* allow readPageTLI to go backwards too */ + randAccess = true; } state->currRecPtr = RecPtr; @@ -309,8 +321,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) /* XXX: more validation should be done here */ if (total_len < SizeOfXLogRecord) { - report_invalid_record(state, "invalid record length at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); + report_invalid_record(state, + "invalid record length at %X/%X: wanted %lu, got %u", + (uint32) (RecPtr >> 32), (uint32) RecPtr, + SizeOfXLogRecord, total_len); goto err; } gotheader = false; @@ -463,12 +477,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) err: /* - * Invalidate the xlog page we've cached. We might read from a different - * source after failure. + * Invalidate the read state. We might read from a different source after + * failure. */ - state->readSegNo = 0; - state->readOff = 0; - state->readLen = 0; + XLogReaderInvalReadState(state); if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; @@ -572,7 +584,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) if (!ValidXLogPageHeader(state, pageptr, hdr)) goto err; - /* update cache information */ + /* update read state information */ state->readSegNo = targetSegNo; state->readOff = targetPageOff; state->readLen = readLen; @@ -580,10 +592,19 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) return readLen; err: + XLogReaderInvalReadState(state); + return -1; +} + +/* + * Invalidate the xlogreader's read state to force a re-read. + */ +void +XLogReaderInvalReadState(XLogReaderState *state) +{ state->readSegNo = 0; state->readOff = 0; state->readLen = 0; - return -1; } /* @@ -600,8 +621,9 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (record->xl_tot_len < SizeOfXLogRecord) { report_invalid_record(state, - "invalid record length at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); + "invalid record length at %X/%X: wanted %lu, got %u", + (uint32) (RecPtr >> 32), (uint32) RecPtr, + SizeOfXLogRecord, record->xl_tot_len); return false; } if (record->xl_rmid > RM_MAX_ID) @@ -907,11 +929,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) err: out: /* Reset state to what we had before finding the record */ - state->readSegNo = 0; - state->readOff = 0; - state->readLen = 0; state->ReadRecPtr = saved_state.ReadRecPtr; state->EndRecPtr = saved_state.EndRecPtr; + XLogReaderInvalReadState(state); return found; } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 444e2180b0..2635d80dc0 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -19,12 +19,11 @@ #include -#include "miscadmin.h" - #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" #include "catalog/catalog.h" +#include "miscadmin.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -638,8 +637,17 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, } /* - * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but - * we currently don't have the infrastructure (elog!) to share it. + * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' + * in timeline 'tli'. + * + * Will open, and keep open, one WAL segment stored in the static file + * descriptor 'sendFile'. This means if XLogRead is used once, there will + * always be one descriptor left open until the process ends, but never + * more than one. + * + * XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead + * in walsender.c but for small differences (such as lack of elog() in + * frontend). Probably these should be merged at some point. */ static void XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) @@ -648,6 +656,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) XLogRecPtr recptr; Size nbytes; + /* state maintained across calls */ static int sendFile = -1; static XLogSegNo sendSegNo = 0; static uint32 sendOff = 0; @@ -664,11 +673,11 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) startoff = recptr % XLogSegSize; + /* Do we need to switch to a different xlog segment? */ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) { char path[MAXPGPATH]; - /* Switch to another logfile segment */ if (sendFile >= 0) close(sendFile); @@ -745,7 +754,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) * Public because it would likely be very helpful for someone writing another * output method outside walsender, e.g. in a bgworker. * - * TODO: The walsender has it's own version of this, but it relies on the + * TODO: The walsender has its own version of this, but it relies on the * walsender's latch being set whenever WAL is flushed. No such infrastructure * exists for normal backends, so we have to do a check/sleep/repeat style of * loop for now. diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index f789fc127d..3853ab4cf5 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -115,7 +115,7 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) { return read_local_xlog_page(state, targetPagePtr, reqLen, - targetRecPtr, cur_page, pageTLI); + targetRecPtr, cur_page, pageTLI); } /* @@ -241,6 +241,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin PG_TRY(); { + /* + * Passing InvalidXLogRecPtr here causes replay to start at the slot's + * confirmed_flush. + */ ctx = CreateDecodingContext(InvalidXLogRecPtr, options, logical_read_local_xlog_page, @@ -263,6 +267,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ctx->output_writer_private = p; + /* + * We start reading xlog from the restart lsn, even though in + * CreateDecodingContext we set the snapshot builder up using the + * slot's confirmed_flush. This means we might read xlog we don't + * actually decode rows from, but the snapshot builder might need it + * to get to a consistent point. The point we start returning data to + * *users* at is the candidate restart lsn from the decoding context. + */ startptr = MyReplicationSlot->data.restart_lsn; CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding"); @@ -280,6 +292,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin if (errm) elog(ERROR, "%s", errm); + /* + * Now that we've set up the xlog reader state, subsequent calls + * pass InvalidXLogRecPtr to say "continue from last record" + */ startptr = InvalidXLogRecPtr; /* diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 7553cc44cb..deaa7f5128 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -139,16 +139,22 @@ struct XLogReaderState * ---------------------------------------- */ - /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ + /* + * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least + * readLen bytes) + */ char *readBuf; + uint32 readLen; - /* last read segment, segment offset, read length, TLI */ + /* last read segment, segment offset, TLI for data currently in readBuf */ XLogSegNo readSegNo; uint32 readOff; - uint32 readLen; TimeLineID readPageTLI; - /* beginning of last page read, and its TLI */ + /* + * beginning of prior page read, and its TLI. Doesn't necessarily + * correspond to what's in readBuf; used for timeline sanity checks. + */ XLogRecPtr latestPagePtr; TimeLineID latestPageTLI; @@ -174,6 +180,9 @@ extern void XLogReaderFree(XLogReaderState *state); extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, XLogRecPtr recptr, char **errormsg); +/* Invalidate read state */ +extern void XLogReaderInvalReadState(XLogReaderState *state); + #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 1b9abce9ad..d027ea173b 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,7 +47,9 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); -extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI); +extern int read_local_xlog_page(XLogReaderState *state, + XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *cur_page, + TimeLineID *pageTLI); #endif -- 2.40.0