]> granicus.if.org Git - postgresql/commitdiff
Automatically terminate replication connections that are idle for more
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 30 Mar 2011 07:10:32 +0000 (10:10 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 30 Mar 2011 07:20:37 +0000 (10:20 +0300)
than replication_timeout (a new GUC) milliseconds. The TCP timeout is often
too long, you want the master to notice a dead connection much sooner.
People complained about that in 9.0 too, but with synchronous replication
it's even more important to notice dead connections promptly.

Fujii Masao and Heikki Linnakangas

doc/src/sgml/config.sgml
src/backend/libpq/pqcomm.c
src/backend/port/unix_latch.c
src/backend/port/win32/socket.c
src/backend/port/win32_latch.c
src/backend/replication/walsender.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/libpq/libpq.h
src/include/replication/walsender.h
src/include/storage/latch.h

index e0ebee63ea5844c48634616cb5e99de416f9f8bb..217e4e781deea025e9335e597dd8d57da2b13e7d 100644 (file)
@@ -2019,6 +2019,29 @@ SET ENABLE_SEQSCAN TO OFF;
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+      <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+      <indexterm>
+       <primary><varname>replication_timeout</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Terminate replication connections that are inactive longer
+        than the specified number of milliseconds. This is useful for
+        the primary server to detect a standby crash or network outage.
+        A value of zero means wait forever.  This parameter can only be set in
+        the <filename>postgresql.conf</> file or on the server command line.
+        The default value is 60 seconds.
+       </para>
+       <para>
+        To prevent connections from being terminated prematurely,
+        <xref linkend="guc-wal-receiver-status-interval">
+        must be enabled on the standby, and its value must be less than the
+        value of <varname>replication_timeout</>.
+       </para>
+      </listitem>
+     </varlistentry>
      </variablelist>
     </sect2>
 
@@ -2216,6 +2239,11 @@ SET ENABLE_SEQSCAN TO OFF;
        the <filename>postgresql.conf</> file or on the server command line.
        The default value is 10 seconds.
       </para>
+      <para>
+       When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+       <varname>wal_receiver_status_interval</> must be enabled, and its value
+       must be less than the value of <varname>replication_timeout</>.
+      </para>
       </listitem>
      </varlistentry>
 
index 3c7b05ba8a82b082426685c61686fe4c52ae1770..724d3ae8940c5e54b40195845fe27a61538161b2 100644 (file)
  *             pq_peekbyte             - peek at next byte from connection
  *             pq_putbytes             - send bytes to connection (not flushed until pq_flush)
  *             pq_flush                - flush pending output
+ *             pq_flush_if_writable - flush pending output if writable without blocking
  *             pq_getbyte_if_available - get a byte if available without blocking
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *             pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
+ *             pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
  *             pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
  *             pq_endcopyout   - end a COPY OUT transfer
  *
@@ -92,6 +94,7 @@
 #include "miscadmin.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
+#include "utils/memutils.h"
 
 /*
  * Configuration options
@@ -105,15 +108,21 @@ static char sock_path[MAXPGPATH];
 
 
 /*
- * Buffers for low-level I/O
+ * Buffers for low-level I/O.
+ *
+ * The receive buffer is fixed size. Send buffer is usually 8k, but can be
+ * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
  */
 
-#define PQ_BUFFER_SIZE 8192
+#define PQ_SEND_BUFFER_SIZE 8192
+#define PQ_RECV_BUFFER_SIZE 8192
 
-static char PqSendBuffer[PQ_BUFFER_SIZE];
+static char *PqSendBuffer;
+static int     PqSendBufferSize;       /* Size send buffer */
 static int     PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
+static int     PqSendStart;            /* Next index to send a byte in PqSendBuffer */
 
-static char PqRecvBuffer[PQ_BUFFER_SIZE];
+static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int     PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
 static int     PqRecvLength;           /* End of data available in PqRecvBuffer */
 
@@ -128,6 +137,7 @@ static bool DoingCopyOut;
 static void pq_close(int code, Datum arg);
 static int     internal_putbytes(const char *s, size_t len);
 static int     internal_flush(void);
+static void pq_set_nonblocking(bool nonblocking);
 
 #ifdef HAVE_UNIX_SOCKETS
 static int     Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
@@ -142,7 +152,9 @@ static int  Setup_AF_UNIX(void);
 void
 pq_init(void)
 {
-       PqSendPointer = PqRecvPointer = PqRecvLength = 0;
+       PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
+       PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+       PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
        PqCommBusy = false;
        DoingCopyOut = false;
        on_proc_exit(pq_close, 0);
@@ -732,6 +744,42 @@ TouchSocketFile(void)
  * --------------------------------
  */
 
+/* --------------------------------
+ *            pq_set_nonblocking - set socket blocking/non-blocking
+ *
+ * Sets the socket non-blocking if nonblocking is TRUE, or sets it
+ * blocking otherwise.
+ * --------------------------------
+ */
+static void
+pq_set_nonblocking(bool nonblocking)
+{
+       if (MyProcPort->noblock == nonblocking)
+               return;
+
+#ifdef WIN32
+       pgwin32_noblock = nonblocking ? 1 : 0;
+#else
+       /*
+        * Use COMMERROR on failure, because ERROR would try to send the error
+        * to the client, which might require changing the mode again, leading
+        * to infinite recursion.
+        */
+       if (nonblocking)
+       {
+               if (!pg_set_noblock(MyProcPort->sock))
+                       ereport(COMMERROR,
+                                       (errmsg("could not set socket to non-blocking mode: %m")));
+       }
+       else
+       {
+               if (!pg_set_block(MyProcPort->sock))
+                       ereport(COMMERROR,
+                                       (errmsg("could not set socket to blocking mode: %m")));
+       }
+#endif
+       MyProcPort->noblock = nonblocking;
+}
 
 /* --------------------------------
  *             pq_recvbuf - load some bytes into the input buffer
@@ -756,13 +804,16 @@ pq_recvbuf(void)
                        PqRecvLength = PqRecvPointer = 0;
        }
 
+       /* Ensure that we're in blocking mode */
+       pq_set_nonblocking(false);
+
        /* Can fill buffer from PqRecvLength and upwards */
        for (;;)
        {
                int                     r;
 
                r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-                                               PQ_BUFFER_SIZE - PqRecvLength);
+                                               PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
                if (r < 0)
                {
@@ -825,7 +876,6 @@ pq_peekbyte(void)
        return (unsigned char) PqRecvBuffer[PqRecvPointer];
 }
 
-
 /* --------------------------------
  *             pq_getbyte_if_available - get a single byte from connection,
  *                     if available
@@ -845,72 +895,38 @@ pq_getbyte_if_available(unsigned char *c)
                return 1;
        }
 
-       /* Temporarily put the socket into non-blocking mode */
-#ifdef WIN32
-       pgwin32_noblock = 1;
-#else
-       if (!pg_set_noblock(MyProcPort->sock))
-               ereport(ERROR,
-                               (errmsg("could not set socket to non-blocking mode: %m")));
-#endif
-       MyProcPort->noblock = true;
-       PG_TRY();
+       /* Put the socket into non-blocking mode */
+       pq_set_nonblocking(true);
+
+       r = secure_read(MyProcPort, c, 1);
+       if (r < 0)
        {
-               r = secure_read(MyProcPort, c, 1);
-               if (r < 0)
+               /*
+                * Ok if no data available without blocking or interrupted (though
+                * EINTR really shouldn't happen with a non-blocking socket).
+                * Report other errors.
+                */
+               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+                       r = 0;
+               else
                {
                        /*
-                        * Ok if no data available without blocking or interrupted (though
-                        * EINTR really shouldn't happen with a non-blocking socket).
-                        * Report other errors.
+                        * Careful: an ereport() that tries to write to the client
+                        * would cause recursion to here, leading to stack overflow
+                        * and core dump!  This message must go *only* to the
+                        * postmaster log.
                         */
-                       if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-                               r = 0;
-                       else
-                       {
-                               /*
-                                * Careful: an ereport() that tries to write to the client
-                                * would cause recursion to here, leading to stack overflow
-                                * and core dump!  This message must go *only* to the
-                                * postmaster log.
-                                */
-                               ereport(COMMERROR,
-                                               (errcode_for_socket_access(),
-                                                errmsg("could not receive data from client: %m")));
-                               r = EOF;
-                       }
-               }
-               else if (r == 0)
-               {
-                       /* EOF detected */
+                       ereport(COMMERROR,
+                                       (errcode_for_socket_access(),
+                                        errmsg("could not receive data from client: %m")));
                        r = EOF;
                }
        }
-       PG_CATCH();
+       else if (r == 0)
        {
-               /*
-                * The rest of the backend code assumes the socket is in blocking
-                * mode, so treat failure as FATAL.
-                */
-#ifdef WIN32
-               pgwin32_noblock = 0;
-#else
-               if (!pg_set_block(MyProcPort->sock))
-                       ereport(FATAL,
-                                       (errmsg("could not set socket to blocking mode: %m")));
-#endif
-               MyProcPort->noblock = false;
-               PG_RE_THROW();
+               /* EOF detected */
+               r = EOF;
        }
-       PG_END_TRY();
-#ifdef WIN32
-       pgwin32_noblock = 0;
-#else
-       if (!pg_set_block(MyProcPort->sock))
-               ereport(FATAL,
-                               (errmsg("could not set socket to blocking mode: %m")));
-#endif
-       MyProcPort->noblock = false;
 
        return r;
 }
@@ -1138,10 +1154,13 @@ internal_putbytes(const char *s, size_t len)
        while (len > 0)
        {
                /* If buffer is full, then flush it out */
-               if (PqSendPointer >= PQ_BUFFER_SIZE)
+               if (PqSendPointer >= PqSendBufferSize)
+               {
+                       pq_set_nonblocking(false);
                        if (internal_flush())
                                return EOF;
-               amount = PQ_BUFFER_SIZE - PqSendPointer;
+               }
+               amount = PqSendBufferSize - PqSendPointer;
                if (amount > len)
                        amount = len;
                memcpy(PqSendBuffer + PqSendPointer, s, amount);
@@ -1167,17 +1186,25 @@ pq_flush(void)
        if (PqCommBusy)
                return 0;
        PqCommBusy = true;
+       pq_set_nonblocking(false);
        res = internal_flush();
        PqCommBusy = false;
        return res;
 }
 
+/* --------------------------------
+ *             internal_flush - flush pending output
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
 static int
 internal_flush(void)
 {
        static int      last_reported_send_errno = 0;
 
-       char       *bufptr = PqSendBuffer;
+       char       *bufptr = PqSendBuffer + PqSendStart;
        char       *bufend = PqSendBuffer + PqSendPointer;
 
        while (bufptr < bufend)
@@ -1191,6 +1218,16 @@ internal_flush(void)
                        if (errno == EINTR)
                                continue;               /* Ok if we were interrupted */
 
+                       /*
+                        * Ok if no data writable without blocking, and the socket
+                        * is in non-blocking mode.
+                        */
+                       if (errno == EAGAIN ||
+                               errno == EWOULDBLOCK)
+                       {
+                               return 0;
+                       }
+
                        /*
                         * Careful: an ereport() that tries to write to the client would
                         * cause recursion to here, leading to stack overflow and core
@@ -1212,18 +1249,56 @@ internal_flush(void)
                         * We drop the buffered data anyway so that processing can
                         * continue, even though we'll probably quit soon.
                         */
-                       PqSendPointer = 0;
+                       PqSendStart = PqSendPointer = 0;
                        return EOF;
                }
 
                last_reported_send_errno = 0;   /* reset after any successful send */
                bufptr += r;
