]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/walsender.c
Introduce timeout handling framework
[postgresql] / src / backend / replication / walsender.c
index dde773d79e18de8122d372abc59e0f967a639698..37a030b5f5e4e1536c44227672be05ffcff88537 100644 (file)
@@ -25,7 +25,7 @@
  * shutdown checkpoint record, and then exit.
  *
  *
- * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *       src/backend/replication/walsender.c
 #include <signal.h>
 #include <unistd.h>
 
-#include "funcapi.h"
-#include "access/xlog_internal.h"
 #include "access/transam.h"
+#include "access/xlog_internal.h"
 #include "catalog/pg_type.h"
+#include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "nodes/replnodes.h"
 #include "replication/basebackup.h"
-#include "replication/replnodes.h"
+#include "replication/syncrep.h"
 #include "replication/walprotocol.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/resowner.h"
+#include "utils/timeout.h"
+#include "utils/timestamp.h"
 
 
 /* Array of WalSnds in shared memory */
 WalSndCtlData *WalSndCtl = NULL;
 
 /* My slot in the shared memory array */
-WalSnd *MyWalSnd = NULL;
+WalSnd    *MyWalSnd = NULL;
 
 /* Global state */
 bool           am_walsender = false;           /* Am I a walsender process ? */
+bool           am_cascading_walsender = false;         /* Am I cascading WAL to
+                                                                                                * another standby ? */
 
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
-int                    WalSndDelay = 1000;     /* max sleep time between some actions */
-int                    replication_timeout = 60 * 1000;        /* maximum time to send one WAL data message */
+int                    replication_timeout = 60 * 1000;        /* maximum time to send one
+                                                                                                * WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
  * but for walsender to read the XLOG.
  */
 static int     sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
 static uint32 sendOff = 0;
 
 /*
  * How far have we sent WAL already? This is also advertised in
  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
  */
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
 
 /*
  * Buffer for processing reply messages.
@@ -115,29 +125,27 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
 static bool HandleReplicationCommand(const char *cmd_string);
-static int     WalSndLoop(void);
+static void WalSndLoop(void) __attribute__((noreturn));
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
-static void StartReplication(StartReplicationCmd * cmd);
+static void StartReplication(StartReplicationCmd *cmd);
 static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
 
 
 /* Main entry point for walsender process */
-int
+void
 WalSenderMain(void)
 {
        MemoryContext walsnd_context;
 
-       if (RecoveryInProgress())
-               ereport(FATAL,
-                               (errcode(ERRCODE_CANNOT_CONNECT_NOW),
-                                errmsg("recovery is still in progress, can't accept WAL streaming connections")));
+       am_cascading_walsender = RecoveryInProgress();
 
        /* Create a per-walsender data structure in shared memory */
        InitWalSnd();
@@ -164,6 +172,12 @@ WalSenderMain(void)
        /* Unblock signals (they were blocked when the postmaster forked us) */
        PG_SETMASK(&UnBlockSig);
 
+       /*
+        * Use the recovery target timeline ID during recovery
+        */
+       if (am_cascading_walsender)
+               ThisTimeLineID = GetRecoveryTargetTLI();
+
        /* Tell the standby that walsender is ready for receiving commands */
        ReadyForQuery(DestRemote);
 
@@ -183,7 +197,7 @@ WalSenderMain(void)
        SyncRepInitConfig();
 
        /* Main loop of walsender */
-       return WalSndLoop();
+       WalSndLoop();
 }
 
 /*
@@ -211,7 +225,7 @@ WalSndHandshake(void)
                 * Emergency bailout if postmaster has died.  This is to avoid the
                 * necessity for manual cleanup of all postmaster children.
                 */
-               if (!PostmasterIsAlive(true))
+               if (!PostmasterIsAlive())
                        exit(1);
 
                /*
@@ -281,18 +295,17 @@ IdentifySystem(void)
        XLogRecPtr      logptr;
 
        /*
-        * Reply with a result set with one row, three columns. First col is system
-        * ID, second is timeline ID, and third is current xlog location.
+        * Reply with a result set with one row, three columns. First col is
+        * system ID, second is timeline ID, and third is current xlog location.
         */
 
        snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
                         GetSystemIdentifier());
        snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
 
-       logptr = GetInsertRecPtr();
+       logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
 
-       snprintf(xpos, sizeof(xpos), "%X/%X",
-                        logptr.xlogid, logptr.xrecoff);
+       snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
        /* Send a RowDescription message */
        pq_beginmessage(&buf, 'T');
@@ -348,42 +361,65 @@ IdentifySystem(void)
  * START_REPLICATION
  */
 static void
