]> granicus.if.org Git - postgresql/commitdiff
Follow TLI of last replayed record, not recovery target TLI, in walsenders.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 20 Dec 2012 12:23:31 +0000 (14:23 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 20 Dec 2012 12:39:04 +0000 (14:39 +0200)
Most of the time, the last replayed record comes from the recovery target
timeline, but there is a corner case where it makes a difference. When
the startup process scans for a new timeline, and decides to change recovery
target timeline, there is a window where the recovery target TLI has already
been bumped, but there are no WAL segments from the new timeline in pg_xlog
yet. For example, if we have just replayed up to point 0/30002D8, on
timeline 1, there is a WAL file called 000000010000000000000003 in pg_xlog
that contains the WAL up to that point. When recovery switches recovery
target timeline to 2, a walsender can immediately try to read WAL from
0/30002D8, from timeline 2, so it will try to open WAL file
000000020000000000000003. However, that doesn't exist yet - the startup
process hasn't copied that file from the archive yet nor has the walreceiver
streamed it yet, so walsender fails with error "requested WAL segment
000000020000000000000003 has already been removed". That's harmless, in that
the standby will try to reconnect later and by that time the segment is
already created, but error messages that should be ignored are not good.

To fix that, have walsender track the TLI of the last replayed record,
instead of the recovery target timeline. That way walsender will not try to
read anything from timeline 2, until the WAL segment has been created and at
least one record has been replayed from it. The recovery target timeline is
now xlog.c's internal affair, it doesn't need to be exposed in shared memory
anymore.

This fixes the error reported by Thom Brown. depesz the same error message,
but I'm not sure if this fixes his scenario.

src/backend/access/transam/xlog.c
src/backend/access/transam/xlogfuncs.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/replication/walsender.c
src/include/access/xlog.h

index d7e83a937c991e154a16438c15df55387e0b5119..d808607ecdbb31c90f3a20c14eb01b0e6ca0fb12 100644 (file)
@@ -453,6 +453,7 @@ typedef struct XLogCtlData
         * replayed, otherwise it's equal to lastReplayedEndRecPtr.
         */
        XLogRecPtr      lastReplayedEndRecPtr;
+       TimeLineID      lastReplayedTLI;
        XLogRecPtr      replayEndRecPtr;
        TimeLineID      replayEndTLI;
        /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
@@ -3829,7 +3830,6 @@ rescanLatestTimeLine(void)
        TimeLineID      newtarget;
        TimeLineHistoryEntry *currentTle = NULL;
        /* use volatile pointer to prevent code rearrangement */
-       volatile XLogCtlData *xlogctl = XLogCtl;
 
        newtarget = findNewestTimeLine(recoveryTargetTLI);
        if (newtarget == recoveryTargetTLI)
@@ -3888,20 +3888,10 @@ rescanLatestTimeLine(void)
        list_free_deep(expectedTLEs);
        expectedTLEs = newExpectedTLEs;
 
-       SpinLockAcquire(&xlogctl->info_lck);
-       xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
-       SpinLockRelease(&xlogctl->info_lck);
-
        ereport(LOG,
                        (errmsg("new target timeline is %u",
                                        recoveryTargetTLI)));
 
-       /*
-        * Wake up any walsenders to notice that we have a new target timeline.
-        */
-       if (AllowCascadeReplication())
-               WalSndWakeup();
-
        return true;
 }
 
@@ -5389,11 +5379,9 @@ StartupXLOG(void)
                                                ControlFile->minRecoveryPointTLI)));
 
        /*
-        * Save the selected recovery target timeline ID and
-        * archive_cleanup_command in shared memory so that other processes can
-        * see them
+        * Save archive_cleanup_command in shared memory so that other processes
+        * can see it.
         */
-       XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
        strncpy(XLogCtl->archiveCleanupCommand,
                        archiveCleanupCommand ? archiveCleanupCommand : "",
                        sizeof(XLogCtl->archiveCleanupCommand));
@@ -5770,6 +5758,7 @@ StartupXLOG(void)
                xlogctl->replayEndRecPtr = ReadRecPtr;
                xlogctl->replayEndTLI = ThisTimeLineID;
                xlogctl->lastReplayedEndRecPtr = EndRecPtr;
+               xlogctl->lastReplayedEndRecPtr = ThisTimeLineID;
                xlogctl->recoveryLastXTime = 0;
                xlogctl->currentChunkStartTime = 0;
                xlogctl->recoveryPause = false;