+               PqSendStart += r;
        }
 
-       PqSendPointer = 0;
+       PqSendStart = PqSendPointer = 0;
        return 0;
 }
 
+/* --------------------------------
+ *             pq_flush_if_writable - flush pending output if writable without blocking
+ *
+ * Returns 0 if OK, or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_flush_if_writable(void)
+{
+       int                     res;
+
+       /* Quick exit if nothing to do */
+       if (PqSendPointer == PqSendStart)
+               return 0;
+
+       /* No-op if reentrant call */
+       if (PqCommBusy)
+               return 0;
+
+       /* Temporarily put the socket into non-blocking mode */
+       pq_set_nonblocking(true);
+
+       PqCommBusy = true;
+       res = internal_flush();
+       PqCommBusy = false;
+       return res;
+}
+
+/* --------------------------------
+ *             pq_is_send_pending      - is there any pending data in the output buffer?
+ * --------------------------------
+ */
+bool
+pq_is_send_pending(void)
+{
+       return (PqSendStart < PqSendPointer);
+}
 
 /* --------------------------------
  * Message-level I/O routines begin here.
@@ -1285,6 +1360,33 @@ fail:
        return EOF;
 }
 
+/* --------------------------------
+ *             pq_putmessage_noblock   - like pq_putmessage, but never blocks
+ *
+ *             If the output buffer is too small to hold the message, the buffer
+ *             is enlarged.
+ */
+void
+pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+       int res;
+       int required;
+
+       /*
+        * Ensure we have enough space in the output buffer for the message header
+        * as well as the message itself.
+        */
+       required = PqSendPointer + 1 + 4 + len;
+       if (required > PqSendBufferSize)
+       {
+               PqSendBuffer = repalloc(PqSendBuffer, required);
+               PqSendBufferSize = required;
+       }
+       res = pq_putmessage(msgtype, s, len);
+       Assert(res == 0);       /* should not fail when the message fits in buffer */
+}
+
+
 /* --------------------------------
  *             pq_startcopyout - inform libpq that an old-style COPY OUT transfer
  *                     is beginning
index a4f559ed3fa4811947620c6328e592372f273305..32d0cb5e3f86d3f9f6600c45e7c9c0b9fb9c3305 100644 (file)
@@ -193,19 +193,21 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 /*
  * Like WaitLatch, but will also return when there's data available in
- * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
- * was set, or 2 if the scoket became readable.
+ * 'sock' for reading or writing. Returns 0 if timeout was reached,
+ * 1 if the latch was set, 2 if the socket became readable or writable.
  */
 int
-WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
+                                 bool forWrite, long timeout)
 {
        struct timeval tv, *tvp = NULL;
        fd_set          input_mask;
+       fd_set          output_mask;
        int                     rc;
        int                     result = 0;
 
@@ -241,14 +243,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
                FD_ZERO(&input_mask);
                FD_SET(selfpipe_readfd, &input_mask);
                hifd = selfpipe_readfd;
-               if (sock != PGINVALID_SOCKET)
+               if (sock != PGINVALID_SOCKET && forRead)
                {
                        FD_SET(sock, &input_mask);
                        if (sock > hifd)
                                hifd = sock;
                }
 
-               rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
+               FD_ZERO(&output_mask);
+               if (sock != PGINVALID_SOCKET && forWrite)
+               {
+                       FD_SET(sock, &output_mask);
+                       if (sock > hifd)
+                               hifd = sock;
+               }
+
+               rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
                if (rc < 0)
                {
                        if (errno == EINTR)
@@ -263,7 +273,9 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
                        result = 0;
                        break;
                }
-               if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+               if (sock != PGINVALID_SOCKET &&
+                       ((forRead && FD_ISSET(sock, &input_mask)) ||
+                        (forWrite && FD_ISSET(sock, &output_mask))))
                {
                        result = 2;
                        break;          /* data available in socket */
index 76dd6be9a63f7dd89208fef37509cd3671501614..dbbd4a35d16849233595ac5c4061b95235c10351 100644 (file)
@@ -14,7 +14,8 @@
 #include "postgres.h"
 
 /*
- * Indicate if pgwin32_recv() should operate in non-blocking mode.
+ * Indicate if pgwin32_recv() and pgwin32_send() should operate
+ * in non-blocking mode.
  *
  * Since the socket emulation layer always sets the actual socket to
  * non-blocking mode in order to be able to deliver signals, we must
@@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
                        return -1;
                }
 
+               if (pgwin32_noblock)
+               {
+                       /*
+                        * No data sent, and we are in "emulated non-blocking mode", so
+                        * return indicating that we'd block if we were to continue.
+                        */
+                       errno = EWOULDBLOCK;
+                       return -1;
+               }
+
                /* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
 
                if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
index ac20c4958f1fa8ae9ca83f6aa9166ff3db331dcd..f42cfef40e76c267841bcaebef4cefc7091cd806 100644 (file)
@@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 int
-WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
+                                 bool forWrite, long timeout)
 {
        DWORD           rc;
        HANDLE          events[3];
@@ -103,10 +104,17 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
        events[0] = latchevent;
        events[1] = pgwin32_signal_event;
        numevents = 2;
-       if (sock != PGINVALID_SOCKET)
+       if (sock != PGINVALID_SOCKET && (forRead || forWrite))
        {
+               int             flags = 0;
+
+               if (forRead)
+                       flags |= FD_READ;
+               if (forWrite)
+                       flags |= FD_WRITE;
+
                sockevent = WSACreateEvent();
-               WSAEventSelect(sock, sockevent, FD_READ);
+               WSAEventSelect(sock, sockevent, flags);
                events[numevents++] = sockevent;
        }
 
@@ -139,8 +147,18 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
                        pgwin32_dispatch_queued_signals();
                else if (rc == WAIT_OBJECT_0 + 2)
                {
+                       WSANETWORKEVENTS resEvents;
+
                        Assert(sock != PGINVALID_SOCKET);
-                       result = 2;
+
+                       ZeroMemory(&resEvents, sizeof(resEvents));
+                       if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
+                               ereport(FATAL,
+                                               (errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
+
+                       if ((forRead && resEvents.lNetworkEvents & FD_READ) ||
+                               (forWrite && resEvents.lNetworkEvents & FD_WRITE))
+                               result = 2;
                        break;
                }
                else if (rc != WAIT_OBJECT_0)
@@ -148,7 +166,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
        }
 
        /* Clean up the handle we created for the socket */
-               if (sock != PGINVALID_SOCKET)
+       if (sock != PGINVALID_SOCKET && (forRead || forWrite))
        {
                WSAEventSelect(sock, sockevent, 0);
                WSACloseEvent(sockevent);
index 2e2659a8c9fec86ea0ac8194a8f9025f3235483c..363f6181adb34722e57b3231f3a6a54bff46d770 100644 (file)
@@ -74,6 +74,7 @@ bool          am_walsender = false;           /* Am I a walsender process ? */
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    WalSndDelay = 1000;     /* max sleep time between some actions */
+int                    replication_timeout = 60 * 1000;        /* maximum time to send one WAL data message */
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -95,6 +96,11 @@ static XLogRecPtr sentPtr = {0, 0};
  */
 static StringInfoData reply_message;
 
+/*
+ * Timestamp of the last receipt of the reply from the standby.
+ */
+static TimestampTz last_reply_timestamp;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -113,7 +119,7 @@ static int  WalSndLoop(void);
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
-static bool XLogSend(char *msgbuf, bool *caughtup);
+static void XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
 static void ProcessStandbyMessage(void);
@@ -469,6 +475,7 @@ ProcessRepliesIfAny(void)
 {
        unsigned char firstchar;
        int                     r;
+       int             received = false;
 
        for (;;)
        {
@@ -484,7 +491,7 @@ ProcessRepliesIfAny(void)
                if (r == 0)
                {
                        /* no data available without blocking */
-                       return;
+                       break;
                }
 
                /* Handle the very limited subset of commands expected in this phase */
@@ -495,6 +502,7 @@ ProcessRepliesIfAny(void)
                                 */
                        case 'd':
                                ProcessStandbyMessage();
+                               received = true;
                                break;
 
                                /*
@@ -510,6 +518,12 @@ ProcessRepliesIfAny(void)
                                                                firstchar)));
                }
        }
+       /*
+        * Save the last reply timestamp if we've received at least
+        * one reply.
+        */
+       if (received)
+               last_reply_timestamp = GetCurrentTimestamp();
 }
 
 /*
@@ -688,6 +702,9 @@ WalSndLoop(void)
         */
        initStringInfo(&reply_message);
 
+       /* Initialize the last reply timestamp */
+       last_reply_timestamp = GetCurrentTimestamp();
+
        /* Loop forever, unless we get an error */
        for (;;)
        {
@@ -706,19 +723,6 @@ WalSndLoop(void)
                        SyncRepInitConfig();
                }
 
-               /*
-                * When SIGUSR2 arrives, we send all outstanding logs up to the
-                * shutdown checkpoint record (i.e., the latest record) and exit.
-                */
-               if (walsender_ready_to_stop)
-               {
-                       if (!XLogSend(output_message, &caughtup))
-                               break;
-                       ProcessRepliesIfAny();
-                       if (caughtup)
-                               walsender_shutdown_requested = true;
-               }
-
                /* Normal exit from the walsender is here */
                if (walsender_shutdown_requested)
                {
@@ -730,11 +734,13 @@ WalSndLoop(void)
                }
 
                /*
-                * If we had sent all accumulated WAL in last round, nap for the
-                * configured time before retrying.
+                * If we don't have any pending data in the output buffer, try to
+                * send some more.
                 */
-               if (caughtup)
+               if (!pq_is_send_pending())
                {
+                       XLogSend(output_message, &caughtup);
+
                        /*
                         * Even if we wrote all the WAL that was available when we started
                         * sending, more might have arrived while we were sending this
@@ -742,28 +748,79 @@ WalSndLoop(void)
                         * received any signals from that time. Let's arm the latch
                         * again, and after that check that we're still up-to-date.
                         */
-                       ResetLatch(&MyWalSnd->latch);
-
-                       if (!XLogSend(output_message, &caughtup))
-                               break;
-                       if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
+                       if (caughtup && !pq_is_send_pending())
                        {
-                               /*
-                                * XXX: We don't really need the periodic wakeups anymore,
-                                * WaitLatchOrSocket should reliably wake up as soon as
-                                * something interesting happens.
-                                */
+                               ResetLatch(&MyWalSnd->latch);
 
-                               /* Sleep */
-                               WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-                                                                 WalSndDelay * 1000L);
+                               XLogSend(output_message, &caughtup);
                        }
                }
-               else
+
+               /* Flush pending output to the client */
+               if (pq_flush_if_writable() != 0)
+                       break;
+
+               /*
+                * When SIGUSR2 arrives, we send any outstanding logs up to the
+                * shutdown checkpoint record (i.e., the latest record) and exit.
+                */
+               if (walsender_ready_to_stop && !pq_is_send_pending())
                {
-                       /* Attempt to send the log once every loop */
-                       if (!XLogSend(output_message, &caughtup))
+                       XLogSend(output_message, &caughtup);
+                       ProcessRepliesIfAny();
+                       if (caughtup && !pq_is_send_pending())
+                               walsender_shutdown_requested = true;
+               }
+
+               if ((caughtup || pq_is_send_pending()) &&
+                       !got_SIGHUP &&
+                       !walsender_shutdown_requested)
+               {
+                       TimestampTz     finish_time;
+                       long            sleeptime;
+
+                       /* Reschedule replication timeout */
+                       if (replication_timeout > 0)
+                       {
+                               long            secs;
+                               int             usecs;
+
+                               finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                                                                                                 replication_timeout);
+                               TimestampDifference(GetCurrentTimestamp(),
+                                                                       finish_time, &secs, &usecs);
+                               sleeptime = secs * 1000 + usecs / 1000;
+                               if (WalSndDelay < sleeptime)
+                                       sleeptime = WalSndDelay;
+                       }
+                       else
+                       {
+                               /*
+                                * XXX: Without timeout, we don't really need the periodic
+                                * wakeups anymore, WaitLatchOrSocket should reliably wake up
+                                * as soon as something interesting happens.
+                                */
+                               sleeptime = WalSndDelay;
+                       }
+
+                       /* Sleep */
+                       WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+                                                         true, pq_is_send_pending(),
+                                                         sleeptime * 1000L);
+
+                       /* Check for replication timeout */
+                       if (replication_timeout > 0 &&
+                               GetCurrentTimestamp() >= finish_time)
+                       {
+                               /*
+                                * Since typically expiration of replication timeout means
+                                * communication problem, we don't send the error message
+                                * to the standby.
+                                */
+                               ereport(COMMERROR,
+                                               (errmsg("terminating walsender process due to replication timeout")));
                                break;
+                       }
                }
 
                /*
@@ -993,7 +1050,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
 /*
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
- * but not yet sent to the client, and send it.
+ * but not yet sent to the client, and buffer it in the libpq output
+ * buffer.
  *
  * msgbuf is a work area in which the output message is constructed.  It's
  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
@@ -1001,10 +1059,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  *
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
- *
- * Returns true if OK, false if trouble.
+
  */
