#
# Copyright (c) 1994, Regents of the University of California
#
-# $PostgreSQL: pgsql/src/Makefile,v 1.49 2010/01/15 17:01:06 heikki Exp $
+# $PostgreSQL: pgsql/src/Makefile,v 1.50 2010/01/20 09:16:23 heikki Exp $
#
#-------------------------------------------------------------------------
$(MAKE) -C backend/snowball $@
$(MAKE) -C include $@
$(MAKE) -C interfaces $@
- $(MAKE) -C backend/replication/walreceiver $@
+ $(MAKE) -C backend/replication/libpqwalreceiver $@
$(MAKE) -C bin $@
$(MAKE) -C pl $@
$(MAKE) -C makefiles $@
$(MAKE) -C backend/snowball $@
$(MAKE) -C include $@
$(MAKE) -C interfaces $@
- $(MAKE) -C backend/replication/walreceiver $@
+ $(MAKE) -C backend/replication/libpqwalreceiver $@
$(MAKE) -C bin $@
$(MAKE) -C pl $@
$(MAKE) -C makefiles $@
$(MAKE) -C backend/snowball $@
$(MAKE) -C include $@
$(MAKE) -C interfaces $@
- $(MAKE) -C backend/replication/walreceiver $@
+ $(MAKE) -C backend/replication/libpqwalreceiver $@
$(MAKE) -C bin $@
$(MAKE) -C pl $@
$(MAKE) -C makefiles $@
$(MAKE) -C backend/utils/mb/conversion_procs $@
$(MAKE) -C backend/snowball $@
$(MAKE) -C interfaces $@
- $(MAKE) -C backend/replication/walreceiver $@
+ $(MAKE) -C backend/replication/libpqwalreceiver $@
$(MAKE) -C bin $@
$(MAKE) -C pl $@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.256 2010/01/15 09:19:00 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.257 2010/01/20 09:16:23 heikki Exp $
*
*-------------------------------------------------------------------------
*/
case WalReceiverProcess:
/* don't set signals, walreceiver has its own agenda */
- {
- PGFunction WalReceiverMain;
-
- /*
- * Walreceiver is not linked directly into the server
- * binary because we would then need to link the server
- * with libpq. It's compiled as a dynamically loaded module
- * to avoid that.
- */
- WalReceiverMain = load_external_function("walreceiver",
- "WalReceiverMain",
- true, NULL);
- WalReceiverMain(NULL);
- }
+ WalReceiverMain();
proc_exit(1); /* should never return */
default:
# Makefile for src/backend/replication
#
# IDENTIFICATION
-# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $
+# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.2 2010/01/20 09:16:24 heikki Exp $
#
#-------------------------------------------------------------------------
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = walsender.o walreceiverfuncs.o
+OBJS = walsender.o walreceiverfuncs.o walreceiver.o
include $(top_srcdir)/src/backend/common.mk
-$PostgreSQL: pgsql/src/backend/replication/README,v 1.1 2010/01/15 09:19:03 heikki Exp $
+$PostgreSQL: pgsql/src/backend/replication/README,v 1.2 2010/01/20 09:16:24 heikki Exp $
+
+Walreceiver - libpqwalreceiver API
+----------------------------------
+
+The transport-specific part of walreceiver, responsible for connecting to
+the primary server and receiving WAL files, is loaded dynamically to avoid
+having to link the main server binary with libpq. The dynamically loaded
+module is in libpqwalreceiver subdirectory.
+
+The dynamically loaded module implements three functions:
+
+
+bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)
+
+Establish connection to the primary, and starts streaming from 'startpoint'.
+Returns true on success.
+
+
+bool walrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+
+Retrieve any WAL record available through the connection, blocking for
+maximum of 'timeout' ms.
+
+
+void walrcv_disconnect(void);
+
+Disconnect.
+
+
+This API should be considered internal at the moment, but we could open it
+up for 3rd party replacements of libpqwalreceiver in the future, allowing
+pluggable methods for receiveing WAL.
Walreceiver IPC
---------------
#-------------------------------------------------------------------------
#
# Makefile--
-# Makefile for src/backend/replication/walreceiver
+# Makefile for src/backend/replication/libpqwalreceiver
#
# IDENTIFICATION
-# $PostgreSQL: pgsql/src/backend/replication/walreceiver/Makefile,v 1.4 2010/01/15 21:06:26 tgl Exp $
+# $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/Makefile,v 1.1 2010/01/20 09:16:24 heikki Exp $
#
#-------------------------------------------------------------------------
-subdir = src/backend/replication/walreceiver
+subdir = src/backend/postmaster/libpqwalreceiver
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
-
-OBJS = walreceiver.o
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+OBJS = libpqwalreceiver.o
SHLIB_LINK = $(libpq)
-
-NAME := walreceiver
+NAME = libpqwalreceiver
all: submake-libpq all-shared-lib
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * libpqwalreceiver.c
+ *
+ * This file contains the libpq-specific parts of walreceiver. It's
+ * loaded as a dynamic module to avoid linking the main server binary with
+ * libpq.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/time.h>
+
+#include "libpq-fe.h"
+#include "access/xlog.h"
+#include "miscadmin.h"
+#include "replication/walreceiver.h"
+#include "utils/builtins.h"
+
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#ifdef HAVE_SYS_POLL_H
+#include <sys/poll.h>
+#endif
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+
+/* Current connection to the primary, if any */
+static PGconn *streamConn = NULL;
+static bool justconnected = false;
+
+/* Buffer for currently read records */
+static char *recvBuf = NULL;
+
+/* Prototypes for interface functions */
+static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
+static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
+ int *len);
+static void libpqrcv_disconnect(void);
+
+/* Prototypes for private functions */
+static bool libpq_select(int timeout_ms);
+
+/*
+ * Module load callback
+ */
+void
+_PG_init(void)
+{
+ /* Tell walreceiver how to reach us */
+ if (walrcv_connect != NULL || walrcv_receive != NULL || walrcv_disconnect)
+ elog(ERROR, "libpqwalreceiver already loaded");
+ walrcv_connect = libpqrcv_connect;
+ walrcv_receive = libpqrcv_receive;
+ walrcv_disconnect = libpqrcv_disconnect;
+}
+
+/*
+ * Establish the connection to the primary server for XLOG streaming
+ */
+static bool
+libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+{
+ char conninfo_repl[MAXCONNINFO + 14];
+ char *primary_sysid;
+ char standby_sysid[32];
+ TimeLineID primary_tli;
+ TimeLineID standby_tli;
+ PGresult *res;
+ char cmd[64];
+
+ Assert(startpoint.xlogid != 0 || startpoint.xrecoff != 0);
+
+ /* Connect */
+ snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
+
+ streamConn = PQconnectdb(conninfo_repl);
+ if (PQstatus(streamConn) != CONNECTION_OK)
+ ereport(ERROR,
+ (errmsg("could not connect to the primary server : %s",
+ PQerrorMessage(streamConn))));
+
+ /*
+ * Get the system identifier and timeline ID as a DataRow message
+ * from the primary server.
+ */
+ res = PQexec(streamConn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("could not receive the SYSID and timeline ID from "
+ "the primary server: %s",
+ PQerrorMessage(streamConn))));
+ }
+ if (PQnfields(res) != 2 || PQntuples(res) != 1)
+ {
+ int ntuples = PQntuples(res);
+ int nfields = PQnfields(res);
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("invalid response from primary server"),
+ errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
+ ntuples, nfields)));
+ }
+ primary_sysid = PQgetvalue(res, 0, 0);
+ primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+
+ /*
+ * Confirm that the system identifier of the primary is the same
+ * as ours.
+ */
+ snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+ if (strcmp(primary_sysid, standby_sysid) != 0)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("system differs between the primary and standby"),
+ errdetail("the primary SYSID is %s, standby SYSID is %s",
+ primary_sysid, standby_sysid)));
+ }
+
+ /*
+ * Confirm that the current timeline of the primary is the same
+ * as the recovery target timeline.
+ */
+ standby_tli = GetRecoveryTargetTLI();
+ PQclear(res);
+ if (primary_tli != standby_tli)
+ ereport(ERROR,
+ (errmsg("timeline %u of the primary does not match recovery target timeline %u",
+ primary_tli, standby_tli)));
+ ThisTimeLineID = primary_tli;
+
+ /* Start streaming from the point requested by startup process */
+ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
+ startpoint.xlogid, startpoint.xrecoff);
+ res = PQexec(streamConn, cmd);
+ if (PQresultStatus(res) != PGRES_COPY_OUT)
+ ereport(ERROR,
+ (errmsg("could not start XLOG streaming: %s",
+ PQerrorMessage(streamConn))));
+ PQclear(res);
+
+ justconnected = true;
+
+ return true;
+}
+
+/*
+ * Wait until we can read WAL stream, or timeout.
+ *
+ * Returns true if data has become available for reading, false if timed out
+ * or interrupted by signal.
+ *
+ * This is based on pqSocketCheck.
+ */
+static bool
+libpq_select(int timeout_ms)
+{
+ int ret;
+
+ Assert(streamConn != NULL);
+ if (PQsocket(streamConn) < 0)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("socket not open")));
+
+ /* We use poll(2) if available, otherwise select(2) */
+ {
+#ifdef HAVE_POLL
+ struct pollfd input_fd;
+
+ input_fd.fd = PQsocket(streamConn);
+ input_fd.events = POLLIN | POLLERR;
+ input_fd.revents = 0;
+
+ ret = poll(&input_fd, 1, timeout_ms);
+#else /* !HAVE_POLL */
+
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *ptr_timeout;
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(streamConn), &input_mask);
+
+ if (timeout_ms < 0)
+ ptr_timeout = NULL;
+ else
+ {
+ timeout.tv_sec = timeout_ms / 1000;
+ timeout.tv_usec = (timeout_ms % 1000) * 1000;
+ ptr_timeout = &timeout;
+ }
+
+ ret = select(PQsocket(streamConn) + 1, &input_mask,
+ NULL, NULL, ptr_timeout);
+#endif /* HAVE_POLL */
+ }
+
+ if (ret == 0 || (ret < 0 && errno == EINTR))
+ return false;
+ if (ret < 0)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("select() failed: %m")));
+ return true;
+}
+
+/*
+ * Disconnect connection to primary, if any.
+ */
+static void
+libpqrcv_disconnect(void)
+{
+ PQfinish(streamConn);
+ streamConn = NULL;
+ justconnected = false;
+}
+
+/*
+ * Receive any WAL records available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * Returns:
+ *
+ * True if data was received. *recptr, *buffer and *len are set to
+ * the WAL location of the received data, buffer holding it, and length,
+ * respectively.
+ *
+ * False if no data was available within timeout, or wait was interrupted
+ * by signal.
+ *
+ * The buffer returned is only valid until the next call of this function or
+ * libpq_connect/disconnect.
+ *
+ * ereports on error.
+ */
+static bool
+libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+{
+ int rawlen;
+
+ if (recvBuf != NULL)
+ PQfreemem(recvBuf);
+ recvBuf = NULL;
+
+ /*
+ * If the caller requested to block, wait for data to arrive. But if
+ * this is the first call after connecting, don't wait, because
+ * there might already be some data in libpq buffer that we haven't
+ * returned to caller.
+ */
+ if (timeout > 0 && !justconnected)
+ {
+ if (!libpq_select(timeout))
+ return false;
+
+ if (PQconsumeInput(streamConn) == 0)
+ ereport(ERROR,
+ (errmsg("could not read xlog records: %s",
+ PQerrorMessage(streamConn))));
+ }
+ justconnected = false;
+
+ /* Receive CopyData message */
+ rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+ if (rawlen == 0) /* no records available yet, then return */
+ return false;
+ if (rawlen == -1) /* end-of-streaming or error */
+ {
+ PGresult *res;
+
+ res = PQgetResult(streamConn);
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("replication terminated by primary server")));
+ }
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("could not read xlog records: %s",
+ PQerrorMessage(streamConn))));
+ }
+ if (rawlen < -1)
+ ereport(ERROR,
+ (errmsg("could not read xlog records: %s",
+ PQerrorMessage(streamConn))));
+
+ if (rawlen < sizeof(XLogRecPtr))
+ ereport(ERROR,
+ (errmsg("invalid WAL message received from primary")));
+
+ /* Return received WAL records to caller */
+ *recptr = *((XLogRecPtr *) recvBuf);
+ *buffer = recvBuf + sizeof(XLogRecPtr);
+ *len = rawlen - sizeof(XLogRecPtr);
+
+ return true;
+}
* of the connection and a FATAL error are treated not as a crash but as
* normal operation.
*
- * Walreceiver is a postmaster child process like others, but it's compiled
- * as a dynamic module to avoid linking libpq with the main server binary.
+ * This file contains the server-facing parts of walreceiver. The libpq-
+ * specific parts are in the libpqwalreceiver module. It's loaded
+ * dynamically to avoid linking the server with libpq.
*
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver/walreceiver.c,v 1.2 2010/01/16 01:55:28 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <signal.h>
#include <unistd.h>
-#include <sys/time.h>
#include "access/xlog_internal.h"
-#include "libpq-fe.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
-PG_MODULE_MAGIC;
-
-PG_FUNCTION_INFO_V1(WalReceiverMain);
-Datum WalReceiverMain(PG_FUNCTION_ARGS);
-
-/* streamConn is a PGconn object of a connection to walsender from walreceiver */
-static PGconn *streamConn = NULL;
+/* libpqreceiver hooks to these when loaded */
+walrcv_connect_type walrcv_connect = NULL;
+walrcv_receive_type walrcv_receive = NULL;
+walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
static uint32 recvSeg = 0;
static uint32 recvOff = 0;
-/* Buffer for currently read records */
-static char *recvBuf = NULL;
-
-/* Flags set by interrupt handlers of walreceiver for later service in the main loop */
+/*
+ * Flags set by interrupt handlers of walreceiver for later service in the
+ * main loop.
+ */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;
static void ProcessWalRcvInterrupts(void);
-static void EnableImmediateExit(void);
-static void DisableImmediateExit(void);
+static void EnableWalRcvImmediateExit(void);
+static void DisableWalRcvImmediateExit(void);
/*
* About SIGTERM handling:
}
static void
-EnableImmediateExit()
+EnableWalRcvImmediateExit()
{
WalRcvImmediateInterruptOK = true;
ProcessWalRcvInterrupts();
}
static void
-DisableImmediateExit()
+DisableWalRcvImmediateExit()
{
WalRcvImmediateInterruptOK = false;
ProcessWalRcvInterrupts();
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static void WalRcvLoop(void);
static void InitWalRcv(void);
-static void WalRcvConnect(void);
-static bool WalRcvWait(int timeout_ms);
static void WalRcvKill(int code, Datum arg);
-static void XLogRecv(void);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
} LogstreamResult;
/* Main entry point for walreceiver process */
-Datum
-WalReceiverMain(PG_FUNCTION_ARGS)
+void
+WalReceiverMain(void)
{
sigjmp_buf local_sigjmp_buf;
MemoryContext walrcv_context;
+ char conninfo[MAXCONNINFO];
+ XLogRecPtr startpoint;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+ if (walrcv_connect == NULL || walrcv_receive == NULL ||
+ walrcv_disconnect == NULL)
+ elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/* Mark walreceiver in progress */
InitWalRcv();
error_context_stack = NULL;
/* Reset WalRcvImmediateInterruptOK */
- DisableImmediateExit();
+ DisableWalRcvImmediateExit();
/* Prevent interrupts while cleaning up */
HOLD_INTERRUPTS();
/* Report the error to the server log */
EmitErrorReport();
- /* Free the data structure related to a connection */
- PQfinish(streamConn);
- streamConn = NULL;
- if (recvBuf != NULL)
- PQfreemem(recvBuf);
- recvBuf = NULL;
+ /* Disconnect any previous connection. */
+ EnableWalRcvImmediateExit();
+ walrcv_disconnect();
+ DisableWalRcvImmediateExit();
/*
* Now return to normal top-level context and clear ErrorContext for
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
- /* Establish the connection to the primary for XLOG streaming */
- WalRcvConnect();
-
- /* Main loop of walreceiver */
- WalRcvLoop();
+ /* Fetch connection information from shared memory */
+ SpinLockAcquire(&walrcv->mutex);
+ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ startpoint = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
- PG_RETURN_VOID(); /* WalRcvLoop() never returns, but keep compiler quiet */
-}
+ /* Establish the connection to the primary for XLOG streaming */
+ EnableWalRcvImmediateExit();
+ walrcv_connect(conninfo, startpoint);
+ DisableWalRcvImmediateExit();
-/* Main loop of walreceiver process */
-static void
-WalRcvLoop(void)
-{
/* Loop until end-of-streaming or error */
for (;;)
{
+ XLogRecPtr recptr;
+ char *buf;
+ int len;
+
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
}
/* Wait a while for data to arrive */
- if (WalRcvWait(NAPTIME_PER_CYCLE))
+ if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
{
- /* data has arrived. Process it */
- if (PQconsumeInput(streamConn) == 0)
- ereport(ERROR,
- (errmsg("could not read xlog records: %s",
- PQerrorMessage(streamConn))));
- XLogRecv();
+ /* Write received WAL records to disk */
+ XLogWalRcvWrite(buf, len, recptr);
+
+ /* Receive any more WAL records we can without sleeping */
+ while(walrcv_receive(0, &recptr, &buf, &len))
+ XLogWalRcvWrite(buf, len, recptr);
+
+ /*
+ * Now that we've written some records, flush them to disk and
+ * let the startup process know about them.
+ */
+ XLogWalRcvFlush();
}
}
}
on_shmem_exit(WalRcvKill, 0);
}
-/*
- * Establish the connection to the primary server for XLOG streaming
- */
-static void
-WalRcvConnect(void)
-{
- char conninfo[MAXCONNINFO + 14];
- char *primary_sysid;
- char standby_sysid[32];
- TimeLineID primary_tli;
- TimeLineID standby_tli;
- PGresult *res;
- XLogRecPtr recptr;
- char cmd[64];
- /* use volatile pointer to prevent code rearrangement */
- volatile WalRcvData *walrcv = WalRcv;
-
- /*
- * Set up a connection for XLOG streaming
- */
- SpinLockAcquire(&walrcv->mutex);
- snprintf(conninfo, sizeof(conninfo), "%s replication=true", walrcv->conninfo);
- recptr = walrcv->receivedUpto;
- SpinLockRelease(&walrcv->mutex);
-
- /* initialize local XLOG pointers */
- LogstreamResult.Write = LogstreamResult.Flush = recptr;
-
- Assert(recptr.xlogid != 0 || recptr.xrecoff != 0);
-
- EnableImmediateExit();
- streamConn = PQconnectdb(conninfo);
- DisableImmediateExit();
- if (PQstatus(streamConn) != CONNECTION_OK)
- ereport(ERROR,
- (errmsg("could not connect to the primary server : %s",
- PQerrorMessage(streamConn))));
-
- /*
- * Get the system identifier and timeline ID as a DataRow message
- * from the primary server.
- */
- EnableImmediateExit();
- res = PQexec(streamConn, "IDENTIFY_SYSTEM");
- DisableImmediateExit();
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- PQclear(res);
- ereport(ERROR,
- (errmsg("could not receive the SYSID and timeline ID from "
- "the primary server: %s",
- PQerrorMessage(streamConn))));
- }
- if (PQnfields(res) != 2 || PQntuples(res) != 1)
- {
- int ntuples = PQntuples(res);
- int nfields = PQnfields(res);
- PQclear(res);
- ereport(ERROR,
- (errmsg("invalid response from primary server"),
- errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
- ntuples, nfields)));
- }
- primary_sysid = PQgetvalue(res, 0, 0);
- primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
-
- /*
- * Confirm that the system identifier of the primary is the same
- * as ours.
- */
- snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
- GetSystemIdentifier());
- if (strcmp(primary_sysid, standby_sysid) != 0)
- {
- PQclear(res);
- ereport(ERROR,
- (errmsg("system differs between the primary and standby"),
- errdetail("the primary SYSID is %s, standby SYSID is %s",
- primary_sysid, standby_sysid)));
- }
-
- /*
- * Confirm that the current timeline of the primary is the same
- * as the recovery target timeline.
- */
- standby_tli = GetRecoveryTargetTLI();
- PQclear(res);
- if (primary_tli != standby_tli)
- ereport(ERROR,
- (errmsg("timeline %u of the primary does not match recovery target timeline %u",
- primary_tli, standby_tli)));
- ThisTimeLineID = primary_tli;
-
- /* Start streaming from the point requested by startup process */
- snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", recptr.xlogid, recptr.xrecoff);
- EnableImmediateExit();
- res = PQexec(streamConn, cmd);
- DisableImmediateExit();
- if (PQresultStatus(res) != PGRES_COPY_OUT)
- ereport(ERROR,
- (errmsg("could not start XLOG streaming: %s",
- PQerrorMessage(streamConn))));
- PQclear(res);
-
- /*
- * Process the outstanding messages before beginning to wait for
- * new message to arrive.
- */
- XLogRecv();
-}
-
-/*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-WalRcvWait(int timeout_ms)
-{
- int ret;
-
- Assert(streamConn != NULL);
- if (PQsocket(streamConn) < 0)
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("socket not open")));
-
- /* We use poll(2) if available, otherwise select(2) */
- {
-#ifdef HAVE_POLL
- struct pollfd input_fd;
-
- input_fd.fd = PQsocket(streamConn);
- input_fd.events = POLLIN | POLLERR;
- input_fd.revents = 0;
-
- ret = poll(&input_fd, 1, timeout_ms);
-#else /* !HAVE_POLL */
-
- fd_set input_mask;
- struct timeval timeout;
- struct timeval *ptr_timeout;
-
- FD_ZERO(&input_mask);
- FD_SET(PQsocket(streamConn), &input_mask);
-
- if (timeout_ms < 0)
- ptr_timeout = NULL;
- else
- {
- timeout.tv_sec = timeout_ms / 1000;
- timeout.tv_usec = (timeout_ms % 1000) * 1000;
- ptr_timeout = &timeout;
- }
-
- ret = select(PQsocket(streamConn) + 1, &input_mask,
- NULL, NULL, ptr_timeout);
-#endif /* HAVE_POLL */
- }
-
- if (ret == 0 || (ret < 0 && errno == EINTR))
- return false;
- if (ret < 0)
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("select() failed: %m")));
- return true;
-}
-
/*
* Clear our pid from shared memory at exit.
*/
walrcv->pid = 0;
SpinLockRelease(&walrcv->mutex);
- PQfinish(streamConn);
+ walrcv_disconnect();
/* If requested to stop, tell postmaster to not restart us. */
if (stopped)
exit(2);
}
-/*
- * Receive any WAL records available without blocking from XLOG stream and
- * write it to the disk.
- */
-static void
-XLogRecv(void)
-{
- XLogRecPtr *recptr;
- int len;
-
- for (;;)
- {
- /* Receive CopyData message */
- len = PQgetCopyData(streamConn, &recvBuf, 1);
- if (len == 0) /* no records available yet, then return */
- break;
- if (len == -1) /* end-of-streaming or error */
- {
- PGresult *res;
-
- res = PQgetResult(streamConn);
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- PQclear(res);
- ereport(ERROR,
- (errmsg("replication terminated by primary server")));
- }
- PQclear(res);
- ereport(ERROR,
- (errmsg("could not read xlog records: %s",
- PQerrorMessage(streamConn))));
- }
- if (len < -1)
- ereport(ERROR,
- (errmsg("could not read xlog records: %s",
- PQerrorMessage(streamConn))));
-
- if (len < sizeof(XLogRecPtr))
- ereport(ERROR,
- (errmsg("invalid WAL message received from primary")));
-
- /* Write received WAL records to disk */
- recptr = (XLogRecPtr *) recvBuf;
- XLogWalRcvWrite(recvBuf + sizeof(XLogRecPtr),
- len - sizeof(XLogRecPtr), *recptr);
-
- if (recvBuf != NULL)
- PQfreemem(recvBuf);
- recvBuf = NULL;
- }
-
- /*
- * Now that we've written some records, flush them to disk and let the
- * startup process know about them.
- */
- XLogWalRcvFlush();
-}
-
/*
* Write XLOG data to disk.
*/
*
* This file contains functions used by the startup process to communicate
* with the walreceiver process. Functions implementing walreceiver itself
- * are in src/backend/replication/walreceiver subdirectory.
+ * are in walreceiver.c.
*
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.1 2010/01/15 09:19:03 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $
*
*-------------------------------------------------------------------------
*/
*
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.2 2010/01/16 00:04:41 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.3 2010/01/20 09:16:24 heikki Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef _WALRECEIVER_H
#define _WALRECEIVER_H
+#include "access/xlogdefs.h"
#include "storage/spin.h"
/*
extern PGDLLIMPORT WalRcvData *WalRcv;
+/* libpqwalreceiver hooks */
+typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
+
+typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len);
+extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
+
+typedef void (*walrcv_disconnect_type) (void);
+extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
+
+extern void WalReceiverMain(void);
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
extern bool WalRcvInProgress(void);