@@ -5837,6 +5826,7 @@ StartupXLOG(void)
                         */
                        do
                        {
+                               bool switchedTLI = false;
 #ifdef WAL_DEBUG
                                if (XLOG_DEBUG ||
                                 (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
@@ -5942,6 +5932,7 @@ StartupXLOG(void)
 
                                                /* Following WAL records should be run with new TLI */
                                                ThisTimeLineID = newTLI;
+                                               switchedTLI = true;
                                        }
                                }
 
@@ -5974,6 +5965,7 @@ StartupXLOG(void)
                                 */
                                SpinLockAcquire(&xlogctl->info_lck);
                                xlogctl->lastReplayedEndRecPtr = EndRecPtr;
+                               xlogctl->lastReplayedTLI = ThisTimeLineID;
                                SpinLockRelease(&xlogctl->info_lck);
 
                                /* Remember this record as the last-applied one */
@@ -5982,6 +5974,13 @@ StartupXLOG(void)
                                /* Allow read-only connections if we're consistent now */
                                CheckRecoveryConsistency();
 
+                               /*
+                                * If this record was a timeline switch, wake up any
+                                * walsenders to notice that we are on a new timeline.
+                                */
+                               if (switchedTLI && AllowCascadeReplication())
+                                       WalSndWakeup();
+
                                /* Exit loop if we reached inclusive recovery target */
                                if (!recoveryContinue)
                                        break;
@@ -6822,23 +6821,6 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
        *epoch = ckptXidEpoch;
 }
 
-/*
- * GetRecoveryTargetTLI - get the current recovery target timeline ID
- */
-TimeLineID
-GetRecoveryTargetTLI(void)
-{
-       /* use volatile pointer to prevent code rearrangement */
-       volatile XLogCtlData *xlogctl = XLogCtl;
-       TimeLineID result;
-
-       SpinLockAcquire(&xlogctl->info_lck);
-       result = xlogctl->RecoveryTargetTLI;
-       SpinLockRelease(&xlogctl->info_lck);
-
-       return result;
-}
-
 /*
  * This must be called ONCE during postmaster or standalone-backend shutdown
  */
@@ -7642,10 +7624,16 @@ CreateRestartPoint(int flags)
         */
        if (_logSegNo)
        {
+               XLogRecPtr      receivePtr;
+               XLogRecPtr      replayPtr;
                XLogRecPtr      endptr;
 
-               /* Get the current (or recent) end of xlog */
-               endptr = GetStandbyFlushRecPtr();
+               /*
+                * Get the current end of xlog replayed or received, whichever is later.
+                */
+               receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+               replayPtr = GetXLogReplayRecPtr(NULL);
+               endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 
                KeepLogSeg(endptr, &_logSegNo);
                _logSegNo--;
@@ -9109,38 +9097,23 @@ do_pg_abort_backup(void)
  * Exported to allow WALReceiver to read the pointer directly.
  */
 XLogRecPtr
-GetXLogReplayRecPtr(void)
+GetXLogReplayRecPtr(TimeLineID *replayTLI)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
        XLogRecPtr      recptr;
+       TimeLineID      tli;
 
        SpinLockAcquire(&xlogctl->info_lck);
        recptr = xlogctl->lastReplayedEndRecPtr;
+       tli = xlogctl->lastReplayedTLI;
        SpinLockRelease(&xlogctl->info_lck);
 
+       if (replayTLI)
+               *replayTLI = tli;
        return recptr;
 }
 
-/*
- * Get current standby flush position, ie, the last WAL position
- * known to be fsync'd to disk in standby.
- */
-XLogRecPtr
-GetStandbyFlushRecPtr(void)
-{
-       XLogRecPtr      receivePtr;
-       XLogRecPtr      replayPtr;
-
-       receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
-       replayPtr = GetXLogReplayRecPtr();
-
-       if (XLByteLT(receivePtr, replayPtr))
-               return replayPtr;
-       else
-               return receivePtr;
-}
-
 /*
  * Get latest WAL insert pointer
  */
index e91bdc3f4af93b6b56367a0d2010651f49d95c4c..47624c3e75fdc0b922ad83ff58eda2e60c340233 100644 (file)
@@ -248,7 +248,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
        XLogRecPtr      recptr;
        char            location[MAXFNAMELEN];
 