-static bool
+static void
 XLogSend(char *msgbuf, bool *caughtup)
 {
        XLogRecPtr      SendRqstPtr;
@@ -1027,7 +1084,7 @@ XLogSend(char *msgbuf, bool *caughtup)
        if (XLByteLE(SendRqstPtr, sentPtr))
        {
                *caughtup = true;
-               return true;
+               return;
        }
 
        /*
@@ -1099,11 +1156,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 
        memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
 
-       pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
-
-       /* Flush pending output to the client */
-       if (pq_flush())
-               return false;
+       pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
 
        sentPtr = endptr;
 
@@ -1127,7 +1180,7 @@ XLogSend(char *msgbuf, bool *caughtup)
                set_ps_display(activitymsg, false);
        }
 
-       return true;
+       return;
 }
 
 /* SIGHUP: set flag to re-read config file at next convenient time */
index 9ca1329e1e35edd94cf79f75cb7ea6fe01c5f990..b49bdaea9d038633d65a99dfb69ab880fba19c21 100644 (file)
@@ -1855,6 +1855,16 @@ static struct config_int ConfigureNamesInt[] =
                1000, 1, 10000, NULL, NULL
        },
 
+       {
+               {"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+                       gettext_noop("Sets the maximum time to wait for WAL replication."),
+                       NULL,
+                       GUC_UNIT_MS
+               },
+               &replication_timeout,
+               60 * 1000, 0, INT_MAX, NULL, NULL
+       },
+
        {
                {"commit_delay", PGC_USERSET, WAL_SETTINGS,
                        gettext_noop("Sets the delay in microseconds between transaction commit and "
index ed70223f135ee197fa561ee53388f0dae4f70c3b..43481859993e18ee8d2437b720335f69ac639584 100644 (file)
 #wal_sender_delay = 1s         # walsender cycle time, 1-10000 milliseconds
 #wal_keep_segments = 0         # in logfile segments, 16MB each; 0 disables
 #vacuum_defer_cleanup_age = 0  # number of xacts by which cleanup is delayed
+#replication_timeout = 60s # in milliseconds, 0 is disabled
 
 # - Standby Servers -
 
index 8ecab6d5eedbae45e9735c3508023bbdc54eb43d..2df735f61f1773be4577ccd60e190242d0898155 100644 (file)
@@ -60,7 +60,10 @@ extern int   pq_peekbyte(void);
 extern int     pq_getbyte_if_available(unsigned char *c);
 extern int     pq_putbytes(const char *s, size_t len);
 extern int     pq_flush(void);
+extern int     pq_flush_if_writable(void);
+extern bool    pq_is_send_pending(void);
 extern int     pq_putmessage(char msgtype, const char *s, size_t len);
+extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len);
 extern void pq_startcopyout(void);
 extern void pq_endcopyout(bool errorAbort);
 
index 150a71fdddfb1c36e387eabc8c7d364ff7c75082..2670a2e80679a3b97a50c71584d6c56b1241dc50 100644 (file)
@@ -98,6 +98,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop;
 /* user-settable parameters */
 extern int     WalSndDelay;
 extern int     max_wal_senders;
+extern int     replication_timeout;
 
 extern int     WalSenderMain(void);
 extern void WalSndSignals(void);
index 31744ff25291c62cf7a36c1e1f48b335289f0b6c..f64e13bed2d02afe01d049f7faad6fe742e894a0 100644 (file)
@@ -40,7 +40,7 @@ extern void OwnLatch(volatile Latch *latch);
 extern void DisownLatch(volatile Latch *latch);
 extern bool WaitLatch(volatile Latch *latch, long timeout);
 extern int     WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
-                                 long timeout);
+                                 bool forRead, bool forWrite, long timeout);
 extern void SetLatch(volatile Latch *latch);
 extern void ResetLatch(volatile Latch *latch);
 #define TestLatch(latch) (((volatile Latch *) latch)->is_set)