]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/walsender.c
Introduce timeout handling framework
[postgresql] / src / backend / replication / walsender.c
index c8fd165dcb3c58607bfd5e6cca56fa0ae765737f..37a030b5f5e4e1536c44227672be05ffcff88537 100644 (file)
@@ -25,7 +25,7 @@
  * shutdown checkpoint record, and then exit.
  *
  *
- * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *       src/backend/replication/walsender.c
@@ -63,6 +63,7 @@
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/resowner.h"
+#include "utils/timeout.h"
 #include "utils/timestamp.h"
 
 
@@ -74,27 +75,31 @@ WalSnd         *MyWalSnd = NULL;
 
 /* Global state */
 bool           am_walsender = false;           /* Am I a walsender process ? */
-bool           am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */
+bool           am_cascading_walsender = false;         /* Am I cascading WAL to
+                                                                                                * another standby ? */
 
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    replication_timeout = 60 * 1000;        /* maximum time to send one
                                                                                                 * WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
  * but for walsender to read the XLOG.
  */
 static int     sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
 static uint32 sendOff = 0;
 
 /*
  * How far have we sent WAL already? This is also advertised in
  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
  */
-static XLogRecPtr sentPtr = {0, 0};
+static XLogRecPtr sentPtr = 0;
 
 /*
  * Buffer for processing reply messages.
@@ -120,7 +125,7 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
 static bool HandleReplicationCommand(const char *cmd_string);
-static int     WalSndLoop(void);
+static void WalSndLoop(void) __attribute__((noreturn));
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
@@ -131,10 +136,11 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
 
 
 /* Main entry point for walsender process */
-int
+void
 WalSenderMain(void)
 {
        MemoryContext walsnd_context;
@@ -191,7 +197,7 @@ WalSenderMain(void)
        SyncRepInitConfig();
 
        /* Main loop of walsender */
-       return WalSndLoop();
+       WalSndLoop();
 }
 
 /*
@@ -299,8 +305,7 @@ IdentifySystem(void)
 
        logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
 
-       snprintf(xpos, sizeof(xpos), "%X/%X",
-                        logptr.xlogid, logptr.xrecoff);
+       snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
        /* Send a RowDescription message */
        pq_beginmessage(&buf, 'T');
@@ -371,31 +376,31 @@ StartReplication(StartReplicationCmd *cmd)
        SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
 
        /*
-        * When promoting a cascading standby, postmaster sends SIGUSR2 to
-        * any cascading walsenders to kill them. But there is a corner-case where
-        * such walsender fails to receive SIGUSR2 and survives a standby promotion
-        * unexpectedly. This happens when postmaster sends SIGUSR2 before
-        * the walsender marks itself as a WAL sender, because postmaster sends
-        * SIGUSR2 to only the processes marked as a WAL sender.
+        * When promoting a cascading standby, postmaster sends SIGUSR2 to any
+        * cascading walsenders to kill them. But there is a corner-case where
+        * such walsender fails to receive SIGUSR2 and survives a standby
+        * promotion unexpectedly. This happens when postmaster sends SIGUSR2
+        * before the walsender marks itself as a WAL sender, because postmaster
+        * sends SIGUSR2 to only the processes marked as a WAL sender.
         *
         * To avoid this corner-case, if recovery is NOT in progress even though
         * the walsender is cascading one, we do the same thing as SIGUSR2 signal
         * handler does, i.e., set walsender_ready_to_stop to true. Which causes
         * the walsender to end later.
         *
-        * When terminating cascading walsenders, usually postmaster writes
-        * the log message announcing the terminations. But there is a race condition
-        * here. If there is no walsender except this process before reaching here,
-        * postmaster thinks that there is no walsender and suppresses that
+        * When terminating cascading walsenders, usually postmaster writes the
+        * log message announcing the terminations. But there is a race condition
+        * here. If there is no walsender except this process before reaching
+        * here, postmaster thinks that there is no walsender and suppresses that
         * log message. To handle this case, we always emit that log message here.
-        * This might cause duplicate log messages, but which is less likely to happen,
-        * so it's not worth writing some code to suppress them.
+        * This might cause duplicate log messages, but which is less likely to
+        * happen, so it's not worth writing some code to suppress them.
         */
        if (am_cascading_walsender && !RecoveryInProgress())
        {
                ereport(LOG,
-                               (errmsg("terminating walsender process to force cascaded standby "
-                                               "to update timeline and reconnect")));
+                  (errmsg("terminating walsender process to force cascaded standby "
+                                  "to update timeline and reconnect")));
                walsender_ready_to_stop = true;
        }
 