-StartReplication(StartReplicationCmd * cmd)
+StartReplication(StartReplicationCmd *cmd)
 {
        StringInfoData buf;
 
        /*
-        * Let postmaster know that we're streaming. Once we've declared us as
-        * a WAL sender process, postmaster will let us outlive the bgwriter and
-        * kill us last in the shutdown sequence, so we get a chance to stream
-        * all remaining WAL at shutdown, including the shutdown checkpoint.
-        * Note that there's no going back, and we mustn't write any WAL records
-        * after this.
+        * Let postmaster know that we're streaming. Once we've declared us as a
+        * WAL sender process, postmaster will let us outlive the bgwriter and
+        * kill us last in the shutdown sequence, so we get a chance to stream all
+        * remaining WAL at shutdown, including the shutdown checkpoint. Note that
+        * there's no going back, and we mustn't write any WAL records after this.
         */
        MarkPostmasterChildWalSender();
+       SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
 
        /*
-        * Check that we're logging enough information in the WAL for
-        * log-shipping.
+        * When promoting a cascading standby, postmaster sends SIGUSR2 to any
+        * cascading walsenders to kill them. But there is a corner-case where
+        * such walsender fails to receive SIGUSR2 and survives a standby
+        * promotion unexpectedly. This happens when postmaster sends SIGUSR2
+        * before the walsender marks itself as a WAL sender, because postmaster
+        * sends SIGUSR2 to only the processes marked as a WAL sender.
         *
-        * NOTE: This only checks the current value of wal_level. Even if the
-        * current setting is not 'minimal', there can be old WAL in the pg_xlog
-        * directory that was created with 'minimal'. So this is not bulletproof,
-        * the purpose is just to give a user-friendly error message that hints
-        * how to configure the system correctly.
+        * To avoid this corner-case, if recovery is NOT in progress even though
+        * the walsender is cascading one, we do the same thing as SIGUSR2 signal
+        * handler does, i.e., set walsender_ready_to_stop to true. Which causes
+        * the walsender to end later.
+        *
+        * When terminating cascading walsenders, usually postmaster writes the
+        * log message announcing the terminations. But there is a race condition
+        * here. If there is no walsender except this process before reaching
+        * here, postmaster thinks that there is no walsender and suppresses that
+        * log message. To handle this case, we always emit that log message here.
+        * This might cause duplicate log messages, but which is less likely to
+        * happen, so it's not worth writing some code to suppress them.
+        */
+       if (am_cascading_walsender && !RecoveryInProgress())
+       {
+               ereport(LOG,
+                  (errmsg("terminating walsender process to force cascaded standby "
+                                  "to update timeline and reconnect")));
+               walsender_ready_to_stop = true;
+       }
+
+       /*
+        * We assume here that we're logging enough information in the WAL for
+        * log-shipping, since this is checked in PostmasterMain().
+        *
+        * NOTE: wal_level can only change at shutdown, so in most cases it is
+        * difficult for there to be WAL data that we can still see that was
+        * written at wal_level='minimal'.
         */
-       if (wal_level == WAL_LEVEL_MINIMAL)
-               ereport(FATAL,
-                               (errcode(ERRCODE_CANNOT_CONNECT_NOW),
-               errmsg("standby connections not allowed because wal_level=minimal")));
 
        /*
         * When we first start replication the standby will be behind the primary.
         * For some applications, for example, synchronous replication, it is
         * important to have a clear state for this initial catchup mode, so we
         * can trigger actions when we change streaming state later. We may stay
-        * in this state for a long time, which is exactly why we want to be
-        * able to monitor whether or not we are still here.
+        * in this state for a long time, which is exactly why we want to be able
+        * to monitor whether or not we are still here.
         */
        WalSndSetState(WALSNDSTATE_CATCHUP);
 
@@ -475,7 +511,7 @@ ProcessRepliesIfAny(void)
 {
        unsigned char firstchar;
        int                     r;
-       int             received = false;
+       bool            received = false;
 
        for (;;)
        {
@@ -514,13 +550,13 @@ ProcessRepliesIfAny(void)
                        default:
                                ereport(FATAL,
                                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                                errmsg("invalid standby message type %d",
+                                                errmsg("invalid standby message type \"%c\"",
                                                                firstchar)));
                }
        }
+
        /*
-        * Save the last reply timestamp if we've received at least
-        * one reply.
+        * Save the last reply timestamp if we've received at least one reply.
         */
        if (received)
                last_reply_timestamp = GetCurrentTimestamp();
@@ -532,7 +568,7 @@ ProcessRepliesIfAny(void)
 static void
 ProcessStandbyMessage(void)
 {
-       char msgtype;
+       char            msgtype;
 
        resetStringInfo(&reply_message);
 
@@ -565,7 +601,7 @@ ProcessStandbyMessage(void)
                default:
                        ereport(COMMERROR,
                                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("unexpected message type %c", msgtype)));
+                                        errmsg("unexpected message type \"%c\"", msgtype)));
                        proc_exit(0);
        }
 }
