]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/walsender.c
Introduce timeout handling framework
[postgresql] / src / backend / replication / walsender.c
index cacd577acce1b28a7c4a6f1ed9e2385a576ae3fd..37a030b5f5e4e1536c44227672be05ffcff88537 100644 (file)
@@ -25,7 +25,7 @@
  * shutdown checkpoint record, and then exit.
  *
  *
- * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *       src/backend/replication/walsender.c
 #include <signal.h>
 #include <unistd.h>
 
-#include "funcapi.h"
+#include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_type.h"
+#include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "nodes/replnodes.h"
 #include "replication/basebackup.h"
+#include "replication/syncrep.h"
 #include "replication/walprotocol.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/resowner.h"
+#include "utils/timeout.h"
+#include "utils/timestamp.h"
 
 
 /* Array of WalSnds in shared memory */
 WalSndCtlData *WalSndCtl = NULL;
 
 /* My slot in the shared memory array */
-static WalSnd *MyWalSnd = NULL;
+WalSnd    *MyWalSnd = NULL;
 
 /* Global state */
 bool           am_walsender = false;           /* Am I a walsender process ? */
+bool           am_cascading_walsender = false;         /* Am I cascading WAL to
+                                                                                                * another standby ? */
 
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
-int                    WalSndDelay = 200;      /* max sleep time between some actions */
+int                    replication_timeout = 60 * 1000;        /* maximum time to send one
+                                                                                                * WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
  * but for walsender to read the XLOG.
  */
 static int     sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
 static uint32 sendOff = 0;
 
 /*
  * How far have we sent WAL already? This is also advertised in
  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
  */
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
+
+/*
+ * Buffer for processing reply messages.
+ */
+static StringInfoData reply_message;
+
+/*
+ * Timestamp of the last receipt of the reply from the standby.
+ */
+static TimestampTz last_reply_timestamp;
 
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
-static volatile sig_atomic_t shutdown_requested = false;
-static volatile sig_atomic_t ready_to_stop = false;
+volatile sig_atomic_t walsender_shutdown_requested = false;
+volatile sig_atomic_t walsender_ready_to_stop = false;
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
@@ -99,25 +124,28 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static int     WalSndLoop(void);
+static bool HandleReplicationCommand(const char *cmd_string);
+static void WalSndLoop(void) __attribute__((noreturn));
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
-static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
+static void XLogSend(char *msgbuf, bool *caughtup);
+static void IdentifySystem(void);
+static void StartReplication(StartReplicationCmd *cmd);
+static void ProcessStandbyMessage(void);
+static void ProcessStandbyReplyMessage(void);
+static void ProcessStandbyHSFeedbackMessage(void);
+static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
 
 
 /* Main entry point for walsender process */
-int
+void
 WalSenderMain(void)
 {
        MemoryContext walsnd_context;
 
-       if (RecoveryInProgress())
-               ereport(FATAL,
-                               (errcode(ERRCODE_CANNOT_CONNECT_NOW),
-                                errmsg("recovery is still in progress, can't accept WAL streaming connections")));
+       am_cascading_walsender = RecoveryInProgress();
 
        /* Create a per-walsender data structure in shared memory */
        InitWalSnd();
@@ -144,6 +172,12 @@ WalSenderMain(void)
        /* Unblock signals (they were blocked when the postmaster forked us) */
        PG_SETMASK(&UnBlockSig);
 
+       /*
+        * Use the recovery target timeline ID during recovery
+        */
+       if (am_cascading_walsender)
+               ThisTimeLineID = GetRecoveryTargetTLI();
+
        /* Tell the standby that walsender is ready for receiving commands */
        ReadyForQuery(DestRemote);
 
@@ -160,8 +194,10 @@ WalSenderMain(void)
                SpinLockRelease(&walsnd->mutex);
        }
 
+       SyncRepInitConfig();
+
        /* Main loop of walsender */
-       return WalSndLoop();
+       WalSndLoop();
 }
 
 /*
@@ -189,7 +225,7 @@ WalSndHandshake(void)
                 * Emergency bailout if postmaster has died.  This is to avoid the
                 * necessity for manual cleanup of all postmaster children.
                 */
