]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/xlog.c
Revert WAL posix_fallocate() patches.
[postgresql] / src / backend / access / transam / xlog.c
index 9208bc21d462d51bd975b560a28e27f467fab39e..dc47c4760bf253e8855ad7b21ae2864f2a502367 100644 (file)
@@ -4,7 +4,7 @@
  *             PostgreSQL transaction log manager
  *
  *
- * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * src/backend/access/transam/xlog.c
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
-#include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/startup.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "storage/barrier.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "utils/timestamp.h"
 #include "pg_trace.h"
 
+extern uint32 bootstrap_data_checksum_version;
 
 /* File path names (all relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE  "recovery.conf"
 #define RECOVERY_COMMAND_DONE  "recovery.done"
-#define PROMOTE_SIGNAL_FILE "promote"
+#define PROMOTE_SIGNAL_FILE            "promote"
+#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
 
 
 /* User-settable parameters */
@@ -81,6 +84,7 @@ int                   sync_method = DEFAULT_SYNC_METHOD;
 int                    wal_level = WAL_LEVEL_MINIMAL;
 int                    CommitDelay = 0;        /* precommit delay in microseconds */
 int                    CommitSiblings = 5; /* # concurrent xacts needed to sleep */
+int                    num_xloginsert_slots = 8;
 
 #ifdef WAL_DEBUG
 bool           XLOG_DEBUG = false;
@@ -153,6 +157,7 @@ static XLogRecPtr LastRec;
 
 /* Local copy of WalRcv->receivedUpto */
 static XLogRecPtr receivedUpto = 0;
+static TimeLineID receiveTLI = 0;
 
 /*
  * During recovery, lastFullPageWrites keeps track of full_page_writes that
@@ -186,14 +191,25 @@ static bool LocalHotStandbyActive = false;
  */
 static int     LocalXLogInsertAllowed = -1;
 
-/* Are we recovering using offline XLOG archives? (only valid in the startup process) */
-bool InArchiveRecovery = false;
+/*
+ * When ArchiveRecoveryRequested is set, archive recovery was requested,
+ * ie. recovery.conf file was present. When InArchiveRecovery is set, we are
+ * currently recovering using offline XLOG archives. These variables are only
+ * valid in the startup process.
+ *
+ * When ArchiveRecoveryRequested is true, but InArchiveRecovery is false, we're
+ * currently performing crash recovery using only XLOG files in pg_xlog, but
+ * will switch to using offline XLOG archives as soon as we reach the end of
+ * WAL in pg_xlog.
+*/
+bool           ArchiveRecoveryRequested = false;
+bool           InArchiveRecovery = false;
 
 /* Was the last xlog file restored from archive, or local? */
 static bool restoredFromArchive = false;
 
 /* options taken from recovery.conf for archive recovery */
-char *recoveryRestoreCommand = NULL;
+char      *recoveryRestoreCommand = NULL;
 static char *recoveryEndCommand = NULL;
 static char *archiveCleanupCommand = NULL;
 static RecoveryTargetType recoveryTarget = RECOVERY_TARGET_UNSET;
@@ -204,10 +220,16 @@ static TimestampTz recoveryTargetTime;
 static char *recoveryTargetName;
 
 /* options taken from recovery.conf for XLOG streaming */
-bool StandbyMode = false;
+static bool StandbyModeRequested = false;
 static char *PrimaryConnInfo = NULL;
 static char *TriggerFile = NULL;
 
+/* are we currently in standby mode? */
+bool           StandbyMode = false;
+
+/* whether request for fast promotion has been made yet */
+static bool fast_promote = false;
+
 /* if recoveryStopsHere returns true, it saves actual stop xid/time/name here */
 static TransactionId recoveryStopXid;
 static TimestampTz recoveryStopTime;
@@ -226,7 +248,7 @@ static bool recoveryStopAfter;
  *
  * recoveryTargetIsLatest: was the requested target timeline 'latest'?
  *
- * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
+ * expectedTLEs: a list of TimeLineHistoryEntries for recoveryTargetTLI and the timelines of
  * its known parents, newest first (so recoveryTargetTLI is always the
  * first list member). Only these TLIs are expected to be seen in the WAL
  * segments we read, and indeed only these TLIs will be considered as
@@ -240,7 +262,7 @@ static bool recoveryStopAfter;
  */
 static TimeLineID recoveryTargetTLI;
 static bool recoveryTargetIsLatest = false;
-static List *expectedTLIs;
+static List *expectedTLEs;
 static TimeLineID curFileTLI;
 
 /*
@@ -259,8 +281,8 @@ XLogRecPtr  XactLastRecEnd = InvalidXLogRecPtr;
  * (which is almost but not quite the same as a pointer to the most recent
  * CHECKPOINT record). We update this from the shared-memory copy,
  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold the Insert lock).  See XLogInsert for details. We are also allowed
- * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
+ * hold an insertion slot).  See XLogInsert for details.  We are also allowed
+ * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
  * InitXLOGAccess.
  */
@@ -301,7 +323,10 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALInsertLock: must be held to insert a record into the WAL buffers.
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * It is only held while initializing and changing the mapping.  If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
  *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
@@ -328,17 +353,89 @@ typedef struct XLogwrtResult
        XLogRecPtr      Flush;                  /* last byte + 1 flushed */
 } XLogwrtResult;
 
+
+/*
+ * A slot for inserting to the WAL. This is similar to an LWLock, the main
+ * difference is that there is an extra xlogInsertingAt field that is protected
+ * by the same mutex. Unlike an LWLock, a slot can only be acquired in
+ * exclusive mode.
+ *
+ * The xlogInsertingAt field is used to advertise to other processes how far
+ * the slot owner has progressed in inserting the record. When a backend
+ * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
+ * yet know where it's going to insert the record. That's conservative
+ * but correct; the new insertion is certainly going to go to a byte position
+ * greater than 1. If another backend needs to flush the WAL, it will have to
+ * wait for the new insertion. xlogInsertingAt is updated after finishing the
+ * insert or when crossing a page boundary, which will wake up anyone waiting
+ * for it, whether the wait was necessary in the first place or not.
+ *
+ * A process can wait on a slot in two modes: LW_EXCLUSIVE or
+ * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
+ * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
+ * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
+ * released, or xlogInsertingAt is updated. In other words, a process in
+ * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
+ * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
+ * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
+ *
+ * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
+ * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
+ * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
+ * see lwlock.c for details.
+ */
+typedef struct
+{
+       slock_t         mutex;                  /* protects the below fields */
+       XLogRecPtr      xlogInsertingAt; /* insert has completed up to this point */
+
+       PGPROC     *owner;                      /* for debugging purposes */
+
+       bool            releaseOK;              /* T if ok to release waiters */
+       char            exclusive;              /* # of exclusive holders (0 or 1) */
+       PGPROC     *head;                       /* head of list of waiting PGPROCs */
+       PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
+       /* tail is undefined when head is NULL */
+} XLogInsertSlot;
+
+/*
+ * All the slots are allocated as an array in shared memory. We force the
+ * array stride to be a power of 2, which saves a few cycles in indexing, but
+ * more importantly also ensures that individual slots don't cross cache line
+ * boundaries. (Of course, we have to also ensure that the array start
+ * address is suitably aligned.)
+ */
+typedef union XLogInsertSlotPadded
+{
+       XLogInsertSlot slot;
+       char            pad[CACHE_LINE_SIZE];
+} XLogInsertSlotPadded;
+
 /*
  * Shared state data for XLogInsert.
  */
 typedef struct XLogCtlInsert
 {
-       XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
-       int                     curridx;                /* current block index in cache */
-       XLogPageHeader currpage;        /* points to header of block in cache */
-       char       *currpos;            /* current insertion point in cache */
-       XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
-       bool            forcePageWrites;        /* forcing full-page writes for PITR? */
+       slock_t         insertpos_lck;  /* protects CurrBytePos and PrevBytePos */
+
+       /*
+        * CurrBytePos is the end of reserved WAL. The next record will be inserted
+        * at that position. PrevBytePos is the start position of the previously
+        * inserted (or rather, reserved) record - it is copied to the the prev-
+        * link of the next record. These are stored as "usable byte positions"
+        * rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+        */
+       uint64          CurrBytePos;
+       uint64          PrevBytePos;
+
+       /*
+        * Make sure the above heavily-contended spinlock and byte positions are
+        * on their own cache line. In particular, the RedoRecPtr and full page
+        * write variables below should be on a different cache line. They are
+        * read on every WAL insertion, but updated rarely, and we don't want
+        * those reads to steal the cache line containing Curr/PrevBytePos.
+        */
+       char            pad[CACHE_LINE_SIZE];
 
        /*
         * fullPageWrites is the master copy used by all backends to determine
@@ -346,7 +443,12 @@ typedef struct XLogCtlInsert
         * This is required because, when full_page_writes is changed by SIGHUP,
         * we must WAL-log it before it actually affects WAL-logging by backends.
         * Checkpointer sets at startup or after SIGHUP.
+        *
+        * To read these fields, you must hold an insertion slot. To modify them,
+        * you must hold ALL the slots.
         */
+       XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
+       bool            forcePageWrites;        /* forcing full-page writes for PITR? */
        bool            fullPageWrites;
 
        /*
@@ -359,34 +461,33 @@ typedef struct XLogCtlInsert
        bool            exclusiveBackup;
        int                     nonExclusiveBackups;
        XLogRecPtr      lastBackupStart;
-} XLogCtlInsert;
 
-/*
- * Shared state data for XLogWrite/XLogFlush.
- */
-typedef struct XLogCtlWrite
-{
-       int                     curridx;                /* cache index of next block to write */
-       pg_time_t       lastSegSwitchTime;              /* time of last xlog segment switch */
-} XLogCtlWrite;
+       /* insertion slots, see XLogInsertSlot struct above for details */
+       XLogInsertSlotPadded *insertSlots;
+} XLogCtlInsert;
 
 /*
  * Total shared-memory state for XLOG.
  */
 typedef struct XLogCtlData
 {
-       /* Protected by WALInsertLock: */
        XLogCtlInsert Insert;
 
        /* Protected by info_lck: */
        XLogwrtRqst LogwrtRqst;
+       XLogRecPtr      RedoRecPtr;             /* a recent copy of Insert->RedoRecPtr */
        uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
        TransactionId ckptXid;
        XLogRecPtr      asyncXactLSN;   /* LSN of newest async commit/abort */
-       XLogSegNo       lastRemovedSegNo; /* latest removed/recycled XLOG segment */
+       XLogSegNo       lastRemovedSegNo;               /* latest removed/recycled XLOG
+                                                                                * segment */
 
-       /* Protected by WALWriteLock: */
-       XLogCtlWrite Write;
+       /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
+       XLogRecPtr      unloggedLSN;
+       slock_t         ulsn_lck;
+
+       /* Time of last xlog segment switch. Protected by WALWriteLock. */
+       pg_time_t       lastSegSwitchTime;
 
        /*
         * Protected by info_lck and WALWriteLock (you must hold either lock to
@@ -394,15 +495,34 @@ typedef struct XLogCtlData
         */
        XLogwrtResult LogwrtResult;
 
+       /*
+        * Latest initialized page in the cache (last byte position + 1).
+        *
+        * To change the identity of a buffer (and InitializedUpTo), you need to
+        * hold WALBufMappingLock.  To change the identity of a buffer that's still
+        * dirty, the old page needs to be written out first, and for that you
+        * need WALWriteLock, and you need to ensure that there are no in-progress
+        * insertions to the page by calling WaitXLogInsertionsToFinish().
+        */
+       XLogRecPtr      InitializedUpTo;
+
        /*
         * These values do not change after startup, although the pointed-to pages
-        * and xlblocks values certainly do.  Permission to read/write the pages
-        * and xlblocks values depends on WALInsertLock and WALWriteLock.
+        * and xlblocks values certainly do.  xlblock values are protected by
+        * WALBufMappingLock.
         */
        char       *pages;                      /* buffers for unwritten XLOG pages */
        XLogRecPtr *xlblocks;           /* 1st byte ptr-s + XLOG_BLCKSZ */
        int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
+
+       /*
+        * Shared copy of ThisTimeLineID. Does not change after end-of-recovery.
+        * If we created a new timeline when the system was started up,
+        * PrevTimeLineID is the old timeline's ID that we forked off from.
+        * Otherwise it's equal to ThisTimeLineID.
+        */
        TimeLineID      ThisTimeLineID;
+       TimeLineID      PrevTimeLineID;
 
        /*
         * archiveCleanupCommand is read from recovery.conf but needs to be in
@@ -445,10 +565,16 @@ typedef struct XLogCtlData
        XLogRecPtr      lastCheckPointRecPtr;
        CheckPoint      lastCheckPoint;
 
-       /* end+1 of the last record replayed (or being replayed) */
+       /*
+        * lastReplayedEndRecPtr points to end+1 of the last record successfully
+        * replayed. When we're currently replaying a record, ie. in a redo
+        * function, replayEndRecPtr points to the end+1 of the record being
+        * replayed, otherwise it's equal to lastReplayedEndRecPtr.
+        */
+       XLogRecPtr      lastReplayedEndRecPtr;
+       TimeLineID      lastReplayedTLI;
        XLogRecPtr      replayEndRecPtr;
-       /* end+1 of the last record replayed */
-       XLogRecPtr      recoveryLastRecPtr;
+       TimeLineID      replayEndTLI;
        /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
        TimestampTz recoveryLastXTime;
        /* current effective recovery target timeline */
@@ -479,25 +605,29 @@ static XLogCtlData *XLogCtl = NULL;
 static ControlFileData *ControlFile = NULL;
 
 /*
- * Macros for managing XLogInsert state.  In most cases, the calling routine
- * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
- * so these are passed as parameters instead of being fetched via XLogCtl.
+ * Calculate the amount of space left on the page after 'endptr'. Beware
+ * multiple evaluation!
  */
+#define INSERT_FREESPACE(endptr)       \
+       (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
 
-/* Free space remaining in the current xlog page buffer */
-#define INSERT_FREESPACE(Insert)  \
-       (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
-
-/* Construct XLogRecPtr value for current insertion point */
-#define INSERT_RECPTR(recptr,Insert,curridx)  \
-               (recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert)
-
-#define PrevBufIdx(idx)                \
-               (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
-
+/* Macro to advance to next buffer index. */
 #define NextBufIdx(idx)                \
                (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
 
+/*
+ * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
+ * would hold if it was in cache, the page containing 'recptr'.
+ */
+#define XLogRecPtrToBufIdx(recptr)     \
+       (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
+
+/*
+ * These are the number of bytes in a WAL page and segment usable for WAL data.
+ */
+#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
+#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
+
 /*
  * Private, possibly out-of-date copy of shared LogwrtResult.
  * See discussion above.
@@ -506,12 +636,18 @@ static XLogwrtResult LogwrtResult = {0, 0};
 
 /*
  * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.  These are chosen so that they can be OR'd together
- * in a bitmask state variable.
+ * to attempt to get one.
  */
-#define XLOG_FROM_ARCHIVE              (1<<0)  /* Restored using restore_command */
-#define XLOG_FROM_PG_XLOG              (1<<1)  /* Existing file in pg_xlog */
-#define XLOG_FROM_STREAM               (1<<2)  /* Streamed from master */
+typedef enum
+{
+       XLOG_FROM_ANY = 0,                      /* request to read WAL from any source */
+       XLOG_FROM_ARCHIVE,                      /* restored using restore_command */
+       XLOG_FROM_PG_XLOG,                      /* existing file in pg_xlog */
+       XLOG_FROM_STREAM,                       /* streamed from master */
+} XLogSource;
+
+/* human-readable names for XLogSources, for debugging output */
+static const char *xlogSourceNames[] = {"any", "archive", "pg_xlog", "stream"};
 
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
@@ -535,39 +671,43 @@ static int        readFile = -1;
 static XLogSegNo readSegNo = 0;
 static uint32 readOff = 0;
 static uint32 readLen = 0;
-static bool    readFileHeaderValidated = false;
-static int     readSource = 0;         /* XLOG_FROM_* code */
+static XLogSource readSource = 0;              /* XLOG_FROM_* code */
 
 /*
- * Keeps track of which sources we've tried to read the current WAL
- * record from and failed.
+ * Keeps track of which source we're currently reading from. This is
+ * different from readSource in that this is always set, even when we don't
+ * currently have a WAL file open. If lastSourceFailed is set, our last
+ * attempt to read from currentSource failed, and we should try another source
+ * next.
  */
-static int     failedSources = 0;      /* OR of XLOG_FROM_* codes */
+static XLogSource currentSource = 0;   /* XLOG_FROM_* code */
+static bool lastSourceFailed = false;
+
+typedef struct XLogPageReadPrivate
+{
+       int                     emode;
+       bool            fetching_ckpt;  /* are we fetching a checkpoint record? */
+       bool            randAccess;
+} XLogPageReadPrivate;
 
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
  * readSource, but readSource gets reset to zero when we don't have data
- * to process right now.)
+ * to process right now.  It is also different from currentSource, which
+ * also changes when we try to read from a source and fail, while
+ * XLogReceiptSource tracks where we last successfully read some WAL.)
  */
 static TimestampTz XLogReceiptTime = 0;
-static int     XLogReceiptSource = 0;          /* XLOG_FROM_* code */
-
-/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
-static char *readBuf = NULL;
-
-/* Buffer for current ReadRecord result (expandable) */
-static char *readRecordBuf = NULL;
-static uint32 readRecordBufSize = 0;
+static XLogSource XLogReceiptSource = 0;               /* XLOG_FROM_* code */
 
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;  /* start of last record read */
 static XLogRecPtr EndRecPtr;   /* end+1 of last record read */
-static TimeLineID lastPageTLI = 0;
-static TimeLineID lastSegmentTLI = 0;
 
 static XLogRecPtr minRecoveryPoint;            /* local copy of
                                                                                 * ControlFile->minRecoveryPoint */
+static TimeLineID minRecoveryPointTLI;
 static bool updateMinRecoveryPoint = true;
 
 /*
@@ -582,6 +722,9 @@ static bool InRedo = false;
 /* Have we launched bgwriter during recovery? */
 static bool bgwriterLaunched = false;
 
+/* For WALInsertSlotAcquire/Release functions */
+static int     MySlotNo = 0;
+static bool holdingAllSlots = false;
 
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
@@ -591,25 +734,31 @@ static void SetLatestXTime(TimestampTz xtime);
 static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
+static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
+                                       TimeLineID prevTLI);
 static void LocalSetXLogInsertAllowed(void);
+static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 
-static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
+static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
-static bool AdvanceXLInsertBuffer(bool new_segment);
+static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
+                                                char *blk, bool get_cleanup_lock, bool keep_buffer);
+static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
                                           bool find_free, int *max_advance,
                                           bool use_lock);
 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                         int source, bool notexistOk);
-static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
-static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
-                        bool randAccess);
+static int     XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
+static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+                        int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
+                        TimeLineID *readTLI);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
-                                                       bool fetching_ckpt);
+                                                       bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static int     emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
 static void XLogFileClose(void);
 static void PreallocXlogFiles(XLogRecPtr endptr);
@@ -618,12 +767,11 @@ static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+                  int emode, bool fetching_ckpt);
 static void CheckRecoveryConsistency(void);
-static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly);
-static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
-                                         int emode, bool randAccess);
-static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
+static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
+                                        XLogRecPtr RecPtr, int whichChkpti, bool report);
 static bool rescanLatestTimeLine(void);
 static void WriteControlFile(void);
 static void ReadControlFile(void);
@@ -639,6 +787,24 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
 static void rm_redo_error_callback(void *arg);
 static int     get_sync_bit(int method);
 