@@ -576,18 +612,18 @@ ProcessStandbyMessage(void)
 static void
 ProcessStandbyReplyMessage(void)
 {
-       StandbyReplyMessage     reply;
+       StandbyReplyMessage reply;
 
        pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
        elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
-                reply.write.xlogid, reply.write.xrecoff,
-                reply.flush.xlogid, reply.flush.xrecoff,
-                reply.apply.xlogid, reply.apply.xrecoff);
+                (uint32) (reply.write >> 32), (uint32) reply.write,
+                (uint32) (reply.flush >> 32), (uint32) reply.flush,
+                (uint32) (reply.apply >> 32), (uint32) reply.apply);
 
        /*
-        * Update shared state for this WalSender process
-        * based on reply data from standby.
+        * Update shared state for this WalSender process based on reply data from
+        * standby.
         */
        {
                /* use volatile pointer to prevent code rearrangement */
@@ -600,7 +636,8 @@ ProcessStandbyReplyMessage(void)
                SpinLockRelease(&walsnd->mutex);
        }
 
-       SyncRepReleaseWaiters();
+       if (!am_cascading_walsender)
+               SyncRepReleaseWaiters();
 }
 
 /*
@@ -609,81 +646,72 @@ ProcessStandbyReplyMessage(void)
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-       StandbyHSFeedbackMessage        reply;
-       TransactionId newxmin = InvalidTransactionId;
+       StandbyHSFeedbackMessage reply;
+       TransactionId nextXid;
+       uint32          nextEpoch;
 
-       pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+       /* Decipher the reply message */
+       pq_copymsgbytes(&reply_message, (char *) &reply,
+                                       sizeof(StandbyHSFeedbackMessage));
 
        elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
                 reply.xmin,
                 reply.epoch);
 
+       /* Ignore invalid xmin (can't actually happen with current walreceiver) */
+       if (!TransactionIdIsNormal(reply.xmin))
+               return;
+
        /*
-        * Update the WalSender's proc xmin to allow it to be visible
-        * to snapshots. This will hold back the removal of dead rows
-        * and thereby prevent the generation of cleanup conflicts
-        * on the standby server.
+        * Check that the provided xmin/epoch are sane, that is, not in the future
+        * and not so far back as to be already wrapped around.  Ignore if not.
+        *
+        * Epoch of nextXid should be same as standby, or if the counter has
+        * wrapped, then one greater than standby.
         */
-       if (TransactionIdIsValid(reply.xmin))
-       {
-               TransactionId   nextXid;
-               uint32                  nextEpoch;
-               bool                    epochOK = false;
+       GetNextXidAndEpoch(&nextXid, &nextEpoch);
 
-               GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-               /*
-                * Epoch of oldestXmin should be same as standby or
-                * if the counter has wrapped, then one less than reply.
-                */
-               if (reply.xmin <= nextXid)
-               {
-                       if (reply.epoch == nextEpoch)
-                               epochOK = true;
-               }
-               else
-               {
-                       if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
-                               epochOK = true;
-               }
-
-               /*
-                * Feedback from standby must not go backwards, nor should it go
-                * forwards further than our most recent xid.
-                */
-               if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
-               {
-                       if (!TransactionIdIsValid(MyProc->xmin))
-                       {
-                               TransactionId oldestXmin = GetOldestXmin(true, true);
-                               if (TransactionIdPrecedes(oldestXmin, reply.xmin))
-                                       newxmin = reply.xmin;
-                               else
-                                       newxmin = oldestXmin;
-                       }
-                       else
-                       {
-                               if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
-                                       newxmin = reply.xmin;
-                               else
-                                       newxmin = MyProc->xmin; /* stay the same */
-                       }
-               }
+       if (reply.xmin <= nextXid)
+       {
+               if (reply.epoch != nextEpoch)
+                       return;
+       }
+       else
+       {
+               if (reply.epoch + 1 != nextEpoch)
+                       return;
        }
 
+       if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+               return;                                 /* epoch OK, but it's wrapped around */
+
        /*
-        * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+        * Set the WalSender's xmin equal to the standby's requested xmin, so that
+        * the xmin will be taken into account by GetOldestXmin.  This will hold
+        * back the removal of dead rows and thereby prevent the generation of
+        * cleanup conflicts on the standby server.
+        *
+        * There is a small window for a race condition here: although we just
+        * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+        * advanced between our fetching it and applying the xmin below, perhaps
+        * far enough to make reply.xmin wrap around.  In that case the xmin we
+        * set here would be "in the future" and have no effect.  No point in
+        * worrying about this since it's too late to save the desired data
+        * anyway.      Assuming that the standby sends us an increasing sequence of
+        * xmins, this could only happen during the first reply cycle, else our
+        * own xmin would prevent nextXid from advancing so far.
+        *
+        * We don't bother taking the ProcArrayLock here.  Setting the xmin field
+        * is assumed atomic, and there's no real need to prevent a concurrent
+        * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
+        * safe, and if we're moving it backwards, well, the data is at risk
+        * already since a VACUUM could have just finished calling GetOldestXmin.)
         */
-       if (MyProc->xmin != newxmin)
-       {
-               LWLockAcquire(ProcArrayLock, LW_SHARED);
-               MyProc->xmin = newxmin;
-               LWLockRelease(ProcArrayLock);
-       }
+       MyPgXact->xmin = reply.xmin;
 }
 
 /* Main loop of walsender process */