-       recptr = GetXLogReplayRecPtr();
+       recptr = GetXLogReplayRecPtr(NULL);
 
        if (recptr == 0)
                PG_RETURN_NULL();
index 303edb75a32061c0d268c1e5fe6334be4f1f9049..a0960f2ceab20bc44fe8b41acdbda30df78fb03e 100644 (file)
@@ -370,7 +370,7 @@ WalReceiverMain(void)
                        first_stream = false;
 
                        /* Initialize LogstreamResult and buffers for processing messages */
-                       LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr();
+                       LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
                        initStringInfo(&reply_message);
                        initStringInfo(&incoming_message);
 
@@ -1026,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
        /* Construct a new message */
        writePtr = LogstreamResult.Write;
        flushPtr = LogstreamResult.Flush;
-       applyPtr = GetXLogReplayRecPtr();
+       applyPtr = GetXLogReplayRecPtr(NULL);
 
        resetStringInfo(&reply_message);
        pq_sendbyte(&reply_message, 'r');
index a8ccfc66398bab64b90c8443182a0a5f1ca10d43..1aaafbb49fc0424ecb8aef6fa514c48e15950189 100644 (file)
@@ -324,7 +324,7 @@ GetReplicationApplyDelay(void)
        receivePtr = walrcv->receivedUpto;
        SpinLockRelease(&walrcv->mutex);
 
-       replayPtr = GetXLogReplayRecPtr();
+       replayPtr = GetXLogReplayRecPtr(NULL);
 
        if (XLByteEQ(receivePtr, replayPtr))
                return 0;
index aec57f5535fc42f17330fc066c1e10175663f963..29a25eb9035e1234d8473a7ed0b86804e39fb375 100644 (file)
@@ -169,6 +169,7 @@ static void WalSndLoop(void);
 static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogSend(bool *caughtup);
+static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID currentTLI);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd *cmd);
 static void ProcessStandbyMessage(void);
@@ -190,12 +191,6 @@ InitWalSender(void)
        /* Set up resource owner */
        CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
 
-       /*
-        * Use the recovery target timeline ID during recovery
-        */
-       if (am_cascading_walsender)
-               ThisTimeLineID = GetRecoveryTargetTLI();
-
        /*
         * Let postmaster know that we're a WAL sender. Once we've declared us as
         * a WAL sender process, postmaster will let us outlive the bgwriter and
@@ -254,8 +249,8 @@ IdentifySystem(void)
        am_cascading_walsender = RecoveryInProgress();
        if (am_cascading_walsender)
        {
-               logptr = GetStandbyFlushRecPtr();
-               ThisTimeLineID = GetRecoveryTargetTLI();
+               /* this also updates ThisTimeLineID */
+               logptr = GetStandbyFlushRecPtr(0);
        }
        else
                logptr = GetInsertRecPtr();