-               if (!PostmasterIsAlive(true))
+               if (!PostmasterIsAlive())
                        exit(1);
 
                /*
@@ -218,118 +254,14 @@ WalSndHandshake(void)
                        case 'Q':                       /* Query message */
                                {
                                        const char *query_string;
-                                       XLogRecPtr      recptr;
 
                                        query_string = pq_getmsgstring(&input_message);
                                        pq_getmsgend(&input_message);
 
-                                       if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
-                                       {
-                                               StringInfoData buf;
-                                               char            sysid[32];
-                                               char            tli[11];
-
-                                               /*
-                                                * Reply with a result set with one row, two columns.
-                                                * First col is system ID, and second is timeline ID
-                                                */
-
-                                               snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
-                                                                GetSystemIdentifier());
-                                               snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
-
-                                               /* Send a RowDescription message */
-                                               pq_beginmessage(&buf, 'T');
-                                               pq_sendint(&buf, 2, 2); /* 2 fields */
-
-                                               /* first field */
-                                               pq_sendstring(&buf, "systemid");                /* col name */
-                                               pq_sendint(&buf, 0, 4); /* table oid */
-                                               pq_sendint(&buf, 0, 2); /* attnum */
-                                               pq_sendint(&buf, TEXTOID, 4);   /* type oid */
-                                               pq_sendint(&buf, -1, 2);                /* typlen */
-                                               pq_sendint(&buf, 0, 4); /* typmod */
-                                               pq_sendint(&buf, 0, 2); /* format code */
-
-                                               /* second field */
-                                               pq_sendstring(&buf, "timeline");                /* col name */
-                                               pq_sendint(&buf, 0, 4); /* table oid */
-                                               pq_sendint(&buf, 0, 2); /* attnum */
-                                               pq_sendint(&buf, INT4OID, 4);   /* type oid */
-                                               pq_sendint(&buf, 4, 2); /* typlen */
-                                               pq_sendint(&buf, 0, 4); /* typmod */
-                                               pq_sendint(&buf, 0, 2); /* format code */
-                                               pq_endmessage(&buf);
-
-                                               /* Send a DataRow message */
-                                               pq_beginmessage(&buf, 'D');
-                                               pq_sendint(&buf, 2, 2); /* # of columns */
-                                               pq_sendint(&buf, strlen(sysid), 4);             /* col1 len */
-                                               pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
-                                               pq_sendint(&buf, strlen(tli), 4);               /* col2 len */
-                                               pq_sendbytes(&buf, (char *) tli, strlen(tli));
-                                               pq_endmessage(&buf);
-
-                                               /* Send CommandComplete and ReadyForQuery messages */
-                                               EndCommand("SELECT", DestRemote);
-                                               ReadyForQuery(DestRemote);
-                                               /* ReadyForQuery did pq_flush for us */
-                                       }
-                                       else if (sscanf(query_string, "START_REPLICATION %X/%X",
-                                                                       &recptr.xlogid, &recptr.xrecoff) == 2)
-                                       {
-                                               StringInfoData buf;
-
-                                               /*
-                                                * Check that we're logging enough information in the
-                                                * WAL for log-shipping.
-                                                *
-                                                * NOTE: This only checks the current value of
-                                                * wal_level. Even if the current setting is not
-                                                * 'minimal', there can be old WAL in the pg_xlog
-                                                * directory that was created with 'minimal'. So this
-                                                * is not bulletproof, the purpose is just to give a
-                                                * user-friendly error message that hints how to
-                                                * configure the system correctly.
-                                                */
-                                               if (wal_level == WAL_LEVEL_MINIMAL)
-                                                       ereport(FATAL,
-                                                                       (errcode(ERRCODE_CANNOT_CONNECT_NOW),
-                                                                        errmsg("standby connections not allowed because wal_level=minimal")));
-
-                                               /* Send a CopyBothResponse message, and start streaming */
-                                               pq_beginmessage(&buf, 'W');
-                                               pq_sendbyte(&buf, 0);
-                                               pq_sendint(&buf, 0, 2);
-                                               pq_endmessage(&buf);
-                                               pq_flush();
-
-                                               /*
-                                                * Initialize position to the received one, then the
-                                                * xlog records begin to be shipped from that position
-                                                */
-                                               sentPtr = recptr;
-
-                                               /* break out of the loop */
+                                       if (HandleReplicationCommand(query_string))
                                                replication_started = true;
-                                       }
-                                       else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0)
-                                       {
-                                               /* Command is BASE_BACKUP <options>;<label> */
-                                               SendBaseBackup(query_string + strlen("BASE_BACKUP "));
-                                               /* Send CommandComplete and ReadyForQuery messages */
-                                               EndCommand("SELECT", DestRemote);
-                                               ReadyForQuery(DestRemote);
-                                               /* ReadyForQuery did pq_flush for us */
-                                       }
-                                       else
-                                       {
-                                               ereport(FATAL,
-                                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                                                errmsg("invalid standby query string: %s", query_string)));
-                                       }
-                                       break;
                                }
+                               break;
 
                        case 'X':
                                /* standby is closing the connection */
@@ -350,49 +282,436 @@ WalSndHandshake(void)
        }
 }
 