-static int
+static void
 WalSndLoop(void)
 {
        char       *output_message;
@@ -708,11 +736,14 @@ WalSndLoop(void)
        /* Loop forever, unless we get an error */
        for (;;)
        {
+               /* Clear any already-pending wakeups */
+               ResetLatch(&MyWalSnd->latch);
+
                /*
                 * Emergency bailout if postmaster has died.  This is to avoid the
                 * necessity for manual cleanup of all postmaster children.
                 */
-               if (!PostmasterIsAlive(true))
+               if (!PostmasterIsAlive())
                        exit(1);
 
                /* Process any requests or signals received recently */
@@ -726,120 +757,124 @@ WalSndLoop(void)
                /* Normal exit from the walsender is here */
                if (walsender_shutdown_requested)
                {
-                       /* Inform the standby that XLOG streaming was done */
+                       /* Inform the standby that XLOG streaming is done */
                        pq_puttextmessage('C', "COPY 0");
                        pq_flush();
 
                        proc_exit(0);
                }
 
+               /* Check for input from the client */
+               ProcessRepliesIfAny();
+
                /*
-                * If we don't have any pending data in the output buffer, try to
-                * send some more.
+                * If we don't have any pending data in the output buffer, try to send
+                * some more.  If there is some, we don't bother to call XLogSend
+                * again until we've flushed it ... but we'd better assume we are not
+                * caught up.
                 */
                if (!pq_is_send_pending())
-               {
                        XLogSend(output_message, &caughtup);
+               else
+                       caughtup = false;
 
+               /* Try to flush pending output to the client */
+               if (pq_flush_if_writable() != 0)
+                       break;
+
+               /* If nothing remains to be sent right now ... */
+               if (caughtup && !pq_is_send_pending())
+               {
                        /*
-                        * Even if we wrote all the WAL that was available when we started
-                        * sending, more might have arrived while we were sending this
-                        * batch. We had the latch set while sending, so we have not
-                        * received any signals from that time. Let's arm the latch
-                        * again, and after that check that we're still up-to-date.
+                        * If we're in catchup state, move to streaming.  This is an
+                        * important state change for users to know about, since before
+                        * this point data loss might occur if the primary dies and we
+                        * need to failover to the standby. The state change is also
+                        * important for synchronous replication, since commits that
+                        * started to wait at that point might wait for some time.
                         */
-                       if (caughtup && !pq_is_send_pending())
+                       if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
                        {
-                               ResetLatch(&MyWalSnd->latch);
+                               ereport(DEBUG1,
+                                        (errmsg("standby \"%s\" has now caught up with primary",
+                                                        application_name)));
+                               WalSndSetState(WALSNDSTATE_STREAMING);
+                       }
 
+                       /*
+                        * When SIGUSR2 arrives, we send any outstanding logs up to the
+                        * shutdown checkpoint record (i.e., the latest record) and exit.
+                        * This may be a normal termination at shutdown, or a promotion,
+                        * the walsender is not sure which.
+                        */
+                       if (walsender_ready_to_stop)
+                       {
+                               /* ... let's just be real sure we're caught up ... */
                                XLogSend(output_message, &caughtup);
+                               if (caughtup && !pq_is_send_pending())
+                               {
+                                       walsender_shutdown_requested = true;
+                                       continue;       /* don't want to wait more */
+                               }
                        }
                }
 
-               /* Flush pending output to the client */
-               if (pq_flush_if_writable() != 0)
-                       break;
-
                /*
-                * When SIGUSR2 arrives, we send any outstanding logs up to the
-                * shutdown checkpoint record (i.e., the latest record) and exit.
+                * We don't block if not caught up, unless there is unsent data
+                * pending in which case we'd better block until the socket is
+                * write-ready.  This test is only needed for the case where XLogSend
+                * loaded a subset of the available data but then pq_flush_if_writable
+                * flushed it all --- we should immediately try to send more.
                 */
-               if (walsender_ready_to_stop && !pq_is_send_pending())
+               if (caughtup || pq_is_send_pending())
                {
-                       XLogSend(output_message, &caughtup);
-                       ProcessRepliesIfAny();
-                       if (caughtup && !pq_is_send_pending())
-                               walsender_shutdown_requested = true;
-               }
+                       TimestampTz timeout = 0;
+                       long            sleeptime = 10000;              /* 10 s */
+                       int                     wakeEvents;
 
-               if ((caughtup || pq_is_send_pending()) &&
-                       !got_SIGHUP &&
-                       !walsender_shutdown_requested)
-               {
-                       TimestampTz     finish_time = 0;
-                       long            sleeptime;
+                       wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+                               WL_SOCKET_READABLE | WL_TIMEOUT;
 
-                       /* Reschedule replication timeout */
-                       if (replication_timeout > 0)
+                       if (pq_is_send_pending())
+                               wakeEvents |= WL_SOCKET_WRITEABLE;
+                       else
                        {
-                               long            secs;
-                               int             usecs;
-
-                               finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-                                                                                                                 replication_timeout);
-                               TimestampDifference(GetCurrentTimestamp(),
-                                                                       finish_time, &secs, &usecs);
-                               sleeptime = secs * 1000 + usecs / 1000;
-                               if (WalSndDelay < sleeptime)
-                                       sleeptime = WalSndDelay;
+                               WalSndKeepalive(output_message);
+                               /* Try to flush pending output to the client */
+                               if (pq_flush_if_writable() != 0)
+                                       break;
                        }
-                       else
+
+                       /* Determine time until replication timeout */
+                       if (replication_timeout > 0)
                        {
-                               /*
-                                * XXX: Without timeout, we don't really need the periodic
-                                * wakeups anymore, WaitLatchOrSocket should reliably wake up
-                                * as soon as something interesting happens.
-                                */
-                               sleeptime = WalSndDelay;
+                               timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                                                                                         replication_timeout);
+                               sleeptime = 1 + (replication_timeout / 10);
                        }
 
-                       /* Sleep */
-                       WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-                                                         true, pq_is_send_pending(),
-                                                         sleeptime * 1000L);
+                       /* Sleep until something happens or replication timeout */
+                       WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+                                                         MyProcPort->sock, sleeptime);
 
