]> granicus.if.org Git - postgresql/commitdiff
Rethink the way walreceiver is linked into the backend. Instead than shoving
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 20 Jan 2010 09:16:24 +0000 (09:16 +0000)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 20 Jan 2010 09:16:24 +0000 (09:16 +0000)
walreceiver as whole into a dynamically loaded module, split the
libpq-specific parts of it into dynamically loaded module and keep the rest
in the main backend binary.

Although Tom fixed the Windows compilation problems with the old walreceiver
module already, this is a cleaner division of labour and makes the code
more readable. There's also the prospect of adding new transport methods
as pluggable modules in the future, which this patch makes easier, though for
now the API between libpqwalreceiver and walreceiver process should be
considered private.

The libpq-specific module is now in src/backend/replication/libpqwalreceiver,
and the part linked with postgres binary is in
src/backend/replication/walreceiver.c.

src/Makefile
src/backend/bootstrap/bootstrap.c
src/backend/replication/Makefile
src/backend/replication/README
src/backend/replication/libpqwalreceiver/Makefile [moved from src/backend/replication/walreceiver/Makefile with 60% similarity]
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c [new file with mode: 0644]
src/backend/replication/walreceiver.c [moved from src/backend/replication/walreceiver/walreceiver.c with 66% similarity]
src/backend/replication/walreceiverfuncs.c
src/include/replication/walreceiver.h

index f4d726e7694409ea39eae721c9921da8c9f7852e..93b2d17a8221479b9cc290db6cdd9516cc22e416 100644 (file)
@@ -4,7 +4,7 @@
 #
 # Copyright (c) 1994, Regents of the University of California
 #
-# $PostgreSQL: pgsql/src/Makefile,v 1.49 2010/01/15 17:01:06 heikki Exp $
+# $PostgreSQL: pgsql/src/Makefile,v 1.50 2010/01/20 09:16:23 heikki Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -21,7 +21,7 @@ all install installdirs uninstall distprep:
        $(MAKE) -C backend/snowball $@
        $(MAKE) -C include $@
        $(MAKE) -C interfaces $@
-       $(MAKE) -C backend/replication/walreceiver $@
+       $(MAKE) -C backend/replication/libpqwalreceiver $@
        $(MAKE) -C bin $@
        $(MAKE) -C pl $@
        $(MAKE) -C makefiles $@
@@ -52,7 +52,7 @@ clean:
        $(MAKE) -C backend/snowball $@
        $(MAKE) -C include $@
        $(MAKE) -C interfaces $@
-       $(MAKE) -C backend/replication/walreceiver $@
+       $(MAKE) -C backend/replication/libpqwalreceiver $@
        $(MAKE) -C bin $@
        $(MAKE) -C pl $@
        $(MAKE) -C makefiles $@
@@ -67,7 +67,7 @@ distclean maintainer-clean:
        $(MAKE) -C backend/snowball $@
        $(MAKE) -C include $@
        $(MAKE) -C interfaces $@
-       $(MAKE) -C backend/replication/walreceiver $@
+       $(MAKE) -C backend/replication/libpqwalreceiver $@
        $(MAKE) -C bin $@
        $(MAKE) -C pl $@
        $(MAKE) -C makefiles $@
@@ -82,7 +82,7 @@ coverage:
        $(MAKE) -C backend/utils/mb/conversion_procs $@
        $(MAKE) -C backend/snowball $@
        $(MAKE) -C interfaces $@
-       $(MAKE) -C backend/replication/walreceiver $@
+       $(MAKE) -C backend/replication/libpqwalreceiver $@
        $(MAKE) -C bin $@
        $(MAKE) -C pl $@
 
index 84dd6638efc43f713fa74ad0257c2a1b53b70390..396589660851d7dd33def8fe6f77d3cae0f79ef3 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.256 2010/01/15 09:19:00 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.257 2010/01/20 09:16:23 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -425,20 +425,7 @@ AuxiliaryProcessMain(int argc, char *argv[])
 
                case WalReceiverProcess:
                        /* don't set signals, walreceiver has its own agenda */
