From 754baa21f723255272c24dc5f9ab456858e361e3 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 30 Mar 2011 10:10:32 +0300 Subject: [PATCH] Automatically terminate replication connections that are idle for more than replication_timeout (a new GUC) milliseconds. The TCP timeout is often too long, you want the master to notice a dead connection much sooner. People complained about that in 9.0 too, but with synchronous replication it's even more important to notice dead connections promptly. Fujii Masao and Heikki Linnakangas --- doc/src/sgml/config.sgml | 28 ++ src/backend/libpq/pqcomm.c | 240 +++++++++++++----- src/backend/port/unix_latch.c | 26 +- src/backend/port/win32/socket.c | 13 +- src/backend/port/win32_latch.c | 30 ++- src/backend/replication/walsender.c | 143 +++++++---- src/backend/utils/misc/guc.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/libpq/libpq.h | 3 + src/include/replication/walsender.h | 1 + src/include/storage/latch.h | 2 +- 11 files changed, 368 insertions(+), 129 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index e0ebee63ea..217e4e781d 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2019,6 +2019,29 @@ SET ENABLE_SEQSCAN TO OFF; + + + replication_timeout (integer) + + replication_timeout configuration parameter + + + + Terminate replication connections that are inactive longer + than the specified number of milliseconds. This is useful for + the primary server to detect a standby crash or network outage. + A value of zero means wait forever. This parameter can only be set in + the postgresql.conf file or on the server command line. + The default value is 60 seconds. + + + To prevent connections from being terminated prematurely, + + must be enabled on the standby, and its value must be less than the + value of replication_timeout. + + + @@ -2216,6 +2239,11 @@ SET ENABLE_SEQSCAN TO OFF; the postgresql.conf file or on the server command line. The default value is 10 seconds. + + When is enabled on the primary, + wal_receiver_status_interval must be enabled, and its value + must be less than the value of replication_timeout. + diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 3c7b05ba8a..724d3ae894 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -55,10 +55,12 @@ * pq_peekbyte - peek at next byte from connection * pq_putbytes - send bytes to connection (not flushed until pq_flush) * pq_flush - flush pending output + * pq_flush_if_writable - flush pending output if writable without blocking * pq_getbyte_if_available - get a byte if available without blocking * * message-level I/O (and old-style-COPY-OUT cruft): * pq_putmessage - send a normal message (suppressed in COPY OUT mode) + * pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT) * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning * pq_endcopyout - end a COPY OUT transfer * @@ -92,6 +94,7 @@ #include "miscadmin.h" #include "storage/ipc.h" #include "utils/guc.h" +#include "utils/memutils.h" /* * Configuration options @@ -105,15 +108,21 @@ static char sock_path[MAXPGPATH]; /* - * Buffers for low-level I/O + * Buffers for low-level I/O. + * + * The receive buffer is fixed size. Send buffer is usually 8k, but can be + * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise. */ -#define PQ_BUFFER_SIZE 8192 +#define PQ_SEND_BUFFER_SIZE 8192 +#define PQ_RECV_BUFFER_SIZE 8192 -static char PqSendBuffer[PQ_BUFFER_SIZE]; +static char *PqSendBuffer; +static int PqSendBufferSize; /* Size send buffer */ static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */ +static int PqSendStart; /* Next index to send a byte in PqSendBuffer */ -static char PqRecvBuffer[PQ_BUFFER_SIZE]; +static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ static int PqRecvLength; /* End of data available in PqRecvBuffer */ @@ -128,6 +137,7 @@ static bool DoingCopyOut; static void pq_close(int code, Datum arg); static int internal_putbytes(const char *s, size_t len); static int internal_flush(void); +static void pq_set_nonblocking(bool nonblocking); #ifdef HAVE_UNIX_SOCKETS static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName); @@ -142,7 +152,9 @@ static int Setup_AF_UNIX(void); void pq_init(void) { - PqSendPointer = PqRecvPointer = PqRecvLength = 0; + PqSendBufferSize = PQ_SEND_BUFFER_SIZE; + PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize); + PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0; PqCommBusy = false; DoingCopyOut = false; on_proc_exit(pq_close, 0); @@ -732,6 +744,42 @@ TouchSocketFile(void) * -------------------------------- */ +/* -------------------------------- + * pq_set_nonblocking - set socket blocking/non-blocking + * + * Sets the socket non-blocking if nonblocking is TRUE, or sets it + * blocking otherwise. + * -------------------------------- + */ +static void +pq_set_nonblocking(bool nonblocking) +{ + if (MyProcPort->noblock == nonblocking) + return; + +#ifdef WIN32 + pgwin32_noblock = nonblocking ? 1 : 0; +#else + /* + * Use COMMERROR on failure, because ERROR would try to send the error + * to the client, which might require changing the mode again, leading + * to infinite recursion. + */ + if (nonblocking) + { + if (!pg_set_noblock(MyProcPort->sock)) + ereport(COMMERROR, + (errmsg("could not set socket to non-blocking mode: %m"))); + } + else + { + if (!pg_set_block(MyProcPort->sock)) + ereport(COMMERROR, + (errmsg("could not set socket to blocking mode: %m"))); + } +#endif + MyProcPort->noblock = nonblocking; +} /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer @@ -756,13 +804,16 @@ pq_recvbuf(void) PqRecvLength = PqRecvPointer = 0; } + /* Ensure that we're in blocking mode */ + pq_set_nonblocking(false); + /* Can fill buffer from PqRecvLength and upwards */ for (;;) { int r; r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, - PQ_BUFFER_SIZE - PqRecvLength); + PQ_RECV_BUFFER_SIZE - PqRecvLength); if (r < 0) { @@ -825,7 +876,6 @@ pq_peekbyte(void) return (unsigned char) PqRecvBuffer[PqRecvPointer]; } - /* -------------------------------- * pq_getbyte_if_available - get a single byte from connection, * if available @@ -845,72 +895,38 @@ pq_getbyte_if_available(unsigned char *c) return 1; } - /* Temporarily put the socket into non-blocking mode */ -#ifdef WIN32 - pgwin32_noblock = 1; -#else - if (!pg_set_noblock(MyProcPort->sock)) - ereport(ERROR, - (errmsg("could not set socket to non-blocking mode: %m"))); -#endif - MyProcPort->noblock = true; - PG_TRY(); + /* Put the socket into non-blocking mode */ + pq_set_nonblocking(true); + + r = secure_read(MyProcPort, c, 1); + if (r < 0) { - r = secure_read(MyProcPort, c, 1); - if (r < 0) + /* + * Ok if no data available without blocking or interrupted (though + * EINTR really shouldn't happen with a non-blocking socket). + * Report other errors. + */ + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) + r = 0; + else { /* - * Ok if no data available without blocking or interrupted (though - * EINTR really shouldn't happen with a non-blocking socket). - * Report other errors. + * Careful: an ereport() that tries to write to the client + * would cause recursion to here, leading to stack overflow + * and core dump! This message must go *only* to the + * postmaster log. */ - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) - r = 0; - else - { - /* - * Careful: an ereport() that tries to write to the client - * would cause recursion to here, leading to stack overflow - * and core dump! This message must go *only* to the - * postmaster log. - */ - ereport(COMMERROR, - (errcode_for_socket_access(), - errmsg("could not receive data from client: %m"))); - r = EOF; - } - } - else if (r == 0) - { - /* EOF detected */ + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("could not receive data from client: %m"))); r = EOF; } } - PG_CATCH(); + else if (r == 0) { - /* - * The rest of the backend code assumes the socket is in blocking - * mode, so treat failure as FATAL. - */ -#ifdef WIN32 - pgwin32_noblock = 0; -#else - if (!pg_set_block(MyProcPort->sock)) - ereport(FATAL, - (errmsg("could not set socket to blocking mode: %m"))); -#endif - MyProcPort->noblock = false; - PG_RE_THROW(); + /* EOF detected */ + r = EOF; } - PG_END_TRY(); -#ifdef WIN32 - pgwin32_noblock = 0; -#else - if (!pg_set_block(MyProcPort->sock)) - ereport(FATAL, - (errmsg("could not set socket to blocking mode: %m"))); -#endif - MyProcPort->noblock = false; return r; } @@ -1138,10 +1154,13 @@ internal_putbytes(const char *s, size_t len) while (len > 0) { /* If buffer is full, then flush it out */ - if (PqSendPointer >= PQ_BUFFER_SIZE) + if (PqSendPointer >= PqSendBufferSize) + { + pq_set_nonblocking(false); if (internal_flush()) return EOF; - amount = PQ_BUFFER_SIZE - PqSendPointer; + } + amount = PqSendBufferSize - PqSendPointer; if (amount > len) amount = len; memcpy(PqSendBuffer + PqSendPointer, s, amount); @@ -1167,17 +1186,25 @@ pq_flush(void) if (PqCommBusy) return 0; PqCommBusy = true; + pq_set_nonblocking(false); res = internal_flush(); PqCommBusy = false; return res; } +/* -------------------------------- + * internal_flush - flush pending output + * + * Returns 0 if OK (meaning everything was sent, or operation would block + * and the socket is in non-blocking mode), or EOF if trouble. + * -------------------------------- + */ static int internal_flush(void) { static int last_reported_send_errno = 0; - char *bufptr = PqSendBuffer; + char *bufptr = PqSendBuffer + PqSendStart; char *bufend = PqSendBuffer + PqSendPointer; while (bufptr < bufend) @@ -1191,6 +1218,16 @@ internal_flush(void) if (errno == EINTR) continue; /* Ok if we were interrupted */ + /* + * Ok if no data writable without blocking, and the socket + * is in non-blocking mode. + */ + if (errno == EAGAIN || + errno == EWOULDBLOCK) + { + return 0; + } + /* * Careful: an ereport() that tries to write to the client would * cause recursion to here, leading to stack overflow and core @@ -1212,18 +1249,56 @@ internal_flush(void) * We drop the buffered data anyway so that processing can * continue, even though we'll probably quit soon. */ - PqSendPointer = 0; + PqSendStart = PqSendPointer = 0; return EOF; } last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; + PqSendStart += r; } - PqSendPointer = 0; + PqSendStart = PqSendPointer = 0; return 0; } +/* -------------------------------- + * pq_flush_if_writable - flush pending output if writable without blocking + * + * Returns 0 if OK, or EOF if trouble. + * -------------------------------- + */ +int +pq_flush_if_writable(void) +{ + int res; + + /* Quick exit if nothing to do */ + if (PqSendPointer == PqSendStart) + return 0; + + /* No-op if reentrant call */ + if (PqCommBusy) + return 0; + + /* Temporarily put the socket into non-blocking mode */ + pq_set_nonblocking(true); + + PqCommBusy = true; + res = internal_flush(); + PqCommBusy = false; + return res; +} + +/* -------------------------------- + * pq_is_send_pending - is there any pending data in the output buffer? + * -------------------------------- + */ +bool +pq_is_send_pending(void) +{ + return (PqSendStart < PqSendPointer); +} /* -------------------------------- * Message-level I/O routines begin here. @@ -1285,6 +1360,33 @@ fail: return EOF; } +/* -------------------------------- + * pq_putmessage_noblock - like pq_putmessage, but never blocks + * + * If the output buffer is too small to hold the message, the buffer + * is enlarged. + */ +void +pq_putmessage_noblock(char msgtype, const char *s, size_t len) +{ + int res; + int required; + + /* + * Ensure we have enough space in the output buffer for the message header + * as well as the message itself. + */ + required = PqSendPointer + 1 + 4 + len; + if (required > PqSendBufferSize) + { + PqSendBuffer = repalloc(PqSendBuffer, required); + PqSendBufferSize = required; + } + res = pq_putmessage(msgtype, s, len); + Assert(res == 0); /* should not fail when the message fits in buffer */ +} + + /* -------------------------------- * pq_startcopyout - inform libpq that an old-style COPY OUT transfer * is beginning diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c index a4f559ed3f..32d0cb5e3f 100644 --- a/src/backend/port/unix_latch.c +++ b/src/backend/port/unix_latch.c @@ -193,19 +193,21 @@ DisownLatch(volatile Latch *latch) bool WaitLatch(volatile Latch *latch, long timeout) { - return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; + return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0; } /* * Like WaitLatch, but will also return when there's data available in - * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch - * was set, or 2 if the scoket became readable. + * 'sock' for reading or writing. Returns 0 if timeout was reached, + * 1 if the latch was set, 2 if the socket became readable or writable. */ int -WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) +WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead, + bool forWrite, long timeout) { struct timeval tv, *tvp = NULL; fd_set input_mask; + fd_set output_mask; int rc; int result = 0; @@ -241,14 +243,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) FD_ZERO(&input_mask); FD_SET(selfpipe_readfd, &input_mask); hifd = selfpipe_readfd; - if (sock != PGINVALID_SOCKET) + if (sock != PGINVALID_SOCKET && forRead) { FD_SET(sock, &input_mask); if (sock > hifd) hifd = sock; } - rc = select(hifd + 1, &input_mask, NULL, NULL, tvp); + FD_ZERO(&output_mask); + if (sock != PGINVALID_SOCKET && forWrite) + { + FD_SET(sock, &output_mask); + if (sock > hifd) + hifd = sock; + } + + rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp); if (rc < 0) { if (errno == EINTR) @@ -263,7 +273,9 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) result = 0; break; } - if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask)) + if (sock != PGINVALID_SOCKET && + ((forRead && FD_ISSET(sock, &input_mask)) || + (forWrite && FD_ISSET(sock, &output_mask)))) { result = 2; break; /* data available in socket */ diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c index 76dd6be9a6..dbbd4a35d1 100644 --- a/src/backend/port/win32/socket.c +++ b/src/backend/port/win32/socket.c @@ -14,7 +14,8 @@ #include "postgres.h" /* - * Indicate if pgwin32_recv() should operate in non-blocking mode. + * Indicate if pgwin32_recv() and pgwin32_send() should operate + * in non-blocking mode. * * Since the socket emulation layer always sets the actual socket to * non-blocking mode in order to be able to deliver signals, we must @@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags) return -1; } + if (pgwin32_noblock) + { + /* + * No data sent, and we are in "emulated non-blocking mode", so + * return indicating that we'd block if we were to continue. + */ + errno = EWOULDBLOCK; + return -1; + } + /* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */ if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0) diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c index ac20c4958f..f42cfef40e 100644 --- a/src/backend/port/win32_latch.c +++ b/src/backend/port/win32_latch.c @@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch) bool WaitLatch(volatile Latch *latch, long timeout) { - return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; + return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0; } int -WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) +WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead, + bool forWrite, long timeout) { DWORD rc; HANDLE events[3]; @@ -103,10 +104,17 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) events[0] = latchevent; events[1] = pgwin32_signal_event; numevents = 2; - if (sock != PGINVALID_SOCKET) + if (sock != PGINVALID_SOCKET && (forRead || forWrite)) { + int flags = 0; + + if (forRead) + flags |= FD_READ; + if (forWrite) + flags |= FD_WRITE; + sockevent = WSACreateEvent(); - WSAEventSelect(sock, sockevent, FD_READ); + WSAEventSelect(sock, sockevent, flags); events[numevents++] = sockevent; } @@ -139,8 +147,18 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) pgwin32_dispatch_queued_signals(); else if (rc == WAIT_OBJECT_0 + 2) { + WSANETWORKEVENTS resEvents; + Assert(sock != PGINVALID_SOCKET); - result = 2; + + ZeroMemory(&resEvents, sizeof(resEvents)); + if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR) + ereport(FATAL, + (errmsg_internal("failed to enumerate network events: %i", (int) GetLastError()))); + + if ((forRead && resEvents.lNetworkEvents & FD_READ) || + (forWrite && resEvents.lNetworkEvents & FD_WRITE)) + result = 2; break; } else if (rc != WAIT_OBJECT_0) @@ -148,7 +166,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) } /* Clean up the handle we created for the socket */ - if (sock != PGINVALID_SOCKET) + if (sock != PGINVALID_SOCKET && (forRead || forWrite)) { WSAEventSelect(sock, sockevent, 0); WSACloseEvent(sockevent); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2e2659a8c9..363f6181ad 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -74,6 +74,7 @@ bool am_walsender = false; /* Am I a walsender process ? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 1000; /* max sleep time between some actions */ +int replication_timeout = 60 * 1000; /* maximum time to send one WAL data message */ /* * These variables are used similarly to openLogFile/Id/Seg/Off, @@ -95,6 +96,11 @@ static XLogRecPtr sentPtr = {0, 0}; */ static StringInfoData reply_message; +/* + * Timestamp of the last receipt of the reply from the standby. + */ +static TimestampTz last_reply_timestamp; + /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; volatile sig_atomic_t walsender_shutdown_requested = false; @@ -113,7 +119,7 @@ static int WalSndLoop(void); static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); -static bool XLogSend(char *msgbuf, bool *caughtup); +static void XLogSend(char *msgbuf, bool *caughtup); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd * cmd); static void ProcessStandbyMessage(void); @@ -469,6 +475,7 @@ ProcessRepliesIfAny(void) { unsigned char firstchar; int r; + int received = false; for (;;) { @@ -484,7 +491,7 @@ ProcessRepliesIfAny(void) if (r == 0) { /* no data available without blocking */ - return; + break; } /* Handle the very limited subset of commands expected in this phase */ @@ -495,6 +502,7 @@ ProcessRepliesIfAny(void) */ case 'd': ProcessStandbyMessage(); + received = true; break; /* @@ -510,6 +518,12 @@ ProcessRepliesIfAny(void) firstchar))); } } + /* + * Save the last reply timestamp if we've received at least + * one reply. + */ + if (received) + last_reply_timestamp = GetCurrentTimestamp(); } /* @@ -688,6 +702,9 @@ WalSndLoop(void) */ initStringInfo(&reply_message); + /* Initialize the last reply timestamp */ + last_reply_timestamp = GetCurrentTimestamp(); + /* Loop forever, unless we get an error */ for (;;) { @@ -706,19 +723,6 @@ WalSndLoop(void) SyncRepInitConfig(); } - /* - * When SIGUSR2 arrives, we send all outstanding logs up to the - * shutdown checkpoint record (i.e., the latest record) and exit. - */ - if (walsender_ready_to_stop) - { - if (!XLogSend(output_message, &caughtup)) - break; - ProcessRepliesIfAny(); - if (caughtup) - walsender_shutdown_requested = true; - } - /* Normal exit from the walsender is here */ if (walsender_shutdown_requested) { @@ -730,11 +734,13 @@ WalSndLoop(void) } /* - * If we had sent all accumulated WAL in last round, nap for the - * configured time before retrying. + * If we don't have any pending data in the output buffer, try to + * send some more. */ - if (caughtup) + if (!pq_is_send_pending()) { + XLogSend(output_message, &caughtup); + /* * Even if we wrote all the WAL that was available when we started * sending, more might have arrived while we were sending this @@ -742,28 +748,79 @@ WalSndLoop(void) * received any signals from that time. Let's arm the latch * again, and after that check that we're still up-to-date. */ - ResetLatch(&MyWalSnd->latch); - - if (!XLogSend(output_message, &caughtup)) - break; - if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested) + if (caughtup && !pq_is_send_pending()) { - /* - * XXX: We don't really need the periodic wakeups anymore, - * WaitLatchOrSocket should reliably wake up as soon as - * something interesting happens. - */ + ResetLatch(&MyWalSnd->latch); - /* Sleep */ - WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, - WalSndDelay * 1000L); + XLogSend(output_message, &caughtup); } } - else + + /* Flush pending output to the client */ + if (pq_flush_if_writable() != 0) + break; + + /* + * When SIGUSR2 arrives, we send any outstanding logs up to the + * shutdown checkpoint record (i.e., the latest record) and exit. + */ + if (walsender_ready_to_stop && !pq_is_send_pending()) { - /* Attempt to send the log once every loop */ - if (!XLogSend(output_message, &caughtup)) + XLogSend(output_message, &caughtup); + ProcessRepliesIfAny(); + if (caughtup && !pq_is_send_pending()) + walsender_shutdown_requested = true; + } + + if ((caughtup || pq_is_send_pending()) && + !got_SIGHUP && + !walsender_shutdown_requested) + { + TimestampTz finish_time; + long sleeptime; + + /* Reschedule replication timeout */ + if (replication_timeout > 0) + { + long secs; + int usecs; + + finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + replication_timeout); + TimestampDifference(GetCurrentTimestamp(), + finish_time, &secs, &usecs); + sleeptime = secs * 1000 + usecs / 1000; + if (WalSndDelay < sleeptime) + sleeptime = WalSndDelay; + } + else + { + /* + * XXX: Without timeout, we don't really need the periodic + * wakeups anymore, WaitLatchOrSocket should reliably wake up + * as soon as something interesting happens. + */ + sleeptime = WalSndDelay; + } + + /* Sleep */ + WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, + true, pq_is_send_pending(), + sleeptime * 1000L); + + /* Check for replication timeout */ + if (replication_timeout > 0 && + GetCurrentTimestamp() >= finish_time) + { + /* + * Since typically expiration of replication timeout means + * communication problem, we don't send the error message + * to the standby. + */ + ereport(COMMERROR, + (errmsg("terminating walsender process due to replication timeout"))); break; + } } /* @@ -993,7 +1050,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) /* * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, - * but not yet sent to the client, and send it. + * but not yet sent to the client, and buffer it in the libpq output + * buffer. * * msgbuf is a work area in which the output message is constructed. It's * passed in just so we can avoid re-palloc'ing the buffer on each cycle. @@ -1001,10 +1059,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) * * If there is no unsent WAL remaining, *caughtup is set to true, otherwise * *caughtup is set to false. - * - * Returns true if OK, false if trouble. + */ -static bool +static void XLogSend(char *msgbuf, bool *caughtup) { XLogRecPtr SendRqstPtr; @@ -1027,7 +1084,7 @@ XLogSend(char *msgbuf, bool *caughtup) if (XLByteLE(SendRqstPtr, sentPtr)) { *caughtup = true; - return true; + return; } /* @@ -1099,11 +1156,7 @@ XLogSend(char *msgbuf, bool *caughtup) memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); - pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); - - /* Flush pending output to the client */ - if (pq_flush()) - return false; + pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); sentPtr = endptr; @@ -1127,7 +1180,7 @@ XLogSend(char *msgbuf, bool *caughtup) set_ps_display(activitymsg, false); } - return true; + return; } /* SIGHUP: set flag to re-read config file at next convenient time */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9ca1329e1e..b49bdaea9d 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1855,6 +1855,16 @@ static struct config_int ConfigureNamesInt[] = 1000, 1, 10000, NULL, NULL }, + { + {"replication_timeout", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("Sets the maximum time to wait for WAL replication."), + NULL, + GUC_UNIT_MS + }, + &replication_timeout, + 60 * 1000, 0, INT_MAX, NULL, NULL + }, + { {"commit_delay", PGC_USERSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ed70223f13..4348185999 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -200,6 +200,7 @@ #wal_sender_delay = 1s # walsender cycle time, 1-10000 milliseconds #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed +#replication_timeout = 60s # in milliseconds, 0 is disabled # - Standby Servers - diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 8ecab6d5ee..2df735f61f 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -60,7 +60,10 @@ extern int pq_peekbyte(void); extern int pq_getbyte_if_available(unsigned char *c); extern int pq_putbytes(const char *s, size_t len); extern int pq_flush(void); +extern int pq_flush_if_writable(void); +extern bool pq_is_send_pending(void); extern int pq_putmessage(char msgtype, const char *s, size_t len); +extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len); extern void pq_startcopyout(void); extern void pq_endcopyout(bool errorAbort); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 150a71fddd..2670a2e806 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -98,6 +98,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop; /* user-settable parameters */ extern int WalSndDelay; extern int max_wal_senders; +extern int replication_timeout; extern int WalSenderMain(void); extern void WalSndSignals(void); diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 31744ff252..f64e13bed2 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -40,7 +40,7 @@ extern void OwnLatch(volatile Latch *latch); extern void DisownLatch(volatile Latch *latch); extern bool WaitLatch(volatile Latch *latch, long timeout); extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, - long timeout); + bool forRead, bool forWrite, long timeout); extern void SetLatch(volatile Latch *latch); extern void ResetLatch(volatile Latch *latch); #define TestLatch(latch) (((volatile Latch *) latch)->is_set) -- 2.40.0