-                       /* Check for replication timeout */
+                       /*
+                        * Check for replication timeout.  Note we ignore the corner case
+                        * possibility that the client replied just as we reached the
+                        * timeout ... he's supposed to reply *before* that.
+                        */
                        if (replication_timeout > 0 &&
-                               GetCurrentTimestamp() >= finish_time)
+                               GetCurrentTimestamp() >= timeout)
                        {
                                /*
                                 * Since typically expiration of replication timeout means
-                                * communication problem, we don't send the error message
-                                * to the standby.
+                                * communication problem, we don't send the error message to
+                                * the standby.
                                 */
                                ereport(COMMERROR,
                                                (errmsg("terminating walsender process due to replication timeout")));
                                break;
                        }
                }
-
-               /*
-                * If we're in catchup state, see if its time to move to streaming.
-                * This is an important state change for users, since before this
-                * point data loss might occur if the primary dies and we need to
-                * failover to the standby. The state change is also important for
-                * synchronous replication, since commits that started to wait at
-                * that point might wait for some time.
-                */
-               if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
-               {
-                       ereport(DEBUG1,
-                                       (errmsg("standby \"%s\" has now caught up with primary",
-                                                                       application_name)));
-                       WalSndSetState(WALSNDSTATE_STREAMING);
-               }
-
-               ProcessRepliesIfAny();
        }
 
        /*
@@ -852,7 +887,7 @@ WalSndLoop(void)
                whereToSendOutput = DestNone;
 
        proc_exit(0);
-       return 1;                                       /* keep the compiler quiet */
+       abort();                                        /* keep the compiler quiet */
 }
 
 /* Initialize a per-walsender data structure for this walsender process */
@@ -929,7 +964,7 @@ WalSndKill(int code, Datum arg)
 }
 
 /*
- * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -940,14 +975,18 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 void
-XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+XLogRead(char *buf, XLogRecPtr startptr, Size count)
 {
-       XLogRecPtr      startRecPtr = recptr;
-       char            path[MAXPGPATH];
-       uint32          lastRemovedLog;
-       uint32          lastRemovedSeg;
-       uint32          log;
-       uint32          seg;
+       char       *p;
+       XLogRecPtr      recptr;
+       Size            nbytes;
+       XLogSegNo       lastRemovedSegNo;
+       XLogSegNo       segno;
+
+retry:
+       p = buf;
+       recptr = startptr;
+       nbytes = count;
 
        while (nbytes > 0)
        {
@@ -955,16 +994,18 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                int                     segbytes;
                int                     readbytes;
 
-               startoff = recptr.xrecoff % XLogSegSize;
+               startoff = recptr % XLogSegSize;
 
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
                {
+                       char            path[MAXPGPATH];
+
                        /* Switch to another logfile segment */
                        if (sendFile >= 0)
                                close(sendFile);
 