-                       {
-                               PGFunction WalReceiverMain;
-
-                               /*
-                                * Walreceiver is not linked directly into the server
-                                * binary because we would then need to link the server
-                                * with libpq. It's compiled as a dynamically loaded module
-                                * to avoid that.
-                                */
-                               WalReceiverMain = load_external_function("walreceiver",
-                                                                                                                "WalReceiverMain",
-                                                                                                                true, NULL);
-                               WalReceiverMain(NULL);
-                       }
+                       WalReceiverMain();
                        proc_exit(1);           /* should never return */
 
                default:
index 7903c1ac5e448da2abdbbc4b71710011d666f2cd..64a966b1cc72fee2499dd4cd3d679de96836fe16 100644 (file)
@@ -4,7 +4,7 @@
 #    Makefile for src/backend/replication
 #
 # IDENTIFICATION
-#    $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $
+#    $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.2 2010/01/20 09:16:24 heikki Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -12,6 +12,6 @@ subdir = src/backend/replication
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = walsender.o walreceiverfuncs.o
+OBJS = walsender.o walreceiverfuncs.o walreceiver.o
 
 include $(top_srcdir)/src/backend/common.mk
index 0f40dc79e90dc4b1ace4ea76feca9b12f5d03536..8b15dea58ebbf193ee2693b43b49e24c6e06ead4 100644 (file)
@@ -1,4 +1,36 @@
-$PostgreSQL: pgsql/src/backend/replication/README,v 1.1 2010/01/15 09:19:03 heikki Exp $
+$PostgreSQL: pgsql/src/backend/replication/README,v 1.2 2010/01/20 09:16:24 heikki Exp $
+
+Walreceiver - libpqwalreceiver API
+----------------------------------
+
+The transport-specific part of walreceiver, responsible for connecting to
+the primary server and receiving WAL files, is loaded dynamically to avoid
+having to link the main server binary with libpq. The dynamically loaded
+module is in libpqwalreceiver subdirectory.
+
+The dynamically loaded module implements three functions:
+
+
+bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)
+
+Establish connection to the primary, and starts streaming from 'startpoint'.
+Returns true on success.
+
+
+bool walrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+
+Retrieve any WAL record available through the connection, blocking for
+maximum of 'timeout' ms.
+
+
+void walrcv_disconnect(void);
+
+Disconnect.
+
+
+This API should be considered internal at the moment, but we could open it
+up for 3rd party replacements of libpqwalreceiver in the future, allowing
+pluggable methods for receiveing WAL.
 
 Walreceiver IPC
 ---------------
similarity index 60%
rename from src/backend/replication/walreceiver/Makefile
rename to src/backend/replication/libpqwalreceiver/Makefile
index 3376ba6ec871029a50f992cf63c6831b9b1ca78a..df28b90c4caaa24038faa9ecdb8ad67668f7bdcf 100644 (file)
@@ -1,24 +1,22 @@
 #-------------------------------------------------------------------------
 #
 # Makefile--
-#    Makefile for src/backend/replication/walreceiver
+#    Makefile for src/backend/replication/libpqwalreceiver
 #
 # IDENTIFICATION
-#    $PostgreSQL: pgsql/src/backend/replication/walreceiver/Makefile,v 1.4 2010/01/15 21:06:26 tgl Exp $
+#    $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/Makefile,v 1.1 2010/01/20 09:16:24 heikki Exp $
 #
 #-------------------------------------------------------------------------
 
-subdir = src/backend/replication/walreceiver
+subdir = src/backend/postmaster/libpqwalreceiver
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
-
-OBJS = walreceiver.o
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
 
+OBJS = libpqwalreceiver.o
 SHLIB_LINK = $(libpq)