+static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+                                 XLogRecData *rdata,
+                                 XLogRecPtr StartPos, XLogRecPtr EndPos);
+static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
+                                                 XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
+static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+                                 XLogRecPtr *PrevPtr);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static void WakeupWaiters(XLogRecPtr EndPos);
+static char *GetXLogBuffer(XLogRecPtr ptr);
+static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
+static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
+static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+
+static void WALInsertSlotAcquire(bool exclusive);
+static void WALInsertSlotAcquireOne(int slotno);
+static void WALInsertSlotRelease(void);
+static void WALInsertSlotReleaseOne(int slotno);
 
 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -659,10 +825,6 @@ XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 {
        XLogCtlInsert *Insert = &XLogCtl->Insert;
-       XLogRecPtr      RecPtr;
-       XLogRecPtr      WriteRqst;
-       uint32          freespace;
-       int                     curridx;
        XLogRecData *rdt;
        XLogRecData *rdt_lastnormal;
        Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
@@ -677,11 +839,13 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        uint32          len,
                                write_len;
        unsigned        i;
-       bool            updrqst;
        bool            doPageWrites;
        bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+       bool            inserted;
        uint8           info_orig = info;
        static XLogRecord *rechdr;
+       XLogRecPtr      StartPos;
+       XLogRecPtr      EndPos;
 
        if (rechdr == NULL)
        {
@@ -707,8 +871,8 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
         */
        if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
        {
-               RecPtr = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
-               return RecPtr;
+               EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
+               return EndPos;
        }
 
        /*
@@ -716,9 +880,9 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
         * up.
         *
         * We may have to loop back to here if a race condition is detected below.
-        * We could prevent the race by doing all this work while holding the
-        * insert lock, but it seems better to avoid doing CRC calculations while
-        * holding the lock.
+        * We could prevent the race by doing all this work while holding an
+        * insertion slot, but it seems better to avoid doing CRC calculations
+        * while holding one.
         *
         * We add entries for backup blocks to the chain, so that they don't need
         * any special treatment in the critical section where the chunks are
@@ -735,8 +899,8 @@ begin:;
        /*
         * Decide if we need to do full-page writes in this XLOG record: true if
         * full_page_writes is on or we have a PITR request for it.  Since we
-        * don't yet have the insert lock, fullPageWrites and forcePageWrites
-        * could change under us, but we'll recheck them once we have the lock.
+        * don't yet have an insertion slot, fullPageWrites and forcePageWrites
+        * could change under us, but we'll recheck them once we have a slot.
         */
        doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
 
@@ -769,8 +933,8 @@ begin:;
                                {
                                        /* OK, put it in this slot */
                                        dtbuf[i] = rdt->buffer;
-                                       if (XLogCheckBuffer(rdt, doPageWrites,
-                                                                               &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
+                                       if (doPageWrites && XLogCheckBuffer(rdt, true,
+                                                                                  &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
                                        {
                                                dtbuf_bkp[i] = true;
                                                rdt->data = NULL;
@@ -876,25 +1040,60 @@ begin:;
                COMP_CRC32(rdata_crc, rdt->data, rdt->len);
 
        /*
-        * Construct record header (prev-link and CRC are filled in later), and
-        * make that the first chunk in the chain.
+        * Construct record header (prev-link is filled in later, after reserving
+        * the space for the record), and make that the first chunk in the chain.
+        *
+        * The CRC calculated for the header here doesn't include prev-link,
+        * because we don't know it yet. It will be added later.
         */
        rechdr->xl_xid = GetCurrentTransactionIdIfAny();
        rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
        rechdr->xl_len = len;           /* doesn't include backup blocks */
        rechdr->xl_info = info;
        rechdr->xl_rmid = rmid;
+       rechdr->xl_prev = InvalidXLogRecPtr;
+       COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
 
        hdr_rdt.next = rdata;
        hdr_rdt.data = (char *) rechdr;
        hdr_rdt.len = SizeOfXLogRecord;
-
        write_len += SizeOfXLogRecord;
 
+       /*----------
+        *
+        * We have now done all the preparatory work we can without holding a
+        * lock or modifying shared state. From here on, inserting the new WAL
+        * record to the shared WAL buffer cache is a two-step process:
+        *
+        * 1. Reserve the right amount of space from the WAL. The current head of
+        *    reserved space is kept in Insert->CurrBytePos, and is protected by
+        *    insertpos_lck.
+        *
+        * 2. Copy the record to the reserved WAL space. This involves finding the
+        *    correct WAL buffer containing the reserved space, and copying the
+        *    record in place. This can be done concurrently in multiple processes.
+        *
+        * To keep track of which insertions are still in-progress, each concurrent
+        * inserter allocates an "insertion slot", which tells others how far the
+        * inserter has progressed. There is a small fixed number of insertion
+        * slots, determined by the num_xloginsert_slots GUC. When an inserter
+        * finishes, it updates the xlogInsertingAt of its slot to the end of the
+        * record it inserted, to let others know that it's done. xlogInsertingAt
+        * is also updated when crossing over to a new WAL buffer, to allow the
+        * the previous buffer to be flushed.
+        *
+        * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
+        * changing until the insertion is finished.
+        *
+        * Step 2 can usually be done completely in parallel. If the required WAL
+        * page is not initialized yet, you have to grab WALBufMappingLock to
+        * initialize it, but the WAL writer tries to do that ahead of insertions
+        * to avoid that from happening in the critical path.
+        *
+        *----------
+        */
        START_CRIT_SECTION();
-
-       /* Now wait to get insert lock */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(isLogSwitch);
 
        /*
         * Check to see if my RedoRecPtr is out of date.  If so, may have to go
@@ -905,9 +1104,9 @@ begin:;
         * affect the contents of the XLOG record, so we'll update our local copy
         * but not force a recomputation.
         */
-       if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
+       if (RedoRecPtr != Insert->RedoRecPtr)
        {
-               Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
+               Assert(RedoRecPtr < Insert->RedoRecPtr);
                RedoRecPtr = Insert->RedoRecPtr;
 
                if (doPageWrites)
@@ -917,13 +1116,13 @@ begin:;
                                if (dtbuf[i] == InvalidBuffer)
                                        continue;
                                if (dtbuf_bkp[i] == false &&
-                                       XLByteLE(dtbuf_lsn[i], RedoRecPtr))
+                                       dtbuf_lsn[i] <= RedoRecPtr)
                                {
                                        /*
                                         * Oops, this buffer now needs to be backed up, but we
                                         * didn't think so above.  Start over.
                                         */
-                                       LWLockRelease(WALInsertLock);
+                                       WALInsertSlotRelease();
                                        END_CRIT_SECTION();
                                        rdt_lastnormal->next = NULL;
                                        info = info_orig;
@@ -942,7 +1141,7 @@ begin:;
        if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
        {
                /* Oops, must redo it with full-page data. */
-               LWLockRelease(WALInsertLock);
+               WALInsertSlotRelease();
                END_CRIT_SECTION();
                rdt_lastnormal->next = NULL;
                info = info_orig;
@@ -950,62 +1149,94 @@ begin:;
        }
 
        /*
-        * If the current page is completely full, the record goes to the next
-        * page, right after the page header.
+        * Reserve space for the record in the WAL. This also sets the xl_prev
+        * pointer.
         */
-       updrqst = false;
-       freespace = INSERT_FREESPACE(Insert);
-       if (freespace == 0)
+       if (isLogSwitch)
+               inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
+       else
+       {
+               ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
+                                                                 &rechdr->xl_prev);
+               inserted = true;
+       }
+
+       if (inserted)
        {
-               updrqst = AdvanceXLInsertBuffer(false);
-               freespace = INSERT_FREESPACE(Insert);
+               /*
+                * Now that xl_prev has been filled in, finish CRC calculation of the
+                * record header.
+                */
+               COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
+               FIN_CRC32(rdata_crc);
+               rechdr->xl_crc = rdata_crc;
+
+               /*
+                * All the record data, including the header, is now ready to be
+                * inserted. Copy the record in the space reserved.
+                */
+               CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
+       }
+       else
+       {
+               /*
+                * This was an xlog-switch record, but the current insert location was
+                * already exactly at the beginning of a segment, so there was no need
+                * to do anything.
+                */
        }
 
-       /* Compute record's XLOG location */
-       curridx = Insert->curridx;
-       INSERT_RECPTR(RecPtr, Insert, curridx);
+       /*
+        * Done! Let others know that we're finished.
+        */
+       WALInsertSlotRelease();
+
+       END_CRIT_SECTION();
 
        /*
-        * If the record is an XLOG_SWITCH, and we are exactly at the start of a
-        * segment, we need not insert it (and don't want to because we'd like
-        * consecutive switch requests to be no-ops).  Instead, make sure
-        * everything is written and flushed through the end of the prior segment,
-        * and return the prior segment's end address.
+        * Update shared LogwrtRqst.Write, if we crossed page boundary.
         */
-       if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD)
+       if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
        {
-               /* We can release insert lock immediately */
-               LWLockRelease(WALInsertLock);
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
 
-               RecPtr -= SizeOfXLogLongPHD;
+               SpinLockAcquire(&xlogctl->info_lck);
+               /* advance global request to include new block(s) */
+               if (xlogctl->LogwrtRqst.Write < EndPos)
+                       xlogctl->LogwrtRqst.Write = EndPos;
+               /* update local result copy while I have the chance */
+               LogwrtResult = xlogctl->LogwrtResult;
+               SpinLockRelease(&xlogctl->info_lck);
+       }
 
-               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-               LogwrtResult = XLogCtl->LogwrtResult;
-               if (!XLByteLE(RecPtr, LogwrtResult.Flush))
+       /*
+        * If this was an XLOG_SWITCH record, flush the record and the empty
+        * padding space that fills the rest of the segment, and perform
+        * end-of-segment actions (eg, notifying archiver).
+        */
+       if (isLogSwitch)
+       {
+               TRACE_POSTGRESQL_XLOG_SWITCH();
+               XLogFlush(EndPos);
+               /*
+                * Even though we reserved the rest of the segment for us, which is
+                * reflected in EndPos, we return a pointer to just the end of the
+                * xlog-switch record.
+                */
+               if (inserted)
                {
-                       XLogwrtRqst FlushRqst;
-
-                       FlushRqst.Write = RecPtr;
-                       FlushRqst.Flush = RecPtr;
-                       XLogWrite(FlushRqst, false, false);
+                       EndPos = StartPos + SizeOfXLogRecord;
+                       if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+                       {
+                               if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ)
+                                       EndPos += SizeOfXLogLongPHD;
+                               else
+                                       EndPos += SizeOfXLogShortPHD;
+                       }
                }
-               LWLockRelease(WALWriteLock);
-
-               END_CRIT_SECTION();
-
-               /* wake up walsenders now that we've released heavily contended locks */
-               WalSndWakeupProcessRequests();
-               return RecPtr;
        }
 
-       /* Finish the record header */
-       rechdr->xl_prev = Insert->PrevRecord;
-
-       /* Now we can finish computing the record's CRC */
-       COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
-       FIN_CRC32(rdata_crc);
-       rechdr->xl_crc = rdata_crc;
-
 #ifdef WAL_DEBUG
        if (XLOG_DEBUG)
        {
@@ -1013,7 +1244,7 @@ begin:;
 
                initStringInfo(&buf);
                appendStringInfo(&buf, "INSERT @ %X/%X: ",
-                                                (uint32) (RecPtr >> 32), (uint32) RecPtr);
+                                                (uint32) (EndPos >> 32), (uint32) EndPos);
                xlog_outrec(&buf, rechdr);
                if (rdata->data != NULL)
                {
@@ -1025,380 +1256,1304 @@ begin:;
        }
 #endif
 
-       /* Record begin of record in appropriate places */
-       ProcLastRecPtr = RecPtr;
-       Insert->PrevRecord = RecPtr;
-
        /*
-        * Append the data, including backup blocks if any
+        * Update our global variables
         */
-       rdata = &hdr_rdt;
-       while (write_len)
-       {
-               while (rdata->data == NULL)
-                       rdata = rdata->next;
+       ProcLastRecPtr = StartPos;
+       XactLastRecEnd = EndPos;
 
-               if (freespace > 0)
-               {
-                       if (rdata->len > freespace)
-                       {
-                               memcpy(Insert->currpos, rdata->data, freespace);
-                               rdata->data += freespace;
-                               rdata->len -= freespace;
-                               write_len -= freespace;
-                       }
-                       else
-                       {
-                               memcpy(Insert->currpos, rdata->data, rdata->len);
-                               freespace -= rdata->len;
-                               write_len -= rdata->len;
-                               Insert->currpos += rdata->len;
-                               rdata = rdata->next;
-                               continue;
-                       }
-               }
+       return EndPos;
+}
 
-               /* Use next buffer */
-               updrqst = AdvanceXLInsertBuffer(false);
-               curridx = Insert->curridx;
-               /* Mark page header to indicate this record continues on the page */
-               Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
-               Insert->currpage->xlp_rem_len = write_len;
-               freespace = INSERT_FREESPACE(Insert);
-       }
+/*
+ * Reserves the right amount of space for a record of given size from the WAL.
+ * *StartPos is set to the beginning of the reserved section, *EndPos to
+ * its end+1. *PrevPtr is set to the beginning of the previous record; it is
+ * used to set the xl_prev of this record.
+ *
+ * This is the performance critical part of XLogInsert that must be serialized
+ * across backends. The rest can happen mostly in parallel. Try to keep this
+ * section as short as possible, insertpos_lck can be heavily contended on a
+ * busy system.
+ *
+ * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
+ * where we actually copy the record to the reserved space.
+ */
+static void
+ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+                                                 XLogRecPtr *PrevPtr)
+{
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          startbytepos;
+       uint64          endbytepos;
+       uint64          prevbytepos;
 
-       /* Ensure next record will be properly aligned */
-       Insert->currpos = (char *) Insert->currpage +
-               MAXALIGN(Insert->currpos - (char *) Insert->currpage);
-       freespace = INSERT_FREESPACE(Insert);
+       size = MAXALIGN(size);
 
-       /*
-        * The recptr I return is the beginning of the *next* record. This will be
-        * stored as LSN for changed data pages...
-        */
-       INSERT_RECPTR(RecPtr, Insert, curridx);
+       /* All (non xlog-switch) records should contain data. */
+       Assert(size > SizeOfXLogRecord);
 
        /*
-        * If the record is an XLOG_SWITCH, we must now write and flush all the
-        * existing data, and then forcibly advance to the start of the next
-        * segment.  It's not good to do this I/O while holding the insert lock,
-        * but there seems too much risk of confusion if we try to release the
-        * lock sooner.  Fortunately xlog switch needn't be a high-performance
-        * operation anyway...
+        * The duration the spinlock needs to be held is minimized by minimizing
+        * the calculations that have to be done while holding the lock. The
+        * current tip of reserved WAL is kept in CurrBytePos, as a byte position
+        * that only counts "usable" bytes in WAL, that is, it excludes all WAL
+        * page headers. The mapping between "usable" byte positions and physical
+        * positions (XLogRecPtrs) can be done outside the locked region, and
+        * because the usable byte position doesn't include any headers, reserving
+        * X bytes from WAL is almost as simple as "CurrBytePos += X".
         */
-       if (isLogSwitch)
-       {
-               XLogwrtRqst FlushRqst;
-               XLogRecPtr      OldSegEnd;
-
-               TRACE_POSTGRESQL_XLOG_SWITCH();
+       SpinLockAcquire(&Insert->insertpos_lck);
 
-               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+       startbytepos = Insert->CurrBytePos;
+       endbytepos = startbytepos + size;
+       prevbytepos = Insert->PrevBytePos;
+       Insert->CurrBytePos = endbytepos;
+       Insert->PrevBytePos = startbytepos;
 
-               /*
-                * Flush through the end of the page containing XLOG_SWITCH, and
-                * perform end-of-segment actions (eg, notifying archiver).
-                */
-               WriteRqst = XLogCtl->xlblocks[curridx];
-               FlushRqst.Write = WriteRqst;
-               FlushRqst.Flush = WriteRqst;
-               XLogWrite(FlushRqst, false, true);
-
-               /* Set up the next buffer as first page of next segment */
-               /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
-               (void) AdvanceXLInsertBuffer(true);
-
-               /* There should be no unwritten data */
-               curridx = Insert->curridx;
-               Assert(curridx == XLogCtl->Write.curridx);
+       SpinLockRelease(&Insert->insertpos_lck);
 
-               /* Compute end address of old segment */
-               OldSegEnd = XLogCtl->xlblocks[curridx];
-               OldSegEnd -= XLOG_BLCKSZ;
+       *StartPos = XLogBytePosToRecPtr(startbytepos);
+       *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+       *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
-               /* Make it look like we've written and synced all of old segment */
-               LogwrtResult.Write = OldSegEnd;
-               LogwrtResult.Flush = OldSegEnd;
+       /*
+        * Check that the conversions between "usable byte positions" and
+        * XLogRecPtrs work consistently in both directions.
+        */
+       Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+       Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+       Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+}
 
-               /*
-                * Update shared-memory status --- this code should match XLogWrite
-                */
-               {
-                       /* use volatile pointer to prevent code rearrangement */
-                       volatile XLogCtlData *xlogctl = XLogCtl;
+/*
+ * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
+ *
+ * A log-switch record is handled slightly differently. The rest of the
+ * segment will be reserved for this insertion, as indicated by the returned
+ * *EndPos value. However, if we are already at the beginning of the current
+ * segment, *StartPos and *EndPos are set to the current location without
+ * reserving any space, and the function returns false.
+*/
+static bool
+ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
+{
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          startbytepos;
+       uint64          endbytepos;
+       uint64          prevbytepos;
+       uint32          size = SizeOfXLogRecord;
+       XLogRecPtr      ptr;
+       uint32          segleft;
 
-                       SpinLockAcquire(&xlogctl->info_lck);
-                       xlogctl->LogwrtResult = LogwrtResult;
-                       if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
-                               xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
-                       if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
-                               xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
-                       SpinLockRelease(&xlogctl->info_lck);
-               }
+       /*
+        * These calculations are a bit heavy-weight to be done while holding a
+        * spinlock, but since we're holding all the WAL insertion slots, there
+        * are no other inserters competing for it. GetXLogInsertRecPtr() does
+        * compete for it, but that's not called very frequently.
+        */
+       SpinLockAcquire(&Insert->insertpos_lck);
 
-               LWLockRelease(WALWriteLock);
+       startbytepos = Insert->CurrBytePos;
 
-               updrqst = false;                /* done already */
-       }
-       else
+       ptr = XLogBytePosToEndRecPtr(startbytepos);
+       if (ptr % XLOG_SEG_SIZE == 0)
        {
-               /* normal case, ie not xlog switch */
-
-               /* Need to update shared LogwrtRqst if some block was filled up */
-               if (freespace == 0)
-               {
-                       /* curridx is filled and available for writing out */
-                       updrqst = true;
-               }
-               else
-               {
-                       /* if updrqst already set, write through end of previous buf */
-                       curridx = PrevBufIdx(curridx);
-               }
-               WriteRqst = XLogCtl->xlblocks[curridx];
+               SpinLockRelease(&Insert->insertpos_lck);
+               *EndPos = *StartPos = ptr;
+               return false;
        }
 
-       LWLockRelease(WALInsertLock);
+       endbytepos = startbytepos + size;
+       prevbytepos = Insert->PrevBytePos;
 
-       if (updrqst)
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile XLogCtlData *xlogctl = XLogCtl;
+       *StartPos = XLogBytePosToRecPtr(startbytepos);
+       *EndPos = XLogBytePosToEndRecPtr(endbytepos);
 
-               SpinLockAcquire(&xlogctl->info_lck);
-               /* advance global request to include new block(s) */
-               if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
-                       xlogctl->LogwrtRqst.Write = WriteRqst;
-               /* update local result copy while I have the chance */
-               LogwrtResult = xlogctl->LogwrtResult;
-               SpinLockRelease(&xlogctl->info_lck);
+       segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE);
+       if (segleft != XLOG_SEG_SIZE)
+       {
+               /* consume the rest of the segment */
+               *EndPos += segleft;
+               endbytepos = XLogRecPtrToBytePos(*EndPos);
        }
+       Insert->CurrBytePos = endbytepos;
+       Insert->PrevBytePos = startbytepos;
 
-       XactLastRecEnd = RecPtr;
+       SpinLockRelease(&Insert->insertpos_lck);
 
-       END_CRIT_SECTION();
+       *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
-       /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       Assert((*EndPos) % XLOG_SEG_SIZE == 0);
+       Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+       Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+       Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
 
-       return RecPtr;
+       return true;
 }
 
 /*
- * Determine whether the buffer referenced by an XLogRecData item has to
- * be backed up, and if so fill a BkpBlock struct for it.  In any case
- * save the buffer's LSN at *lsn.
+ * Subroutine of XLogInsert.  Copies a WAL record to an already-reserved
+ * area in the WAL.
  */
-static bool
-XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
-                               XLogRecPtr *lsn, BkpBlock *bkpb)
+static void
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+                                       XLogRecPtr StartPos, XLogRecPtr EndPos)
 {
-       Page            page;
+       char       *currpos;
+       int                     freespace;
+       int                     written;
+       XLogRecPtr      CurrPos;
+       XLogPageHeader pagehdr;
 
-       page = BufferGetPage(rdata->buffer);
+       /* The first chunk is the record header */
+       Assert(rdata->len == SizeOfXLogRecord);
+
+       /*
+        * Get a pointer to the right place in the right WAL buffer to start
+        * inserting to.
+        */
+       CurrPos = StartPos;
+       currpos = GetXLogBuffer(CurrPos);
+       freespace = INSERT_FREESPACE(CurrPos);
 
        /*
-        * XXX We assume page LSN is first data on *every* page that can be passed
-        * to XLogInsert, whether it otherwise has the standard page layout or
-        * not.
+        * there should be enough space for at least the first field (xl_tot_len)
+        * on this page.
         */
-       *lsn = PageGetLSN(page);
+       Assert(freespace >= sizeof(uint32));
 
-       if (doPageWrites &&
-               XLByteLE(PageGetLSN(page), RedoRecPtr))
+       /* Copy record data */
+       written = 0;
+       while (rdata != NULL)
        {
-               /*
-                * The page needs to be backed up, so set up *bkpb
-                */
-               BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
+               char       *rdata_data = rdata->data;
+               int                     rdata_len = rdata->len;
 
-               if (rdata->buffer_std)
+               while (rdata_len > freespace)
                {
-                       /* Assume we can omit data between pd_lower and pd_upper */
-                       uint16          lower = ((PageHeader) page)->pd_lower;
-                       uint16          upper = ((PageHeader) page)->pd_upper;
+                       /*
+                        * Write what fits on this page, and continue on the next page.
+                        */
+                       Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
+                       memcpy(currpos, rdata_data, freespace);
+                       rdata_data += freespace;
+                       rdata_len -= freespace;
+                       written += freespace;
+                       CurrPos += freespace;
 
-                       if (lower >= SizeOfPageHeaderData &&
-                               upper > lower &&
-                               upper <= BLCKSZ)
+                       /*
+                        * Get pointer to beginning of next page, and set the xlp_rem_len
+                        * in the page header. Set XLP_FIRST_IS_CONTRECORD.
+                        *
+                        * It's safe to set the contrecord flag and xlp_rem_len without a
+                        * lock on the page. All the other flags were already set when the
+                        * page was initialized, in AdvanceXLInsertBuffer, and we're the
+                        * only backend that needs to set the contrecord flag.
+                        */
+                       currpos = GetXLogBuffer(CurrPos);
+                       pagehdr = (XLogPageHeader) currpos;
+                       pagehdr->xlp_rem_len = write_len - written;
+                       pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
+
+                       /* skip over the page header */
+                       if (CurrPos % XLogSegSize == 0)
                        {
-                               bkpb->hole_offset = lower;
-                               bkpb->hole_length = upper - lower;
+                               CurrPos += SizeOfXLogLongPHD;
+                               currpos += SizeOfXLogLongPHD;
                        }
                        else
                        {
-                               /* No "hole" to compress out */
-                               bkpb->hole_offset = 0;
-                               bkpb->hole_length = 0;
+                               CurrPos += SizeOfXLogShortPHD;
+                               currpos += SizeOfXLogShortPHD;
                        }
-               }
-               else
-               {
-                       /* Not a standard page header, don't try to eliminate "hole" */
-                       bkpb->hole_offset = 0;
-                       bkpb->hole_length = 0;
+                       freespace = INSERT_FREESPACE(CurrPos);
                }
 
-               return true;                    /* buffer requires backup */
-       }
+               Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
+               memcpy(currpos, rdata_data, rdata_len);
+               currpos += rdata_len;
+               CurrPos += rdata_len;
+               freespace -= rdata_len;
+               written += rdata_len;
 
-       return false;                           /* buffer does not need to be backed up */
-}
+               rdata = rdata->next;
+       }
+       Assert(written == write_len);
 
-/*
- * Advance the Insert state to the next buffer page, writing out the next
- * buffer if it still contains unwritten data.
- *
- * If new_segment is TRUE then we set up the next buffer page as the first
- * page of the next xlog segment file, possibly but not usually the next
- * consecutive file page.
- *
- * The global LogwrtRqst.Write pointer needs to be advanced to include the
- * just-filled page.  If we can do this for free (without an extra lock),
- * we do so here.  Otherwise the caller must do it.  We return TRUE if the
- * request update still needs to be done, FALSE if we did it internally.
- *
- * Must be called with WALInsertLock held.
- */
-static bool
-AdvanceXLInsertBuffer(bool new_segment)
-{
-       XLogCtlInsert *Insert = &XLogCtl->Insert;
-       int                     nextidx = NextBufIdx(Insert->curridx);
-       bool            update_needed = true;
-       XLogRecPtr      OldPageRqstPtr;
-       XLogwrtRqst WriteRqst;
-       XLogRecPtr      NewPageEndPtr;
-       XLogRecPtr      NewPageBeginPtr;
-       XLogPageHeader NewPage;
+       /* Align the end position, so that the next record starts aligned */
+       CurrPos = MAXALIGN(CurrPos);
 
        /*
-        * Get ending-offset of the buffer page we need to replace (this may be
-        * zero if the buffer hasn't been used yet).  Fall through if it's already
-        * written out.
+        * If this was an xlog-switch, it's not enough to write the switch record,
+        * we also have to consume all the remaining space in the WAL segment.
+        * We have already reserved it for us, but we still need to make sure it's
+        * allocated and zeroed in the WAL buffers so that when the caller (or
+        * someone else) does XLogWrite(), it can really write out all the zeros.
         */
-       OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
-       if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
+       if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0)
        {
-               /* nope, got work to do... */
-               XLogRecPtr      FinishedPageRqstPtr;
+               /* An xlog-switch record doesn't contain any data besides the header */
+               Assert(write_len == SizeOfXLogRecord);
 
-               FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
+               /*
+                * We do this one page at a time, to make sure we don't deadlock
+                * against ourselves if wal_buffers < XLOG_SEG_SIZE.
+                */
+               Assert(EndPos % XLogSegSize == 0);
 
-               /* Before waiting, get info_lck and update LogwrtResult */
-               {
-                       /* use volatile pointer to prevent code rearrangement */
-                       volatile XLogCtlData *xlogctl = XLogCtl;
+               /* Use up all the remaining space on the first page */
+               CurrPos += freespace;
 
-                       SpinLockAcquire(&xlogctl->info_lck);
-                       if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
-                               xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
-                       LogwrtResult = xlogctl->LogwrtResult;
-                       SpinLockRelease(&xlogctl->info_lck);
+               while (CurrPos < EndPos)
+               {
+                       /* initialize the next page (if not initialized already) */
+                       WakeupWaiters(CurrPos);
+                       AdvanceXLInsertBuffer(CurrPos, false);
+                       CurrPos += XLOG_BLCKSZ;
                }
+       }
 
-               update_needed = false;  /* Did the shared-request update */
+       if (CurrPos != EndPos)
+               elog(PANIC, "space reserved for WAL record does not match what was written");
+}
 
-               /*
-                * Now that we have an up-to-date LogwrtResult value, see if we still
-                * need to write it or if someone else already did.
+/*
+ * Allocate a slot for insertion.
+ *
+ * In exclusive mode, all slots are reserved for the current process. That
+ * blocks all concurrent insertions.
+ */
+static void
+WALInsertSlotAcquire(bool exclusive)
+{
+       int                     i;
+
+       if (exclusive)
+       {
+               for (i = 0; i < num_xloginsert_slots; i++)
+                       WALInsertSlotAcquireOne(i);
+               holdingAllSlots = true;
+       }
+       else
+               WALInsertSlotAcquireOne(-1);
+}
+
+/*
+ * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
+ * one if slotno == -1. The index of the slot that was acquired is stored in
+ * MySlotNo.
+ *
+ * This is more or less equivalent to LWLockAcquire().
+ */
+static void
+WALInsertSlotAcquireOne(int slotno)
+{
+       volatile XLogInsertSlot *slot;
+       PGPROC     *proc = MyProc;
+       bool            retry = false;
+       int                     extraWaits = 0;
+       static int      slotToTry = -1;
+
+       /*
+        * Try to use the slot we used last time. If the system isn't particularly
+        * busy, it's a good bet that it's available, and it's good to have some
+        * affinity to a particular slot so that you don't unnecessarily bounce
+        * cache lines between processes when there is no contention.
+        *
+        * If this is the first time through in this backend, pick a slot
+        * (semi-)randomly. This allows the slots to be used evenly if you have a
+        * lot of very short connections.
+        */
+       if (slotno != -1)
+               MySlotNo = slotno;
+       else
+       {
+               if (slotToTry == -1)
+                       slotToTry = MyProc->pgprocno % num_xloginsert_slots;
+               MySlotNo = slotToTry;
+       }
+
+       /*
+        * We can't wait if we haven't got a PGPROC.  This should only occur
+        * during bootstrap or shared memory initialization.  Put an Assert here
+        * to catch unsafe coding practices.
+        */
+       Assert(MyProc != NULL);
+
+       /*
+        * Lock out cancel/die interrupts until we exit the code section protected
+        * by the slot.  This ensures that interrupts will not interfere with
+        * manipulations of data structures in shared memory. There is no cleanup
+        * mechanism to release the slot if the backend dies while holding one,
+        * so make this a critical section.
+        */
+       START_CRIT_SECTION();
+
+       /*
+        * Loop here to try to acquire slot after each time we are signaled by
+        * WALInsertSlotRelease.
+        */
+       for (;;)
+       {
+               bool            mustwait;
+
+               slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+
+               /* Acquire mutex.  Time spent holding mutex should be short! */
+               SpinLockAcquire(&slot->mutex);
+
+               /* If retrying, allow WALInsertSlotRelease to release waiters again */
+               if (retry)
+                       slot->releaseOK = true;
+
+               /* If I can get the slot, do so quickly. */
+               if (slot->exclusive == 0)
+               {
+                       slot->exclusive++;
+                       mustwait = false;
+               }
+               else
+                       mustwait = true;
+
+               if (!mustwait)
+                       break;                          /* got the lock */
+
+               Assert(slot->owner != MyProc);
+
+               /*
+                * Add myself to wait queue.
+                */
+               proc->lwWaiting = true;
+               proc->lwWaitMode = LW_EXCLUSIVE;
+               proc->lwWaitLink = NULL;
+               if (slot->head == NULL)
+                       slot->head = proc;
+               else
+                       slot->tail->lwWaitLink = proc;
+               slot->tail = proc;
+
+               /* Can release the mutex now */
+               SpinLockRelease(&slot->mutex);
+
+               /*
+                * Wait until awakened.
+                *
+                * Since we share the process wait semaphore with the regular lock
+                * manager and ProcWaitForSignal, and we may need to acquire a slot
+                * while one of those is pending, it is possible that we get awakened
+                * for a reason other than being signaled by WALInsertSlotRelease. If
+                * so, loop back and wait again.  Once we've gotten the slot,
+                * re-increment the sema by the number of additional signals received,
+                * so that the lock manager or signal manager will see the received
+                * signal when it next waits.
                 */
-               if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
+               for (;;)
                {
-                       /* Must acquire write lock */
-                       LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-                       LogwrtResult = XLogCtl->LogwrtResult;
-                       if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
-                       {
-                               /* OK, someone wrote it already */
-                               LWLockRelease(WALWriteLock);
-                       }
-                       else
+                       /* "false" means cannot accept cancel/die interrupt here. */
+                       PGSemaphoreLock(&proc->sem, false);
+                       if (!proc->lwWaiting)
+                               break;
+                       extraWaits++;
+               }
+
+               /* Now loop back and try to acquire lock again. */
+               retry = true;
+       }
+
+       slot->owner = proc;
+
+       /*
+        * Normally, we initialize the xlogInsertingAt value of the slot to 1,
+        * because we don't yet know where in the WAL we're going to insert. It's
+        * not critical what it points to right now - leaving it to a too small
+        * value just means that WaitXlogInsertionsToFinish() might wait on us
+        * unnecessarily, until we update the value (when we finish the insert or
+        * move to next page).
+        *
+        * If we're grabbing all the slots, however, stamp all but the last one
+        * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
+        * slot is the one that we will update as we proceed with the insert, the
+        * rest are held just to keep off other inserters.
+        */
+       if (slotno != -1 && slotno != num_xloginsert_slots - 1)
+               slot->xlogInsertingAt = InvalidXLogRecPtr;
+       else
+               slot->xlogInsertingAt = 1;
+
+       /* We are done updating shared state of the slot itself. */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Fix the process wait semaphore's count for any absorbed wakeups.
+        */
+       while (extraWaits-- > 0)
+               PGSemaphoreUnlock(&proc->sem);
+
+       /*
+        * If we couldn't get the slot immediately, try another slot next time.
+        * On a system with more insertion slots than concurrent inserters, this
+        * causes all the inserters to eventually migrate to a slot that no-one
+        * else is using. On a system with more inserters than slots, it still
+        * causes the inserters to be distributed quite evenly across the slots.
+        */
+       if (slotno != -1 && retry)
+               slotToTry = (slotToTry + 1) % num_xloginsert_slots;
+}
+
+/*
+ * Wait for the given slot to become free, or for its xlogInsertingAt location
+ * to change to something else than 'waitptr'. In other words, wait for the
+ * inserter using the given slot to finish its insertion, or to at least make
+ * some progress.
+ */
+static void
+WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
+{
+       PGPROC     *proc = MyProc;
+       int                     extraWaits = 0;
+
+       /*
+        * Lock out cancel/die interrupts while we sleep on the slot. There is
+        * no cleanup mechanism to remove us from the wait queue if we got
+        * interrupted.
+        */
+       HOLD_INTERRUPTS();
+
+       /*
+        * Loop here to try to acquire lock after each time we are signaled.
+        */
+       for (;;)
+       {
+               bool            mustwait;
+
+               /* Acquire mutex.  Time spent holding mutex should be short! */
+               SpinLockAcquire(&slot->mutex);
+
+               /* If I can get the lock, do so quickly. */
+               if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
+                       mustwait = false;
+               else
+                       mustwait = true;
+
+               if (!mustwait)
+                       break;                          /* the lock was free */
+
+               Assert(slot->owner != MyProc);
+
+               /*
+                * Add myself to wait queue.
+                */
+               proc->lwWaiting = true;
+               proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+               proc->lwWaitLink = NULL;
+
+               /* waiters are added to the front of the queue */
+               proc->lwWaitLink = slot->head;
+               if (slot->head == NULL)
+                       slot->tail = proc;
+               slot->head = proc;
+
+               /* Can release the mutex now */
+               SpinLockRelease(&slot->mutex);
+
+               /*
+                * Wait until awakened.
+                *
+                * Since we share the process wait semaphore with other things, like
+                * the regular lock manager and ProcWaitForSignal, and we may need to
+                * acquire an LWLock while one of those is pending, it is possible that
+                * we get awakened for a reason other than being signaled by
+                * LWLockRelease. If so, loop back and wait again.  Once we've gotten
+                * the LWLock, re-increment the sema by the number of additional
+                * signals received, so that the lock manager or signal manager will
+                * see the received signal when it next waits.
+                */
+               for (;;)
+               {
+                       /* "false" means cannot accept cancel/die interrupt here. */
+                       PGSemaphoreLock(&proc->sem, false);
+                       if (!proc->lwWaiting)
+                               break;
+                       extraWaits++;
+               }
+
+               /* Now loop back and try to acquire lock again. */
+       }
+
+       /* We are done updating shared state of the lock itself. */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Fix the process wait semaphore's count for any absorbed wakeups.
+        */
+       while (extraWaits-- > 0)
+               PGSemaphoreUnlock(&proc->sem);
+
+       /*
+        * Now okay to allow cancel/die interrupts.
+        */
+       RESUME_INTERRUPTS();
+}
+
+/*
+ * Wake up all processes waiting for us with WaitOnSlot(). Sets our
+ * xlogInsertingAt value to EndPos, without releasing the slot.
+ */
+static void
+WakeupWaiters(XLogRecPtr EndPos)
+{
+       volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+       PGPROC     *head;
+       PGPROC     *proc;
+       PGPROC     *next;
+
+       /*
+        * If we have already reported progress up to the same point, do nothing.
+        * No other process can modify xlogInsertingAt, so we can check this before
+        * grabbing the spinlock.
+        */
+       if (slot->xlogInsertingAt == EndPos)
+               return;
+       /* xlogInsertingAt should not go backwards */
+       Assert(slot->xlogInsertingAt < EndPos);
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&slot->mutex);
+
+       /* we should own the slot */
+       Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+       slot->xlogInsertingAt = EndPos;
+
+       /*
+        * See if there are any waiters that need to be woken up.
+        */
+       head = slot->head;
+
+       if (head != NULL)
+       {
+               proc = head;
+
+               /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */
+               next = proc->lwWaitLink;
+               while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
+               {
+                       proc = next;
+                       next = next->lwWaitLink;
+               }
+
+               /* proc is now the last PGPROC to be released */
+               slot->head = next;
+               proc->lwWaitLink = NULL;
+       }
+
+       /* We are done updating shared state of the lock itself. */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Awaken any waiters I removed from the queue.
+        */
+       while (head != NULL)
+       {
+               proc = head;
+               head = proc->lwWaitLink;
+               proc->lwWaitLink = NULL;
+               proc->lwWaiting = false;
+               PGSemaphoreUnlock(&proc->sem);
+       }
+}
+
+/*
+ * Release our insertion slot (or slots, if we're holding them all).
+ */
+static void
+WALInsertSlotRelease(void)
+{
+       int                     i;
+
+       if (holdingAllSlots)
+       {
+               for (i = 0; i < num_xloginsert_slots; i++)
+                       WALInsertSlotReleaseOne(i);
+               holdingAllSlots = false;
+       }
+       else
+               WALInsertSlotReleaseOne(MySlotNo);
+}
+
+static void
+WALInsertSlotReleaseOne(int slotno)
+{
+       volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
+       PGPROC     *head;
+       PGPROC     *proc;
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&slot->mutex);
+
+       /* we must be holding it */
+       Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+       slot->xlogInsertingAt = InvalidXLogRecPtr;
+
+       /* Release my hold on the slot */
+       slot->exclusive = 0;
+       slot->owner = NULL;
+
+       /*
+        * See if I need to awaken any waiters..
+        */
+       head = slot->head;
+       if (head != NULL)
+       {
+               if (slot->releaseOK)
+               {
+                       /*
+                        * Remove the to-be-awakened PGPROCs from the queue.
+                        */
+                       bool            releaseOK = true;
+
+                       proc = head;
+
+                       /*
+                        * First wake up any backends that want to be woken up without
+                        * acquiring the lock. These are always in the front of the queue.
+                        */
+                       while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
+                               proc = proc->lwWaitLink;
+
+                       /*
+                        * Awaken the first exclusive-waiter, if any.
+                        */
+                       if (proc->lwWaitLink)
                        {
-                               /*
-                                * Have to write buffers while holding insert lock. This is
-                                * not good, so only write as much as we absolutely must.
-                                */
-                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
-                               WriteRqst.Write = OldPageRqstPtr;
-                               WriteRqst.Flush = 0;
-                               XLogWrite(WriteRqst, false, false);
-                               LWLockRelease(WALWriteLock);
-                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+                               Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
+                               proc = proc->lwWaitLink;
+                               releaseOK = false;
                        }
+                       /* proc is now the last PGPROC to be released */
+                       slot->head = proc->lwWaitLink;
+                       proc->lwWaitLink = NULL;
+
+                       slot->releaseOK = releaseOK;
                }
+               else
+                       head = NULL;
        }
 
+       /* We are done updating shared state of the slot itself. */
+       SpinLockRelease(&slot->mutex);
+
        /*
-        * Now the next buffer slot is free and we can set it up to be the next
-        * output page.
+        * Awaken any waiters I removed from the queue.
         */
-       NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx];
+       while (head != NULL)
+       {
+               proc = head;
+               head = proc->lwWaitLink;
+               proc->lwWaitLink = NULL;
+               proc->lwWaiting = false;
+               PGSemaphoreUnlock(&proc->sem);
+       }
+
+       /*
+        * Now okay to allow cancel/die interrupts.
+        */
+       END_CRIT_SECTION();
+}
+
 