@@ -404,8 +409,8 @@ StartReplication(StartReplicationCmd *cmd)
         * log-shipping, since this is checked in PostmasterMain().
         *
         * NOTE: wal_level can only change at shutdown, so in most cases it is
-        * difficult for there to be WAL data that we can still see that was written
-        * at wal_level='minimal'.
+        * difficult for there to be WAL data that we can still see that was
+        * written at wal_level='minimal'.
         */
 
        /*
@@ -612,9 +617,9 @@ ProcessStandbyReplyMessage(void)
        pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
        elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
-                reply.write.xlogid, reply.write.xrecoff,
-                reply.flush.xlogid, reply.flush.xrecoff,
-                reply.apply.xlogid, reply.apply.xrecoff);
+                (uint32) (reply.write >> 32), (uint32) reply.write,
+                (uint32) (reply.flush >> 32), (uint32) reply.flush,
+                (uint32) (reply.apply >> 32), (uint32) reply.apply);
 
        /*
         * Update shared state for this WalSender process based on reply data from
@@ -642,80 +647,71 @@ static void
 ProcessStandbyHSFeedbackMessage(void)
 {
        StandbyHSFeedbackMessage reply;
-       TransactionId newxmin = InvalidTransactionId;
+       TransactionId nextXid;
+       uint32          nextEpoch;
 
-       pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+       /* Decipher the reply message */
+       pq_copymsgbytes(&reply_message, (char *) &reply,
+                                       sizeof(StandbyHSFeedbackMessage));
 
        elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
                 reply.xmin,
                 reply.epoch);
 
+       /* Ignore invalid xmin (can't actually happen with current walreceiver) */
+       if (!TransactionIdIsNormal(reply.xmin))
+               return;
+
        /*
-        * Update the WalSender's proc xmin to allow it to be visible to
-        * snapshots. This will hold back the removal of dead rows and thereby
-        * prevent the generation of cleanup conflicts on the standby server.
+        * Check that the provided xmin/epoch are sane, that is, not in the future
+        * and not so far back as to be already wrapped around.  Ignore if not.
+        *
+        * Epoch of nextXid should be same as standby, or if the counter has
+        * wrapped, then one greater than standby.
         */
-       if (TransactionIdIsValid(reply.xmin))
-       {
-               TransactionId nextXid;
-               uint32          nextEpoch;
-               bool            epochOK = false;
-
-               GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-               /*
-                * Epoch of oldestXmin should be same as standby or if the counter has
-                * wrapped, then one less than reply.
-                */
-               if (reply.xmin <= nextXid)
-               {
-                       if (reply.epoch == nextEpoch)
-                               epochOK = true;
-               }
-               else
-               {
-                       if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
-                               epochOK = true;
-               }
+       GetNextXidAndEpoch(&nextXid, &nextEpoch);
 
-               /*
-                * Feedback from standby must not go backwards, nor should it go
-                * forwards further than our most recent xid.
-                */
-               if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
-               {
-                       if (!TransactionIdIsValid(MyProc->xmin))
-                       {
-                               TransactionId oldestXmin = GetOldestXmin(true, true);
-
-                               if (TransactionIdPrecedes(oldestXmin, reply.xmin))
-                                       newxmin = reply.xmin;
-                               else
-                                       newxmin = oldestXmin;
-                       }
-                       else
-                       {
-                               if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
-                                       newxmin = reply.xmin;
-                               else
-                                       newxmin = MyProc->xmin;         /* stay the same */
-                       }
-               }
+       if (reply.xmin <= nextXid)
+       {
+               if (reply.epoch != nextEpoch)
+                       return;
+       }
+       else
+       {
+               if (reply.epoch + 1 != nextEpoch)
+                       return;
        }
 
+       if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+               return;                                 /* epoch OK, but it's wrapped around */
+
        /*
-        * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
+        * Set the WalSender's xmin equal to the standby's requested xmin, so that
+        * the xmin will be taken into account by GetOldestXmin.  This will hold
+        * back the removal of dead rows and thereby prevent the generation of
+        * cleanup conflicts on the standby server.
+        *
+        * There is a small window for a race condition here: although we just
+        * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+        * advanced between our fetching it and applying the xmin below, perhaps
+        * far enough to make reply.xmin wrap around.  In that case the xmin we
+        * set here would be "in the future" and have no effect.  No point in
+        * worrying about this since it's too late to save the desired data
+        * anyway.      Assuming that the standby sends us an increasing sequence of
+        * xmins, this could only happen during the first reply cycle, else our
+        * own xmin would prevent nextXid from advancing so far.
+        *
+        * We don't bother taking the ProcArrayLock here.  Setting the xmin field
+        * is assumed atomic, and there's no real need to prevent a concurrent
+        * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
+        * safe, and if we're moving it backwards, well, the data is at risk
+        * already since a VACUUM could have just finished calling GetOldestXmin.)
         */
-       if (MyProc->xmin != newxmin)
-       {
-               LWLockAcquire(ProcArrayLock, LW_SHARED);
-               MyProc->xmin = newxmin;
-               LWLockRelease(ProcArrayLock);
-       }
+       MyPgXact->xmin = reply.xmin;
 }
 
 /* Main loop of walsender process */