-
-NAME := walreceiver
+NAME = libpqwalreceiver
 
 all: submake-libpq all-shared-lib
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
new file mode 100644 (file)
index 0000000..54b86fd
--- /dev/null
@@ -0,0 +1,317 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqwalreceiver.c
+ *
+ * This file contains the libpq-specific parts of walreceiver. It's
+ * loaded as a dynamic module to avoid linking the main server binary with
+ * libpq.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *       $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/time.h>
+
+#include "libpq-fe.h"
+#include "access/xlog.h"
+#include "miscadmin.h"
+#include "replication/walreceiver.h"
+#include "utils/builtins.h"
+
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#ifdef HAVE_SYS_POLL_H
+#include <sys/poll.h>
+#endif
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+PG_MODULE_MAGIC;
+
+void           _PG_init(void);
+
+/* Current connection to the primary, if any */
+static PGconn *streamConn = NULL;
+static bool justconnected = false;
+
+/* Buffer for currently read records */
+static char *recvBuf = NULL;
+
+/* Prototypes for interface functions */
+static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
+static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
+                         int *len);
+static void libpqrcv_disconnect(void);
+
+/* Prototypes for private functions */
+static bool libpq_select(int timeout_ms);
+
+/*
+ * Module load callback
+ */
+void
+_PG_init(void)
+{
+       /* Tell walreceiver how to reach us */
+       if (walrcv_connect != NULL || walrcv_receive != NULL || walrcv_disconnect)
+               elog(ERROR, "libpqwalreceiver already loaded");
+       walrcv_connect = libpqrcv_connect;
+       walrcv_receive = libpqrcv_receive;
+       walrcv_disconnect = libpqrcv_disconnect;
+}
+
+/*
+ * Establish the connection to the primary server for XLOG streaming
+ */
+static bool
+libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+{
+       char            conninfo_repl[MAXCONNINFO + 14];
+       char       *primary_sysid;
+       char            standby_sysid[32];
+       TimeLineID      primary_tli;
+       TimeLineID      standby_tli;
+       PGresult   *res;
+       char            cmd[64];
+
+       Assert(startpoint.xlogid != 0 || startpoint.xrecoff != 0);
+
+       /* Connect */
+       snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
+
+       streamConn = PQconnectdb(conninfo_repl);
+       if (PQstatus(streamConn) != CONNECTION_OK)
+               ereport(ERROR,
+                               (errmsg("could not connect to the primary server : %s",
+                                               PQerrorMessage(streamConn))));
+
+       /*
+        * Get the system identifier and timeline ID as a DataRow message
+        * from the primary server.
+        */
+       res = PQexec(streamConn, "IDENTIFY_SYSTEM");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+    {
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("could not receive the SYSID and timeline ID from "
+                                               "the primary server: %s",
+                                               PQerrorMessage(streamConn))));
+    }
+       if (PQnfields(res) != 2 || PQntuples(res) != 1)
+       {
+               int ntuples = PQntuples(res);
+               int nfields = PQnfields(res);
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("invalid response from primary server"),
+                                errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
+                                                  ntuples, nfields)));
+       }
+       primary_sysid = PQgetvalue(res, 0, 0);
+       primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+
+       /*
+        * Confirm that the system identifier of the primary is the same
+        * as ours.
+        */
+       snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
+                        GetSystemIdentifier());
+       if (strcmp(primary_sysid, standby_sysid) != 0)
+       {
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("system differs between the primary and standby"),
+                                errdetail("the primary SYSID is %s, standby SYSID is %s",
+                                                  primary_sysid, standby_sysid)));
+       }
+
+       /*
+        * Confirm that the current timeline of the primary is the same
+        * as the recovery target timeline.
+        */
+       standby_tli = GetRecoveryTargetTLI();
+       PQclear(res);
+       if (primary_tli != standby_tli)
+               ereport(ERROR,
+                               (errmsg("timeline %u of the primary does not match recovery target timeline %u",
+                                               primary_tli, standby_tli)));
+       ThisTimeLineID = primary_tli;
+
+       /* Start streaming from the point requested by startup process */
+       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
+                        startpoint.xlogid, startpoint.xrecoff);
+       res = PQexec(streamConn, cmd);
+       if (PQresultStatus(res) != PGRES_COPY_OUT)
+               ereport(ERROR,
+                               (errmsg("could not start XLOG streaming: %s",
+                                               PQerrorMessage(streamConn))));
+       PQclear(res);
+
+       justconnected = true;
+
+       return true;
+}
+
+/*
+ * Wait until we can read WAL stream, or timeout.
+ *
+ * Returns true if data has become available for reading, false if timed out
+ * or interrupted by signal.
+ *
+ * This is based on pqSocketCheck.
+ */
+static bool
+libpq_select(int timeout_ms)
+{
+       int     ret;
+
+       Assert(streamConn != NULL);
+       if (PQsocket(streamConn) < 0)
+               ereport(ERROR,
+                               (errcode_for_socket_access(),
+                                errmsg("socket not open")));
+
+       /* We use poll(2) if available, otherwise select(2) */
+       {
+#ifdef HAVE_POLL
+               struct pollfd input_fd;
+
+               input_fd.fd = PQsocket(streamConn);
+               input_fd.events = POLLIN | POLLERR;
+               input_fd.revents = 0;
+
+               ret = poll(&input_fd, 1, timeout_ms);
+#else                                                  /* !HAVE_POLL */
+
+               fd_set          input_mask;
+               struct timeval timeout;
+               struct timeval *ptr_timeout;
+
+               FD_ZERO(&input_mask);
+               FD_SET(PQsocket(streamConn), &input_mask);
+
+               if (timeout_ms < 0)
+                       ptr_timeout = NULL;
+               else
+               {
+                       timeout.tv_sec  = timeout_ms / 1000;
+                       timeout.tv_usec = (timeout_ms % 1000) * 1000;
+                       ptr_timeout             = &timeout;
+               }
+
+               ret = select(PQsocket(streamConn) + 1, &input_mask,
+                                        NULL, NULL, ptr_timeout);
+#endif   /* HAVE_POLL */
+       }
+
+       if (ret == 0 || (ret < 0 && errno == EINTR))
+               return false;
+       if (ret < 0)
+               ereport(ERROR,
+                               (errcode_for_socket_access(),
+                                errmsg("select() failed: %m")));
+       return true;
+}
+
+/*
+ * Disconnect connection to primary, if any.
+ */
+static void
+libpqrcv_disconnect(void)
+{
+       PQfinish(streamConn);
+       streamConn = NULL;
+       justconnected = false;
+}
+
+/*
+ * Receive any WAL records available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * Returns:
+ *
+ *   True if data was received. *recptr, *buffer and *len are set to
+ *   the WAL location of the received data, buffer holding it, and length,
+ *   respectively.
+ *
+ *   False if no data was available within timeout, or wait was interrupted
+ *   by signal.
+ *
+ * The buffer returned is only valid until the next call of this function or
+ * libpq_connect/disconnect.
+ *
+ * ereports on error.
+ */
+static bool
+libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+{
+       int                     rawlen;
+
+       if (recvBuf != NULL)
+               PQfreemem(recvBuf);
+       recvBuf = NULL;
+
+       /*
+        * If the caller requested to block, wait for data to arrive. But if
+        * this is the first call after connecting, don't wait, because
+        * there might already be some data in libpq buffer that we haven't
+        * returned to caller.
+        */
+       if (timeout > 0 && !justconnected)
+       {
+               if (!libpq_select(timeout))
+                       return false;
+
+               if (PQconsumeInput(streamConn) == 0)
+                       ereport(ERROR,
+                                       (errmsg("could not read xlog records: %s",
+                                                       PQerrorMessage(streamConn))));
+       }
+       justconnected = false;
+
+       /* Receive CopyData message */
+       rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+       if (rawlen == 0)        /* no records available yet, then return */
+               return false;
+       if (rawlen == -1)       /* end-of-streaming or error */
+       {
+               PGresult        *res;
+
+               res = PQgetResult(streamConn);
+               if (PQresultStatus(res) == PGRES_COMMAND_OK)
+               {
+                       PQclear(res);
+                       ereport(ERROR,
+                                       (errmsg("replication terminated by primary server")));
+               }
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("could not read xlog records: %s",
+                                               PQerrorMessage(streamConn))));
+       }
+       if (rawlen < -1)
+               ereport(ERROR,
+                               (errmsg("could not read xlog records: %s",
+                                               PQerrorMessage(streamConn))));
+
+       if (rawlen < sizeof(XLogRecPtr))
+               ereport(ERROR,
+                               (errmsg("invalid WAL message received from primary")));
+
+       /* Return received WAL records to caller */
+       *recptr = *((XLogRecPtr *) recvBuf);
+       *buffer = recvBuf + sizeof(XLogRecPtr);
+       *len = rawlen - sizeof(XLogRecPtr);
+
+       return true;
+}
similarity index 66%
rename from src/backend/replication/walreceiver/walreceiver.c
rename to src/backend/replication/walreceiver.c
index 65b1dfe1e6bb91e5dca1526658ae7b701153cc25..f805e673e114b651cc16c4a6404b2b32ac3776ce 100644 (file)
  * of the connection and a FATAL error are treated not as a crash but as
  * normal operation.
  *