-       if (new_segment)
+/*
+ * Wait for any WAL insertions < upto to finish.
+ *
+ * Returns the location of the oldest insertion that is still in-progress.
+ * Any WAL prior to that point has been fully copied into WAL buffers, and
+ * can be flushed out to disk. Because this waits for any insertions older
+ * than 'upto' to finish, the return value is always >= 'upto'.
+ *
+ * Note: When you are about to write out WAL, you must call this function
+ * *before* acquiring WALWriteLock, to avoid deadlocks. This function might
+ * need to wait for an insertion to finish (or at least advance to next
+ * uninitialized page), and the inserter might need to evict an old WAL buffer
+ * to make room for a new one, which in turn requires WALWriteLock.
+ */
+static XLogRecPtr
+WaitXLogInsertionsToFinish(XLogRecPtr upto)
+{
+       uint64          bytepos;
+       XLogRecPtr      reservedUpto;
+       XLogRecPtr      finishedUpto;
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       int                     i;
+
+       if (MyProc == NULL)
+               elog(PANIC, "cannot wait without a PGPROC structure");
+
+       /* Read the current insert position */
+       SpinLockAcquire(&Insert->insertpos_lck);
+       bytepos = Insert->CurrBytePos;
+       SpinLockRelease(&Insert->insertpos_lck);
+       reservedUpto = XLogBytePosToEndRecPtr(bytepos);
+
+       /*
+        * No-one should request to flush a piece of WAL that hasn't even been
+        * reserved yet. However, it can happen if there is a block with a bogus
+        * LSN on disk, for example. XLogFlush checks for that situation and
+        * complains, but only after the flush. Here we just assume that to mean
+        * that all WAL that has been reserved needs to be finished. In this
+        * corner-case, the return value can be smaller than 'upto' argument.
+        */
+       if (upto > reservedUpto)
+       {
+               elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
+                        (uint32) (upto >> 32), (uint32) upto,
+                        (uint32) (reservedUpto >> 32), (uint32) reservedUpto);
+               upto = reservedUpto;
+       }
+
+       /*
+        * finishedUpto is our return value, indicating the point upto which
+        * all the WAL insertions have been finished. Initialize it to the head
+        * of reserved WAL, and as we iterate through the insertion slots, back it
+        * out for any insertion that's still in progress.
+        */
+       finishedUpto = reservedUpto;
+
+       /*
+        * Loop through all the slots, sleeping on any in-progress insert older
+        * than 'upto'.
+        */
+       for (i = 0; i < num_xloginsert_slots; i++)
        {
-               /* force it to a segment start point */
-               if (NewPageBeginPtr % XLogSegSize != 0)
-                       XLByteAdvance(NewPageBeginPtr,
-                                                 XLogSegSize - NewPageBeginPtr % XLogSegSize);
+               volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+               XLogRecPtr insertingat;
+
+       retry:
+               /*
+                * We can check if the slot is in use without grabbing the spinlock.
+                * The spinlock acquisition of insertpos_lck before this loop acts
+                * as a memory barrier. If someone acquires the slot after that, it
+                * can't possibly be inserting to anything < reservedUpto. If it was
+                * acquired before that, an unlocked test will return true.
+                */
+               if (!slot->exclusive)
+                       continue;
+
+               SpinLockAcquire(&slot->mutex);
+               /* re-check now that we have the lock */
+               if (!slot->exclusive)
+               {
+                       SpinLockRelease(&slot->mutex);
+                       continue;
+               }
+               insertingat = slot->xlogInsertingAt;
+               SpinLockRelease(&slot->mutex);
+
+               if (insertingat == InvalidXLogRecPtr)
+               {
+                       /*
+                        * slot is reserved just to hold off other inserters, there is no
+                        * actual insert in progress.
+                        */
+                       continue;
+               }
+
+               /*
+                * This insertion is still in progress. Do we need to wait for it?
+                *
+                * When an inserter acquires a slot, it doesn't reset 'insertingat', so
+                * it will initially point to the old value of some already-finished
+                * insertion. The inserter will update the value as soon as it finishes
+                * the insertion, moves to the next page, or has to do I/O to flush an
+                * old dirty buffer. That means that when we see a slot with
+                * insertingat value < upto, we don't know if that insertion is still
+                * truly in progress, or if the slot is reused by a new inserter that
+                * hasn't updated the insertingat value yet. We have to assume it's the
+                * latter, and wait.
+                */
+               if (insertingat < upto)
+               {
+                       WaitOnSlot(slot, insertingat);
+                       goto retry;
+               }
+               else
+               {
+                       /*
+                        * We don't need to wait for this insertion, but update the
+                        * return value.
+                        */
+                       if (insertingat < finishedUpto)
+                               finishedUpto = insertingat;
+               }
        }
+       return finishedUpto;
+}
 
-       NewPageEndPtr = NewPageBeginPtr;
-       XLByteAdvance(NewPageEndPtr, XLOG_BLCKSZ);
-       XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
-       NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+/*
+ * Get a pointer to the right location in the WAL buffer containing the
+ * given XLogRecPtr.
+ *
+ * If the page is not initialized yet, it is initialized. That might require
+ * evicting an old dirty buffer from the buffer cache, which means I/O.
+ *
+ * The caller must ensure that the page containing the requested location
+ * isn't evicted yet, and won't be evicted. The way to ensure that is to
+ * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * to evict an old page from the buffer. (This means that once you call
+ * GetXLogBuffer() with a given 'ptr', you must not access anything before
+ * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
+ * later, because older buffers might be recycled already)
+ */
+static char *
+GetXLogBuffer(XLogRecPtr ptr)
+{
+       int                     idx;
+       XLogRecPtr      endptr;
+       static uint64 cachedPage = 0;
+       static char *cachedPos = NULL;
+       XLogRecPtr      expectedEndPtr;
 
-       Insert->curridx = nextidx;
-       Insert->currpage = NewPage;
+       /*
+        * Fast path for the common case that we need to access again the same
+        * page as last time.
+        */
+       if (ptr / XLOG_BLCKSZ == cachedPage)
+       {
+               Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+               Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+               return cachedPos + ptr % XLOG_BLCKSZ;
+       }
 
-       Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
+       /*
+        * The XLog buffer cache is organized so that a page is always loaded
+        * to a particular buffer.  That way we can easily calculate the buffer
+        * a given page must be loaded into, from the XLogRecPtr alone.
+        */
+       idx = XLogRecPtrToBufIdx(ptr);
 
        /*
-        * Be sure to re-zero the buffer so that bytes beyond what we've written
-        * will look like zeroes and not valid XLOG records...
+        * See what page is loaded in the buffer at the moment. It could be the
+        * page we're looking for, or something older. It can't be anything newer
+        * - that would imply the page we're looking for has already been written
+        * out to disk and evicted, and the caller is responsible for making sure
+        * that doesn't happen.
+        *
+        * However, we don't hold a lock while we read the value. If someone has
+        * just initialized the page, it's possible that we get a "torn read" of
+        * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
+        * that case we will see a bogus value. That's ok, we'll grab the mapping
+        * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
+        * the page we're looking for. But it means that when we do this unlocked
+        * read, we might see a value that appears to be ahead of the page we're
+        * looking for. Don't PANIC on that, until we've verified the value while
+        * holding the lock.
         */
-       MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+       expectedEndPtr = ptr;
+       expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+       endptr = XLogCtl->xlblocks[idx];
+       if (expectedEndPtr != endptr)
+       {
+               /*
+                * Let others know that we're finished inserting the record up
+                * to the page boundary.
+                */
+               WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
+
+               AdvanceXLInsertBuffer(ptr, false);
+               endptr = XLogCtl->xlblocks[idx];
+
+               if (expectedEndPtr != endptr)
+                       elog(PANIC, "could not find WAL buffer for %X/%X",
+                                (uint32) (ptr >> 32) , (uint32) ptr);
+       }
+       else
+       {
+               /*
+                * Make sure the initialization of the page is visible to us, and
+                * won't arrive later to overwrite the WAL data we write on the page.
+                */
+               pg_memory_barrier();
+       }
 
        /*
-        * Fill the new page's header
+        * Found the buffer holding this page. Return a pointer to the right
+        * offset within the page.
         */
-       NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
+       cachedPage = ptr / XLOG_BLCKSZ;
+       cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+
+       Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+       Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+
+       return cachedPos + ptr % XLOG_BLCKSZ;
+}
 
-       /* NewPage->xlp_info = 0; */    /* done by memset */
-       NewPage   ->xlp_tli = ThisTimeLineID;
-       NewPage   ->xlp_pageaddr = NewPageBeginPtr;
+/*
+ * Converts a "usable byte position" to XLogRecPtr. A usable byte position
+ * is the position starting from the beginning of WAL, excluding all WAL
+ * page headers.
+ */
+static XLogRecPtr
+XLogBytePosToRecPtr(uint64 bytepos)
+{
+       uint64          fullsegs;
+       uint64          fullpages;
+       uint64          bytesleft;
+       uint32          seg_offset;
+       XLogRecPtr      result;
+
+       fullsegs = bytepos / UsableBytesInSegment;
+       bytesleft = bytepos % UsableBytesInSegment;
+
+       if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+       {
+               /* fits on first page of segment */
+               seg_offset = bytesleft + SizeOfXLogLongPHD;
+       }
+       else
+       {
+               /* account for the first page on segment with long header */
+               seg_offset = XLOG_BLCKSZ;
+               bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+
+               fullpages = bytesleft / UsableBytesInPage;
+               bytesleft = bytesleft % UsableBytesInPage;
+
+               seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+       }
+
+       XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
+
+       return result;
+}
+
+/*
+ * Like XLogBytePosToRecPtr, but if the position is at a page boundary,
+ * returns a pointer to the beginning of the page (ie. before page header),
+ * not to where the first xlog record on that page would go to. This is used
+ * when converting a pointer to the end of a record.
+ */
+static XLogRecPtr
+XLogBytePosToEndRecPtr(uint64 bytepos)
+{
+       uint64          fullsegs;
+       uint64          fullpages;
+       uint64          bytesleft;
+       uint32          seg_offset;
+       XLogRecPtr      result;
+
+       fullsegs = bytepos / UsableBytesInSegment;
+       bytesleft = bytepos % UsableBytesInSegment;
+
+       if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+       {
+               /* fits on first page of segment */
+               if (bytesleft == 0)
+                       seg_offset = 0;
+               else
+                       seg_offset = bytesleft + SizeOfXLogLongPHD;
+       }
+       else
+       {
+               /* account for the first page on segment with long header */
+               seg_offset = XLOG_BLCKSZ;
+               bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+
+               fullpages = bytesleft / UsableBytesInPage;
+               bytesleft = bytesleft % UsableBytesInPage;
+
+               if (bytesleft == 0)
+                       seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
+               else
+                       seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+       }
+
+       XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
+
+       return result;
+}
+
+/*
+ * Convert an XLogRecPtr to a "usable byte position".
+ */
+static uint64
+XLogRecPtrToBytePos(XLogRecPtr ptr)
+{
+       uint64          fullsegs;
+       uint32          fullpages;
+       uint32          offset;
+       uint64          result;
+
+       XLByteToSeg(ptr, fullsegs);
+
+       fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ;
+       offset = ptr % XLOG_BLCKSZ;
+
+       if (fullpages == 0)
+       {
+               result = fullsegs * UsableBytesInSegment;
+               if (offset > 0)
+               {
+                       Assert(offset >= SizeOfXLogLongPHD);
+                       result += offset - SizeOfXLogLongPHD;
+               }
+       }
+       else
+       {
+               result = fullsegs * UsableBytesInSegment +
+                       (XLOG_BLCKSZ - SizeOfXLogLongPHD) +  /* account for first page */
+                       (fullpages - 1) * UsableBytesInPage; /* full pages */
+               if (offset > 0)
+               {
+                       Assert(offset >= SizeOfXLogShortPHD);
+                       result += offset - SizeOfXLogShortPHD;
+               }
+       }
+
+       return result;
+}
+
+/*
+ * Determine whether the buffer referenced by an XLogRecData item has to
+ * be backed up, and if so fill a BkpBlock struct for it.  In any case
+ * save the buffer's LSN at *lsn.
+ */
+static bool
+XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
+                               XLogRecPtr *lsn, BkpBlock *bkpb)
+{
+       Page            page;
+
+       page = BufferGetPage(rdata->buffer);
 
        /*
-        * If online backup is not in progress, mark the header to indicate that
-        * WAL records beginning in this page have removable backup blocks.  This
-        * allows the WAL archiver to know whether it is safe to compress archived
-        * WAL data by transforming full-block records into the non-full-block
-        * format.      It is sufficient to record this at the page level because we
-        * force a page switch (in fact a segment switch) when starting a backup,
-        * so the flag will be off before any records can be written during the
-        * backup.      At the end of a backup, the last page will be marked as all
-        * unsafe when perhaps only part is unsafe, but at worst the archiver
-        * would miss the opportunity to compress a few records.
+        * We assume page LSN is first data on *every* page that can be passed to
+        * XLogInsert, whether it has the standard page layout or not. We don't
+        * need to take the buffer header lock for PageGetLSN if we hold an
+        * exclusive lock on the page and/or the relation.
         */
-       if (!Insert->forcePageWrites)
-               NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
+       if (holdsExclusiveLock)
+               *lsn = PageGetLSN(page);
+       else
+               *lsn = BufferGetLSNAtomic(rdata->buffer);
+
+       if (*lsn <= RedoRecPtr)
+       {
+               /*
+                * The page needs to be backed up, so set up *bkpb
+                */
+               BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
+
+               if (rdata->buffer_std)
+               {
+                       /* Assume we can omit data between pd_lower and pd_upper */
+                       uint16          lower = ((PageHeader) page)->pd_lower;
+                       uint16          upper = ((PageHeader) page)->pd_upper;
+
+                       if (lower >= SizeOfPageHeaderData &&
+                               upper > lower &&
+                               upper <= BLCKSZ)
+                       {
+                               bkpb->hole_offset = lower;
+                               bkpb->hole_length = upper - lower;
+                       }
+                       else
+                       {
+                               /* No "hole" to compress out */
+                               bkpb->hole_offset = 0;
+                               bkpb->hole_length = 0;
+                       }
+               }
+               else
+               {
+                       /* Not a standard page header, don't try to eliminate "hole" */
+                       bkpb->hole_offset = 0;
+                       bkpb->hole_length = 0;
+               }
+
+               return true;                    /* buffer requires backup */
+       }
+
+       return false;                           /* buffer does not need to be backed up */
+}
+
+/*
+ * Initialize XLOG buffers, writing out old buffers if they still contain
+ * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
+ * true, initialize as many pages as we can without having to write out
+ * unwritten data. Any new pages are initialized to zeros, with pages headers
+ * initialized properly.
+ */
+static void
+AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
+{
+       XLogCtlInsert *Insert = &XLogCtl->Insert;
+       int                     nextidx;
+       XLogRecPtr      OldPageRqstPtr;
+       XLogwrtRqst WriteRqst;
+       XLogRecPtr      NewPageEndPtr = InvalidXLogRecPtr;
+       XLogRecPtr      NewPageBeginPtr;
+       XLogPageHeader NewPage;
+       int                     npages = 0;
+
+       LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
 
        /*
-        * If first page of an XLOG segment file, make it a long header.
+        * Now that we have the lock, check if someone initialized the page
+        * already.
         */
-       if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
+       while (upto >= XLogCtl->InitializedUpTo || opportunistic)
        {
-               XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+               nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+
+               /*
+                * Get ending-offset of the buffer page we need to replace (this may
+                * be zero if the buffer hasn't been used yet).  Fall through if it's
+                * already written out.
+                */
+               OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
+               if (LogwrtResult.Write < OldPageRqstPtr)
+               {
+                       /*
+                        * Nope, got work to do. If we just want to pre-initialize as much
+                        * as we can without flushing, give up now.
+                        */
+                       if (opportunistic)
+                               break;
 
-               NewLongPage->xlp_sysid = ControlFile->system_identifier;
-               NewLongPage->xlp_seg_size = XLogSegSize;
-               NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
-               NewPage   ->xlp_info |= XLP_LONG_HEADER;
+                       /* Before waiting, get info_lck and update LogwrtResult */
+                       {
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile XLogCtlData *xlogctl = XLogCtl;
+
+                               SpinLockAcquire(&xlogctl->info_lck);
+                               if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
+                                       xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
+                               LogwrtResult = xlogctl->LogwrtResult;
+                               SpinLockRelease(&xlogctl->info_lck);
+                       }
+
+                       /*
+                        * Now that we have an up-to-date LogwrtResult value, see if we
+                        * still need to write it or if someone else already did.
+                        */
+                       if (LogwrtResult.Write < OldPageRqstPtr)
+                       {
+                               /*
+                                * Must acquire write lock. Release WALBufMappingLock first,
+                                * to make sure that all insertions that we need to wait for
+                                * can finish (up to this same position). Otherwise we risk
+                                * deadlock.
+                                */
+                               LWLockRelease(WALBufMappingLock);
+
+                               WaitXLogInsertionsToFinish(OldPageRqstPtr);
+
+                               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+
+                               LogwrtResult = XLogCtl->LogwrtResult;
+                               if (LogwrtResult.Write >= OldPageRqstPtr)
+                               {
+                                       /* OK, someone wrote it already */
+                                       LWLockRelease(WALWriteLock);
+                               }
+                               else
+                               {
+                                       /* Have to write it ourselves */
+                                       TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
+                                       WriteRqst.Write = OldPageRqstPtr;
+                                       WriteRqst.Flush = 0;
+                                       XLogWrite(WriteRqst, false);
+                                       LWLockRelease(WALWriteLock);
+                                       TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+                               }
+                               /* Re-acquire WALBufMappingLock and retry */
+                               LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+                               continue;
+                       }
+               }
+
+               /*
+                * Now the next buffer slot is free and we can set it up to be the next
+                * output page.
+                */
+               NewPageBeginPtr = XLogCtl->InitializedUpTo;
+               NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+
+               Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
-               Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+               NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+
+               /*
+                * Be sure to re-zero the buffer so that bytes beyond what we've
+                * written will look like zeroes and not valid XLOG records...
+                */
+               MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+
+               /*
+                * Fill the new page's header
+                */
+               NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
+
+               /* NewPage->xlp_info = 0; */    /* done by memset */
+               NewPage   ->xlp_tli = ThisTimeLineID;
+               NewPage   ->xlp_pageaddr = NewPageBeginPtr;
+               /* NewPage->xlp_rem_len = 0; */         /* done by memset */
+
+               /*
+                * If online backup is not in progress, mark the header to indicate
+                * that* WAL records beginning in this page have removable backup
+                * blocks.  This allows the WAL archiver to know whether it is safe to
+                * compress archived WAL data by transforming full-block records into
+                * the non-full-block format.  It is sufficient to record this at the
+                * page level because we force a page switch (in fact a segment switch)
+                * when starting a backup, so the flag will be off before any records
+                * can be written during the backup.  At the end of a backup, the last
+                * page will be marked as all unsafe when perhaps only part is unsafe,
+                * but at worst the archiver would miss the opportunity to compress a
+                * few records.
+                */
+               if (!Insert->forcePageWrites)
+                       NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
+
+               /*
+                * If first page of an XLOG segment file, make it a long header.
+                */
+               if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
+               {
+                       XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+
+                       NewLongPage->xlp_sysid = ControlFile->system_identifier;
+                       NewLongPage->xlp_seg_size = XLogSegSize;
+                       NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
+                       NewPage   ->xlp_info |= XLP_LONG_HEADER;
+               }
+
+               /*
+                * Make sure the initialization of the page becomes visible to others
+                * before the xlblocks update. GetXLogBuffer() reads xlblocks without
+                * holding a lock.
+                */
+               pg_write_barrier();
+
+               *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
+
+               XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+               npages++;
        }
+       LWLockRelease(WALBufMappingLock);
 
-       return update_needed;
+#ifdef WAL_DEBUG
+       if (npages > 0)
+       {
+               elog(DEBUG1, "initialized %d pages, upto %X/%X",
+                        npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
+       }
+#endif
 }
 
 /*
@@ -1430,18 +2585,13 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
  * This option allows us to avoid uselessly issuing multiple writes when a
  * single one would do.
  *
- * If xlog_switch == TRUE, we are intending an xlog segment switch, so
- * perform end-of-segment actions after writing the last page, even if
- * it's not physically the end of its segment.  (NB: this will work properly
- * only if caller specifies WriteRqst == page-end and flexible == false,
- * and there is some data to write.)
- *
- * Must be called with WALWriteLock held.
+ * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
+ * must be called before grabbing the lock, to make sure the data is ready to
+ * write.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
-       XLogCtlWrite *Write = &XLogCtl->Write;
        bool            ispartialpage;
        bool            last_iteration;
        bool            finishing_seg;
@@ -1474,29 +2624,28 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 
        /*
         * Within the loop, curridx is the cache block index of the page to
-        * consider writing.  We advance Write->curridx only after successfully
-        * writing pages.  (Right now, this refinement is useless since we are
-        * going to PANIC if any error occurs anyway; but someday it may come in
-        * useful.)
+        * consider writing.  Begin at the buffer containing the next unwritten
+        * page, or last partially written page.
         */
-       curridx = Write->curridx;
+       curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
 
-       while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
+       while (LogwrtResult.Write < WriteRqst.Write)
        {
                /*
                 * Make sure we're not ahead of the insert process.  This could happen
                 * if we're passed a bogus WriteRqst.Write that is past the end of the
                 * last page that's been initialized by AdvanceXLInsertBuffer.
                 */
-               if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
+               XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+               if (LogwrtResult.Write >= EndPtr)
                        elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
-                                (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
-                                (uint32) (XLogCtl->xlblocks[curridx] >> 32),
-                                (uint32) XLogCtl->xlblocks[curridx]);
+                                (uint32) (LogwrtResult.Write >> 32),
+                                (uint32) LogwrtResult.Write,
+                                (uint32) (EndPtr >> 32), (uint32) EndPtr);
 
                /* Advance LogwrtResult.Write to end of current buffer page */
-               LogwrtResult.Write = XLogCtl->xlblocks[curridx];
-               ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
+               LogwrtResult.Write = EndPtr;
+               ispartialpage = WriteRqst.Write < LogwrtResult.Write;
 
                if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
                {
@@ -1538,7 +2687,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                 * contiguous in memory), or if we are at the end of the logfile
                 * segment.
                 */
-               last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write);
+               last_iteration = WriteRqst.Write <= LogwrtResult.Write;
 
                finishing_seg = !ispartialpage &&
                        (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
@@ -1549,6 +2698,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                {
                        char       *from;
                        Size            nbytes;
+                       Size            nleft;
+                       int                     written;
 
                        /* Need to seek in the file? */
                        if (openLogOff != startoffset)
@@ -1556,32 +2707,37 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                                if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
                                        ereport(PANIC,
                                                        (errcode_for_file_access(),
-                                                        errmsg("could not seek in log file %s to offset %u: %m",
-                                                                       XLogFileNameP(ThisTimeLineID, openLogSegNo),
-                                                                       startoffset)));
+                                        errmsg("could not seek in log file %s to offset %u: %m",
+                                                       XLogFileNameP(ThisTimeLineID, openLogSegNo),
+                                                       startoffset)));
                                openLogOff = startoffset;
                        }
 
                        /* OK to write the page(s) */
                        from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
                        nbytes = npages * (Size) XLOG_BLCKSZ;
-                       errno = 0;
-                       if (write(openLogFile, from, nbytes) != nbytes)
+                       nleft = nbytes;
+                       do
                        {
-                               /* if write didn't set errno, assume no disk space */
-                               if (errno == 0)
-                                       errno = ENOSPC;
-                               ereport(PANIC,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not write to log file %s "
-                                                               "at offset %u, length %lu: %m",
-                                                               XLogFileNameP(ThisTimeLineID, openLogSegNo),
-                                                               openLogOff, (unsigned long) nbytes)));
-                       }
+                               errno = 0;
+                               written  = write(openLogFile, from, nleft);
+                               if (written <= 0)
+                               {
+                                       if (errno == EINTR)
+                                               continue;
+                                       ereport(PANIC,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("could not write to log file %s "
+                                                                       "at offset %u, length %lu: %m",
+                                                                       XLogFileNameP(ThisTimeLineID, openLogSegNo),
+                                                                       openLogOff, (unsigned long) nbytes)));
+                               }
+                               nleft -= written;
+                               from += written;
+                       } while (nleft > 0);
 
                        /* Update state for write */
                        openLogOff += nbytes;
-                       Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
                        npages = 0;
 
                        /*
@@ -1591,16 +2747,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                         * later. Doing it here ensures that one and only one backend will
                         * perform this fsync.
                         *
-                        * We also do this if this is the last page written for an xlog
-                        * switch.
-                        *
                         * This is also the right place to notify the Archiver that the
                         * segment is ready to copy to archival storage, and to update the
                         * timer for archive_timeout, and to signal for a checkpoint if
                         * too many logfile segments have been used since the last
                         * checkpoint.
                         */
-                       if (finishing_seg || (xlog_switch && last_iteration))
+                       if (finishing_seg)
                        {
                                issue_xlog_fsync(openLogFile, openLogSegNo);
 
@@ -1612,7 +2765,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                                if (XLogArchivingActive())
                                        XLogArchiveNotifySeg(openLogSegNo);
 
-                               Write->lastSegSwitchTime = (pg_time_t) time(NULL);
+                               XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 
                                /*
                                 * Request a checkpoint if we've consumed too much xlog since
@@ -1644,13 +2797,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
        }
 
        Assert(npages == 0);
-       Assert(curridx == Write->curridx);
 
        /*
         * If asked to flush, do so
         */
-       if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
-               XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
+       if (LogwrtResult.Flush < WriteRqst.Flush &&
+               LogwrtResult.Flush < LogwrtResult.Write)
+
        {
                /*
                 * Could get here without iterating above loop, in which case we might
@@ -1692,9 +2845,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 
                SpinLockAcquire(&xlogctl->info_lck);
                xlogctl->LogwrtResult = LogwrtResult;
-               if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
+               if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
                        xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
-               if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
+               if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
                        xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
                SpinLockRelease(&xlogctl->info_lck);
        }
@@ -1717,7 +2870,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
        SpinLockAcquire(&xlogctl->info_lck);
        LogwrtResult = xlogctl->LogwrtResult;
        sleeping = xlogctl->WalWriterSleeping;
-       if (XLByteLT(xlogctl->asyncXactLSN, asyncXactLSN))
+       if (xlogctl->asyncXactLSN < asyncXactLSN)
                xlogctl->asyncXactLSN = asyncXactLSN;
        SpinLockRelease(&xlogctl->info_lck);
 
@@ -1732,7 +2885,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
                WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
 
                /* if we have already flushed that far, we're done */
-               if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+               if (WriteRqstPtr <= LogwrtResult.Flush)
                        return;
        }
 
@@ -1758,13 +2911,14 @@ static void
 UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
 {
        /* Quick check using our local copy of the variable */
-       if (!updateMinRecoveryPoint || (!force && XLByteLE(lsn, minRecoveryPoint)))
+       if (!updateMinRecoveryPoint || (!force && lsn <= minRecoveryPoint))
                return;
 
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 
        /* update local copy */
        minRecoveryPoint = ControlFile->minRecoveryPoint;
+       minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
 
        /*
         * An invalid minRecoveryPoint means that we need to recover all the WAL,
@@ -1773,11 +2927,12 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
         */
        if (minRecoveryPoint == 0)
                updateMinRecoveryPoint = false;
-       else if (force || XLByteLT(minRecoveryPoint, lsn))
+       else if (force || minRecoveryPoint < lsn)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile XLogCtlData *xlogctl = XLogCtl;
                XLogRecPtr      newMinRecoveryPoint;
+               TimeLineID      newMinRecoveryPointTLI;
 
                /*
                 * To avoid having to update the control file too often, we update it
@@ -1794,26 +2949,30 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
                 */
                SpinLockAcquire(&xlogctl->info_lck);
                newMinRecoveryPoint = xlogctl->replayEndRecPtr;
+               newMinRecoveryPointTLI = xlogctl->replayEndTLI;
                SpinLockRelease(&xlogctl->info_lck);
 
-               if (!force && XLByteLT(newMinRecoveryPoint, lsn))
+               if (!force && newMinRecoveryPoint < lsn)
                        elog(WARNING,
                           "xlog min recovery request %X/%X is past current point %X/%X",
-                                (uint32) (lsn >> 32) , (uint32) lsn,
+                                (uint32) (lsn >> 32), (uint32) lsn,
                                 (uint32) (newMinRecoveryPoint >> 32),
                                 (uint32) newMinRecoveryPoint);
 
                /* update control file */
-               if (XLByteLT(ControlFile->minRecoveryPoint, newMinRecoveryPoint))
+               if (ControlFile->minRecoveryPoint < newMinRecoveryPoint)
                {
                        ControlFile->minRecoveryPoint = newMinRecoveryPoint;
+                       ControlFile->minRecoveryPointTLI = newMinRecoveryPointTLI;
                        UpdateControlFile();
                        minRecoveryPoint = newMinRecoveryPoint;
+                       minRecoveryPointTLI = newMinRecoveryPointTLI;
 
                        ereport(DEBUG2,
-                                       (errmsg("updated min recovery point to %X/%X",
-                                                       (uint32) (minRecoveryPoint >> 32),
-                                                       (uint32) minRecoveryPoint)));
+                               (errmsg("updated min recovery point to %X/%X on timeline %u",
+                                               (uint32) (minRecoveryPoint >> 32),
+                                               (uint32) minRecoveryPoint,
+                                               newMinRecoveryPointTLI)));
                }
        }
        LWLockRelease(ControlFileLock);
@@ -1845,7 +3004,7 @@ XLogFlush(XLogRecPtr record)
        }
 
        /* Quick exit if already known flushed */
-       if (XLByteLE(record, LogwrtResult.Flush))
+       if (record <= LogwrtResult.Flush)
                return;
 
 #ifdef WAL_DEBUG
