*/
#include "postgres.h"
+#include <unistd.h>
+
+#include "miscadmin.h"
+
#include "access/xlog.h"
+#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "storage/smgr.h"
{
forget_invalid_pages(rnode, forkNum, nblocks);
}
+
+/*
+ * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
+ * we currently don't have the infrastructure (elog!) to share it.
+ */
+static void
+XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
+{
+ char *p;
+ XLogRecPtr recptr;
+ Size nbytes;
+
+ static int sendFile = -1;
+ static XLogSegNo sendSegNo = 0;
+ static uint32 sendOff = 0;
+
+ p = buf;
+ recptr = startptr;
+ nbytes = count;
+
+ while (nbytes > 0)
+ {
+ uint32 startoff;
+ int segbytes;
+ int readbytes;
+
+ startoff = recptr % XLogSegSize;
+
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ {
+ char path[MAXPGPATH];
+
+ /* Switch to another logfile segment */
+ if (sendFile >= 0)
+ close(sendFile);
+
+ XLByteToSeg(recptr, sendSegNo);
+
+ XLogFilePath(path, tli, sendSegNo);
+
+ sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+
+ if (sendFile < 0)
+ {
+ if (errno == ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("requested WAL segment %s has already been removed",
+ path)));
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m",
+ path)));
+ }
+ sendOff = 0;
+ }
+
+ /* Need to seek in the file? */
+ if (sendOff != startoff)
+ {
+ if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+ {
+ char path[MAXPGPATH];
+
+ XLogFilePath(path, tli, sendSegNo);
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in log segment %s to offset %u: %m",
+ path, startoff)));
+ }
+ sendOff = startoff;
+ }
+
+ /* How many bytes are within this segment? */
+ if (nbytes > (XLogSegSize - startoff))
+ segbytes = XLogSegSize - startoff;
+ else
+ segbytes = nbytes;
+
+ readbytes = read(sendFile, p, segbytes);
+ if (readbytes <= 0)
+ {
+ char path[MAXPGPATH];
+
+ XLogFilePath(path, tli, sendSegNo);
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+ path, sendOff, (unsigned long) segbytes)));
+ }
+
+ /* Update state for read */
+ recptr += readbytes;
+
+ sendOff += readbytes;
+ nbytes -= readbytes;
+ p += readbytes;
+ }
+}
+
+/*
+ * read_page callback for reading local xlog files
+ *
+ * Public because it would likely be very helpful for someone writing another
+ * output method outside walsender, e.g. in a bgworker.
+ *
+ * TODO: The walsender has it's own version of this, but it relies on the
+ * walsender's latch being set whenever WAL is flushed. No such infrastructure
+ * exists for normal backends, so we have to do a check/sleep/repeat style of
+ * loop for now.
+ */
+int
+read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+{
+ XLogRecPtr flushptr,
+ loc;
+ int count;
+
+ loc = targetPagePtr + reqLen;
+ while (1)
+ {
+ /*
+ * TODO: we're going to have to do something more intelligent about
+ * timelines on standbys. Use readTimeLineHistory() and
+ * tliOfPointInHistory() to get the proper LSN? For now we'll catch
+ * that case earlier, but the code and TODO is left in here for when
+ * that changes.
+ */
+ if (!RecoveryInProgress())
+ {
+ *pageTLI = ThisTimeLineID;
+ flushptr = GetFlushRecPtr();
+ }
+ else
+ flushptr = GetXLogReplayRecPtr(pageTLI);
+
+ if (loc <= flushptr)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(1000L);
+ }
+
+ /* more than one block available */
+ if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+ count = XLOG_BLCKSZ;
+ /* not enough data there */
+ else if (targetPagePtr + reqLen > flushptr)
+ return -1;
+ /* part of the page available */
+ else
+ count = flushptr - targetPagePtr;
+
+ XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+
+ return count;
+}
#include "miscadmin.h"
#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
#include "catalog/pg_type.h"
p->returned_rows++;
}
-/*
- * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
- * we currently don't have the infrastructure (elog!) to share it.
- */
-static void
-XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
-{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
-
- static int sendFile = -1;
- static XLogSegNo sendSegNo = 0;
- static uint32 sendOff = 0;
-
- p = buf;
- recptr = startptr;
- nbytes = count;
-
- while (nbytes > 0)
- {
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = recptr % XLogSegSize;
-
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
- {
- char path[MAXPGPATH];
-
- /* Switch to another logfile segment */
- if (sendFile >= 0)
- close(sendFile);
-
- XLByteToSeg(recptr, sendSegNo);
-
- XLogFilePath(path, tli, sendSegNo);
-
- sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
-
- if (sendFile < 0)
- {
- if (errno == ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- path)));
- else
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
- }
- sendOff = 0;
- }
-
- /* Need to seek in the file? */
- if (sendOff != startoff)
- {
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
- {
- char path[MAXPGPATH];
-
- XLogFilePath(path, tli, sendSegNo);
-
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- path, startoff)));
- }
- sendOff = startoff;
- }
-
- /* How many bytes are within this segment? */
- if (nbytes > (XLogSegSize - startoff))
- segbytes = XLogSegSize - startoff;
- else
- segbytes = nbytes;
-
- readbytes = read(sendFile, p, segbytes);
- if (readbytes <= 0)
- {
- char path[MAXPGPATH];
-
- XLogFilePath(path, tli, sendSegNo);
-
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u, length %lu: %m",
- path, sendOff, (unsigned long) segbytes)));
- }
-
- /* Update state for read */
- recptr += readbytes;
-
- sendOff += readbytes;
- nbytes -= readbytes;
- p += readbytes;
- }
-}
-
static void
check_permissions(void)
{
(errmsg("must be superuser or replication role to use replication slots"))));
}
-/*
- * read_page callback for logical decoding contexts.
- *
- * Public because it would likely be very helpful for someone writing another
- * output method outside walsender, e.g. in a bgworker.
- *
- * TODO: The walsender has it's own version of this, but it relies on the
- * walsender's latch being set whenever WAL is flushed. No such infrastructure
- * exists for normal backends, so we have to do a check/sleep/repeat style of
- * loop for now.
- */
int
logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
{
- XLogRecPtr flushptr,
- loc;
- int count;
-
- loc = targetPagePtr + reqLen;
- while (1)
- {
- /*
- * TODO: we're going to have to do something more intelligent about
- * timelines on standbys. Use readTimeLineHistory() and
- * tliOfPointInHistory() to get the proper LSN? For now we'll catch
- * that case earlier, but the code and TODO is left in here for when
- * that changes.
- */
- if (!RecoveryInProgress())
- {
- *pageTLI = ThisTimeLineID;
- flushptr = GetFlushRecPtr();
- }
- else
- flushptr = GetXLogReplayRecPtr(pageTLI);
-
- if (loc <= flushptr)
- break;
-
- CHECK_FOR_INTERRUPTS();
- pg_usleep(1000L);
- }
-
- /* more than one block available */
- if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
- count = XLOG_BLCKSZ;
- /* not enough data there */
- else if (targetPagePtr + reqLen > flushptr)
- return -1;
- /* part of the page available */
- else
- count = flushptr - targetPagePtr;
-
- XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
-
- return count;
+ return read_local_xlog_page(state, targetPagePtr, reqLen,
+ targetRecPtr, cur_page, pageTLI);
}
/*
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel);
+extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
+
#endif