-                       XLByteToSeg(recptr, sendId, sendSeg);
-                       XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+                       XLByteToSeg(recptr, sendSegNo);
+                       XLogFilePath(path, ThisTimeLineID, sendSegNo);
 
                        sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
                        if (sendFile < 0)
@@ -975,20 +1016,15 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                                 * removed or recycled.
                                 */
                                if (errno == ENOENT)
-                               {
-                                       char            filename[MAXFNAMELEN];
-
-                                       XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
                                                         errmsg("requested WAL segment %s has already been removed",
-                                                                       filename)));
-                               }
+                                                                       XLogFileNameP(ThisTimeLineID, sendSegNo))));
                                else
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
-                                                        errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
-                                                                       path, sendId, sendSeg)));
+                                                        errmsg("could not open file \"%s\": %m",
+                                                                       path)));
                        }
                        sendOff = 0;
                }
@@ -999,8 +1035,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                        if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
                                ereport(ERROR,
                                                (errcode_for_file_access(),
-                                                errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-                                                               sendId, sendSeg, startoff)));
+                                                errmsg("could not seek in log segment %s to offset %u: %m",
+                                                               XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                                               startoff)));
                        sendOff = startoff;
                }
 
@@ -1010,20 +1047,22 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                else
                        segbytes = nbytes;
 
-               readbytes = read(sendFile, buf, segbytes);
+               readbytes = read(sendFile, p, segbytes);
                if (readbytes <= 0)
+               {
                        ereport(ERROR,
                                        (errcode_for_file_access(),
-                       errmsg("could not read from log file %u, segment %u, offset %u, "
-                                  "length %lu: %m",
-                                  sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+                       errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+                                  XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                  sendOff, (unsigned long) segbytes)));
+               }
 
                /* Update state for read */
                XLByteAdvance(recptr, readbytes);
 
                sendOff += readbytes;
                nbytes -= readbytes;
-               buf += readbytes;
+               p += readbytes;
        }
 
        /*
@@ -1033,18 +1072,38 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
         * read() succeeds in that case, but the data we tried to read might
         * already have been overwritten with new WAL records.
         */
-       XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
-       XLByteToSeg(startRecPtr, log, seg);
-       if (log < lastRemovedLog ||
-               (log == lastRemovedLog && seg <= lastRemovedSeg))
-       {
-               char            filename[MAXFNAMELEN];
-
-               XLogFileName(filename, ThisTimeLineID, log, seg);
+       XLogGetLastRemoved(&lastRemovedSegNo);
+       XLByteToSeg(startptr, segno);
+       if (segno <= lastRemovedSegNo)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("requested WAL segment %s has already been removed",
-                                               filename)));
+                                               XLogFileNameP(ThisTimeLineID, segno))));
+
+       /*
+        * During recovery, the currently-open WAL file might be replaced with the
+        * file of the same name retrieved from archive. So we always need to
+        * check what we read was valid after reading into the buffer. If it's
+        * invalid, we try to open and read the file again.
+        */
+       if (am_cascading_walsender)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+               bool            reload;
+
+               SpinLockAcquire(&walsnd->mutex);
+               reload = walsnd->needreload;
+               walsnd->needreload = false;
+               SpinLockRelease(&walsnd->mutex);
+
+               if (reload && sendFile >= 0)
+               {
+                       close(sendFile);
+                       sendFile = -1;
+
+                       goto retry;
+               }
        }
 }
 
@@ -1078,7 +1137,7 @@ XLogSend(char *msgbuf, bool *caughtup)
         * subsequently crashes and restarts, slaves must not have applied any WAL
         * that gets lost on the master.
         */
-       SendRqstPtr = GetFlushRecPtr();
+       SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
 
        /* Quick exit if nothing to do */
        if (XLByteLE(SendRqstPtr, sentPtr))
@@ -1099,25 +1158,8 @@ XLogSend(char *msgbuf, bool *caughtup)
         * SendRqstPtr never points to the middle of a WAL record.
         */
        startptr = sentPtr;
-       if (startptr.xrecoff >= XLogFileSize)
-       {
-               /*
-                * crossing a logid boundary, skip the non-existent last log segment
-                * in previous logical log file.
-                */
-               startptr.xlogid += 1;
-               startptr.xrecoff = 0;
-       }
-
        endptr = startptr;
        XLByteAdvance(endptr, MAX_SEND_SIZE);