- * Walreceiver is a postmaster child process like others, but it's compiled
- * as a dynamic module to avoid linking libpq with the main server binary.
+ * This file contains the server-facing parts of walreceiver. The libpq-
+ * specific parts are in the libpqwalreceiver module. It's loaded
+ * dynamically to avoid linking the server with libpq.
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walreceiver/walreceiver.c,v 1.2 2010/01/16 01:55:28 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include <signal.h>
 #include <unistd.h>
-#include <sys/time.h>
 
 #include "access/xlog_internal.h"
-#include "libpq-fe.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "replication/walreceiver.h"
 #include "utils/ps_status.h"
 #include "utils/resowner.h"
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
-PG_MODULE_MAGIC;
-
-PG_FUNCTION_INFO_V1(WalReceiverMain);
-Datum WalReceiverMain(PG_FUNCTION_ARGS);
-
-/* streamConn is a PGconn object of a connection to walsender from walreceiver */
-static PGconn *streamConn = NULL;
+/* libpqreceiver hooks to these when loaded */
+walrcv_connect_type walrcv_connect = NULL;
+walrcv_receive_type walrcv_receive = NULL;
+walrcv_disconnect_type walrcv_disconnect = NULL;
 
 #define NAPTIME_PER_CYCLE 100  /* max sleep time between cycles (100ms) */
 
