void _PG_init(void);
-/* Current connection to the primary, if any */
-static PGconn *streamConn = NULL;
-
-/* Buffer for currently read records */
-static char *recvBuf = NULL;
+struct WalReceiverConn
+{
+ /* Current connection to the primary, if any */
+ PGconn *streamConn;
+ /* Used to remember if the connection is logical or physical */
+ bool logical;
+ /* Buffer for currently read records */
+ char *recvBuf;
+};
/* Prototypes for interface functions */
-static void libpqrcv_connect(char *conninfo);
-static char *libpqrcv_get_conninfo(void);
-static void libpqrcv_identify_system(TimeLineID *primary_tli);
-static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
-static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
- char *slotname);
-static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int libpqrcv_receive(char **buffer, pgsocket *wait_fd);
-static void libpqrcv_send(const char *buffer, int nbytes);
-static void libpqrcv_disconnect(void);
+static WalReceiverConn *libpqrcv_connect(const char *conninfo,
+ bool logical, const char *appname);
+static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
+static char *libpqrcv_identify_system(WalReceiverConn *conn,
+ TimeLineID *primary_tli);
+static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
+ TimeLineID tli, char **filename,
+ char **content, int *len);
+static bool libpqrcv_startstreaming(WalReceiverConn *conn,
+ TimeLineID tli, XLogRecPtr startpoint,
+ const char *slotname);
+static void libpqrcv_endstreaming(WalReceiverConn *conn,
+ TimeLineID *next_tli);
+static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
+ pgsocket *wait_fd);
+static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
+ int nbytes);
+static void libpqrcv_disconnect(WalReceiverConn *conn);
+
+static WalReceiverFunctionsType PQWalReceiverFunctions = {
+ libpqrcv_connect,
+ libpqrcv_get_conninfo,
+ libpqrcv_identify_system,
+ libpqrcv_readtimelinehistoryfile,
+ libpqrcv_startstreaming,
+ libpqrcv_endstreaming,
+ libpqrcv_receive,
+ libpqrcv_send,
+ libpqrcv_disconnect
+};
/* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(const char *query);
+static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
/*
- * Module load callback
+ * Module initialization function
*/
void
_PG_init(void)
{
- /* Tell walreceiver how to reach us */
- if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
- walrcv_readtimelinehistoryfile != NULL ||
- walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
- walrcv_receive != NULL || walrcv_send != NULL ||
- walrcv_disconnect != NULL)
+ if (WalReceiverFunctions != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
- walrcv_connect = libpqrcv_connect;
- walrcv_get_conninfo = libpqrcv_get_conninfo;
- walrcv_identify_system = libpqrcv_identify_system;
- walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
- walrcv_startstreaming = libpqrcv_startstreaming;
- walrcv_endstreaming = libpqrcv_endstreaming;
- walrcv_receive = libpqrcv_receive;
- walrcv_send = libpqrcv_send;
- walrcv_disconnect = libpqrcv_disconnect;
+ WalReceiverFunctions = &PQWalReceiverFunctions;
}
/*
* Establish the connection to the primary server for XLOG streaming
*/
-static void
-libpqrcv_connect(char *conninfo)
+static WalReceiverConn *
+libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
{
+ WalReceiverConn *conn;
const char *keys[5];
const char *vals[5];
+ int i = 0;
/*
* We use the expand_dbname parameter to process the connection string (or
* database name is ignored by the server in replication mode, but specify
* "replication" for .pgpass lookup.
*/
- keys[0] = "dbname";
- vals[0] = conninfo;
- keys[1] = "replication";
- vals[1] = "true";
- keys[2] = "dbname";
- vals[2] = "replication";
- keys[3] = "fallback_application_name";
- vals[3] = "walreceiver";
- keys[4] = NULL;
- vals[4] = NULL;
-
- streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
- if (PQstatus(streamConn) != CONNECTION_OK)
+ keys[i] = "dbname";
+ vals[i] = conninfo;
+ keys[++i] = "replication";
+ vals[i] = logical ? "database" : "true";
+ if (!logical)
+ {
+ keys[++i] = "dbname";
+ vals[i] = "replication";
+ }
+ keys[++i] = "fallback_application_name";
+ vals[i] = appname;
+ keys[++i] = NULL;
+ vals[i] = NULL;
+
+ conn = palloc0(sizeof(WalReceiverConn));
+ conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
+ if (PQstatus(conn->streamConn) != CONNECTION_OK)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
+ conn->logical = logical;
+
+ return conn;
}
/*
* are obfuscated.
*/
static char *
-libpqrcv_get_conninfo(void)
+libpqrcv_get_conninfo(WalReceiverConn *conn)
{
PQconninfoOption *conn_opts;
PQconninfoOption *conn_opt;
PQExpBufferData buf;
char *retval;
- Assert(streamConn != NULL);
+ Assert(conn->streamConn != NULL);
initPQExpBuffer(&buf);
- conn_opts = PQconninfo(streamConn);
+ conn_opts = PQconninfo(conn->streamConn);
if (conn_opts == NULL)
ereport(ERROR,
* Check that primary's system identifier matches ours, and fetch the current
* timeline ID of the primary.
*/
-static void
-libpqrcv_identify_system(TimeLineID *primary_tli)
+static char *
+libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
{
PGresult *res;
char *primary_sysid;
- char standby_sysid[32];
/*
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
- res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
+ res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
ereport(ERROR,
(errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
}
if (PQnfields(res) < 3 || PQntuples(res) != 1)
{
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
ntuples, nfields, 3, 1)));
}
- primary_sysid = PQgetvalue(res, 0, 0);
+ primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
-
- /*
- * Confirm that the system identifier of the primary is the same as ours.
- */
- snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
- GetSystemIdentifier());
- if (strcmp(primary_sysid, standby_sysid) != 0)
- {
- primary_sysid = pstrdup(primary_sysid);
- PQclear(res);
- ereport(ERROR,
- (errmsg("database system identifier differs between the primary and standby"),
- errdetail("The primary's identifier is %s, the standby's identifier is %s.",
- primary_sysid, standby_sysid)));
- }
PQclear(res);
+
+ return primary_sysid;
}
/*
* throws an ERROR.
*/
static bool
-libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
+libpqrcv_startstreaming(WalReceiverConn *conn,
+ TimeLineID tli, XLogRecPtr startpoint,
+ const char *slotname)
{
- char cmd[256];
+ StringInfoData cmd;
PGresult *res;
+ Assert(!conn->logical);
+
+ initStringInfo(&cmd);
+
/* Start streaming from the point requested by startup process */
if (slotname != NULL)
- snprintf(cmd, sizeof(cmd),
- "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
- (uint32) (startpoint >> 32), (uint32) startpoint, tli);
+ appendStringInfo(&cmd,
+ "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
+ slotname,
+ (uint32) (startpoint >> 32), (uint32) startpoint,
+ tli);
else
- snprintf(cmd, sizeof(cmd),
- "START_REPLICATION %X/%X TIMELINE %u",
- (uint32) (startpoint >> 32), (uint32) startpoint, tli);
- res = libpqrcv_PQexec(cmd);
+ appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint,
+ tli);
+ res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+ pfree(cmd.data);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
}
PQclear(res);
return true;
* reported by the server, or 0 if it did not report it.
*/
static void
-libpqrcv_endstreaming(TimeLineID *next_tli)
+libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
{
PGresult *res;
- if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
+ if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
+ PQflush(conn->streamConn))
ereport(ERROR,
(errmsg("could not send end-of-streaming message to primary: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
+
+ *next_tli = 0;
/*
* After COPY is finished, we should receive a result set indicating the
* called after receiving CopyDone from the backend - the walreceiver
* never terminates replication on its own initiative.
*/
- res = PQgetResult(streamConn);
+ res = PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
PQclear(res);
/* the result set should be followed by CommandComplete */
- res = PQgetResult(streamConn);
+ res = PQgetResult(conn->streamConn);
+ }
+ else if (PQresultStatus(res) == PGRES_COPY_OUT)
+ {
+ PQclear(res);
+
+ /* End the copy */
+ PQendcopy(conn->streamConn);
+
+ /* CommandComplete should follow */
+ res = PQgetResult(conn->streamConn);
}
- else
- *next_tli = 0;
if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR,
(errmsg("error reading result of streaming command: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
PQclear(res);
/* Verify that there are no more results */
- res = PQgetResult(streamConn);
+ res = PQgetResult(conn->streamConn);
if (res != NULL)
ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
}
/*
* Fetch the timeline history file for 'tli' from primary.
*/
static void
-libpqrcv_readtimelinehistoryfile(TimeLineID tli,
- char **filename, char **content, int *len)
+libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
+ TimeLineID tli, char **filename,
+ char **content, int *len)
{
PGresult *res;
char cmd[64];
+ Assert(!conn->logical);
+
/*
* Request the primary to send over the history file for given timeline.
*/
snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
- res = libpqrcv_PQexec(cmd);
+ res = libpqrcv_PQexec(conn->streamConn, cmd);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
ereport(ERROR,
(errmsg("could not receive timeline history file from "
"the primary server: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
}
if (PQnfields(res) != 2 || PQntuples(res) != 1)
{
* Queries are always executed on the connection in streamConn.
*/
static PGresult *
-libpqrcv_PQexec(const char *query)
+libpqrcv_PQexec(PGconn *streamConn, const char *query)
{
PGresult *result = NULL;
PGresult *lastResult = NULL;
* Disconnect connection to primary, if any.
*/
static void
-libpqrcv_disconnect(void)
+libpqrcv_disconnect(WalReceiverConn *conn)
{
- PQfinish(streamConn);
- streamConn = NULL;
+ PQfinish(conn->streamConn);
+ if (conn->recvBuf != NULL)
+ PQfreemem(conn->recvBuf);
+ pfree(conn);
}
/*
* ereports on error.
*/
static int
-libpqrcv_receive(char **buffer, pgsocket *wait_fd)
+libpqrcv_receive(WalReceiverConn *conn, char **buffer,
+ pgsocket *wait_fd)
{
int rawlen;
- if (recvBuf != NULL)
- PQfreemem(recvBuf);
- recvBuf = NULL;
+ if (conn->recvBuf != NULL)
+ PQfreemem(conn->recvBuf);
+ conn->recvBuf = NULL;
/* Try to receive a CopyData message */
- rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+ rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
if (rawlen == 0)
{
/* Try consuming some data. */
- if (PQconsumeInput(streamConn) == 0)
+ if (PQconsumeInput(conn->streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
/* Now that we've consumed some input, try again */
- rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+ rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
if (rawlen == 0)
{
/* Tell caller to try again when our socket is ready. */
- *wait_fd = PQsocket(streamConn);
+ *wait_fd = PQsocket(conn->streamConn);
return 0;
}
}
{
PGresult *res;
- res = PQgetResult(streamConn);
+ res = PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_COMMAND_OK ||
PQresultStatus(res) == PGRES_COPY_IN)
{
PQclear(res);
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
}
}
if (rawlen < -1)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
/* Return received messages to caller */
- *buffer = recvBuf;
+ *buffer = conn->recvBuf;
return rawlen;
}
* ereports on error.
*/
static void
-libpqrcv_send(const char *buffer, int nbytes)
+libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
{
- if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
- PQflush(streamConn))
+ if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
+ PQflush(conn->streamConn))
ereport(ERROR,
(errmsg("could not send data to WAL stream: %s",
- PQerrorMessage(streamConn))));
+ PQerrorMessage(conn->streamConn))));
}
int wal_receiver_timeout;
bool hot_standby_feedback;
-/* libpqreceiver hooks to these when loaded */
-walrcv_connect_type walrcv_connect = NULL;
-walrcv_get_conninfo_type walrcv_get_conninfo = NULL;
-walrcv_identify_system_type walrcv_identify_system = NULL;
-walrcv_startstreaming_type walrcv_startstreaming = NULL;
-walrcv_endstreaming_type walrcv_endstreaming = NULL;
-walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
-walrcv_receive_type walrcv_receive = NULL;
-walrcv_send_type walrcv_send = NULL;
-walrcv_disconnect_type walrcv_disconnect = NULL;
+/* libpqwalreceiver connection */
+static WalReceiverConn *wrconn = NULL;
+WalReceiverFunctionsType *WalReceiverFunctions = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL ||
- walrcv_get_conninfo == NULL ||
- walrcv_startstreaming == NULL ||
- walrcv_endstreaming == NULL ||
- walrcv_identify_system == NULL ||
- walrcv_readtimelinehistoryfile == NULL ||
- walrcv_receive == NULL || walrcv_send == NULL ||
- walrcv_disconnect == NULL)
+ if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
- walrcv_connect(conninfo);
+ wrconn = walrcv_connect(conninfo, false, "walreceiver");
DisableWalRcvImmediateExit();
/*
* Save user-visible connection string. This clobbers the original
* conninfo, for security.
*/
- tmp_conninfo = walrcv_get_conninfo();
+ tmp_conninfo = walrcv_get_conninfo(wrconn);
SpinLockAcquire(&walrcv->mutex);
memset(walrcv->conninfo, 0, MAXCONNINFO);
if (tmp_conninfo)
first_stream = true;
for (;;)
{
+ char *primary_sysid;
+ char standby_sysid[32];
+
/*
* Check that we're connected to a valid server using the
* IDENTIFY_SYSTEM replication command,
*/
EnableWalRcvImmediateExit();
- walrcv_identify_system(&primaryTLI);
+ primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
+
+ snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+ if (strcmp(primary_sysid, standby_sysid) != 0)
+ {
+ ereport(ERROR,
+ (errmsg("database system identifier differs between the primary and standby"),
+ errdetail("The primary's identifier is %s, the standby's identifier is %s.",
+ primary_sysid, standby_sysid)));
+ }
DisableWalRcvImmediateExit();
/*
* on the new timeline.
*/
ThisTimeLineID = startpointTLI;
- if (walrcv_startstreaming(startpointTLI, startpoint,
+ if (walrcv_startstreaming(wrconn, startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL))
{
if (first_stream)
}
/* See if we can read data immediately */
- len = walrcv_receive(&buf, &wait_fd);
+ len = walrcv_receive(wrconn, &buf, &wait_fd);
if (len != 0)
{
/*
endofwal = true;
break;
}
- len = walrcv_receive(&buf, &wait_fd);
+ len = walrcv_receive(wrconn, &buf, &wait_fd);
}
/* Let the master know that we received some data. */
* our side, too.
*/
EnableWalRcvImmediateExit();
- walrcv_endstreaming(&primaryTLI);
+ walrcv_endstreaming(wrconn, &primaryTLI);
DisableWalRcvImmediateExit();
/*
tli)));
EnableWalRcvImmediateExit();
- walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
+ walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
DisableWalRcvImmediateExit();
/*
SpinLockRelease(&walrcv->mutex);
/* Terminate the connection gracefully. */
- if (walrcv_disconnect != NULL)
- walrcv_disconnect();
+ if (wrconn != NULL)
+ walrcv_disconnect(wrconn);
/* Wake up the startup process to notice promptly that we're gone */
WakeupRecovery();
(uint32) (applyPtr >> 32), (uint32) applyPtr,
requestReply ? " (reply requested)" : "");
- walrcv_send(reply_message.data, reply_message.len);
+ walrcv_send(wrconn, reply_message.data, reply_message.len);
}
/*
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
pq_sendint(&reply_message, xmin, 4);
pq_sendint(&reply_message, nextEpoch, 4);
- walrcv_send(reply_message.data, reply_message.len);
+ walrcv_send(wrconn, reply_message.data, reply_message.len);
if (TransactionIdIsValid(xmin))
master_has_standby_xmin = true;
else
extern WalRcvData *WalRcv;
-/* libpqwalreceiver hooks */
-typedef void (*walrcv_connect_type) (char *conninfo);
-extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
-
-typedef char *(*walrcv_get_conninfo_type) (void);
-extern PGDLLIMPORT walrcv_get_conninfo_type walrcv_get_conninfo;
-
-typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli);
-extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
-
-typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
-extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
-
-typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);
-extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
+struct WalReceiverConn;
+typedef struct WalReceiverConn WalReceiverConn;
-typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
-extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
-
-typedef int (*walrcv_receive_type) (char **buffer, pgsocket *wait_fd);
-extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
-
-typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
-extern PGDLLIMPORT walrcv_send_type walrcv_send;
-
-typedef void (*walrcv_disconnect_type) (void);
-extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
+/* libpqwalreceiver hooks */
+typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
+ const char *appname);
+typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
+typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
+ TimeLineID *primary_tli);
+typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
+ TimeLineID tli,
+ char **filename,
+ char **content, int *size);
+typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
+ TimeLineID tli,
+ XLogRecPtr startpoint,
+ const char *slotname);
+typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
+ TimeLineID *next_tli);
+typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer,
+ pgsocket *wait_fd);
+typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
+ int nbytes);
+typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
+
+typedef struct WalReceiverFunctionsType
+{
+ walrcv_connect_fn connect;
+ walrcv_get_conninfo_fn get_conninfo;
+ walrcv_identify_system_fn identify_system;
+ walrcv_readtimelinehistoryfile_fn readtimelinehistoryfile;
+ walrcv_startstreaming_fn startstreaming;
+ walrcv_endstreaming_fn endstreaming;
+ walrcv_receive_fn receive;
+ walrcv_send_fn send;
+ walrcv_disconnect_fn disconnect;
+} WalReceiverFunctionsType;
+
+extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
+
+#define walrcv_connect(conninfo, logical, appname) \
+ WalReceiverFunctions->connect(conninfo, logical, appname)
+#define walrcv_get_conninfo(conn) \
+ WalReceiverFunctions->get_conninfo(conn)
+#define walrcv_identify_system(conn, primary_tli) \
+ WalReceiverFunctions->identify_system(conn, primary_tli)
+#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
+ WalReceiverFunctions->readtimelinehistoryfile(conn, tli, filename, content, size)
+#define walrcv_startstreaming(conn, tli, startpoint, slotname) \
+ WalReceiverFunctions->startstreaming(conn, tli, startpoint, slotname)
+#define walrcv_endstreaming(conn, next_tli) \
+ WalReceiverFunctions->endstreaming(conn, next_tli)
+#define walrcv_receive(conn, buffer, wait_fd) \
+ WalReceiverFunctions->receive(conn, buffer, wait_fd)
+#define walrcv_send(conn, buffer, nbytes) \
+ WalReceiverFunctions->send(conn, buffer, nbytes)
+#define walrcv_disconnect(conn) \
+ WalReceiverFunctions->disconnect(conn)
/* prototypes for functions in walreceiver.c */
extern void WalReceiverMain(void) pg_attribute_noreturn();