-       if (endptr.xlogid != startptr.xlogid)
-       {
-               /* Don't cross a logfile boundary within one message */
-               Assert(endptr.xlogid == startptr.xlogid + 1);
-               endptr.xlogid = startptr.xlogid;
-               endptr.xrecoff = XLogFileSize;
-       }
 
        /* if we went beyond SendRqstPtr, back off */
        if (XLByteLE(SendRqstPtr, endptr))
@@ -1128,11 +1170,11 @@ XLogSend(char *msgbuf, bool *caughtup)
        else
        {
                /* round down to page boundary. */
-               endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+               endptr -= (endptr % XLOG_BLCKSZ);
                *caughtup = false;
        }
 
-       nbytes = endptr.xrecoff - startptr.xrecoff;
+       nbytes = endptr - startptr;
        Assert(nbytes <= MAX_SEND_SIZE);
 
        /*
@@ -1176,29 +1218,66 @@ XLogSend(char *msgbuf, bool *caughtup)
                char            activitymsg[50];
 
                snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
-                                sentPtr.xlogid, sentPtr.xrecoff);
+                                (uint32) (sentPtr >> 32), (uint32) sentPtr);
                set_ps_display(activitymsg, false);
        }
 
        return;
 }
 
+/*
+ * Request walsenders to reload the currently-open WAL file
+ */
+void
+WalSndRqstFileReload(void)
+{
+       int                     i;
+
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+               if (walsnd->pid == 0)
+                       continue;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->needreload = true;
+               SpinLockRelease(&walsnd->mutex);
+       }
+}
+
 /* SIGHUP: set flag to re-read config file at next convenient time */
 static void
 WalSndSigHupHandler(SIGNAL_ARGS)
 {
+       int                     save_errno = errno;
+
        got_SIGHUP = true;
        if (MyWalSnd)
                SetLatch(&MyWalSnd->latch);
+
+       errno = save_errno;
 }
 
 /* SIGTERM: set flag to shut down */
 static void
 WalSndShutdownHandler(SIGNAL_ARGS)
 {
+       int                     save_errno = errno;
+
        walsender_shutdown_requested = true;
        if (MyWalSnd)
                SetLatch(&MyWalSnd->latch);
+
+       /*
+        * Set the standard (non-walsender) state as well, so that we can abort
+        * things like do_pg_stop_backup().
+        */
+       InterruptPending = true;
+       ProcDiePending = true;
+
+       errno = save_errno;
 }
 
 /*
@@ -1237,16 +1316,24 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
 static void
 WalSndXLogSendHandler(SIGNAL_ARGS)
 {
+       int                     save_errno = errno;
+
        latch_sigusr1_handler();
+
+       errno = save_errno;
 }
 
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
 {
+       int                     save_errno = errno;
+
        walsender_ready_to_stop = true;
        if (MyWalSnd)
                SetLatch(&MyWalSnd->latch);
+
+       errno = save_errno;
 }
 
 /* Set up signal handlers */
@@ -1259,7 +1346,7 @@ WalSndSignals(void)
        pqsignal(SIGINT, SIG_IGN);      /* not used */
        pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
        pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
-       pqsignal(SIGALRM, SIG_IGN);
+       InitializeTimeouts();           /* establishes SIGALRM handler */
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
        pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
@@ -1300,7 +1387,8 @@ WalSndShmemInit(void)
                /* First time through, so initialize */
                MemSet(WalSndCtl, 0, WalSndShmemSize());
 
-               SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+               for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+                       SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
 
                for (i = 0; i < max_wal_senders; i++)
                {
@@ -1312,11 +1400,16 @@ WalSndShmemInit(void)
        }
 }
 
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
 void
 WalSndWakeup(void)
 {
-       int             i;
+       int                     i;
 
        for (i = 0; i < max_wal_senders; i++)
                SetLatch(&WalSndCtl->walsnds[i].latch);
@@ -1349,13 +1442,13 @@ WalSndGetStateString(WalSndState state)
        switch (state)
        {
                case WALSNDSTATE_STARTUP:
-                       return "STARTUP";
+                       return "startup";
                case WALSNDSTATE_BACKUP:
-                       return "BACKUP";
+                       return "backup";
                case WALSNDSTATE_CATCHUP:
-                       return "CATCHUP";
+                       return "catchup";
                case WALSNDSTATE_STREAMING:
-                       return "STREAMING";
+                       return "streaming";
        }
        return "UNKNOWN";
 }