@@ -79,16 +66,16 @@ static uint32 recvId = 0;
 static uint32 recvSeg = 0;
 static uint32 recvOff = 0;
 
-/* Buffer for currently read records */
-static char *recvBuf = NULL;
-
-/* Flags set by interrupt handlers of walreceiver for later service in the main loop */
+/*
+ * Flags set by interrupt handlers of walreceiver for later service in the
+ * main loop.
+ */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t got_SIGTERM = false;
 
 static void ProcessWalRcvInterrupts(void);
-static void EnableImmediateExit(void);
-static void DisableImmediateExit(void);
+static void EnableWalRcvImmediateExit(void);
+static void DisableWalRcvImmediateExit(void);
 
 /*
  * About SIGTERM handling:
@@ -128,14 +115,14 @@ ProcessWalRcvInterrupts(void)
 }
 
 static void
-EnableImmediateExit()
+EnableWalRcvImmediateExit()
 {
        WalRcvImmediateInterruptOK = true;
        ProcessWalRcvInterrupts();
 }
 
 static void
-DisableImmediateExit()
+DisableWalRcvImmediateExit()
 {
        WalRcvImmediateInterruptOK = false;
        ProcessWalRcvInterrupts();
@@ -147,12 +134,8 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static void WalRcvLoop(void);
 static void InitWalRcv(void);
-static void WalRcvConnect(void);
-static bool WalRcvWait(int timeout_ms);
 static void WalRcvKill(int code, Datum arg);
-static void XLogRecv(void);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
 
@@ -167,11 +150,21 @@ static struct
 } LogstreamResult;
 
 /* Main entry point for walreceiver process */