@@ -1853,7 +3012,7 @@ XLogFlush(XLogRecPtr record)
                elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
                         (uint32) (record >> 32), (uint32) record,
                         (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
-                        (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+                  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
 #endif
 
        START_CRIT_SECTION();
@@ -1877,18 +3036,25 @@ XLogFlush(XLogRecPtr record)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile XLogCtlData *xlogctl = XLogCtl;
+               XLogRecPtr      insertpos;
 
                /* read LogwrtResult and update local state */
                SpinLockAcquire(&xlogctl->info_lck);
-               if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
+               if (WriteRqstPtr < xlogctl->LogwrtRqst.Write)
                        WriteRqstPtr = xlogctl->LogwrtRqst.Write;
                LogwrtResult = xlogctl->LogwrtResult;
                SpinLockRelease(&xlogctl->info_lck);
 
                /* done already? */
-               if (XLByteLE(record, LogwrtResult.Flush))
+               if (record <= LogwrtResult.Flush)
                        break;
 
+               /*
+                * Before actually performing the write, wait for all in-flight
+                * insertions to the pages we're about to write to finish.
+                */
+               insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+
                /*
                 * Try to get the write lock. If we can't get it immediately, wait
                 * until it's released, and recheck if we still need to do the flush
@@ -1908,7 +3074,7 @@ XLogFlush(XLogRecPtr record)
 
                /* Got the lock; recheck whether request is satisfied */
                LogwrtResult = XLogCtl->LogwrtResult;
-               if (XLByteLE(record, LogwrtResult.Flush))
+               if (record <= LogwrtResult.Flush)
                {
                        LWLockRelease(WALWriteLock);
                        break;
@@ -1917,39 +3083,35 @@ XLogFlush(XLogRecPtr record)
                /*
                 * Sleep before flush! By adding a delay here, we may give further
                 * backends the opportunity to join the backlog of group commit
-                * followers; this can significantly improve transaction throughput, at
-                * the risk of increasing transaction latency.
+                * followers; this can significantly improve transaction throughput,
+                * at the risk of increasing transaction latency.
                 *
                 * We do not sleep if enableFsync is not turned on, nor if there are
                 * fewer than CommitSiblings other backends with active transactions.
                 */
                if (CommitDelay > 0 && enableFsync &&
                        MinimumActiveBackends(CommitSiblings))
+               {
                        pg_usleep(CommitDelay);
 
+                       /*
+                        * Re-check how far we can now flush the WAL. It's generally not
+                        * safe to call WaitXLogInsetionsToFinish while holding
+                        * WALWriteLock, because an in-progress insertion might need to
+                        * also grab WALWriteLock to make progress. But we know that all
+                        * the insertions up to insertpos have already finished, because
+                        * that's what the earlier WaitXLogInsertionsToFinish() returned.
+                        * We're only calling it again to allow insertpos to be moved
+                        * further forward, not to actually wait for anyone.
+                        */
+                       insertpos = WaitXLogInsertionsToFinish(insertpos);
+               }
+
                /* try to write/flush later additions to XLOG as well */
-               if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
-               {
-                       XLogCtlInsert *Insert = &XLogCtl->Insert;
-                       uint32          freespace = INSERT_FREESPACE(Insert);
+               WriteRqst.Write = insertpos;
+               WriteRqst.Flush = insertpos;
 
-                       if (freespace == 0)             /* buffer is full */
-                               WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-                       else
-                       {
-                               WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-                               WriteRqstPtr -= freespace;
-                       }
-                       LWLockRelease(WALInsertLock);
-                       WriteRqst.Write = WriteRqstPtr;
-                       WriteRqst.Flush = WriteRqstPtr;
-               }
-               else
-               {
-                       WriteRqst.Write = WriteRqstPtr;
-                       WriteRqst.Flush = record;
-               }
-               XLogWrite(WriteRqst, false, false);
+               XLogWrite(WriteRqst, false);
 
                LWLockRelease(WALWriteLock);
                /* done */
@@ -1982,11 +3144,11 @@ XLogFlush(XLogRecPtr record)
         * calls from bufmgr.c are not within critical sections and so we will not
         * force a restart for a bad LSN on a data page.
         */
-       if (XLByteLT(LogwrtResult.Flush, record))
+       if (LogwrtResult.Flush < record)
                elog(ERROR,
                "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
                         (uint32) (record >> 32), (uint32) record,
-                        (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+                  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
 }
 
 /*
@@ -2032,7 +3194,7 @@ XLogBackgroundFlush(void)
        WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
 
        /* if we have already flushed that far, consider async commit records */
-       if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+       if (WriteRqstPtr <= LogwrtResult.Flush)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile XLogCtlData *xlogctl = XLogCtl;
@@ -2048,7 +3210,7 @@ XLogBackgroundFlush(void)
         * holding an open file handle to a logfile that's no longer in use,
         * preventing the file from being deleted.
         */
-       if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+       if (WriteRqstPtr <= LogwrtResult.Flush)
        {
                if (openLogFile >= 0)
                {
@@ -2065,21 +3227,22 @@ XLogBackgroundFlush(void)
                elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
                         (uint32) (WriteRqstPtr >> 32), (uint32) WriteRqstPtr,
                         (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
-                        (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+                  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
 #endif
 
        START_CRIT_SECTION();
 
-       /* now wait for the write lock */
+       /* now wait for any in-progress insertions to finish and get write lock */
+       WaitXLogInsertionsToFinish(WriteRqstPtr);
        LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
        LogwrtResult = XLogCtl->LogwrtResult;
-       if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+       if (WriteRqstPtr > LogwrtResult.Flush)
        {
                XLogwrtRqst WriteRqst;
 
                WriteRqst.Write = WriteRqstPtr;
                WriteRqst.Flush = WriteRqstPtr;
-               XLogWrite(WriteRqst, flexible, false);
+               XLogWrite(WriteRqst, flexible);
                wrote_something = true;
        }
        LWLockRelease(WALWriteLock);
@@ -2089,6 +3252,12 @@ XLogBackgroundFlush(void)
        /* wake up walsenders now that we've released heavily contended locks */
        WalSndWakeupProcessRequests();
 
+       /*
+        * Great, done. To take some work off the critical path, try to initialize
+        * as many of the no-longer-needed WAL buffers for future use as we can.
+        */
+       AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+
        return wrote_something;
 }
 
@@ -2109,7 +3278,7 @@ XLogNeedsFlush(XLogRecPtr record)
        if (RecoveryInProgress())
        {
                /* Quick exit if already known updated */
-               if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
+               if (record <= minRecoveryPoint || !updateMinRecoveryPoint)
                        return false;
 
                /*
@@ -2119,6 +3288,7 @@ XLogNeedsFlush(XLogRecPtr record)
                if (!LWLockConditionalAcquire(ControlFileLock, LW_SHARED))
                        return true;
                minRecoveryPoint = ControlFile->minRecoveryPoint;
+               minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
                LWLockRelease(ControlFileLock);
 
                /*
@@ -2131,14 +3301,14 @@ XLogNeedsFlush(XLogRecPtr record)
                        updateMinRecoveryPoint = false;
 
                /* check again */
-               if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
+               if (record <= minRecoveryPoint || !updateMinRecoveryPoint)
                        return false;
                else
                        return true;
        }
 
        /* Quick exit if already known flushed */
-       if (XLByteLE(record, LogwrtResult.Flush))
+       if (record <= LogwrtResult.Flush)
                return false;
 
        /* read LogwrtResult and update local state */
@@ -2152,7 +3322,7 @@ XLogNeedsFlush(XLogRecPtr record)
        }
 
        /* check again */
-       if (XLByteLE(record, LogwrtResult.Flush))
+       if (record <= LogwrtResult.Flush)
                return false;
 
        return true;
@@ -2316,7 +3486,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                  errmsg("could not open file \"%s\": %m", path)));
+                                errmsg("could not open file \"%s\": %m", path)));
 
        elog(DEBUG2, "done creating and filling new WAL file");
 
@@ -2543,7 +3713,7 @@ XLogFileOpen(XLogSegNo segno)
        if (fd < 0)
                ereport(PANIC,
                                (errcode_for_file_access(),
-                                errmsg("could not open xlog file \"%s\": %m", path)));
+                                errmsg("could not open transaction log file \"%s\": %m", path)));
 
        return fd;
 }
@@ -2551,7 +3721,7 @@ XLogFileOpen(XLogSegNo segno)
 /*
  * Open a logfile segment for reading (during recovery).
  *
- * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
+ * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
  * Otherwise, it's assumed to be already available in pg_xlog.
  */
 static int
@@ -2597,68 +3767,12 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
         */
        if (source == XLOG_FROM_ARCHIVE)
        {
-               char            xlogfpath[MAXPGPATH];
-               bool            reload = false;
-               struct stat statbuf;
-
-               XLogFilePath(xlogfpath, tli, segno);
-               if (stat(xlogfpath, &statbuf) == 0)
-               {
-                       char oldpath[MAXPGPATH];
-#ifdef WIN32
-                       static unsigned int deletedcounter = 1;
-                       /*
-                        * On Windows, if another process (e.g a walsender process) holds
-                        * the file open in FILE_SHARE_DELETE mode, unlink will succeed,
-                        * but the file will still show up in directory listing until the
-                        * last handle is closed, and we cannot rename the new file in its
-                        * place until that. To avoid that problem, rename the old file to
-                        * a temporary name first. Use a counter to create a unique
-                        * filename, because the same file might be restored from the
-                        * archive multiple times, and a walsender could still be holding
-                        * onto an old deleted version of it.
-                        */
-                       snprintf(oldpath, MAXPGPATH, "%s.deleted%u",
-                                        xlogfpath, deletedcounter++);
-                       if (rename(xlogfpath, oldpath) != 0)
-                       {
-                               ereport(ERROR,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not rename file \"%s\" to \"%s\": %m",
-                                                               xlogfpath, oldpath)));
-                       }
-#else
-                       strncpy(oldpath, xlogfpath, MAXPGPATH);
-#endif
-                       if (unlink(oldpath) != 0)
-                               ereport(FATAL,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not remove file \"%s\": %m",
-                                                               xlogfpath)));
-                       reload = true;
-               }
-
-               if (rename(path, xlogfpath) < 0)
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not rename file \"%s\" to \"%s\": %m",
-                                                       path, xlogfpath)));
+               KeepFileRestoredFromArchive(path, xlogfname);
 
                /*
                 * Set path to point at the new file in pg_xlog.
                 */
-               strncpy(path, xlogfpath, MAXPGPATH);
-
-               /*
-                * If the existing segment was replaced, since walsenders might have
-                * it open, request them to reload a currently-open segment.
-                */
-               if (reload)
-                       WalSndRqstFileReload();
-
-               /* Signal walsender that new WAL has arrived */
-               if (AllowCascadeReplication())
-                       WalSndWakeup();
+               snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
        }
 
        fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
@@ -2679,9 +3793,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                if (source != XLOG_FROM_STREAM)
                        XLogReceiptTime = GetCurrentTimestamp();
 
-               /* The file header needs to be validated on first access */
-               readFileHeaderValidated = false;
-
                return fd;
        }
        if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -2694,47 +3805,69 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 /*
  * Open a logfile segment for reading (during recovery).
  *
- * This version searches for the segment with any TLI listed in expectedTLIs.
+ * This version searches for the segment with any TLI listed in expectedTLEs.
  */
 static int
-XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
+XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
 {
        char            path[MAXPGPATH];
        ListCell   *cell;
        int                     fd;
+       List       *tles;
 
        /*
         * Loop looking for a suitable timeline ID: we might need to read any of
-        * the timelines listed in expectedTLIs.
+        * the timelines listed in expectedTLEs.
         *
         * We expect curFileTLI on entry to be the TLI of the preceding file in
         * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
         * to go backwards; this prevents us from picking up the wrong file when a
         * parent timeline extends to higher segment numbers than the child we
         * want to read.
-        */
-       foreach(cell, expectedTLIs)
+        *
+        * If we haven't read the timeline history file yet, read it now, so that
+        * we know which TLIs to scan.  We don't save the list in expectedTLEs,
+        * however, unless we actually find a valid segment.  That way if there is
+        * neither a timeline history file nor a WAL segment in the archive, and
+        * streaming replication is set up, we'll read the timeline history file
+        * streamed from the master when we start streaming, instead of recovering
+        * with a dummy history generated here.
+        */
+       if (expectedTLEs)
+               tles = expectedTLEs;
+       else
+               tles = readTimeLineHistory(recoveryTargetTLI);
+
+       foreach(cell, tles)
        {
-               TimeLineID      tli = (TimeLineID) lfirst_int(cell);
+               TimeLineID      tli = ((TimeLineHistoryEntry *) lfirst(cell))->tli;
 
                if (tli < curFileTLI)
                        break;                          /* don't bother looking at too-old TLIs */
 
-               if (sources & XLOG_FROM_ARCHIVE)
+               if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
                {
-                       fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
+                       fd = XLogFileRead(segno, emode, tli,
+                                                         XLOG_FROM_ARCHIVE, true);
                        if (fd != -1)
                        {
                                elog(DEBUG1, "got WAL segment from archive");
+                               if (!expectedTLEs)
+                                       expectedTLEs = tles;
                                return fd;
                        }
                }
 
-               if (sources & XLOG_FROM_PG_XLOG)
+               if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)
                {
-                       fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
+                       fd = XLogFileRead(segno, emode, tli,
+                                                         XLOG_FROM_PG_XLOG, true);
                        if (fd != -1)
+                       {
+                               if (!expectedTLEs)
+                                       expectedTLEs = tles;
                                return fd;
+                       }
                }
        }
 
@@ -2804,18 +3937,33 @@ PreallocXlogFiles(XLogRecPtr endptr)
 }
 
 /*
- * Get the segno of the latest removed or recycled WAL segment.
- * Returns 0/0 if no WAL segments have been removed since startup.
+ * Throws an error if the given log segment has already been removed or
+ * recycled. The caller should only pass a segment that it knows to have
+ * existed while the server has been running, as this function always
+ * succeeds if no WAL segments have been removed since startup.
+ * 'tli' is only used in the error message.
  */
 void
-XLogGetLastRemoved(XLogSegNo *segno)
+CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogSegNo       lastRemovedSegNo;
 
        SpinLockAcquire(&xlogctl->info_lck);
-       *segno = xlogctl->lastRemovedSegNo;
+       lastRemovedSegNo = xlogctl->lastRemovedSegNo;
        SpinLockRelease(&xlogctl->info_lck);
+
+       if (segno <= lastRemovedSegNo)
+       {
+               char            filename[MAXFNAMELEN];
+
+               XLogFileName(filename, tli, segno);
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("requested WAL segment %s has already been removed",
+                                               filename)));
+       }
 }
 
 /*
@@ -2873,7 +4021,12 @@ RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr endptr)
                                 errmsg("could not open transaction log directory \"%s\": %m",
                                                XLOGDIR)));
 
-       XLogFileName(lastoff, ThisTimeLineID, segno);
+       /*
+        * Construct a filename of the last segment to be kept. The timeline ID
+        * doesn't matter, we ignore that in the comparison. (During recovery,
+        * ThisTimeLineID isn't set, so we can't use that.)
+        */
+       XLogFileName(lastoff, 0, segno);
 
        elog(DEBUG2, "attempting to remove WAL segments older than log file %s",
                 lastoff);
@@ -2895,7 +4048,7 @@ RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr endptr)
                        strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
                        strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
                {
-                       if (RecoveryInProgress() || XLogArchiveCheckDone(xlde->d_name))
+                       if (XLogArchiveCheckDone(xlde->d_name))
                        {
                                snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
 
@@ -3103,681 +4256,226 @@ Buffer
 RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
                                   bool get_cleanup_lock, bool keep_buffer)
 {
-       Buffer          buffer;
-       Page            page;
        BkpBlock        bkpb;
        char       *blk;
        int                     i;
 
        /* Locate requested BkpBlock in the record */
-       blk = (char *) XLogRecGetData(record) + record->xl_len;
-       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-       {
-               if (!(record->xl_info & XLR_BKP_BLOCK(i)))
-                       continue;
-
-               memcpy(&bkpb, blk, sizeof(BkpBlock));
-               blk += sizeof(BkpBlock);
-
-               if (i == block_index)
-               {
-                       /* Found it, apply the update */
-                       buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
-                                                                                       RBM_ZERO);
-                       Assert(BufferIsValid(buffer));
-                       if (get_cleanup_lock)
-                               LockBufferForCleanup(buffer);
-                       else
-                               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-                       page = (Page) BufferGetPage(buffer);
-
-                       if (bkpb.hole_length == 0)
-                       {
-                               memcpy((char *) page, blk, BLCKSZ);
-                       }
-                       else
-                       {
-                               memcpy((char *) page, blk, bkpb.hole_offset);
-                               /* must zero-fill the hole */
-                               MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
-                               memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
-                                          blk + bkpb.hole_offset,
-                                          BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
-                       }
-
-                       PageSetLSN(page, lsn);
-                       PageSetTLI(page, ThisTimeLineID);
-                       MarkBufferDirty(buffer);
-
-                       if (!keep_buffer)
-                               UnlockReleaseBuffer(buffer);
-
-                       return buffer;
-               }
-
-               blk += BLCKSZ - bkpb.hole_length;
-       }
-
-       /* Caller specified a bogus block_index */
-       elog(ERROR, "failed to restore block_index %d", block_index);
-       return InvalidBuffer;           /* keep compiler quiet */
-}
-
-/*
- * CRC-check an XLOG record.  We do not believe the contents of an XLOG
- * record (other than to the minimal extent of computing the amount of
- * data to read in) until we've checked the CRCs.
- *
- * We assume all of the record (that is, xl_tot_len bytes) has been read
- * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
- * record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
- */
-static bool
-RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
-{
-       pg_crc32        crc;
-       int                     i;
-       uint32          len = record->xl_len;
-       BkpBlock        bkpb;
-       char       *blk;
-       size_t          remaining = record->xl_tot_len;
-
-       /* First the rmgr data */
-       if (remaining < SizeOfXLogRecord + len)
-       {
-               /* ValidXLogRecordHeader() should've caught this already... */
-               ereport(emode_for_corrupt_record(emode, recptr),
-                               (errmsg("invalid record length at %X/%X",
-                                               (uint32) (recptr >> 32), (uint32) recptr)));
-               return false;
-       }
-       remaining -= SizeOfXLogRecord + len;
-       INIT_CRC32(crc);
-       COMP_CRC32(crc, XLogRecGetData(record), len);
-
-       /* Add in the backup blocks, if any */
-       blk = (char *) XLogRecGetData(record) + len;
-       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-       {
-               uint32          blen;
-
-               if (!(record->xl_info & XLR_BKP_BLOCK(i)))
-                       continue;
-
-               if (remaining < sizeof(BkpBlock))
-               {
-                       ereport(emode_for_corrupt_record(emode, recptr),
-                                       (errmsg("invalid backup block size in record at %X/%X",
-                                                       (uint32) (recptr >> 32), (uint32) recptr)));
-                       return false;
-               }
-               memcpy(&bkpb, blk, sizeof(BkpBlock));
-
-               if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
-               {
-                       ereport(emode_for_corrupt_record(emode, recptr),
-                                       (errmsg("incorrect hole size in record at %X/%X",
-                                                       (uint32) (recptr >> 32), (uint32) recptr)));
-                       return false;
-               }
-               blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
-               if (remaining < blen)
-               {
-                       ereport(emode_for_corrupt_record(emode, recptr),
-                                       (errmsg("invalid backup block size in record at %X/%X",
-                                                       (uint32) (recptr >> 32), (uint32) recptr)));
-                       return false;
-               }
-               remaining -= blen;
-               COMP_CRC32(crc, blk, blen);
-               blk += blen;
-       }
-
-       /* Check that xl_tot_len agrees with our calculation */
-       if (remaining != 0)
-       {
-               ereport(emode_for_corrupt_record(emode, recptr),
-                               (errmsg("incorrect total length in record at %X/%X",
-                                               (uint32) (recptr >> 32), (uint32) recptr)));
-               return false;
-       }
-
-       /* Finally include the record header */
-       COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
-       FIN_CRC32(crc);
-
-       if (!EQ_CRC32(record->xl_crc, crc))
-       {
-               ereport(emode_for_corrupt_record(emode, recptr),
-               (errmsg("incorrect resource manager data checksum in record at %X/%X",
-                               (uint32) (recptr >> 32), (uint32) recptr)));
-               return false;
-       }
-
-       return true;
-}
-
-/*
- * Attempt to read an XLOG record.
- *
- * If RecPtr is not NULL, try to read a record at that position.  Otherwise
- * try to read a record just after the last one previously read.
- *
- * If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG)
- *
- * The record is copied into readRecordBuf, so that on successful return,
- * the returned record pointer always points there.
- */
-static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
-{
-       XLogRecord *record;
-       XLogRecPtr      tmpRecPtr = EndRecPtr;
-       bool            randAccess = false;
-       uint32          len,
-                               total_len;
-       uint32          targetRecOff;
-       uint32          pageHeaderSize;
-       bool            gotheader;
-
-       if (readBuf == NULL)
-       {
-               /*
-                * First time through, permanently allocate readBuf.  We do it this
-                * way, rather than just making a static array, for two reasons: (1)
-                * no need to waste the storage in most instantiations of the backend;
-                * (2) a static char array isn't guaranteed to have any particular
-                * alignment, whereas malloc() will provide MAXALIGN'd storage.
-                */
-               readBuf = (char *) malloc(XLOG_BLCKSZ);
-               Assert(readBuf != NULL);
-       }
-
-       if (RecPtr == NULL)
-       {
-               RecPtr = &tmpRecPtr;
-
-               /*
-                * RecPtr is pointing to end+1 of the previous WAL record.  If
-                * we're at a page boundary, no more records can fit on the current
-                * page. We must skip over the page header, but we can't do that
-                * until we've read in the page, since the header size is variable.
-                */
-       }
-       else
-       {
-               /*
-                * In this case, the passed-in record pointer should already be
-                * pointing to a valid record starting position.
-                */
-               if (!XRecOffIsValid(*RecPtr))
-                       ereport(PANIC,
-                                       (errmsg("invalid record offset at %X/%X",
-                                                       (uint32) (*RecPtr >> 32), (uint32) *RecPtr)));
-
-               /*
-                * Since we are going to a random position in WAL, forget any prior
-                * state about what timeline we were in, and allow it to be any
-                * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
-                * to go backwards (but we can't reset that variable right here, since
-                * we might not change files at all).
-                */
-               /* see comment in ValidXLogPageHeader */
-               lastPageTLI = lastSegmentTLI = 0;
-               randAccess = true;              /* allow curFileTLI to go backwards too */
-       }
-
-       /* This is the first try to read this page. */
-       failedSources = 0;
-retry:
-       /* Read the page containing the record */
-       if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
-               return NULL;
-
-       pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-       targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
-       if (targetRecOff == 0)
-       {
-               /*
-                * At page start, so skip over page header.  The Assert checks that
-                * we're not scribbling on caller's record pointer; it's OK because we
-                * can only get here in the continuing-from-prev-record case, since
-                * XRecOffIsValid rejected the zero-page-offset case otherwise.
-                */
-               Assert(RecPtr == &tmpRecPtr);
-               (*RecPtr) += pageHeaderSize;
-               targetRecOff = pageHeaderSize;
-       }
-       else if (targetRecOff < pageHeaderSize)
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("invalid record offset at %X/%X",
-                                               (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               goto next_record_is_invalid;
-       }
-       if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
-               targetRecOff == pageHeaderSize)
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("contrecord is requested by %X/%X",
-                                               (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               goto next_record_is_invalid;
-       }
-
-       /*
-        * Read the record length.
-        *
-        * NB: Even though we use an XLogRecord pointer here, the whole record
-        * header might not fit on this page. xl_tot_len is the first field of
-        * the struct, so it must be on this page (the records are MAXALIGNed),
-        * but we cannot access any other fields until we've verified that we
-        * got the whole header.
-        */
-       record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ);
-       total_len = record->xl_tot_len;
-
-       /*
-        * If the whole record header is on this page, validate it immediately.
-        * Otherwise do just a basic sanity check on xl_tot_len, and validate the
-        * rest of the header after reading it from the next page.  The xl_tot_len
-        * check is necessary here to ensure that we enter the "Need to reassemble
-        * record" code path below; otherwise we might fail to apply
-        * ValidXLogRecordHeader at all.
-        */
-       if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
-       {
-               if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
-                       goto next_record_is_invalid;
-               gotheader = true;
-       }
-       else
-       {
-               if (total_len < SizeOfXLogRecord)
-               {
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("invalid record length at %X/%X",
-                                                       (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       goto next_record_is_invalid;
-               }
-               gotheader = false;
-       }
-
-       /*
-        * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
-        * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
-        * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
-        * enough for all "normal" records, but very large commit or abort records
-        * might need more space.)
-        */
-       if (total_len > readRecordBufSize)
-       {
-               uint32          newSize = total_len;
-
-               newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
-               newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
-               if (readRecordBuf)
-                       free(readRecordBuf);
-               readRecordBuf = (char *) malloc(newSize);
-               if (!readRecordBuf)
-               {
-                       readRecordBufSize = 0;
-                       /* We treat this as a "bogus data" condition */
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("record length %u at %X/%X too long",
-                                                       total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       goto next_record_is_invalid;
-               }
-               readRecordBufSize = newSize;
-       }
-
-       len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ;
-       if (total_len > len)
-       {
-               /* Need to reassemble record */
-               char       *contrecord;
-               XLogPageHeader pageHeader;
-               XLogRecPtr      pagelsn;
-               char       *buffer;
-               uint32          gotlen;
-
-               /* Initialize pagelsn to the beginning of the page this record is on */
-               pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-
-               /* Copy the first fragment of the record from the first page. */
-               memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len);
-               buffer = readRecordBuf + len;
-               gotlen = len;
-
-               do
-               {
-                       /* Calculate pointer to beginning of next page */
-                       XLByteAdvance(pagelsn, XLOG_BLCKSZ);
-                       /* Wait for the next page to become available */
-                       if (!XLogPageRead(&pagelsn, emode, false, false))
-                               return NULL;
-
-                       /* Check that the continuation on next page looks valid */
-                       pageHeader = (XLogPageHeader) readBuf;
-                       if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
-                       {
-                               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                               (errmsg("there is no contrecord flag in log segment %s, offset %u",
-                                                               XLogFileNameP(curFileTLI, readSegNo),
-                                                               readOff)));
-                               goto next_record_is_invalid;
-                       }
-                       /*
-                        * Cross-check that xlp_rem_len agrees with how much of the record
-                        * we expect there to be left.
-                        */
-                       if (pageHeader->xlp_rem_len == 0 ||
-                               total_len != (pageHeader->xlp_rem_len + gotlen))
-                       {
-                               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                               (errmsg("invalid contrecord length %u in log segment %s, offset %u",
-                                                               pageHeader->xlp_rem_len,
-                                                               XLogFileNameP(curFileTLI, readSegNo),
-                                                               readOff)));
-                               goto next_record_is_invalid;
-                       }
-
-                       /* Append the continuation from this page to the buffer */
-                       pageHeaderSize = XLogPageHeaderSize(pageHeader);
-                       contrecord = (char *) readBuf + pageHeaderSize;
-                       len = XLOG_BLCKSZ - pageHeaderSize;
-                       if (pageHeader->xlp_rem_len < len)
-                               len = pageHeader->xlp_rem_len;
-                       memcpy(buffer, (char *) contrecord, len);
-                       buffer += len;
-                       gotlen += len;
-
-                       /* If we just reassembled the record header, validate it. */
-                       if (!gotheader)
-                       {
-                               record = (XLogRecord *) readRecordBuf;
-                               if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
-                                       goto next_record_is_invalid;
-                               gotheader = true;
-                       }
-               } while (pageHeader->xlp_rem_len > len);
-
-               record = (XLogRecord *) readRecordBuf;
-               if (!RecordIsValid(record, *RecPtr, emode))
-                       goto next_record_is_invalid;
-               pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-               XLogSegNoOffsetToRecPtr(
-                       readSegNo,
-                       readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
-                       EndRecPtr);
-               ReadRecPtr = *RecPtr;
-       }
-       else
-       {
-               /* Record does not cross a page boundary */
-               if (!RecordIsValid(record, *RecPtr, emode))
-                       goto next_record_is_invalid;
-               EndRecPtr = *RecPtr + MAXALIGN(total_len);
-
-               ReadRecPtr = *RecPtr;
-               memcpy(readRecordBuf, record, total_len);
-       }
-
-       /*
-        * Special processing if it's an XLOG SWITCH record
-        */
-       if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+       blk = (char *) XLogRecGetData(record) + record->xl_len;
+       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
        {
-               /* Pretend it extends to end of segment */
-               EndRecPtr += XLogSegSize - 1;
-               EndRecPtr -= EndRecPtr % XLogSegSize;
+               if (!(record->xl_info & XLR_BKP_BLOCK(i)))
+                       continue;
 
-               /*
-                * Pretend that readBuf contains the last page of the segment. This is
-                * just to avoid Assert failure in StartupXLOG if XLOG ends with this
-                * segment.
-                */
-               readOff = XLogSegSize - XLOG_BLCKSZ;
-       }
-       return record;
+               memcpy(&bkpb, blk, sizeof(BkpBlock));
+               blk += sizeof(BkpBlock);
 
-next_record_is_invalid:
-       failedSources |= readSource;
+               if (i == block_index)
+               {
+                       /* Found it, apply the update */
+                       return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
+                                                                                         keep_buffer);
+               }
 
-       if (readFile >= 0)
-       {
-               close(readFile);
-               readFile = -1;
+               blk += BLCKSZ - bkpb.hole_length;
        }
 
-       /* In standby-mode, keep trying */
-       if (StandbyMode)
-               goto retry;
-       else
-               return NULL;
+       /* Caller specified a bogus block_index */
+       elog(ERROR, "failed to restore block_index %d", block_index);
+       return InvalidBuffer;           /* keep compiler quiet */
 }
 
 /*
- * Check whether the xlog header of a page just read in looks valid.
+ * Workhorse for RestoreBackupBlock usable without an xlog record
  *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord. It's not intended for use from anywhere else.
+ * Restores a full-page image from BkpBlock and a data pointer.
  */