+/*
+ * IDENTIFY_SYSTEM
+ */
+static void
+IdentifySystem(void)
+{
+       StringInfoData buf;
+       char            sysid[32];
+       char            tli[11];
+       char            xpos[MAXFNAMELEN];
+       XLogRecPtr      logptr;
+
+       /*
+        * Reply with a result set with one row, three columns. First col is
+        * system ID, second is timeline ID, and third is current xlog location.
+        */
+
+       snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
+                        GetSystemIdentifier());
+       snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
+
+       logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
+
+       snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
+
+       /* Send a RowDescription message */
+       pq_beginmessage(&buf, 'T');
+       pq_sendint(&buf, 3, 2);         /* 3 fields */
+
+       /* first field */
+       pq_sendstring(&buf, "systemid");        /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, TEXTOID, 4);           /* type oid */
+       pq_sendint(&buf, -1, 2);        /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+
+       /* second field */
+       pq_sendstring(&buf, "timeline");        /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, INT4OID, 4);           /* type oid */
+       pq_sendint(&buf, 4, 2);         /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+
+       /* third field */
+       pq_sendstring(&buf, "xlogpos");
+       pq_sendint(&buf, 0, 4);
+       pq_sendint(&buf, 0, 2);
+       pq_sendint(&buf, TEXTOID, 4);
+       pq_sendint(&buf, -1, 2);
+       pq_sendint(&buf, 0, 4);
+       pq_sendint(&buf, 0, 2);
+       pq_endmessage(&buf);
+
+       /* Send a DataRow message */
+       pq_beginmessage(&buf, 'D');
+       pq_sendint(&buf, 3, 2);         /* # of columns */
+       pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
+       pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
+       pq_sendint(&buf, strlen(tli), 4);       /* col2 len */
+       pq_sendbytes(&buf, (char *) tli, strlen(tli));
+       pq_sendint(&buf, strlen(xpos), 4);      /* col3 len */
+       pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+
+       pq_endmessage(&buf);
+
+       /* Send CommandComplete and ReadyForQuery messages */
+       EndCommand("SELECT", DestRemote);
+       ReadyForQuery(DestRemote);
+       /* ReadyForQuery did pq_flush for us */
+}
+
+/*
+ * START_REPLICATION
+ */
+static void
+StartReplication(StartReplicationCmd *cmd)
+{
+       StringInfoData buf;
+
+       /*
+        * Let postmaster know that we're streaming. Once we've declared us as a
+        * WAL sender process, postmaster will let us outlive the bgwriter and
+        * kill us last in the shutdown sequence, so we get a chance to stream all
+        * remaining WAL at shutdown, including the shutdown checkpoint. Note that
+        * there's no going back, and we mustn't write any WAL records after this.
+        */
+       MarkPostmasterChildWalSender();
+       SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+
+       /*
+        * When promoting a cascading standby, postmaster sends SIGUSR2 to any
+        * cascading walsenders to kill them. But there is a corner-case where
+        * such walsender fails to receive SIGUSR2 and survives a standby
+        * promotion unexpectedly. This happens when postmaster sends SIGUSR2
+        * before the walsender marks itself as a WAL sender, because postmaster
+        * sends SIGUSR2 to only the processes marked as a WAL sender.
+        *
+        * To avoid this corner-case, if recovery is NOT in progress even though
+        * the walsender is cascading one, we do the same thing as SIGUSR2 signal
+        * handler does, i.e., set walsender_ready_to_stop to true. Which causes
+        * the walsender to end later.
+        *
+        * When terminating cascading walsenders, usually postmaster writes the
+        * log message announcing the terminations. But there is a race condition
+        * here. If there is no walsender except this process before reaching
+        * here, postmaster thinks that there is no walsender and suppresses that
+        * log message. To handle this case, we always emit that log message here.
+        * This might cause duplicate log messages, but which is less likely to
+        * happen, so it's not worth writing some code to suppress them.
+        */
+       if (am_cascading_walsender && !RecoveryInProgress())
+       {
+               ereport(LOG,
+                  (errmsg("terminating walsender process to force cascaded standby "
+                                  "to update timeline and reconnect")));
+               walsender_ready_to_stop = true;
+       }
+
+       /*
+        * We assume here that we're logging enough information in the WAL for
+        * log-shipping, since this is checked in PostmasterMain().
+        *
+        * NOTE: wal_level can only change at shutdown, so in most cases it is
+        * difficult for there to be WAL data that we can still see that was
+        * written at wal_level='minimal'.
+        */
+
+       /*
+        * When we first start replication the standby will be behind the primary.
+        * For some applications, for example, synchronous replication, it is
+        * important to have a clear state for this initial catchup mode, so we
+        * can trigger actions when we change streaming state later. We may stay
+        * in this state for a long time, which is exactly why we want to be able
+        * to monitor whether or not we are still here.
+        */
+       WalSndSetState(WALSNDSTATE_CATCHUP);
+
+       /* Send a CopyBothResponse message, and start streaming */
+       pq_beginmessage(&buf, 'W');
+       pq_sendbyte(&buf, 0);
+       pq_sendint(&buf, 0, 2);
+       pq_endmessage(&buf);
+       pq_flush();
+
+       /*
+        * Initialize position to the received one, then the xlog records begin to
+        * be shipped from that position
+        */
+       sentPtr = cmd->startpoint;
+}
+
+/*
+ * Execute an incoming replication command.
+ */
+static bool
+HandleReplicationCommand(const char *cmd_string)
+{
+       bool            replication_started = false;
+       int                     parse_rc;
+       Node       *cmd_node;
+       MemoryContext cmd_context;
+       MemoryContext old_context;
+
+       elog(DEBUG1, "received replication command: %s", cmd_string);
+
+       cmd_context = AllocSetContextCreate(CurrentMemoryContext,
+                                                                               "Replication command context",
+                                                                               ALLOCSET_DEFAULT_MINSIZE,
+                                                                               ALLOCSET_DEFAULT_INITSIZE,
+                                                                               ALLOCSET_DEFAULT_MAXSIZE);
+       old_context = MemoryContextSwitchTo(cmd_context);
+
+       replication_scanner_init(cmd_string);
+       parse_rc = replication_yyparse();
+       if (parse_rc != 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                (errmsg_internal("replication command parser returned %d",
+                                                                 parse_rc))));
+
+       cmd_node = replication_parse_result;
+
+       switch (cmd_node->type)
+       {
+               case T_IdentifySystemCmd:
+                       IdentifySystem();
+                       break;
+
+               case T_StartReplicationCmd:
+                       StartReplication((StartReplicationCmd *) cmd_node);
+
+                       /* break out of the loop */
+                       replication_started = true;
+                       break;
+
+               case T_BaseBackupCmd:
+                       SendBaseBackup((BaseBackupCmd *) cmd_node);
+
+                       /* Send CommandComplete and ReadyForQuery messages */
+                       EndCommand("SELECT", DestRemote);
+                       ReadyForQuery(DestRemote);
+                       /* ReadyForQuery did pq_flush for us */
+                       break;
+
+               default:
+                       ereport(FATAL,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("invalid standby query string: %s", cmd_string)));
+       }
+
+       /* done */
+       MemoryContextSwitchTo(old_context);
+       MemoryContextDelete(cmd_context);
+
+       return replication_started;
+}
+
 /*
  * Check if the remote end has closed the connection.
  */
 static void