@@ -1368,16 +1461,16 @@ WalSndGetStateString(WalSndState state)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   8
-       ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-       TupleDesc                       tupdesc;
-       Tuplestorestate    *tupstore;
-       MemoryContext           per_query_ctx;
-       MemoryContext           oldcontext;
-       int                                     *sync_priority;
-       int                                     priority = 0;
-       int                                     sync_standby = -1;
-       int                                     i;
+#define PG_STAT_GET_WAL_SENDERS_COLS   8
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       Tuplestorestate *tupstore;
+       MemoryContext per_query_ctx;
+       MemoryContext oldcontext;
+       int                *sync_priority;
+       int                     priority = 0;
+       int                     sync_standby = -1;
+       int                     i;
 
        /* check to see if caller supports us returning a tuplestore */
        if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -1405,9 +1498,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        MemoryContextSwitchTo(oldcontext);
 
        /*
-        * Get the priorities of sync standbys all in one go, to minimise
-        * lock acquisitions and to allow us to evaluate who is the current
-        * sync standby. This code must match the code in SyncRepReleaseWaiters().
+        * Get the priorities of sync standbys all in one go, to minimise lock
+        * acquisitions and to allow us to evaluate who is the current sync
+        * standby. This code must match the code in SyncRepReleaseWaiters().
         */
        sync_priority = palloc(sizeof(int) * max_wal_senders);
        LWLockAcquire(SyncRepLock, LW_SHARED);
@@ -1418,12 +1511,19 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
                if (walsnd->pid != 0)
                {
-                       sync_priority[i] = walsnd->sync_standby_priority;
+                       /*
+                        * Treat a standby such as a pg_basebackup background process
+                        * which always returns an invalid flush location, as an
+                        * asynchronous standby.
+                        */
+                       sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+                               0 : walsnd->sync_standby_priority;
 
                        if (walsnd->state == WALSNDSTATE_STREAMING &&
                                walsnd->sync_standby_priority > 0 &&
                                (priority == 0 ||
-                                priority > walsnd->sync_standby_priority))
+                                priority > walsnd->sync_standby_priority) &&
+                               !XLogRecPtrIsInvalid(walsnd->flush))
                        {
                                priority = walsnd->sync_standby_priority;
                                sync_standby = i;
@@ -1441,7 +1541,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                XLogRecPtr      write;
                XLogRecPtr      flush;
                XLogRecPtr      apply;
-               WalSndState     state;
+               WalSndState state;
                Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
                bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -1462,8 +1562,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                if (!superuser())
                {
                        /*
-                        * Only superusers can see details. Other users only get
-                        * the pid value to know it's a walsender, but no details.
+                        * Only superusers can see details. Other users only get the pid
+                        * value to know it's a walsender, but no details.
                         */
                        MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
                }
@@ -1472,39 +1572,39 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                        values[1] = CStringGetTextDatum(WalSndGetStateString(state));
 
                        snprintf(location, sizeof(location), "%X/%X",
-                                        sentPtr.xlogid, sentPtr.xrecoff);
+                                        (uint32) (sentPtr >> 32), (uint32) sentPtr);
                        values[2] = CStringGetTextDatum(location);
 
-                       if (write.xlogid == 0 && write.xrecoff == 0)
+                       if (write == 0)
                                nulls[3] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        write.xlogid, write.xrecoff);
+                                        (uint32) (write >> 32), (uint32) write);
                        values[3] = CStringGetTextDatum(location);
 
-                       if (flush.xlogid == 0 && flush.xrecoff == 0)
+                       if (flush == 0)
                                nulls[4] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                       flush.xlogid, flush.xrecoff);
+                                        (uint32) (flush >> 32), (uint32) flush);
                        values[4] = CStringGetTextDatum(location);
 
-                       if (apply.xlogid == 0 && apply.xrecoff == 0)
+                       if (apply == 0)
                                nulls[5] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        apply.xlogid, apply.xrecoff);
+                                        (uint32) (apply >> 32), (uint32) apply);
                        values[5] = CStringGetTextDatum(location);
 
                        values[6] = Int32GetDatum(sync_priority[i]);
 
                        /*
-                        * More easily understood version of standby state.
-                        * This is purely informational, not different from priority.
+                        * More easily understood version of standby state. This is purely
+                        * informational, not different from priority.
                         */
                        if (sync_priority[i] == 0)
-                               values[7] = CStringGetTextDatum("ASYNC");
+                               values[7] = CStringGetTextDatum("async");
                        else if (i == sync_standby)
-                               values[7] = CStringGetTextDatum("SYNC");
+                               values[7] = CStringGetTextDatum("sync");
                        else
-                               values[7] = CStringGetTextDatum("POTENTIAL");
+                               values[7] = CStringGetTextDatum("potential");
                }
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -1517,6 +1617,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        return (Datum) 0;
 }
 
+static void
+WalSndKeepalive(char *msgbuf)
+{
+       PrimaryKeepaliveMessage keepalive_message;
+
+       /* Construct a new message */
+       keepalive_message.walEnd = sentPtr;
+       keepalive_message.sendTime = GetCurrentTimestamp();
+
+       elog(DEBUG2, "sending replication keepalive");
+
+       /* Prepend with the message type and send it. */
+       msgbuf[0] = 'k';
+       memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+       pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
 /*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the