]> granicus.if.org Git - postgresql/commitdiff
Refactor to create generic WAL page read callback
authorSimon Riggs <simon@2ndQuadrant.com>
Thu, 21 Jan 2016 01:18:58 +0000 (17:18 -0800)
committerSimon Riggs <simon@2ndQuadrant.com>
Thu, 21 Jan 2016 01:18:58 +0000 (17:18 -0800)
Previously we didn’t have a generic WAL page read callback function,
surprisingly. Logical decoding has logical_read_local_xlog_page(), which was
actually generic, so move that to xlogfunc.c and rename to
read_local_xlog_page().
Maintain logical_read_local_xlog_page() so existing callers still work.

As requested by Michael Paquier, Alvaro Herrera and Andres Freund

src/backend/access/transam/xlogutils.c
src/backend/replication/logical/logicalfuncs.c
src/include/access/xlogutils.h

index 37e9e403fcac60cfbea14878a9d261ccda86a9b8..444e2180b0c2e1dcf861cfd10410cd1dc960ebc6 100644 (file)
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
+#include "miscadmin.h"
+
 #include "access/xlog.h"
+#include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "storage/smgr.h"
@@ -631,3 +636,164 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 {
        forget_invalid_pages(rnode, forkNum, nblocks);
 }
+
+/*
+ * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
+ * we currently don't have the infrastructure (elog!) to share it.
+ */
+static void
+XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
+{
+       char       *p;
+       XLogRecPtr      recptr;
+       Size            nbytes;
+
+       static int      sendFile = -1;
+       static XLogSegNo sendSegNo = 0;
+       static uint32 sendOff = 0;
+
+       p = buf;
+       recptr = startptr;
+       nbytes = count;
+
+       while (nbytes > 0)
+       {
+               uint32          startoff;
+               int                     segbytes;
+               int                     readbytes;
+
+               startoff = recptr % XLogSegSize;
+
+               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+               {
+                       char            path[MAXPGPATH];
+
+                       /* Switch to another logfile segment */
+                       if (sendFile >= 0)
+                               close(sendFile);
+
+                       XLByteToSeg(recptr, sendSegNo);
+
+                       XLogFilePath(path, tli, sendSegNo);
+
+                       sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+
+                       if (sendFile < 0)
+                       {
+                               if (errno == ENOENT)
+                                       ereport(ERROR,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("requested WAL segment %s has already been removed",
+                                                                       path)));
+                               else
+                                       ereport(ERROR,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("could not open file \"%s\": %m",
+                                                                       path)));
+                       }
+                       sendOff = 0;
+               }
+
+               /* Need to seek in the file? */
+               if (sendOff != startoff)
+               {
+                       if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+                       {
+                               char            path[MAXPGPATH];
+
+                               XLogFilePath(path, tli, sendSegNo);
+
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                 errmsg("could not seek in log segment %s to offset %u: %m",
+                                                path, startoff)));
+                       }
+                       sendOff = startoff;
+               }
+
+               /* How many bytes are within this segment? */
+               if (nbytes > (XLogSegSize - startoff))
+                       segbytes = XLogSegSize - startoff;
+               else
+                       segbytes = nbytes;
+
+               readbytes = read(sendFile, p, segbytes);
+               if (readbytes <= 0)
+               {
+                       char            path[MAXPGPATH];
+
+                       XLogFilePath(path, tli, sendSegNo);
+
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+                                                       path, sendOff, (unsigned long) segbytes)));
+               }
+
+               /* Update state for read */
+               recptr += readbytes;
+
+               sendOff += readbytes;
+               nbytes -= readbytes;
+               p += readbytes;
+       }
+}
+
+/*
+ * read_page callback for reading local xlog files
+ *
+ * Public because it would likely be very helpful for someone writing another
+ * output method outside walsender, e.g. in a bgworker.
+ *
+ * TODO: The walsender has it's own version of this, but it relies on the
+ * walsender's latch being set whenever WAL is flushed. No such infrastructure
+ * exists for normal backends, so we have to do a check/sleep/repeat style of
+ * loop for now.
+ */
+int
+read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
+       int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+{
+       XLogRecPtr      flushptr,
+                               loc;
+       int                     count;
+
+       loc = targetPagePtr + reqLen;
+       while (1)
+       {
+               /*
+                * TODO: we're going to have to do something more intelligent about
+                * timelines on standbys. Use readTimeLineHistory() and
+                * tliOfPointInHistory() to get the proper LSN? For now we'll catch
+                * that case earlier, but the code and TODO is left in here for when
+                * that changes.
+                */
+               if (!RecoveryInProgress())
+               {
+                       *pageTLI = ThisTimeLineID;
+                       flushptr = GetFlushRecPtr();
+               }
+               else
+                       flushptr = GetXLogReplayRecPtr(pageTLI);
+
+               if (loc <= flushptr)
+                       break;
+
+               CHECK_FOR_INTERRUPTS();
+               pg_usleep(1000L);
+       }
+
+       /* more than one block available */
+       if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+               count = XLOG_BLCKSZ;
+       /* not enough data there */
+       else if (targetPagePtr + reqLen > flushptr)
+               return -1;
+       /* part of the page available */
+       else
+               count = flushptr - targetPagePtr;
+
+       XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+
+       return count;
+}
index 56e47e4b9c442b9f94ffdc11d1a02be20c1f70cd..f789fc127d0288302ae7eef75504b244f3351a1f 100644 (file)
@@ -22,6 +22,7 @@
 #include "miscadmin.h"
 
 #include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 
 #include "catalog/pg_type.h"
 