-CheckClosedConnection(void)
+ProcessRepliesIfAny(void)
 {
        unsigned char firstchar;
        int                     r;
+       bool            received = false;
 
-       r = pq_getbyte_if_available(&firstchar);
-       if (r < 0)
+       for (;;)
+       {
+               r = pq_getbyte_if_available(&firstchar);
+               if (r < 0)
+               {
+                       /* unexpected error or EOF */
+                       ereport(COMMERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("unexpected EOF on standby connection")));
+                       proc_exit(0);
+               }
+               if (r == 0)
+               {
+                       /* no data available without blocking */
+                       break;
+               }
+
+               /* Handle the very limited subset of commands expected in this phase */
+               switch (firstchar)
+               {
+                               /*
+                                * 'd' means a standby reply wrapped in a CopyData packet.
+                                */
+                       case 'd':
+                               ProcessStandbyMessage();
+                               received = true;
+                               break;
+
+                               /*
+                                * 'X' means that the standby is closing down the socket.
+                                */
+                       case 'X':
+                               proc_exit(0);
+
+                       default:
+                               ereport(FATAL,
+                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                errmsg("invalid standby message type \"%c\"",
+                                                               firstchar)));
+               }
+       }
+
+       /*
+        * Save the last reply timestamp if we've received at least one reply.
+        */
+       if (received)
+               last_reply_timestamp = GetCurrentTimestamp();
+}
+
+/*
+ * Process a status update message received from standby.
+ */
+static void
+ProcessStandbyMessage(void)
+{
+       char            msgtype;
+
+       resetStringInfo(&reply_message);
+
+       /*
+        * Read the message contents.
+        */
+       if (pq_getmessage(&reply_message, 0))
        {
-               /* unexpected error or EOF */
                ereport(COMMERROR,
                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                 errmsg("unexpected EOF on standby connection")));
                proc_exit(0);
        }
-       if (r == 0)
-       {
-               /* no data available without blocking */
-               return;
-       }
 
-       /* Handle the very limited subset of commands expected in this phase */
-       switch (firstchar)
+       /*
+        * Check message type from the first byte.
+        */
+       msgtype = pq_getmsgbyte(&reply_message);
+
+       switch (msgtype)
        {
-                       /*
-                        * 'X' means that the standby is closing down the socket.
-                        */
-               case 'X':
-                       proc_exit(0);
+               case 'r':
+                       ProcessStandbyReplyMessage();
+                       break;
+
+               case 'h':
+                       ProcessStandbyHSFeedbackMessage();
+                       break;
 
                default:
-                       ereport(FATAL,
+                       ereport(COMMERROR,
                                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("invalid standby closing message type %d",
-                                                       firstchar)));
+                                        errmsg("unexpected message type \"%c\"", msgtype)));
+                       proc_exit(0);
        }
 }
 
+/*
+ * Regular reply from standby advising of WAL positions on standby server.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+       StandbyReplyMessage reply;
+
+       pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
+
+       elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
+                (uint32) (reply.write >> 32), (uint32) reply.write,
+                (uint32) (reply.flush >> 32), (uint32) reply.flush,
+                (uint32) (reply.apply >> 32), (uint32) reply.apply);
+
+       /*
+        * Update shared state for this WalSender process based on reply data from
+        * standby.
+        */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->write = reply.write;
+               walsnd->flush = reply.flush;
+               walsnd->apply = reply.apply;
+               SpinLockRelease(&walsnd->mutex);
+       }
+
+       if (!am_cascading_walsender)
+               SyncRepReleaseWaiters();
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
+       StandbyHSFeedbackMessage reply;
+       TransactionId nextXid;
+       uint32          nextEpoch;
+
+       /* Decipher the reply message */
+       pq_copymsgbytes(&reply_message, (char *) &reply,
+                                       sizeof(StandbyHSFeedbackMessage));
+
+       elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+                reply.xmin,
+                reply.epoch);
+
+       /* Ignore invalid xmin (can't actually happen with current walreceiver) */
+       if (!TransactionIdIsNormal(reply.xmin))
+               return;
+
+       /*
+        * Check that the provided xmin/epoch are sane, that is, not in the future
+        * and not so far back as to be already wrapped around.  Ignore if not.
+        *
+        * Epoch of nextXid should be same as standby, or if the counter has
+        * wrapped, then one greater than standby.
+        */
+       GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+       if (reply.xmin <= nextXid)
+       {
+               if (reply.epoch != nextEpoch)
+                       return;
+       }
+       else
+       {
+               if (reply.epoch + 1 != nextEpoch)
+                       return;
+       }
+
+       if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+               return;                                 /* epoch OK, but it's wrapped around */
+
+       /*
+        * Set the WalSender's xmin equal to the standby's requested xmin, so that
+        * the xmin will be taken into account by GetOldestXmin.  This will hold
+        * back the removal of dead rows and thereby prevent the generation of
+        * cleanup conflicts on the standby server.
+        *
+        * There is a small window for a race condition here: although we just
+        * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+        * advanced between our fetching it and applying the xmin below, perhaps
+        * far enough to make reply.xmin wrap around.  In that case the xmin we
+        * set here would be "in the future" and have no effect.  No point in
+        * worrying about this since it's too late to save the desired data
+        * anyway.      Assuming that the standby sends us an increasing sequence of
+        * xmins, this could only happen during the first reply cycle, else our
+        * own xmin would prevent nextXid from advancing so far.
+        *
+        * We don't bother taking the ProcArrayLock here.  Setting the xmin field
+        * is assumed atomic, and there's no real need to prevent a concurrent
+        * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
+        * safe, and if we're moving it backwards, well, the data is at risk
+        * already since a VACUUM could have just finished calling GetOldestXmin.)
+        */
+       MyPgXact->xmin = reply.xmin;
+}
+
 /* Main loop of walsender process */