@@ -409,6 +404,7 @@ static void
 StartReplication(StartReplicationCmd *cmd)
 {
        StringInfoData buf;
+       XLogRecPtr FlushPtr;
 
        /*
         * We assume here that we're logging enough information in the WAL for
@@ -421,8 +417,17 @@ StartReplication(StartReplicationCmd *cmd)
 
        /*
         * Select the timeline. If it was given explicitly by the client, use
-        * that. Otherwise use the current ThisTimeLineID.
+        * that. Otherwise use the timeline of the last replayed record, which
+        * is kept in ThisTimeLineID.
         */
+       if (am_cascading_walsender)
+       {
+               /* this also updates ThisTimeLineID */
+               FlushPtr = GetStandbyFlushRecPtr(0);
+       }
+       else
+               FlushPtr = GetFlushRecPtr();
+
        if (cmd->timeline != 0)
        {
                XLogRecPtr      switchpoint;
@@ -494,7 +499,6 @@ StartReplication(StartReplicationCmd *cmd)
        if (!sendTimeLineIsHistoric ||
                XLByteLT(cmd->startpoint, sendTimeLineValidUpto))
        {
-               XLogRecPtr FlushPtr;
                /*
                 * When we first start replication the standby will be behind the primary.
                 * For some applications, for example, synchronous replication, it is
@@ -516,10 +520,6 @@ StartReplication(StartReplicationCmd *cmd)
                 * Don't allow a request to stream from a future point in WAL that
                 * hasn't been flushed to disk in this server yet.
                 */
-               if (am_cascading_walsender)
-                       FlushPtr = GetStandbyFlushRecPtr();
-               else
-                       FlushPtr = GetFlushRecPtr();
                if (XLByteLT(FlushPtr, cmd->startpoint))
                {
                        ereport(ERROR,
@@ -1330,7 +1330,7 @@ XLogSend(bool *caughtup)
         * that gets lost on the master.
         */
        if (am_cascading_walsender)
-               FlushPtr = GetStandbyFlushRecPtr();
+               FlushPtr = GetStandbyFlushRecPtr(sendTimeLine);
        else
                FlushPtr = GetFlushRecPtr();
 
@@ -1347,7 +1347,6 @@ XLogSend(bool *caughtup)
        if (!sendTimeLineIsHistoric && am_cascading_walsender)
        {
                bool            becameHistoric = false;
-               TimeLineID      targetTLI;
 
                if (!RecoveryInProgress())
                {
@@ -1355,7 +1354,6 @@ XLogSend(bool *caughtup)
                         * We have been promoted. RecoveryInProgress() updated
                         * ThisTimeLineID to the new current timeline.
                         */
-                       targetTLI = ThisTimeLineID;
                        am_cascading_walsender = false;
                        becameHistoric = true;
                }
@@ -1363,11 +1361,9 @@ XLogSend(bool *caughtup)
                {
                        /*
                         * Still a cascading standby. But is the timeline we're sending
-                        * still the recovery target timeline?
+                        * still the one recovery is recovering from?
                         */
-                       targetTLI = GetRecoveryTargetTLI();
-
-                       if (targetTLI != sendTimeLine)
+                       if (sendTimeLine != ThisTimeLineID)
                                becameHistoric = true;
                }
 
@@ -1380,7 +1376,7 @@ XLogSend(bool *caughtup)
                         */
                        List       *history;
 
-                       history = readTimeLineHistory(targetTLI);
+                       history = readTimeLineHistory(ThisTimeLineID);
                        sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
                        Assert(XLByteLE(sentPtr, sendTimeLineValidUpto));
                        list_free_deep(history);
@@ -1521,6 +1517,48 @@ XLogSend(bool *caughtup)
        return;
 }
 
+/*
+ * Returns the latest point in WAL that has been safely flushed to disk, and
+ * can be sent to the standby. This should only be called when in recovery,
+ * ie. we're streaming to a cascaded standby.
+ *
+ * If currentTLI is non-zero, the function returns the point that the WAL on
+ * the given timeline has been flushed upto. If recovery has already switched
+ * to a different timeline, InvalidXLogRecPtr is returned.
+ *
+ * As a side-effect, ThisTimeLineID is updated to the TLI of the last
+ * replayed WAL record.
+ */
+static XLogRecPtr
+GetStandbyFlushRecPtr(TimeLineID currentTLI)
+{
+       XLogRecPtr replayPtr;
+       TimeLineID replayTLI;
+       XLogRecPtr receivePtr;
+       TimeLineID receiveTLI;
+       XLogRecPtr      result;
+
+       /*
+        * We can safely send what's already been replayed. Also, if walreceiver
+        * is streaming WAL from the same timeline, we can send anything that
+        * it has streamed, but hasn't been replayed yet.
+        */
+
+       receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
+       replayPtr = GetXLogReplayRecPtr(&replayTLI);
+
+       ThisTimeLineID = replayTLI;
+
+       if (currentTLI != replayTLI && currentTLI != 0)
+               return InvalidXLogRecPtr;
+
+       result = replayPtr;
+       if (receiveTLI == currentTLI && receivePtr > replayPtr)
+               result = receivePtr;
+
+       return result;
+}
+
 /*
  * Request walsenders to reload the currently-open WAL file
  */
index c8cd37981c58785bcb3326034fd533051124d820..95d01b974446b27d5501df0d526ceace1b833a2e 100644 (file)
@@ -283,8 +283,7 @@ extern bool RecoveryInProgress(void);
 extern bool HotStandbyActive(void);
 extern bool XLogInsertAllowed(void);
 extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(void);
-extern XLogRecPtr GetStandbyFlushRecPtr(void);
+extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
 extern XLogRecPtr GetXLogInsertRecPtr(void);
 extern XLogRecPtr GetXLogWriteRecPtr(void);
 extern bool RecoveryIsPaused(void);