-Datum
-WalReceiverMain(PG_FUNCTION_ARGS)
+void
+WalReceiverMain(void)
 {
        sigjmp_buf      local_sigjmp_buf;
        MemoryContext walrcv_context;
+       char conninfo[MAXCONNINFO];
+       XLogRecPtr startpoint;
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalRcvData *walrcv = WalRcv;
+
+       /* Load the libpq-specific functions */
+       load_file("libpqwalreceiver", false);
+       if (walrcv_connect == NULL || walrcv_receive == NULL ||
+               walrcv_disconnect == NULL)
+               elog(ERROR, "libpqwalreceiver didn't initialize correctly");
 
        /* Mark walreceiver in progress */
        InitWalRcv();
@@ -236,7 +229,7 @@ WalReceiverMain(PG_FUNCTION_ARGS)
                error_context_stack = NULL;
 
                /* Reset WalRcvImmediateInterruptOK */
-               DisableImmediateExit();
+               DisableWalRcvImmediateExit();
 
                /* Prevent interrupts while cleaning up */
                HOLD_INTERRUPTS();
@@ -244,12 +237,10 @@ WalReceiverMain(PG_FUNCTION_ARGS)
                /* Report the error to the server log */
                EmitErrorReport();
 
-               /* Free the data structure related to a connection */
-               PQfinish(streamConn);
-               streamConn = NULL;
-               if (recvBuf != NULL)
-                       PQfreemem(recvBuf);
-               recvBuf = NULL;
+               /* Disconnect any previous connection. */
+               EnableWalRcvImmediateExit();
+               walrcv_disconnect();
+               DisableWalRcvImmediateExit();
 
                /*
                 * Now return to normal top-level context and clear ErrorContext for
@@ -278,22 +269,24 @@ WalReceiverMain(PG_FUNCTION_ARGS)
        /* Unblock signals (they were blocked when the postmaster forked us) */
        PG_SETMASK(&UnBlockSig);
 
-       /* Establish the connection to the primary for XLOG streaming */
-       WalRcvConnect();
-
-       /* Main loop of walreceiver */
-       WalRcvLoop();
+       /* Fetch connection information from shared memory */
+       SpinLockAcquire(&walrcv->mutex);
+       strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+       startpoint = walrcv->receivedUpto;
+       SpinLockRelease(&walrcv->mutex);
 
-       PG_RETURN_VOID(); /* WalRcvLoop() never returns, but keep compiler quiet */
-}
+       /* Establish the connection to the primary for XLOG streaming */
+       EnableWalRcvImmediateExit();
+       walrcv_connect(conninfo, startpoint);
+       DisableWalRcvImmediateExit();
 
-/* Main loop of walreceiver process */
-static void
-WalRcvLoop(void)
-{
        /* Loop until end-of-streaming or error */
        for (;;)
        {
+               XLogRecPtr recptr;
+               char   *buf;
+               int             len;
+
                /*
                 * Emergency bailout if postmaster has died.  This is to avoid the
                 * necessity for manual cleanup of all postmaster children.
@@ -319,14 +312,20 @@ WalRcvLoop(void)
                }
 
                /* Wait a while for data to arrive */
-               if (WalRcvWait(NAPTIME_PER_CYCLE))
+               if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
                {
-                       /* data has arrived. Process it */
-                       if (PQconsumeInput(streamConn) == 0)
-                               ereport(ERROR,
-                                               (errmsg("could not read xlog records: %s",
-                                                               PQerrorMessage(streamConn))));
-                       XLogRecv();
+                       /* Write received WAL records to disk */
+                       XLogWalRcvWrite(buf, len, recptr);
+
+                       /* Receive any more WAL records we can without sleeping */
+                       while(walrcv_receive(0, &recptr, &buf, &len))
+                               XLogWalRcvWrite(buf, len, recptr);
+
+                       /*
+                        * Now that we've written some records, flush them to disk and
+                        * let the startup process know about them.
+                        */
+                       XLogWalRcvFlush();
                }
        }
 }
@@ -362,178 +361,6 @@ InitWalRcv(void)
        on_shmem_exit(WalRcvKill, 0);
 }
 