-static int
+static void
 WalSndLoop(void)
 {
        char       *output_message;
@@ -405,14 +724,26 @@ WalSndLoop(void)
         */
        output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
 
+       /*
+        * Allocate buffer that will be used for processing reply messages.  As
+        * above, do this just once to reduce palloc overhead.
+        */
+       initStringInfo(&reply_message);
+
+       /* Initialize the last reply timestamp */
+       last_reply_timestamp = GetCurrentTimestamp();
+
        /* Loop forever, unless we get an error */
        for (;;)
        {
+               /* Clear any already-pending wakeups */
+               ResetLatch(&MyWalSnd->latch);
+
                /*
                 * Emergency bailout if postmaster has died.  This is to avoid the
                 * necessity for manual cleanup of all postmaster children.
                 */
-               if (!PostmasterIsAlive(true))
+               if (!PostmasterIsAlive())
                        exit(1);
 
                /* Process any requests or signals received recently */
@@ -420,72 +751,130 @@ WalSndLoop(void)
                {
                        got_SIGHUP = false;
                        ProcessConfigFile(PGC_SIGHUP);
-               }
-
-               /*
-                * When SIGUSR2 arrives, we send all outstanding logs up to the
-                * shutdown checkpoint record (i.e., the latest record) and exit.
-                */
-               if (ready_to_stop)
-               {
-                       if (!XLogSend(output_message, &caughtup))
-                               break;
-                       if (caughtup)
-                               shutdown_requested = true;
+                       SyncRepInitConfig();
                }
 
                /* Normal exit from the walsender is here */
-               if (shutdown_requested)
+               if (walsender_shutdown_requested)
                {
-                       /* Inform the standby that XLOG streaming was done */
+                       /* Inform the standby that XLOG streaming is done */
                        pq_puttextmessage('C', "COPY 0");
                        pq_flush();
 
                        proc_exit(0);
                }
 
+               /* Check for input from the client */
+               ProcessRepliesIfAny();
+
                /*
-                * If we had sent all accumulated WAL in last round, nap for the
-                * configured time before retrying.
+                * If we don't have any pending data in the output buffer, try to send
+                * some more.  If there is some, we don't bother to call XLogSend
+                * again until we've flushed it ... but we'd better assume we are not
+                * caught up.
                 */
-               if (caughtup)
+               if (!pq_is_send_pending())
+                       XLogSend(output_message, &caughtup);
+               else
+                       caughtup = false;
+
+               /* Try to flush pending output to the client */
+               if (pq_flush_if_writable() != 0)
+                       break;
+
+               /* If nothing remains to be sent right now ... */
+               if (caughtup && !pq_is_send_pending())
                {
                        /*
-                        * Even if we wrote all the WAL that was available when we started
-                        * sending, more might have arrived while we were sending this
-                        * batch. We had the latch set while sending, so we have not
-                        * received any signals from that time. Let's arm the latch
-                        * again, and after that check that we're still up-to-date.
+                        * If we're in catchup state, move to streaming.  This is an
+                        * important state change for users to know about, since before
+                        * this point data loss might occur if the primary dies and we
+                        * need to failover to the standby. The state change is also
+                        * important for synchronous replication, since commits that
+                        * started to wait at that point might wait for some time.
                         */
-                       ResetLatch(&MyWalSnd->latch);
-
-                       if (!XLogSend(output_message, &caughtup))
-                               break;
-                       if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
+                       if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
                        {
-                               /*
-                                * XXX: We don't really need the periodic wakeups anymore,
-                                * WaitLatchOrSocket should reliably wake up as soon as
-                                * something interesting happens.
-                                */
-
-                               /* Sleep */
-                               WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-                                                                 WalSndDelay * 1000L);
+                               ereport(DEBUG1,
+                                        (errmsg("standby \"%s\" has now caught up with primary",
+                                                        application_name)));
+                               WalSndSetState(WALSNDSTATE_STREAMING);
                        }
 
-                       /* Check if the connection was closed */
-                       CheckClosedConnection();
+                       /*
+                        * When SIGUSR2 arrives, we send any outstanding logs up to the
+                        * shutdown checkpoint record (i.e., the latest record) and exit.
+                        * This may be a normal termination at shutdown, or a promotion,
+                        * the walsender is not sure which.
+                        */
+                       if (walsender_ready_to_stop)
+                       {
+                               /* ... let's just be real sure we're caught up ... */
+                               XLogSend(output_message, &caughtup);
+                               if (caughtup && !pq_is_send_pending())
+                               {
+                                       walsender_shutdown_requested = true;
+                                       continue;       /* don't want to wait more */
+                               }
+                       }
                }
-               else
+
+               /*
+                * We don't block if not caught up, unless there is unsent data
+                * pending in which case we'd better block until the socket is
+                * write-ready.  This test is only needed for the case where XLogSend
+                * loaded a subset of the available data but then pq_flush_if_writable
+                * flushed it all --- we should immediately try to send more.
+                */
+               if (caughtup || pq_is_send_pending())
                {
-                       /* Attempt to send the log once every loop */
-                       if (!XLogSend(output_message, &caughtup))
+                       TimestampTz timeout = 0;
+                       long            sleeptime = 10000;              /* 10 s */
+                       int                     wakeEvents;
+
+                       wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+                               WL_SOCKET_READABLE | WL_TIMEOUT;
+
+                       if (pq_is_send_pending())
+                               wakeEvents |= WL_SOCKET_WRITEABLE;
+                       else
+                       {
+                               WalSndKeepalive(output_message);
+                               /* Try to flush pending output to the client */
+                               if (pq_flush_if_writable() != 0)
+                                       break;
+                       }
+
+                       /* Determine time until replication timeout */
+                       if (replication_timeout > 0)
+                       {
+                               timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                                                                                         replication_timeout);
+                               sleeptime = 1 + (replication_timeout / 10);
+                       }
+
+                       /* Sleep until something happens or replication timeout */
+                       WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+                                                         MyProcPort->sock, sleeptime);
+
+                       /*
+                        * Check for replication timeout.  Note we ignore the corner case
+                        * possibility that the client replied just as we reached the
+                        * timeout ... he's supposed to reply *before* that.
+                        */
+                       if (replication_timeout > 0 &&
+                               GetCurrentTimestamp() >= timeout)
+                       {
+                               /*
+                                * Since typically expiration of replication timeout means
+                                * communication problem, we don't send the error message to
+                                * the standby.
+                                */
+                               ereport(COMMERROR,
+                                               (errmsg("terminating walsender process due to replication timeout")));
                                break;
+                       }
                }