@@ -100,108 +101,6 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
        p->returned_rows++;
 }
 
-/*
- * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
- * we currently don't have the infrastructure (elog!) to share it.
- */
-static void
-XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
-{
-       char       *p;
-       XLogRecPtr      recptr;
-       Size            nbytes;
-
-       static int      sendFile = -1;
-       static XLogSegNo sendSegNo = 0;
-       static uint32 sendOff = 0;
-
-       p = buf;
-       recptr = startptr;
-       nbytes = count;
-
-       while (nbytes > 0)
-       {
-               uint32          startoff;
-               int                     segbytes;
-               int                     readbytes;
-
-               startoff = recptr % XLogSegSize;
-
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
-               {
-                       char            path[MAXPGPATH];
-
-                       /* Switch to another logfile segment */
-                       if (sendFile >= 0)
-                               close(sendFile);
-
-                       XLByteToSeg(recptr, sendSegNo);
-
-                       XLogFilePath(path, tli, sendSegNo);
-
-                       sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
-
-                       if (sendFile < 0)
-                       {
-                               if (errno == ENOENT)
-                                       ereport(ERROR,
-                                                       (errcode_for_file_access(),
-                                                        errmsg("requested WAL segment %s has already been removed",
-                                                                       path)));
-                               else
-                                       ereport(ERROR,
-                                                       (errcode_for_file_access(),
-                                                        errmsg("could not open file \"%s\": %m",
-                                                                       path)));
-                       }
-                       sendOff = 0;
-               }
-
-               /* Need to seek in the file? */
-               if (sendOff != startoff)
-               {
-                       if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-                       {
-                               char            path[MAXPGPATH];
-
-                               XLogFilePath(path, tli, sendSegNo);
-
-                               ereport(ERROR,
-                                               (errcode_for_file_access(),
-                                 errmsg("could not seek in log segment %s to offset %u: %m",
-                                                path, startoff)));
-                       }
-                       sendOff = startoff;
-               }
-
-               /* How many bytes are within this segment? */
-               if (nbytes > (XLogSegSize - startoff))
-                       segbytes = XLogSegSize - startoff;
-               else
-                       segbytes = nbytes;
-
-               readbytes = read(sendFile, p, segbytes);
-               if (readbytes <= 0)
-               {
-                       char            path[MAXPGPATH];
-
-                       XLogFilePath(path, tli, sendSegNo);
-
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-                                                       path, sendOff, (unsigned long) segbytes)));
-               }
-
-               /* Update state for read */
-               recptr += readbytes;
-
-               sendOff += readbytes;
-               nbytes -= readbytes;
-               p += readbytes;
-       }
-}
-
 static void
 check_permissions(void)
 {
@@ -211,63 +110,12 @@ check_permissions(void)
                                 (errmsg("must be superuser or replication role to use replication slots"))));
 }
 
-/*
- * read_page callback for logical decoding contexts.
- *
- * Public because it would likely be very helpful for someone writing another
- * output method outside walsender, e.g. in a bgworker.
- *
- * TODO: The walsender has it's own version of this, but it relies on the
- * walsender's latch being set whenever WAL is flushed. No such infrastructure
- * exists for normal backends, so we have to do a check/sleep/repeat style of
- * loop for now.
- */
 int
 logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
        int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
-       XLogRecPtr      flushptr,
-                               loc;
-       int                     count;
-
-       loc = targetPagePtr + reqLen;
-       while (1)
-       {
-               /*
-                * TODO: we're going to have to do something more intelligent about
-                * timelines on standbys. Use readTimeLineHistory() and
-                * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-                * that case earlier, but the code and TODO is left in here for when
-                * that changes.
-                */
-               if (!RecoveryInProgress())
-               {
-                       *pageTLI = ThisTimeLineID;
-                       flushptr = GetFlushRecPtr();
-               }
-               else
-                       flushptr = GetXLogReplayRecPtr(pageTLI);
-
-               if (loc <= flushptr)
-                       break;
-
-               CHECK_FOR_INTERRUPTS();
-               pg_usleep(1000L);
-       }
-
-       /* more than one block available */
-       if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
-               count = XLOG_BLCKSZ;
-       /* not enough data there */
-       else if (targetPagePtr + reqLen > flushptr)
-               return -1;
-       /* part of the page available */
-       else
-               count = flushptr - targetPagePtr;
-
-       XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
-
-       return count;
+       return read_local_xlog_page(state, targetPagePtr, reqLen,
+                                                targetRecPtr, cur_page, pageTLI);
 }
 
 /*
index a1c0c82c347339d54d43614e9acf92efb40b1b3f..1b9abce9ad3b61e62f921361432280afc7549817 100644 (file)
@@ -47,4 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
+extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
+       int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
+
 #endif