-/*
- * Establish the connection to the primary server for XLOG streaming
- */
-static void
-WalRcvConnect(void)
-{
-       char            conninfo[MAXCONNINFO + 14];
-       char       *primary_sysid;
-       char            standby_sysid[32];
-       TimeLineID      primary_tli;
-       TimeLineID      standby_tli;
-       PGresult   *res;
-       XLogRecPtr      recptr;
-       char            cmd[64];
-       /* use volatile pointer to prevent code rearrangement */
-       volatile WalRcvData *walrcv = WalRcv;
-
-       /*
-        * Set up a connection for XLOG streaming
-        */
-       SpinLockAcquire(&walrcv->mutex);
-       snprintf(conninfo, sizeof(conninfo), "%s replication=true", walrcv->conninfo);
-       recptr = walrcv->receivedUpto;
-       SpinLockRelease(&walrcv->mutex);
-
-       /* initialize local XLOG pointers */
-       LogstreamResult.Write = LogstreamResult.Flush = recptr;
-
-       Assert(recptr.xlogid != 0 || recptr.xrecoff != 0);
-
-       EnableImmediateExit();
-       streamConn = PQconnectdb(conninfo);
-       DisableImmediateExit();
-       if (PQstatus(streamConn) != CONNECTION_OK)
-               ereport(ERROR,
-                               (errmsg("could not connect to the primary server : %s",
-                                               PQerrorMessage(streamConn))));
-
-       /*
-        * Get the system identifier and timeline ID as a DataRow message
-        * from the primary server.
-        */
-       EnableImmediateExit();
-       res = PQexec(streamConn, "IDENTIFY_SYSTEM");
-       DisableImmediateExit();
-       if (PQresultStatus(res) != PGRES_TUPLES_OK)
-    {
-               PQclear(res);
-               ereport(ERROR,
-                               (errmsg("could not receive the SYSID and timeline ID from "
-                                               "the primary server: %s",
-                                               PQerrorMessage(streamConn))));
-    }
-       if (PQnfields(res) != 2 || PQntuples(res) != 1)
-       {
-               int ntuples = PQntuples(res);
-               int nfields = PQnfields(res);
-               PQclear(res);
-               ereport(ERROR,
-                               (errmsg("invalid response from primary server"),
-                                errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
-                                                  ntuples, nfields)));
-       }
-       primary_sysid = PQgetvalue(res, 0, 0);
-       primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
-
-       /*
-        * Confirm that the system identifier of the primary is the same
-        * as ours.
-        */
-       snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
-                        GetSystemIdentifier());
-       if (strcmp(primary_sysid, standby_sysid) != 0)
-       {
-               PQclear(res);
-               ereport(ERROR,
-                               (errmsg("system differs between the primary and standby"),
-                                errdetail("the primary SYSID is %s, standby SYSID is %s",
-                                                  primary_sysid, standby_sysid)));
-       }
-
-       /*
-        * Confirm that the current timeline of the primary is the same
-        * as the recovery target timeline.
-        */
-       standby_tli = GetRecoveryTargetTLI();
-       PQclear(res);
-       if (primary_tli != standby_tli)
-               ereport(ERROR,
-                               (errmsg("timeline %u of the primary does not match recovery target timeline %u",
-                                               primary_tli, standby_tli)));
-       ThisTimeLineID = primary_tli;
-
-       /* Start streaming from the point requested by startup process */
-       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", recptr.xlogid, recptr.xrecoff);
-       EnableImmediateExit();
-       res = PQexec(streamConn, cmd);
-       DisableImmediateExit();
-       if (PQresultStatus(res) != PGRES_COPY_OUT)
-               ereport(ERROR,
-                               (errmsg("could not start XLOG streaming: %s",
-                                               PQerrorMessage(streamConn))));
-       PQclear(res);
-
-       /*
-        * Process the outstanding messages before beginning to wait for
-        * new message to arrive.
-        */
-       XLogRecv();
-}
-
-/*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-WalRcvWait(int timeout_ms)
-{
-       int     ret;
-
-       Assert(streamConn != NULL);
-       if (PQsocket(streamConn) < 0)
-               ereport(ERROR,
-                               (errcode_for_socket_access(),
-                                errmsg("socket not open")));
-
-       /* We use poll(2) if available, otherwise select(2) */
-       {
-#ifdef HAVE_POLL
-               struct pollfd input_fd;
-
-               input_fd.fd = PQsocket(streamConn);
-               input_fd.events = POLLIN | POLLERR;
-               input_fd.revents = 0;
-
-               ret = poll(&input_fd, 1, timeout_ms);
-#else                                                  /* !HAVE_POLL */
-
-               fd_set          input_mask;
-               struct timeval timeout;
-               struct timeval *ptr_timeout;
-
-               FD_ZERO(&input_mask);
-               FD_SET(PQsocket(streamConn), &input_mask);
-
-               if (timeout_ms < 0)
-                       ptr_timeout = NULL;
-               else
-               {
-                       timeout.tv_sec  = timeout_ms / 1000;
-                       timeout.tv_usec = (timeout_ms % 1000) * 1000;
-                       ptr_timeout             = &timeout;
-               }
-
-               ret = select(PQsocket(streamConn) + 1, &input_mask,
-                                        NULL, NULL, ptr_timeout);
-#endif   /* HAVE_POLL */
-       }
-
-       if (ret == 0 || (ret < 0 && errno == EINTR))
-               return false;
-       if (ret < 0)
-               ereport(ERROR,
-                               (errcode_for_socket_access(),
-                                errmsg("select() failed: %m")));
-       return true;
-}
-
 /*
  * Clear our pid from shared memory at exit.
  */
