]> granicus.if.org Git - postgresql/commitdiff
Fix bugs in cascading replication with recovery_target_timeline='latest'
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 5 Sep 2012 01:47:03 +0000 (18:47 -0700)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 5 Sep 2012 02:33:32 +0000 (19:33 -0700)
The cascading replication code assumed that the current RecoveryTargetTLI
never changes, but that's not true with recovery_target_timeline='latest'.
The obvious upshot of that is that RecoveryTargetTLI in shared memory needs
to be protected by a lock. A less obvious consequence is that when a
cascading standby is connected, and the standby switches to a new target
timeline after scanning the archive, it will continue to stream WAL to the
cascading standby, but from a wrong file, ie. the file of the previous
timeline. For example, if the standby is currently streaming from the middle
of file 000000010000000000000005, and the timeline changes, the standby
will continue to stream from that file. However, the WAL on the new
timeline is in file 000000020000000000000005, so the standby sends garbage
from 000000010000000000000005 to the cascading standby, instead of the
correct WAL from file 000000020000000000000005.

This also fixes a related bug where a partial WAL segment is restored from
the archive and streamed to a cascading standby. The code assumed that when
a WAL segment is copied from the archive, it can immediately be fully
streamed to a cascading standby. However, if the segment is only partially
filled, ie. has the right size, but only N first bytes contain valid WAL,
that's not safe. That can happen if a partial WAL segment is manually copied
to the archive, or if a partial WAL segment is archived because a server is
started up on a new timeline within that segment. The cascading standby will
get confused if the WAL it received is not valid, and will get stuck until
it's restarted. This patch fixes that problem by not allowing WAL restored
from the archive to be streamed to a cascading standby until it's been
replayed, and thus validated.

src/backend/access/transam/xlog.c
src/backend/replication/walsender.c
src/include/access/xlog.h

index b75dab5e10fe5694ae90051f5426acddae900605..063355c46a2141ca966687bec933a2597cba1256 100644 (file)
@@ -406,7 +406,6 @@ typedef struct XLogCtlData
        XLogRecPtr *xlblocks;           /* 1st byte ptr-s + XLOG_BLCKSZ */
        int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
        TimeLineID      ThisTimeLineID;
-       TimeLineID      RecoveryTargetTLI;
 
        /*
         * archiveCleanupCommand is read from recovery.conf but needs to be in
@@ -455,14 +454,14 @@ typedef struct XLogCtlData
        XLogRecPtr      recoveryLastRecPtr;
        /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
        TimestampTz recoveryLastXTime;
+       /* current effective recovery target timeline */
+       TimeLineID      RecoveryTargetTLI;
 
        /*
         * timestamp of when we started replaying the current chunk of WAL data,
         * only relevant for replication or archive recovery
         */
        TimestampTz currentChunkStartTime;
-       /* end of the last record restored from the archive */
-       XLogRecPtr      restoreLastRecPtr;
        /* Are we requested to pause recovery? */
        bool            recoveryPause;
 
@@ -2880,19 +2879,6 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
                if (reload)
                        WalSndRqstFileReload();
 
-               /*
-                * Calculate the end location of the restored WAL file and save it in
-                * shmem. It's used as current standby flush position, and cascading
-                * walsenders try to send WAL records up to this location.
-                */
-               endptr.xlogid = log;
-               endptr.xrecoff = seg * XLogSegSize;
-               XLByteAdvance(endptr, XLogSegSize);
-
-               SpinLockAcquire(&xlogctl->info_lck);
-               xlogctl->restoreLastRecPtr = endptr;
-               SpinLockRelease(&xlogctl->info_lck);
-
                /* Signal walsender that new WAL has arrived */
                if (AllowCascadeReplication())
                        WalSndWakeup();