-static int
+static void
 WalSndLoop(void)
 {
        char       *output_message;
@@ -800,8 +796,8 @@ WalSndLoop(void)
                        if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
                        {
                                ereport(DEBUG1,
-                                               (errmsg("standby \"%s\" has now caught up with primary",
-                                                               application_name)));
+                                        (errmsg("standby \"%s\" has now caught up with primary",
+                                                        application_name)));
                                WalSndSetState(WALSNDSTATE_STREAMING);
                        }
 
@@ -818,7 +814,7 @@ WalSndLoop(void)
                                if (caughtup && !pq_is_send_pending())
                                {
                                        walsender_shutdown_requested = true;
-                                       continue;               /* don't want to wait more */
+                                       continue;       /* don't want to wait more */
                                }
                        }
                }
@@ -832,30 +828,29 @@ WalSndLoop(void)
                 */
                if (caughtup || pq_is_send_pending())
                {
-                       TimestampTz finish_time = 0;
-                       long            sleeptime = -1;
+                       TimestampTz timeout = 0;
+                       long            sleeptime = 10000;              /* 10 s */
                        int                     wakeEvents;
 
                        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
-                               WL_SOCKET_READABLE;
+                               WL_SOCKET_READABLE | WL_TIMEOUT;
+
                        if (pq_is_send_pending())
                                wakeEvents |= WL_SOCKET_WRITEABLE;
+                       else
+                       {
+                               WalSndKeepalive(output_message);
+                               /* Try to flush pending output to the client */
+                               if (pq_flush_if_writable() != 0)
+                                       break;
+                       }
 
                        /* Determine time until replication timeout */
                        if (replication_timeout > 0)
                        {
-                               long            secs;
-                               int                     usecs;
-
-                               finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-                                                                                                                 replication_timeout);
-                               TimestampDifference(GetCurrentTimestamp(),
-                                                                       finish_time, &secs, &usecs);
-                               sleeptime = secs * 1000 + usecs / 1000;
-                               /* Avoid Assert in WaitLatchOrSocket if timeout is past */
-                               if (sleeptime < 0)
-                                       sleeptime = 0;
-                               wakeEvents |= WL_TIMEOUT;
+                               timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                                                                                         replication_timeout);
+                               sleeptime = 1 + (replication_timeout / 10);
                        }
 
                        /* Sleep until something happens or replication timeout */
@@ -868,7 +863,7 @@ WalSndLoop(void)
                         * timeout ... he's supposed to reply *before* that.
                         */
                        if (replication_timeout > 0 &&
-                               GetCurrentTimestamp() >= finish_time)
+                               GetCurrentTimestamp() >= timeout)
                        {
                                /*
                                 * Since typically expiration of replication timeout means
@@ -892,7 +887,7 @@ WalSndLoop(void)
                whereToSendOutput = DestNone;
 
        proc_exit(0);
-       return 1;                                       /* keep the compiler quiet */
+       abort();                                        /* keep the compiler quiet */
 }
 
 /* Initialize a per-walsender data structure for this walsender process */