@@ -555,7 +382,7 @@ WalRcvKill(int code, Datum arg)
        walrcv->pid = 0;
        SpinLockRelease(&walrcv->mutex);
 
-       PQfinish(streamConn);
+       walrcv_disconnect();
 
        /* If requested to stop, tell postmaster to not restart us. */
        if (stopped)
@@ -612,64 +439,6 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
        exit(2);
 }
 
-/*
- * Receive any WAL records available without blocking from XLOG stream and
- * write it to the disk.
- */
-static void
-XLogRecv(void)
-{
-       XLogRecPtr *recptr;
-       int                     len;
-
-       for (;;)
-       {
-               /* Receive CopyData message */
-               len = PQgetCopyData(streamConn, &recvBuf, 1);
-               if (len == 0)   /* no records available yet, then return */
-                       break;
-               if (len == -1)  /* end-of-streaming or error */
-               {
-                       PGresult        *res;
-
-                       res = PQgetResult(streamConn);
-                       if (PQresultStatus(res) == PGRES_COMMAND_OK)
-                       {
-                               PQclear(res);
-                               ereport(ERROR,
-                                               (errmsg("replication terminated by primary server")));
-                       }
-                       PQclear(res);
-                       ereport(ERROR,
-                                       (errmsg("could not read xlog records: %s",
-                                                       PQerrorMessage(streamConn))));
-               }
-               if (len < -1)
-                       ereport(ERROR,
-                                       (errmsg("could not read xlog records: %s",
-                                                       PQerrorMessage(streamConn))));
-
-               if (len < sizeof(XLogRecPtr))
-                       ereport(ERROR,
-                                       (errmsg("invalid WAL message received from primary")));
-
-               /* Write received WAL records to disk */
-               recptr = (XLogRecPtr *) recvBuf;
-               XLogWalRcvWrite(recvBuf + sizeof(XLogRecPtr),
-                                               len - sizeof(XLogRecPtr), *recptr);
-
-               if (recvBuf != NULL)
-                       PQfreemem(recvBuf);
-               recvBuf = NULL;
-       }
-
-       /*
-        * Now that we've written some records, flush them to disk and let the
-        * startup process know about them.
-        */
-       XLogWalRcvFlush();
-}
-
 /*
  * Write XLOG data to disk.
  */
index 4342e252d65e2b05315415241ee44beccb268b0c..c1d7b5588740808afef7f3e925b553f0123bce4e 100644 (file)
@@ -4,13 +4,13 @@
  *
  * This file contains functions used by the startup process to communicate
  * with the walreceiver process. Functions implementing walreceiver itself
- * are in src/backend/replication/walreceiver subdirectory.
+ * are in walreceiver.c.
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.1 2010/01/15 09:19:03 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
index f848a9e509c71efbd129448805ed07ad55c5797a..57de368d41feb915fccfde7c4d954429d0315730 100644 (file)
@@ -5,13 +5,14 @@
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.2 2010/01/16 00:04:41 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.3 2010/01/20 09:16:24 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef _WALRECEIVER_H
 #define _WALRECEIVER_H
 
+#include "access/xlogdefs.h"
 #include "storage/spin.h"
 
 /*
@@ -60,6 +61,17 @@ typedef struct
 
 extern PGDLLIMPORT WalRcvData *WalRcv;
 
+/* libpqwalreceiver hooks */
+typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
+
+typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len);
+extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
+
+typedef void (*walrcv_disconnect_type) (void);
+extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
+
+extern void WalReceiverMain(void);
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
 extern bool WalRcvInProgress(void);