-
-               /* Update our state to indicate if we're behind or not */
-               WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
        }
 
        /*
@@ -498,7 +887,7 @@ WalSndLoop(void)
                whereToSendOutput = DestNone;
 
        proc_exit(0);
-       return 1;                                       /* keep the compiler quiet */
+       abort();                                        /* keep the compiler quiet */
 }
 
 /* Initialize a per-walsender data structure for this walsender process */
@@ -575,20 +964,29 @@ WalSndKill(int code, Datum arg)
 }
 
 /*
- * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
+ *
+ * Will open, and keep open, one WAL segment stored in the global file
+ * descriptor sendFile. This means if XLogRead is used once, there will
+ * always be one descriptor left open until the process ends, but never
+ * more than one.
  */
-static void
-XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+void
+XLogRead(char *buf, XLogRecPtr startptr, Size count)
 {
-       XLogRecPtr      startRecPtr = recptr;
-       char            path[MAXPGPATH];
-       uint32          lastRemovedLog;
-       uint32          lastRemovedSeg;
-       uint32          log;
-       uint32          seg;
+       char       *p;
+       XLogRecPtr      recptr;
+       Size            nbytes;
+       XLogSegNo       lastRemovedSegNo;
+       XLogSegNo       segno;
+
+retry:
+       p = buf;
+       recptr = startptr;
+       nbytes = count;
 
        while (nbytes > 0)
        {
@@ -596,16 +994,18 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                int                     segbytes;
                int                     readbytes;
 
-               startoff = recptr.xrecoff % XLogSegSize;
+               startoff = recptr % XLogSegSize;
 
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
                {
+                       char            path[MAXPGPATH];
+
                        /* Switch to another logfile segment */
                        if (sendFile >= 0)
                                close(sendFile);
 
-                       XLByteToSeg(recptr, sendId, sendSeg);
-                       XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+                       XLByteToSeg(recptr, sendSegNo);
+                       XLogFilePath(path, ThisTimeLineID, sendSegNo);
 
                        sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
                        if (sendFile < 0)
@@ -616,20 +1016,15 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                                 * removed or recycled.
                                 */
                                if (errno == ENOENT)
-                               {
-                                       char            filename[MAXFNAMELEN];
-
-                                       XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
                                                         errmsg("requested WAL segment %s has already been removed",
-                                                                       filename)));
-                               }
+                                                                       XLogFileNameP(ThisTimeLineID, sendSegNo))));
                                else
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
-                                                        errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
-                                                                       path, sendId, sendSeg)));
+                                                        errmsg("could not open file \"%s\": %m",
+                                                                       path)));
                        }
                        sendOff = 0;
                }
@@ -640,8 +1035,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                        if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
                                ereport(ERROR,
                                                (errcode_for_file_access(),
-                                                errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-                                                               sendId, sendSeg, startoff)));
+                                                errmsg("could not seek in log segment %s to offset %u: %m",
+                                                               XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                                               startoff)));
                        sendOff = startoff;
                }
 
@@ -651,20 +1047,22 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
                else
                        segbytes = nbytes;
 
-               readbytes = read(sendFile, buf, segbytes);
+               readbytes = read(sendFile, p, segbytes);
                if (readbytes <= 0)
+               {
                        ereport(ERROR,
                                        (errcode_for_file_access(),
-                       errmsg("could not read from log file %u, segment %u, offset %u, "
-                                  "length %lu: %m",
-                                  sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+                       errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+                                  XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                  sendOff, (unsigned long) segbytes)));
+               }
 
                /* Update state for read */
                XLByteAdvance(recptr, readbytes);
 
                sendOff += readbytes;
                nbytes -= readbytes;
-               buf += readbytes;
+               p += readbytes;
        }
 
        /*
@@ -674,24 +1072,45 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
         * read() succeeds in that case, but the data we tried to read might
         * already have been overwritten with new WAL records.
         */
-       XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
-       XLByteToSeg(startRecPtr, log, seg);
-       if (log < lastRemovedLog ||
-               (log == lastRemovedLog && seg <= lastRemovedSeg))
-       {
-               char            filename[MAXFNAMELEN];
-
-               XLogFileName(filename, ThisTimeLineID, log, seg);
+       XLogGetLastRemoved(&lastRemovedSegNo);
+       XLByteToSeg(startptr, segno);
+       if (segno <= lastRemovedSegNo)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("requested WAL segment %s has already been removed",
-                                               filename)));
+                                               XLogFileNameP(ThisTimeLineID, segno))));
+
+       /*
+        * During recovery, the currently-open WAL file might be replaced with the
+        * file of the same name retrieved from archive. So we always need to
+        * check what we read was valid after reading into the buffer. If it's
+        * invalid, we try to open and read the file again.
+        */
+       if (am_cascading_walsender)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = MyWalSnd;
+               bool            reload;
+
+               SpinLockAcquire(&walsnd->mutex);
+               reload = walsnd->needreload;
+               walsnd->needreload = false;
+               SpinLockRelease(&walsnd->mutex);
+
+               if (reload && sendFile >= 0)
+               {
+                       close(sendFile);
+                       sendFile = -1;
+
+                       goto retry;
+               }
        }
 }
 
 /*
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
- * but not yet sent to the client, and send it.
+ * but not yet sent to the client, and buffer it in the libpq output
+ * buffer.
  *
  * msgbuf is a work area in which the output message is constructed.  It's
  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
@@ -699,10 +1118,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  *
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
- *
- * Returns true if OK, false if trouble.
+
  */