@@ -982,13 +977,11 @@ WalSndKill(int code, Datum arg)
 void
 XLogRead(char *buf, XLogRecPtr startptr, Size count)
 {
-       char               *p;
+       char       *p;
        XLogRecPtr      recptr;
-       Size                    nbytes;
-       uint32          lastRemovedLog;
-       uint32          lastRemovedSeg;
-       uint32          log;
-       uint32          seg;
+       Size            nbytes;
+       XLogSegNo       lastRemovedSegNo;
+       XLogSegNo       segno;
 
 retry:
        p = buf;
@@ -1001,9 +994,9 @@ retry:
                int                     segbytes;
                int                     readbytes;
 
-               startoff = recptr.xrecoff % XLogSegSize;
+               startoff = recptr % XLogSegSize;
 
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
                {
                        char            path[MAXPGPATH];
 
@@ -1011,8 +1004,8 @@ retry:
                        if (sendFile >= 0)
                                close(sendFile);
 
-                       XLByteToSeg(recptr, sendId, sendSeg);
-                       XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+                       XLByteToSeg(recptr, sendSegNo);
+                       XLogFilePath(path, ThisTimeLineID, sendSegNo);
 
                        sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
                        if (sendFile < 0)
@@ -1023,20 +1016,15 @@ retry:
                                 * removed or recycled.
                                 */
                                if (errno == ENOENT)
-                               {
-                                       char            filename[MAXFNAMELEN];
-
-                                       XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
                                                         errmsg("requested WAL segment %s has already been removed",
-                                                                       filename)));
-                               }
+                                                                       XLogFileNameP(ThisTimeLineID, sendSegNo))));
                                else
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
-                                                        errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
-                                                                       path, sendId, sendSeg)));
+                                                        errmsg("could not open file \"%s\": %m",
+                                                                       path)));
                        }
                        sendOff = 0;
                }
@@ -1047,8 +1035,9 @@ retry:
                        if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
                                ereport(ERROR,
                                                (errcode_for_file_access(),
-                                                errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-                                                               sendId, sendSeg, startoff)));
+                                                errmsg("could not seek in log segment %s to offset %u: %m",
+                                                               XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                                               startoff)));
                        sendOff = startoff;
                }
 
@@ -1060,11 +1049,13 @@ retry:
 
                readbytes = read(sendFile, p, segbytes);
                if (readbytes <= 0)
+               {
                        ereport(ERROR,
                                        (errcode_for_file_access(),
-                       errmsg("could not read from log file %u, segment %u, offset %u, "
-                                  "length %lu: %m",
-                                  sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+                       errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+                                  XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                  sendOff, (unsigned long) segbytes)));
+               }
 
                /* Update state for read */
                XLByteAdvance(recptr, readbytes);
@@ -1081,24 +1072,18 @@ retry:
         * read() succeeds in that case, but the data we tried to read might
         * already have been overwritten with new WAL records.
         */
-       XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
-       XLByteToSeg(startptr, log, seg);
-       if (log < lastRemovedLog ||
-               (log == lastRemovedLog && seg <= lastRemovedSeg))
-       {
-               char            filename[MAXFNAMELEN];
-
-               XLogFileName(filename, ThisTimeLineID, log, seg);
+       XLogGetLastRemoved(&lastRemovedSegNo);
+       XLByteToSeg(startptr, segno);
+       if (segno <= lastRemovedSegNo)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("requested WAL segment %s has already been removed",
-                                               filename)));
-       }
+                                               XLogFileNameP(ThisTimeLineID, segno))));
 
        /*
-        * During recovery, the currently-open WAL file might be replaced with
-        * the file of the same name retrieved from archive. So we always need
-        * to check what we read was valid after reading into the buffer. If it's
+        * During recovery, the currently-open WAL file might be replaced with the
+        * file of the same name retrieved from archive. So we always need to
+        * check what we read was valid after reading into the buffer. If it's
         * invalid, we try to open and read the file again.
         */
        if (am_cascading_walsender)
@@ -1173,25 +1158,8 @@ XLogSend(char *msgbuf, bool *caughtup)
         * SendRqstPtr never points to the middle of a WAL record.
         */
        startptr = sentPtr;
-       if (startptr.xrecoff >= XLogFileSize)
-       {
-               /*
-                * crossing a logid boundary, skip the non-existent last log segment
-                * in previous logical log file.
-                */
-               startptr.xlogid += 1;
-               startptr.xrecoff = 0;
-       }
-
        endptr = startptr;
        XLByteAdvance(endptr, MAX_SEND_SIZE);
-       if (endptr.xlogid != startptr.xlogid)
-       {
-               /* Don't cross a logfile boundary within one message */
-               Assert(endptr.xlogid == startptr.xlogid + 1);
-               endptr.xlogid = startptr.xlogid;
-               endptr.xrecoff = XLogFileSize;
-       }
 
        /* if we went beyond SendRqstPtr, back off */
        if (XLByteLE(SendRqstPtr, endptr))
@@ -1202,11 +1170,11 @@ XLogSend(char *msgbuf, bool *caughtup)
        else
        {
                /* round down to page boundary. */
-               endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+               endptr -= (endptr % XLOG_BLCKSZ);
                *caughtup = false;
        }
 
