/* Current connection to the primary, if any */
static PGconn *streamConn = NULL;
-static bool justconnected = false;
/* Buffer for currently read records */
static char *recvBuf = NULL;
}
PQclear(res);
- justconnected = true;
ereport(LOG,
(errmsg("streaming replication successfully connected to primary")));
{
PQfinish(streamConn);
streamConn = NULL;
- justconnected = false;
}
/*
PQfreemem(recvBuf);
recvBuf = NULL;
- /*
- * If the caller requested to block, wait for data to arrive. But if this
- * is the first call after connecting, don't wait, because there might
- * already be some data in libpq buffer that we haven't returned to
- * caller.
- */
- if (timeout > 0 && !justconnected)
+ /* Try to receive a CopyData message */
+ rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+ if (rawlen == 0)
{
- if (!libpq_select(timeout))
- return false;
+ /*
+ * No data available yet. If the caller requested to block, wait for
+ * more data to arrive.
+ */
+ if (timeout > 0)
+ {
+ if (!libpq_select(timeout))
+ return false;
+ }
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(streamConn))));
- }
- justconnected = false;
- /* Receive CopyData message */
- rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
- if (rawlen == 0) /* no data available yet, then return */
- return false;
+ /* Now that we've consumed some input, try again */
+ rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+ if (rawlen == 0)
+ return false;
+ }
if (rawlen == -1) /* end-of-streaming or error */
{
PGresult *res;