-static bool
-ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
+static Buffer
+RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
+                                                  bool get_cleanup_lock, bool keep_buffer)
 {
-       XLogRecPtr      recaddr;
-
-       XLogSegNoOffsetToRecPtr(readSegNo, readOff, recaddr);
+       Buffer          buffer;
+       Page            page;
 
-       if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("invalid magic number %04X in log segment %s, offset %u",
-                                               hdr->xlp_magic,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
-       if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("invalid info bits %04X in log segment %s, offset %u",
-                                               hdr->xlp_info,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
-       if (hdr->xlp_info & XLP_LONG_HEADER)
-       {
-               XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
+       buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
+                                                                       RBM_ZERO);
+       Assert(BufferIsValid(buffer));
+       if (get_cleanup_lock)
+               LockBufferForCleanup(buffer);
+       else
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-               if (longhdr->xlp_sysid != ControlFile->system_identifier)
-               {
-                       char            fhdrident_str[32];
-                       char            sysident_str[32];
+       page = (Page) BufferGetPage(buffer);
 
-                       /*
-                        * Format sysids separately to keep platform-dependent format code
-                        * out of the translatable message string.
-                        */
-                       snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
-                                        longhdr->xlp_sysid);
-                       snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
-                                        ControlFile->system_identifier);
-                       ereport(emode_for_corrupt_record(emode, recaddr),
-                                       (errmsg("WAL file is from different database system"),
-                                        errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.",
-                                                          fhdrident_str, sysident_str)));
-                       return false;
-               }
-               if (longhdr->xlp_seg_size != XLogSegSize)
-               {
-                       ereport(emode_for_corrupt_record(emode, recaddr),
-                                       (errmsg("WAL file is from different database system"),
-                                        errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
-                       return false;
-               }
-               if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
-               {
-                       ereport(emode_for_corrupt_record(emode, recaddr),
-                                       (errmsg("WAL file is from different database system"),
-                                        errdetail("Incorrect XLOG_BLCKSZ in page header.")));
-                       return false;
-               }
-       }
-       else if (readOff == 0)
+       if (bkpb.hole_length == 0)
        {
-               /* hmm, first page of file doesn't have a long header? */
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("invalid info bits %04X in log segment %s, offset %u",
-                                               hdr->xlp_info,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
+               memcpy((char *) page, blk, BLCKSZ);
        }
-
-       if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
+       else
        {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("unexpected pageaddr %X/%X in log segment %s, offset %u",
-                                               (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
+               memcpy((char *) page, blk, bkpb.hole_offset);
+               /* must zero-fill the hole */
+               MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
+               memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
+                          blk + bkpb.hole_offset,
+                          BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
        }
 
        /*
-        * Check page TLI is one of the expected values.
+        * The checksum value on this page is currently invalid. We don't need to
+        * reset it here since it will be set before being written.
         */
-       if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
-                                               hdr->xlp_tli,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
 
-       /*
-        * Since child timelines are always assigned a TLI greater than their
-        * immediate parent's TLI, we should never see TLI go backwards across
-        * successive pages of a consistent WAL sequence.
-        *
-        * Of course this check should only be applied when advancing sequentially
-        * across pages; therefore ReadRecord resets lastPageTLI and
-        * lastSegmentTLI to zero when going to a random page.
-        *
-        * Sometimes we re-open a segment that's already been partially replayed.
-        * In that case we cannot perform the normal TLI check: if there is a
-        * timeline switch within the segment, the first page has a smaller TLI
-        * than later pages following the timeline switch, and we might've read
-        * them already. As a weaker test, we still check that it's not smaller
-        * than the TLI we last saw at the beginning of a segment. Pass
-        * segmentonly = true when re-validating the first page like that, and the
-        * page you're actually interested in comes later.
-        */
-       if (hdr->xlp_tli < (segmentonly ? lastSegmentTLI : lastPageTLI))
-       {
-               ereport(emode_for_corrupt_record(emode, recaddr),
-                               (errmsg("out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
-                                               hdr->xlp_tli,
-                                               segmentonly ? lastSegmentTLI : lastPageTLI,
-                                               XLogFileNameP(curFileTLI, readSegNo),
-                                               readOff)));
-               return false;
-       }
-       lastPageTLI = hdr->xlp_tli;
-       if (readOff == 0)
-               lastSegmentTLI = hdr->xlp_tli;
+       PageSetLSN(page, lsn);
+       MarkBufferDirty(buffer);
 
-       return true;
+       if (!keep_buffer)
+               UnlockReleaseBuffer(buffer);
+
+       return buffer;
 }
 
 /*
- * Validate an XLOG record header.
+ * Attempt to read an XLOG record.
+ *
+ * If RecPtr is not NULL, try to read a record at that position.  Otherwise
+ * try to read a record just after the last one previously read.
+ *
+ * If no valid record is available, returns NULL, or fails if emode is PANIC.
+ * (emode must be either PANIC, LOG). In standby mode, retries until a valid
+ * record is available.
  *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord. It's not intended for use from anywhere else.
+ * The record is copied into readRecordBuf, so that on successful return,
+ * the returned record pointer always points there.
  */
-static bool
-ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
-                                         bool randAccess)
+static XLogRecord *
+ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+                  bool fetching_ckpt)
 {
-       /*
-        * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
-        * required.
-        */
-       if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+       XLogRecord *record;
+       XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
+
+       /* Pass through parameters to XLogPageRead */
+       private->fetching_ckpt = fetching_ckpt;
+       private->emode = emode;
+       private->randAccess = (RecPtr != InvalidXLogRecPtr);
+
+       /* This is the first attempt to read this page. */
+       lastSourceFailed = false;
+
+       for (;;)
        {
-               if (record->xl_len != 0)
+               char       *errormsg;
+
+               record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+               ReadRecPtr = xlogreader->ReadRecPtr;
+               EndRecPtr = xlogreader->EndRecPtr;
+               if (record == NULL)
                {
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("invalid xlog switch record at %X/%X",
-                                                       (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       return false;
+                       if (readFile >= 0)
+                       {
+                               close(readFile);
+                               readFile = -1;
+                       }
+
+                       /*
+                        * We only end up here without a message when XLogPageRead()
+                        * failed - in that case we already logged something. In
+                        * StandbyMode that only happens if we have been triggered, so we
+                        * shouldn't loop anymore in that case.
+                        */
+                       if (errormsg)
+                               ereport(emode_for_corrupt_record(emode,
+                                                                                                RecPtr ? RecPtr : EndRecPtr),
+                               (errmsg_internal("%s", errormsg) /* already translated */ ));
                }
-       }
-       else if (record->xl_len == 0)
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("record with zero length at %X/%X",
-                                               (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               return false;
-       }
-       if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
-               record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
-               XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("invalid record length at %X/%X",
-                                               (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               return false;
-       }
-       if (record->xl_rmid > RM_MAX_ID)
-       {
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
-                               (errmsg("invalid resource manager ID %u at %X/%X",
-                                               record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-               return false;
-       }
-       if (randAccess)
-       {
+
                /*
-                * We can't exactly verify the prev-link, but surely it should be less
-                * than the record's own address.
+                * Check page TLI is one of the expected values.
                 */
-               if (!XLByteLT(record->xl_prev, *RecPtr))
+               else if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs))
                {
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("record with incorrect prev-link %X/%X at %X/%X",
-                                                       (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
-                                                       (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       return false;
+                       char            fname[MAXFNAMELEN];
+                       XLogSegNo       segno;
+                       int32           offset;
+
+                       XLByteToSeg(xlogreader->latestPagePtr, segno);
+                       offset = xlogreader->latestPagePtr % XLogSegSize;
+                       XLogFileName(fname, xlogreader->readPageTLI, segno);
+                       ereport(emode_for_corrupt_record(emode,
+                                                                                        RecPtr ? RecPtr : EndRecPtr),
+                       (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
+                                       xlogreader->latestPageTLI,
+                                       fname,
+                                       offset)));
+                       record = NULL;
                }
-       }
-       else
-       {
-               /*
-                * Record's prev-link should exactly match our previous location. This
-                * check guards against torn WAL pages where a stale but valid-looking
-                * WAL record starts on a sector boundary.
-                */
-               if (!XLByteEQ(record->xl_prev, ReadRecPtr))
+
+               if (record)
                {
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errmsg("record with incorrect prev-link %X/%X at %X/%X",
-                                                       (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
-                                                       (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-                       return false;
+                       /* Great, got a record */
+                       return record;
                }
-       }
+               else
+               {
+                       /* No valid record available from this source */
+                       lastSourceFailed = true;
 
-       return true;
+                       /*
+                        * If archive recovery was requested, but we were still doing
+                        * crash recovery, switch to archive recovery and retry using the
+                        * offline archive. We have now replayed all the valid WAL in
+                        * pg_xlog, so we are presumably now consistent.
+                        *
+                        * We require that there's at least some valid WAL present in
+                        * pg_xlog, however (!fetch_ckpt). We could recover using the WAL
+                        * from the archive, even if pg_xlog is completely empty, but we'd
+                        * have no idea how far we'd have to replay to reach consistency.
+                        * So err on the safe side and give up.
+                        */
+                       if (!InArchiveRecovery && ArchiveRecoveryRequested &&
+                               !fetching_ckpt)
+                       {
+                               ereport(DEBUG1,
+                                               (errmsg_internal("reached end of WAL in pg_xlog, entering archive recovery")));
+                               InArchiveRecovery = true;
+                               if (StandbyModeRequested)
+                                       StandbyMode = true;
+
+                               /* initialize minRecoveryPoint to this record */
+                               LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+                               ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
+                               if (ControlFile->minRecoveryPoint < EndRecPtr)
+                               {
+                                       ControlFile->minRecoveryPoint = EndRecPtr;
+                                       ControlFile->minRecoveryPointTLI = ThisTimeLineID;
+                               }
+                               /* update local copy */
+                               minRecoveryPoint = ControlFile->minRecoveryPoint;
+                               minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
+
+                               UpdateControlFile();
+                               LWLockRelease(ControlFileLock);
+
+                               CheckRecoveryConsistency();
+
+                               /*
+                                * Before we retry, reset lastSourceFailed and currentSource
+                                * so that we will check the archive next.
+                                */
+                               lastSourceFailed = false;
+                               currentSource = 0;
+
+                               continue;
+                       }
+
+                       /* In standby mode, loop back to retry. Otherwise, give up. */
+                       if (StandbyMode && !CheckForStandbyTrigger())
+                               continue;
+                       else
+                               return NULL;
+               }
+       }
 }
 
 /*
@@ -3790,57 +4488,81 @@ ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
 static bool
 rescanLatestTimeLine(void)
 {
+       List       *newExpectedTLEs;
+       bool            found;
+       ListCell   *cell;
        TimeLineID      newtarget;
+       TimeLineID      oldtarget = recoveryTargetTLI;
+       TimeLineHistoryEntry *currentTle = NULL;
 
        newtarget = findNewestTimeLine(recoveryTargetTLI);
-       if (newtarget != recoveryTargetTLI)
+       if (newtarget == recoveryTargetTLI)
        {
-               /*
-                * Determine the list of expected TLIs for the new TLI
-                */
-               List       *newExpectedTLIs;
-
-               newExpectedTLIs = readTimeLineHistory(newtarget);
+               /* No new timelines found */
+               return false;
+       }
 
-               /*
-                * If the current timeline is not part of the history of the new
-                * timeline, we cannot proceed to it.
-                *
-                * XXX This isn't foolproof: The new timeline might have forked from
-                * the current one, but before the current recovery location. In that
-                * case we will still switch to the new timeline and proceed replaying
-                * from it even though the history doesn't match what we already
-                * replayed. That's not good. We will likely notice at the next online
-                * checkpoint, as the TLI won't match what we expected, but it's not
-                * guaranteed. The admin needs to make sure that doesn't happen.
-                */
-               if (!list_member_int(newExpectedTLIs,
-                                                        (int) recoveryTargetTLI))
-                       ereport(LOG,
-                                       (errmsg("new timeline %u is not a child of database system timeline %u",
-                                                       newtarget,
-                                                       ThisTimeLineID)));
-               else
-               {
-                       /* use volatile pointer to prevent code rearrangement */
-                       volatile XLogCtlData *xlogctl = XLogCtl;
+       /*
+        * Determine the list of expected TLIs for the new TLI
+        */
 
-                       /* Switch target */
-                       recoveryTargetTLI = newtarget;
-                       list_free(expectedTLIs);
-                       expectedTLIs = newExpectedTLIs;
+       newExpectedTLEs = readTimeLineHistory(newtarget);
 
-                       SpinLockAcquire(&xlogctl->info_lck);
-                       xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
-                       SpinLockRelease(&xlogctl->info_lck);
+       /*
+        * If the current timeline is not part of the history of the new timeline,
+        * we cannot proceed to it.
+        */
+       found = false;
+       foreach(cell, newExpectedTLEs)
+       {
+               currentTle = (TimeLineHistoryEntry *) lfirst(cell);
 
-                       ereport(LOG,
-                                       (errmsg("new target timeline is %u",
-                                                       recoveryTargetTLI)));
-                       return true;
+               if (currentTle->tli == recoveryTargetTLI)
+               {
+                       found = true;
+                       break;
                }
        }
-       return false;
+       if (!found)
+       {
+               ereport(LOG,
+                               (errmsg("new timeline %u is not a child of database system timeline %u",
+                                               newtarget,
+                                               ThisTimeLineID)));
+               return false;
+       }
+
+       /*
+        * The current timeline was found in the history file, but check that the
+        * next timeline was forked off from it *after* the current recovery
+        * location.
+        */
+       if (currentTle->end < EndRecPtr)
+       {
+               ereport(LOG,
+                               (errmsg("new timeline %u forked off current database system timeline %u before current recovery point %X/%X",
+                                               newtarget,
+                                               ThisTimeLineID,
+                                               (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr)));
+               return false;
+       }
+
+       /* The new timeline history seems valid. Switch target */
+       recoveryTargetTLI = newtarget;
+       list_free_deep(expectedTLEs);
+       expectedTLEs = newExpectedTLEs;
+
+       /*
+        * As in StartupXLOG(), try to ensure we have all the history files
+        * between the old target and new target in pg_xlog.
+        */
+       restoreTimeLineHistoryFiles(oldtarget + 1, newtarget);
+
+       ereport(LOG,
+                       (errmsg("new target timeline is %u",
+                                       recoveryTargetTLI)));
+
+       return true;
 }
 
 /*
@@ -4176,6 +4898,41 @@ GetSystemIdentifier(void)
        return ControlFile->system_identifier;
 }
 
+/*
+ * Are checksums enabled for data pages?
+ */
+bool
+DataChecksumsEnabled(void)
+{
+       Assert(ControlFile != NULL);
+       return (ControlFile->data_checksum_version > 0);
+}
+
+/*
+ * Returns a fake LSN for unlogged relations.
+ *
+ * Each call generates an LSN that is greater than any previous value
+ * returned. The current counter value is saved and restored across clean
+ * shutdowns, but like unlogged relations, does not survive a crash. This can
+ * be used in lieu of real LSN values returned by XLogInsert, if you need an
+ * LSN-like increasing sequence of numbers without writing any WAL.
+ */
+XLogRecPtr
+GetFakeLSNForUnloggedRel(void)
+{
+       XLogRecPtr      nextUnloggedLSN;
+
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       /* increment the unloggedLSN counter, need SpinLock */
+       SpinLockAcquire(&xlogctl->ulsn_lck);
+       nextUnloggedLSN = xlogctl->unloggedLSN++;
+       SpinLockRelease(&xlogctl->ulsn_lck);
+
+       return nextUnloggedLSN;
+}
+
 /*
  * Auto-tune the number of XLOG buffers.
  *
@@ -4260,10 +5017,13 @@ XLOGShmemSize(void)
 
        /* XLogCtl */
        size = sizeof(XLogCtlData);
+
+       /* xlog insertion slots, plus alignment */
+       size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
        /* xlblocks array */
        size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
        /* extra alignment padding for XLOG I/O buffers */
-       size = add_size(size, ALIGNOF_XLOG_BUFFER);
+       size = add_size(size, XLOG_BLCKSZ);
        /* and the buffers themselves */
        size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
 
@@ -4282,6 +5042,7 @@ XLOGShmemInit(void)
        bool            foundCFile,
                                foundXLog;
        char       *allocptr;
+       int                     i;
 
        ControlFile = (ControlFileData *)
                ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
@@ -4294,7 +5055,6 @@ XLOGShmemInit(void)
                Assert(foundCFile && foundXLog);
                return;
        }
-
        memset(XLogCtl, 0, sizeof(XLogCtlData));
 
        /*
@@ -4307,10 +5067,18 @@ XLOGShmemInit(void)
        memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
        allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
 
+       /* Xlog insertion slots. Ensure they're aligned to the full padded size */
+       allocptr += sizeof(XLogInsertSlotPadded) -
+               ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
+       XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
+       allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
+
        /*
-        * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
+        * Align the start of the page buffers to a full xlog block size boundary.
+        * This simplifies some calculations in XLOG insertion. It is also required
+        * for O_DIRECT.
         */
-       allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
+       allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
        XLogCtl->pages = allocptr;
        memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
 
@@ -4322,8 +5090,23 @@ XLOGShmemInit(void)
        XLogCtl->SharedRecoveryInProgress = true;
        XLogCtl->SharedHotStandbyActive = false;
        XLogCtl->WalWriterSleeping = false;
-       XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+
+       for (i = 0; i < num_xloginsert_slots; i++)
+       {
+               XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+               SpinLockInit(&slot->mutex);
+               slot->xlogInsertingAt = InvalidXLogRecPtr;
+               slot->owner = NULL;
+
+               slot->releaseOK = true;
+               slot->exclusive = 0;
+               slot->head = NULL;
+               slot->tail = NULL;
+       }
+
+       SpinLockInit(&XLogCtl->Insert.insertpos_lck);
        SpinLockInit(&XLogCtl->info_lck);
+       SpinLockInit(&XLogCtl->ulsn_lck);
        InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
 
        /*
@@ -4372,8 +5155,8 @@ BootStrapXLOG(void)
        ThisTimeLineID = 1;
 
        /* page buffer must be aligned suitably for O_DIRECT */
-       buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
-       page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
+       buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
+       page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
        memset(page, 0, XLOG_BLCKSZ);
 
        /*
@@ -4385,6 +5168,7 @@ BootStrapXLOG(void)
         */
        checkPoint.redo = XLogSegSize + SizeOfXLogLongPHD;
        checkPoint.ThisTimeLineID = ThisTimeLineID;
+       checkPoint.PrevTimeLineID = ThisTimeLineID;
        checkPoint.fullPageWrites = fullPageWrites;
        checkPoint.nextXidEpoch = 0;
        checkPoint.nextXid = FirstNormalTransactionId;
@@ -4393,6 +5177,8 @@ BootStrapXLOG(void)
        checkPoint.nextMultiOffset = 0;
        checkPoint.oldestXid = FirstNormalTransactionId;
        checkPoint.oldestXidDB = TemplateDbOid;
+       checkPoint.oldestMulti = FirstMultiXactId;
+       checkPoint.oldestMultiDB = TemplateDbOid;
        checkPoint.time = (pg_time_t) time(NULL);
        checkPoint.oldestActiveXid = InvalidTransactionId;
 
@@ -4401,6 +5187,7 @@ BootStrapXLOG(void)
        ShmemVariableCache->oidCount = 0;
        MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
        SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+       SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
 
        /* Set up the XLOG page header */
        page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -4465,12 +5252,15 @@ BootStrapXLOG(void)
        ControlFile->time = checkPoint.time;
        ControlFile->checkPoint = checkPoint.redo;
        ControlFile->checkPointCopy = checkPoint;
+       ControlFile->unloggedLSN = 1;
 
        /* Set important parameter values for use when replaying WAL */
        ControlFile->MaxConnections = MaxConnections;
+       ControlFile->max_worker_processes = max_worker_processes;
        ControlFile->max_prepared_xacts = max_prepared_xacts;
        ControlFile->max_locks_per_xact = max_locks_per_xact;
        ControlFile->wal_level = wal_level;
+       ControlFile->data_checksum_version = bootstrap_data_checksum_version;
 
        /* some additional ControlFile fields are set in WriteControlFile() */
 
@@ -4658,7 +5448,7 @@ readRecoveryCommandFile(void)
                }
                else if (strcmp(item->name, "standby_mode") == 0)
                {
-                       if (!parse_bool(item->value, &StandbyMode))
+                       if (!parse_bool(item->value, &StandbyModeRequested))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("parameter \"%s\" requires a Boolean value",
@@ -4689,7 +5479,7 @@ readRecoveryCommandFile(void)
        /*
         * Check for compulsory parameters
         */
-       if (StandbyMode)
+       if (StandbyModeRequested)
        {
                if (PrimaryConnInfo == NULL && recoveryRestoreCommand == NULL)
                        ereport(WARNING,
@@ -4706,7 +5496,7 @@ readRecoveryCommandFile(void)
        }
 
        /* Enable fetching from archive recovery area */
-       InArchiveRecovery = true;
+       ArchiveRecoveryRequested = true;
 
        /*
         * If user specified recovery_target_timeline, validate it or compute the
@@ -4988,13 +5778,19 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
 }
 
 /*
- * Recheck shared recoveryPause by polling.
+ * Wait until shared recoveryPause flag is cleared.
  *
- * XXX Can also be done with shared latch.
+ * XXX Could also be done with shared latch, avoiding the pg_usleep loop.
+ * Probably not worth the trouble though.  This state shouldn't be one that
+ * anyone cares about server power consumption in.
  */
 static void
 recoveryPausesHere(void)
 {
+       /* Don't pause unless users can connect! */
+       if (!LocalHotStandbyActive)
+               return;
+
        ereport(LOG,
                        (errmsg("recovery has paused"),
                         errhint("Execute pg_xlog_replay_resume() to continue.")));
@@ -5168,6 +5964,9 @@ CheckRequiredParameterValues(void)
                RecoveryRequiresIntParameter("max_connections",
                                                                         MaxConnections,
                                                                         ControlFile->MaxConnections);
+               RecoveryRequiresIntParameter("max_worker_processes",
+                                                                        max_worker_processes,
+                                                                        ControlFile->max_worker_processes);
                RecoveryRequiresIntParameter("max_prepared_transactions",
                                                                         max_prepared_xacts,
                                                                         ControlFile->max_prepared_xacts);
@@ -5192,12 +5991,15 @@ StartupXLOG(void)
                                checkPointLoc,
                                EndOfLog;
        XLogSegNo       endLogSegNo;
+       TimeLineID      PrevTimeLineID;
        XLogRecord *record;
-       uint32          freespace;
        TransactionId oldestActiveXID;
        bool            backupEndRequired = false;
        bool            backupFromStandby = false;
        DBState         dbstate_at_startup;
+       XLogReaderState *xlogreader;
+       XLogPageReadPrivate private;
+       bool            fast_promoted = false;
 
        /*
         * Read control file and check XLOG status looks valid.
@@ -5214,9 +6016,12 @@ StartupXLOG(void)
                                (errmsg("control file contains invalid data")));
 
        if (ControlFile->state == DB_SHUTDOWNED)
-               ereport(LOG,
+       {
+               /* This is the expected case, so don't be chatty in standalone mode */
+               ereport(IsPostmasterEnvironment ? LOG : NOTICE,
                                (errmsg("database system was shut down at %s",
                                                str_time(ControlFile->time))));
+       }
        else if (ControlFile->state == DB_SHUTDOWNED_IN_RECOVERY)
                ereport(LOG,
                                (errmsg("database system was shut down in recovery at %s",
@@ -5266,10 +6071,14 @@ StartupXLOG(void)
        RelationCacheInitFileRemove();
 
        /*
-        * Initialize on the assumption we want to recover to the same timeline
+        * Initialize on the assumption we want to recover to the latest timeline
         * that's active according to pg_control.
         */
-       recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
+       if (ControlFile->minRecoveryPointTLI >
+               ControlFile->checkPointCopy.ThisTimeLineID)
+               recoveryTargetTLI = ControlFile->minRecoveryPointTLI;
+       else
+               recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
 
        /*
         * Check for recovery control file, and if so set up state for offline
@@ -5277,34 +6086,17 @@ StartupXLOG(void)
         */
        readRecoveryCommandFile();
 
-       /* Now we can determine the list of expected TLIs */
-       expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
-
-       /*
-        * If pg_control's timeline is not in expectedTLIs, then we cannot
-        * proceed: the backup is not part of the history of the requested
-        * timeline.
-        */
-       if (!list_member_int(expectedTLIs,
-                                                (int) ControlFile->checkPointCopy.ThisTimeLineID))
-               ereport(FATAL,
-                               (errmsg("requested timeline %u is not a child of database system timeline %u",
-                                               recoveryTargetTLI,
-                                               ControlFile->checkPointCopy.ThisTimeLineID)));
-
        /*
-        * Save the selected recovery target timeline ID and
-        * archive_cleanup_command in shared memory so that other processes can
-        * see them
+        * Save archive_cleanup_command in shared memory so that other processes
+        * can see it.
         */
-       XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
        strncpy(XLogCtl->archiveCleanupCommand,
                        archiveCleanupCommand ? archiveCleanupCommand : "",
                        sizeof(XLogCtl->archiveCleanupCommand));
 
-       if (InArchiveRecovery)
+       if (ArchiveRecoveryRequested)
        {
-               if (StandbyMode)
+               if (StandbyModeRequested)
                        ereport(LOG,
                                        (errmsg("entering standby mode")));
                else if (recoveryTarget == RECOVERY_TARGET_XID)
@@ -5328,24 +6120,43 @@ StartupXLOG(void)
         * Take ownership of the wakeup latch if we're going to sleep during
         * recovery.
         */
-       if (StandbyMode)
+       if (StandbyModeRequested)
                OwnLatch(&XLogCtl->recoveryWakeupLatch);
 
+       /* Set up XLOG reader facility */
+       MemSet(&private, 0, sizeof(XLogPageReadPrivate));
+       xlogreader = XLogReaderAllocate(&XLogPageRead, &private);
+       if (!xlogreader)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory"),
+                       errdetail("Failed while allocating an XLog reading processor.")));
+       xlogreader->system_identifier = ControlFile->system_identifier;
+
        if (read_backup_label(&checkPointLoc, &backupEndRequired,
                                                  &backupFromStandby))
        {
+               /*
+                * Archive recovery was requested, and thanks to the backup label
+                * file, we know how far we need to replay to reach consistency. Enter
+                * archive recovery directly.
+                */
+               InArchiveRecovery = true;
+               if (StandbyModeRequested)
+                       StandbyMode = true;
+
                /*
                 * When a backup_label file is present, we want to roll forward from
                 * the checkpoint it identifies, rather than using pg_control.
                 */
-               record = ReadCheckpointRecord(checkPointLoc, 0);
+               record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
                if (record != NULL)
                {
                        memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
                        wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
                        ereport(DEBUG1,
                                        (errmsg("checkpoint record is at %X/%X",
-                                                       (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+                                  (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
                        InRecovery = true;      /* force recovery even if SHUTDOWNED */
 
                        /*
@@ -5354,9 +6165,9 @@ StartupXLOG(void)
                         * backup_label around that references a WAL segment that's
                         * already been archived.
                         */
-                       if (XLByteLT(checkPoint.redo, checkPointLoc))
+                       if (checkPoint.redo < checkPointLoc)
                        {
-                               if (!ReadRecord(&(checkPoint.redo), LOG, false))
+                               if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
                                        ereport(FATAL,
                                                        (errmsg("could not find redo location referenced by checkpoint record"),
                                                         errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
@@ -5374,18 +6185,45 @@ StartupXLOG(void)
        }
        else
        {
+               /*
+                * It's possible that archive recovery was requested, but we don't
+                * know how far we need to replay the WAL before we reach consistency.
+                * This can happen for example if a base backup is taken from a
+                * running server using an atomic filesystem snapshot, without calling
+                * pg_start/stop_backup. Or if you just kill a running master server
+                * and put it into archive recovery by creating a recovery.conf file.
+                *
+                * Our strategy in that case is to perform crash recovery first,
+                * replaying all the WAL present in pg_xlog, and only enter archive
+                * recovery after that.
+                *
+                * But usually we already know how far we need to replay the WAL (up
+                * to minRecoveryPoint, up to backupEndPoint, or until we see an
+                * end-of-backup record), and we can enter archive recovery directly.
+                */
+               if (ArchiveRecoveryRequested &&
+                       (ControlFile->minRecoveryPoint != InvalidXLogRecPtr ||
+                        ControlFile->backupEndRequired ||
+                        ControlFile->backupEndPoint != InvalidXLogRecPtr ||
+                        ControlFile->state == DB_SHUTDOWNED))
+               {
+                       InArchiveRecovery = true;
+                       if (StandbyModeRequested)
+                               StandbyMode = true;
+               }
+
                /*
                 * Get the last valid checkpoint record.  If the latest one according
                 * to pg_control is broken, try the next-to-last one.
                 */
                checkPointLoc = ControlFile->checkPoint;
                RedoStartLSN = ControlFile->checkPointCopy.redo;
-               record = ReadCheckpointRecord(checkPointLoc, 1);
+               record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, true);
                if (record != NULL)
                {
                        ereport(DEBUG1,
                                        (errmsg("checkpoint record is at %X/%X",
-                                                       (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+                                  (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
                }
                else if (StandbyMode)
                {
@@ -5399,12 +6237,12 @@ StartupXLOG(void)
                else
                {
                        checkPointLoc = ControlFile->prevCheckPoint;
-                       record = ReadCheckpointRecord(checkPointLoc, 2);
+                       record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2, true);
                        if (record != NULL)
                        {
                                ereport(LOG,
                                                (errmsg("using previous checkpoint record at %X/%X",
-                                                               (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+                                  (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
                                InRecovery = true;              /* force recovery even if SHUTDOWNED */
                        }
                        else
@@ -5415,11 +6253,53 @@ StartupXLOG(void)
                wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
        }
 
+       /*
+        * If the location of the checkpoint record is not on the expected
+        * timeline in the history of the requested timeline, we cannot proceed:
+        * the backup is not part of the history of the requested timeline.
+        */
+       Assert(expectedTLEs);           /* was initialized by reading checkpoint
+                                                                * record */
+       if (tliOfPointInHistory(checkPointLoc, expectedTLEs) !=
+               checkPoint.ThisTimeLineID)
+       {
+               XLogRecPtr      switchpoint;
+
+               /*
+                * tliSwitchPoint will throw an error if the checkpoint's timeline is
+                * not in expectedTLEs at all.
+                */
+               switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL);
+               ereport(FATAL,
+                               (errmsg("requested timeline %u is not a child of this server's history",
+                                               recoveryTargetTLI),
+                                errdetail("Latest checkpoint is at %X/%X on timeline %u, but in the history of the requested timeline, the server forked off from that timeline at %X/%X.",
+                                                  (uint32) (ControlFile->checkPoint >> 32),
+                                                  (uint32) ControlFile->checkPoint,
+                                                  ControlFile->checkPointCopy.ThisTimeLineID,
+                                                  (uint32) (switchpoint >> 32),
+                                                  (uint32) switchpoint)));
+       }
+
+       /*
+        * The min recovery point should be part of the requested timeline's
+        * history, too.
+        */
+       if (!XLogRecPtrIsInvalid(ControlFile->minRecoveryPoint) &&
+         tliOfPointInHistory(ControlFile->minRecoveryPoint - 1, expectedTLEs) !=
+               ControlFile->minRecoveryPointTLI)
+               ereport(FATAL,
+                               (errmsg("requested timeline %u does not contain minimum recovery point %X/%X on timeline %u",
+                                               recoveryTargetTLI,
+                                               (uint32) (ControlFile->minRecoveryPoint >> 32),
+                                               (uint32) ControlFile->minRecoveryPoint,
+                                               ControlFile->minRecoveryPointTLI)));
+
        LastRec = RecPtr = checkPointLoc;
 
        ereport(DEBUG1,
                        (errmsg("redo record is at %X/%X; shutdown %s",
-                                       (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
+                                 (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
                                        wasShutdown ? "TRUE" : "FALSE")));
        ereport(DEBUG1,
                        (errmsg("next transaction ID: %u/%u; next OID: %u",
@@ -5431,6 +6311,9 @@ StartupXLOG(void)
        ereport(DEBUG1,
                        (errmsg("oldest unfrozen transaction ID: %u, in database %u",
                                        checkPoint.oldestXid, checkPoint.oldestXidDB)));
+       ereport(DEBUG1,
+                       (errmsg("oldest MultiXactId: %u, in database %u",
+                                       checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
        if (!TransactionIdIsNormal(checkPoint.nextXid))
                ereport(PANIC,
                                (errmsg("invalid next transaction ID")));
@@ -5441,9 +6324,20 @@ StartupXLOG(void)
        ShmemVariableCache->oidCount = 0;
        MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
        SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+       SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
        XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
        XLogCtl->ckptXid = checkPoint.nextXid;
 
+       /*
+        * Initialize unlogged LSN. On a clean shutdown, it's restored from the
+        * control file. On recovery, all unlogged relations are blown away, so
+        * the unlogged LSN counter can be reset too.
+        */
+       if (ControlFile->state == DB_SHUTDOWNED)
+               XLogCtl->unloggedLSN = ControlFile->unloggedLSN;
+       else
+               XLogCtl->unloggedLSN = 1;
+
        /*
         * We must replay WAL entries using the same TimeLineID they were created
         * under, so temporarily adopt the TLI indicated by the checkpoint (see
@@ -5451,11 +6345,25 @@ StartupXLOG(void)
         */
        ThisTimeLineID = checkPoint.ThisTimeLineID;
 
+       /*
+        * Copy any missing timeline history files between 'now' and the recovery
+        * target timeline from archive to pg_xlog. While we don't need those
+        * files ourselves - the history file of the recovery target timeline
+        * covers all the previous timelines in the history too - a cascading
+        * standby server might be interested in them. Or, if you archive the WAL
+        * from this server to a different archive than the master, it'd be good
+        * for all the history files to get archived there after failover, so that
+        * you can use one of the old timelines as a PITR target. Timeline history
+        * files are small, so it's better to copy them unnecessarily than not
+        * copy them and regret later.
+        */
+       restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI);
+
        lastFullPageWrites = checkPoint.fullPageWrites;
 
-       RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+       RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
 
-       if (XLByteLT(RecPtr, checkPoint.redo))
+       if (RecPtr < checkPoint.redo)
                ereport(PANIC,
                                (errmsg("invalid redo in checkpoint record")));
 
@@ -5464,7 +6372,7 @@ StartupXLOG(void)
         * have been a clean shutdown and we did not have a recovery.conf file,
         * then assume no recovery needed.
         */
-       if (XLByteLT(checkPoint.redo, RecPtr))
+       if (checkPoint.redo < RecPtr)
        {
                if (wasShutdown)
                        ereport(PANIC,
@@ -5473,7 +6381,7 @@ StartupXLOG(void)
        }
        else if (ControlFile->state != DB_SHUTDOWNED)
                InRecovery = true;
-       else if (InArchiveRecovery)
+       else if (ArchiveRecoveryRequested)
        {
                /* force recovery due to presence of recovery.conf */
                InRecovery = true;
@@ -5501,6 +6409,12 @@ StartupXLOG(void)
                        ereport(LOG,
                                        (errmsg("database system was not properly shut down; "
                                                        "automatic recovery in progress")));
+                       if (recoveryTargetTLI > ControlFile->checkPointCopy.ThisTimeLineID)
+                               ereport(LOG,
+                                               (errmsg("crash recovery starts in timeline %u "
+                                                               "and has target timeline %u",
+                                                               ControlFile->checkPointCopy.ThisTimeLineID,
+                                                               recoveryTargetTLI)));
                        ControlFile->state = DB_IN_CRASH_RECOVERY;
                }
                ControlFile->prevCheckPoint = ControlFile->checkPoint;
@@ -5509,8 +6423,11 @@ StartupXLOG(void)
                if (InArchiveRecovery)
                {
                        /* initialize minRecoveryPoint if not set yet */
-                       if (XLByteLT(ControlFile->minRecoveryPoint, checkPoint.redo))
+                       if (ControlFile->minRecoveryPoint < checkPoint.redo)
+                       {
                                ControlFile->minRecoveryPoint = checkPoint.redo;
+                               ControlFile->minRecoveryPointTLI = checkPoint.ThisTimeLineID;
+                       }
                }
 
                /*
@@ -5543,6 +6460,7 @@ StartupXLOG(void)
 
                /* initialize our local copy of minRecoveryPoint */
                minRecoveryPoint = ControlFile->minRecoveryPoint;
+               minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
 
                /*
                 * Reset pgstat data, because it may be invalid after recovery.
@@ -5590,7 +6508,7 @@ StartupXLOG(void)
                 * control file and we've established a recovery snapshot from a
                 * running-xacts WAL record.
                 */
-               if (InArchiveRecovery && EnableHotStandby)
+               if (ArchiveRecoveryRequested && EnableHotStandby)
                {
                        TransactionId *xids;
                        int                     nxids;
@@ -5606,6 +6524,9 @@ StartupXLOG(void)
                                oldestActiveXID = checkPoint.oldestActiveXid;
                        Assert(TransactionIdIsValid(oldestActiveXID));
 
+                       /* Tell procarray about the range of xids it has to deal with */
+                       ProcArrayInitRecovery(ShmemVariableCache->nextXid);
+
                        /*
                         * Startup commit log and subtrans only. Other SLRUs are not
                         * maintained during recovery and need not be started yet.
@@ -5655,11 +6576,11 @@ StartupXLOG(void)
                }
 
                /*
-                * Initialize shared replayEndRecPtr, recoveryLastRecPtr, and
+                * Initialize shared replayEndRecPtr, lastReplayedEndRecPtr, and
                 * recoveryLastXTime.
                 *
                 * This is slightly confusing if we're starting from an online
-                * checkpoint; we've just read and replayed the chekpoint record, but
+                * checkpoint; we've just read and replayed the checkpoint record, but
                 * we're going to start replay from its redo pointer, which precedes
                 * the location of the checkpoint record itself. So even though the
                 * last record we've replayed is indeed ReadRecPtr, we haven't
@@ -5668,7 +6589,9 @@ StartupXLOG(void)
                 */
                SpinLockAcquire(&xlogctl->info_lck);
                xlogctl->replayEndRecPtr = ReadRecPtr;
-               xlogctl->recoveryLastRecPtr = EndRecPtr;
+               xlogctl->replayEndTLI = ThisTimeLineID;
+               xlogctl->lastReplayedEndRecPtr = EndRecPtr;
+               xlogctl->lastReplayedTLI = ThisTimeLineID;
                xlogctl->recoveryLastXTime = 0;
                xlogctl->currentChunkStartTime = 0;
                xlogctl->recoveryPause = false;
@@ -5689,7 +6612,7 @@ StartupXLOG(void)
                 * process in addition to postmaster!  Also, fsync requests are
                 * subsequently to be handled by the checkpointer, not locally.
                 */
-               if (InArchiveRecovery && IsUnderPostmaster)
+               if (ArchiveRecoveryRequested && IsUnderPostmaster)
                {
                        PublishStartupProcessInformation();
                        SetForwardFsyncRequests();
@@ -5707,22 +6630,21 @@ StartupXLOG(void)
                 * Find the first record that logically follows the checkpoint --- it
                 * might physically precede it, though.
                 */
-               if (XLByteLT(checkPoint.redo, RecPtr))
+               if (checkPoint.redo < RecPtr)
                {
                        /* back up to find the record */
-                       record = ReadRecord(&(checkPoint.redo), PANIC, false);
+                       record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
                }
                else
                {
                        /* just have to read next record after CheckPoint */
-                       record = ReadRecord(NULL, LOG, false);
+                       record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
                }
 
                if (record != NULL)
                {
                        bool            recoveryContinue = true;
                        bool            recoveryApply = true;
-                       bool            recoveryPause = false;
                        ErrorContextCallback errcallback;
                        TimestampTz xtime;
 
@@ -5730,13 +6652,15 @@ StartupXLOG(void)
 
                        ereport(LOG,
                                        (errmsg("redo starts at %X/%X",
-                                                       (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
+                                                (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
 
                        /*
                         * main redo apply loop
                         */
                        do
                        {
+                               bool            switchedTLI = false;
+
 #ifdef WAL_DEBUG
                                if (XLOG_DEBUG ||
                                 (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
@@ -5746,8 +6670,8 @@ StartupXLOG(void)
 
                                        initStringInfo(&buf);
                                        appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
-                                                                        (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
-                                                                        (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
+                                                       (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
+                                                        (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
                                        xlog_outrec(&buf, record);
                                        appendStringInfo(&buf, " - ");
                                        RmgrTable[record->xl_rmid].rm_desc(&buf,
@@ -5761,25 +6685,36 @@ StartupXLOG(void)
                                /* Handle interrupt signals of startup process */
                                HandleStartupProcInterrupts();
 
-                               /* Allow read-only connections if we're consistent now */
-                               CheckRecoveryConsistency();
+                               /*
+                                * Pause WAL replay, if requested by a hot-standby session via
+                                * SetRecoveryPause().
+                                *
+                                * Note that we intentionally don't take the info_lck spinlock
+                                * here.  We might therefore read a slightly stale value of
+                                * the recoveryPause flag, but it can't be very stale (no
+                                * worse than the last spinlock we did acquire).  Since a
+                                * pause request is a pretty asynchronous thing anyway,
+                                * possibly responding to it one WAL record later than we
+                                * otherwise would is a minor issue, so it doesn't seem worth
+                                * adding another spinlock cycle to prevent that.
+                                */
+                               if (xlogctl->recoveryPause)
+                                       recoveryPausesHere();
 
                                /*
                                 * Have we reached our recovery target?
                                 */
                                if (recoveryStopsHere(record, &recoveryApply))
                                {
-                                       /*
-                                        * Pause only if users can connect to send a resume
-                                        * message
-                                        */
-                                       if (recoveryPauseAtTarget && standbyState == STANDBY_SNAPSHOT_READY)
+                                       if (recoveryPauseAtTarget)
                                        {
                                                SetRecoveryPause(true);
                                                recoveryPausesHere();
                                        }
                                        reachedStopPoint = true;        /* see below */
                                        recoveryContinue = false;
+
+                                       /* Exit loop if we reached non-inclusive recovery target */
                                        if (!recoveryApply)
                                                break;
                                }
@@ -5806,21 +6741,58 @@ StartupXLOG(void)
                                        LWLockRelease(XidGenLock);
                                }
 
+                               /*
+                                * Before replaying this record, check if this record causes
+                                * the current timeline to change. The record is already
+                                * considered to be part of the new timeline, so we update
+                                * ThisTimeLineID before replaying it. That's important so
+                                * that replayEndTLI, which is recorded as the minimum
+                                * recovery point's TLI if recovery stops after this record,
+                                * is set correctly.
+                                */
+                               if (record->xl_rmid == RM_XLOG_ID)
+                               {
+                                       TimeLineID      newTLI = ThisTimeLineID;
+                                       TimeLineID      prevTLI = ThisTimeLineID;
+                                       uint8           info = record->xl_info & ~XLR_INFO_MASK;
+
+                                       if (info == XLOG_CHECKPOINT_SHUTDOWN)
+                                       {
+                                               CheckPoint      checkPoint;
+
+                                               memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+                                               newTLI = checkPoint.ThisTimeLineID;
+                                               prevTLI = checkPoint.PrevTimeLineID;
+                                       }
+                                       else if (info == XLOG_END_OF_RECOVERY)
+                                       {
+                                               xl_end_of_recovery xlrec;
+
+                                               memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
+                                               newTLI = xlrec.ThisTimeLineID;
+                                               prevTLI = xlrec.PrevTimeLineID;
+                                       }
+
+                                       if (newTLI != ThisTimeLineID)
+                                       {
+                                               /* Check that it's OK to switch to this TLI */
+                                               checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
+
+                                               /* Following WAL records should be run with new TLI */
+                                               ThisTimeLineID = newTLI;
+                                               switchedTLI = true;
+                                       }
+                               }
+
                                /*
                                 * Update shared replayEndRecPtr before replaying this record,
                                 * so that XLogFlush will update minRecoveryPoint correctly.
                                 */
                                SpinLockAcquire(&xlogctl->info_lck);
                                xlogctl->replayEndRecPtr = EndRecPtr;
-                               recoveryPause = xlogctl->recoveryPause;
+                               xlogctl->replayEndTLI = ThisTimeLineID;
                                SpinLockRelease(&xlogctl->info_lck);
 
-                               /*
-                                * Pause only if users can connect to send a resume message
-                                */
-                               if (recoveryPause && standbyState == STANDBY_SNAPSHOT_READY)
-                                       recoveryPausesHere();
-
                                /*
                                 * If we are attempting to enter Hot Standby mode, process
                                 * XIDs we see
@@ -5835,39 +6807,35 @@ StartupXLOG(void)
                                /* Pop the error context stack */
                                error_context_stack = errcallback.previous;
 
-                               if (!XLogRecPtrIsInvalid(ControlFile->backupEndPoint) &&
-                                       XLByteLE(ControlFile->backupEndPoint, EndRecPtr))
-                               {
-                                       /*
-                                        * We have reached the end of base backup, the point where
-                                        * the minimum recovery point in pg_control indicates. The
-                                        * data on disk is now consistent. Reset backupStartPoint
-                                        * and backupEndPoint.
-                                        */
-                                       elog(DEBUG1, "end of backup reached");
-
-                                       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
-
-                                       MemSet(&ControlFile->backupStartPoint, 0, sizeof(XLogRecPtr));
-                                       MemSet(&ControlFile->backupEndPoint, 0, sizeof(XLogRecPtr));
-                                       ControlFile->backupEndRequired = false;
-                                       UpdateControlFile();
-
-                                       LWLockRelease(ControlFileLock);
-                               }
-
                                /*
-                                * Update shared recoveryLastRecPtr after this record has been
-                                * replayed.
+                                * Update lastReplayedEndRecPtr after this record has been
+                                * successfully replayed.
                                 */
                                SpinLockAcquire(&xlogctl->info_lck);
-                               xlogctl->recoveryLastRecPtr = EndRecPtr;
+                               xlogctl->lastReplayedEndRecPtr = EndRecPtr;
+                               xlogctl->lastReplayedTLI = ThisTimeLineID;
                                SpinLockRelease(&xlogctl->info_lck);
 
+                               /* Remember this record as the last-applied one */
                                LastRec = ReadRecPtr;
 
-                               record = ReadRecord(NULL, LOG, false);
-                       } while (record != NULL && recoveryContinue);
+                               /* Allow read-only connections if we're consistent now */
+                               CheckRecoveryConsistency();
+
+                               /*
+                                * If this record was a timeline switch, wake up any
+                                * walsenders to notice that we are on a new timeline.
+                                */
+                               if (switchedTLI && AllowCascadeReplication())
+                                       WalSndWakeup();
+
+                               /* Exit loop if we reached inclusive recovery target */
+                               if (!recoveryContinue)
+                                       break;
+
+                               /* Else, try to fetch the next WAL record */
+                               record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+                       } while (record != NULL);
 
                        /*
                         * end of main redo apply loop
@@ -5875,7 +6843,7 @@ StartupXLOG(void)
 
                        ereport(LOG,
                                        (errmsg("redo done at %X/%X",
-                                                       (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
+                                                (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
                        xtime = GetLatestXTime();
                        if (xtime)
                                ereport(LOG,
@@ -5902,7 +6870,7 @@ StartupXLOG(void)
         * We don't need the latch anymore. It's not strictly necessary to disown
         * it, but let's do it for the sake of tidiness.
         */
-       if (StandbyMode)
+       if (StandbyModeRequested)
                DisownLatch(&XLogCtl->recoveryWakeupLatch);
 
        /*
@@ -5916,7 +6884,7 @@ StartupXLOG(void)
         * Re-fetch the last valid or last applied record, so we can identify the
         * exact endpoint of what we consider the valid portion of WAL.
         */
-       record = ReadRecord(&LastRec, PANIC, false);
+       record = ReadRecord(xlogreader, LastRec, PANIC, false);
        EndOfLog = EndRecPtr;
        XLByteToPrevSeg(EndOfLog, endLogSegNo);
 
@@ -5928,7 +6896,7 @@ StartupXLOG(void)
         * advanced beyond the WAL we processed.
         */
        if (InRecovery &&
-               (XLByteLT(EndOfLog, minRecoveryPoint) ||
+               (EndOfLog < minRecoveryPoint ||
                 !XLogRecPtrIsInvalid(ControlFile->backupStartPoint)))
        {
                if (reachedStopPoint)
@@ -5947,7 +6915,7 @@ StartupXLOG(void)
                 * crashes while an online backup is in progress. We must not treat
                 * that as an error, or the database will refuse to start up.
                 */
-               if (InArchiveRecovery || ControlFile->backupEndRequired)
+               if (ArchiveRecoveryRequested || ControlFile->backupEndRequired)
                {
                        if (ControlFile->backupEndRequired)
                                ereport(FATAL,
@@ -5977,17 +6945,20 @@ StartupXLOG(void)
         *
         * In a normal crash recovery, we can just extend the timeline we were in.
         */
-       if (InArchiveRecovery)
+       PrevTimeLineID = ThisTimeLineID;
+       if (ArchiveRecoveryRequested)
        {
-               char    reason[200];
+               char            reason[200];
+
+               Assert(InArchiveRecovery);
 
                ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
                ereport(LOG,
                                (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
 
                /*
-                * Write comment to history file to explain why and where timeline
-                * changed. Comment varies according to the recovery target used.
+                * Create a comment for the history file to explain why and where
+                * timeline changed.
                 */
                if (recoveryTarget == RECOVERY_TARGET_XID)
                        snprintf(reason, sizeof(reason),
@@ -6007,11 +6978,12 @@ StartupXLOG(void)
                        snprintf(reason, sizeof(reason), "no recovery target specified");
 
                writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
-                                                        curFileTLI, endLogSegNo, reason);
+                                                        EndRecPtr, reason);
        }
 
        /* Save the selected TimeLineID in shared memory, too */
        XLogCtl->ThisTimeLineID = ThisTimeLineID;
+       XLogCtl->PrevTimeLineID = PrevTimeLineID;
 
        /*
         * We are now done reading the old WAL.  Turn off archive fetching if it
@@ -6019,8 +6991,8 @@ StartupXLOG(void)
         * that we also have a copy of the last block of the old WAL in readBuf;
         * we will use that below.)
         */
-       if (InArchiveRecovery)
-               exitArchiveRecovery(curFileTLI, endLogSegNo);
+       if (ArchiveRecoveryRequested)
+               exitArchiveRecovery(xlogreader->readPageTLI, endLogSegNo);
 
        /*
         * Prepare to write WAL starting at EndOfLog position, and init xlog
@@ -6031,46 +7003,52 @@ StartupXLOG(void)
        openLogFile = XLogFileOpen(openLogSegNo);
        openLogOff = 0;
        Insert = &XLogCtl->Insert;
-       Insert->PrevRecord = LastRec;
-       XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
+       Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
+       Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
        /*
         * Tricky point here: readBuf contains the *last* block that the LastRec
         * record spans, not the one it starts in.      The last block is indeed the
         * one we want to use.
         */
-       Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
-       memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
-       Insert->currpos = (char *) Insert->currpage +
-               (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
+       if (EndOfLog % XLOG_BLCKSZ != 0)
+       {
+               char       *page;
+               int                     len;
+               int                     firstIdx;
+               XLogRecPtr      pageBeginPtr;
 
-       LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+               pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
+               Assert(readOff == pageBeginPtr % XLogSegSize);
 
-       XLogCtl->LogwrtResult = LogwrtResult;
+               firstIdx = XLogRecPtrToBufIdx(EndOfLog);
 
-       XLogCtl->LogwrtRqst.Write = EndOfLog;
-       XLogCtl->LogwrtRqst.Flush = EndOfLog;
+               /* Copy the valid part of the last block, and zero the rest */
+               page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
+               len = EndOfLog % XLOG_BLCKSZ;
+               memcpy(page, xlogreader->readBuf, len);
+               memset(page + len, 0, XLOG_BLCKSZ - len);
 
-       freespace = INSERT_FREESPACE(Insert);
-       if (freespace > 0)
-       {
-               /* Make sure rest of page is zero */
-               MemSet(Insert->currpos, 0, freespace);
-               XLogCtl->Write.curridx = 0;
+               XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
+               XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
        }
        else
        {
                /*
-                * Whenever LogwrtResult points to exactly the end of a page,
-                * Write.curridx must point to the *next* page (see XLogWrite()).
-                *
-                * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
-                * this is sufficient.  The first actual attempt to insert a log
-                * record will advance the insert state.
+                * There is no partial block to copy. Just set InitializedUpTo,
+                * and let the first attempt to insert a log record to initialize
+                * the next buffer.
                 */
-               XLogCtl->Write.curridx = NextBufIdx(0);
+               XLogCtl->InitializedUpTo = EndOfLog;
        }
 
+       LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+
+       XLogCtl->LogwrtResult = LogwrtResult;
+
+       XLogCtl->LogwrtRqst.Write = EndOfLog;
+       XLogCtl->LogwrtRqst.Flush = EndOfLog;
+
        /* Pre-scan prepared transactions to find out the range of XIDs present */
        oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
 
@@ -6115,11 +7093,48 @@ StartupXLOG(void)
                 * assigning a new TLI, using a shutdown checkpoint allows us to have
                 * the rule that TLI only changes in shutdown checkpoints, which
                 * allows some extra error checking in xlog_redo.
+                *
+                * In fast promotion, only create a lightweight end-of-recovery record
+                * instead of a full checkpoint. A checkpoint is requested later,
+                * after we're fully out of recovery mode and already accepting
+                * queries.
                 */
                if (bgwriterLaunched)
-                       RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY |
-                                                         CHECKPOINT_IMMEDIATE |
-                                                         CHECKPOINT_WAIT);
+               {
+                       if (fast_promote)
+                       {
+                               checkPointLoc = ControlFile->prevCheckPoint;
+
+                               /*
+                                * Confirm the last checkpoint is available for us to recover
+                                * from if we fail. Note that we don't check for the secondary
+                                * checkpoint since that isn't available in most base backups.
+                                */
+                               record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, false);
+                               if (record != NULL)
+                               {
+                                       fast_promoted = true;
+
+                                       /*
+                                        * Insert a special WAL record to mark the end of
+                                        * recovery, since we aren't doing a checkpoint. That
+                                        * means that the checkpointer process may likely be in
+                                        * the middle of a time-smoothed restartpoint and could
+                                        * continue to be for minutes after this. That sounds
+                                        * strange, but the effect is roughly the same and it
+                                        * would be stranger to try to come out of the
+                                        * restartpoint and then checkpoint. We request a
+                                        * checkpoint later anyway, just for safety.
+                                        */
+                                       CreateEndOfRecoveryRecord();
+                               }
+                       }
+
+                       if (!fast_promoted)
+                               RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY |
+                                                                 CHECKPOINT_IMMEDIATE |
+                                                                 CHECKPOINT_WAIT);
+               }
                else
                        CreateCheckPoint(CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_IMMEDIATE);
 
@@ -6157,7 +7172,7 @@ StartupXLOG(void)
        LWLockRelease(ControlFileLock);
 
        /* start the archive_timeout timer running */
-       XLogCtl->Write.lastSegSwitchTime = (pg_time_t) time(NULL);
+       XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 
        /* also initialize latestCompletedXid, to nextXid - 1 */
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -6191,23 +7206,13 @@ StartupXLOG(void)
        if (standbyState != STANDBY_DISABLED)
                ShutdownRecoveryTransactionEnvironment();
 
-       /* Shut down readFile facility, free space */
+       /* Shut down xlogreader */
        if (readFile >= 0)
        {
                close(readFile);
                readFile = -1;
        }
-       if (readBuf)
-       {
-               free(readBuf);
-               readBuf = NULL;
-       }
-       if (readRecordBuf)
-       {
-               free(readRecordBuf);
-               readRecordBuf = NULL;
-               readRecordBufSize = 0;
-       }
+       XLogReaderFree(xlogreader);
 
        /*
         * If any of the critical GUCs have changed, log them before we allow
@@ -6230,6 +7235,21 @@ StartupXLOG(void)
                xlogctl->SharedRecoveryInProgress = false;
                SpinLockRelease(&xlogctl->info_lck);
        }
+
+       /*
+        * If there were cascading standby servers connected to us, nudge any wal
+        * sender processes to notice that we've been promoted.
+        */
+       WalSndWakeup();
+
+       /*
+        * If this was a fast promotion, request an (online) checkpoint now. This
+        * isn't required for consistency, but the last restartpoint might be far
+        * back, and in case of a crash, recovering from it might take a longer
+        * than is appropriate now that we're not in standby mode anymore.
+        */
+       if (fast_promoted)
+               RequestCheckpoint(CHECKPOINT_FORCE);
 }
 
 /*
@@ -6248,10 +7268,42 @@ CheckRecoveryConsistency(void)
                return;
 
        /*
-        * Have we passed our safe starting point?
+        * Have we reached the point where our base backup was completed?
+        */
+       if (!XLogRecPtrIsInvalid(ControlFile->backupEndPoint) &&
+               ControlFile->backupEndPoint <= EndRecPtr)
+       {
+               /*
+                * We have reached the end of base backup, as indicated by pg_control.
+                * The data on disk is now consistent. Reset backupStartPoint and
+                * backupEndPoint, and update minRecoveryPoint to make sure we don't
+                * allow starting up at an earlier point even if recovery is stopped
+                * and restarted soon after this.
+                */
+               elog(DEBUG1, "end of backup reached");
+
+               LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+               if (ControlFile->minRecoveryPoint < EndRecPtr)
+                       ControlFile->minRecoveryPoint = EndRecPtr;
+
+               ControlFile->backupStartPoint = InvalidXLogRecPtr;
+               ControlFile->backupEndPoint = InvalidXLogRecPtr;
+               ControlFile->backupEndRequired = false;
+               UpdateControlFile();
+
+               LWLockRelease(ControlFileLock);
+       }
+
+       /*
+        * Have we passed our safe starting point? Note that minRecoveryPoint is
+        * known to be incorrectly set if ControlFile->backupEndRequired, until
+        * the XLOG_BACKUP_RECORD arrives to advise us of the correct
+        * minRecoveryPoint. All we know prior to that is that we're not
+        * consistent yet.
         */
-       if (!reachedConsistency &&
-               XLByteLE(minRecoveryPoint, EndRecPtr) &&
+       if (!reachedConsistency && !ControlFile->backupEndRequired &&
+               minRecoveryPoint <= XLogCtl->lastReplayedEndRecPtr &&
                XLogRecPtrIsInvalid(ControlFile->backupStartPoint))
        {
                /*
@@ -6263,7 +7315,8 @@ CheckRecoveryConsistency(void)
                reachedConsistency = true;
                ereport(LOG,
                                (errmsg("consistent recovery state reached at %X/%X",
-                                               (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr)));
+                                               (uint32) (XLogCtl->lastReplayedEndRecPtr >> 32),
+                                               (uint32) XLogCtl->lastReplayedEndRecPtr)));
        }
 
        /*
@@ -6418,12 +7471,16 @@ LocalSetXLogInsertAllowed(void)
  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
  */
 static XLogRecord *
-ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
+ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+                                        int whichChkpt, bool report)
 {
        XLogRecord *record;
 
        if (!XRecOffIsValid(RecPtr))
        {
+               if (!report)
+                       return NULL;
+
                switch (whichChkpt)
                {
                        case 1:
@@ -6442,10 +7499,13 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
                return NULL;
        }
 
-       record = ReadRecord(&RecPtr, LOG, true);
+       record = ReadRecord(xlogreader, RecPtr, LOG, true);
 
        if (record == NULL)
        {
+               if (!report)
+                       return NULL;
+
                switch (whichChkpt)
                {
                        case 1:
@@ -6547,21 +7607,29 @@ InitXLOGAccess(void)
 }
 
 /*
- * Once spawned, a backend may update its local RedoRecPtr from
- * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
- * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
+ * Return the current Redo pointer from shared memory.
+ *
+ * As a side-effect, the local RedoRecPtr copy is updated.
  */
 XLogRecPtr
 GetRedoRecPtr(void)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr ptr;
 
+       /*
+        * The possibly not up-to-date copy in XlogCtl is enough. Even if we
+        * grabbed a WAL insertion slot to read the master copy, someone might
+        * update it just after we've released the lock.
+        */
        SpinLockAcquire(&xlogctl->info_lck);
-       Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
-       RedoRecPtr = xlogctl->Insert.RedoRecPtr;
+       ptr = xlogctl->RedoRecPtr;
        SpinLockRelease(&xlogctl->info_lck);
 
+       if (RedoRecPtr < ptr)
+               RedoRecPtr = ptr;
+
        return RedoRecPtr;
 }
 
@@ -6570,9 +7638,8 @@ GetRedoRecPtr(void)
  *
  * NOTE: The value *actually* returned is the position of the last full
  * xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to acquire WALInsertLock which can be quite
- * heavily contended, and an approximation is enough for the current
- * usage of this function.
+ * For that, we don't need to scan through WAL insertion slots, and an
+ * approximation is enough for the current usage of this function.
  */
 XLogRecPtr
 GetInsertRecPtr(void)
@@ -6616,7 +7683,7 @@ GetLastSegSwitchTime(void)
 
        /* Need WALWriteLock, but shared lock is sufficient */
        LWLockAcquire(WALWriteLock, LW_SHARED);
-       result = XLogCtl->Write.lastSegSwitchTime;
+       result = XLogCtl->lastSegSwitchTime;
        LWLockRelease(WALWriteLock);
 
        return result;
@@ -6662,30 +7729,14 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
        *epoch = ckptXidEpoch;
 }
 
-/*
- * GetRecoveryTargetTLI - get the current recovery target timeline ID
- */
-TimeLineID
-GetRecoveryTargetTLI(void)
-{
-       /* use volatile pointer to prevent code rearrangement */
-       volatile XLogCtlData *xlogctl = XLogCtl;
-       TimeLineID result;
-
-       SpinLockAcquire(&xlogctl->info_lck);
-       result = xlogctl->RecoveryTargetTLI;
-       SpinLockRelease(&xlogctl->info_lck);
-
-       return result;
-}
-
 /*
  * This must be called ONCE during postmaster or standalone-backend shutdown
  */
 void
 ShutdownXLOG(int code, Datum arg)
 {
-       ereport(LOG,
+       /* Don't be chatty in standalone mode */
+       ereport(IsPostmasterEnvironment ? LOG : NOTICE,
                        (errmsg("shutting down")));
 
        if (RecoveryInProgress())
@@ -6707,7 +7758,8 @@ ShutdownXLOG(int code, Datum arg)
        ShutdownSUBTRANS();
        ShutdownMultiXact();
 
-       ereport(LOG,
+       /* Don't be chatty in standalone mode */
+       ereport(IsPostmasterEnvironment ? LOG : NOTICE,
                        (errmsg("database system is shut down")));
 }
 
@@ -6864,6 +7916,8 @@ LogCheckpointEnd(bool restartpoint)
 void
 CreateCheckPoint(int flags)
 {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
        bool            shutdown;
        CheckPoint      checkPoint;
        XLogRecPtr      recptr;
@@ -6871,8 +7925,9 @@ CreateCheckPoint(int flags)
        XLogRecData rdata;
        uint32          freespace;
        XLogSegNo       _logSegNo;
-       TransactionId *inCommitXids;
-       int                     nInCommit;
+       XLogRecPtr      curInsert;
+       VirtualTransactionId *vxids;
+       int                     nvxids;
 
        /*
         * An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -6941,10 +7996,11 @@ CreateCheckPoint(int flags)
                checkPoint.oldestActiveXid = InvalidTransactionId;
 
        /*
-        * We must hold WALInsertLock while examining insert state to determine
-        * the checkpoint REDO pointer.
+        * We must block concurrent insertions while examining insert state to
+        * determine the checkpoint REDO pointer.
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
+       curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
 
        /*
         * If this isn't a shutdown or forced checkpoint, and we have not inserted
@@ -6964,14 +8020,11 @@ CreateCheckPoint(int flags)
        if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
                                  CHECKPOINT_FORCE)) == 0)
        {
-               XLogRecPtr      curInsert;
-
-               INSERT_RECPTR(curInsert, Insert, Insert->curridx);
-               if (curInsert == ControlFile->checkPoint + 
+               if (curInsert == ControlFile->checkPoint +
                        MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
                        ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
                {
-                       LWLockRelease(WALInsertLock);
+                       WALInsertSlotRelease();
                        LWLockRelease(CheckpointLock);
                        END_CRIT_SECTION();
                        return;
@@ -6988,6 +8041,11 @@ CreateCheckPoint(int flags)
                LocalSetXLogInsertAllowed();
 
        checkPoint.ThisTimeLineID = ThisTimeLineID;
+       if (flags & CHECKPOINT_END_OF_RECOVERY)
+               checkPoint.PrevTimeLineID = XLogCtl->PrevTimeLineID;
+       else
+               checkPoint.PrevTimeLineID = ThisTimeLineID;
+
        checkPoint.fullPageWrites = Insert->fullPageWrites;
 
        /*
@@ -6998,18 +8056,19 @@ CreateCheckPoint(int flags)
         * the buffer flush work.  Those XLOG records are logically after the
         * checkpoint, even though physically before it.  Got that?
         */
-       freespace = INSERT_FREESPACE(Insert);
+       freespace = INSERT_FREESPACE(curInsert);
        if (freespace == 0)
        {
-               (void) AdvanceXLInsertBuffer(false);
-               /* OK to ignore update return flag, since we will do flush anyway */
-               freespace = INSERT_FREESPACE(Insert);
+               if (curInsert % XLogSegSize == 0)
+                       curInsert += SizeOfXLogLongPHD;
+               else
+                       curInsert += SizeOfXLogShortPHD;
        }
-       INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
+       checkPoint.redo = curInsert;
 
        /*
         * Here we update the shared RedoRecPtr for future XLogInsert calls; this
-        * must be done while holding the insert lock AND the info_lck.
+        * must be done while holding the insertion slots.
         *
         * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
         * pointing past where it really needs to point.  This is okay; the only
@@ -7018,20 +8077,18 @@ CreateCheckPoint(int flags)
         * XLogInserts that happen while we are dumping buffers must assume that
         * their buffer changes are not included in the checkpoint.
         */
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile XLogCtlData *xlogctl = XLogCtl;
-
-               SpinLockAcquire(&xlogctl->info_lck);
-               RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
-               SpinLockRelease(&xlogctl->info_lck);
-       }
+       RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
 
        /*
-        * Now we can release WAL insert lock, allowing other xacts to proceed
-        * while we are flushing disk buffers.
+        * Now we can release the WAL insertion slots, allowing other xacts to
+        * proceed while we are flushing disk buffers.
         */
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
+
+       /* Update the info_lck-protected copy of RedoRecPtr as well */
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->RedoRecPtr = checkPoint.redo;
+       SpinLockRelease(&xlogctl->info_lck);
 
        /*
         * If enabled, log checkpoint start.  We postpone this until now so as not
@@ -7043,38 +8100,44 @@ CreateCheckPoint(int flags)
        TRACE_POSTGRESQL_CHECKPOINT_START(flags);
 
        /*
-        * Before flushing data, we must wait for any transactions that are
-        * currently in their commit critical sections.  If an xact inserted its
-        * commit record into XLOG just before the REDO point, then a crash
+        * In some cases there are groups of actions that must all occur on one
+        * side or the other of a checkpoint record. Before flushing the
+        * checkpoint record we must explicitly wait for any backend currently
+        * performing those groups of actions.
+        *
+        * One example is end of transaction, so we must wait for any transactions
+        * that are currently in commit critical sections.      If an xact inserted
+        * its commit record into XLOG just before the REDO point, then a crash
         * restart from the REDO point would not replay that record, which means
         * that our flushing had better include the xact's update of pg_clog.  So
         * we wait till he's out of his commit critical section before proceeding.
         * See notes in RecordTransactionCommit().
         *
-        * Because we've already released WALInsertLock, this test is a bit fuzzy:
-        * it is possible that we will wait for xacts we didn't really need to
-        * wait for.  But the delay should be short and it seems better to make
-        * checkpoint take a bit longer than to hold locks longer than necessary.
+        * Because we've already released the insertion slots, this test is a bit
+        * fuzzy: it is possible that we will wait for xacts we didn't really need
+        * to wait for.  But the delay should be short and it seems better to make
+        * checkpoint take a bit longer than to hold off insertions longer than
+        * necessary.
         * (In fact, the whole reason we have this issue is that xact.c does
         * commit record XLOG insertion and clog update as two separate steps
         * protected by different locks, but again that seems best on grounds of
         * minimizing lock contention.)
         *
-        * A transaction that has not yet set inCommit when we look cannot be at
+        * A transaction that has not yet set delayChkpt when we look cannot be at
         * risk, since he's not inserted his commit record yet; and one that's
         * already cleared it is not at risk either, since he's done fixing clog
         * and we will correctly flush the update below.  So we cannot miss any
         * xacts we need to wait for.
         */
-       nInCommit = GetTransactionsInCommit(&inCommitXids);
-       if (nInCommit > 0)
+       vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
+       if (nvxids > 0)
        {
                do
                {
                        pg_usleep(10000L);      /* wait for 10 msec */
-               } while (HaveTransactionsInCommit(inCommitXids, nInCommit));
+               } while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
        }
-       pfree(inCommitXids);
+       pfree(vxids);
 
        /*
         * Get the other info we need for the checkpoint record.
@@ -7098,7 +8161,9 @@ CreateCheckPoint(int flags)
 
        MultiXactGetCheckptMulti(shutdown,
                                                         &checkPoint.nextMulti,
-                                                        &checkPoint.nextMultiOffset);
+                                                        &checkPoint.nextMultiOffset,
+                                                        &checkPoint.oldestMulti,
+                                                        &checkPoint.oldestMultiDB);
 
        /*
         * Having constructed the checkpoint record, ensure all shmem disk buffers
@@ -7159,7 +8224,7 @@ CreateCheckPoint(int flags)
         * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
         * = end of actual checkpoint record.
         */
-       if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
+       if (shutdown && checkPoint.redo != ProcLastRecPtr)
                ereport(PANIC,
                                (errmsg("concurrent transaction log activity while database system is shutting down")));
 
@@ -7180,7 +8245,18 @@ CreateCheckPoint(int flags)
        ControlFile->checkPointCopy = checkPoint;
        ControlFile->time = (pg_time_t) time(NULL);
        /* crash recovery should always recover to the end of WAL */
-       MemSet(&ControlFile->minRecoveryPoint, 0, sizeof(XLogRecPtr));
+       ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
+       ControlFile->minRecoveryPointTLI = 0;
+
+       /*
+        * Persist unloggedLSN value. It's reset on crash recovery, so this goes
+        * unused on non-shutdown checkpoints, but seems useful to store it always
+        * for debugging purposes.
+        */
+       SpinLockAcquire(&XLogCtl->ulsn_lck);
+       ControlFile->unloggedLSN = XLogCtl->unloggedLSN;
+       SpinLockRelease(&XLogCtl->ulsn_lck);
+
        UpdateControlFile();
        LWLockRelease(ControlFileLock);
 
@@ -7246,6 +8322,62 @@ CreateCheckPoint(int flags)
        LWLockRelease(CheckpointLock);
 }
 
+/*
+ * Mark the end of recovery in WAL though without running a full checkpoint.
+ * We can expect that a restartpoint is likely to be in progress as we
+ * do this, though we are unwilling to wait for it to complete. So be
+ * careful to avoid taking the CheckpointLock anywhere here.
+ *
+ * CreateRestartPoint() allows for the case where recovery may end before
+ * the restartpoint completes so there is no concern of concurrent behaviour.
+ */
+void
+CreateEndOfRecoveryRecord(void)
+{
+       xl_end_of_recovery xlrec;
+       XLogRecData rdata;
+       XLogRecPtr      recptr;
+
+       /* sanity check */
+       if (!RecoveryInProgress())
+               elog(ERROR, "can only be used to end recovery");
+
+       xlrec.end_time = time(NULL);
+
+       WALInsertSlotAcquire(true);
+       xlrec.ThisTimeLineID = ThisTimeLineID;
+       xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
+       WALInsertSlotRelease();
+
+       LocalSetXLogInsertAllowed();
+
+       START_CRIT_SECTION();
+
+       rdata.data = (char *) &xlrec;
+       rdata.len = sizeof(xl_end_of_recovery);
+       rdata.buffer = InvalidBuffer;
+       rdata.next = NULL;
+
+       recptr = XLogInsert(RM_XLOG_ID, XLOG_END_OF_RECOVERY, &rdata);
+
+       XLogFlush(recptr);
+
+       /*
+        * Update the control file so that crash recovery can follow the timeline
+        * changes to this point.
+        */
+       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+       ControlFile->time = (pg_time_t) xlrec.end_time;
+       ControlFile->minRecoveryPoint = recptr;
+       ControlFile->minRecoveryPointTLI = ThisTimeLineID;
+       UpdateControlFile();
+       LWLockRelease(ControlFileLock);
+
+       END_CRIT_SECTION();
+
+       LocalXLogInsertAllowed = -1;    /* return to "check" state */
+}
+
 /*
  * Flush all data in shared memory to disk, and fsync
  *
@@ -7391,11 +8523,12 @@ CreateRestartPoint(int flags)
         * side-effect.
         */
        if (XLogRecPtrIsInvalid(lastCheckPointRecPtr) ||
-               XLByteLE(lastCheckPoint.redo, ControlFile->checkPointCopy.redo))
+               lastCheckPoint.redo <= ControlFile->checkPointCopy.redo)
        {
                ereport(DEBUG2,
                                (errmsg("skipping restartpoint, already performed at %X/%X",
-                                               (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo)));
+                                               (uint32) (lastCheckPoint.redo >> 32),
+                                               (uint32) lastCheckPoint.redo)));
 
                UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
                if (flags & CHECKPOINT_IS_SHUTDOWN)
@@ -7415,15 +8548,18 @@ CreateRestartPoint(int flags)
         * the number of segments replayed since last restartpoint, and request a
         * restartpoint if it exceeds checkpoint_segments.
         *
-        * You need to hold WALInsertLock and info_lck to update it, although
-        * during recovery acquiring WALInsertLock is just pro forma, because
-        * there is no other processes updating Insert.RedoRecPtr.
+        * Like in CreateCheckPoint(), hold off insertions to update it, although
+        * during recovery this is just pro forma, because no WAL insertions are
+        * happening.
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-       SpinLockAcquire(&xlogctl->info_lck);
+       WALInsertSlotAcquire(true);
        xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+       WALInsertSlotRelease();
+
+       /* Also update the info_lck-protected copy */
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->RedoRecPtr = lastCheckPoint.redo;
        SpinLockRelease(&xlogctl->info_lck);
-       LWLockRelease(WALInsertLock);
 
        /*
         * Prepare to accumulate statistics.
@@ -7454,7 +8590,7 @@ CreateRestartPoint(int flags)
         */
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY &&
-               XLByteLT(ControlFile->checkPointCopy.redo, lastCheckPoint.redo))
+               ControlFile->checkPointCopy.redo < lastCheckPoint.redo)
        {
                ControlFile->prevCheckPoint = ControlFile->checkPoint;
                ControlFile->checkPoint = lastCheckPointRecPtr;
@@ -7473,13 +8609,38 @@ CreateRestartPoint(int flags)
         */
        if (_logSegNo)
        {
+               XLogRecPtr      receivePtr;
+               XLogRecPtr      replayPtr;
+               TimeLineID      replayTLI;
                XLogRecPtr      endptr;
 
-               /* Get the current (or recent) end of xlog */
-               endptr = GetStandbyFlushRecPtr(NULL);
+               /*
+                * Get the current end of xlog replayed or received, whichever is
+                * later.
+                */
+               receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+               replayPtr = GetXLogReplayRecPtr(&replayTLI);
+               endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 
                KeepLogSeg(endptr, &_logSegNo);
                _logSegNo--;
+
+               /*
+                * Try to recycle segments on a useful timeline. If we've been promoted
+                * since the beginning of this restartpoint, use the new timeline
+                * chosen at end of recovery (RecoveryInProgress() sets ThisTimeLineID
+                * in that case). If we're still in recovery, use the timeline we're
+                * currently replaying.
+                *
+                * There is no guarantee that the WAL segments will be useful on the
+                * current timeline; if recovery proceeds to a new timeline right
+                * after this, the pre-allocated WAL segments on this timeline will
+                * not be used, and will go wasted until recycled on the next
+                * restartpoint. We'll live with that.
+                */
+               if (RecoveryInProgress())
+                       ThisTimeLineID = replayTLI;
+
                RemoveOldXlogFiles(_logSegNo, endptr);
 
                /*
@@ -7487,6 +8648,16 @@ CreateRestartPoint(int flags)
                 * segments, since that may supply some of the needed files.)
                 */
                PreallocXlogFiles(endptr);
+
+               /*
+                * ThisTimeLineID is normally not set when we're still in recovery.
+                * However, recycling/preallocating segments above needed
+                * ThisTimeLineID to determine which timeline to install the segments
+                * on. Reset it now, to restore the normal state of affairs for
+                * debugging purposes.
+                */
+               if (RecoveryInProgress())
+                       ThisTimeLineID = 0;
        }
 
        /*
@@ -7505,7 +8676,7 @@ CreateRestartPoint(int flags)
        xtime = GetLatestXTime();
        ereport((log_checkpoints ? LOG : DEBUG2),
                        (errmsg("recovery restart point at %X/%X",
-                                       (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo),
+                (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo),
                   xtime ? errdetail("last completed transaction was at log time %s",
                                                         timestamptz_to_str(xtime)) : 0));
 
@@ -7523,9 +8694,9 @@ CreateRestartPoint(int flags)
 }
 
 /*
- * Calculate the last segment that we need to retain because of
- * wal_keep_segments, by subtracting wal_keep_segments from
- * the given xlog location, recptr.
+ * Retreat *logSegNo to the last segment that we need to retain because of
+ * wal_keep_segments. This is calculated by subtracting wal_keep_segments
+ * from the given xlog location, recptr.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
@@ -7541,7 +8712,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
        if (segno <= wal_keep_segments)
                segno = 1;
        else
-               segno = *logSegNo - wal_keep_segments;
+               segno = segno - wal_keep_segments;
 
        /* don't delete WAL segments newer than the calculated segment */
        if (segno < *logSegNo)
@@ -7636,6 +8807,95 @@ XLogRestorePoint(const char *rpName)
        return RecPtr;
 }
 
+/*
+ * Write a backup block if needed when we are setting a hint. Note that
+ * this may be called for a variety of page types, not just heaps.
+ *
+ * Callable while holding just share lock on the buffer content.
+ *
+ * We can't use the plain backup block mechanism since that relies on the
+ * Buffer being exclusively locked. Since some modifications (setting LSN, hint
+ * bits) are allowed in a sharelocked buffer that can lead to wal checksum
+ * failures. So instead we copy the page and insert the copied data as normal
+ * record data.
+ *
+ * We only need to do something if page has not yet been full page written in
+ * this checkpoint round. The LSN of the inserted wal record is returned if we
+ * had to write, InvalidXLogRecPtr otherwise.
+ *
+ * It is possible that multiple concurrent backends could attempt to write WAL
+ * records. In that case, multiple copies of the same block would be recorded
+ * in separate WAL records by different backends, though that is still OK from
+ * a correctness perspective.
+ */
+XLogRecPtr
+XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
+{
+       XLogRecPtr      recptr = InvalidXLogRecPtr;
+       XLogRecPtr      lsn;
+       XLogRecData rdata[2];
+       BkpBlock        bkpb;
+
+       /*
+        * Ensure no checkpoint can change our view of RedoRecPtr.
+        */
+       Assert(MyPgXact->delayChkpt);
+
+       /*
+        * Update RedoRecPtr so XLogCheckBuffer can make the right decision
+        */
+       GetRedoRecPtr();
+
+       /*
+        * Setup phony rdata element for use within XLogCheckBuffer only. We reuse
+        * and reset rdata for any actual WAL record insert.
+        */
+       rdata[0].buffer = buffer;
+       rdata[0].buffer_std = buffer_std;
+
+       /*
+        * Check buffer while not holding an exclusive lock.
+        */
+       if (XLogCheckBuffer(rdata, false, &lsn, &bkpb))
+       {
+               char            copied_buffer[BLCKSZ];
+               char       *origdata = (char *) BufferGetBlock(buffer);
+
+               /*
+                * Copy buffer so we don't have to worry about concurrent hint bit or
+                * lsn updates. We assume pd_lower/upper cannot be changed without an
+                * exclusive lock, so the contents bkp are not racy.
+                *
+                * With buffer_std set to false, XLogCheckBuffer() sets hole_length and
+                * hole_offset to 0; so the following code is safe for either case.
+                */
+               memcpy(copied_buffer, origdata, bkpb.hole_offset);
+               memcpy(copied_buffer + bkpb.hole_offset,
+                          origdata + bkpb.hole_offset + bkpb.hole_length,
+                          BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
+
+               /*
+                * Header for backup block.
+                */
+               rdata[0].data = (char *) &bkpb;
+               rdata[0].len = sizeof(BkpBlock);
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].next = &(rdata[1]);
+
+               /*
+                * Save copy of the buffer.
+                */
+               rdata[1].data = copied_buffer;
+               rdata[1].len = BLCKSZ - bkpb.hole_length;
+               rdata[1].buffer = InvalidBuffer;
+               rdata[1].next = NULL;
+
+               recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+       }
+
+       return recptr;
+}
+
 /*
  * Check if any of the GUC parameters that are critical for hot standby
  * have changed, and update the value in pg_control file if necessary.
@@ -7645,6 +8905,7 @@ XLogReportParameters(void)
 {
        if (wal_level != ControlFile->wal_level ||
                MaxConnections != ControlFile->MaxConnections ||
+               max_worker_processes != ControlFile->max_worker_processes ||
                max_prepared_xacts != ControlFile->max_prepared_xacts ||
                max_locks_per_xact != ControlFile->max_locks_per_xact)
        {
@@ -7661,6 +8922,7 @@ XLogReportParameters(void)
                        xl_parameter_change xlrec;
 
                        xlrec.MaxConnections = MaxConnections;
+                       xlrec.max_worker_processes = max_worker_processes;
                        xlrec.max_prepared_xacts = max_prepared_xacts;
                        xlrec.max_locks_per_xact = max_locks_per_xact;
                        xlrec.wal_level = wal_level;
@@ -7674,6 +8936,7 @@ XLogReportParameters(void)
                }
 
                ControlFile->MaxConnections = MaxConnections;
+               ControlFile->max_worker_processes = max_worker_processes;
                ControlFile->max_prepared_xacts = max_prepared_xacts;
                ControlFile->max_locks_per_xact = max_locks_per_xact;
                ControlFile->wal_level = wal_level;
@@ -7714,9 +8977,9 @@ UpdateFullPageWrites(void)
         */
        if (fullPageWrites)
        {
-               LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+               WALInsertSlotAcquire(true);
                Insert->fullPageWrites = true;
-               LWLockRelease(WALInsertLock);
+               WALInsertSlotRelease();
        }
 
        /*
@@ -7732,16 +8995,62 @@ UpdateFullPageWrites(void)
                rdata.buffer = InvalidBuffer;
                rdata.next = NULL;
 
-               XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE, &rdata);
-       }
+               XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE, &rdata);
+       }
+
+       if (!fullPageWrites)
+       {
+               WALInsertSlotAcquire(true);
+               Insert->fullPageWrites = false;
+               WALInsertSlotRelease();
+       }
+       END_CRIT_SECTION();
+}
+
+/*
+ * Check that it's OK to switch to new timeline during recovery.
+ *
+ * 'lsn' is the address of the shutdown checkpoint record we're about to
+ * replay. (Currently, timeline can only change at a shutdown checkpoint).
+ */
+static void
+checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI)
+{
+       /* Check that the record agrees on what the current (old) timeline is */
+       if (prevTLI != ThisTimeLineID)
+               ereport(PANIC,
+                               (errmsg("unexpected previous timeline ID %u (current timeline ID %u) in checkpoint record",
+                                               prevTLI, ThisTimeLineID)));
+
+       /*
+        * The new timeline better be in the list of timelines we expect to see,
+        * according to the timeline history. It should also not decrease.
+        */
+       if (newTLI < ThisTimeLineID || !tliInHistory(newTLI, expectedTLEs))
+               ereport(PANIC,
+                (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
+                                newTLI, ThisTimeLineID)));
 
-       if (!fullPageWrites)
-       {
-               LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-               Insert->fullPageWrites = false;
-               LWLockRelease(WALInsertLock);
-       }
-       END_CRIT_SECTION();
+       /*
+        * If we have not yet reached min recovery point, and we're about to
+        * switch to a timeline greater than the timeline of the min recovery
+        * point: trouble. After switching to the new timeline, we could not
+        * possibly visit the min recovery point on the correct timeline anymore.
+        * This can happen if there is a newer timeline in the archive that
+        * branched before the timeline the min recovery point is on, and you
+        * attempt to do PITR to the new timeline.
+        */
+       if (!XLogRecPtrIsInvalid(minRecoveryPoint) &&
+               lsn < minRecoveryPoint &&
+               newTLI > minRecoveryPointTLI)
+               ereport(PANIC,
+                               (errmsg("unexpected timeline ID %u in checkpoint record, before reaching minimum recovery point %X/%X on timeline %u",
+                                               newTLI,
+                                               (uint32) (minRecoveryPoint >> 32),
+                                               (uint32) minRecoveryPoint,
+                                               minRecoveryPointTLI)));
+
+       /* Looks good */
 }
 
 /*
@@ -7755,7 +9064,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
-       /* Backup blocks are not used in xlog records */
+       /* Backup blocks are not used by XLOG rmgr */
        Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
        if (info == XLOG_NEXTOID)
@@ -7791,13 +9100,14 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                MultiXactSetNextMXact(checkPoint.nextMulti,
                                                          checkPoint.nextMultiOffset);
                SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+               SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
 
                /*
                 * If we see a shutdown checkpoint while waiting for an end-of-backup
                 * record, the backup was canceled and the end-of-backup record will
                 * never arrive.
                 */
-               if (InArchiveRecovery &&
+               if (ArchiveRecoveryRequested &&
                        !XLogRecPtrIsInvalid(ControlFile->backupStartPoint) &&
                        XLogRecPtrIsInvalid(ControlFile->backupEndPoint))
                        ereport(PANIC,
@@ -7857,19 +9167,13 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                }
 
                /*
-                * TLI may change in a shutdown checkpoint, but it shouldn't decrease
+                * We should've already switched to the new TLI before replaying this
+                * record.
                 */
                if (checkPoint.ThisTimeLineID != ThisTimeLineID)
-               {
-                       if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
-                               !list_member_int(expectedTLIs,
-                                                                (int) checkPoint.ThisTimeLineID))
-                               ereport(PANIC,
-                                               (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
-                                                               checkPoint.ThisTimeLineID, ThisTimeLineID)));
-                       /* Following WAL records should be run with new TLI */
-                       ThisTimeLineID = checkPoint.ThisTimeLineID;
-               }
+                       ereport(PANIC,
+                                       (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
+                                                       checkPoint.ThisTimeLineID, ThisTimeLineID)));
 
                RecoveryRestartPoint(&checkPoint);
        }
@@ -7895,6 +9199,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                                                                  checkPoint.oldestXid))
                        SetTransactionIdLimit(checkPoint.oldestXid,
                                                                  checkPoint.oldestXidDB);
+               MultiXactAdvanceOldest(checkPoint.oldestMulti,
+                                                          checkPoint.oldestMultiDB);
 
                /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
                ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
@@ -7919,6 +9225,27 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 
                RecoveryRestartPoint(&checkPoint);
        }
+       else if (info == XLOG_END_OF_RECOVERY)
+       {
+               xl_end_of_recovery xlrec;
+
+               memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
+
+               /*
+                * For Hot Standby, we could treat this like a Shutdown Checkpoint,
+                * but this case is rarer and harder to test, so the benefit doesn't
+                * outweigh the potential extra cost of maintenance.
+                */
+
+               /*
+                * We should've already switched to the new TLI before replaying this
+                * record.
+                */
+               if (xlrec.ThisTimeLineID != ThisTimeLineID)
+                       ereport(PANIC,
+                                       (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
+                                                       xlrec.ThisTimeLineID, ThisTimeLineID)));
+       }
        else if (info == XLOG_NOOP)
        {
                /* nothing to do here */
@@ -7931,13 +9258,37 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
        {
                /* nothing to do here */
        }
+       else if (info == XLOG_FPI)
+       {
+               char       *data;
+               BkpBlock        bkpb;
+
+               /*
+                * Full-page image (FPI) records contain a backup block stored "inline"
+                * in the normal data since the locking when writing hint records isn't
+                * sufficient to use the normal backup block mechanism, which assumes
+                * exclusive lock on the buffer supplied.
+                *
+                * Since the only change in these backup block are hint bits, there
+                * are no recovery conflicts generated.
+                *
+                * This also means there is no corresponding API call for this, so an
+                * smgr implementation has no need to implement anything. Which means
+                * nothing is needed in md.c etc
+                */
+               data = XLogRecGetData(record);
+               memcpy(&bkpb, data, sizeof(BkpBlock));
+               data += sizeof(BkpBlock);
+
+               RestoreBackupBlockContents(lsn, bkpb, data, false, false);
+       }
        else if (info == XLOG_BACKUP_END)
        {
                XLogRecPtr      startpoint;
 
                memcpy(&startpoint, XLogRecGetData(record), sizeof(startpoint));
 
-               if (XLByteEQ(ControlFile->backupStartPoint, startpoint))
+               if (ControlFile->backupStartPoint == startpoint)
                {
                        /*
                         * We have reached the end of base backup, the point where
@@ -7950,9 +9301,12 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 
                        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 
-                       if (XLByteLT(ControlFile->minRecoveryPoint, lsn))
+                       if (ControlFile->minRecoveryPoint < lsn)
+                       {
                                ControlFile->minRecoveryPoint = lsn;
-                       MemSet(&ControlFile->backupStartPoint, 0, sizeof(XLogRecPtr));
+                               ControlFile->minRecoveryPointTLI = ThisTimeLineID;
+                       }
+                       ControlFile->backupStartPoint = InvalidXLogRecPtr;
                        ControlFile->backupEndRequired = false;
                        UpdateControlFile();
 
@@ -7968,6 +9322,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->MaxConnections = xlrec.MaxConnections;
+               ControlFile->max_worker_processes = xlrec.max_worker_processes;
                ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts;
                ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
                ControlFile->wal_level = xlrec.wal_level;
@@ -7981,9 +9336,11 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                 * decreasing max_* settings.
                 */
                minRecoveryPoint = ControlFile->minRecoveryPoint;
-               if (minRecoveryPoint != 0 && XLByteLT(minRecoveryPoint, lsn))
+               minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
+               if (minRecoveryPoint != 0 && minRecoveryPoint < lsn)
                {
                        ControlFile->minRecoveryPoint = lsn;
+                       ControlFile->minRecoveryPointTLI = ThisTimeLineID;
                }
 
                UpdateControlFile();
@@ -8008,7 +9365,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                if (!fpw)
                {
                        SpinLockAcquire(&xlogctl->info_lck);
-                       if (XLByteLT(xlogctl->lastFpwDisableRecPtr, ReadRecPtr))
+                       if (xlogctl->lastFpwDisableRecPtr < ReadRecPtr)
                                xlogctl->lastFpwDisableRecPtr = ReadRecPtr;
                        SpinLockRelease(&xlogctl->info_lck);
                }
@@ -8121,7 +9478,7 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
                                ereport(PANIC,
                                                (errcode_for_file_access(),
                                                 errmsg("could not fsync log segment %s: %m",
-                                                               XLogFileNameP(ThisTimeLineID, openLogSegNo))));
+                                                         XLogFileNameP(ThisTimeLineID, openLogSegNo))));
                        if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
                                XLogFileClose();
                }
@@ -8152,8 +9509,8 @@ issue_xlog_fsync(int fd, XLogSegNo segno)
                        if (pg_fsync_writethrough(fd) != 0)
                                ereport(PANIC,
                                                (errcode_for_file_access(),
-                                                errmsg("could not fsync write-through log file %s: %m",
-                                                               XLogFileNameP(ThisTimeLineID, segno))));
+                                         errmsg("could not fsync write-through log file %s: %m",
+                                                        XLogFileNameP(ThisTimeLineID, segno))));
                        break;
 #endif
 #ifdef HAVE_FDATASYNC
@@ -8182,6 +9539,7 @@ char *
 XLogFileNameP(TimeLineID tli, XLogSegNo segno)
 {
        char       *result = palloc(MAXFNAMELEN);
+
        XLogFileName(result, tli, segno);
        return result;
 }
@@ -8204,16 +9562,21 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno)
  * non-exclusive backups active at the same time, and they don't conflict
  * with an exclusive backup either.
  *
+ * Returns the minimum WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *starttli_p.
+ *
  * Every successfully started non-exclusive backup must be stopped by calling
  * do_pg_stop_backup() or do_pg_abort_backup().
  */
 XLogRecPtr
-do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
+do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
+                                  char **labelfile)
 {
        bool            exclusive = (labelfile == NULL);
        bool            backup_started_in_recovery = false;
        XLogRecPtr      checkpointloc;
        XLogRecPtr      startpoint;
+       TimeLineID      starttli;
        pg_time_t       stamp_time;
        char            strfbuf[128];
        char            xlogfilename[MAXFNAMELEN];
@@ -8224,7 +9587,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
 
        backup_started_in_recovery = RecoveryInProgress();
 
-       if (!superuser() && !is_authenticated_user_replication_role())
+       if (!superuser() && !has_rolreplication(GetUserId()))
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                   errmsg("must be superuser or replication role to run a backup")));
@@ -8271,15 +9634,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
         * Note that forcePageWrites has no effect during an online backup from
         * the standby.
         *
-        * We must hold WALInsertLock to change the value of forcePageWrites, to
-        * ensure adequate interlocking against XLogInsert().
+        * We must hold all the insertion slots to change the value of
+        * forcePageWrites, to ensure adequate interlocking against XLogInsert().
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        if (exclusive)
        {
                if (XLogCtl->Insert.exclusiveBackup)
                {
-                       LWLockRelease(WALInsertLock);
+                       WALInsertSlotRelease();
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         errmsg("a backup is already in progress"),
@@ -8290,7 +9653,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
        else
                XLogCtl->Insert.nonExclusiveBackups++;
        XLogCtl->Insert.forcePageWrites = true;
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 
        /* Ensure we release forcePageWrites if fail below */
        PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
@@ -8355,6 +9718,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
                        LWLockAcquire(ControlFileLock, LW_SHARED);
                        checkpointloc = ControlFile->checkPoint;
                        startpoint = ControlFile->checkPointCopy.redo;
+                       starttli = ControlFile->checkPointCopy.ThisTimeLineID;
                        checkpointfpw = ControlFile->checkPointCopy.fullPageWrites;
                        LWLockRelease(ControlFileLock);
 
@@ -8373,12 +9737,12 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
                                recptr = xlogctl->lastFpwDisableRecPtr;
                                SpinLockRelease(&xlogctl->info_lck);
 
-                               if (!checkpointfpw || XLByteLE(startpoint, recptr))
+                               if (!checkpointfpw || startpoint <= recptr)
                                        ereport(ERROR,
                                                  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                                   errmsg("WAL generated with full_page_writes=off was replayed "
                                                                  "since last restartpoint"),
-                                                  errhint("This means that the backup being taken on standby "
+                                                  errhint("This means that the backup being taken on the standby "
                                                                   "is corrupt and should not be used. "
                                                                   "Enable full_page_writes and run CHECKPOINT on the master, "
                                                                   "and then try an online backup again.")));
@@ -8404,13 +9768,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
                         * taking a checkpoint right after another is not that expensive
                         * either because only few buffers have been dirtied yet.
                         */
-                       LWLockAcquire(WALInsertLock, LW_SHARED);
-                       if (XLByteLT(XLogCtl->Insert.lastBackupStart, startpoint))
+                       WALInsertSlotAcquire(true);
+                       if (XLogCtl->Insert.lastBackupStart < startpoint)
                        {
                                XLogCtl->Insert.lastBackupStart = startpoint;
                                gotUniqueStartpoint = true;
                        }
-                       LWLockRelease(WALInsertLock);
+                       WALInsertSlotRelease();
                } while (!gotUniqueStartpoint);
 
                XLByteToSeg(startpoint, _logSegNo);
@@ -8427,9 +9791,9 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
                                        "%Y-%m-%d %H:%M:%S %Z",
                                        pg_localtime(&stamp_time, log_timezone));
                appendStringInfo(&labelfbuf, "START WAL LOCATION: %X/%X (file %s)\n",
-                                                (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
+                        (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
                appendStringInfo(&labelfbuf, "CHECKPOINT LOCATION: %X/%X\n",
-                                                (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
+                                        (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
                appendStringInfo(&labelfbuf, "BACKUP METHOD: %s\n",
                                                 exclusive ? "pg_start_backup" : "streamed");
                appendStringInfo(&labelfbuf, "BACKUP FROM: %s\n",
@@ -8488,6 +9852,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
        /*
         * We're done.  As a convenience, return the starting WAL location.
         */
+       if (starttli_p)
+               *starttli_p = starttli;
        return startpoint;
 }
 
@@ -8498,7 +9864,7 @@ pg_start_backup_callback(int code, Datum arg)
        bool            exclusive = DatumGetBool(arg);
 
        /* Update backup counters and forcePageWrites on failure */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        if (exclusive)
        {
                Assert(XLogCtl->Insert.exclusiveBackup);
@@ -8515,7 +9881,7 @@ pg_start_backup_callback(int code, Datum arg)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 }
 
 /*
@@ -8524,14 +9890,18 @@ pg_start_backup_callback(int code, Datum arg)
 
  * If labelfile is NULL, this stops an exclusive backup. Otherwise this stops
  * the non-exclusive backup specified by 'labelfile'.
+ *
+ * Returns the last WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *stoptli_p.
  */
 XLogRecPtr
-do_pg_stop_backup(char *labelfile, bool waitforarchive)
+do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 {
        bool            exclusive = (labelfile == NULL);
        bool            backup_started_in_recovery = false;
        XLogRecPtr      startpoint;
        XLogRecPtr      stoppoint;
+       TimeLineID      stoptli;
        XLogRecData rdata;
        pg_time_t       stamp_time;
        char            strfbuf[128];
@@ -8555,7 +9925,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
 
        backup_started_in_recovery = RecoveryInProgress();
 
-       if (!superuser() && !is_authenticated_user_replication_role())
+       if (!superuser() && !has_rolreplication(GetUserId()))
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                 (errmsg("must be superuser or replication role to run a backup"))));
@@ -8582,7 +9952,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
        /*
         * OK to update backup counters and forcePageWrites
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        if (exclusive)
                XLogCtl->Insert.exclusiveBackup = false;
        else
@@ -8602,7 +9972,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 
        if (exclusive)
        {
@@ -8722,21 +10092,24 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
                recptr = xlogctl->lastFpwDisableRecPtr;
                SpinLockRelease(&xlogctl->info_lck);
 
-               if (XLByteLE(startpoint, recptr))
+               if (startpoint <= recptr)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                           errmsg("WAL generated with full_page_writes=off was replayed "
                                          "during online backup"),
-                                errhint("This means that the backup being taken on standby "
-                                                "is corrupt and should not be used. "
+                        errhint("This means that the backup being taken on the standby "
+                                        "is corrupt and should not be used. "
                                 "Enable full_page_writes and run CHECKPOINT on the master, "
-                                                "and then try an online backup again.")));
+                                        "and then try an online backup again.")));
 
 
                LWLockAcquire(ControlFileLock, LW_SHARED);
                stoppoint = ControlFile->minRecoveryPoint;
+               stoptli = ControlFile->minRecoveryPointTLI;
                LWLockRelease(ControlFileLock);
 
+               if (stoptli_p)
+                       *stoptli_p = stoptli;
                return stoppoint;
        }
 
@@ -8748,6 +10121,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
        rdata.buffer = InvalidBuffer;
        rdata.next = NULL;
        stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
+       stoptli = ThisTimeLineID;
 
        /*
         * Force a switch to a new xlog segment file, so that the backup is valid
@@ -8777,7 +10151,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
                                 errmsg("could not create file \"%s\": %m",
                                                histfilepath)));
        fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
-                       (uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename);
+               (uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename);
        fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
                        (uint32) (stoppoint >> 32), (uint32) stoppoint, stopxlogfilename);
        /* transfer remaining lines from label to history file */
@@ -8863,6 +10237,8 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
        /*
         * We're done.  As a convenience, return the ending WAL location.
         */
+       if (stoptli_p)
+               *stoptli_p = stoptli;
        return stoppoint;
 }
 
@@ -8881,7 +10257,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
 void
 do_pg_abort_backup(void)
 {
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
        XLogCtl->Insert.nonExclusiveBackups--;
 
@@ -8890,69 +10266,46 @@ do_pg_abort_backup(void)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 }
 
 /*
  * Get latest redo apply position.
  *
- * Optionally, returns the current recovery target timeline. Callers not
- * interested in that may pass NULL for targetTLI.
- *
  * Exported to allow WALReceiver to read the pointer directly.
  */
 XLogRecPtr
-GetXLogReplayRecPtr(TimeLineID *targetTLI)
+GetXLogReplayRecPtr(TimeLineID *replayTLI)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
        XLogRecPtr      recptr;
+       TimeLineID      tli;
 
        SpinLockAcquire(&xlogctl->info_lck);
-       recptr = xlogctl->recoveryLastRecPtr;
-       if (targetTLI)
-               *targetTLI = xlogctl->RecoveryTargetTLI;
+       recptr = xlogctl->lastReplayedEndRecPtr;
+       tli = xlogctl->lastReplayedTLI;
        SpinLockRelease(&xlogctl->info_lck);
 
+       if (replayTLI)
+               *replayTLI = tli;
        return recptr;
 }
 
-/*
- * Get current standby flush position, ie, the last WAL position
- * known to be fsync'd to disk in standby.
- *
- * If 'targetTLI' is not NULL, it's set to the current recovery target
- * timeline.
- */
-XLogRecPtr
-GetStandbyFlushRecPtr(TimeLineID *targetTLI)
-{
-       XLogRecPtr      receivePtr;
-       XLogRecPtr      replayPtr;
-
-       receivePtr = GetWalRcvWriteRecPtr(NULL);
-       replayPtr = GetXLogReplayRecPtr(targetTLI);
-
-       if (XLByteLT(receivePtr, replayPtr))
-               return replayPtr;
-       else
-               return receivePtr;
-}
-
 /*
  * Get latest WAL insert pointer
  */
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-       XLogCtlInsert *Insert = &XLogCtl->Insert;
-       XLogRecPtr      current_recptr;
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          current_bytepos;
 
-       LWLockAcquire(WALInsertLock, LW_SHARED);
-       INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
-       LWLockRelease(WALInsertLock);
+       SpinLockAcquire(&Insert->insertpos_lck);
+       current_bytepos = Insert->CurrBytePos;
+       SpinLockRelease(&Insert->insertpos_lck);
 
-       return current_recptr;
+       return XLogBytePosToRecPtr(current_bytepos);
 }
 
 /*
@@ -9148,7 +10501,9 @@ CancelBackup(void)
 
 /*
  * Read the XLOG page containing RecPtr into readBuf (if not read already).
- * Returns true if the page is read successfully.
+ * Returns number of bytes read, if the page is read successfully, or -1
+ * in case of errors.  When errors occur, they are ereport'ed, but only
+ * if they have not been previously reported.
  *
  * This is responsible for restoring files from archive as needed, as well
  * as for waiting for the requested WAL record to arrive in standby mode.
@@ -9162,39 +10517,35 @@ CancelBackup(void)
  * In standby mode, if after a successful return of XLogPageRead() the
  * caller finds the record it's interested in to be broken, it should
  * ereport the error with the level determined by
- * emode_for_corrupt_record(), and then set "failedSources |= readSource"
+ * emode_for_corrupt_record(), and then set lastSourceFailed
  * and call XLogPageRead() again with the same arguments. This lets
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
  */
-static bool
-XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
-                        bool randAccess)
+static int
+XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+                        XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
 {
+       XLogPageReadPrivate *private =
+       (XLogPageReadPrivate *) xlogreader->private_data;
+       int                     emode = private->emode;
        uint32          targetPageOff;
-       uint32          targetRecOff;
-       XLogSegNo       targetSegNo;
+       XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
 
-       XLByteToSeg(*RecPtr, targetSegNo);
-       targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-       targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
-
-       /* Fast exit if we have read the record in the current buffer already */
-       if (failedSources == 0 && targetSegNo == readSegNo &&
-               targetPageOff == readOff && targetRecOff < readLen)
-               return true;
+       XLByteToSeg(targetPagePtr, targetSegNo);
+       targetPageOff = targetPagePtr % XLogSegSize;
 
        /*
         * See if we need to switch to a new segment because the requested record
         * is not in the currently open one.
         */
-       if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo))
+       if (readFile >= 0 && !XLByteInSeg(targetPagePtr, readSegNo))
        {
                /*
                 * Request a restartpoint if we've replayed too much xlog since the
                 * last one.
                 */
-               if (StandbyMode && bgwriterLaunched)
+               if (StandbyModeRequested && bgwriterLaunched)
                {
                        if (XLogCheckpointNeeded(readSegNo))
                        {
@@ -9209,38 +10560,26 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
                readSource = 0;
        }
 
-       XLByteToSeg(*RecPtr, readSegNo);
+       XLByteToSeg(targetPagePtr, readSegNo);
 
 retry:
        /* See if we need to retrieve more data */
        if (readFile < 0 ||
-               (readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
+               (readSource == XLOG_FROM_STREAM &&
+                receivedUpto < targetPagePtr + reqLen))
        {
-               if (StandbyMode)
+               if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
+                                                                                private->randAccess,
+                                                                                private->fetching_ckpt,
+                                                                                targetRecPtr))
                {
-                       if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
-                                                                                        fetching_ckpt))
-                               goto triggered;
-               }
-               else
-               {
-                       /* In archive or crash recovery. */
-                       if (readFile < 0)
-                       {
-                               int                     sources;
-
-                               /* Reset curFileTLI if random fetch. */
-                               if (randAccess)
-                                       curFileTLI = 0;
-
-                               sources = XLOG_FROM_PG_XLOG;
-                               if (InArchiveRecovery)
-                                       sources |= XLOG_FROM_ARCHIVE;
+                       if (readFile >= 0)
+                               close(readFile);
+                       readFile = -1;
+                       readLen = 0;
+                       readSource = 0;
 
-                               readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
-                               if (readFile < 0)
-                                       return false;
-                       }
+                       return -1;
                }
        }
 
@@ -9258,75 +10597,49 @@ retry:
         */
        if (readSource == XLOG_FROM_STREAM)
        {
-               if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
-               {
+               if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
                        readLen = XLOG_BLCKSZ;
-               }
                else
                        readLen = receivedUpto % XLogSegSize - targetPageOff;
        }
        else
                readLen = XLOG_BLCKSZ;
 
-       if (!readFileHeaderValidated && targetPageOff != 0)
-       {
-               /*
-                * Whenever switching to a new WAL segment, we read the first page of
-                * the file and validate its header, even if that's not where the
-                * target record is.  This is so that we can check the additional
-                * identification info that is present in the first page's "long"
-                * header.
-                */
-               readOff = 0;
-               if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-               {
-                       char fname[MAXFNAMELEN];
-                       XLogFileName(fname, curFileTLI, readSegNo);
-                       ereport(emode_for_corrupt_record(emode, *RecPtr),
-                                       (errcode_for_file_access(),
-                                        errmsg("could not read from log segment %s, offset %u: %m",
-                                                       fname, readOff)));
-                       goto next_record_is_invalid;
-               }
-               if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, true))
-                       goto next_record_is_invalid;
-       }
-
        /* Read the requested page */
        readOff = targetPageOff;
        if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
        {
-               char fname[MAXFNAMELEN];
+               char            fname[MAXFNAMELEN];
+
                XLogFileName(fname, curFileTLI, readSegNo);
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
+               ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                                (errcode_for_file_access(),
-                errmsg("could not seek in log segment %s to offset %u: %m",
-                               fname, readOff)));
+                                errmsg("could not seek in log segment %s to offset %u: %m",
+                                               fname, readOff)));
                goto next_record_is_invalid;
        }
+
        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
        {
-               char fname[MAXFNAMELEN];
+               char            fname[MAXFNAMELEN];
+
                XLogFileName(fname, curFileTLI, readSegNo);
-               ereport(emode_for_corrupt_record(emode, *RecPtr),
+               ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                                (errcode_for_file_access(),
-                errmsg("could not read from log segment %s, offset %u: %m",
-                               fname, readOff)));
+                                errmsg("could not read from log segment %s, offset %u: %m",
+                                               fname, readOff)));
                goto next_record_is_invalid;
        }
-       if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, false))
-               goto next_record_is_invalid;
-
-       readFileHeaderValidated = true;
 
        Assert(targetSegNo == readSegNo);
        Assert(targetPageOff == readOff);
-       Assert(targetRecOff < readLen);
+       Assert(reqLen <= readLen);
 
-       return true;
+       *readTLI = curFileTLI;
+       return readLen;
 
 next_record_is_invalid:
-       failedSources |= readSource;
+       lastSourceFailed = true;
 
        if (readFile >= 0)
                close(readFile);
@@ -9338,23 +10651,29 @@ next_record_is_invalid:
        if (StandbyMode)
                goto retry;
        else
-               return false;
-
-triggered:
-       if (readFile >= 0)
-               close(readFile);
-       readFile = -1;
-       readLen = 0;
-       readSource = 0;
-
-       return false;
+               return -1;
 }
 
 /*
- * In standby mode, wait for the requested record to become available, either
- * via restore_command succeeding to restore the segment, or via walreceiver
- * having streamed the record (or via someone copying the segment directly to
- * pg_xlog, but that is not documented or recommended).
+ * Open the WAL segment containing WAL position 'RecPtr'.
+ *
+ * The segment can be fetched via restore_command, or via walreceiver having
+ * streamed the record, or it can already be present in pg_xlog. Checking
+ * pg_xlog is mainly for crash recovery, but it will be polled in standby mode
+ * too, in case someone copies a new segment directly to pg_xlog. That is not
+ * documented or recommended, though.
+ *
+ * If 'fetching_ckpt' is true, we're fetching a checkpoint record, and should
+ * prepare to read WAL starting from RedoStartLSN after this.
+ *
+ * 'RecPtr' might not point to the beginning of the record we're interested
+ * in, it might also point to the page or segment header. In that case,
+ * 'tliRecPtr' is the position of the WAL record we're interested in. It is
+ * used to decide which timeline to stream the requested WAL from.
+ *
+ * If the the record is not immediately available, the function returns false
+ * if we're not in standby mode. In standby mode, waits for it to become
+ * available.
  *
  * When the requested record becomes available, the function opens the file
  * containing it (if not open already), and returns true. When end of standby
@@ -9363,188 +10682,336 @@ triggered:
  */
 static bool
 WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
-                                                       bool fetching_ckpt)
+                                                       bool fetching_ckpt, XLogRecPtr tliRecPtr)
 {
        static pg_time_t last_fail_time = 0;
+       pg_time_t       now;
+
+       /*-------
+        * Standby mode is implemented by a state machine:
+        *
+        * 1. Read from archive (XLOG_FROM_ARCHIVE)
+        * 2. Read from pg_xlog (XLOG_FROM_PG_XLOG)
+        * 3. Check trigger file
+        * 4. Read from primary server via walreceiver (XLOG_FROM_STREAM)
+        * 5. Rescan timelines
+        * 6. Sleep 5 seconds, and loop back to 1.
+        *
+        * Failure to read from the current source advances the state machine to
+        * the next state. In addition, successfully reading a file from pg_xlog
+        * moves the state machine from state 2 back to state 1 (we always prefer
+        * files in the archive over files in pg_xlog).
+        *
+        * 'currentSource' indicates the current state. There are no currentSource
+        * values for "check trigger", "rescan timelines", and "sleep" states,
+        * those actions are taken when reading from the previous source fails, as
+        * part of advancing to the next state.
+        *-------
+        */
+       if (!InArchiveRecovery)
+               currentSource = XLOG_FROM_PG_XLOG;
+       else if (currentSource == 0)
+               currentSource = XLOG_FROM_ARCHIVE;
 
        for (;;)
        {
-               if (WalRcvInProgress())
-               {
-                       bool            havedata;
+               int                     oldSource = currentSource;
 
-                       /*
-                        * If we find an invalid record in the WAL streamed from master,
-                        * something is seriously wrong. There's little chance that the
-                        * problem will just go away, but PANIC is not good for
-                        * availability either, especially in hot standby mode.
-                        * Disconnect, and retry from archive/pg_xlog again. The WAL in
-                        * the archive should be identical to what was streamed, so it's
-                        * unlikely that it helps, but one can hope...
-                        */
-                       if (failedSources & XLOG_FROM_STREAM)
+               /*
+                * First check if we failed to read from the current source, and
+                * advance the state machine if so. The failure to read might've
+                * happened outside this function, e.g when a CRC check fails on a
+                * record, or within this loop.
+                */
+               if (lastSourceFailed)
+               {
+                       switch (currentSource)
                        {
-                               ShutdownWalRcv();
-                               continue;
-                       }
+                               case XLOG_FROM_ARCHIVE:
+                                       currentSource = XLOG_FROM_PG_XLOG;
+                                       break;
 
-                       /*
-                        * Walreceiver is active, so see if new data has arrived.
-                        *
-                        * We only advance XLogReceiptTime when we obtain fresh WAL from
-                        * walreceiver and observe that we had already processed
-                        * everything before the most recent "chunk" that it flushed to
-                        * disk.  In steady state where we are keeping up with the
-                        * incoming data, XLogReceiptTime will be updated on each cycle.
-                        * When we are behind, XLogReceiptTime will not advance, so the
-                        * grace time allotted to conflicting queries will decrease.
-                        */
-                       if (XLByteLT(RecPtr, receivedUpto))
-                               havedata = true;
-                       else
-                       {
-                               XLogRecPtr      latestChunkStart;
+                               case XLOG_FROM_PG_XLOG:
 
-                               receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-                               if (XLByteLT(RecPtr, receivedUpto))
-                               {
-                                       havedata = true;
-                                       if (!XLByteLT(RecPtr, latestChunkStart))
+                                       /*
+                                        * Check to see if the trigger file exists. Note that we
+                                        * do this only after failure, so when you create the
+                                        * trigger file, we still finish replaying as much as we
+                                        * can from archive and pg_xlog before failover.
+                                        */
+                                       if (StandbyMode && CheckForStandbyTrigger())
                                        {
-                                               XLogReceiptTime = GetCurrentTimestamp();
-                                               SetCurrentChunkStartTime(XLogReceiptTime);
+                                               ShutdownWalRcv();
+                                               return false;
                                        }
-                               }
-                               else
-                                       havedata = false;
-                       }
-                       if (havedata)
-                       {
-                               /*
-                                * Great, streamed far enough.  Open the file if it's not open
-                                * already.  Use XLOG_FROM_STREAM so that source info is set
-                                * correctly and XLogReceiptTime isn't changed.
-                                */
-                               if (readFile < 0)
-                               {
-                                       readFile = XLogFileRead(readSegNo, PANIC,
-                                                                                       recoveryTargetTLI,
-                                                                                       XLOG_FROM_STREAM, false);
-                                       Assert(readFile >= 0);
-                               }
-                               else
-                               {
-                                       /* just make sure source info is correct... */
-                                       readSource = XLOG_FROM_STREAM;
-                                       XLogReceiptSource = XLOG_FROM_STREAM;
-                               }
-                               break;
-                       }
 
-                       /*
-                        * Data not here yet, so check for trigger then sleep for five
-                        * seconds like in the WAL file polling case below.
-                        */
-                       if (CheckForStandbyTrigger())
-                               return false;
+                                       /*
+                                        * Not in standby mode, and we've now tried the archive
+                                        * and pg_xlog.
+                                        */
+                                       if (!StandbyMode)
+                                               return false;
 
-                       /*
-                        * Wait for more WAL to arrive, or timeout to be reached
-                        */
-                       WaitLatch(&XLogCtl->recoveryWakeupLatch,
-                                         WL_LATCH_SET | WL_TIMEOUT,
-                                         5000L);
-                       ResetLatch(&XLogCtl->recoveryWakeupLatch);
+                                       /*
+                                        * If primary_conninfo is set, launch walreceiver to try
+                                        * to stream the missing WAL.
+                                        *
+                                        * If fetching_ckpt is TRUE, RecPtr points to the initial
+                                        * checkpoint location. In that case, we use RedoStartLSN
+                                        * as the streaming start position instead of RecPtr, so
+                                        * that when we later jump backwards to start redo at
+                                        * RedoStartLSN, we will have the logs streamed already.
+                                        */
+                                       if (PrimaryConnInfo)
+                                       {
+                                               XLogRecPtr      ptr;
+                                               TimeLineID      tli;
+
+                                               if (fetching_ckpt)
+                                               {
+                                                       ptr = RedoStartLSN;
+                                                       tli = ControlFile->checkPointCopy.ThisTimeLineID;
+                                               }
+                                               else
+                                               {
+                                                       ptr = tliRecPtr;
+                                                       tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
+
+                                                       if (curFileTLI > 0 && tli < curFileTLI)
+                                                               elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
+                                                                        (uint32) (ptr >> 32), (uint32) ptr,
+                                                                        tli, curFileTLI);
+                                               }
+                                               curFileTLI = tli;
+                                               RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+                                               receivedUpto = 0;
+                                       }
+
+                                       /*
+                                        * Move to XLOG_FROM_STREAM state in either case. We'll
+                                        * get immediate failure if we didn't launch walreceiver,
+                                        * and move on to the next state.
+                                        */
+                                       currentSource = XLOG_FROM_STREAM;
+                                       break;
+
+                               case XLOG_FROM_STREAM:
+
+                                       /*
+                                        * Failure while streaming. Most likely, we got here
+                                        * because streaming replication was terminated, or
+                                        * promotion was triggered. But we also get here if we
+                                        * find an invalid record in the WAL streamed from master,
+                                        * in which case something is seriously wrong. There's
+                                        * little chance that the problem will just go away, but
+                                        * PANIC is not good for availability either, especially
+                                        * in hot standby mode. So, we treat that the same as
+                                        * disconnection, and retry from archive/pg_xlog again.
+                                        * The WAL in the archive should be identical to what was
+                                        * streamed, so it's unlikely that it helps, but one can
+                                        * hope...
+                                        */
+
+                                       /*
+                                        * Before we leave XLOG_FROM_STREAM state, make sure that
+                                        * walreceiver is not active, so that it won't overwrite
+                                        * WAL that we restore from archive.
+                                        */
+                                       if (WalRcvStreaming())
+                                               ShutdownWalRcv();
+
+                                       /*
+                                        * Before we sleep, re-scan for possible new timelines if
+                                        * we were requested to recover to the latest timeline.
+                                        */
+                                       if (recoveryTargetIsLatest)
+                                       {
+                                               if (rescanLatestTimeLine())
+                                               {
+                                                       currentSource = XLOG_FROM_ARCHIVE;
+                                                       break;
+                                               }
+                                       }
+
+                                       /*
+                                        * XLOG_FROM_STREAM is the last state in our state
+                                        * machine, so we've exhausted all the options for
+                                        * obtaining the requested WAL. We're going to loop back
+                                        * and retry from the archive, but if it hasn't been long
+                                        * since last attempt, sleep 5 seconds to avoid
+                                        * busy-waiting.
+                                        */
+                                       now = (pg_time_t) time(NULL);
+                                       if ((now - last_fail_time) < 5)
+                                       {
+                                               pg_usleep(1000000L * (5 - (now - last_fail_time)));
+                                               now = (pg_time_t) time(NULL);
+                                       }
+                                       last_fail_time = now;
+                                       currentSource = XLOG_FROM_ARCHIVE;
+                                       break;
+
+                               default:
+                                       elog(ERROR, "unexpected WAL source %d", currentSource);
+                       }
                }
-               else
+               else if (currentSource == XLOG_FROM_PG_XLOG)
                {
                        /*
-                        * WAL receiver is not active. Poll the archive.
+                        * We just successfully read a file in pg_xlog. We prefer files in
+                        * the archive over ones in pg_xlog, so try the next file again
+                        * from the archive first.
                         */
-                       int                     sources;
-                       pg_time_t       now;
+                       if (InArchiveRecovery)
+                               currentSource = XLOG_FROM_ARCHIVE;
+               }
 
-                       if (readFile >= 0)
-                       {
-                               close(readFile);
-                               readFile = -1;
-                       }
-                       /* Reset curFileTLI if random fetch. */
-                       if (randAccess)
-                               curFileTLI = 0;
+               if (currentSource != oldSource)
+                       elog(DEBUG2, "switched WAL source from %s to %s after %s",
+                                xlogSourceNames[oldSource], xlogSourceNames[currentSource],
+                                lastSourceFailed ? "failure" : "success");
 
-                       /*
-                        * Try to restore the file from archive, or read an existing file
-                        * from pg_xlog.
-                        */
-                       sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
-                       if (!(sources & ~failedSources))
-                       {
-                               /*
-                                * We've exhausted all options for retrieving the file. Retry.
-                                */
-                               failedSources = 0;
+               /*
+                * We've now handled possible failure. Try to read from the chosen
+                * source.
+                */
+               lastSourceFailed = false;
 
-                               /*
-                                * Before we sleep, re-scan for possible new timelines if we
-                                * were requested to recover to the latest timeline.
-                                */
-                               if (recoveryTargetIsLatest)
+               switch (currentSource)
+               {
+                       case XLOG_FROM_ARCHIVE:
+                       case XLOG_FROM_PG_XLOG:
+                               /* Close any old file we might have open. */
+                               if (readFile >= 0)
                                {
-                                       if (rescanLatestTimeLine())
-                                               continue;
+                                       close(readFile);
+                                       readFile = -1;
                                }
+                               /* Reset curFileTLI if random fetch. */
+                               if (randAccess)
+                                       curFileTLI = 0;
 
                                /*
-                                * If it hasn't been long since last attempt, sleep to avoid
-                                * busy-waiting.
+                                * Try to restore the file from archive, or read an existing
+                                * file from pg_xlog.
                                 */
-                               now = (pg_time_t) time(NULL);
-                               if ((now - last_fail_time) < 5)
-                               {
-                                       pg_usleep(1000000L * (5 - (now - last_fail_time)));
-                                       now = (pg_time_t) time(NULL);
-                               }
-                               last_fail_time = now;
+                               readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, currentSource);
+                               if (readFile >= 0)
+                                       return true;    /* success! */
 
                                /*
-                                * If primary_conninfo is set, launch walreceiver to try to
-                                * stream the missing WAL, before retrying to restore from
-                                * archive/pg_xlog.
-                                *
-                                * If fetching_ckpt is TRUE, RecPtr points to the initial
-                                * checkpoint location. In that case, we use RedoStartLSN as
-                                * the streaming start position instead of RecPtr, so that
-                                * when we later jump backwards to start redo at RedoStartLSN,
-                                * we will have the logs streamed already.
+                                * Nope, not found in archive or pg_xlog.
                                 */
-                               if (PrimaryConnInfo)
+                               lastSourceFailed = true;
+                               break;
+
+                       case XLOG_FROM_STREAM:
                                {
-                                       XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+                                       bool            havedata;
 
-                                       RequestXLogStreaming(ptr, PrimaryConnInfo);
-                                       continue;
-                               }
-                       }
-                       /* Don't try to read from a source that just failed */
-                       sources &= ~failedSources;
-                       readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
-                       if (readFile >= 0)
-                               break;
+                                       /*
+                                        * Check if WAL receiver is still active.
+                                        */
+                                       if (!WalRcvStreaming())
+                                       {
+                                               lastSourceFailed = true;
+                                               break;
+                                       }
 
-                       /*
-                        * Nope, not found in archive and/or pg_xlog.
-                        */
-                       failedSources |= sources;
+                                       /*
+                                        * Walreceiver is active, so see if new data has arrived.
+                                        *
+                                        * We only advance XLogReceiptTime when we obtain fresh
+                                        * WAL from walreceiver and observe that we had already
+                                        * processed everything before the most recent "chunk"
+                                        * that it flushed to disk.  In steady state where we are
+                                        * keeping up with the incoming data, XLogReceiptTime will
+                                        * be updated on each cycle. When we are behind,
+                                        * XLogReceiptTime will not advance, so the grace time
+                                        * allotted to conflicting queries will decrease.
+                                        */
+                                       if (RecPtr < receivedUpto)
+                                               havedata = true;
+                                       else
+                                       {
+                                               XLogRecPtr      latestChunkStart;
+
+                                               receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
+                                               if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
+                                               {
+                                                       havedata = true;
+                                                       if (latestChunkStart <= RecPtr)
+                                                       {
+                                                               XLogReceiptTime = GetCurrentTimestamp();
+                                                               SetCurrentChunkStartTime(XLogReceiptTime);
+                                                       }
+                                               }
+                                               else
+                                                       havedata = false;
+                                       }
+                                       if (havedata)
+                                       {
+                                               /*
+                                                * Great, streamed far enough.  Open the file if it's
+                                                * not open already.  Also read the timeline history
+                                                * file if we haven't initialized timeline history
+                                                * yet; it should be streamed over and present in
+                                                * pg_xlog by now.      Use XLOG_FROM_STREAM so that
+                                                * source info is set correctly and XLogReceiptTime
+                                                * isn't changed.
+                                                */
+                                               if (readFile < 0)
+                                               {
+                                                       if (!expectedTLEs)
+                                                               expectedTLEs = readTimeLineHistory(receiveTLI);
+                                                       readFile = XLogFileRead(readSegNo, PANIC,
+                                                                                                       receiveTLI,
+                                                                                                       XLOG_FROM_STREAM, false);
+                                                       Assert(readFile >= 0);
+                                               }
+                                               else
+                                               {
+                                                       /* just make sure source info is correct... */
+                                                       readSource = XLOG_FROM_STREAM;
+                                                       XLogReceiptSource = XLOG_FROM_STREAM;
+                                                       return true;
+                                               }
+                                               break;
+                                       }
 
-                       /*
-                        * Check to see if the trigger file exists. Note that we do this
-                        * only after failure, so when you create the trigger file, we
-                        * still finish replaying as much as we can from archive and
-                        * pg_xlog before failover.
-                        */
-                       if (CheckForStandbyTrigger())
-                               return false;
+                                       /*
+                                        * Data not here yet. Check for trigger, then wait for
+                                        * walreceiver to wake us up when new WAL arrives.
+                                        */
+                                       if (CheckForStandbyTrigger())
+                                       {
+                                               /*
+                                                * Note that we don't "return false" immediately here.
+                                                * After being triggered, we still want to replay all
+                                                * the WAL that was already streamed. It's in pg_xlog
+                                                * now, so we just treat this as a failure, and the
+                                                * state machine will move on to replay the streamed
+                                                * WAL from pg_xlog, and then recheck the trigger and
+                                                * exit replay.
+                                                */
+                                               lastSourceFailed = true;
+                                               break;
+                                       }
+
+                                       /*
+                                        * Wait for more WAL to arrive. Time out after 5 seconds,
+                                        * like when polling the archive, to react to a trigger
+                                        * file promptly.
+                                        */
+                                       WaitLatch(&XLogCtl->recoveryWakeupLatch,
+                                                         WL_LATCH_SET | WL_TIMEOUT,
+                                                         5000L);
+                                       ResetLatch(&XLogCtl->recoveryWakeupLatch);
+                                       break;
+                               }
+
+                       default:
+                               elog(ERROR, "unexpected WAL source %d", currentSource);
                }
 
                /*
@@ -9552,9 +11019,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                 * process.
                 */
                HandleStartupProcInterrupts();
-       }
+       } while (StandbyMode);
 
-       return true;
+       return false;
 }
 
 /*
@@ -9582,7 +11049,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
 
        if (readSource == XLOG_FROM_PG_XLOG && emode == LOG)
        {
-               if (XLByteEQ(RecPtr, lastComplaint))
+               if (RecPtr == lastComplaint)
                        emode = DEBUG1;
                else
                        lastComplaint = RecPtr;
@@ -9592,8 +11059,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
 
 /*
  * Check to see whether the user-specified trigger file exists and whether a
- * promote request has arrived.  If either condition holds, request postmaster
- * to shut down walreceiver, wait for it to exit, and return true.
+ * promote request has arrived.  If either condition holds, return true.
  */
 static bool
 CheckForStandbyTrigger(void)
@@ -9606,9 +11072,27 @@ CheckForStandbyTrigger(void)
 
        if (IsPromoteTriggered())
        {
-               ereport(LOG,
-                               (errmsg("received promote request")));
-               ShutdownWalRcv();
+               /*
+                * In 9.1 and 9.2 the postmaster unlinked the promote file inside the
+                * signal handler. It now leaves the file in place and lets the
+                * Startup process do the unlink. This allows Startup to know whether
+                * it should create a full checkpoint before starting up (fallback
+                * mode). Fast promotion takes precedence.
+                */
+               if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
+               {
+                       unlink(PROMOTE_SIGNAL_FILE);
+                       unlink(FALLBACK_PROMOTE_SIGNAL_FILE);
+                       fast_promote = true;
+               }
+               else if (stat(FALLBACK_PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
+               {
+                       unlink(FALLBACK_PROMOTE_SIGNAL_FILE);
+                       fast_promote = false;
+               }
+
+               ereport(LOG, (errmsg("received promote request")));
+
                ResetPromoteTriggered();
                triggered = true;
                return true;
@@ -9621,9 +11105,9 @@ CheckForStandbyTrigger(void)
        {
                ereport(LOG,
                                (errmsg("trigger file found: %s", TriggerFile)));
-               ShutdownWalRcv();
                unlink(TriggerFile);
                triggered = true;
+               fast_promote = true;
                return true;
        }
        return false;
@@ -9638,15 +11122,10 @@ CheckPromoteSignal(void)
 {
        struct stat stat_buf;
 
-       if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
-       {
-               /*
-                * Since we are in a signal handler, it's not safe to elog. We
-                * silently ignore any error from unlink.
-                */
-               unlink(PROMOTE_SIGNAL_FILE);
+       if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0 ||
+               stat(FALLBACK_PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
                return true;
-       }
+
        return false;
 }