-       nbytes = endptr.xrecoff - startptr.xrecoff;
+       nbytes = endptr - startptr;
        Assert(nbytes <= MAX_SEND_SIZE);
 
        /*
@@ -1250,7 +1218,7 @@ XLogSend(char *msgbuf, bool *caughtup)
                char            activitymsg[50];
 
                snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
-                                sentPtr.xlogid, sentPtr.xrecoff);
+                                (uint32) (sentPtr >> 32), (uint32) sentPtr);
                set_ps_display(activitymsg, false);
        }
 
@@ -1303,8 +1271,8 @@ WalSndShutdownHandler(SIGNAL_ARGS)
                SetLatch(&MyWalSnd->latch);
 
        /*
-        * Set the standard (non-walsender) state as well, so that we can
-        * abort things like do_pg_stop_backup().
+        * Set the standard (non-walsender) state as well, so that we can abort
+        * things like do_pg_stop_backup().
         */
        InterruptPending = true;
        ProcDiePending = true;
@@ -1378,7 +1346,7 @@ WalSndSignals(void)
        pqsignal(SIGINT, SIG_IGN);      /* not used */
        pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
        pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
-       pqsignal(SIGALRM, SIG_IGN);
+       InitializeTimeouts();           /* establishes SIGALRM handler */
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
        pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
@@ -1419,7 +1387,8 @@ WalSndShmemInit(void)
                /* First time through, so initialize */
                MemSet(WalSndCtl, 0, WalSndShmemSize());
 
-               SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+               for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+                       SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
 
                for (i = 0; i < max_wal_senders; i++)
                {
@@ -1431,7 +1400,12 @@ WalSndShmemInit(void)
        }
 }
 
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
 void
 WalSndWakeup(void)
 {
@@ -1537,12 +1511,19 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
                if (walsnd->pid != 0)
                {
-                       sync_priority[i] = walsnd->sync_standby_priority;
+                       /*
+                        * Treat a standby such as a pg_basebackup background process
+                        * which always returns an invalid flush location, as an
+                        * asynchronous standby.
+                        */
+                       sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+                               0 : walsnd->sync_standby_priority;
 
                        if (walsnd->state == WALSNDSTATE_STREAMING &&
                                walsnd->sync_standby_priority > 0 &&
                                (priority == 0 ||
-                                priority > walsnd->sync_standby_priority))
+                                priority > walsnd->sync_standby_priority) &&
+                               !XLogRecPtrIsInvalid(walsnd->flush))
                        {
                                priority = walsnd->sync_standby_priority;
                                sync_standby = i;
@@ -1591,25 +1572,25 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                        values[1] = CStringGetTextDatum(WalSndGetStateString(state));
 
                        snprintf(location, sizeof(location), "%X/%X",
-                                        sentPtr.xlogid, sentPtr.xrecoff);
+                                        (uint32) (sentPtr >> 32), (uint32) sentPtr);
                        values[2] = CStringGetTextDatum(location);
 
-                       if (write.xlogid == 0 && write.xrecoff == 0)
+                       if (write == 0)
                                nulls[3] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        write.xlogid, write.xrecoff);
+                                        (uint32) (write >> 32), (uint32) write);
                        values[3] = CStringGetTextDatum(location);
 
-                       if (flush.xlogid == 0 && flush.xrecoff == 0)
+                       if (flush == 0)
                                nulls[4] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        flush.xlogid, flush.xrecoff);
+                                        (uint32) (flush >> 32), (uint32) flush);
                        values[4] = CStringGetTextDatum(location);
 
-                       if (apply.xlogid == 0 && apply.xrecoff == 0)
+                       if (apply == 0)
                                nulls[5] = true;
                        snprintf(location, sizeof(location), "%X/%X",
-                                        apply.xlogid, apply.xrecoff);
+                                        (uint32) (apply >> 32), (uint32) apply);
                        values[5] = CStringGetTextDatum(location);
 
                        values[6] = Int32GetDatum(sync_priority[i]);
@@ -1636,6 +1617,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        return (Datum) 0;
 }
 
+static void
+WalSndKeepalive(char *msgbuf)
+{
+       PrimaryKeepaliveMessage keepalive_message;
+
+       /* Construct a new message */
+       keepalive_message.walEnd = sentPtr;
+       keepalive_message.sendTime = GetCurrentTimestamp();
+
+       elog(DEBUG2, "sending replication keepalive");
+
+       /* Prepend with the message type and send it. */
+       msgbuf[0] = 'k';
+       memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+       pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
 /*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the