-static bool
+static void
 XLogSend(char *msgbuf, bool *caughtup)
 {
        XLogRecPtr      SendRqstPtr;
@@ -719,13 +1137,13 @@ XLogSend(char *msgbuf, bool *caughtup)
         * subsequently crashes and restarts, slaves must not have applied any WAL
         * that gets lost on the master.
         */
-       SendRqstPtr = GetFlushRecPtr();
+       SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
 
        /* Quick exit if nothing to do */
        if (XLByteLE(SendRqstPtr, sentPtr))
        {
                *caughtup = true;
-               return true;
+               return;
        }
 
        /*
@@ -740,25 +1158,8 @@ XLogSend(char *msgbuf, bool *caughtup)
         * SendRqstPtr never points to the middle of a WAL record.
         */
        startptr = sentPtr;
-       if (startptr.xrecoff >= XLogFileSize)
-       {
-               /*
-                * crossing a logid boundary, skip the non-existent last log segment
-                * in previous logical log file.
-                */
-               startptr.xlogid += 1;
-               startptr.xrecoff = 0;
-       }
-
        endptr = startptr;
        XLByteAdvance(endptr, MAX_SEND_SIZE);
-       if (endptr.xlogid != startptr.xlogid)
-       {
-               /* Don't cross a logfile boundary within one message */
-               Assert(endptr.xlogid == startptr.xlogid + 1);
-               endptr.xlogid = startptr.xlogid;
-               endptr.xrecoff = XLogFileSize;
-       }
 
        /* if we went beyond SendRqstPtr, back off */
        if (XLByteLE(SendRqstPtr, endptr))
@@ -769,11 +1170,11 @@ XLogSend(char *msgbuf, bool *caughtup)
        else
        {
                /* round down to page boundary. */
-               endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+               endptr -= (endptr % XLOG_BLCKSZ);
                *caughtup = false;
        }
 
-       nbytes = endptr.xrecoff - startptr.xrecoff;
+       nbytes = endptr - startptr;
        Assert(nbytes <= MAX_SEND_SIZE);
 
        /*
@@ -797,11 +1198,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 
        memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
 
-       pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
-
-       /* Flush pending output to the client */
-       if (pq_flush())
-               return false;
+       pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
 
        sentPtr = endptr;
 
@@ -821,29 +1218,66 @@ XLogSend(char *msgbuf, bool *caughtup)
                char            activitymsg[50];
 
                snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
-                                sentPtr.xlogid, sentPtr.xrecoff);
+                                (uint32) (sentPtr >> 32), (uint32) sentPtr);
                set_ps_display(activitymsg, false);
        }
 
-       return true;
+       return;
+}
+
+/*
+ * Request walsenders to reload the currently-open WAL file
+ */
+void
+WalSndRqstFileReload(void)
+{
+       int                     i;
+
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+               if (walsnd->pid == 0)
+                       continue;
+
+               SpinLockAcquire(&walsnd->mutex);
+               walsnd->needreload = true;
+               SpinLockRelease(&walsnd->mutex);
+       }
 }
 
 /* SIGHUP: set flag to re-read config file at next convenient time */
 static void
 WalSndSigHupHandler(SIGNAL_ARGS)
 {
+       int                     save_errno = errno;
+
        got_SIGHUP = true;
        if (MyWalSnd)
                SetLatch(&MyWalSnd->latch);
+
+       errno = save_errno;
 }
 
 /* SIGTERM: set flag to shut down */
 static void
 WalSndShutdownHandler(SIGNAL_ARGS)
 {
-       shutdown_requested = true;
+       int                     save_errno = errno;
+
+       walsender_shutdown_requested = true;
        if (MyWalSnd)
                SetLatch(&MyWalSnd->latch);
+
+       /*
+        * Set the standard (non-walsender) state as well, so that we can abort
+        * things like do_pg_stop_backup().
+        */
+       InterruptPending = true;
+       ProcDiePending = true;
+
+       errno = save_errno;
 }
 
 /*
@@ -882,16 +1316,24 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
 static void
 WalSndXLogSendHandler(SIGNAL_ARGS)
 {
+       int                     save_errno = errno;
+
        latch_sigusr1_handler();
+
+       errno = save_errno;
 }
 
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
 {
-       ready_to_stop = true;
+       int                     save_errno = errno;
+
+       walsender_ready_to_stop = true;
        if (MyWalSnd)
                SetLatch(&MyWalSnd->latch);
+
+       errno = save_errno;
 }
 
 /* Set up signal handlers */
@@ -904,7 +1346,7 @@ WalSndSignals(void)
        pqsignal(SIGINT, SIG_IGN);      /* not used */
        pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
        pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
-       pqsignal(SIGALRM, SIG_IGN);
+       InitializeTimeouts();           /* establishes SIGALRM handler */
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
        pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
@@ -945,6 +1387,9 @@ WalSndShmemInit(void)
                /* First time through, so initialize */
                MemSet(WalSndCtl, 0, WalSndShmemSize());
 
+               for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+                       SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
+
                for (i = 0; i < max_wal_senders; i++)
                {
                        WalSnd     *walsnd = &WalSndCtl->walsnds[i];
@@ -955,11 +1400,16 @@ WalSndShmemInit(void)
        }
 }
 
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
 void
 WalSndWakeup(void)
 {
-       int             i;
+       int                     i;
 
        for (i = 0; i < max_wal_senders; i++)
                SetLatch(&WalSndCtl->walsnds[i].latch);
@@ -992,13 +1442,13 @@ WalSndGetStateString(WalSndState state)
        switch (state)
        {
                case WALSNDSTATE_STARTUP:
-                       return "STARTUP";
+                       return "startup";
                case WALSNDSTATE_BACKUP:
-                       return "BACKUP";
+                       return "backup";
                case WALSNDSTATE_CATCHUP:
-                       return "CATCHUP";
+                       return "catchup";
                case WALSNDSTATE_STREAMING:
-                       return "STREAMING";
+                       return "streaming";
        }
        return "UNKNOWN";
 }
