From 422a55a68784fd00f4514834f3649140a9166fa5 Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Wed, 20 Jan 2016 17:18:58 -0800 Subject: [PATCH] Refactor to create generic WAL page read callback MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Previously we didn’t have a generic WAL page read callback function, surprisingly. Logical decoding has logical_read_local_xlog_page(), which was actually generic, so move that to xlogfunc.c and rename to read_local_xlog_page(). Maintain logical_read_local_xlog_page() so existing callers still work. As requested by Michael Paquier, Alvaro Herrera and Andres Freund --- src/backend/access/transam/xlogutils.c | 166 ++++++++++++++++++ .../replication/logical/logicalfuncs.c | 158 +---------------- src/include/access/xlogutils.h | 3 + 3 files changed, 172 insertions(+), 155 deletions(-) diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 37e9e403fc..444e2180b0 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -17,7 +17,12 @@ */ #include "postgres.h" +#include + +#include "miscadmin.h" + #include "access/xlog.h" +#include "access/xlog_internal.h" #include "access/xlogutils.h" #include "catalog/catalog.h" #include "storage/smgr.h" @@ -631,3 +636,164 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, { forget_invalid_pages(rnode, forkNum, nblocks); } + +/* + * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but + * we currently don't have the infrastructure (elog!) to share it. + */ +static void +XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) +{ + char *p; + XLogRecPtr recptr; + Size nbytes; + + static int sendFile = -1; + static XLogSegNo sendSegNo = 0; + static uint32 sendOff = 0; + + p = buf; + recptr = startptr; + nbytes = count; + + while (nbytes > 0) + { + uint32 startoff; + int segbytes; + int readbytes; + + startoff = recptr % XLogSegSize; + + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + { + char path[MAXPGPATH]; + + /* Switch to another logfile segment */ + if (sendFile >= 0) + close(sendFile); + + XLByteToSeg(recptr, sendSegNo); + + XLogFilePath(path, tli, sendSegNo); + + sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); + + if (sendFile < 0) + { + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + path))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } + sendOff = 0; + } + + /* Need to seek in the file? */ + if (sendOff != startoff) + { + if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) + { + char path[MAXPGPATH]; + + XLogFilePath(path, tli, sendSegNo); + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in log segment %s to offset %u: %m", + path, startoff))); + } + sendOff = startoff; + } + + /* How many bytes are within this segment? */ + if (nbytes > (XLogSegSize - startoff)) + segbytes = XLogSegSize - startoff; + else + segbytes = nbytes; + + readbytes = read(sendFile, p, segbytes); + if (readbytes <= 0) + { + char path[MAXPGPATH]; + + XLogFilePath(path, tli, sendSegNo); + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from log segment %s, offset %u, length %lu: %m", + path, sendOff, (unsigned long) segbytes))); + } + + /* Update state for read */ + recptr += readbytes; + + sendOff += readbytes; + nbytes -= readbytes; + p += readbytes; + } +} + +/* + * read_page callback for reading local xlog files + * + * Public because it would likely be very helpful for someone writing another + * output method outside walsender, e.g. in a bgworker. + * + * TODO: The walsender has it's own version of this, but it relies on the + * walsender's latch being set whenever WAL is flushed. No such infrastructure + * exists for normal backends, so we have to do a check/sleep/repeat style of + * loop for now. + */ +int +read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, + int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) +{ + XLogRecPtr flushptr, + loc; + int count; + + loc = targetPagePtr + reqLen; + while (1) + { + /* + * TODO: we're going to have to do something more intelligent about + * timelines on standbys. Use readTimeLineHistory() and + * tliOfPointInHistory() to get the proper LSN? For now we'll catch + * that case earlier, but the code and TODO is left in here for when + * that changes. + */ + if (!RecoveryInProgress()) + { + *pageTLI = ThisTimeLineID; + flushptr = GetFlushRecPtr(); + } + else + flushptr = GetXLogReplayRecPtr(pageTLI); + + if (loc <= flushptr) + break; + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); + } + + /* more than one block available */ + if (targetPagePtr + XLOG_BLCKSZ <= flushptr) + count = XLOG_BLCKSZ; + /* not enough data there */ + else if (targetPagePtr + reqLen > flushptr) + return -1; + /* part of the page available */ + else + count = flushptr - targetPagePtr; + + XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ); + + return count; +} diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 56e47e4b9c..f789fc127d 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -22,6 +22,7 @@ #include "miscadmin.h" #include "access/xlog_internal.h" +#include "access/xlogutils.h" #include "catalog/pg_type.h" @@ -100,108 +101,6 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi p->returned_rows++; } -/* - * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but - * we currently don't have the infrastructure (elog!) to share it. - */ -static void -XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - - static int sendFile = -1; - static XLogSegNo sendSegNo = 0; - static uint32 sendOff = 0; - - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = recptr % XLogSegSize; - - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) - { - char path[MAXPGPATH]; - - /* Switch to another logfile segment */ - if (sendFile >= 0) - close(sendFile); - - XLByteToSeg(recptr, sendSegNo); - - XLogFilePath(path, tli, sendSegNo); - - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); - - if (sendFile < 0) - { - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - path))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendOff = 0; - } - - /* Need to seek in the file? */ - if (sendOff != startoff) - { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) - { - char path[MAXPGPATH]; - - XLogFilePath(path, tli, sendSegNo); - - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - path, startoff))); - } - sendOff = startoff; - } - - /* How many bytes are within this segment? */ - if (nbytes > (XLogSegSize - startoff)) - segbytes = XLogSegSize - startoff; - else - segbytes = nbytes; - - readbytes = read(sendFile, p, segbytes); - if (readbytes <= 0) - { - char path[MAXPGPATH]; - - XLogFilePath(path, tli, sendSegNo); - - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %lu: %m", - path, sendOff, (unsigned long) segbytes))); - } - - /* Update state for read */ - recptr += readbytes; - - sendOff += readbytes; - nbytes -= readbytes; - p += readbytes; - } -} - static void check_permissions(void) { @@ -211,63 +110,12 @@ check_permissions(void) (errmsg("must be superuser or replication role to use replication slots")))); } -/* - * read_page callback for logical decoding contexts. - * - * Public because it would likely be very helpful for someone writing another - * output method outside walsender, e.g. in a bgworker. - * - * TODO: The walsender has it's own version of this, but it relies on the - * walsender's latch being set whenever WAL is flushed. No such infrastructure - * exists for normal backends, so we have to do a check/sleep/repeat style of - * loop for now. - */ int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) { - XLogRecPtr flushptr, - loc; - int count; - - loc = targetPagePtr + reqLen; - while (1) - { - /* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. - */ - if (!RecoveryInProgress()) - { - *pageTLI = ThisTimeLineID; - flushptr = GetFlushRecPtr(); - } - else - flushptr = GetXLogReplayRecPtr(pageTLI); - - if (loc <= flushptr) - break; - - CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); - } - - /* more than one block available */ - if (targetPagePtr + XLOG_BLCKSZ <= flushptr) - count = XLOG_BLCKSZ; - /* not enough data there */ - else if (targetPagePtr + reqLen > flushptr) - return -1; - /* part of the page available */ - else - count = flushptr - targetPagePtr; - - XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ); - - return count; + return read_local_xlog_page(state, targetPagePtr, reqLen, + targetRecPtr, cur_page, pageTLI); } /* diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index a1c0c82c34..1b9abce9ad 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,4 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); +extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, + int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI); + #endif -- 2.40.0