@@ -4467,12 +4453,17 @@ rescanLatestTimeLine(void)
                                                        ThisTimeLineID)));
                else
                {
+                       /* use volatile pointer to prevent code rearrangement */
+                       volatile XLogCtlData *xlogctl = XLogCtl;
+
                        /* Switch target */
                        recoveryTargetTLI = newtarget;
                        list_free(expectedTLIs);
                        expectedTLIs = newExpectedTLIs;
 
-                       XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
+                       SpinLockAcquire(&xlogctl->info_lck);
+                       xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
+                       SpinLockRelease(&xlogctl->info_lck);
 
                        ereport(LOG,
                                        (errmsg("new target timeline is %u",
@@ -7519,13 +7510,20 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
 }
 
 /*
- * GetRecoveryTargetTLI - get the recovery target timeline ID
+ * GetRecoveryTargetTLI - get the current recovery target timeline ID
  */
 TimeLineID
 GetRecoveryTargetTLI(void)
 {
-       /* RecoveryTargetTLI doesn't change so we need no lock to copy it */
-       return XLogCtl->RecoveryTargetTLI;
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       TimeLineID result;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       result = xlogctl->RecoveryTargetTLI;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return result;
 }
 
 /*
@@ -8321,7 +8319,7 @@ CreateRestartPoint(int flags)
                XLogRecPtr      endptr;
 
                /* Get the current (or recent) end of xlog */
-               endptr = GetStandbyFlushRecPtr();
+               endptr = GetStandbyFlushRecPtr(NULL);
 
                KeepLogSeg(endptr, &_logId, &_logSeg);
                PrevLogSeg(_logId, _logSeg);
@@ -9837,14 +9835,13 @@ do_pg_abort_backup(void)
 /*
  * Get latest redo apply position.
  *
- * Optionally, returns the end byte position of the last restored
- * WAL segment. Callers not interested in that value may pass
- * NULL for restoreLastRecPtr.
+ * Optionally, returns the current recovery target timeline. Callers not
+ * interested in that may pass NULL for targetTLI.
  *
  * Exported to allow WALReceiver to read the pointer directly.
  */
 XLogRecPtr
-GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
+GetXLogReplayRecPtr(TimeLineID *targetTLI)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
@@ -9852,8 +9849,8 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
 
        SpinLockAcquire(&xlogctl->info_lck);
        recptr = xlogctl->recoveryLastRecPtr;
-       if (restoreLastRecPtr)
-               *restoreLastRecPtr = xlogctl->restoreLastRecPtr;
+       if (targetTLI)
+               *targetTLI = xlogctl->RecoveryTargetTLI;
        SpinLockRelease(&xlogctl->info_lck);
 
        return recptr;
@@ -9862,21 +9859,23 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
 /*
  * Get current standby flush position, ie, the last WAL position
  * known to be fsync'd to disk in standby.
+ *
+ * If 'targetTLI' is not NULL, it's set to the current recovery target
+ * timeline.
  */
 XLogRecPtr
-GetStandbyFlushRecPtr(void)
+GetStandbyFlushRecPtr(TimeLineID *targetTLI)
 {
        XLogRecPtr      receivePtr;
        XLogRecPtr      replayPtr;
-       XLogRecPtr      restorePtr;
 
        receivePtr = GetWalRcvWriteRecPtr(NULL);
-       replayPtr = GetXLogReplayRecPtr(&restorePtr);
+       replayPtr = GetXLogReplayRecPtr(targetTLI);
 
        if (XLByteLT(receivePtr, replayPtr))
-               return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
+               return replayPtr;
        else
-               return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
+               return receivePtr;
 }
 
 /*
index 3f060b82c09c0df3bf183f0cf90b160dd6adc426..064ddd549531d660cb500bc7a620392b42067ade 100644 (file)
@@ -299,7 +299,7 @@ IdentifySystem(void)
                         GetSystemIdentifier());
        snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
 
-       logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
+       logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr();
 
        snprintf(xpos, sizeof(xpos), "%X/%X",
                         logptr.xlogid, logptr.xrecoff);
@@ -1144,7 +1144,31 @@ XLogSend(char *msgbuf, bool *caughtup)
         * subsequently crashes and restarts, slaves must not have applied any WAL
         * that gets lost on the master.
         */
-       SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
+       if (am_cascading_walsender)
+       {
+               TimeLineID      currentTargetTLI;
+               SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI);
+
+               /*
+                * If the recovery target timeline changed, bail out. It's a bit
+                * unfortunate that we have to just disconnect, but there is no way
+                * to tell the client that the timeline changed. We also don't know
+                * exactly where the switch happened, so we cannot safely try to send
+                * up to the switchover point before disconnecting.
+                */
+               if (currentTargetTLI != ThisTimeLineID)
+               {
+                       if (!walsender_ready_to_stop)
+                               ereport(LOG,
+                                               (errmsg("terminating walsender process to force cascaded standby "
+                                                               "to update timeline and reconnect")));
+                       walsender_ready_to_stop = true;
+                       *caughtup = true;
+                       return;
+               }
+       }
+       else
+               SendRqstPtr = GetFlushRecPtr();
 
        /* Quick exit if nothing to do */
        if (XLByteLE(SendRqstPtr, sentPtr))
index df5f232eeea44684b25ce81673b54e4ad47cc33c..4d4558ddb67e5aca40b55b3fc025a0e5fe6f1e1f 100644 (file)
@@ -286,8 +286,8 @@ extern bool RecoveryInProgress(void);
 extern bool HotStandbyActive(void);
 extern bool XLogInsertAllowed(void);
 extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
-extern XLogRecPtr GetStandbyFlushRecPtr(void);
+extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI);
+extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI);
 extern XLogRecPtr GetXLogInsertRecPtr(void);
 extern XLogRecPtr GetXLogWriteRecPtr(void);
 extern bool RecoveryIsPaused(void);