@@ -1011,13 +1461,16 @@ WalSndGetStateString(WalSndState state)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   3
-       ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-       TupleDesc                       tupdesc;
-       Tuplestorestate    *tupstore;
-       MemoryContext           per_query_ctx;
-       MemoryContext           oldcontext;
-       int                                     i;
+#define PG_STAT_GET_WAL_SENDERS_COLS   8
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       Tuplestorestate *tupstore;
+       MemoryContext per_query_ctx;
+       MemoryContext oldcontext;
+       int                *sync_priority;
+       int                     priority = 0;
+       int                     sync_standby = -1;
+       int                     i;
 
        /* check to see if caller supports us returning a tuplestore */
        if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -1044,13 +1497,51 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
        MemoryContextSwitchTo(oldcontext);
 
+       /*
+        * Get the priorities of sync standbys all in one go, to minimise lock
+        * acquisitions and to allow us to evaluate who is the current sync
+        * standby. This code must match the code in SyncRepReleaseWaiters().
+        */
+       sync_priority = palloc(sizeof(int) * max_wal_senders);
+       LWLockAcquire(SyncRepLock, LW_SHARED);
+       for (i = 0; i < max_wal_senders; i++)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+               if (walsnd->pid != 0)
+               {
+                       /*
+                        * Treat a standby such as a pg_basebackup background process
+                        * which always returns an invalid flush location, as an
+                        * asynchronous standby.
+                        */
+                       sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+                               0 : walsnd->sync_standby_priority;
+
+                       if (walsnd->state == WALSNDSTATE_STREAMING &&
+                               walsnd->sync_standby_priority > 0 &&
+                               (priority == 0 ||
+                                priority > walsnd->sync_standby_priority) &&
+                               !XLogRecPtrIsInvalid(walsnd->flush))
+                       {
+                               priority = walsnd->sync_standby_priority;
+                               sync_standby = i;
+                       }
+               }
+       }
+       LWLockRelease(SyncRepLock);
+
        for (i = 0; i < max_wal_senders; i++)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-               char            sent_location[MAXFNAMELEN];
+               char            location[MAXFNAMELEN];
                XLogRecPtr      sentPtr;
-               WalSndState     state;
+               XLogRecPtr      write;
+               XLogRecPtr      flush;
+               XLogRecPtr      apply;
+               WalSndState state;
                Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
                bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -1060,18 +1551,65 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                SpinLockAcquire(&walsnd->mutex);
                sentPtr = walsnd->sentPtr;
                state = walsnd->state;
+               write = walsnd->write;
+               flush = walsnd->flush;
+               apply = walsnd->apply;
                SpinLockRelease(&walsnd->mutex);
 
-               snprintf(sent_location, sizeof(sent_location), "%X/%X",
-                                       sentPtr.xlogid, sentPtr.xrecoff);
-
                memset(nulls, 0, sizeof(nulls));
                values[0] = Int32GetDatum(walsnd->pid);
-               values[1] = CStringGetTextDatum(WalSndGetStateString(state));
-               values[2] = CStringGetTextDatum(sent_location);
+
+               if (!superuser())
+               {
+                       /*
+                        * Only superusers can see details. Other users only get the pid
+                        * value to know it's a walsender, but no details.
+                        */
+                       MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
+               }
+               else
+               {
+                       values[1] = CStringGetTextDatum(WalSndGetStateString(state));
+
+                       snprintf(location, sizeof(location), "%X/%X",
+                                        (uint32) (sentPtr >> 32), (uint32) sentPtr);
+                       values[2] = CStringGetTextDatum(location);
+
+                       if (write == 0)
+                               nulls[3] = true;
+                       snprintf(location, sizeof(location), "%X/%X",
+                                        (uint32) (write >> 32), (uint32) write);
+                       values[3] = CStringGetTextDatum(location);
+
+                       if (flush == 0)
+                               nulls[4] = true;
+                       snprintf(location, sizeof(location), "%X/%X",
+                                        (uint32) (flush >> 32), (uint32) flush);
+                       values[4] = CStringGetTextDatum(location);
+
+                       if (apply == 0)
+                               nulls[5] = true;
+                       snprintf(location, sizeof(location), "%X/%X",
+                                        (uint32) (apply >> 32), (uint32) apply);
+                       values[5] = CStringGetTextDatum(location);
+
+                       values[6] = Int32GetDatum(sync_priority[i]);
+
+                       /*
+                        * More easily understood version of standby state. This is purely
+                        * informational, not different from priority.
+                        */
+                       if (sync_priority[i] == 0)
+                               values[7] = CStringGetTextDatum("async");
+                       else if (i == sync_standby)
+                               values[7] = CStringGetTextDatum("sync");
+                       else
+                               values[7] = CStringGetTextDatum("potential");
+               }
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        }
+       pfree(sync_priority);
 
        /* clean up and return the tuplestore */
        tuplestore_donestoring(tupstore);
@@ -1079,6 +1617,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        return (Datum) 0;
 }
 
+static void
+WalSndKeepalive(char *msgbuf)
+{
+       PrimaryKeepaliveMessage keepalive_message;
+
+       /* Construct a new message */
+       keepalive_message.walEnd = sentPtr;
+       keepalive_message.sendTime = GetCurrentTimestamp();
+
+       elog(DEBUG2, "sending replication keepalive");
+
+       /* Prepend with the message type and send it. */
+       msgbuf[0] = 'k';
+       memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+       pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
 /*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the