]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/xlog.c
Have multixact be truncated by checkpoint, not vacuum
[postgresql] / src / backend / access / transam / xlog.c
index a02eebcb27af04176f6984803b6ff2d263ecbd80..e5640793eb8e09355b09d13cbdb68d3a9773ae6b 100644 (file)
@@ -4,7 +4,7 @@
  *             PostgreSQL transaction log manager
  *
  *
- * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * src/backend/access/transam/xlog.c
@@ -23,6 +23,7 @@
 
 #include "access/clog.h"
 #include "access/multixact.h"
+#include "access/rewriteheap.h"
 #include "access/subtrans.h"
 #include "access/timeline.h"
 #include "access/transam.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
-#include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/startup.h"
+#include "replication/logical.h"
+#include "replication/slot.h"
+#include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "storage/barrier.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/large_object.h"
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/predicate.h"
 #include "utils/timestamp.h"
 #include "pg_trace.h"
 
+extern uint32 bootstrap_data_checksum_version;
 
 /* File path names (all relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE  "recovery.conf"
 #define RECOVERY_COMMAND_DONE  "recovery.done"
-#define PROMOTE_SIGNAL_FILE "promote"
-#define FAST_PROMOTE_SIGNAL_FILE "fast_promote"
+#define PROMOTE_SIGNAL_FILE            "promote"
+#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
 
 
 /* User-settable parameters */
@@ -78,11 +84,13 @@ bool                XLogArchiveMode = false;
 char      *XLogArchiveCommand = NULL;
 bool           EnableHotStandby = false;
 bool           fullPageWrites = true;
+bool           wal_log_hints = false;
 bool           log_checkpoints = false;
 int                    sync_method = DEFAULT_SYNC_METHOD;
 int                    wal_level = WAL_LEVEL_MINIMAL;
 int                    CommitDelay = 0;        /* precommit delay in microseconds */
 int                    CommitSiblings = 5; /* # concurrent xacts needed to sleep */
+int                    num_xloginsert_locks = 8;
 
 #ifdef WAL_DEBUG
 bool           XLOG_DEBUG = false;
@@ -94,7 +102,7 @@ bool         XLOG_DEBUG = false;
  * future XLOG segment as long as there aren't already XLOGfileslop future
  * segments; else we'll delete it.  This could be made a separate GUC
  * variable, but at present I think it's sufficient to hardwire it as
- * 2*CheckPointSegments+1.     Under normal conditions, a checkpoint will free
+ * 2*CheckPointSegments+1.  Under normal conditions, a checkpoint will free
  * no more than 2*CheckPointSegments log segments, and we want to recycle all
  * of them; the +1 allows boundary cases to happen without wasting a
  * delete/create-segment cycle.
@@ -124,7 +132,7 @@ const struct config_enum_entry sync_method_options[] = {
 
 /*
  * Statistics for current checkpoint are collected in this global struct.
- * Because only the background writer or a stand-alone backend can perform
+ * Because only the checkpointer or a stand-alone backend can perform
  * checkpoints, this will be unused in normal backends.
  */
 CheckpointStatsData CheckpointStats;
@@ -183,7 +191,7 @@ static bool LocalHotStandbyActive = false;
  *             0: unconditionally not allowed to insert XLOG
  *             -1: must check RecoveryInProgress(); disallow until it is false
  * Most processes start with -1 and transition to 1 after seeing that recovery
- * is not in progress. But we can also force the value for special cases.
+ * is not in progress.  But we can also force the value for special cases.
  * The coding in XLogInsertAllowed() depends on the first two of these states
  * being numerically the same as bool true and false.
  */
@@ -200,14 +208,14 @@ static int        LocalXLogInsertAllowed = -1;
  * will switch to using offline XLOG archives as soon as we reach the end of
  * WAL in pg_xlog.
 */
-bool ArchiveRecoveryRequested = false;
-bool InArchiveRecovery = false;
+bool           ArchiveRecoveryRequested = false;
+bool           InArchiveRecovery = false;
 
 /* Was the last xlog file restored from archive, or local? */
 static bool restoredFromArchive = false;
 
 /* options taken from recovery.conf for archive recovery */
-char *recoveryRestoreCommand = NULL;
+char      *recoveryRestoreCommand = NULL;
 static char *recoveryEndCommand = NULL;
 static char *archiveCleanupCommand = NULL;
 static RecoveryTargetType recoveryTarget = RECOVERY_TARGET_UNSET;
@@ -216,19 +224,25 @@ static bool recoveryPauseAtTarget = true;
 static TransactionId recoveryTargetXid;
 static TimestampTz recoveryTargetTime;
 static char *recoveryTargetName;
+static int     recovery_min_apply_delay = 0;
+static TimestampTz recoveryDelayUntilTime;
 
 /* options taken from recovery.conf for XLOG streaming */
 static bool StandbyModeRequested = false;
 static char *PrimaryConnInfo = NULL;
+static char *PrimarySlotName = NULL;
 static char *TriggerFile = NULL;
 
 /* are we currently in standby mode? */
-bool StandbyMode = false;
+bool           StandbyMode = false;
 
 /* whether request for fast promotion has been made yet */
 static bool fast_promote = false;
 
-/* if recoveryStopsHere returns true, it saves actual stop xid/time/name here */
+/*
+ * if recoveryStopsBefore/After returns true, it saves information of the stop
+ * point here
+ */
 static TransactionId recoveryStopXid;
 static TimestampTz recoveryStopTime;
 static char recoveryStopName[MAXFNAMELEN];
@@ -248,7 +262,7 @@ static bool recoveryStopAfter;
  *
  * expectedTLEs: a list of TimeLineHistoryEntries for recoveryTargetTLI and the timelines of
  * its known parents, newest first (so recoveryTargetTLI is always the
- * first list member). Only these TLIs are expected to be seen in the WAL
+ * first list member).  Only these TLIs are expected to be seen in the WAL
  * segments we read, and indeed only these TLIs will be considered as
  * candidate WAL files to open at all.
  *
@@ -277,10 +291,10 @@ XLogRecPtr        XactLastRecEnd = InvalidXLogRecPtr;
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
  * (which is almost but not quite the same as a pointer to the most recent
- * CHECKPOINT record). We update this from the shared-memory copy,
+ * CHECKPOINT record).  We update this from the shared-memory copy,
  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold the Insert lock).  See XLogInsert for details. We are also allowed
- * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
+ * hold an insertion lock).  See XLogInsert for details.  We are also allowed
+ * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
  * InitXLOGAccess.
  */
@@ -321,7 +335,10 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALInsertLock: must be held to insert a record into the WAL buffers.
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * It is only held while initializing and changing the mapping.  If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
  *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
@@ -348,17 +365,77 @@ typedef struct XLogwrtResult
        XLogRecPtr      Flush;                  /* last byte + 1 flushed */
 } XLogwrtResult;
 
+/*
+ * Inserting to WAL is protected by a small fixed number of WAL insertion
+ * locks. To insert to the WAL, you must hold one of the locks - it doesn't
+ * matter which one. To lock out other concurrent insertions, you must hold
+ * of them. Each WAL insertion lock consists of a lightweight lock, plus an
+ * indicator of how far the insertion has progressed (insertingAt).
+ *
+ * The insertingAt values are read when a process wants to flush WAL from
+ * the in-memory buffers to disk, to check that all the insertions to the
+ * region the process is about to write out have finished. You could simply
+ * wait for all currently in-progress insertions to finish, but the
+ * insertingAt indicator allows you to ignore insertions to later in the WAL,
+ * so that you only wait for the insertions that are modifying the buffers
+ * you're about to write out.
+ *
+ * This isn't just an optimization. If all the WAL buffers are dirty, an
+ * inserter that's holding a WAL insert lock might need to evict an old WAL
+ * buffer, which requires flushing the WAL. If it's possible for an inserter
+ * to block on another inserter unnecessarily, deadlock can arise when two
+ * inserters holding a WAL insert lock wait for each other to finish their
+ * insertion.
+ *
+ * Small WAL records that don't cross a page boundary never update the value,
+ * the WAL record is just copied to the page and the lock is released. But
+ * to avoid the deadlock-scenario explained above, the indicator is always
+ * updated before sleeping while holding an insertion lock.
+ */
+typedef struct
+{
+       LWLock          lock;
+       XLogRecPtr      insertingAt;
+} WALInsertLock;
+
+/*
+ * All the WAL insertion locks are allocated as an array in shared memory. We
+ * force the array stride to be a power of 2, which saves a few cycles in
+ * indexing, but more importantly also ensures that individual slots don't
+ * cross cache line boundaries. (Of course, we have to also ensure that the
+ * array start address is suitably aligned.)
+ */
+typedef union WALInsertLockPadded
+{
+       WALInsertLock l;
+       char            pad[CACHE_LINE_SIZE];
+} WALInsertLockPadded;
+
 /*
  * Shared state data for XLogInsert.
  */
 typedef struct XLogCtlInsert
 {
-       XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
-       int                     curridx;                /* current block index in cache */
-       XLogPageHeader currpage;        /* points to header of block in cache */
-       char       *currpos;            /* current insertion point in cache */
-       XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
-       bool            forcePageWrites;        /* forcing full-page writes for PITR? */
+       slock_t         insertpos_lck;  /* protects CurrBytePos and PrevBytePos */
+
+       /*
+        * CurrBytePos is the end of reserved WAL. The next record will be
+        * inserted at that position. PrevBytePos is the start position of the
+        * previously inserted (or rather, reserved) record - it is copied to the
+        * prev-link of the next record. These are stored as "usable byte
+        * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+        */
+       uint64          CurrBytePos;
+       uint64          PrevBytePos;
+
+       /*
+        * Make sure the above heavily-contended spinlock and byte positions are
+        * on their own cache line. In particular, the RedoRecPtr and full page
+        * write variables below should be on a different cache line. They are
+        * read on every WAL insertion, but updated rarely, and we don't want
+        * those reads to steal the cache line containing Curr/PrevBytePos.
+        */
+       char            pad[CACHE_LINE_SIZE];
 
        /*
         * fullPageWrites is the master copy used by all backends to determine
@@ -366,7 +443,12 @@ typedef struct XLogCtlInsert
         * This is required because, when full_page_writes is changed by SIGHUP,
         * we must WAL-log it before it actually affects WAL-logging by backends.
         * Checkpointer sets at startup or after SIGHUP.
+        *
+        * To read these fields, you must hold an insertion lock. To modify them,
+        * you must hold ALL the locks.
         */
+       XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
+       bool            forcePageWrites;        /* forcing full-page writes for PITR? */
        bool            fullPageWrites;
 
        /*
@@ -379,38 +461,39 @@ typedef struct XLogCtlInsert
        bool            exclusiveBackup;
        int                     nonExclusiveBackups;
        XLogRecPtr      lastBackupStart;
-} XLogCtlInsert;
 
-/*
- * Shared state data for XLogWrite/XLogFlush.
- */
-typedef struct XLogCtlWrite
-{
-       int                     curridx;                /* cache index of next block to write */
-       pg_time_t       lastSegSwitchTime;              /* time of last xlog segment switch */
-} XLogCtlWrite;
+       /*
+        * WAL insertion locks.
+        */
+       WALInsertLockPadded *WALInsertLocks;
+       LWLockTranche WALInsertLockTranche;
+       int                     WALInsertLockTrancheId;
+} XLogCtlInsert;
 
 /*
  * Total shared-memory state for XLOG.
  */
 typedef struct XLogCtlData
 {
-       /* Protected by WALInsertLock: */
        XLogCtlInsert Insert;
 
        /* Protected by info_lck: */
        XLogwrtRqst LogwrtRqst;
+       XLogRecPtr      RedoRecPtr;             /* a recent copy of Insert->RedoRecPtr */
        uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
        TransactionId ckptXid;
        XLogRecPtr      asyncXactLSN;   /* LSN of newest async commit/abort */
-       XLogSegNo       lastRemovedSegNo; /* latest removed/recycled XLOG segment */
+       XLogRecPtr      replicationSlotMinLSN;  /* oldest LSN needed by any slot */
+
+       XLogSegNo       lastRemovedSegNo;               /* latest removed/recycled XLOG
+                                                                                * segment */
 
-       /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck */
-       XLogRecPtr  unloggedLSN;
+       /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
+       XLogRecPtr      unloggedLSN;
        slock_t         ulsn_lck;
 
-       /* Protected by WALWriteLock: */
-       XLogCtlWrite Write;
+       /* Time of last xlog segment switch. Protected by WALWriteLock. */
+       pg_time_t       lastSegSwitchTime;
 
        /*
         * Protected by info_lck and WALWriteLock (you must hold either lock to
@@ -418,10 +501,22 @@ typedef struct XLogCtlData
         */
        XLogwrtResult LogwrtResult;
 
+       /*
+        * Latest initialized page in the cache (last byte position + 1).
+        *
+        * To change the identity of a buffer (and InitializedUpTo), you need to
+        * hold WALBufMappingLock.  To change the identity of a buffer that's
+        * still dirty, the old page needs to be written out first, and for that
+        * you need WALWriteLock, and you need to ensure that there are no
+        * in-progress insertions to the page by calling
+        * WaitXLogInsertionsToFinish().
+        */
+       XLogRecPtr      InitializedUpTo;
+
        /*
         * These values do not change after startup, although the pointed-to pages
-        * and xlblocks values certainly do.  Permission to read/write the pages
-        * and xlblocks values depends on WALInsertLock and WALWriteLock.
+        * and xlblocks values certainly do.  xlblock values are protected by
+        * WALBufMappingLock.
         */
        char       *pages;                      /* buffers for unwritten XLOG pages */
        XLogRecPtr *xlblocks;           /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -511,31 +606,38 @@ typedef struct XLogCtlData
 
 static XLogCtlData *XLogCtl = NULL;
 
+/* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
+static WALInsertLockPadded *WALInsertLocks = NULL;
+
 /*
  * We maintain an image of pg_control in shared memory.
  */
 static ControlFileData *ControlFile = NULL;
 
 /*
- * Macros for managing XLogInsert state.  In most cases, the calling routine
- * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
- * so these are passed as parameters instead of being fetched via XLogCtl.
+ * Calculate the amount of space left on the page after 'endptr'. Beware
+ * multiple evaluation!
  */
+#define INSERT_FREESPACE(endptr)       \
+       (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
 
-/* Free space remaining in the current xlog page buffer */
-#define INSERT_FREESPACE(Insert)  \
-       (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
-
-/* Construct XLogRecPtr value for current insertion point */
-#define INSERT_RECPTR(recptr,Insert,curridx)  \
-               (recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert)
-
-#define PrevBufIdx(idx)                \
-               (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
-
+/* Macro to advance to next buffer index. */
 #define NextBufIdx(idx)                \
                (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
 
+/*
+ * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
+ * would hold if it was in cache, the page containing 'recptr'.
+ */
+#define XLogRecPtrToBufIdx(recptr)     \
+       (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
+
+/*
+ * These are the number of bytes in a WAL page and segment usable for WAL data.
+ */
+#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
+#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
+
 /*
  * Private, possibly out-of-date copy of shared LogwrtResult.
  * See discussion above.
@@ -548,14 +650,14 @@ static XLogwrtResult LogwrtResult = {0, 0};
  */
 typedef enum
 {
-       XLOG_FROM_ANY = 0,              /* request to read WAL from any source */
-       XLOG_FROM_ARCHIVE,              /* restored using restore_command */
-       XLOG_FROM_PG_XLOG,              /* existing file in pg_xlog */
-       XLOG_FROM_STREAM,               /* streamed from master */
+       XLOG_FROM_ANY = 0,                      /* request to read WAL from any source */
+       XLOG_FROM_ARCHIVE,                      /* restored using restore_command */
+       XLOG_FROM_PG_XLOG,                      /* existing file in pg_xlog */
+       XLOG_FROM_STREAM                        /* streamed from master */
 } XLogSource;
 
 /* human-readable names for XLogSources, for debugging output */
-static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" };
+static const char *xlogSourceNames[] = {"any", "archive", "pg_xlog", "stream"};
 
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
@@ -589,7 +691,7 @@ static XLogSource readSource = 0;           /* XLOG_FROM_* code */
  * next.
  */
 static XLogSource currentSource = 0;   /* XLOG_FROM_* code */
-static bool    lastSourceFailed = false;
+static bool lastSourceFailed = false;
 
 typedef struct XLogPageReadPrivate
 {
@@ -607,7 +709,7 @@ typedef struct XLogPageReadPrivate
  * XLogReceiptSource tracks where we last successfully read some WAL.)
  */
 static TimestampTz XLogReceiptTime = 0;
-static XLogSource XLogReceiptSource = 0;       /* XLOG_FROM_* code */
+static XLogSource XLogReceiptSource = 0;               /* XLOG_FROM_* code */
 
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;  /* start of last record read */
@@ -630,11 +732,16 @@ static bool InRedo = false;
 /* Have we launched bgwriter during recovery? */
 static bool bgwriterLaunched = false;
 
+/* For WALInsertLockAcquire/Release functions */
+static int     MyLockNo = 0;
+static bool holdingAllLocks = false;
 
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
-static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
+static bool recoveryStopsBefore(XLogRecord *record);
+static bool recoveryStopsAfter(XLogRecord *record);
 static void recoveryPausesHere(void);
+static bool recoveryApplyDelay(XLogRecord *record);
 static void SetLatestXTime(TimestampTz xtime);
 static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void CheckRequiredParameterValues(void);
@@ -645,18 +752,21 @@ static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
-static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
+static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
-static bool AdvanceXLInsertBuffer(bool new_segment);
+static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
+                                                char *blk, bool get_cleanup_lock, bool keep_buffer);
+static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
                                           bool find_free, int *max_advance,
                                           bool use_lock);
 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                         int source, bool notexistOk);
-static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
+static int     XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
 static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
                         int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
                         TimeLineID *readTLI);
@@ -690,6 +800,23 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
 static void rm_redo_error_callback(void *arg);
 static int     get_sync_bit(int method);
 
+static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+                                       XLogRecData *rdata,
+                                       XLogRecPtr StartPos, XLogRecPtr EndPos);
+static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
+                                                 XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
+static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+                                 XLogRecPtr *PrevPtr);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static char *GetXLogBuffer(XLogRecPtr ptr);
+static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
+static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
+static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+
+static void WALInsertLockAcquire(void);
+static void WALInsertLockAcquireExclusive(void);
+static void WALInsertLockRelease(void);
+static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -710,10 +837,6 @@ XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 {
        XLogCtlInsert *Insert = &XLogCtl->Insert;
-       XLogRecPtr      RecPtr;
-       XLogRecPtr      WriteRqst;
-       uint32          freespace;
-       int                     curridx;
        XLogRecData *rdt;
        XLogRecData *rdt_lastnormal;
        Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
@@ -728,17 +851,19 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        uint32          len,
                                write_len;
        unsigned        i;
-       bool            updrqst;
        bool            doPageWrites;
        bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+       bool            inserted;
        uint8           info_orig = info;
        static XLogRecord *rechdr;
+       XLogRecPtr      StartPos;
+       XLogRecPtr      EndPos;
 
        if (rechdr == NULL)
        {
-               rechdr = malloc(SizeOfXLogRecord);
-               if (rechdr == NULL)
-                       elog(ERROR, "out of memory");
+               static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
+
+               rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
                MemSet(rechdr, 0, SizeOfXLogRecord);
        }
 
@@ -758,8 +883,8 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
         */
        if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
        {
-               RecPtr = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
-               return RecPtr;
+               EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
+               return EndPos;
        }
 
        /*
@@ -767,9 +892,9 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
         * up.
         *
         * We may have to loop back to here if a race condition is detected below.
-        * We could prevent the race by doing all this work while holding the
-        * insert lock, but it seems better to avoid doing CRC calculations while
-        * holding the lock.
+        * We could prevent the race by doing all this work while holding an
+        * insertion lock, but it seems better to avoid doing CRC calculations
+        * while holding one.
         *
         * We add entries for backup blocks to the chain, so that they don't need
         * any special treatment in the critical section where the chunks are
@@ -786,8 +911,8 @@ begin:;
        /*
         * Decide if we need to do full-page writes in this XLOG record: true if
         * full_page_writes is on or we have a PITR request for it.  Since we
-        * don't yet have the insert lock, fullPageWrites and forcePageWrites
-        * could change under us, but we'll recheck them once we have the lock.
+        * don't yet have an insertion lock, fullPageWrites and forcePageWrites
+        * could change under us, but we'll recheck them once we have a lock.
         */
        doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
 
@@ -820,8 +945,8 @@ begin:;
                                {
                                        /* OK, put it in this slot */
                                        dtbuf[i] = rdt->buffer;
-                                       if (XLogCheckBuffer(rdt, doPageWrites,
-                                                                               &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
+                                       if (doPageWrites && XLogCheckBuffer(rdt, true,
+                                                                                  &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
                                        {
                                                dtbuf_bkp[i] = true;
                                                rdt->data = NULL;
@@ -927,25 +1052,62 @@ begin:;
                COMP_CRC32(rdata_crc, rdt->data, rdt->len);
 
        /*
-        * Construct record header (prev-link and CRC are filled in later), and
-        * make that the first chunk in the chain.
+        * Construct record header (prev-link is filled in later, after reserving
+        * the space for the record), and make that the first chunk in the chain.
+        *
+        * The CRC calculated for the header here doesn't include prev-link,
+        * because we don't know it yet. It will be added later.
         */
        rechdr->xl_xid = GetCurrentTransactionIdIfAny();
        rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
        rechdr->xl_len = len;           /* doesn't include backup blocks */
        rechdr->xl_info = info;
        rechdr->xl_rmid = rmid;
+       rechdr->xl_prev = InvalidXLogRecPtr;
+       COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
 
        hdr_rdt.next = rdata;
        hdr_rdt.data = (char *) rechdr;
        hdr_rdt.len = SizeOfXLogRecord;
-
        write_len += SizeOfXLogRecord;
 
+       /*----------
+        *
+        * We have now done all the preparatory work we can without holding a
+        * lock or modifying shared state. From here on, inserting the new WAL
+        * record to the shared WAL buffer cache is a two-step process:
+        *
+        * 1. Reserve the right amount of space from the WAL. The current head of
+        *        reserved space is kept in Insert->CurrBytePos, and is protected by
+        *        insertpos_lck.
+        *
+        * 2. Copy the record to the reserved WAL space. This involves finding the
+        *        correct WAL buffer containing the reserved space, and copying the
+        *        record in place. This can be done concurrently in multiple processes.
+        *
+        * To keep track of which insertions are still in-progress, each concurrent
+        * inserter acquires an insertion lock. In addition to just indicating that
+        * an insertion is in progress, the lock tells others how far the inserter
+        * has progressed. There is a small fixed number of insertion locks,
+        * determined by the num_xloginsert_locks GUC. When an inserter crosses a
+        * page boundary, it updates the value stored in the lock to the how far it
+        * has inserted, to allow the previous buffer to be flushed.
+        *
+        * Holding onto an insertion lock also protects RedoRecPtr and
+        * fullPageWrites from changing until the insertion is finished.
+        *
+        * Step 2 can usually be done completely in parallel. If the required WAL
+        * page is not initialized yet, you have to grab WALBufMappingLock to
+        * initialize it, but the WAL writer tries to do that ahead of insertions
+        * to avoid that from happening in the critical path.
+        *
+        *----------
+        */
        START_CRIT_SECTION();
-
-       /* Now wait to get insert lock */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       if (isLogSwitch)
+               WALInsertLockAcquireExclusive();
+       else
+               WALInsertLockAcquire();
 
        /*
         * Check to see if my RedoRecPtr is out of date.  If so, may have to go
@@ -974,7 +1136,7 @@ begin:;
                                         * Oops, this buffer now needs to be backed up, but we
                                         * didn't think so above.  Start over.
                                         */
-                                       LWLockRelease(WALInsertLock);
+                                       WALInsertLockRelease();
                                        END_CRIT_SECTION();
                                        rdt_lastnormal->next = NULL;
                                        info = info_orig;
@@ -993,7 +1155,7 @@ begin:;
        if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
        {
                /* Oops, must redo it with full-page data. */
-               LWLockRelease(WALInsertLock);
+               WALInsertLockRelease();
                END_CRIT_SECTION();
                rdt_lastnormal->next = NULL;
                info = info_orig;
@@ -1001,61 +1163,96 @@ begin:;
        }
 
        /*
-        * If the current page is completely full, the record goes to the next
-        * page, right after the page header.
+        * Reserve space for the record in the WAL. This also sets the xl_prev
+        * pointer.
         */
-       updrqst = false;
-       freespace = INSERT_FREESPACE(Insert);
-       if (freespace == 0)
+       if (isLogSwitch)
+               inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
+       else
        {
-               updrqst = AdvanceXLInsertBuffer(false);
-               freespace = INSERT_FREESPACE(Insert);
+               ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
+                                                                 &rechdr->xl_prev);
+               inserted = true;
        }
 
-       /* Compute record's XLOG location */
-       curridx = Insert->curridx;
-       INSERT_RECPTR(RecPtr, Insert, curridx);
+       if (inserted)
+       {
+               /*
+                * Now that xl_prev has been filled in, finish CRC calculation of the
+                * record header.
+                */
+               COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
+               FIN_CRC32(rdata_crc);
+               rechdr->xl_crc = rdata_crc;
+
+               /*
+                * All the record data, including the header, is now ready to be
+                * inserted. Copy the record in the space reserved.
+                */
+               CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
+       }
+       else
+       {
+               /*
+                * This was an xlog-switch record, but the current insert location was
+                * already exactly at the beginning of a segment, so there was no need
+                * to do anything.
+                */
+       }
 
        /*
-        * If the record is an XLOG_SWITCH, and we are exactly at the start of a
-        * segment, we need not insert it (and don't want to because we'd like
-        * consecutive switch requests to be no-ops).  Instead, make sure
-        * everything is written and flushed through the end of the prior segment,
-        * and return the prior segment's end address.
+        * Done! Let others know that we're finished.
         */
-       if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD)
-       {
-               /* We can release insert lock immediately */
-               LWLockRelease(WALInsertLock);
+       WALInsertLockRelease();
 
-               RecPtr -= SizeOfXLogLongPHD;
+       MarkCurrentTransactionIdLoggedIfAny();
 
-               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-               LogwrtResult = XLogCtl->LogwrtResult;
-               if (LogwrtResult.Flush < RecPtr)
-               {
-                       XLogwrtRqst FlushRqst;
-
-                       FlushRqst.Write = RecPtr;
-                       FlushRqst.Flush = RecPtr;
-                       XLogWrite(FlushRqst, false, false);
-               }
-               LWLockRelease(WALWriteLock);
+       END_CRIT_SECTION();
 
-               END_CRIT_SECTION();
+       /*
+        * Update shared LogwrtRqst.Write, if we crossed page boundary.
+        */
+       if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
 
-               /* wake up walsenders now that we've released heavily contended locks */
-               WalSndWakeupProcessRequests();
-               return RecPtr;
+               SpinLockAcquire(&xlogctl->info_lck);
+               /* advance global request to include new block(s) */
+               if (xlogctl->LogwrtRqst.Write < EndPos)
+                       xlogctl->LogwrtRqst.Write = EndPos;
+               /* update local result copy while I have the chance */
+               LogwrtResult = xlogctl->LogwrtResult;
+               SpinLockRelease(&xlogctl->info_lck);
        }
 
-       /* Finish the record header */
-       rechdr->xl_prev = Insert->PrevRecord;
+       /*
+        * If this was an XLOG_SWITCH record, flush the record and the empty
+        * padding space that fills the rest of the segment, and perform
+        * end-of-segment actions (eg, notifying archiver).
+        */
+       if (isLogSwitch)
+       {
+               TRACE_POSTGRESQL_XLOG_SWITCH();
+               XLogFlush(EndPos);
 
-       /* Now we can finish computing the record's CRC */
-       COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
-       FIN_CRC32(rdata_crc);
-       rechdr->xl_crc = rdata_crc;
+               /*
+                * Even though we reserved the rest of the segment for us, which is
+                * reflected in EndPos, we return a pointer to just the end of the
+                * xlog-switch record.
+                */
+               if (inserted)
+               {
+                       EndPos = StartPos + SizeOfXLogRecord;
+                       if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+                       {
+                               if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ)
+                                       EndPos += SizeOfXLogLongPHD;
+                               else
+                                       EndPos += SizeOfXLogShortPHD;
+                       }
+               }
+       }
 
 #ifdef WAL_DEBUG
        if (XLOG_DEBUG)
@@ -1064,175 +1261,758 @@ begin:;
 
                initStringInfo(&buf);
                appendStringInfo(&buf, "INSERT @ %X/%X: ",
-                                                (uint32) (RecPtr >> 32), (uint32) RecPtr);
+                                                (uint32) (EndPos >> 32), (uint32) EndPos);
                xlog_outrec(&buf, rechdr);
                if (rdata->data != NULL)
                {
-                       appendStringInfo(&buf, " - ");
-                       RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
+                       StringInfoData recordbuf;
+
+                       /*
+                        * We have to piece together the WAL record data from the
+                        * XLogRecData entries, so that we can pass it to the rm_desc
+                        * function as one contiguous chunk. (but we can leave out any
+                        * extra entries we created for backup blocks)
+                        */
+                       rdt_lastnormal->next = NULL;
+
+                       initStringInfo(&recordbuf);
+                       appendBinaryStringInfo(&recordbuf, (char *) rechdr, sizeof(XLogRecord));
+                       for (; rdata != NULL; rdata = rdata->next)
+                               appendBinaryStringInfo(&recordbuf, rdata->data, rdata->len);
+
+                       appendStringInfoString(&buf, " - ");
+                       RmgrTable[rechdr->xl_rmid].rm_desc(&buf, (XLogRecord *) recordbuf.data);
+                       pfree(recordbuf.data);
                }
                elog(LOG, "%s", buf.data);
                pfree(buf.data);
        }
 #endif
 
-       /* Record begin of record in appropriate places */
-       ProcLastRecPtr = RecPtr;
-       Insert->PrevRecord = RecPtr;
+       /*
+        * Update our global variables
+        */
+       ProcLastRecPtr = StartPos;
+       XactLastRecEnd = EndPos;
+
+       return EndPos;
+}
+
+/*
+ * Reserves the right amount of space for a record of given size from the WAL.
+ * *StartPos is set to the beginning of the reserved section, *EndPos to
+ * its end+1. *PrevPtr is set to the beginning of the previous record; it is
+ * used to set the xl_prev of this record.
+ *
+ * This is the performance critical part of XLogInsert that must be serialized
+ * across backends. The rest can happen mostly in parallel. Try to keep this
+ * section as short as possible, insertpos_lck can be heavily contended on a
+ * busy system.
+ *
+ * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
+ * where we actually copy the record to the reserved space.
+ */
+static void
+ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+                                                 XLogRecPtr *PrevPtr)
+{
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          startbytepos;
+       uint64          endbytepos;
+       uint64          prevbytepos;
+
+       size = MAXALIGN(size);
+
+       /* All (non xlog-switch) records should contain data. */
+       Assert(size > SizeOfXLogRecord);
 
        /*
-        * Append the data, including backup blocks if any
+        * The duration the spinlock needs to be held is minimized by minimizing
+        * the calculations that have to be done while holding the lock. The
+        * current tip of reserved WAL is kept in CurrBytePos, as a byte position
+        * that only counts "usable" bytes in WAL, that is, it excludes all WAL
+        * page headers. The mapping between "usable" byte positions and physical
+        * positions (XLogRecPtrs) can be done outside the locked region, and
+        * because the usable byte position doesn't include any headers, reserving
+        * X bytes from WAL is almost as simple as "CurrBytePos += X".
         */
-       rdata = &hdr_rdt;
-       while (write_len)
+       SpinLockAcquire(&Insert->insertpos_lck);
+
+       startbytepos = Insert->CurrBytePos;
+       endbytepos = startbytepos + size;
+       prevbytepos = Insert->PrevBytePos;
+       Insert->CurrBytePos = endbytepos;
+       Insert->PrevBytePos = startbytepos;
+
+       SpinLockRelease(&Insert->insertpos_lck);
+
+       *StartPos = XLogBytePosToRecPtr(startbytepos);
+       *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+       *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
+
+       /*
+        * Check that the conversions between "usable byte positions" and
+        * XLogRecPtrs work consistently in both directions.
+        */
+       Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+       Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+       Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+}
+
+/*
+ * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
+ *
+ * A log-switch record is handled slightly differently. The rest of the
+ * segment will be reserved for this insertion, as indicated by the returned
+ * *EndPos value. However, if we are already at the beginning of the current
+ * segment, *StartPos and *EndPos are set to the current location without
+ * reserving any space, and the function returns false.
+*/
+static bool
+ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
+{
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          startbytepos;
+       uint64          endbytepos;
+       uint64          prevbytepos;
+       uint32          size = SizeOfXLogRecord;
+       XLogRecPtr      ptr;
+       uint32          segleft;
+
+       /*
+        * These calculations are a bit heavy-weight to be done while holding a
+        * spinlock, but since we're holding all the WAL insertion locks, there
+        * are no other inserters competing for it. GetXLogInsertRecPtr() does
+        * compete for it, but that's not called very frequently.
+        */
+       SpinLockAcquire(&Insert->insertpos_lck);
+
+       startbytepos = Insert->CurrBytePos;
+
+       ptr = XLogBytePosToEndRecPtr(startbytepos);
+       if (ptr % XLOG_SEG_SIZE == 0)
+       {
+               SpinLockRelease(&Insert->insertpos_lck);
+               *EndPos = *StartPos = ptr;
+               return false;
+       }
+
+       endbytepos = startbytepos + size;
+       prevbytepos = Insert->PrevBytePos;
+
+       *StartPos = XLogBytePosToRecPtr(startbytepos);
+       *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+
+       segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE);
+       if (segleft != XLOG_SEG_SIZE)
+       {
+               /* consume the rest of the segment */
+               *EndPos += segleft;
+               endbytepos = XLogRecPtrToBytePos(*EndPos);
+       }
+       Insert->CurrBytePos = endbytepos;
+       Insert->PrevBytePos = startbytepos;
+
+       SpinLockRelease(&Insert->insertpos_lck);
+
+       *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
+
+       Assert((*EndPos) % XLOG_SEG_SIZE == 0);
+       Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+       Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+       Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+
+       return true;
+}
+
+/*
+ * Subroutine of XLogInsert.  Copies a WAL record to an already-reserved
+ * area in the WAL.
+ */
+static void
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+                                       XLogRecPtr StartPos, XLogRecPtr EndPos)
+{
+       char       *currpos;
+       int                     freespace;
+       int                     written;
+       XLogRecPtr      CurrPos;
+       XLogPageHeader pagehdr;
+
+       /* The first chunk is the record header */
+       Assert(rdata->len == SizeOfXLogRecord);
+
+       /*
+        * Get a pointer to the right place in the right WAL buffer to start
+        * inserting to.
+        */
+       CurrPos = StartPos;
+       currpos = GetXLogBuffer(CurrPos);
+       freespace = INSERT_FREESPACE(CurrPos);
+
+       /*
+        * there should be enough space for at least the first field (xl_tot_len)
+        * on this page.
+        */
+       Assert(freespace >= sizeof(uint32));
+
+       /* Copy record data */
+       written = 0;
+       while (rdata != NULL)
        {
-               while (rdata->data == NULL)
-                       rdata = rdata->next;
+               char       *rdata_data = rdata->data;
+               int                     rdata_len = rdata->len;
 
-               if (freespace > 0)
+               while (rdata_len > freespace)
                {
-                       if (rdata->len > freespace)
+                       /*
+                        * Write what fits on this page, and continue on the next page.
+                        */
+                       Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
+                       memcpy(currpos, rdata_data, freespace);
+                       rdata_data += freespace;
+                       rdata_len -= freespace;
+                       written += freespace;
+                       CurrPos += freespace;
+
+                       /*
+                        * Get pointer to beginning of next page, and set the xlp_rem_len
+                        * in the page header. Set XLP_FIRST_IS_CONTRECORD.
+                        *
+                        * It's safe to set the contrecord flag and xlp_rem_len without a
+                        * lock on the page. All the other flags were already set when the
+                        * page was initialized, in AdvanceXLInsertBuffer, and we're the
+                        * only backend that needs to set the contrecord flag.
+                        */
+                       currpos = GetXLogBuffer(CurrPos);
+                       pagehdr = (XLogPageHeader) currpos;
+                       pagehdr->xlp_rem_len = write_len - written;
+                       pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
+
+                       /* skip over the page header */
+                       if (CurrPos % XLogSegSize == 0)
                        {
-                               memcpy(Insert->currpos, rdata->data, freespace);
-                               rdata->data += freespace;
-                               rdata->len -= freespace;
-                               write_len -= freespace;
+                               CurrPos += SizeOfXLogLongPHD;
+                               currpos += SizeOfXLogLongPHD;
                        }
                        else
                        {
-                               memcpy(Insert->currpos, rdata->data, rdata->len);
-                               freespace -= rdata->len;
-                               write_len -= rdata->len;
-                               Insert->currpos += rdata->len;
-                               rdata = rdata->next;
-                               continue;
+                               CurrPos += SizeOfXLogShortPHD;
+                               currpos += SizeOfXLogShortPHD;
                        }
+                       freespace = INSERT_FREESPACE(CurrPos);
                }
 
-               /* Use next buffer */
-               updrqst = AdvanceXLInsertBuffer(false);
-               curridx = Insert->curridx;
-               /* Mark page header to indicate this record continues on the page */
-               Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
-               Insert->currpage->xlp_rem_len = write_len;
-               freespace = INSERT_FREESPACE(Insert);
+               Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
+               memcpy(currpos, rdata_data, rdata_len);
+               currpos += rdata_len;
+               CurrPos += rdata_len;
+               freespace -= rdata_len;
+               written += rdata_len;
+
+               rdata = rdata->next;
+       }
+       Assert(written == write_len);
+
+       /* Align the end position, so that the next record starts aligned */
+       CurrPos = MAXALIGN64(CurrPos);
+
+       /*
+        * If this was an xlog-switch, it's not enough to write the switch record,
+        * we also have to consume all the remaining space in the WAL segment. We
+        * have already reserved it for us, but we still need to make sure it's
+        * allocated and zeroed in the WAL buffers so that when the caller (or
+        * someone else) does XLogWrite(), it can really write out all the zeros.
+        */
+       if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0)
+       {
+               /* An xlog-switch record doesn't contain any data besides the header */
+               Assert(write_len == SizeOfXLogRecord);
+
+               /*
+                * We do this one page at a time, to make sure we don't deadlock
+                * against ourselves if wal_buffers < XLOG_SEG_SIZE.
+                */
+               Assert(EndPos % XLogSegSize == 0);
+
+               /* Use up all the remaining space on the first page */
+               CurrPos += freespace;
+
+               while (CurrPos < EndPos)
+               {
+                       /* initialize the next page (if not initialized already) */
+                       WALInsertLockUpdateInsertingAt(CurrPos);
+                       AdvanceXLInsertBuffer(CurrPos, false);
+                       CurrPos += XLOG_BLCKSZ;
+               }
        }
 
-       /* Ensure next record will be properly aligned */
-       Insert->currpos = (char *) Insert->currpage +
-               MAXALIGN(Insert->currpos - (char *) Insert->currpage);
-       freespace = INSERT_FREESPACE(Insert);
+       if (CurrPos != EndPos)
+               elog(PANIC, "space reserved for WAL record does not match what was written");
+}
+
+/*
+ * Acquire a WAL insertion lock, for inserting to WAL.
+ */
+static void
+WALInsertLockAcquire(void)
+{
+       bool            immed;
 
        /*
-        * The recptr I return is the beginning of the *next* record. This will be
-        * stored as LSN for changed data pages...
+        * It doesn't matter which of the WAL insertion locks we acquire, so try
+        * the one we used last time.  If the system isn't particularly busy, it's
+        * a good bet that it's still available, and it's good to have some
+        * affinity to a particular lock so that you don't unnecessarily bounce
+        * cache lines between processes when there's no contention.
+        *
+        * If this is the first time through in this backend, pick a lock
+        * (semi-)randomly.  This allows the locks to be used evenly if you have a
+        * lot of very short connections.
         */
-       INSERT_RECPTR(RecPtr, Insert, curridx);
+       static int      lockToTry = -1;
+
+       if (lockToTry == -1)
+               lockToTry = MyProc->pgprocno % num_xloginsert_locks;
+       MyLockNo = lockToTry;
 
        /*
-        * If the record is an XLOG_SWITCH, we must now write and flush all the
-        * existing data, and then forcibly advance to the start of the next
-        * segment.  It's not good to do this I/O while holding the insert lock,
-        * but there seems too much risk of confusion if we try to release the
-        * lock sooner.  Fortunately xlog switch needn't be a high-performance
-        * operation anyway...
+        * The insertingAt value is initially set to 0, as we don't know our
+        * insert location yet.
         */
-       if (isLogSwitch)
+       immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock,
+                                                                &WALInsertLocks[MyLockNo].l.insertingAt,
+                                                                0);
+       if (!immed)
        {
-               XLogwrtRqst FlushRqst;
-               XLogRecPtr      OldSegEnd;
+               /*
+                * If we couldn't get the lock immediately, try another lock next
+                * time.  On a system with more insertion locks than concurrent
+                * inserters, this causes all the inserters to eventually migrate to a
+                * lock that no-one else is using.  On a system with more inserters
+                * than locks, it still helps to distribute the inserters evenly
+                * across the locks.
+                */
+               lockToTry = (lockToTry + 1) % num_xloginsert_locks;
+       }
+}
 
-               TRACE_POSTGRESQL_XLOG_SWITCH();
+/*
+ * Acquire all WAL insertion locks, to prevent other backends from inserting
+ * to WAL.
+ */
+static void
+WALInsertLockAcquireExclusive(void)
+{
+       int                     i;
+
+       /*
+        * When holding all the locks, we only update the last lock's insertingAt
+        * indicator.  The others are set to 0xFFFFFFFFFFFFFFFF, which is higher
+        * than any real XLogRecPtr value, to make sure that no-one blocks waiting
+        * on those.
+        */
+       for (i = 0; i < num_xloginsert_locks - 1; i++)
+       {
+               LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
+                                                        &WALInsertLocks[i].l.insertingAt,
+                                                        UINT64CONST(0xFFFFFFFFFFFFFFFF));
+       }
+       LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
+                                                &WALInsertLocks[i].l.insertingAt,
+                                                0);
 
-               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+       holdingAllLocks = true;
+}
+
+/*
+ * Release our insertion lock (or locks, if we're holding them all).
+ */
+static void
+WALInsertLockRelease(void)
+{
+       if (holdingAllLocks)
+       {
+               int                     i;
+
+               for (i = 0; i < num_xloginsert_locks; i++)
+                       LWLockRelease(&WALInsertLocks[i].l.lock);
+
+               holdingAllLocks = false;
+       }
+       else
+       {
+               LWLockRelease(&WALInsertLocks[MyLockNo].l.lock);
+       }
+}
 
+/*
+ * Update our insertingAt value, to let others know that we've finished
+ * inserting up to that point.
+ */
+static void
+WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
+{
+       if (holdingAllLocks)
+       {
                /*
-                * Flush through the end of the page containing XLOG_SWITCH, and
-                * perform end-of-segment actions (eg, notifying archiver).
+                * We use the last lock to mark our actual position, see comments in
+                * WALInsertLockAcquireExclusive.
                 */
-               WriteRqst = XLogCtl->xlblocks[curridx];
-               FlushRqst.Write = WriteRqst;
-               FlushRqst.Flush = WriteRqst;
-               XLogWrite(FlushRqst, false, true);
+               LWLockUpdateVar(&WALInsertLocks[num_xloginsert_locks - 1].l.lock,
+                                        &WALInsertLocks[num_xloginsert_locks - 1].l.insertingAt,
+                                               insertingAt);
+       }
+       else
+               LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
+                                               &WALInsertLocks[MyLockNo].l.insertingAt,
+                                               insertingAt);
+}
+
+/*
+ * Wait for any WAL insertions < upto to finish.
+ *
+ * Returns the location of the oldest insertion that is still in-progress.
+ * Any WAL prior to that point has been fully copied into WAL buffers, and
+ * can be flushed out to disk. Because this waits for any insertions older
+ * than 'upto' to finish, the return value is always >= 'upto'.
+ *
+ * Note: When you are about to write out WAL, you must call this function
+ * *before* acquiring WALWriteLock, to avoid deadlocks. This function might
+ * need to wait for an insertion to finish (or at least advance to next
+ * uninitialized page), and the inserter might need to evict an old WAL buffer
+ * to make room for a new one, which in turn requires WALWriteLock.
+ */
+static XLogRecPtr
+WaitXLogInsertionsToFinish(XLogRecPtr upto)
+{
+       uint64          bytepos;
+       XLogRecPtr      reservedUpto;
+       XLogRecPtr      finishedUpto;
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       int                     i;
+
+       if (MyProc == NULL)
+               elog(PANIC, "cannot wait without a PGPROC structure");
+
+       /* Read the current insert position */
+       SpinLockAcquire(&Insert->insertpos_lck);
+       bytepos = Insert->CurrBytePos;
+       SpinLockRelease(&Insert->insertpos_lck);
+       reservedUpto = XLogBytePosToEndRecPtr(bytepos);
+
+       /*
+        * No-one should request to flush a piece of WAL that hasn't even been
+        * reserved yet. However, it can happen if there is a block with a bogus
+        * LSN on disk, for example. XLogFlush checks for that situation and
+        * complains, but only after the flush. Here we just assume that to mean
+        * that all WAL that has been reserved needs to be finished. In this
+        * corner-case, the return value can be smaller than 'upto' argument.
+        */
+       if (upto > reservedUpto)
+       {
+               elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
+                        (uint32) (upto >> 32), (uint32) upto,
+                        (uint32) (reservedUpto >> 32), (uint32) reservedUpto);
+               upto = reservedUpto;
+       }
+
+       /*
+        * Loop through all the locks, sleeping on any in-progress insert older
+        * than 'upto'.
+        *
+        * finishedUpto is our return value, indicating the point upto which all
+        * the WAL insertions have been finished. Initialize it to the head of
+        * reserved WAL, and as we iterate through the insertion locks, back it
+        * out for any insertion that's still in progress.
+        */
+       finishedUpto = reservedUpto;
+       for (i = 0; i < num_xloginsert_locks; i++)
+       {
+               XLogRecPtr      insertingat = InvalidXLogRecPtr;
+
+               do
+               {
+                       /*
+                        * See if this insertion is in progress. LWLockWait will wait for
+                        * the lock to be released, or for the 'value' to be set by a
+                        * LWLockUpdateVar call.  When a lock is initially acquired, its
+                        * value is 0 (InvalidXLogRecPtr), which means that we don't know
+                        * where it's inserting yet.  We will have to wait for it.  If
+                        * it's a small insertion, the record will most likely fit on the
+                        * same page and the inserter will release the lock without ever
+                        * calling LWLockUpdateVar.  But if it has to sleep, it will
+                        * advertise the insertion point with LWLockUpdateVar before
+                        * sleeping.
+                        */
+                       if (LWLockWaitForVar(&WALInsertLocks[i].l.lock,
+                                                                &WALInsertLocks[i].l.insertingAt,
+                                                                insertingat, &insertingat))
+                       {
+                               /* the lock was free, so no insertion in progress */
+                               insertingat = InvalidXLogRecPtr;
+                               break;
+                       }
+
+                       /*
+                        * This insertion is still in progress. Have to wait, unless the
+                        * inserter has proceeded past 'upto'.
+                        */
+               } while (insertingat < upto);
+
+               if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
+                       finishedUpto = insertingat;
+       }
+       return finishedUpto;
+}
+
+/*
+ * Get a pointer to the right location in the WAL buffer containing the
+ * given XLogRecPtr.
+ *
+ * If the page is not initialized yet, it is initialized. That might require
+ * evicting an old dirty buffer from the buffer cache, which means I/O.
+ *
+ * The caller must ensure that the page containing the requested location
+ * isn't evicted yet, and won't be evicted. The way to ensure that is to
+ * hold onto a WAL insertion lock with the insertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update insertingAt if it needs
+ * to evict an old page from the buffer. (This means that once you call
+ * GetXLogBuffer() with a given 'ptr', you must not access anything before
+ * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
+ * later, because older buffers might be recycled already)
+ */
+static char *
+GetXLogBuffer(XLogRecPtr ptr)
+{
+       int                     idx;
+       XLogRecPtr      endptr;
+       static uint64 cachedPage = 0;
+       static char *cachedPos = NULL;
+       XLogRecPtr      expectedEndPtr;
+
+       /*
+        * Fast path for the common case that we need to access again the same
+        * page as last time.
+        */
+       if (ptr / XLOG_BLCKSZ == cachedPage)
+       {
+               Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+               Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+               return cachedPos + ptr % XLOG_BLCKSZ;
+       }
 
-               /* Set up the next buffer as first page of next segment */
-               /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
-               (void) AdvanceXLInsertBuffer(true);
+       /*
+        * The XLog buffer cache is organized so that a page is always loaded to a
+        * particular buffer.  That way we can easily calculate the buffer a given
+        * page must be loaded into, from the XLogRecPtr alone.
+        */
+       idx = XLogRecPtrToBufIdx(ptr);
 
-               /* There should be no unwritten data */
-               curridx = Insert->curridx;
-               Assert(curridx == XLogCtl->Write.curridx);
+       /*
+        * See what page is loaded in the buffer at the moment. It could be the
+        * page we're looking for, or something older. It can't be anything newer
+        * - that would imply the page we're looking for has already been written
+        * out to disk and evicted, and the caller is responsible for making sure
+        * that doesn't happen.
+        *
+        * However, we don't hold a lock while we read the value. If someone has
+        * just initialized the page, it's possible that we get a "torn read" of
+        * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
+        * that case we will see a bogus value. That's ok, we'll grab the mapping
+        * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
+        * the page we're looking for. But it means that when we do this unlocked
+        * read, we might see a value that appears to be ahead of the page we're
+        * looking for. Don't PANIC on that, until we've verified the value while
+        * holding the lock.
+        */
+       expectedEndPtr = ptr;
+       expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
 
-               /* Compute end address of old segment */
-               OldSegEnd = XLogCtl->xlblocks[curridx];
-               OldSegEnd -= XLOG_BLCKSZ;
+       endptr = XLogCtl->xlblocks[idx];
+       if (expectedEndPtr != endptr)
+       {
+               /*
+                * Let others know that we're finished inserting the record up to the
+                * page boundary.
+                */
+               WALInsertLockUpdateInsertingAt(expectedEndPtr - XLOG_BLCKSZ);
 
-               /* Make it look like we've written and synced all of old segment */
-               LogwrtResult.Write = OldSegEnd;
-               LogwrtResult.Flush = OldSegEnd;
+               AdvanceXLInsertBuffer(ptr, false);
+               endptr = XLogCtl->xlblocks[idx];
 
+               if (expectedEndPtr != endptr)
+                       elog(PANIC, "could not find WAL buffer for %X/%X",
+                                (uint32) (ptr >> 32), (uint32) ptr);
+       }
+       else
+       {
                /*
-                * Update shared-memory status --- this code should match XLogWrite
+                * Make sure the initialization of the page is visible to us, and
+                * won't arrive later to overwrite the WAL data we write on the page.
                 */
-               {
-                       /* use volatile pointer to prevent code rearrangement */
-                       volatile XLogCtlData *xlogctl = XLogCtl;
+               pg_memory_barrier();
+       }
 
-                       SpinLockAcquire(&xlogctl->info_lck);
-                       xlogctl->LogwrtResult = LogwrtResult;
-                       if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
-                               xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
-                       if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
-                               xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
-                       SpinLockRelease(&xlogctl->info_lck);
-               }
+       /*
+        * Found the buffer holding this page. Return a pointer to the right
+        * offset within the page.
+        */
+       cachedPage = ptr / XLOG_BLCKSZ;
+       cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
 
-               LWLockRelease(WALWriteLock);
+       Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+       Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+
+       return cachedPos + ptr % XLOG_BLCKSZ;
+}
+
+/*
+ * Converts a "usable byte position" to XLogRecPtr. A usable byte position
+ * is the position starting from the beginning of WAL, excluding all WAL
+ * page headers.
+ */
+static XLogRecPtr
+XLogBytePosToRecPtr(uint64 bytepos)
+{
+       uint64          fullsegs;
+       uint64          fullpages;
+       uint64          bytesleft;
+       uint32          seg_offset;
+       XLogRecPtr      result;
+
+       fullsegs = bytepos / UsableBytesInSegment;
+       bytesleft = bytepos % UsableBytesInSegment;
 
-               updrqst = false;                /* done already */
+       if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+       {
+               /* fits on first page of segment */
+               seg_offset = bytesleft + SizeOfXLogLongPHD;
        }
        else
        {
-               /* normal case, ie not xlog switch */
+               /* account for the first page on segment with long header */
+               seg_offset = XLOG_BLCKSZ;
+               bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+
+               fullpages = bytesleft / UsableBytesInPage;
+               bytesleft = bytesleft % UsableBytesInPage;
+
+               seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+       }
+
+       XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
 
-               /* Need to update shared LogwrtRqst if some block was filled up */
-               if (freespace == 0)
+       return result;
+}
+
+/*
+ * Like XLogBytePosToRecPtr, but if the position is at a page boundary,
+ * returns a pointer to the beginning of the page (ie. before page header),
+ * not to where the first xlog record on that page would go to. This is used
+ * when converting a pointer to the end of a record.
+ */
+static XLogRecPtr
+XLogBytePosToEndRecPtr(uint64 bytepos)
+{
+       uint64          fullsegs;
+       uint64          fullpages;
+       uint64          bytesleft;
+       uint32          seg_offset;
+       XLogRecPtr      result;
+
+       fullsegs = bytepos / UsableBytesInSegment;
+       bytesleft = bytepos % UsableBytesInSegment;
+
+       if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+       {
+               /* fits on first page of segment */
+               if (bytesleft == 0)
+                       seg_offset = 0;
+               else
+                       seg_offset = bytesleft + SizeOfXLogLongPHD;
+       }
+       else
+       {
+               /* account for the first page on segment with long header */
+               seg_offset = XLOG_BLCKSZ;
+               bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+
+               fullpages = bytesleft / UsableBytesInPage;
+               bytesleft = bytesleft % UsableBytesInPage;
+
+               if (bytesleft == 0)
+                       seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
+               else
+                       seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+       }
+
+       XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
+
+       return result;
+}
+
+/*
+ * Convert an XLogRecPtr to a "usable byte position".
+ */
+static uint64
+XLogRecPtrToBytePos(XLogRecPtr ptr)
+{
+       uint64          fullsegs;
+       uint32          fullpages;
+       uint32          offset;
+       uint64          result;
+
+       XLByteToSeg(ptr, fullsegs);
+
+       fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ;
+       offset = ptr % XLOG_BLCKSZ;
+
+       if (fullpages == 0)
+       {
+               result = fullsegs * UsableBytesInSegment;
+               if (offset > 0)
                {
-                       /* curridx is filled and available for writing out */
-                       updrqst = true;
+                       Assert(offset >= SizeOfXLogLongPHD);
+                       result += offset - SizeOfXLogLongPHD;
                }
-               else
+       }
+       else
+       {
+               result = fullsegs * UsableBytesInSegment +
+                       (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
+                       (fullpages - 1) * UsableBytesInPage;            /* full pages */
+               if (offset > 0)
                {
-                       /* if updrqst already set, write through end of previous buf */
-                       curridx = PrevBufIdx(curridx);
+                       Assert(offset >= SizeOfXLogShortPHD);
+                       result += offset - SizeOfXLogShortPHD;
                }
-               WriteRqst = XLogCtl->xlblocks[curridx];
        }
 
-       LWLockRelease(WALInsertLock);
-
-       if (updrqst)
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile XLogCtlData *xlogctl = XLogCtl;
+       return result;
+}
 
-               SpinLockAcquire(&xlogctl->info_lck);
-               /* advance global request to include new block(s) */
-               if (xlogctl->LogwrtRqst.Write < WriteRqst)
-                       xlogctl->LogwrtRqst.Write = WriteRqst;
-               /* update local result copy while I have the chance */
-               LogwrtResult = xlogctl->LogwrtResult;
-               SpinLockRelease(&xlogctl->info_lck);
-       }
+/*
+ * Determine whether the buffer referenced has to be backed up.
+ *
+ * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
+ * could change later, so the result should be used for optimization purposes
+ * only.
+ */
+bool
+XLogCheckBufferNeedsBackup(Buffer buffer)
+{
+       bool            doPageWrites;
+       Page            page;
 
-       XactLastRecEnd = RecPtr;
+       page = BufferGetPage(buffer);
 
-       END_CRIT_SECTION();
+       doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
 
-       /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
+               return true;                    /* buffer requires backup */
 
-       return RecPtr;
+       return false;                           /* buffer does not need to be backed up */
 }
 
 /*
@@ -1241,7 +2021,7 @@ begin:;
  * save the buffer's LSN at *lsn.
  */
 static bool
-XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
+XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
                                XLogRecPtr *lsn, BkpBlock *bkpb)
 {
        Page            page;
@@ -1249,15 +2029,17 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
        page = BufferGetPage(rdata->buffer);
 
        /*
-        * XXX We assume page LSN is first data on *every* page that can be passed
-        * to XLogInsert, whether it otherwise has the standard page layout or
-        * not. We don't need the buffer header lock for PageGetLSN because we
-        * have exclusive lock on the page and/or the relation.
+        * We assume page LSN is first data on *every* page that can be passed to
+        * XLogInsert, whether it has the standard page layout or not. We don't
+        * need to take the buffer header lock for PageGetLSN if we hold an
+        * exclusive lock on the page and/or the relation.
         */
-       *lsn = PageGetLSN(page);
+       if (holdsExclusiveLock)
+               *lsn = PageGetLSN(page);
+       else
+               *lsn = BufferGetLSNAtomic(rdata->buffer);
 
-       if (doPageWrites &&
-               PageGetLSN(page) <= RedoRecPtr)
+       if (*lsn <= RedoRecPtr)
        {
                /*
                 * The page needs to be backed up, so set up *bkpb
@@ -1298,158 +2080,180 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
 }
 
 /*
- * Advance the Insert state to the next buffer page, writing out the next
- * buffer if it still contains unwritten data.
- *
- * If new_segment is TRUE then we set up the next buffer page as the first
- * page of the next xlog segment file, possibly but not usually the next
- * consecutive file page.
- *
- * The global LogwrtRqst.Write pointer needs to be advanced to include the
- * just-filled page.  If we can do this for free (without an extra lock),
- * we do so here.  Otherwise the caller must do it.  We return TRUE if the
- * request update still needs to be done, FALSE if we did it internally.
- *
- * Must be called with WALInsertLock held.
+ * Initialize XLOG buffers, writing out old buffers if they still contain
+ * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
+ * true, initialize as many pages as we can without having to write out
+ * unwritten data. Any new pages are initialized to zeros, with pages headers
+ * initialized properly.
  */
-static bool
-AdvanceXLInsertBuffer(bool new_segment)
+static void
+AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 {
        XLogCtlInsert *Insert = &XLogCtl->Insert;
-       int                     nextidx = NextBufIdx(Insert->curridx);
-       bool            update_needed = true;
+       int                     nextidx;
        XLogRecPtr      OldPageRqstPtr;
        XLogwrtRqst WriteRqst;
-       XLogRecPtr      NewPageEndPtr;
+       XLogRecPtr      NewPageEndPtr = InvalidXLogRecPtr;
        XLogRecPtr      NewPageBeginPtr;
        XLogPageHeader NewPage;
+       int                     npages = 0;
+
+       LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
 
        /*
-        * Get ending-offset of the buffer page we need to replace (this may be
-        * zero if the buffer hasn't been used yet).  Fall through if it's already
-        * written out.
+        * Now that we have the lock, check if someone initialized the page
+        * already.
         */
-       OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
-       if (LogwrtResult.Write < OldPageRqstPtr)
+       while (upto >= XLogCtl->InitializedUpTo || opportunistic)
        {
-               /* nope, got work to do... */
-               XLogRecPtr      FinishedPageRqstPtr;
-
-               FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-
-               /* Before waiting, get info_lck and update LogwrtResult */
-               {
-                       /* use volatile pointer to prevent code rearrangement */
-                       volatile XLogCtlData *xlogctl = XLogCtl;
-
-                       SpinLockAcquire(&xlogctl->info_lck);
-                       if (xlogctl->LogwrtRqst.Write < FinishedPageRqstPtr)
-                               xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
-                       LogwrtResult = xlogctl->LogwrtResult;
-                       SpinLockRelease(&xlogctl->info_lck);
-               }
-
-               update_needed = false;  /* Did the shared-request update */
+               nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
 
                /*
-                * Now that we have an up-to-date LogwrtResult value, see if we still
-                * need to write it or if someone else already did.
+                * Get ending-offset of the buffer page we need to replace (this may
+                * be zero if the buffer hasn't been used yet).  Fall through if it's
+                * already written out.
                 */
+               OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
                if (LogwrtResult.Write < OldPageRqstPtr)
                {
-                       /* Must acquire write lock */
-                       LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-                       LogwrtResult = XLogCtl->LogwrtResult;
-                       if (LogwrtResult.Write >= OldPageRqstPtr)
+                       /*
+                        * Nope, got work to do. If we just want to pre-initialize as much
+                        * as we can without flushing, give up now.
+                        */
+                       if (opportunistic)
+                               break;
+
+                       /* Before waiting, get info_lck and update LogwrtResult */
                        {
-                               /* OK, someone wrote it already */
-                               LWLockRelease(WALWriteLock);
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile XLogCtlData *xlogctl = XLogCtl;
+
+                               SpinLockAcquire(&xlogctl->info_lck);
+                               if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
+                                       xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
+                               LogwrtResult = xlogctl->LogwrtResult;
+                               SpinLockRelease(&xlogctl->info_lck);
                        }
-                       else
+
+                       /*
+                        * Now that we have an up-to-date LogwrtResult value, see if we
+                        * still need to write it or if someone else already did.
+                        */
+                       if (LogwrtResult.Write < OldPageRqstPtr)
                        {
                                /*
-                                * Have to write buffers while holding insert lock. This is
-                                * not good, so only write as much as we absolutely must.
+                                * Must acquire write lock. Release WALBufMappingLock first,
+                                * to make sure that all insertions that we need to wait for
+                                * can finish (up to this same position). Otherwise we risk
+                                * deadlock.
                                 */
-                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
-                               WriteRqst.Write = OldPageRqstPtr;
-                               WriteRqst.Flush = 0;
-                               XLogWrite(WriteRqst, false, false);
-                               LWLockRelease(WALWriteLock);
-                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+                               LWLockRelease(WALBufMappingLock);
+
+                               WaitXLogInsertionsToFinish(OldPageRqstPtr);
+
+                               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+
+                               LogwrtResult = XLogCtl->LogwrtResult;
+                               if (LogwrtResult.Write >= OldPageRqstPtr)
+                               {
+                                       /* OK, someone wrote it already */
+                                       LWLockRelease(WALWriteLock);
+                               }
+                               else
+                               {
+                                       /* Have to write it ourselves */
+                                       TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
+                                       WriteRqst.Write = OldPageRqstPtr;
+                                       WriteRqst.Flush = 0;
+                                       XLogWrite(WriteRqst, false);
+                                       LWLockRelease(WALWriteLock);
+                                       TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+                               }
+                               /* Re-acquire WALBufMappingLock and retry */
+                               LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+                               continue;
                        }
                }
-       }
 
-       /*
-        * Now the next buffer slot is free and we can set it up to be the next
-        * output page.
-        */
-       NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx];
+               /*
+                * Now the next buffer slot is free and we can set it up to be the
+                * next output page.
+                */
+               NewPageBeginPtr = XLogCtl->InitializedUpTo;
+               NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 
-       if (new_segment)
-       {
-               /* force it to a segment start point */
-               if (NewPageBeginPtr % XLogSegSize != 0)
-                       NewPageBeginPtr += XLogSegSize - NewPageBeginPtr % XLogSegSize;
-       }
+               Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
-       NewPageEndPtr = NewPageBeginPtr;
-       NewPageEndPtr += XLOG_BLCKSZ;
-       XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
-       NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+               NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
-       Insert->curridx = nextidx;
-       Insert->currpage = NewPage;
+               /*
+                * Be sure to re-zero the buffer so that bytes beyond what we've
+                * written will look like zeroes and not valid XLOG records...
+                */
+               MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
 
-       Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
+               /*
+                * Fill the new page's header
+                */
+               NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
 
-       /*
-        * Be sure to re-zero the buffer so that bytes beyond what we've written
-        * will look like zeroes and not valid XLOG records...
-        */
-       MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+               /* NewPage->xlp_info = 0; */    /* done by memset */
+               NewPage   ->xlp_tli = ThisTimeLineID;
+               NewPage   ->xlp_pageaddr = NewPageBeginPtr;
 
-       /*
-        * Fill the new page's header
-        */
-       NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
+               /* NewPage->xlp_rem_len = 0; */ /* done by memset */
 
-       /* NewPage->xlp_info = 0; */    /* done by memset */
-       NewPage   ->xlp_tli = ThisTimeLineID;
-       NewPage   ->xlp_pageaddr = NewPageBeginPtr;
+               /*
+                * If online backup is not in progress, mark the header to indicate
+                * that* WAL records beginning in this page have removable backup
+                * blocks.  This allows the WAL archiver to know whether it is safe to
+                * compress archived WAL data by transforming full-block records into
+                * the non-full-block format.  It is sufficient to record this at the
+                * page level because we force a page switch (in fact a segment
+                * switch) when starting a backup, so the flag will be off before any
+                * records can be written during the backup.  At the end of a backup,
+                * the last page will be marked as all unsafe when perhaps only part
+                * is unsafe, but at worst the archiver would miss the opportunity to
+                * compress a few records.
+                */
+               if (!Insert->forcePageWrites)
+                       NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
 
-       /*
-        * If online backup is not in progress, mark the header to indicate that
-        * WAL records beginning in this page have removable backup blocks.  This
-        * allows the WAL archiver to know whether it is safe to compress archived
-        * WAL data by transforming full-block records into the non-full-block
-        * format.      It is sufficient to record this at the page level because we
-        * force a page switch (in fact a segment switch) when starting a backup,
-        * so the flag will be off before any records can be written during the
-        * backup.      At the end of a backup, the last page will be marked as all
-        * unsafe when perhaps only part is unsafe, but at worst the archiver
-        * would miss the opportunity to compress a few records.
-        */
-       if (!Insert->forcePageWrites)
-               NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
+               /*
+                * If first page of an XLOG segment file, make it a long header.
+                */
+               if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
+               {
+                       XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
 
-       /*
-        * If first page of an XLOG segment file, make it a long header.
-        */
-       if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
-       {
-               XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+                       NewLongPage->xlp_sysid = ControlFile->system_identifier;
+                       NewLongPage->xlp_seg_size = XLogSegSize;
+                       NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
+                       NewPage   ->xlp_info |= XLP_LONG_HEADER;
+               }
+
+               /*
+                * Make sure the initialization of the page becomes visible to others
+                * before the xlblocks update. GetXLogBuffer() reads xlblocks without
+                * holding a lock.
+                */
+               pg_write_barrier();
 
-               NewLongPage->xlp_sysid = ControlFile->system_identifier;
-               NewLongPage->xlp_seg_size = XLogSegSize;
-               NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
-               NewPage   ->xlp_info |= XLP_LONG_HEADER;
+               *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
 
-               Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+               XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+               npages++;
        }
+       LWLockRelease(WALBufMappingLock);
 
-       return update_needed;
+#ifdef WAL_DEBUG
+       if (npages > 0)
+       {
+               elog(DEBUG1, "initialized %d pages, upto %X/%X",
+                        npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
+       }
+#endif
 }
 
 /*
@@ -1481,18 +2285,13 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
  * This option allows us to avoid uselessly issuing multiple writes when a
  * single one would do.
  *
- * If xlog_switch == TRUE, we are intending an xlog segment switch, so
- * perform end-of-segment actions after writing the last page, even if
- * it's not physically the end of its segment.  (NB: this will work properly
- * only if caller specifies WriteRqst == page-end and flexible == false,
- * and there is some data to write.)
- *
- * Must be called with WALWriteLock held.
+ * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
+ * must be called before grabbing the lock, to make sure the data is ready to
+ * write.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
-       XLogCtlWrite *Write = &XLogCtl->Write;
        bool            ispartialpage;
        bool            last_iteration;
        bool            finishing_seg;
@@ -1525,12 +2324,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 
        /*
         * Within the loop, curridx is the cache block index of the page to
-        * consider writing.  We advance Write->curridx only after successfully
-        * writing pages.  (Right now, this refinement is useless since we are
-        * going to PANIC if any error occurs anyway; but someday it may come in
-        * useful.)
+        * consider writing.  Begin at the buffer containing the next unwritten
+        * page, or last partially written page.
         */
-       curridx = Write->curridx;
+       curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
 
        while (LogwrtResult.Write < WriteRqst.Write)
        {
@@ -1539,14 +2336,16 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                 * if we're passed a bogus WriteRqst.Write that is past the end of the
                 * last page that's been initialized by AdvanceXLInsertBuffer.
                 */
-               if (LogwrtResult.Write >= XLogCtl->xlblocks[curridx])
+               XLogRecPtr      EndPtr = XLogCtl->xlblocks[curridx];
+
+               if (LogwrtResult.Write >= EndPtr)
                        elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
-                                (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
-                                (uint32) (XLogCtl->xlblocks[curridx] >> 32),
-                                (uint32) XLogCtl->xlblocks[curridx]);
+                                (uint32) (LogwrtResult.Write >> 32),
+                                (uint32) LogwrtResult.Write,
+                                (uint32) (EndPtr >> 32), (uint32) EndPtr);
 
                /* Advance LogwrtResult.Write to end of current buffer page */
-               LogwrtResult.Write = XLogCtl->xlblocks[curridx];
+               LogwrtResult.Write = EndPtr;
                ispartialpage = WriteRqst.Write < LogwrtResult.Write;
 
                if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
@@ -1600,6 +2399,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                {
                        char       *from;
                        Size            nbytes;
+                       Size            nleft;
+                       int                     written;
 
                        /* Need to seek in the file? */
                        if (openLogOff != startoffset)
@@ -1607,32 +2408,37 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                                if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
                                        ereport(PANIC,
                                                        (errcode_for_file_access(),
-                                                        errmsg("could not seek in log file %s to offset %u: %m",
-                                                                       XLogFileNameP(ThisTimeLineID, openLogSegNo),
-                                                                       startoffset)));
+                                        errmsg("could not seek in log file %s to offset %u: %m",
+                                                       XLogFileNameP(ThisTimeLineID, openLogSegNo),
+                                                       startoffset)));
                                openLogOff = startoffset;
                        }
 
                        /* OK to write the page(s) */
                        from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
                        nbytes = npages * (Size) XLOG_BLCKSZ;
-                       errno = 0;
-                       if (write(openLogFile, from, nbytes) != nbytes)
+                       nleft = nbytes;
+                       do
                        {
-                               /* if write didn't set errno, assume no disk space */
-                               if (errno == 0)
-                                       errno = ENOSPC;
-                               ereport(PANIC,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not write to log file %s "
-                                                               "at offset %u, length %lu: %m",
-                                                               XLogFileNameP(ThisTimeLineID, openLogSegNo),
-                                                               openLogOff, (unsigned long) nbytes)));
-                       }
+                               errno = 0;
+                               written = write(openLogFile, from, nleft);
+                               if (written <= 0)
+                               {
+                                       if (errno == EINTR)
+                                               continue;
+                                       ereport(PANIC,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("could not write to log file %s "
+                                                                       "at offset %u, length %zu: %m",
+                                                                XLogFileNameP(ThisTimeLineID, openLogSegNo),
+                                                                       openLogOff, nbytes)));
+                               }
+                               nleft -= written;
+                               from += written;
+                       } while (nleft > 0);
 
                        /* Update state for write */
                        openLogOff += nbytes;
-                       Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
                        npages = 0;
 
                        /*
@@ -1642,16 +2448,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                         * later. Doing it here ensures that one and only one backend will
                         * perform this fsync.
                         *
-                        * We also do this if this is the last page written for an xlog
-                        * switch.
-                        *
                         * This is also the right place to notify the Archiver that the
                         * segment is ready to copy to archival storage, and to update the
                         * timer for archive_timeout, and to signal for a checkpoint if
                         * too many logfile segments have been used since the last
                         * checkpoint.
                         */
-                       if (finishing_seg || (xlog_switch && last_iteration))
+                       if (finishing_seg)
                        {
                                issue_xlog_fsync(openLogFile, openLogSegNo);
 
@@ -1663,7 +2466,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                                if (XLogArchivingActive())
                                        XLogArchiveNotifySeg(openLogSegNo);
 
-                               Write->lastSegSwitchTime = (pg_time_t) time(NULL);
+                               XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 
                                /*
                                 * Request a checkpoint if we've consumed too much xlog since
@@ -1695,7 +2498,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
        }
 
        Assert(npages == 0);
-       Assert(curridx == Write->curridx);
 
        /*
         * If asked to flush, do so
@@ -1706,7 +2508,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
        {
                /*
                 * Could get here without iterating above loop, in which case we might
-                * have no open file or the wrong one.  However, we do not need to
+                * have no open file or the wrong one.  However, we do not need to
                 * fsync more than one file.
                 */
                if (sync_method != SYNC_METHOD_OPEN &&
@@ -1775,7 +2577,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 
        /*
         * If the WALWriter is sleeping, we should kick it to make it come out of
-        * low-power mode.      Otherwise, determine whether there's a full page of
+        * low-power mode.  Otherwise, determine whether there's a full page of
         * WAL available to write.
         */
        if (!sleeping)
@@ -1797,6 +2599,40 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
                SetLatch(ProcGlobal->walwriterLatch);
 }
 
+/*
+ * Record the LSN up to which we can remove WAL because it's not required by
+ * any replication slot.
+ */
+void
+XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->replicationSlotMinLSN = lsn;
+       SpinLockRelease(&xlogctl->info_lck);
+}
+
+
+/*
+ * Return the oldest LSN we must retain to satisfy the needs of some
+ * replication slot.
+ */
+static XLogRecPtr
+XLogGetReplicationSlotMinimumLSN(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr      retval;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       retval = xlogctl->replicationSlotMinLSN;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return retval;
+}
+
 /*
  * Advance minRecoveryPoint in control file.
  *
@@ -1854,7 +2690,7 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
                if (!force && newMinRecoveryPoint < lsn)
                        elog(WARNING,
                           "xlog min recovery request %X/%X is past current point %X/%X",
-                                (uint32) (lsn >> 32) , (uint32) lsn,
+                                (uint32) (lsn >> 32), (uint32) lsn,
                                 (uint32) (newMinRecoveryPoint >> 32),
                                 (uint32) newMinRecoveryPoint);
 
@@ -1868,10 +2704,10 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
                        minRecoveryPointTLI = newMinRecoveryPointTLI;
 
                        ereport(DEBUG2,
-                                       (errmsg("updated min recovery point to %X/%X on timeline %u",
-                                                       (uint32) (minRecoveryPoint >> 32),
-                                                       (uint32) minRecoveryPoint,
-                                                       newMinRecoveryPointTLI)));
+                               (errmsg("updated min recovery point to %X/%X on timeline %u",
+                                               (uint32) (minRecoveryPoint >> 32),
+                                               (uint32) minRecoveryPoint,
+                                               newMinRecoveryPointTLI)));
                }
        }
        LWLockRelease(ControlFileLock);
@@ -1911,7 +2747,7 @@ XLogFlush(XLogRecPtr record)
                elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
                         (uint32) (record >> 32), (uint32) record,
                         (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
-                        (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+                  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
 #endif
 
        START_CRIT_SECTION();
@@ -1935,6 +2771,7 @@ XLogFlush(XLogRecPtr record)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile XLogCtlData *xlogctl = XLogCtl;
+               XLogRecPtr      insertpos;
 
                /* read LogwrtResult and update local state */
                SpinLockAcquire(&xlogctl->info_lck);
@@ -1947,6 +2784,12 @@ XLogFlush(XLogRecPtr record)
                if (record <= LogwrtResult.Flush)
                        break;
 
+               /*
+                * Before actually performing the write, wait for all in-flight
+                * insertions to the pages we're about to write to finish.
+                */
+               insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+
                /*
                 * Try to get the write lock. If we can't get it immediately, wait
                 * until it's released, and recheck if we still need to do the flush
@@ -1975,39 +2818,35 @@ XLogFlush(XLogRecPtr record)
                /*
                 * Sleep before flush! By adding a delay here, we may give further
                 * backends the opportunity to join the backlog of group commit
-                * followers; this can significantly improve transaction throughput, at
-                * the risk of increasing transaction latency.
+                * followers; this can significantly improve transaction throughput,
+                * at the risk of increasing transaction latency.
                 *
                 * We do not sleep if enableFsync is not turned on, nor if there are
                 * fewer than CommitSiblings other backends with active transactions.
                 */
                if (CommitDelay > 0 && enableFsync &&
                        MinimumActiveBackends(CommitSiblings))
+               {
                        pg_usleep(CommitDelay);
 
+                       /*
+                        * Re-check how far we can now flush the WAL. It's generally not
+                        * safe to call WaitXLogInsetionsToFinish while holding
+                        * WALWriteLock, because an in-progress insertion might need to
+                        * also grab WALWriteLock to make progress. But we know that all
+                        * the insertions up to insertpos have already finished, because
+                        * that's what the earlier WaitXLogInsertionsToFinish() returned.
+                        * We're only calling it again to allow insertpos to be moved
+                        * further forward, not to actually wait for anyone.
+                        */
+                       insertpos = WaitXLogInsertionsToFinish(insertpos);
+               }
+
                /* try to write/flush later additions to XLOG as well */
-               if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
-               {
-                       XLogCtlInsert *Insert = &XLogCtl->Insert;
-                       uint32          freespace = INSERT_FREESPACE(Insert);
+               WriteRqst.Write = insertpos;
+               WriteRqst.Flush = insertpos;
 
-                       if (freespace == 0)             /* buffer is full */
-                               WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-                       else
-                       {
-                               WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-                               WriteRqstPtr -= freespace;
-                       }
-                       LWLockRelease(WALInsertLock);
-                       WriteRqst.Write = WriteRqstPtr;
-                       WriteRqst.Flush = WriteRqstPtr;
-               }
-               else
-               {
-                       WriteRqst.Write = WriteRqstPtr;
-                       WriteRqst.Flush = record;
-               }
-               XLogWrite(WriteRqst, false, false);
+               XLogWrite(WriteRqst, false);
 
                LWLockRelease(WALWriteLock);
                /* done */
@@ -2044,7 +2883,7 @@ XLogFlush(XLogRecPtr record)
                elog(ERROR,
                "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
                         (uint32) (record >> 32), (uint32) record,
-                        (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+                  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
 }
 
 /*
@@ -2053,9 +2892,9 @@ XLogFlush(XLogRecPtr record)
  * We normally flush only completed blocks; but if there is nothing to do on
  * that basis, we check for unflushed async commits in the current incomplete
  * block, and flush through the latest one of those.  Thus, if async commits
- * are not being used, we will flush complete blocks only.     We can guarantee
+ * are not being used, we will flush complete blocks only.  We can guarantee
  * that async commits reach disk after at most three cycles; normally only
- * one or two. (When flushing complete blocks, we allow XLogWrite to write
+ * one or two.  (When flushing complete blocks, we allow XLogWrite to write
  * "flexibly", meaning it can stop at the end of the buffer ring; this makes a
  * difference only with very high load or long wal_writer_delay, but imposes
  * one extra cycle for the worst case for async commits.)
@@ -2123,12 +2962,13 @@ XLogBackgroundFlush(void)
                elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
                         (uint32) (WriteRqstPtr >> 32), (uint32) WriteRqstPtr,
                         (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
-                        (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+                  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
 #endif
 
        START_CRIT_SECTION();
 
-       /* now wait for the write lock */
+       /* now wait for any in-progress insertions to finish and get write lock */
+       WaitXLogInsertionsToFinish(WriteRqstPtr);
        LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
        LogwrtResult = XLogCtl->LogwrtResult;
        if (WriteRqstPtr > LogwrtResult.Flush)
@@ -2137,7 +2977,7 @@ XLogBackgroundFlush(void)
 
                WriteRqst.Write = WriteRqstPtr;
                WriteRqst.Flush = WriteRqstPtr;
-               XLogWrite(WriteRqst, flexible, false);
+               XLogWrite(WriteRqst, flexible);
                wrote_something = true;
        }
        LWLockRelease(WALWriteLock);
@@ -2147,6 +2987,12 @@ XLogBackgroundFlush(void)
        /* wake up walsenders now that we've released heavily contended locks */
        WalSndWakeupProcessRequests();
 
+       /*
+        * Great, done. To take some work off the critical path, try to initialize
+        * as many of the no-longer-needed WAL buffers for future use as we can.
+        */
+       AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+
        return wrote_something;
 }
 
@@ -2223,7 +3069,7 @@ XLogNeedsFlush(XLogRecPtr record)
  * log, seg: identify segment to be created/opened.
  *
  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
- * pre-existing file will be deleted). On return, TRUE if a pre-existing
+ * pre-existing file will be deleted).  On return, TRUE if a pre-existing
  * file was used.
  *
  * use_lock: if TRUE, acquire ControlFileLock while moving file into
@@ -2242,6 +3088,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 {
        char            path[MAXPGPATH];
        char            tmppath[MAXPGPATH];
+       char            zbuffer_raw[XLOG_BLCKSZ + MAXIMUM_ALIGNOF];
        char       *zbuffer;
        XLogSegNo       installed_segno;
        int                     max_advance;
@@ -2280,16 +3127,6 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 
        unlink(tmppath);
 
-       /*
-        * Allocate a buffer full of zeros. This is done before opening the file
-        * so that we don't leak the file descriptor if palloc fails.
-        *
-        * Note: palloc zbuffer, instead of just using a local char array, to
-        * ensure it is reasonably well-aligned; this may save a few cycles
-        * transferring data to the kernel.
-        */
-       zbuffer = (char *) palloc0(XLOG_BLCKSZ);
-
        /* do not use get_sync_bit() here --- want to fsync only at end of fill */
        fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
                                           S_IRUSR | S_IWUSR);
@@ -2299,14 +3136,19 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
                                 errmsg("could not create file \"%s\": %m", tmppath)));
 
        /*
-        * Zero-fill the file.  We have to do this the hard way to ensure that all
+        * Zero-fill the file.  We have to do this the hard way to ensure that all
         * the file space has really been allocated --- on platforms that allow
         * "holes" in files, just seeking to the end doesn't allocate intermediate
         * space.  This way, we know that we have all the space and (after the
-        * fsync below) that all the indirect blocks are down on disk.  Therefore,
+        * fsync below) that all the indirect blocks are down on disk.  Therefore,
         * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
         * log file.
+        *
+        * Note: ensure the buffer is reasonably well-aligned; this may save a few
+        * cycles transferring data to the kernel.
         */
+       zbuffer = (char *) MAXALIGN(zbuffer_raw);
+       memset(zbuffer, 0, XLOG_BLCKSZ);
        for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
        {
                errno = 0;
@@ -2329,7 +3171,6 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
                                         errmsg("could not write to file \"%s\": %m", tmppath)));
                }
        }
-       pfree(zbuffer);
 
        if (pg_fsync(fd) != 0)
        {
@@ -2375,7 +3216,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                  errmsg("could not open file \"%s\": %m", path)));
+                                errmsg("could not open file \"%s\": %m", path)));
 
        elog(DEBUG2, "done creating and filling new WAL file");
 
@@ -2391,7 +3232,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
  *             a different timeline)
  *
  * Currently this is only used during recovery, and so there are no locking
- * considerations.     But we should be just as tense as XLogFileInit to avoid
+ * considerations.  But we should be just as tense as XLogFileInit to avoid
  * emplacing a bogus file.
  */
 static void
@@ -2602,7 +3443,7 @@ XLogFileOpen(XLogSegNo segno)
        if (fd < 0)
                ereport(PANIC,
                                (errcode_for_file_access(),
-                                errmsg("could not open xlog file \"%s\": %m", path)));
+                       errmsg("could not open transaction log file \"%s\": %m", path)));
 
        return fd;
 }
@@ -2709,7 +3550,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
         * the timelines listed in expectedTLEs.
         *
         * We expect curFileTLI on entry to be the TLI of the preceding file in
-        * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
+        * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
         * to go backwards; this prevents us from picking up the wrong file when a
         * parent timeline extends to higher segment numbers than the child we
         * want to read.
@@ -2779,7 +3620,7 @@ XLogFileClose(void)
 
        /*
         * WAL segment files will not be re-read in normal operation, so we advise
-        * the OS to release any cached pages.  But do not do so if WAL archiving
+        * the OS to release any cached pages.  But do not do so if WAL archiving
         * or streaming is active, because archiver and walsender process could
         * use the cache to read the WAL segment.
         */
@@ -2855,6 +3696,27 @@ CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
        }
 }
 
+/*
+ * Return the last WAL segment removed, or 0 if no segment has been removed
+ * since startup.
+ *
+ * NB: the result can be out of date arbitrarily fast, the caller has to deal
+ * with that.
+ */
+XLogSegNo
+XLogGetLastRemovedSegno(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogSegNo       lastRemovedSegNo;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       lastRemovedSegNo = xlogctl->lastRemovedSegNo;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return lastRemovedSegNo;
+}
+
 /*
  * Update the last removed segno pointer in shared memory, to reflect
  * that the given XLOG file has been removed.
@@ -2924,7 +3786,7 @@ RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr endptr)
        {
                /*
                 * We ignore the timeline part of the XLOG segment identifiers in
-                * deciding whether a segment is still needed.  This ensures that we
+                * deciding whether a segment is still needed.  This ensures that we
                 * won't prematurely remove a segment from a parent timeline. We could
                 * probably be a little more proactive about removing segments of
                 * non-parent timelines, but that would be a whole lot more
@@ -3145,8 +4007,6 @@ Buffer
 RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
                                   bool get_cleanup_lock, bool keep_buffer)
 {
-       Buffer          buffer;
-       Page            page;
        BkpBlock        bkpb;
        char       *blk;
        int                     i;
@@ -3164,38 +4024,8 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
                if (i == block_index)
                {
                        /* Found it, apply the update */
-                       buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
-                                                                                       RBM_ZERO);
-                       Assert(BufferIsValid(buffer));
-                       if (get_cleanup_lock)
-                               LockBufferForCleanup(buffer);
-                       else
-                               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-                       page = (Page) BufferGetPage(buffer);
-
-                       if (bkpb.hole_length == 0)
-                       {
-                               memcpy((char *) page, blk, BLCKSZ);
-                       }
-                       else
-                       {
-                               memcpy((char *) page, blk, bkpb.hole_offset);
-                               /* must zero-fill the hole */
-                               MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
-                               memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
-                                          blk + bkpb.hole_offset,
-                                          BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
-                       }
-
-                       PageSetLSN(page, lsn);
-                       PageSetTLI(page, ThisTimeLineID);
-                       MarkBufferDirty(buffer);
-
-                       if (!keep_buffer)
-                               UnlockReleaseBuffer(buffer);
-
-                       return buffer;
+                       return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
+                                                                                         keep_buffer);
                }
 
                blk += BLCKSZ - bkpb.hole_length;
@@ -3206,6 +4036,56 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
        return InvalidBuffer;           /* keep compiler quiet */
 }
 
+/*
+ * Workhorse for RestoreBackupBlock usable without an xlog record
+ *
+ * Restores a full-page image from BkpBlock and a data pointer.
+ */
+static Buffer
+RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
+                                                  bool get_cleanup_lock, bool keep_buffer)
+{
+       Buffer          buffer;
+       Page            page;
+
+       buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
+                                                                       RBM_ZERO);
+       Assert(BufferIsValid(buffer));
+       if (get_cleanup_lock)
+               LockBufferForCleanup(buffer);
+       else
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+       page = (Page) BufferGetPage(buffer);
+
+       if (bkpb.hole_length == 0)
+       {
+               memcpy((char *) page, blk, BLCKSZ);
+       }
+       else
+       {
+               memcpy((char *) page, blk, bkpb.hole_offset);
+               /* must zero-fill the hole */
+               MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
+               memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
+                          blk + bkpb.hole_offset,
+                          BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
+       }
+
+       /*
+        * The checksum value on this page is currently invalid. We don't need to
+        * reset it here since it will be set before being written.
+        */
+
+       PageSetLSN(page, lsn);
+       MarkBufferDirty(buffer);
+
+       if (!keep_buffer)
+               UnlockReleaseBuffer(buffer);
+
+       return buffer;
+}
+
 /*
  * Attempt to read an XLOG record.
  *
@@ -3236,7 +4116,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 
        for (;;)
        {
-               char   *errormsg;
+               char       *errormsg;
 
                record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
                ReadRecPtr = xlogreader->ReadRecPtr;
@@ -3250,34 +4130,35 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
                        }
 
                        /*
-                        * We only end up here without a message when XLogPageRead() failed
-                        * - in that case we already logged something.
-                        * In StandbyMode that only happens if we have been triggered, so
-                        * we shouldn't loop anymore in that case.
+                        * We only end up here without a message when XLogPageRead()
+                        * failed - in that case we already logged something. In
+                        * StandbyMode that only happens if we have been triggered, so we
+                        * shouldn't loop anymore in that case.
                         */
                        if (errormsg)
                                ereport(emode_for_corrupt_record(emode,
                                                                                                 RecPtr ? RecPtr : EndRecPtr),
-                                               (errmsg_internal("%s", errormsg) /* already translated */));
+                               (errmsg_internal("%s", errormsg) /* already translated */ ));
                }
+
                /*
                 * Check page TLI is one of the expected values.
                 */
                else if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs))
                {
                        char            fname[MAXFNAMELEN];
-                       XLogSegNo segno;
-                       int32 offset;
+                       XLogSegNo       segno;
+                       int32           offset;
 
                        XLByteToSeg(xlogreader->latestPagePtr, segno);
                        offset = xlogreader->latestPagePtr % XLogSegSize;
                        XLogFileName(fname, xlogreader->readPageTLI, segno);
                        ereport(emode_for_corrupt_record(emode,
                                                                                         RecPtr ? RecPtr : EndRecPtr),
-                                       (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
-                                                       xlogreader->latestPageTLI,
-                                                       fname,
-                                                       offset)));
+                       (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
+                                       xlogreader->latestPageTLI,
+                                       fname,
+                                       offset)));
                        record = NULL;
                }
 
@@ -3292,10 +4173,10 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
                        lastSourceFailed = true;
 
                        /*
-                        * If archive recovery was requested, but we were still doing crash
-                        * recovery, switch to archive recovery and retry using the offline
-                        * archive. We have now replayed all the valid WAL in pg_xlog, so
-                        * we are presumably now consistent.
+                        * If archive recovery was requested, but we were still doing
+                        * crash recovery, switch to archive recovery and retry using the
+                        * offline archive. We have now replayed all the valid WAL in
+                        * pg_xlog, so we are presumably now consistent.
                         *
                         * We require that there's at least some valid WAL present in
                         * pg_xlog, however (!fetch_ckpt). We could recover using the WAL
@@ -3379,11 +4260,11 @@ rescanLatestTimeLine(void)
        newExpectedTLEs = readTimeLineHistory(newtarget);
 
        /*
-        * If the current timeline is not part of the history of the new
-        * timeline, we cannot proceed to it.
+        * If the current timeline is not part of the history of the new timeline,
+        * we cannot proceed to it.
         */
        found = false;
-       foreach (cell, newExpectedTLEs)
+       foreach(cell, newExpectedTLEs)
        {
                currentTle = (TimeLineHistoryEntry *) lfirst(cell);
 
@@ -3439,7 +4320,7 @@ rescanLatestTimeLine(void)
  * I/O routines for pg_control
  *
  * *ControlFile is a buffer in shared memory that holds an image of the
- * contents of pg_control.     WriteControlFile() initializes pg_control
+ * contents of pg_control.  WriteControlFile() initializes pg_control
  * given a preloaded buffer, ReadControlFile() loads the buffer from
  * the pg_control file (during postmaster or standalone-backend startup),
  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
@@ -3473,6 +4354,7 @@ WriteControlFile(void)
        ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
 
        ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
+       ControlFile->loblksize = LOBLKSIZE;
 
 #ifdef HAVE_INT64_TIMESTAMP
        ControlFile->enableIntTimes = true;
@@ -3666,6 +4548,13 @@ ReadControlFile(void)
                                " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
                          ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
                                 errhint("It looks like you need to recompile or initdb.")));
+       if (ControlFile->loblksize != LOBLKSIZE)
+               ereport(FATAL,
+                               (errmsg("database files are incompatible with server"),
+                 errdetail("The database cluster was initialized with LOBLKSIZE %d,"
+                                       " but the server was compiled with LOBLKSIZE %d.",
+                                       ControlFile->loblksize, (int) LOBLKSIZE),
+                                errhint("It looks like you need to recompile or initdb.")));
 
 #ifdef HAVE_INT64_TIMESTAMP
        if (ControlFile->enableIntTimes != true)
@@ -3714,6 +4603,10 @@ ReadControlFile(void)
                                  " but the server was compiled without USE_FLOAT8_BYVAL."),
                                 errhint("It looks like you need to recompile or initdb.")));
 #endif
+
+       /* Make the initdb settings visible as GUC variables, too */
+       SetConfigOption("data_checksums", DataChecksumsEnabled() ? "yes" : "no",
+                                       PGC_INTERNAL, PGC_S_OVERRIDE);
 }
 
 void
@@ -3768,6 +4661,16 @@ GetSystemIdentifier(void)
        return ControlFile->system_identifier;
 }
 
+/*
+ * Are checksums enabled for data pages?
+ */
+bool
+DataChecksumsEnabled(void)
+{
+       Assert(ControlFile != NULL);
+       return (ControlFile->data_checksum_version > 0);
+}
+
 /*
  * Returns a fake LSN for unlogged relations.
  *
@@ -3780,7 +4683,7 @@ GetSystemIdentifier(void)
 XLogRecPtr
 GetFakeLSNForUnloggedRel(void)
 {
-       XLogRecPtr nextUnloggedLSN;
+       XLogRecPtr      nextUnloggedLSN;
 
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
@@ -3830,7 +4733,7 @@ check_wal_buffers(int *newval, void **extra, GucSource source)
        {
                /*
                 * If we haven't yet changed the boot_val default of -1, just let it
-                * be.  We'll fix it when XLOGShmemSize is called.
+                * be.  We'll fix it when XLOGShmemSize is called.
                 */
                if (XLOGbuffers == -1)
                        return true;
@@ -3877,10 +4780,13 @@ XLOGShmemSize(void)
 
        /* XLogCtl */
        size = sizeof(XLogCtlData);
+
+       /* WAL insertion locks, plus alignment */
+       size = add_size(size, mul_size(sizeof(WALInsertLockPadded), num_xloginsert_locks + 1));
        /* xlblocks array */
        size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
        /* extra alignment padding for XLOG I/O buffers */
-       size = add_size(size, ALIGNOF_XLOG_BUFFER);
+       size = add_size(size, XLOG_BLCKSZ);
        /* and the buffers themselves */
        size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
 
@@ -3899,6 +4805,7 @@ XLOGShmemInit(void)
        bool            foundCFile,
                                foundXLog;
        char       *allocptr;
+       int                     i;
 
        ControlFile = (ControlFileData *)
                ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
@@ -3911,7 +4818,6 @@ XLOGShmemInit(void)
                Assert(foundCFile && foundXLog);
                return;
        }
-
        memset(XLogCtl, 0, sizeof(XLogCtlData));
 
        /*
@@ -3924,10 +4830,34 @@ XLOGShmemInit(void)
        memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
        allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
 
+
+       /* WAL insertion locks. Ensure they're aligned to the full padded size */
+       allocptr += sizeof(WALInsertLockPadded) -
+               ((uintptr_t) allocptr) %sizeof(WALInsertLockPadded);
+       WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
+               (WALInsertLockPadded *) allocptr;
+       allocptr += sizeof(WALInsertLockPadded) * num_xloginsert_locks;
+
+       XLogCtl->Insert.WALInsertLockTrancheId = LWLockNewTrancheId();
+
+       XLogCtl->Insert.WALInsertLockTranche.name = "WALInsertLocks";
+       XLogCtl->Insert.WALInsertLockTranche.array_base = WALInsertLocks;
+       XLogCtl->Insert.WALInsertLockTranche.array_stride = sizeof(WALInsertLockPadded);
+
+       LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, &XLogCtl->Insert.WALInsertLockTranche);
+       for (i = 0; i < num_xloginsert_locks; i++)
+       {
+               LWLockInitialize(&WALInsertLocks[i].l.lock,
+                                                XLogCtl->Insert.WALInsertLockTrancheId);
+               WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
+       }
+
        /*
-        * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
+        * Align the start of the page buffers to a full xlog block size boundary.
+        * This simplifies some calculations in XLOG insertion. It is also
+        * required for O_DIRECT.
         */
-       allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
+       allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
        XLogCtl->pages = allocptr;
        memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
 
@@ -3939,7 +4869,8 @@ XLOGShmemInit(void)
        XLogCtl->SharedRecoveryInProgress = true;
        XLogCtl->SharedHotStandbyActive = false;
        XLogCtl->WalWriterSleeping = false;
-       XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+
+       SpinLockInit(&XLogCtl->Insert.insertpos_lck);
        SpinLockInit(&XLogCtl->info_lck);
        SpinLockInit(&XLogCtl->ulsn_lck);
        InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
@@ -3976,22 +4907,23 @@ BootStrapXLOG(void)
         * field, as being about as unique as we can easily get.  (Think not to
         * use random(), since it hasn't been seeded and there's no portable way
         * to seed it other than the system clock value...)  The upper half of the
-        * uint64 value is just the tv_sec part, while the lower half is the XOR
-        * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
-        * unnecessarily if "uint64" is really only 32 bits wide.  A person
-        * knowing this encoding can determine the initialization time of the
-        * installation, which could perhaps be useful sometimes.
+        * uint64 value is just the tv_sec part, while the lower half contains the
+        * tv_usec part (which must fit in 20 bits), plus 12 bits from our current
+        * PID for a little extra uniqueness.  A person knowing this encoding can
+        * determine the initialization time of the installation, which could
+        * perhaps be useful sometimes.
         */
        gettimeofday(&tv, NULL);
        sysidentifier = ((uint64) tv.tv_sec) << 32;
-       sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
+       sysidentifier |= ((uint64) tv.tv_usec) << 12;
+       sysidentifier |= getpid() & 0xFFF;
 
        /* First timeline ID is always 1 */
        ThisTimeLineID = 1;
 
        /* page buffer must be aligned suitably for O_DIRECT */
-       buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
-       page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
+       buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
+       page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
        memset(page, 0, XLOG_BLCKSZ);
 
        /*
@@ -4091,9 +5023,12 @@ BootStrapXLOG(void)
 
        /* Set important parameter values for use when replaying WAL */
        ControlFile->MaxConnections = MaxConnections;
+       ControlFile->max_worker_processes = max_worker_processes;
        ControlFile->max_prepared_xacts = max_prepared_xacts;
        ControlFile->max_locks_per_xact = max_locks_per_xact;
        ControlFile->wal_level = wal_level;
+       ControlFile->wal_log_hints = wal_log_hints;
+       ControlFile->data_checksum_version = bootstrap_data_checksum_version;
 
        /* some additional ControlFile fields are set in WriteControlFile() */
 
@@ -4223,13 +5158,6 @@ readRecoveryCommandFile(void)
                }
                else if (strcmp(item->name, "recovery_target_time") == 0)
                {
-                       /*
-                        * if recovery_target_xid or recovery_target_name specified, then
-                        * this overrides recovery_target_time
-                        */
-                       if (recoveryTarget == RECOVERY_TARGET_XID ||
-                               recoveryTarget == RECOVERY_TARGET_NAME)
-                               continue;
                        recoveryTarget = RECOVERY_TARGET_TIME;
 
                        /*
@@ -4246,12 +5174,6 @@ readRecoveryCommandFile(void)
                }
                else if (strcmp(item->name, "recovery_target_name") == 0)
                {
-                       /*
-                        * if recovery_target_xid specified, then this overrides
-                        * recovery_target_name
-                        */
-                       if (recoveryTarget == RECOVERY_TARGET_XID)
-                               continue;
                        recoveryTarget = RECOVERY_TARGET_NAME;
 
                        recoveryTargetName = pstrdup(item->value);
@@ -4265,6 +5187,19 @@ readRecoveryCommandFile(void)
                                        (errmsg_internal("recovery_target_name = '%s'",
                                                                         recoveryTargetName)));
                }
+               else if (strcmp(item->name, "recovery_target") == 0)
+               {
+                       if (strcmp(item->value, "immediate") == 0)
+                               recoveryTarget = RECOVERY_TARGET_IMMEDIATE;
+                       else
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("invalid recovery_target parameter"),
+                                                errhint("The only allowed value is 'immediate'")));
+                       ereport(DEBUG2,
+                                       (errmsg_internal("recovery_target = '%s'",
+                                                                        item->value)));
+               }
                else if (strcmp(item->name, "recovery_target_inclusive") == 0)
                {
                        /*
@@ -4296,6 +5231,14 @@ readRecoveryCommandFile(void)
                                        (errmsg_internal("primary_conninfo = '%s'",
                                                                         PrimaryConnInfo)));
                }
+               else if (strcmp(item->name, "primary_slot_name") == 0)
+               {
+                       ReplicationSlotValidateName(item->value, ERROR);
+                       PrimarySlotName = pstrdup(item->value);
+                       ereport(DEBUG2,
+                                       (errmsg_internal("primary_slot_name = '%s'",
+                                                                        PrimarySlotName)));
+               }
                else if (strcmp(item->name, "trigger_file") == 0)
                {
                        TriggerFile = pstrdup(item->value);
@@ -4303,6 +5246,20 @@ readRecoveryCommandFile(void)
                                        (errmsg_internal("trigger_file = '%s'",
                                                                         TriggerFile)));
                }
+               else if (strcmp(item->name, "recovery_min_apply_delay") == 0)
+               {
+                       const char *hintmsg;
+
+                       if (!parse_int(item->value, &recovery_min_apply_delay, GUC_UNIT_MS,
+                                                  &hintmsg))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("parameter \"%s\" requires a temporal value",
+                                                               "recovery_min_apply_delay"),
+                                                hintmsg ? errhint("%s", _(hintmsg)) : 0));
+                       ereport(DEBUG2,
+                                       (errmsg("recovery_min_apply_delay = '%s'", item->value)));
+               }
                else
                        ereport(FATAL,
                                        (errmsg("unrecognized recovery parameter \"%s\"",
@@ -4333,7 +5290,7 @@ readRecoveryCommandFile(void)
 
        /*
         * If user specified recovery_target_timeline, validate it or compute the
-        * "latest" value.      We can't do this until after we've gotten the restore
+        * "latest" value.  We can't do this until after we've gotten the restore
         * command and set InArchiveRecovery, because we need to fetch timeline
         * history files from the archive.
         */
@@ -4442,75 +5399,83 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
                        (errmsg("archive recovery complete")));
 }
 
+/*
+ * Extract timestamp from WAL record.
+ *
+ * If the record contains a timestamp, returns true, and saves the timestamp
+ * in *recordXtime. If the record type has no timestamp, returns false.
+ * Currently, only transaction commit/abort records and restore points contain
+ * timestamps.
+ */
+static bool
+getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime)
+{
+       uint8           record_info = record->xl_info & ~XLR_INFO_MASK;
+
+       if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+       {
+               *recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
+               return true;
+       }
+       if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
+       {
+               *recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
+               return true;
+       }
+       if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+       {
+               *recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
+               return true;
+       }
+       if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
+       {
+               *recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
+               return true;
+       }
+       return false;
+}
+
 /*
  * For point-in-time recovery, this function decides whether we want to
- * stop applying the XLOG at or after the current record.
- *
- * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
- * *includeThis is set TRUE if we should apply this record before stopping.
+ * stop applying the XLOG before the current record.
  *
- * We also track the timestamp of the latest applied COMMIT/ABORT
- * record in XLogCtl->recoveryLastXTime, for logging purposes.
- * Also, some information is saved in recoveryStopXid et al for use in
- * annotating the new timeline's history file.
+ * Returns TRUE if we are stopping, FALSE otherwise. If stopping, some
+ * information is saved in recoveryStopXid et al for use in annotating the
+ * new timeline's history file.
  */
 static bool
-recoveryStopsHere(XLogRecord *record, bool *includeThis)
+recoveryStopsBefore(XLogRecord *record)
 {
-       bool            stopsHere;
+       bool            stopsHere = false;
        uint8           record_info;
-       TimestampTz recordXtime;
-       char            recordRPName[MAXFNAMELEN];
-
-       /* We only consider stopping at COMMIT, ABORT or RESTORE POINT records */
-       if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID)
-               return false;
-       record_info = record->xl_info & ~XLR_INFO_MASK;
-       if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
-       {
-               xl_xact_commit_compact *recordXactCommitData;
-
-               recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record);
-               recordXtime = recordXactCommitData->xact_time;
-       }
-       else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
-       {
-               xl_xact_commit *recordXactCommitData;
+       bool            isCommit;
+       TimestampTz recordXtime = 0;
 
-               recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
-               recordXtime = recordXactCommitData->xact_time;
-       }
-       else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
+       /* Check if we should stop as soon as reaching consistency */
+       if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency)
        {
-               xl_xact_abort *recordXactAbortData;
+               ereport(LOG,
+                               (errmsg("recovery stopping after reaching consistency")));
 
-               recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
-               recordXtime = recordXactAbortData->xact_time;
+               recoveryStopAfter = false;
+               recoveryStopXid = InvalidTransactionId;
+               recoveryStopTime = 0;
+               recoveryStopName[0] = '\0';
+               return true;
        }
-       else if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
-       {
-               xl_restore_point *recordRestorePointData;
 
-               recordRestorePointData = (xl_restore_point *) XLogRecGetData(record);
-               recordXtime = recordRestorePointData->rp_time;
-               strncpy(recordRPName, recordRestorePointData->rp_name, MAXFNAMELEN);
-       }
-       else
+       /* Otherwise we only consider stopping before COMMIT or ABORT records. */
+       if (record->xl_rmid != RM_XACT_ID)
                return false;
-
-       /* Do we have a PITR target at all? */
-       if (recoveryTarget == RECOVERY_TARGET_UNSET)
-       {
-               /*
-                * Save timestamp of latest transaction commit/abort if this is a
-                * transaction record
-                */
-               if (record->xl_rmid == RM_XACT_ID)
-                       SetLatestXTime(recordXtime);
+       record_info = record->xl_info & ~XLR_INFO_MASK;
+       if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+               isCommit = true;
+       else if (record_info == XLOG_XACT_ABORT)
+               isCommit = false;
+       else
                return false;
-       }
 
-       if (recoveryTarget == RECOVERY_TARGET_XID)
+       if (recoveryTarget == RECOVERY_TARGET_XID && !recoveryTargetInclusive)
        {
                /*
                 * There can be only one transaction end record with this exact
@@ -4522,24 +5487,10 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
                 * 50% of the time...
                 */
                stopsHere = (record->xl_xid == recoveryTargetXid);
-               if (stopsHere)
-                       *includeThis = recoveryTargetInclusive;
        }
-       else if (recoveryTarget == RECOVERY_TARGET_NAME)
-       {
-               /*
-                * There can be many restore points that share the same name, so we
-                * stop at the first one
-                */
-               stopsHere = (strcmp(recordRPName, recoveryTargetName) == 0);
 
-               /*
-                * Ignore recoveryTargetInclusive because this is not a transaction
-                * record
-                */
-               *includeThis = false;
-       }
-       else
+       if (recoveryTarget == RECOVERY_TARGET_TIME &&
+               getRecordTimestamp(record, &recordXtime))
        {
                /*
                 * There can be many transactions that share the same commit time, so
@@ -4550,64 +5501,132 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
                        stopsHere = (recordXtime > recoveryTargetTime);
                else
                        stopsHere = (recordXtime >= recoveryTargetTime);
-               if (stopsHere)
-                       *includeThis = false;
        }
 
        if (stopsHere)
        {
+               recoveryStopAfter = false;
                recoveryStopXid = record->xl_xid;
                recoveryStopTime = recordXtime;
-               recoveryStopAfter = *includeThis;
+               recoveryStopName[0] = '\0';
 
-               if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+               if (isCommit)
                {
-                       if (recoveryStopAfter)
-                               ereport(LOG,
-                                               (errmsg("recovery stopping after commit of transaction %u, time %s",
-                                                               recoveryStopXid,
-                                                               timestamptz_to_str(recoveryStopTime))));
-                       else
-                               ereport(LOG,
-                                               (errmsg("recovery stopping before commit of transaction %u, time %s",
-                                                               recoveryStopXid,
-                                                               timestamptz_to_str(recoveryStopTime))));
+                       ereport(LOG,
+                                       (errmsg("recovery stopping before commit of transaction %u, time %s",
+                                                       recoveryStopXid,
+                                                       timestamptz_to_str(recoveryStopTime))));
                }
-               else if (record_info == XLOG_XACT_ABORT)
+               else
                {
-                       if (recoveryStopAfter)
-                               ereport(LOG,
-                                               (errmsg("recovery stopping after abort of transaction %u, time %s",
-                                                               recoveryStopXid,
-                                                               timestamptz_to_str(recoveryStopTime))));
-                       else
-                               ereport(LOG,
-                                               (errmsg("recovery stopping before abort of transaction %u, time %s",
-                                                               recoveryStopXid,
-                                                               timestamptz_to_str(recoveryStopTime))));
+                       ereport(LOG,
+                                       (errmsg("recovery stopping before abort of transaction %u, time %s",
+                                                       recoveryStopXid,
+                                                       timestamptz_to_str(recoveryStopTime))));
                }
-               else
+       }
+
+       return stopsHere;
+}
+
+/*
+ * Same as recoveryStopsBefore, but called after applying the record.
+ *
+ * We also track the timestamp of the latest applied COMMIT/ABORT
+ * record in XLogCtl->recoveryLastXTime.
+ */
+static bool
+recoveryStopsAfter(XLogRecord *record)
+{
+       uint8           record_info;
+       TimestampTz recordXtime;
+
+       record_info = record->xl_info & ~XLR_INFO_MASK;
+
+       /*
+        * There can be many restore points that share the same name; we stop at
+        * the first one.
+        */
+       if (recoveryTarget == RECOVERY_TARGET_NAME &&
+               record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+       {
+               xl_restore_point *recordRestorePointData;
+
+               recordRestorePointData = (xl_restore_point *) XLogRecGetData(record);
+
+               if (strcmp(recordRestorePointData->rp_name, recoveryTargetName) == 0)
                {
-                       strncpy(recoveryStopName, recordRPName, MAXFNAMELEN);
+                       recoveryStopAfter = true;
+                       recoveryStopXid = InvalidTransactionId;
+                       (void) getRecordTimestamp(record, &recoveryStopTime);
+                       strlcpy(recoveryStopName, recordRestorePointData->rp_name, MAXFNAMELEN);
 
                        ereport(LOG,
                                (errmsg("recovery stopping at restore point \"%s\", time %s",
                                                recoveryStopName,
                                                timestamptz_to_str(recoveryStopTime))));
+                       return true;
                }
+       }
+
+       if (record->xl_rmid == RM_XACT_ID &&
+               (record_info == XLOG_XACT_COMMIT_COMPACT ||
+                record_info == XLOG_XACT_COMMIT ||
+                record_info == XLOG_XACT_ABORT))
+       {
+               /* Update the last applied transaction timestamp */
+               if (getRecordTimestamp(record, &recordXtime))
+                       SetLatestXTime(recordXtime);
 
                /*
-                * Note that if we use a RECOVERY_TARGET_TIME then we can stop at a
-                * restore point since they are timestamped, though the latest
-                * transaction time is not updated.
+                * There can be only one transaction end record with this exact
+                * transactionid
+                *
+                * when testing for an xid, we MUST test for equality only, since
+                * transactions are numbered in the order they start, not the order
+                * they complete. A higher numbered xid will complete before you about
+                * 50% of the time...
                 */
-               if (record->xl_rmid == RM_XACT_ID && recoveryStopAfter)
-                       SetLatestXTime(recordXtime);
+               if (recoveryTarget == RECOVERY_TARGET_XID && recoveryTargetInclusive &&
+                       record->xl_xid == recoveryTargetXid)
+               {
+                       recoveryStopAfter = true;
+                       recoveryStopXid = record->xl_xid;
+                       recoveryStopTime = recordXtime;
+                       recoveryStopName[0] = '\0';
+
+                       if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+                       {
+                               ereport(LOG,
+                                               (errmsg("recovery stopping after commit of transaction %u, time %s",
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
+                       }
+                       else if (record_info == XLOG_XACT_ABORT)
+                       {
+                               ereport(LOG,
+                                               (errmsg("recovery stopping after abort of transaction %u, time %s",
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
+                       }
+                       return true;
+               }
        }
-       else if (record->xl_rmid == RM_XACT_ID)
-               SetLatestXTime(recordXtime);
 
-       return stopsHere;
+       /* Check if we should stop as soon as reaching consistency */
+       if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency)
+       {
+               ereport(LOG,
+                               (errmsg("recovery stopping after reaching consistency")));
+
+               recoveryStopAfter = true;
+               recoveryStopXid = InvalidTransactionId;
+               recoveryStopTime = 0;
+               recoveryStopName[0] = '\0';
+               return true;
+       }
+
+       return false;
 }
 
 /*
@@ -4660,6 +5679,90 @@ SetRecoveryPause(bool recoveryPause)
        SpinLockRelease(&xlogctl->info_lck);
 }
 
+/*
+ * When recovery_min_apply_delay is set, we wait long enough to make sure
+ * certain record types are applied at least that interval behind the master.
+ *
+ * Returns true if we waited.
+ *
+ * Note that the delay is calculated between the WAL record log time and
+ * the current time on standby. We would prefer to keep track of when this
+ * standby received each WAL record, which would allow a more consistent
+ * approach and one not affected by time synchronisation issues, but that
+ * is significantly more effort and complexity for little actual gain in
+ * usability.
+ */
+static bool
+recoveryApplyDelay(XLogRecord *record)
+{
+       uint8           record_info;
+       TimestampTz xtime;
+       long            secs;
+       int                     microsecs;
+
+       /* nothing to do if no delay configured */
+       if (recovery_min_apply_delay == 0)
+               return false;
+
+       /*
+        * Is it a COMMIT record?
+        *
+        * We deliberately choose not to delay aborts since they have no effect on
+        * MVCC. We already allow replay of records that don't have a timestamp,
+        * so there is already opportunity for issues caused by early conflicts on
+        * standbys.
+        */
+       record_info = record->xl_info & ~XLR_INFO_MASK;
+       if (!(record->xl_rmid == RM_XACT_ID &&
+                 (record_info == XLOG_XACT_COMMIT_COMPACT ||
+                  record_info == XLOG_XACT_COMMIT)))
+               return false;
+
+       if (!getRecordTimestamp(record, &xtime))
+               return false;
+
+       recoveryDelayUntilTime =
+               TimestampTzPlusMilliseconds(xtime, recovery_min_apply_delay);
+
+       /*
+        * Exit without arming the latch if it's already past time to apply this
+        * record
+        */
+       TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
+                                               &secs, &microsecs);
+       if (secs <= 0 && microsecs <= 0)
+               return false;
+
+       while (true)
+       {
+               ResetLatch(&XLogCtl->recoveryWakeupLatch);
+
+               /* might change the trigger file's location */
+               HandleStartupProcInterrupts();
+
+               if (CheckForStandbyTrigger())
+                       break;
+
+               /*
+                * Wait for difference between GetCurrentTimestamp() and
+                * recoveryDelayUntilTime
+                */
+               TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
+                                                       &secs, &microsecs);
+
+               if (secs <= 0 && microsecs <= 0)
+                       break;
+
+               elog(DEBUG2, "recovery apply delay %ld seconds, %d milliseconds",
+                        secs, microsecs / 1000);
+
+               WaitLatch(&XLogCtl->recoveryWakeupLatch,
+                                 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                 secs * 1000L + microsecs / 1000);
+       }
+       return true;
+}
+
 /*
  * Save timestamp of latest processed commit/abort record.
  *
@@ -4775,7 +5878,7 @@ CheckRequiredParameterValues(void)
         * For archive recovery, the WAL must be generated with at least 'archive'
         * wal_level.
         */
-       if (InArchiveRecovery && ControlFile->wal_level == WAL_LEVEL_MINIMAL)
+       if (ArchiveRecoveryRequested && ControlFile->wal_level == WAL_LEVEL_MINIMAL)
        {
                ereport(WARNING,
                                (errmsg("WAL was generated with wal_level=minimal, data may be missing"),
@@ -4786,17 +5889,20 @@ CheckRequiredParameterValues(void)
         * For Hot Standby, the WAL must be generated with 'hot_standby' mode, and
         * we must have at least as many backend slots as the primary.
         */
-       if (InArchiveRecovery && EnableHotStandby)
+       if (ArchiveRecoveryRequested && EnableHotStandby)
        {
                if (ControlFile->wal_level < WAL_LEVEL_HOT_STANDBY)
                        ereport(ERROR,
-                                       (errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" on the master server"),
+                                       (errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" or higher on the master server"),
                                         errhint("Either set wal_level to \"hot_standby\" on the master, or turn off hot_standby here.")));
 
                /* We ignore autovacuum_max_workers when we make this test. */
                RecoveryRequiresIntParameter("max_connections",
                                                                         MaxConnections,
                                                                         ControlFile->MaxConnections);
+               RecoveryRequiresIntParameter("max_worker_processes",
+                                                                        max_worker_processes,
+                                                                        ControlFile->max_worker_processes);
                RecoveryRequiresIntParameter("max_prepared_transactions",
                                                                         max_prepared_xacts,
                                                                         ControlFile->max_prepared_xacts);
@@ -4823,7 +5929,6 @@ StartupXLOG(void)
        XLogSegNo       endLogSegNo;
        TimeLineID      PrevTimeLineID;
        XLogRecord *record;
-       uint32          freespace;
        TransactionId oldestActiveXID;
        bool            backupEndRequired = false;
        bool            backupFromStandby = false;
@@ -4847,9 +5952,12 @@ StartupXLOG(void)
                                (errmsg("control file contains invalid data")));
 
        if (ControlFile->state == DB_SHUTDOWNED)
-               ereport(LOG,
+       {
+               /* This is the expected case, so don't be chatty in standalone mode */
+               ereport(IsPostmasterEnvironment ? LOG : NOTICE,
                                (errmsg("database system was shut down at %s",
                                                str_time(ControlFile->time))));
+       }
        else if (ControlFile->state == DB_SHUTDOWNED_IN_RECOVERY)
                ereport(LOG,
                                (errmsg("database system was shut down in recovery at %s",
@@ -4889,7 +5997,7 @@ StartupXLOG(void)
        ValidateXLOGDirectoryStructure();
 
        /*
-        * Clear out any old relcache cache files.      This is *necessary* if we do
+        * Clear out any old relcache cache files.  This is *necessary* if we do
         * any WAL replay, since that would probably result in the cache files
         * being out of sync with database reality.  In theory we could leave them
         * in place if the database had been cleanly shut down, but it seems
@@ -4899,10 +6007,14 @@ StartupXLOG(void)
        RelationCacheInitFileRemove();
 
        /*
-        * Initialize on the assumption we want to recover to the same timeline
+        * Initialize on the assumption we want to recover to the latest timeline
         * that's active according to pg_control.
         */
-       recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
+       if (ControlFile->minRecoveryPointTLI >
+               ControlFile->checkPointCopy.ThisTimeLineID)
+               recoveryTargetTLI = ControlFile->minRecoveryPointTLI;
+       else
+               recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
 
        /*
         * Check for recovery control file, and if so set up state for offline
@@ -4914,7 +6026,7 @@ StartupXLOG(void)
         * Save archive_cleanup_command in shared memory so that other processes
         * can see it.
         */
-       strncpy(XLogCtl->archiveCleanupCommand,
+       strlcpy(XLogCtl->archiveCleanupCommand,
                        archiveCleanupCommand ? archiveCleanupCommand : "",
                        sizeof(XLogCtl->archiveCleanupCommand));
 
@@ -4935,26 +6047,13 @@ StartupXLOG(void)
                        ereport(LOG,
                                        (errmsg("starting point-in-time recovery to \"%s\"",
                                                        recoveryTargetName)));
+               else if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE)
+                       ereport(LOG,
+                                       (errmsg("starting point-in-time recovery to earliest consistent point")));
                else
                        ereport(LOG,
                                        (errmsg("starting archive recovery")));
        }
-       else if (ControlFile->minRecoveryPointTLI > 0)
-       {
-               /*
-                * If the minRecoveryPointTLI is set when not in Archive Recovery
-                * it means that we have crashed after ending recovery and
-                * yet before we wrote a new checkpoint on the new timeline.
-                * That means we are doing a crash recovery that needs to cross
-                * timelines to get to our newly assigned timeline again.
-                * The timeline we are headed for is exact and not 'latest'.
-                * As soon as we hit a checkpoint, the minRecoveryPointTLI is
-                * reset, so we will not enter crash recovery again.
-                */
-               Assert(ControlFile->minRecoveryPointTLI != 1);
-               recoveryTargetTLI = ControlFile->minRecoveryPointTLI;
-               recoveryTargetIsLatest = false;
-       }
 
        /*
         * Take ownership of the wakeup latch if we're going to sleep during
@@ -4970,15 +6069,15 @@ StartupXLOG(void)
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
                                 errmsg("out of memory"),
-                                errdetail("Failed while allocating an XLog reading processor")));
+                  errdetail("Failed while allocating an XLog reading processor.")));
        xlogreader->system_identifier = ControlFile->system_identifier;
 
        if (read_backup_label(&checkPointLoc, &backupEndRequired,
                                                  &backupFromStandby))
        {
                /*
-                * Archive recovery was requested, and thanks to the backup label file,
-                * we know how far we need to replay to reach consistency. Enter
+                * Archive recovery was requested, and thanks to the backup label
+                * file, we know how far we need to replay to reach consistency. Enter
                 * archive recovery directly.
                 */
                InArchiveRecovery = true;
@@ -4996,7 +6095,7 @@ StartupXLOG(void)
                        wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
                        ereport(DEBUG1,
                                        (errmsg("checkpoint record is at %X/%X",
-                                                       (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+                                  (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
                        InRecovery = true;      /* force recovery even if SHUTDOWNED */
 
                        /*
@@ -5028,8 +6127,8 @@ StartupXLOG(void)
                /*
                 * It's possible that archive recovery was requested, but we don't
                 * know how far we need to replay the WAL before we reach consistency.
-                * This can happen for example if a base backup is taken from a running
-                * server using an atomic filesystem snapshot, without calling
+                * This can happen for example if a base backup is taken from a
+                * running server using an atomic filesystem snapshot, without calling
                 * pg_start/stop_backup. Or if you just kill a running master server
                 * and put it into archive recovery by creating a recovery.conf file.
                 *
@@ -5037,8 +6136,8 @@ StartupXLOG(void)
                 * replaying all the WAL present in pg_xlog, and only enter archive
                 * recovery after that.
                 *
-                * But usually we already know how far we need to replay the WAL (up to
-                * minRecoveryPoint, up to backupEndPoint, or until we see an
+                * But usually we already know how far we need to replay the WAL (up
+                * to minRecoveryPoint, up to backupEndPoint, or until we see an
                 * end-of-backup record), and we can enter archive recovery directly.
                 */
                if (ArchiveRecoveryRequested &&
@@ -5063,7 +6162,7 @@ StartupXLOG(void)
                {
                        ereport(DEBUG1,
                                        (errmsg("checkpoint record is at %X/%X",
-                                                       (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+                                  (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
                }
                else if (StandbyMode)
                {
@@ -5082,7 +6181,7 @@ StartupXLOG(void)
                        {
                                ereport(LOG,
                                                (errmsg("using previous checkpoint record at %X/%X",
-                                                               (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+                                  (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
                                InRecovery = true;              /* force recovery even if SHUTDOWNED */
                        }
                        else
@@ -5098,21 +6197,22 @@ StartupXLOG(void)
         * timeline in the history of the requested timeline, we cannot proceed:
         * the backup is not part of the history of the requested timeline.
         */
-       Assert(expectedTLEs); /* was initialized by reading checkpoint record */
+       Assert(expectedTLEs);           /* was initialized by reading checkpoint
+                                                                * record */
        if (tliOfPointInHistory(checkPointLoc, expectedTLEs) !=
-                       checkPoint.ThisTimeLineID)
+               checkPoint.ThisTimeLineID)
        {
-               XLogRecPtr switchpoint;
+               XLogRecPtr      switchpoint;
 
                /*
-                * tliSwitchPoint will throw an error if the checkpoint's timeline
-                * is not in expectedTLEs at all.
+                * tliSwitchPoint will throw an error if the checkpoint's timeline is
+                * not in expectedTLEs at all.
                 */
                switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL);
                ereport(FATAL,
                                (errmsg("requested timeline %u is not a child of this server's history",
                                                recoveryTargetTLI),
-                                errdetail("Latest checkpoint is at %X/%X on timeline %u, but in the history of the requested timeline, the server forked off from that timeline at %X/%X",
+                                errdetail("Latest checkpoint is at %X/%X on timeline %u, but in the history of the requested timeline, the server forked off from that timeline at %X/%X.",
                                                   (uint32) (ControlFile->checkPoint >> 32),
                                                   (uint32) ControlFile->checkPoint,
                                                   ControlFile->checkPointCopy.ThisTimeLineID,
@@ -5125,8 +6225,8 @@ StartupXLOG(void)
         * history, too.
         */
        if (!XLogRecPtrIsInvalid(ControlFile->minRecoveryPoint) &&
-               tliOfPointInHistory(ControlFile->minRecoveryPoint - 1, expectedTLEs) !=
-                       ControlFile->minRecoveryPointTLI)
+         tliOfPointInHistory(ControlFile->minRecoveryPoint - 1, expectedTLEs) !=
+               ControlFile->minRecoveryPointTLI)
                ereport(FATAL,
                                (errmsg("requested timeline %u does not contain minimum recovery point %X/%X on timeline %u",
                                                recoveryTargetTLI,
@@ -5138,7 +6238,7 @@ StartupXLOG(void)
 
        ereport(DEBUG1,
                        (errmsg("redo record is at %X/%X; shutdown %s",
-                                       (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
+                                 (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
                                        wasShutdown ? "TRUE" : "FALSE")));
        ereport(DEBUG1,
                        (errmsg("next transaction ID: %u/%u; next OID: %u",
@@ -5164,9 +6264,30 @@ StartupXLOG(void)
        MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
        SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
        SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+       MultiXactSetSafeTruncate(checkPoint.oldestMulti);
        XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
        XLogCtl->ckptXid = checkPoint.nextXid;
 
+       /*
+        * Initialize replication slots, before there's a chance to remove
+        * required resources.
+        */
+       StartupReplicationSlots();
+
+       /*
+        * Startup logical state, needs to be setup now so we have proper data
+        * during crash recovery.
+        */
+       StartupReorderBuffer();
+
+       /*
+        * Startup MultiXact.  We need to do this early for two reasons: one is
+        * that we might try to access multixacts when we do tuple freezing, and
+        * the other is we need its state initialized because we attempt
+        * truncation during restartpoints.
+        */
+       StartupMultiXact();
+
        /*
         * Initialize unlogged LSN. On a clean shutdown, it's restored from the
         * control file. On recovery, all unlogged relations are blown away, so
@@ -5185,22 +6306,22 @@ StartupXLOG(void)
        ThisTimeLineID = checkPoint.ThisTimeLineID;
 
        /*
-        * Copy any missing timeline history files between 'now' and the
-        * recovery target timeline from archive to pg_xlog. While we don't need
-        * those files ourselves - the history file of the recovery target
-        * timeline covers all the previous timelines in the history too - a
-        * cascading standby server might be interested in them. Or, if you
-        * archive the WAL from this server to a different archive than the
-        * master, it'd be good for all the history files to get archived there
-        * after failover, so that you can use one of the old timelines as a
-        * PITR target. Timeline history files are small, so it's better to copy
-        * them unnecessarily than not copy them and regret later.
+        * Copy any missing timeline history files between 'now' and the recovery
+        * target timeline from archive to pg_xlog. While we don't need those
+        * files ourselves - the history file of the recovery target timeline
+        * covers all the previous timelines in the history too - a cascading
+        * standby server might be interested in them. Or, if you archive the WAL
+        * from this server to a different archive than the master, it'd be good
+        * for all the history files to get archived there after failover, so that
+        * you can use one of the old timelines as a PITR target. Timeline history
+        * files are small, so it's better to copy them unnecessarily than not
+        * copy them and regret later.
         */
        restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI);
 
        lastFullPageWrites = checkPoint.fullPageWrites;
 
-       RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+       RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
 
        if (RecPtr < checkPoint.redo)
                ereport(PANIC,
@@ -5248,12 +6369,12 @@ StartupXLOG(void)
                        ereport(LOG,
                                        (errmsg("database system was not properly shut down; "
                                                        "automatic recovery in progress")));
-                       if (recoveryTargetTLI > 0)
+                       if (recoveryTargetTLI > ControlFile->checkPointCopy.ThisTimeLineID)
                                ereport(LOG,
-                                       (errmsg("crash recovery starts in timeline %u "
-                                                       "and has target timeline %u",
-                                                       ControlFile->checkPointCopy.ThisTimeLineID,
-                                                       recoveryTargetTLI)));
+                                               (errmsg("crash recovery starts in timeline %u "
+                                                               "and has target timeline %u",
+                                                               ControlFile->checkPointCopy.ThisTimeLineID,
+                                                               recoveryTargetTLI)));
                        ControlFile->state = DB_IN_CRASH_RECOVERY;
                }
                ControlFile->prevCheckPoint = ControlFile->checkPoint;
@@ -5363,9 +6484,13 @@ StartupXLOG(void)
                                oldestActiveXID = checkPoint.oldestActiveXid;
                        Assert(TransactionIdIsValid(oldestActiveXID));
 
+                       /* Tell procarray about the range of xids it has to deal with */
+                       ProcArrayInitRecovery(ShmemVariableCache->nextXid);
+
                        /*
-                        * Startup commit log and subtrans only. Other SLRUs are not
-                        * maintained during recovery and need not be started yet.
+                        * Startup commit log and subtrans only. MultiXact has already
+                        * been started up and other SLRUs are not maintained during
+                        * recovery and need not be started yet.
                         */
                        StartupCLOG();
                        StartupSUBTRANS(oldestActiveXID);
@@ -5412,22 +6537,18 @@ StartupXLOG(void)
                }
 
                /*
-                * Initialize shared replayEndRecPtr, lastReplayedEndRecPtr, and
-                * recoveryLastXTime.
-                *
-                * This is slightly confusing if we're starting from an online
-                * checkpoint; we've just read and replayed the chekpoint record, but
-                * we're going to start replay from its redo pointer, which precedes
-                * the location of the checkpoint record itself. So even though the
-                * last record we've replayed is indeed ReadRecPtr, we haven't
-                * replayed all the preceding records yet. That's OK for the current
-                * use of these variables.
+                * Initialize shared variables for tracking progress of WAL replay, as
+                * if we had just replayed the record before the REDO location (or the
+                * checkpoint record itself, if it's a shutdown checkpoint).
                 */
                SpinLockAcquire(&xlogctl->info_lck);
-               xlogctl->replayEndRecPtr = ReadRecPtr;
+               if (checkPoint.redo < RecPtr)
+                       xlogctl->replayEndRecPtr = checkPoint.redo;
+               else
+                       xlogctl->replayEndRecPtr = EndRecPtr;
                xlogctl->replayEndTLI = ThisTimeLineID;
-               xlogctl->lastReplayedEndRecPtr = EndRecPtr;
-               xlogctl->lastReplayedTLI = ThisTimeLineID;
+               xlogctl->lastReplayedEndRecPtr = xlogctl->replayEndRecPtr;
+               xlogctl->lastReplayedTLI = xlogctl->replayEndTLI;
                xlogctl->recoveryLastXTime = 0;
                xlogctl->currentChunkStartTime = 0;
                xlogctl->recoveryPause = false;
@@ -5479,8 +6600,6 @@ StartupXLOG(void)
 
                if (record != NULL)
                {
-                       bool            recoveryContinue = true;
-                       bool            recoveryApply = true;
                        ErrorContextCallback errcallback;
                        TimestampTz xtime;
 
@@ -5488,14 +6607,15 @@ StartupXLOG(void)
 
                        ereport(LOG,
                                        (errmsg("redo starts at %X/%X",
-                                                       (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
+                                                (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
 
                        /*
                         * main redo apply loop
                         */
                        do
                        {
-                               bool switchedTLI = false;
+                               bool            switchedTLI = false;
+
 #ifdef WAL_DEBUG
                                if (XLOG_DEBUG ||
                                 (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
@@ -5505,13 +6625,11 @@ StartupXLOG(void)
 
                                        initStringInfo(&buf);
                                        appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
-                                                                        (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
-                                                                        (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
+                                                       (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
+                                                        (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
                                        xlog_outrec(&buf, record);
-                                       appendStringInfo(&buf, " - ");
-                                       RmgrTable[record->xl_rmid].rm_desc(&buf,
-                                                                                                          record->xl_info,
-                                                                                                        XLogRecGetData(record));
+                                       appendStringInfoString(&buf, " - ");
+                                       RmgrTable[record->xl_rmid].rm_desc(&buf, record);
                                        elog(LOG, "%s", buf.data);
                                        pfree(buf.data);
                                }
@@ -5539,19 +6657,27 @@ StartupXLOG(void)
                                /*
                                 * Have we reached our recovery target?
                                 */
-                               if (recoveryStopsHere(record, &recoveryApply))
+                               if (recoveryStopsBefore(record))
                                {
-                                       if (recoveryPauseAtTarget)
-                                       {
-                                               SetRecoveryPause(true);
-                                               recoveryPausesHere();
-                                       }
                                        reachedStopPoint = true;        /* see below */
-                                       recoveryContinue = false;
+                                       break;
+                               }
 
-                                       /* Exit loop if we reached non-inclusive recovery target */
-                                       if (!recoveryApply)
-                                               break;
+                               /*
+                                * If we've been asked to lag the master, wait on latch until
+                                * enough time has passed.
+                                */
+                               if (recoveryApplyDelay(record))
+                               {
+                                       /*
+                                        * We test for paused recovery again here. If user sets
+                                        * delayed apply, it may be because they expect to pause
+                                        * recovery in case of problems, so we must test again
+                                        * here otherwise pausing during the delay-wait wouldn't
+                                        * work.
+                                        */
+                                       if (xlogctl->recoveryPause)
+                                               recoveryPausesHere();
                                }
 
                                /* Setup error traceback support for ereport() */
@@ -5577,13 +6703,13 @@ StartupXLOG(void)
                                }
 
                                /*
-                                * Before replaying this record, check if this record
-                                * causes the current timeline to change. The record is
-                                * already considered to be part of the new timeline,
-                                * so we update ThisTimeLineID before replaying it.
-                                * That's important so that replayEndTLI, which is
-                                * recorded as the minimum recovery point's TLI if
-                                * recovery stops after this record, is set correctly.
+                                * Before replaying this record, check if this record causes
+                                * the current timeline to change. The record is already
+                                * considered to be part of the new timeline, so we update
+                                * ThisTimeLineID before replaying it. That's important so
+                                * that replayEndTLI, which is recorded as the minimum
+                                * recovery point's TLI if recovery stops after this record,
+                                * is set correctly.
                                 */
                                if (record->xl_rmid == RM_XLOG_ID)
                                {
@@ -5601,7 +6727,7 @@ StartupXLOG(void)
                                        }
                                        else if (info == XLOG_END_OF_RECOVERY)
                                        {
-                                               xl_end_of_recovery      xlrec;
+                                               xl_end_of_recovery xlrec;
 
                                                memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
                                                newTLI = xlrec.ThisTimeLineID;
@@ -5665,8 +6791,11 @@ StartupXLOG(void)
                                        WalSndWakeup();
 
                                /* Exit loop if we reached inclusive recovery target */
-                               if (!recoveryContinue)
+                               if (recoveryStopsAfter(record))
+                               {
+                                       reachedStopPoint = true;
                                        break;
+                               }
 
                                /* Else, try to fetch the next WAL record */
                                record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
@@ -5676,9 +6805,22 @@ StartupXLOG(void)
                         * end of main redo apply loop
                         */
 
+                       if (recoveryPauseAtTarget && reachedStopPoint)
+                       {
+                               SetRecoveryPause(true);
+                               recoveryPausesHere();
+                       }
+
+                       /* Allow resource managers to do any required cleanup. */
+                       for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+                       {
+                               if (RmgrTable[rmid].rm_cleanup != NULL)
+                                       RmgrTable[rmid].rm_cleanup();
+                       }
+
                        ereport(LOG,
                                        (errmsg("redo done at %X/%X",
-                                                       (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
+                                                (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
                        xtime = GetLatestXTime();
                        if (xtime)
                                ereport(LOG,
@@ -5769,8 +6911,8 @@ StartupXLOG(void)
        /*
         * Consider whether we need to assign a new timeline ID.
         *
-        * If we are doing an archive recovery, we always assign a new ID.      This
-        * handles a couple of issues.  If we stopped short of the end of WAL
+        * If we are doing an archive recovery, we always assign a new ID.  This
+        * handles a couple of issues.  If we stopped short of the end of WAL
         * during recovery, then we are clearly generating a new timeline and must
         * assign it a unique new ID.  Even if we ran to the end, modifying the
         * current last segment is problematic because it may result in trying to
@@ -5783,7 +6925,7 @@ StartupXLOG(void)
        PrevTimeLineID = ThisTimeLineID;
        if (ArchiveRecoveryRequested)
        {
-               char    reason[200];
+               char            reason[200];
 
                Assert(InArchiveRecovery);
 
@@ -5809,6 +6951,8 @@ StartupXLOG(void)
                        snprintf(reason, sizeof(reason),
                                         "at restore point \"%s\"",
                                         recoveryStopName);
+               else if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE)
+                       snprintf(reason, sizeof(reason), "reached consistency");
                else
                        snprintf(reason, sizeof(reason), "no recovery target specified");
 
@@ -5838,25 +6982,44 @@ StartupXLOG(void)
        openLogFile = XLogFileOpen(openLogSegNo);
        openLogOff = 0;
        Insert = &XLogCtl->Insert;
-       Insert->PrevRecord = LastRec;
-       XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
+       Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
+       Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
        /*
         * Tricky point here: readBuf contains the *last* block that the LastRec
-        * record spans, not the one it starts in.      The last block is indeed the
+        * record spans, not the one it starts in.  The last block is indeed the
         * one we want to use.
         */
-       if (EndOfLog % XLOG_BLCKSZ == 0)
+       if (EndOfLog % XLOG_BLCKSZ != 0)
        {
-               memset(Insert->currpage, 0, XLOG_BLCKSZ);
+               char       *page;
+               int                     len;
+               int                     firstIdx;
+               XLogRecPtr      pageBeginPtr;
+
+               pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
+               Assert(readOff == pageBeginPtr % XLogSegSize);
+
+               firstIdx = XLogRecPtrToBufIdx(EndOfLog);
+
+               /* Copy the valid part of the last block, and zero the rest */
+               page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
+               len = EndOfLog % XLOG_BLCKSZ;
+               memcpy(page, xlogreader->readBuf, len);
+               memset(page + len, 0, XLOG_BLCKSZ - len);
+
+               XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
+               XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
        }
        else
        {
-               Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
-               memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
+               /*
+                * There is no partial block to copy. Just set InitializedUpTo, and
+                * let the first attempt to insert a log record to initialize the next
+                * buffer.
+                */
+               XLogCtl->InitializedUpTo = EndOfLog;
        }
-       Insert->currpos = (char *) Insert->currpage +
-               (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
 
        LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
@@ -5865,26 +7028,6 @@ StartupXLOG(void)
        XLogCtl->LogwrtRqst.Write = EndOfLog;
        XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
-       freespace = INSERT_FREESPACE(Insert);
-       if (freespace > 0)
-       {
-               /* Make sure rest of page is zero */
-               MemSet(Insert->currpos, 0, freespace);
-               XLogCtl->Write.curridx = 0;
-       }
-       else
-       {
-               /*
-                * Whenever LogwrtResult points to exactly the end of a page,
-                * Write.curridx must point to the *next* page (see XLogWrite()).
-                *
-                * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
-                * this is sufficient.  The first actual attempt to insert a log
-                * record will advance the insert state.
-                */
-               XLogCtl->Write.curridx = NextBufIdx(0);
-       }
-
        /* Pre-scan prepared transactions to find out the range of XIDs present */
        oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
 
@@ -5900,27 +7043,6 @@ StartupXLOG(void)
 
        if (InRecovery)
        {
-               int                     rmid;
-
-               /*
-                * Resource managers might need to write WAL records, eg, to record
-                * index cleanup actions.  So temporarily enable XLogInsertAllowed in
-                * this process only.
-                */
-               LocalSetXLogInsertAllowed();
-
-               /*
-                * Allow resource managers to do any required cleanup.
-                */
-               for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-               {
-                       if (RmgrTable[rmid].rm_cleanup != NULL)
-                               RmgrTable[rmid].rm_cleanup();
-               }
-
-               /* Disallow XLogInsert again */
-               LocalXLogInsertAllowed = -1;
-
                /*
                 * Perform a checkpoint to update all our recovery activity to disk.
                 *
@@ -5931,8 +7053,9 @@ StartupXLOG(void)
                 * allows some extra error checking in xlog_redo.
                 *
                 * In fast promotion, only create a lightweight end-of-recovery record
-                * instead of a full checkpoint. A checkpoint is requested later, after
-                * we're fully out of recovery mode and already accepting queries.
+                * instead of a full checkpoint. A checkpoint is requested later,
+                * after we're fully out of recovery mode and already accepting
+                * queries.
                 */
                if (bgwriterLaunched)
                {
@@ -5949,14 +7072,26 @@ StartupXLOG(void)
                                if (record != NULL)
                                {
                                        fast_promoted = true;
+
+                                       /*
+                                        * Insert a special WAL record to mark the end of
+                                        * recovery, since we aren't doing a checkpoint. That
+                                        * means that the checkpointer process may likely be in
+                                        * the middle of a time-smoothed restartpoint and could
+                                        * continue to be for minutes after this. That sounds
+                                        * strange, but the effect is roughly the same and it
+                                        * would be stranger to try to come out of the
+                                        * restartpoint and then checkpoint. We request a
+                                        * checkpoint later anyway, just for safety.
+                                        */
                                        CreateEndOfRecoveryRecord();
                                }
                        }
 
                        if (!fast_promoted)
                                RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY |
-                                                                       CHECKPOINT_IMMEDIATE |
-                                                                       CHECKPOINT_WAIT);
+                                                                 CHECKPOINT_IMMEDIATE |
+                                                                 CHECKPOINT_WAIT);
                }
                else
                        CreateCheckPoint(CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_IMMEDIATE);
@@ -5995,7 +7130,7 @@ StartupXLOG(void)
        LWLockRelease(ControlFileLock);
 
        /* start the archive_timeout timer running */
-       XLogCtl->Write.lastSegSwitchTime = (pg_time_t) time(NULL);
+       XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 
        /* also initialize latestCompletedXid, to nextXid - 1 */
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -6016,8 +7151,8 @@ StartupXLOG(void)
        /*
         * Perform end of recovery actions for any SLRUs that need it.
         */
-       StartupMultiXact();
        TrimCLOG();
+       TrimMultiXact();
 
        /* Reload shared-memory state for prepared transactions */
        RecoverPreparedTransactions();
@@ -6045,7 +7180,7 @@ StartupXLOG(void)
        XLogReportParameters();
 
        /*
-        * All done.  Allow backends to write WAL.      (Although the bool flag is
+        * All done.  Allow backends to write WAL.  (Although the bool flag is
         * probably atomic in itself, we use the info_lck here to ensure that
         * there are no race conditions concerning visibility of other recent
         * updates to shared memory.)
@@ -6060,8 +7195,8 @@ StartupXLOG(void)
        }
 
        /*
-        * If there were cascading standby servers connected to us, nudge any
-        * wal sender processes to notice that we've been promoted.
+        * If there were cascading standby servers connected to us, nudge any wal
+        * sender processes to notice that we've been promoted.
         */
        WalSndWakeup();
 
@@ -6072,7 +7207,7 @@ StartupXLOG(void)
         * than is appropriate now that we're not in standby mode anymore.
         */
        if (fast_promoted)
-               RequestCheckpoint(0);
+               RequestCheckpoint(CHECKPOINT_FORCE);
 }
 
 /*
@@ -6083,6 +7218,8 @@ StartupXLOG(void)
 static void
 CheckRecoveryConsistency(void)
 {
+       XLogRecPtr      lastReplayedEndRecPtr;
+
        /*
         * During crash recovery, we don't reach a consistent state until we've
         * replayed all the WAL.
@@ -6090,11 +7227,17 @@ CheckRecoveryConsistency(void)
        if (XLogRecPtrIsInvalid(minRecoveryPoint))
                return;
 
+       /*
+        * assume that we are called in the startup process, and hence don't need
+        * a lock to read lastReplayedEndRecPtr
+        */
+       lastReplayedEndRecPtr = XLogCtl->lastReplayedEndRecPtr;
+
        /*
         * Have we reached the point where our base backup was completed?
         */
        if (!XLogRecPtrIsInvalid(ControlFile->backupEndPoint) &&
-               ControlFile->backupEndPoint <= EndRecPtr)
+               ControlFile->backupEndPoint <= lastReplayedEndRecPtr)
        {
                /*
                 * We have reached the end of base backup, as indicated by pg_control.
@@ -6107,8 +7250,8 @@ CheckRecoveryConsistency(void)
 
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 
-               if (ControlFile->minRecoveryPoint < EndRecPtr)
-                       ControlFile->minRecoveryPoint = EndRecPtr;
+               if (ControlFile->minRecoveryPoint < lastReplayedEndRecPtr)
+                       ControlFile->minRecoveryPoint = lastReplayedEndRecPtr;
 
                ControlFile->backupStartPoint = InvalidXLogRecPtr;
                ControlFile->backupEndPoint = InvalidXLogRecPtr;
@@ -6119,14 +7262,14 @@ CheckRecoveryConsistency(void)
        }
 
        /*
-        * Have we passed our safe starting point? Note that minRecoveryPoint
-        * is known to be incorrectly set if ControlFile->backupEndRequired,
-        * until the XLOG_BACKUP_RECORD arrives to advise us of the correct
+        * Have we passed our safe starting point? Note that minRecoveryPoint is
+        * known to be incorrectly set if ControlFile->backupEndRequired, until
+        * the XLOG_BACKUP_RECORD arrives to advise us of the correct
         * minRecoveryPoint. All we know prior to that is that we're not
         * consistent yet.
         */
        if (!reachedConsistency && !ControlFile->backupEndRequired &&
-               minRecoveryPoint <= XLogCtl->lastReplayedEndRecPtr &&
+               minRecoveryPoint <= lastReplayedEndRecPtr &&
                XLogRecPtrIsInvalid(ControlFile->backupStartPoint))
        {
                /*
@@ -6138,8 +7281,8 @@ CheckRecoveryConsistency(void)
                reachedConsistency = true;
                ereport(LOG,
                                (errmsg("consistent recovery state reached at %X/%X",
-                                               (uint32) (XLogCtl->lastReplayedEndRecPtr >> 32),
-                                               (uint32) XLogCtl->lastReplayedEndRecPtr)));
+                                               (uint32) (lastReplayedEndRecPtr >> 32),
+                                               (uint32) lastReplayedEndRecPtr)));
        }
 
        /*
@@ -6186,22 +7329,36 @@ RecoveryInProgress(void)
                return false;
        else
        {
-               /* use volatile pointer to prevent code rearrangement */
+               /*
+                * use volatile pointer to make sure we make a fresh read of the
+                * shared variable.
+                */
                volatile XLogCtlData *xlogctl = XLogCtl;
 
-               /* spinlock is essential on machines with weak memory ordering! */
-               SpinLockAcquire(&xlogctl->info_lck);
                LocalRecoveryInProgress = xlogctl->SharedRecoveryInProgress;
-               SpinLockRelease(&xlogctl->info_lck);
 
                /*
                 * Initialize TimeLineID and RedoRecPtr when we discover that recovery
                 * is finished. InitPostgres() relies upon this behaviour to ensure
-                * that InitXLOGAccess() is called at backend startup.  (If you change
+                * that InitXLOGAccess() is called at backend startup.  (If you change
                 * this, see also LocalSetXLogInsertAllowed.)
                 */
                if (!LocalRecoveryInProgress)
+               {
+                       /*
+                        * If we just exited recovery, make sure we read TimeLineID and
+                        * RedoRecPtr after SharedRecoveryInProgress (for machines with
+                        * weak memory ordering).
+                        */
+                       pg_memory_barrier();
                        InitXLOGAccess();
+               }
+
+               /*
+                * Note: We don't need a memory barrier when we're still in recovery.
+                * We might exit recovery immediately after return, so the caller
+                * can't rely on 'true' meaning that we're still in recovery anyway.
+                */
 
                return LocalRecoveryInProgress;
        }
@@ -6213,7 +7370,8 @@ RecoveryInProgress(void)
  * true. Postmaster knows this by way of signal, not via shared memory.
  *
  * Unlike testing standbyState, this works in any process that's connected to
- * shared memory.
+ * shared memory.  (And note that standbyState alone doesn't tell the truth
+ * anyway.)
  */
 bool
 HotStandbyActive(void)
@@ -6239,6 +7397,17 @@ HotStandbyActive(void)
        }
 }
 
+/*
+ * Like HotStandbyActive(), but to be used only in WAL replay code,
+ * where we don't need to ask any other process what the state is.
+ */
+bool
+HotStandbyActiveInReplay(void)
+{
+       Assert(AmStartupProcess());
+       return LocalHotStandbyActive;
+}
+
 /*
  * Is this process allowed to insert new WAL records?
  *
@@ -6425,26 +7594,39 @@ InitXLOGAccess(void)
        ThisTimeLineID = XLogCtl->ThisTimeLineID;
        Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
 
+       /* Initialize our copy of WALInsertLocks and register the tranche */
+       WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
+       LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId,
+                                                 &XLogCtl->Insert.WALInsertLockTranche);
+
        /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
        (void) GetRedoRecPtr();
 }
 
 /*
- * Once spawned, a backend may update its local RedoRecPtr from
- * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
- * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
+ * Return the current Redo pointer from shared memory.
+ *
+ * As a side-effect, the local RedoRecPtr copy is updated.
  */
 XLogRecPtr
 GetRedoRecPtr(void)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr      ptr;
 
+       /*
+        * The possibly not up-to-date copy in XlogCtl is enough. Even if we
+        * grabbed a WAL insertion lock to read the master copy, someone might
+        * update it just after we've released the lock.
+        */
        SpinLockAcquire(&xlogctl->info_lck);
-       Assert(RedoRecPtr <= xlogctl->Insert.RedoRecPtr);
-       RedoRecPtr = xlogctl->Insert.RedoRecPtr;
+       ptr = xlogctl->RedoRecPtr;
        SpinLockRelease(&xlogctl->info_lck);
 
+       if (RedoRecPtr < ptr)
+               RedoRecPtr = ptr;
+
        return RedoRecPtr;
 }
 
@@ -6453,9 +7635,8 @@ GetRedoRecPtr(void)
  *
  * NOTE: The value *actually* returned is the position of the last full
  * xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to acquire WALInsertLock which can be quite
- * heavily contended, and an approximation is enough for the current
- * usage of this function.
+ * For that, we don't need to scan through WAL insertion locks, and an
+ * approximation is enough for the current usage of this function.
  */
 XLogRecPtr
 GetInsertRecPtr(void)
@@ -6499,7 +7680,7 @@ GetLastSegSwitchTime(void)
 
        /* Need WALWriteLock, but shared lock is sufficient */
        LWLockAcquire(WALWriteLock, LW_SHARED);
-       result = XLogCtl->Write.lastSegSwitchTime;
+       result = XLogCtl->lastSegSwitchTime;
        LWLockRelease(WALWriteLock);
 
        return result;
@@ -6551,7 +7732,8 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
 void
 ShutdownXLOG(int code, Datum arg)
 {
-       ereport(LOG,
+       /* Don't be chatty in standalone mode */
+       ereport(IsPostmasterEnvironment ? LOG : NOTICE,
                        (errmsg("shutting down")));
 
        if (RecoveryInProgress())
@@ -6573,7 +7755,8 @@ ShutdownXLOG(int code, Datum arg)
        ShutdownSUBTRANS();
        ShutdownMultiXact();
 
-       ereport(LOG,
+       /* Don't be chatty in standalone mode */
+       ereport(IsPostmasterEnvironment ? LOG : NOTICE,
                        (errmsg("database system is shut down")));
 }
 
@@ -6730,6 +7913,8 @@ LogCheckpointEnd(bool restartpoint)
 void
 CreateCheckPoint(int flags)
 {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
        bool            shutdown;
        CheckPoint      checkPoint;
        XLogRecPtr      recptr;
@@ -6737,8 +7922,9 @@ CreateCheckPoint(int flags)
        XLogRecData rdata;
        uint32          freespace;
        XLogSegNo       _logSegNo;
+       XLogRecPtr      curInsert;
        VirtualTransactionId *vxids;
-       int     nvxids;
+       int                     nvxids;
 
        /*
         * An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -6807,15 +7993,16 @@ CreateCheckPoint(int flags)
                checkPoint.oldestActiveXid = InvalidTransactionId;
 
        /*
-        * We must hold WALInsertLock while examining insert state to determine
-        * the checkpoint REDO pointer.
+        * We must block concurrent insertions while examining insert state to
+        * determine the checkpoint REDO pointer.
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertLockAcquireExclusive();
+       curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
 
        /*
         * If this isn't a shutdown or forced checkpoint, and we have not inserted
         * any XLOG records since the start of the last checkpoint, skip the
-        * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
+        * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
         * when the system is idle. That wastes log space, and more importantly it
         * exposes us to possible loss of both current and previous checkpoint
         * records if the machine crashes just as we're writing the update.
@@ -6830,14 +8017,11 @@ CreateCheckPoint(int flags)
        if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
                                  CHECKPOINT_FORCE)) == 0)
        {
-               XLogRecPtr      curInsert;
-
-               INSERT_RECPTR(curInsert, Insert, Insert->curridx);
-               if (curInsert == ControlFile->checkPoint + 
+               if (curInsert == ControlFile->checkPoint +
                        MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
                        ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
                {
-                       LWLockRelease(WALInsertLock);
+                       WALInsertLockRelease();
                        LWLockRelease(CheckpointLock);
                        END_CRIT_SECTION();
                        return;
@@ -6869,18 +8053,19 @@ CreateCheckPoint(int flags)
         * the buffer flush work.  Those XLOG records are logically after the
         * checkpoint, even though physically before it.  Got that?
         */
-       freespace = INSERT_FREESPACE(Insert);
+       freespace = INSERT_FREESPACE(curInsert);
        if (freespace == 0)
        {
-               (void) AdvanceXLInsertBuffer(false);
-               /* OK to ignore update return flag, since we will do flush anyway */
-               freespace = INSERT_FREESPACE(Insert);
+               if (curInsert % XLogSegSize == 0)
+                       curInsert += SizeOfXLogLongPHD;
+               else
+                       curInsert += SizeOfXLogShortPHD;
        }
-       INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
+       checkPoint.redo = curInsert;
 
        /*
         * Here we update the shared RedoRecPtr for future XLogInsert calls; this
-        * must be done while holding the insert lock AND the info_lck.
+        * must be done while holding all the insertion locks.
         *
         * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
         * pointing past where it really needs to point.  This is okay; the only
@@ -6889,20 +8074,18 @@ CreateCheckPoint(int flags)
         * XLogInserts that happen while we are dumping buffers must assume that
         * their buffer changes are not included in the checkpoint.
         */
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile XLogCtlData *xlogctl = XLogCtl;
-
-               SpinLockAcquire(&xlogctl->info_lck);
-               RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
-               SpinLockRelease(&xlogctl->info_lck);
-       }
+       RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
 
        /*
-        * Now we can release WAL insert lock, allowing other xacts to proceed
-        * while we are flushing disk buffers.
+        * Now we can release the WAL insertion locks, allowing other xacts to
+        * proceed while we are flushing disk buffers.
         */
-       LWLockRelease(WALInsertLock);
+       WALInsertLockRelease();
+
+       /* Update the info_lck-protected copy of RedoRecPtr as well */
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->RedoRecPtr = checkPoint.redo;
+       SpinLockRelease(&xlogctl->info_lck);
 
        /*
         * If enabled, log checkpoint start.  We postpone this until now so as not
@@ -6913,48 +8096,6 @@ CreateCheckPoint(int flags)
 
        TRACE_POSTGRESQL_CHECKPOINT_START(flags);
 
-       /*
-        * In some cases there are groups of actions that must all occur on
-        * one side or the other of a checkpoint record. Before flushing the
-        * checkpoint record we must explicitly wait for any backend currently
-        * performing those groups of actions.
-        *
-        * One example is end of transaction, so we must wait for any transactions
-        * that are currently in commit critical sections.  If an xact inserted
-        * its commit record into XLOG just before the REDO point, then a crash
-        * restart from the REDO point would not replay that record, which means
-        * that our flushing had better include the xact's update of pg_clog.  So
-        * we wait till he's out of his commit critical section before proceeding.
-        * See notes in RecordTransactionCommit().
-        *
-        * Because we've already released WALInsertLock, this test is a bit fuzzy:
-        * it is possible that we will wait for xacts we didn't really need to
-        * wait for.  But the delay should be short and it seems better to make
-        * checkpoint take a bit longer than to hold locks longer than necessary.
-        * (In fact, the whole reason we have this issue is that xact.c does
-        * commit record XLOG insertion and clog update as two separate steps
-        * protected by different locks, but again that seems best on grounds of
-        * minimizing lock contention.)
-        *
-        * A transaction that has not yet set delayChkpt when we look cannot be at
-        * risk, since he's not inserted his commit record yet; and one that's
-        * already cleared it is not at risk either, since he's done fixing clog
-        * and we will correctly flush the update below.  So we cannot miss any
-        * xacts we need to wait for.
-        */
-       vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
-       if (nvxids > 0)
-       {
-               uint32  nwaits = 0;
-
-               do
-               {
-                       pg_usleep(10000L);      /* wait for 10 msec */
-                       nwaits++;
-               } while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
-       }
-       pfree(vxids);
-
        /*
         * Get the other info we need for the checkpoint record.
         */
@@ -6991,6 +8132,45 @@ CreateCheckPoint(int flags)
         */
        END_CRIT_SECTION();
 
+       /*
+        * In some cases there are groups of actions that must all occur on one
+        * side or the other of a checkpoint record. Before flushing the
+        * checkpoint record we must explicitly wait for any backend currently
+        * performing those groups of actions.
+        *
+        * One example is end of transaction, so we must wait for any transactions
+        * that are currently in commit critical sections.  If an xact inserted
+        * its commit record into XLOG just before the REDO point, then a crash
+        * restart from the REDO point would not replay that record, which means
+        * that our flushing had better include the xact's update of pg_clog.  So
+        * we wait till he's out of his commit critical section before proceeding.
+        * See notes in RecordTransactionCommit().
+        *
+        * Because we've already released the insertion locks, this test is a bit
+        * fuzzy: it is possible that we will wait for xacts we didn't really need
+        * to wait for.  But the delay should be short and it seems better to make
+        * checkpoint take a bit longer than to hold off insertions longer than
+        * necessary. (In fact, the whole reason we have this issue is that xact.c
+        * does commit record XLOG insertion and clog update as two separate steps
+        * protected by different locks, but again that seems best on grounds of
+        * minimizing lock contention.)
+        *
+        * A transaction that has not yet set delayChkpt when we look cannot be at
+        * risk, since he's not inserted his commit record yet; and one that's
+        * already cleared it is not at risk either, since he's done fixing clog
+        * and we will correctly flush the update below.  So we cannot miss any
+        * xacts we need to wait for.
+        */
+       vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
+       if (nvxids > 0)
+       {
+               do
+               {
+                       pg_usleep(10000L);      /* wait for 10 msec */
+               } while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
+       }
+       pfree(vxids);
+
        CheckPointGuts(checkPoint.redo, flags);
 
        /*
@@ -7093,6 +8273,12 @@ CreateCheckPoint(int flags)
         */
        END_CRIT_SECTION();
 
+       /*
+        * Now that the checkpoint is safely on disk, we can update the point to
+        * which multixact can be truncated.
+        */
+       MultiXactSetSafeTruncate(checkPoint.oldestMulti);
+
        /*
         * Let smgr do post-checkpoint cleanup (eg, deleting old files).
         */
@@ -7118,13 +8304,18 @@ CreateCheckPoint(int flags)
 
        /*
         * Truncate pg_subtrans if possible.  We can throw away all data before
-        * the oldest XMIN of any running transaction.  No future transaction will
+        * the oldest XMIN of any running transaction.  No future transaction will
         * attempt to reference any pg_subtrans entry older than that (see Asserts
-        * in subtrans.c).      During recovery, though, we mustn't do this because
+        * in subtrans.c).  During recovery, though, we mustn't do this because
         * StartupSUBTRANS hasn't been called yet.
         */
        if (!RecoveryInProgress())
-               TruncateSUBTRANS(GetOldestXmin(true, false));
+               TruncateSUBTRANS(GetOldestXmin(NULL, false));
+
+       /*
+        * Truncate pg_multixact too.
+        */
+       TruncateMultiXact();
 
        /* Real work is done, but log and update stats before releasing lock. */
        LogCheckpointEnd(false);
@@ -7147,12 +8338,12 @@ CreateCheckPoint(int flags)
  * CreateRestartPoint() allows for the case where recovery may end before
  * the restartpoint completes so there is no concern of concurrent behaviour.
  */
-void
+static void
 CreateEndOfRecoveryRecord(void)
 {
-       xl_end_of_recovery      xlrec;
-       XLogRecData                     rdata;
-       XLogRecPtr                      recptr;
+       xl_end_of_recovery xlrec;
+       XLogRecData rdata;
+       XLogRecPtr      recptr;
 
        /* sanity check */
        if (!RecoveryInProgress())
@@ -7160,10 +8351,10 @@ CreateEndOfRecoveryRecord(void)
 
        xlrec.end_time = time(NULL);
 
-       LWLockAcquire(WALInsertLock, LW_SHARED);
+       WALInsertLockAcquireExclusive();
        xlrec.ThisTimeLineID = ThisTimeLineID;
        xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
-       LWLockRelease(WALInsertLock);
+       WALInsertLockRelease();
 
        LocalSetXLogInsertAllowed();
 
@@ -7179,8 +8370,8 @@ CreateEndOfRecoveryRecord(void)
        XLogFlush(recptr);
 
        /*
-        * Update the control file so that crash recovery can follow
-        * the timeline changes to this point.
+        * Update the control file so that crash recovery can follow the timeline
+        * changes to this point.
         */
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        ControlFile->time = (pg_time_t) xlrec.end_time;
@@ -7191,7 +8382,7 @@ CreateEndOfRecoveryRecord(void)
 
        END_CRIT_SECTION();
 
-       LocalXLogInsertAllowed = -1;            /* return to "check" state */
+       LocalXLogInsertAllowed = -1;    /* return to "check" state */
 }
 
 /*
@@ -7208,6 +8399,9 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
        CheckPointMultiXact();
        CheckPointPredicate();
        CheckPointRelationMap();
+       CheckPointReplicationSlots();
+       CheckPointSnapBuild();
+       CheckPointLogicalRewriteHeap();
        CheckPointBuffers(flags);       /* performs all required fsyncs */
        /* We deliberately delay 2PC checkpointing as long as possible */
        CheckPointTwoPhase(checkPointRedo);
@@ -7226,31 +8420,9 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 static void
 RecoveryRestartPoint(const CheckPoint *checkPoint)
 {
-       int                     rmid;
-
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
 
-       /*
-        * Is it safe to restartpoint?  We must ask each of the resource managers
-        * whether they have any partial state information that might prevent a
-        * correct restart from this point.  If so, we skip this opportunity, but
-        * return at the next checkpoint record for another try.
-        */
-       for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-       {
-               if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
-                       if (!(RmgrTable[rmid].rm_safe_restartpoint()))
-                       {
-                               elog(trace_recovery(DEBUG2),
-                                        "RM %d not safe to record restart point at %X/%X",
-                                        rmid,
-                                        (uint32) (checkPoint->redo >> 32),
-                                        (uint32) checkPoint->redo);
-                               return;
-                       }
-       }
-
        /*
         * Also refrain from creating a restartpoint if we have seen any
         * references to non-existent pages. Restarting recovery from the
@@ -7343,7 +8515,8 @@ CreateRestartPoint(int flags)
        {
                ereport(DEBUG2,
                                (errmsg("skipping restartpoint, already performed at %X/%X",
-                                               (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo)));
+                                               (uint32) (lastCheckPoint.redo >> 32),
+                                               (uint32) lastCheckPoint.redo)));
 
                UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
                if (flags & CHECKPOINT_IS_SHUTDOWN)
@@ -7363,15 +8536,18 @@ CreateRestartPoint(int flags)
         * the number of segments replayed since last restartpoint, and request a
         * restartpoint if it exceeds checkpoint_segments.
         *
-        * You need to hold WALInsertLock and info_lck to update it, although
-        * during recovery acquiring WALInsertLock is just pro forma, because
-        * there is no other processes updating Insert.RedoRecPtr.
+        * Like in CreateCheckPoint(), hold off insertions to update it, although
+        * during recovery this is just pro forma, because no WAL insertions are
+        * happening.
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-       SpinLockAcquire(&xlogctl->info_lck);
+       WALInsertLockAcquireExclusive();
        xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+       WALInsertLockRelease();
+
+       /* Also update the info_lck-protected copy */
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->RedoRecPtr = lastCheckPoint.redo;
        SpinLockRelease(&xlogctl->info_lck);
-       LWLockRelease(WALInsertLock);
 
        /*
         * Prepare to accumulate statistics.
@@ -7423,21 +8599,26 @@ CreateRestartPoint(int flags)
        {
                XLogRecPtr      receivePtr;
                XLogRecPtr      replayPtr;
+               TimeLineID      replayTLI;
                XLogRecPtr      endptr;
 
                /*
-                * Get the current end of xlog replayed or received, whichever is later.
+                * Get the current end of xlog replayed or received, whichever is
+                * later.
                 */
                receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
-               replayPtr = GetXLogReplayRecPtr(NULL);
+               replayPtr = GetXLogReplayRecPtr(&replayTLI);
                endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 
                KeepLogSeg(endptr, &_logSegNo);
                _logSegNo--;
 
                /*
-                * Update ThisTimeLineID to the timeline we're currently replaying,
-                * so that we install any recycled segments on that timeline.
+                * Try to recycle segments on a useful timeline. If we've been
+                * promoted since the beginning of this restartpoint, use the new
+                * timeline chosen at end of recovery (RecoveryInProgress() sets
+                * ThisTimeLineID in that case). If we're still in recovery, use the
+                * timeline we're currently replaying.
                 *
                 * There is no guarantee that the WAL segments will be useful on the
                 * current timeline; if recovery proceeds to a new timeline right
@@ -7445,7 +8626,8 @@ CreateRestartPoint(int flags)
                 * not be used, and will go wasted until recycled on the next
                 * restartpoint. We'll live with that.
                 */
-               (void) GetXLogReplayRecPtr(&ThisTimeLineID);
+               if (RecoveryInProgress())
+                       ThisTimeLineID = replayTLI;
 
                RemoveOldXlogFiles(_logSegNo, endptr);
 
@@ -7454,17 +8636,42 @@ CreateRestartPoint(int flags)
                 * segments, since that may supply some of the needed files.)
                 */
                PreallocXlogFiles(endptr);
+
+               /*
+                * ThisTimeLineID is normally not set when we're still in recovery.
+                * However, recycling/preallocating segments above needed
+                * ThisTimeLineID to determine which timeline to install the segments
+                * on. Reset it now, to restore the normal state of affairs for
+                * debugging purposes.
+                */
+               if (RecoveryInProgress())
+                       ThisTimeLineID = 0;
        }
 
+       /*
+        * Due to an historical accident multixact truncations are not WAL-logged,
+        * but just performed everytime the mxact horizon is increased. So, unless
+        * we explicitly execute truncations on a standby it will never clean out
+        * /pg_multixact which obviously is bad, both because it uses space and
+        * because we can wrap around into pre-existing data...
+        *
+        * We can only do the truncation here, after the UpdateControlFile()
+        * above, because we've now safely established a restart point.  That
+        * guarantees we will not need to access those multis.
+        *
+        * It's probably worth improving this.
+        */
+       TruncateMultiXact();
+
        /*
         * Truncate pg_subtrans if possible.  We can throw away all data before
-        * the oldest XMIN of any running transaction.  No future transaction will
+        * the oldest XMIN of any running transaction.  No future transaction will
         * attempt to reference any pg_subtrans entry older than that (see Asserts
-        * in subtrans.c).      When hot standby is disabled, though, we mustn't do
+        * in subtrans.c).  When hot standby is disabled, though, we mustn't do
         * this because StartupSUBTRANS hasn't been called yet.
         */
        if (EnableHotStandby)
-               TruncateSUBTRANS(GetOldestXmin(true, false));
+               TruncateSUBTRANS(GetOldestXmin(NULL, false));
 
        /* Real work is done, but log and update before releasing lock. */
        LogCheckpointEnd(true);
@@ -7472,7 +8679,7 @@ CreateRestartPoint(int flags)
        xtime = GetLatestXTime();
        ereport((log_checkpoints ? LOG : DEBUG2),
                        (errmsg("recovery restart point at %X/%X",
-                                       (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo),
+                (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo),
                   xtime ? errdetail("last completed transaction was at log time %s",
                                                         timestamptz_to_str(xtime)) : 0));
 
@@ -7490,25 +8697,44 @@ CreateRestartPoint(int flags)
 }
 
 /*
- * Calculate the last segment that we need to retain because of
- * wal_keep_segments, by subtracting wal_keep_segments from
- * the given xlog location, recptr.
+ * Retreat *logSegNo to the last segment that we need to retain because of
+ * either wal_keep_segments or replication slots.
+ *
+ * This is calculated by subtracting wal_keep_segments from the given xlog
+ * location, recptr and by making sure that that result is below the
+ * requirement of replication slots.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
        XLogSegNo       segno;
-
-       if (wal_keep_segments == 0)
-               return;
+       XLogRecPtr      keep;
 
        XLByteToSeg(recptr, segno);
+       keep = XLogGetReplicationSlotMinimumLSN();
 
-       /* avoid underflow, don't go below 1 */
-       if (segno <= wal_keep_segments)
-               segno = 1;
-       else
-               segno = *logSegNo - wal_keep_segments;
+       /* compute limit for wal_keep_segments first */
+       if (wal_keep_segments > 0)
+       {
+               /* avoid underflow, don't go below 1 */
+               if (segno <= wal_keep_segments)
+                       segno = 1;
+               else
+                       segno = segno - wal_keep_segments;
+       }
+
+       /* then check whether slots limit removal further */
+       if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
+       {
+               XLogRecPtr      slotSegNo;
+
+               XLByteToSeg(keep, slotSegNo);
+
+               if (slotSegNo <= 0)
+                       segno = 1;
+               else if (slotSegNo < segno)
+                       segno = slotSegNo;
+       }
 
        /* don't delete WAL segments newer than the calculated segment */
        if (segno < *logSegNo)
@@ -7533,7 +8759,7 @@ XLogPutNextOid(Oid nextOid)
         * We need not flush the NEXTOID record immediately, because any of the
         * just-allocated OIDs could only reach disk as part of a tuple insert or
         * update that would have its own XLOG record that must follow the NEXTOID
-        * record.      Therefore, the standard buffer LSN interlock applied to those
+        * record.  Therefore, the standard buffer LSN interlock applied to those
         * records will ensure no such OID reaches disk before the NEXTOID record
         * does.
         *
@@ -7587,7 +8813,7 @@ XLogRestorePoint(const char *rpName)
        xl_restore_point xlrec;
 
        xlrec.rp_time = GetCurrentTimestamp();
-       strncpy(xlrec.rp_name, rpName, MAXFNAMELEN);
+       strlcpy(xlrec.rp_name, rpName, MAXFNAMELEN);
 
        rdata.buffer = InvalidBuffer;
        rdata.data = (char *) &xlrec;
@@ -7603,6 +8829,96 @@ XLogRestorePoint(const char *rpName)
        return RecPtr;
 }
 
+/*
+ * Write a backup block if needed when we are setting a hint. Note that
+ * this may be called for a variety of page types, not just heaps.
+ *
+ * Callable while holding just share lock on the buffer content.
+ *
+ * We can't use the plain backup block mechanism since that relies on the
+ * Buffer being exclusively locked. Since some modifications (setting LSN, hint
+ * bits) are allowed in a sharelocked buffer that can lead to wal checksum
+ * failures. So instead we copy the page and insert the copied data as normal
+ * record data.
+ *
+ * We only need to do something if page has not yet been full page written in
+ * this checkpoint round. The LSN of the inserted wal record is returned if we
+ * had to write, InvalidXLogRecPtr otherwise.
+ *
+ * It is possible that multiple concurrent backends could attempt to write WAL
+ * records. In that case, multiple copies of the same block would be recorded
+ * in separate WAL records by different backends, though that is still OK from
+ * a correctness perspective.
+ */
+XLogRecPtr
+XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
+{
+       XLogRecPtr      recptr = InvalidXLogRecPtr;
+       XLogRecPtr      lsn;
+       XLogRecData rdata[2];
+       BkpBlock        bkpb;
+
+       /*
+        * Ensure no checkpoint can change our view of RedoRecPtr.
+        */
+       Assert(MyPgXact->delayChkpt);
+
+       /*
+        * Update RedoRecPtr so XLogCheckBuffer can make the right decision
+        */
+       GetRedoRecPtr();
+
+       /*
+        * Setup phony rdata element for use within XLogCheckBuffer only. We reuse
+        * and reset rdata for any actual WAL record insert.
+        */
+       rdata[0].buffer = buffer;
+       rdata[0].buffer_std = buffer_std;
+
+       /*
+        * Check buffer while not holding an exclusive lock.
+        */
+       if (XLogCheckBuffer(rdata, false, &lsn, &bkpb))
+       {
+               char            copied_buffer[BLCKSZ];
+               char       *origdata = (char *) BufferGetBlock(buffer);
+
+               /*
+                * Copy buffer so we don't have to worry about concurrent hint bit or
+                * lsn updates. We assume pd_lower/upper cannot be changed without an
+                * exclusive lock, so the contents bkp are not racy.
+                *
+                * With buffer_std set to false, XLogCheckBuffer() sets hole_length
+                * and hole_offset to 0; so the following code is safe for either
+                * case.
+                */
+               memcpy(copied_buffer, origdata, bkpb.hole_offset);
+               memcpy(copied_buffer + bkpb.hole_offset,
+                          origdata + bkpb.hole_offset + bkpb.hole_length,
+                          BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
+
+               /*
+                * Header for backup block.
+                */
+               rdata[0].data = (char *) &bkpb;
+               rdata[0].len = sizeof(BkpBlock);
+               rdata[0].buffer = InvalidBuffer;
+               rdata[0].next = &(rdata[1]);
+
+               /*
+                * Save copy of the buffer.
+                */
+               rdata[1].data = copied_buffer;
+               rdata[1].len = BLCKSZ - bkpb.hole_length;
+               rdata[1].buffer = InvalidBuffer;
+               rdata[1].next = NULL;
+
+               recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+       }
+
+       return recptr;
+}
+
 /*
  * Check if any of the GUC parameters that are critical for hot standby
  * have changed, and update the value in pg_control file if necessary.
@@ -7611,7 +8927,9 @@ static void
 XLogReportParameters(void)
 {
        if (wal_level != ControlFile->wal_level ||
+               wal_log_hints != ControlFile->wal_log_hints ||
                MaxConnections != ControlFile->MaxConnections ||
+               max_worker_processes != ControlFile->max_worker_processes ||
                max_prepared_xacts != ControlFile->max_prepared_xacts ||
                max_locks_per_xact != ControlFile->max_locks_per_xact)
        {
@@ -7626,24 +8944,30 @@ XLogReportParameters(void)
                {
                        XLogRecData rdata;
                        xl_parameter_change xlrec;
+                       XLogRecPtr      recptr;
 
                        xlrec.MaxConnections = MaxConnections;
+                       xlrec.max_worker_processes = max_worker_processes;
                        xlrec.max_prepared_xacts = max_prepared_xacts;
                        xlrec.max_locks_per_xact = max_locks_per_xact;
                        xlrec.wal_level = wal_level;
+                       xlrec.wal_log_hints = wal_log_hints;
 
                        rdata.buffer = InvalidBuffer;
                        rdata.data = (char *) &xlrec;
                        rdata.len = sizeof(xlrec);
                        rdata.next = NULL;
 
-                       XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata);
+                       recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata);
+                       XLogFlush(recptr);
                }
 
                ControlFile->MaxConnections = MaxConnections;
+               ControlFile->max_worker_processes = max_worker_processes;
                ControlFile->max_prepared_xacts = max_prepared_xacts;
                ControlFile->max_locks_per_xact = max_locks_per_xact;
                ControlFile->wal_level = wal_level;
+               ControlFile->wal_log_hints = wal_log_hints;
                UpdateControlFile();
        }
 }
@@ -7681,9 +9005,9 @@ UpdateFullPageWrites(void)
         */
        if (fullPageWrites)
        {
-               LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+               WALInsertLockAcquireExclusive();
                Insert->fullPageWrites = true;
-               LWLockRelease(WALInsertLock);
+               WALInsertLockRelease();
        }
 
        /*
@@ -7704,9 +9028,9 @@ UpdateFullPageWrites(void)
 
        if (!fullPageWrites)
        {
-               LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+               WALInsertLockAcquireExclusive();
                Insert->fullPageWrites = false;
-               LWLockRelease(WALInsertLock);
+               WALInsertLockRelease();
        }
        END_CRIT_SECTION();
 }
@@ -7723,27 +9047,26 @@ checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI)
        /* Check that the record agrees on what the current (old) timeline is */
        if (prevTLI != ThisTimeLineID)
                ereport(PANIC,
-                               (errmsg("unexpected prev timeline ID %u (current timeline ID %u) in checkpoint record",
+                               (errmsg("unexpected previous timeline ID %u (current timeline ID %u) in checkpoint record",
                                                prevTLI, ThisTimeLineID)));
+
        /*
-        * The new timeline better be in the list of timelines we expect
-        * to see, according to the timeline history. It should also not
-        * decrease.
+        * The new timeline better be in the list of timelines we expect to see,
+        * according to the timeline history. It should also not decrease.
         */
        if (newTLI < ThisTimeLineID || !tliInHistory(newTLI, expectedTLEs))
                ereport(PANIC,
-                               (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
-                                               newTLI, ThisTimeLineID)));
+                (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
+                                newTLI, ThisTimeLineID)));
 
        /*
-        * If we have not yet reached min recovery point, and we're about
-        * to switch to a timeline greater than the timeline of the min
-        * recovery point: trouble. After switching to the new timeline,
-        * we could not possibly visit the min recovery point on the
-        * correct timeline anymore. This can happen if there is a newer
-        * timeline in the archive that branched before the timeline the
-        * min recovery point is on, and you attempt to do PITR to the
-        * new timeline.
+        * If we have not yet reached min recovery point, and we're about to
+        * switch to a timeline greater than the timeline of the min recovery
+        * point: trouble. After switching to the new timeline, we could not
+        * possibly visit the min recovery point on the correct timeline anymore.
+        * This can happen if there is a newer timeline in the archive that
+        * branched before the timeline the min recovery point is on, and you
+        * attempt to do PITR to the new timeline.
         */
        if (!XLogRecPtrIsInvalid(minRecoveryPoint) &&
                lsn < minRecoveryPoint &&
@@ -7769,7 +9092,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
-       /* Backup blocks are not used in xlog records */
+       /* Backup blocks are not used by XLOG rmgr */
        Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
        if (info == XLOG_NEXTOID)
@@ -7779,7 +9102,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                /*
                 * We used to try to take the maximum of ShmemVariableCache->nextOid
                 * and the recorded nextOid, but that fails if the OID counter wraps
-                * around.      Since no OID allocation should be happening during replay
+                * around.  Since no OID allocation should be happening during replay
                 * anyway, better to just believe the record exactly.  We still take
                 * OidGenLock while setting the variable, just in case.
                 */
@@ -7806,6 +9129,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                                                          checkPoint.nextMultiOffset);
                SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
                SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+               MultiXactSetSafeTruncate(checkPoint.oldestMulti);
 
                /*
                 * If we see a shutdown checkpoint while waiting for an end-of-backup
@@ -7906,6 +9230,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                                                                  checkPoint.oldestXidDB);
                MultiXactAdvanceOldest(checkPoint.oldestMulti,
                                                           checkPoint.oldestMultiDB);
+               MultiXactSetSafeTruncate(checkPoint.oldestMulti);
 
                /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
                ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
@@ -7963,6 +9288,30 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
        {
                /* nothing to do here */
        }
+       else if (info == XLOG_FPI)
+       {
+               char       *data;
+               BkpBlock        bkpb;
+
+               /*
+                * Full-page image (FPI) records contain a backup block stored
+                * "inline" in the normal data since the locking when writing hint
+                * records isn't sufficient to use the normal backup block mechanism,
+                * which assumes exclusive lock on the buffer supplied.
+                *
+                * Since the only change in these backup block are hint bits, there
+                * are no recovery conflicts generated.
+                *
+                * This also means there is no corresponding API call for this, so an
+                * smgr implementation has no need to implement anything. Which means
+                * nothing is needed in md.c etc
+                */
+               data = XLogRecGetData(record);
+               memcpy(&bkpb, data, sizeof(BkpBlock));
+               data += sizeof(BkpBlock);
+
+               RestoreBackupBlockContents(lsn, bkpb, data, false, false);
+       }
        else if (info == XLOG_BACKUP_END)
        {
                XLogRecPtr      startpoint;
@@ -8003,9 +9352,11 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->MaxConnections = xlrec.MaxConnections;
+               ControlFile->max_worker_processes = xlrec.max_worker_processes;
                ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts;
                ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
                ControlFile->wal_level = xlrec.wal_level;
+               ControlFile->wal_log_hints = wal_log_hints;
 
                /*
                 * Update minRecoveryPoint to ensure that if recovery is aborted, we
@@ -8096,7 +9447,7 @@ get_sync_bit(int method)
 
        /*
         * Optimize writes by bypassing kernel cache with O_DIRECT when using
-        * O_SYNC/O_FSYNC and O_DSYNC.  But only if archiving and streaming are
+        * O_SYNC/O_FSYNC and O_DSYNC.  But only if archiving and streaming are
         * disabled, otherwise the archive command or walsender process will read
         * the WAL soon after writing it, which is guaranteed to cause a physical
         * read if we bypassed the kernel cache. We also skip the
@@ -8158,7 +9509,7 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
                                ereport(PANIC,
                                                (errcode_for_file_access(),
                                                 errmsg("could not fsync log segment %s: %m",
-                                                               XLogFileNameP(ThisTimeLineID, openLogSegNo))));
+                                                         XLogFileNameP(ThisTimeLineID, openLogSegNo))));
                        if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
                                XLogFileClose();
                }
@@ -8189,8 +9540,8 @@ issue_xlog_fsync(int fd, XLogSegNo segno)
                        if (pg_fsync_writethrough(fd) != 0)
                                ereport(PANIC,
                                                (errcode_for_file_access(),
-                                                errmsg("could not fsync write-through log file %s: %m",
-                                                               XLogFileNameP(ThisTimeLineID, segno))));
+                                         errmsg("could not fsync write-through log file %s: %m",
+                                                        XLogFileNameP(ThisTimeLineID, segno))));
                        break;
 #endif
 #ifdef HAVE_FDATASYNC
@@ -8219,6 +9570,7 @@ char *
 XLogFileNameP(TimeLineID tli, XLogSegNo segno)
 {
        char       *result = palloc(MAXFNAMELEN);
+
        XLogFileName(result, tli, segno);
        return result;
 }
@@ -8246,6 +9598,9 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno)
  *
  * Every successfully started non-exclusive backup must be stopped by calling
  * do_pg_stop_backup() or do_pg_abort_backup().
+ *
+ * It is the responsibility of the caller of this function to verify the
+ * permissions of the calling user!
  */
 XLogRecPtr
 do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
@@ -8266,11 +9621,6 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 
        backup_started_in_recovery = RecoveryInProgress();
 
-       if (!superuser() && !is_authenticated_user_replication_role())
-               ereport(ERROR,
-                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                  errmsg("must be superuser or replication role to run a backup")));
-
        /*
         * Currently only non-exclusive backup can be taken during recovery.
         */
@@ -8288,7 +9638,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                          errmsg("WAL level not sufficient for making an online backup"),
-                                errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
+                                errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start.")));
 
        if (strlen(backupidstr) > MAXPGPATH)
                ereport(ERROR,
@@ -8301,7 +9651,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
         * during an on-line backup even if not doing so at other times, because
         * it's quite possible for the backup dump to obtain a "torn" (partially
         * written) copy of a database page if it reads the page concurrently with
-        * our write to the same page.  This can be fixed as long as the first
+        * our write to the same page.  This can be fixed as long as the first
         * write to the page in the WAL sequence is a full-page write. Hence, we
         * turn on forcePageWrites and then force a CHECKPOINT, to ensure there
         * are no dirty pages in shared memory that might get dumped while the
@@ -8313,15 +9663,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
         * Note that forcePageWrites has no effect during an online backup from
         * the standby.
         *
-        * We must hold WALInsertLock to change the value of forcePageWrites, to
-        * ensure adequate interlocking against XLogInsert().
+        * We must hold all the insertion locks to change the value of
+        * forcePageWrites, to ensure adequate interlocking against XLogInsert().
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertLockAcquireExclusive();
        if (exclusive)
        {
                if (XLogCtl->Insert.exclusiveBackup)
                {
-                       LWLockRelease(WALInsertLock);
+                       WALInsertLockRelease();
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         errmsg("a backup is already in progress"),
@@ -8332,7 +9682,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
        else
                XLogCtl->Insert.nonExclusiveBackups++;
        XLogCtl->Insert.forcePageWrites = true;
-       LWLockRelease(WALInsertLock);
+       WALInsertLockRelease();
 
        /* Ensure we release forcePageWrites if fail below */
        PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
@@ -8345,7 +9695,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                 * old timeline IDs.  That would otherwise happen if you called
                 * pg_start_backup() right after restoring from a PITR archive: the
                 * first WAL segment containing the startup checkpoint has pages in
-                * the beginning with the old timeline ID.      That can cause trouble at
+                * the beginning with the old timeline ID.  That can cause trouble at
                 * recovery: we won't have a history file covering the old timeline if
                 * pg_xlog directory was not included in the base backup and the WAL
                 * archive was cleared too before starting the backup.
@@ -8368,7 +9718,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                        bool            checkpointfpw;
 
                        /*
-                        * Force a CHECKPOINT.  Aside from being necessary to prevent torn
+                        * Force a CHECKPOINT.  Aside from being necessary to prevent torn
                         * page problems, this guarantees that two successive backup runs
                         * will have different checkpoint positions and hence different
                         * history file names, even if nothing happened in between.
@@ -8447,13 +9797,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                         * taking a checkpoint right after another is not that expensive
                         * either because only few buffers have been dirtied yet.
                         */
-                       LWLockAcquire(WALInsertLock, LW_SHARED);
+                       WALInsertLockAcquireExclusive();
                        if (XLogCtl->Insert.lastBackupStart < startpoint)
                        {
                                XLogCtl->Insert.lastBackupStart = startpoint;
                                gotUniqueStartpoint = true;
                        }
-                       LWLockRelease(WALInsertLock);
+                       WALInsertLockRelease();
                } while (!gotUniqueStartpoint);
 
                XLByteToSeg(startpoint, _logSegNo);
@@ -8470,9 +9820,9 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                                        "%Y-%m-%d %H:%M:%S %Z",
                                        pg_localtime(&stamp_time, log_timezone));
                appendStringInfo(&labelfbuf, "START WAL LOCATION: %X/%X (file %s)\n",
-                                                (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
+                        (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
                appendStringInfo(&labelfbuf, "CHECKPOINT LOCATION: %X/%X\n",
-                                                (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
+                                        (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
                appendStringInfo(&labelfbuf, "BACKUP METHOD: %s\n",
                                                 exclusive ? "pg_start_backup" : "streamed");
                appendStringInfo(&labelfbuf, "BACKUP FROM: %s\n",
@@ -8543,7 +9893,7 @@ pg_start_backup_callback(int code, Datum arg)
        bool            exclusive = DatumGetBool(arg);
 
        /* Update backup counters and forcePageWrites on failure */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertLockAcquireExclusive();
        if (exclusive)
        {
                Assert(XLogCtl->Insert.exclusiveBackup);
@@ -8560,7 +9910,7 @@ pg_start_backup_callback(int code, Datum arg)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertLockRelease();
 }
 
 /*
@@ -8572,6 +9922,9 @@ pg_start_backup_callback(int code, Datum arg)
  *
  * Returns the last WAL position that must be present to restore from this
  * backup, and the corresponding timeline ID in *stoptli_p.
+ *
+ * It is the responsibility of the caller of this function to verify the
+ * permissions of the calling user!
  */
 XLogRecPtr
 do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
@@ -8604,11 +9957,6 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 
        backup_started_in_recovery = RecoveryInProgress();
 
-       if (!superuser() && !is_authenticated_user_replication_role())
-               ereport(ERROR,
-                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                (errmsg("must be superuser or replication role to run a backup"))));
-
        /*
         * Currently only non-exclusive backup can be taken during recovery.
         */
@@ -8626,12 +9974,12 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                          errmsg("WAL level not sufficient for making an online backup"),
-                                errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
+                                errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start.")));
 
        /*
         * OK to update backup counters and forcePageWrites
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertLockAcquireExclusive();
        if (exclusive)
                XLogCtl->Insert.exclusiveBackup = false;
        else
@@ -8651,7 +9999,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertLockRelease();
 
        if (exclusive)
        {
@@ -8776,10 +10124,10 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                           errmsg("WAL generated with full_page_writes=off was replayed "
                                          "during online backup"),
-                                errhint("This means that the backup being taken on the standby "
-                                                "is corrupt and should not be used. "
+                        errhint("This means that the backup being taken on the standby "
+                                        "is corrupt and should not be used. "
                                 "Enable full_page_writes and run CHECKPOINT on the master, "
-                                                "and then try an online backup again.")));
+                                        "and then try an online backup again.")));
 
 
                LWLockAcquire(ControlFileLock, LW_SHARED);
@@ -8830,7 +10178,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
                                 errmsg("could not create file \"%s\": %m",
                                                histfilepath)));
        fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
-                       (uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename);
+               (uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename);
        fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
                        (uint32) (stoppoint >> 32), (uint32) stoppoint, stopxlogfilename);
        /* transfer remaining lines from label to history file */
@@ -8930,13 +10278,13 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
  * an error handler.
  *
  * NB: This is only for aborting a non-exclusive backup that doesn't write
- * backup_label. A backup started with pg_stop_backup() needs to be finished
+ * backup_label. A backup started with pg_start_backup() needs to be finished
  * with pg_stop_backup().
  */
 void
 do_pg_abort_backup(void)
 {
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertLockAcquireExclusive();
        Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
        XLogCtl->Insert.nonExclusiveBackups--;
 
@@ -8945,7 +10293,7 @@ do_pg_abort_backup(void)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertLockRelease();
 }
 
 /*
@@ -8977,14 +10325,14 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-       XLogCtlInsert *Insert = &XLogCtl->Insert;
-       XLogRecPtr      current_recptr;
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          current_bytepos;
 
-       LWLockAcquire(WALInsertLock, LW_SHARED);
-       INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
-       LWLockRelease(WALInsertLock);
+       SpinLockAcquire(&Insert->insertpos_lck);
+       current_bytepos = Insert->CurrBytePos;
+       SpinLockRelease(&Insert->insertpos_lck);
 
-       return current_recptr;
+       return XLogBytePosToRecPtr(current_bytepos);
 }
 
 /*
@@ -9023,7 +10371,7 @@ GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli)
  *
  * If we see a backup_label during recovery, we assume that we are recovering
  * from a backup dump file, and we therefore roll forward from the checkpoint
- * identified by the label file, NOT what pg_control says.     This avoids the
+ * identified by the label file, NOT what pg_control says.  This avoids the
  * problem that pg_control might have been archived one or more checkpoints
  * later than the start of the dump, and so if we rely on it as the start
  * point, we will fail to restore a consistent database state.
@@ -9118,9 +10466,7 @@ rm_redo_error_callback(void *arg)
        StringInfoData buf;
 
        initStringInfo(&buf);
-       RmgrTable[record->xl_rmid].rm_desc(&buf,
-                                                                          record->xl_info,
-                                                                          XLogRecGetData(record));
+       RmgrTable[record->xl_rmid].rm_desc(&buf, record);
 
        /* don't bother emitting empty description */
        if (buf.len > 0)
@@ -9206,10 +10552,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
                         XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
 {
        XLogPageReadPrivate *private =
-               (XLogPageReadPrivate *) xlogreader->private_data;
+       (XLogPageReadPrivate *) xlogreader->private_data;
        int                     emode = private->emode;
        uint32          targetPageOff;
-       XLogSegNo       targetSegNo PG_USED_FOR_ASSERTS_ONLY;
+       XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
 
        XLByteToSeg(targetPagePtr, targetSegNo);
        targetPageOff = targetPagePtr % XLogSegSize;
@@ -9288,24 +10634,24 @@ retry:
        readOff = targetPageOff;
        if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
        {
-               char fname[MAXFNAMELEN];
+               char            fname[MAXFNAMELEN];
 
                XLogFileName(fname, curFileTLI, readSegNo);
                ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                                (errcode_for_file_access(),
-                errmsg("could not seek in log segment %s to offset %u: %m",
+                                errmsg("could not seek in log segment %s to offset %u: %m",
                                                fname, readOff)));
                goto next_record_is_invalid;
        }
 
        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
        {
-               char fname[MAXFNAMELEN];
+               char            fname[MAXFNAMELEN];
 
                XLogFileName(fname, curFileTLI, readSegNo);
                ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
                                (errcode_for_file_access(),
-                errmsg("could not read from log segment %s, offset %u: %m",
+                                errmsg("could not read from log segment %s, offset %u: %m",
                                                fname, readOff)));
                goto next_record_is_invalid;
        }
@@ -9350,7 +10696,7 @@ next_record_is_invalid:
  * 'tliRecPtr' is the position of the WAL record we're interested in. It is
  * used to decide which timeline to stream the requested WAL from.
  *
- * If the the record is not immediately available, the function returns false
+ * If the record is not immediately available, the function returns false
  * if we're not in standby mode. In standby mode, waits for it to become
  * available.
  *
@@ -9364,22 +10710,20 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                                        bool fetching_ckpt, XLogRecPtr tliRecPtr)
 {
        static pg_time_t last_fail_time = 0;
-       pg_time_t now;
+       pg_time_t       now;
 
        /*-------
         * Standby mode is implemented by a state machine:
         *
-        * 1. Read from archive (XLOG_FROM_ARCHIVE)
-        * 2. Read from pg_xlog (XLOG_FROM_PG_XLOG)
-        * 3. Check trigger file
-        * 4. Read from primary server via walreceiver (XLOG_FROM_STREAM)
-        * 5. Rescan timelines
-        * 6. Sleep 5 seconds, and loop back to 1.
+        * 1. Read from either archive or pg_xlog (XLOG_FROM_ARCHIVE), or just
+        *        pg_xlog (XLOG_FROM_XLOG)
+        * 2. Check trigger file
+        * 3. Read from primary server via walreceiver (XLOG_FROM_STREAM)
+        * 4. Rescan timelines
+        * 5. Sleep 5 seconds, and loop back to 1.
         *
         * Failure to read from the current source advances the state machine to
-        * the next state. In addition, successfully reading a file from pg_xlog
-        * moves the state machine from state 2 back to state 1 (we always prefer
-        * files in the archive over files in pg_xlog).
+        * the next state.
         *
         * 'currentSource' indicates the current state. There are no currentSource
         * values for "check trigger", "rescan timelines", and "sleep" states,
@@ -9394,7 +10738,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 
        for (;;)
        {
-               int             oldSource = currentSource;
+               int                     oldSource = currentSource;
 
                /*
                 * First check if we failed to read from the current source, and
@@ -9407,15 +10751,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                        switch (currentSource)
                        {
                                case XLOG_FROM_ARCHIVE:
-                                       currentSource = XLOG_FROM_PG_XLOG;
-                                       break;
-
                                case XLOG_FROM_PG_XLOG:
+
                                        /*
-                                        * Check to see if the trigger file exists. Note that we do
-                                        * this only after failure, so when you create the trigger
-                                        * file, we still finish replaying as much as we can from
-                                        * archive and pg_xlog before failover.
+                                        * Check to see if the trigger file exists. Note that we
+                                        * do this only after failure, so when you create the
+                                        * trigger file, we still finish replaying as much as we
+                                        * can from archive and pg_xlog before failover.
                                         */
                                        if (StandbyMode && CheckForStandbyTrigger())
                                        {
@@ -9424,15 +10766,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                        }
 
                                        /*
-                                        * Not in standby mode, and we've now tried the archive and
-                                        * pg_xlog.
+                                        * Not in standby mode, and we've now tried the archive
+                                        * and pg_xlog.
                                         */
                                        if (!StandbyMode)
                                                return false;
 
                                        /*
-                                        * If primary_conninfo is set, launch walreceiver to try to
-                                        * stream the missing WAL.
+                                        * If primary_conninfo is set, launch walreceiver to try
+                                        * to stream the missing WAL.
                                         *
                                         * If fetching_ckpt is TRUE, RecPtr points to the initial
                                         * checkpoint location. In that case, we use RedoStartLSN
@@ -9442,8 +10784,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                         */
                                        if (PrimaryConnInfo)
                                        {
-                                               XLogRecPtr ptr;
-                                               TimeLineID tli;
+                                               XLogRecPtr      ptr;
+                                               TimeLineID      tli;
 
                                                if (fetching_ckpt)
                                                {
@@ -9452,7 +10794,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                                }
                                                else
                                                {
-                                                       ptr = RecPtr;
+                                                       ptr = tliRecPtr;
                                                        tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
 
                                                        if (curFileTLI > 0 && tli < curFileTLI)
@@ -9461,30 +10803,36 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                                                         tli, curFileTLI);
                                                }
                                                curFileTLI = tli;
-                                               RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
+                                               RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
+                                                                                        PrimarySlotName);
+                                               receivedUpto = 0;
                                        }
+
                                        /*
-                                        * Move to XLOG_FROM_STREAM state in either case. We'll get
-                                        * immediate failure if we didn't launch walreceiver, and
-                                        * move on to the next state.
+                                        * Move to XLOG_FROM_STREAM state in either case. We'll
+                                        * get immediate failure if we didn't launch walreceiver,
+                                        * and move on to the next state.
                                         */
                                        currentSource = XLOG_FROM_STREAM;
                                        break;
 
                                case XLOG_FROM_STREAM:
+
                                        /*
-                                        * Failure while streaming. Most likely, we got here because
-                                        * streaming replication was terminated, or promotion was
-                                        * triggered. But we also get here if we find an invalid
-                                        * record in the WAL streamed from master, in which case
-                                        * something is seriously wrong. There's little chance that
-                                        * the problem will just go away, but PANIC is not good for
-                                        * availability either, especially in hot standby mode. So,
-                                        * we treat that the same as disconnection, and retry from
-                                        * archive/pg_xlog again. The WAL in the archive should be
-                                        * identical to what was streamed, so it's unlikely that it
-                                        * helps, but one can hope...
+                                        * Failure while streaming. Most likely, we got here
+                                        * because streaming replication was terminated, or
+                                        * promotion was triggered. But we also get here if we
+                                        * find an invalid record in the WAL streamed from master,
+                                        * in which case something is seriously wrong. There's
+                                        * little chance that the problem will just go away, but
+                                        * PANIC is not good for availability either, especially
+                                        * in hot standby mode. So, we treat that the same as
+                                        * disconnection, and retry from archive/pg_xlog again.
+                                        * The WAL in the archive should be identical to what was
+                                        * streamed, so it's unlikely that it helps, but one can
+                                        * hope...
                                         */
+
                                        /*
                                         * Before we leave XLOG_FROM_STREAM state, make sure that
                                         * walreceiver is not active, so that it won't overwrite
@@ -9507,11 +10855,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                        }
 
                                        /*
-                                        * XLOG_FROM_STREAM is the last state in our state machine,
-                                        * so we've exhausted all the options for obtaining the
-                                        * requested WAL. We're going to loop back and retry from
-                                        * the archive, but if it hasn't been long since last
-                                        * attempt, sleep 5 seconds to avoid busy-waiting.
+                                        * XLOG_FROM_STREAM is the last state in our state
+                                        * machine, so we've exhausted all the options for
+                                        * obtaining the requested WAL. We're going to loop back
+                                        * and retry from the archive, but if it hasn't been long
+                                        * since last attempt, sleep 5 seconds to avoid
+                                        * busy-waiting.
                                         */
                                        now = (pg_time_t) time(NULL);
                                        if ((now - last_fail_time) < 5)
@@ -9530,9 +10879,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                else if (currentSource == XLOG_FROM_PG_XLOG)
                {
                        /*
-                        * We just successfully read a file in pg_xlog. We prefer files
-                        * in the archive over ones in pg_xlog, so try the next file
-                        * again from the archive first.
+                        * We just successfully read a file in pg_xlog. We prefer files in
+                        * the archive over ones in pg_xlog, so try the next file again
+                        * from the archive first.
                         */
                        if (InArchiveRecovery)
                                currentSource = XLOG_FROM_ARCHIVE;
@@ -9567,7 +10916,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                 * Try to restore the file from archive, or read an existing
                                 * file from pg_xlog.
                                 */
-                               readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, currentSource);
+                               readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
+                                                currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
+                                                                                         currentSource);
                                if (readFile >= 0)
                                        return true;    /* success! */
 
@@ -9578,107 +10929,110 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                break;
 
                        case XLOG_FROM_STREAM:
-                       {
-                               bool            havedata;
-
-                               /*
-                                * Check if WAL receiver is still active.
-                                */
-                               if (!WalRcvStreaming())
-                               {
-                                       lastSourceFailed = true;
-                                       break;
-                               }
-
-                               /*
-                                * Walreceiver is active, so see if new data has arrived.
-                                *
-                                * We only advance XLogReceiptTime when we obtain fresh WAL
-                                * from walreceiver and observe that we had already processed
-                                * everything before the most recent "chunk" that it flushed to
-                                * disk.  In steady state where we are keeping up with the
-                                * incoming data, XLogReceiptTime will be updated on each cycle.
-                                * When we are behind, XLogReceiptTime will not advance, so the
-                                * grace time allotted to conflicting queries will decrease.
-                                */
-                               if (RecPtr < receivedUpto)
-                                       havedata = true;
-                               else
                                {
-                                       XLogRecPtr      latestChunkStart;
+                                       bool            havedata;
 
-                                       receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
-                                       if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
+                                       /*
+                                        * Check if WAL receiver is still active.
+                                        */
+                                       if (!WalRcvStreaming())
                                        {
+                                               lastSourceFailed = true;
+                                               break;
+                                       }
+
+                                       /*
+                                        * Walreceiver is active, so see if new data has arrived.
+                                        *
+                                        * We only advance XLogReceiptTime when we obtain fresh
+                                        * WAL from walreceiver and observe that we had already
+                                        * processed everything before the most recent "chunk"
+                                        * that it flushed to disk.  In steady state where we are
+                                        * keeping up with the incoming data, XLogReceiptTime will
+                                        * be updated on each cycle. When we are behind,
+                                        * XLogReceiptTime will not advance, so the grace time
+                                        * allotted to conflicting queries will decrease.
+                                        */
+                                       if (RecPtr < receivedUpto)
                                                havedata = true;
-                                               if (latestChunkStart <= RecPtr)
+                                       else
+                                       {
+                                               XLogRecPtr      latestChunkStart;
+
+                                               receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
+                                               if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
                                                {
-                                                       XLogReceiptTime = GetCurrentTimestamp();
-                                                       SetCurrentChunkStartTime(XLogReceiptTime);
+                                                       havedata = true;
+                                                       if (latestChunkStart <= RecPtr)
+                                                       {
+                                                               XLogReceiptTime = GetCurrentTimestamp();
+                                                               SetCurrentChunkStartTime(XLogReceiptTime);
+                                                       }
                                                }
+                                               else
+                                                       havedata = false;
                                        }
-                                       else
-                                               havedata = false;
-                               }
-                               if (havedata)
-                               {
-                                       /*
-                                        * Great, streamed far enough.  Open the file if it's not
-                                        * open already.  Also read the timeline history file if
-                                        * we haven't initialized timeline history yet; it should
-                                        * be streamed over and present in pg_xlog by now.  Use
-                                        * XLOG_FROM_STREAM so that source info is set correctly
-                                        * and XLogReceiptTime isn't changed.
-                                        */
-                                       if (readFile < 0)
+                                       if (havedata)
                                        {
-                                               if (!expectedTLEs)
-                                                       expectedTLEs = readTimeLineHistory(receiveTLI);
-                                               readFile = XLogFileRead(readSegNo, PANIC,
-                                                                                               receiveTLI,
-                                                                                               XLOG_FROM_STREAM, false);
-                                               Assert(readFile >= 0);
+                                               /*
+                                                * Great, streamed far enough.  Open the file if it's
+                                                * not open already.  Also read the timeline history
+                                                * file if we haven't initialized timeline history
+                                                * yet; it should be streamed over and present in
+                                                * pg_xlog by now.  Use XLOG_FROM_STREAM so that
+                                                * source info is set correctly and XLogReceiptTime
+                                                * isn't changed.
+                                                */
+                                               if (readFile < 0)
+                                               {
+                                                       if (!expectedTLEs)
+                                                               expectedTLEs = readTimeLineHistory(receiveTLI);
+                                                       readFile = XLogFileRead(readSegNo, PANIC,
+                                                                                                       receiveTLI,
+                                                                                                       XLOG_FROM_STREAM, false);
+                                                       Assert(readFile >= 0);
+                                               }
+                                               else
+                                               {
+                                                       /* just make sure source info is correct... */
+                                                       readSource = XLOG_FROM_STREAM;
+                                                       XLogReceiptSource = XLOG_FROM_STREAM;
+                                                       return true;
+                                               }
+                                               break;
                                        }
-                                       else
+
+                                       /*
+                                        * Data not here yet. Check for trigger, then wait for
+                                        * walreceiver to wake us up when new WAL arrives.
+                                        */
+                                       if (CheckForStandbyTrigger())
                                        {
-                                               /* just make sure source info is correct... */
-                                               readSource = XLOG_FROM_STREAM;
-                                               XLogReceiptSource = XLOG_FROM_STREAM;
-                                               return true;
+                                               /*
+                                                * Note that we don't "return false" immediately here.
+                                                * After being triggered, we still want to replay all
+                                                * the WAL that was already streamed. It's in pg_xlog
+                                                * now, so we just treat this as a failure, and the
+                                                * state machine will move on to replay the streamed
+                                                * WAL from pg_xlog, and then recheck the trigger and
+                                                * exit replay.
+                                                */
+                                               lastSourceFailed = true;
+                                               break;
                                        }
-                                       break;
-                               }
 
-                               /*
-                                * Data not here yet. Check for trigger, then wait for
-                                * walreceiver to wake us up when new WAL arrives.
-                                */
-                               if (CheckForStandbyTrigger())
-                               {
                                        /*
-                                        * Note that we don't "return false" immediately here.
-                                        * After being triggered, we still want to replay all the
-                                        * WAL that was already streamed. It's in pg_xlog now, so
-                                        * we just treat this as a failure, and the state machine
-                                        * will move on to replay the streamed WAL from pg_xlog,
-                                        * and then recheck the trigger and exit replay.
+                                        * Wait for more WAL to arrive. Time out after 5 seconds,
+                                        * like when polling the archive, to react to a trigger
+                                        * file promptly.
                                         */
-                                       lastSourceFailed = true;
+                                       WaitLatch(&XLogCtl->recoveryWakeupLatch,
+                                                         WL_LATCH_SET | WL_TIMEOUT,
+                                                         5000L);
+                                       ResetLatch(&XLogCtl->recoveryWakeupLatch);
                                        break;
                                }
 
-                               /*
-                                * Wait for more WAL to arrive. Time out after 5 seconds, like
-                                * when polling the archive, to react to a trigger file
-                                * promptly.
-                                */
-                               WaitLatch(&XLogCtl->recoveryWakeupLatch,
-                                                 WL_LATCH_SET | WL_TIMEOUT,
-                                                 5000L);
-                               ResetLatch(&XLogCtl->recoveryWakeupLatch);
-                               break;
-                       }
-
                        default:
                                elog(ERROR, "unexpected WAL source %d", currentSource);
                }
@@ -9688,9 +11042,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                 * process.
                 */
                HandleStartupProcInterrupts();
-       } while (StandbyMode);
+       }
 
-       return false;
+       return false;                           /* not reached */
 }
 
 /*
@@ -9698,9 +11052,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
  * in the current WAL page, previously read by XLogPageRead().
  *
  * 'emode' is the error mode that would be used to report a file-not-found
- * or legitimate end-of-WAL situation.  Generally, we use it as-is, but if
+ * or legitimate end-of-WAL situation.   Generally, we use it as-is, but if
  * we're retrying the exact same record that we've tried previously, only
- * complain the first time to keep the noise down.     However, we only do when
+ * complain the first time to keep the noise down.  However, we only do when
  * reading from pg_xlog, because we don't expect any invalid records in archive
  * or in records streamed from master. Files in the archive should be complete,
  * and we should never hit the end of WAL because we stop and wait for more WAL
@@ -9742,37 +11096,25 @@ CheckForStandbyTrigger(void)
        if (IsPromoteTriggered())
        {
                /*
-                * In 9.1 and 9.2 the postmaster unlinked the promote file
-                * inside the signal handler. We now leave the file in place
-                * and let the Startup process do the unlink. This allows
-                * Startup to know whether we're doing fast or normal
-                * promotion. Fast promotion takes precedence.
+                * In 9.1 and 9.2 the postmaster unlinked the promote file inside the
+                * signal handler. It now leaves the file in place and lets the
+                * Startup process do the unlink. This allows Startup to know whether
+                * it should create a full checkpoint before starting up (fallback
+                * mode). Fast promotion takes precedence.
                 */
-               if (stat(FAST_PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
+               if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
                {
-                       unlink(FAST_PROMOTE_SIGNAL_FILE);
                        unlink(PROMOTE_SIGNAL_FILE);
+                       unlink(FALLBACK_PROMOTE_SIGNAL_FILE);
                        fast_promote = true;
                }
-               else if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
+               else if (stat(FALLBACK_PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
                {
-                       unlink(PROMOTE_SIGNAL_FILE);
+                       unlink(FALLBACK_PROMOTE_SIGNAL_FILE);
                        fast_promote = false;
                }
 
-               /*
-                * We only look for fast promote via the pg_ctl promote option.
-                * It would be possible to extend trigger file support for the
-                * fast promotion option but that wouldn't be backwards compatible
-                * anyway and we're looking to focus further work on the promote
-                * option as the right way to signal end of recovery.
-                */
-               if (fast_promote)
-                       ereport(LOG,
-                               (errmsg("received fast promote request")));
-               else
-                       ereport(LOG,
-                               (errmsg("received promote request")));
+               ereport(LOG, (errmsg("received promote request")));
 
                ResetPromoteTriggered();
                triggered = true;
@@ -9788,8 +11130,15 @@ CheckForStandbyTrigger(void)
                                (errmsg("trigger file found: %s", TriggerFile)));
                unlink(TriggerFile);
                triggered = true;
+               fast_promote = true;
                return true;
        }
+       else if (errno != ENOENT)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not stat trigger file \"%s\": %m",
+                                               TriggerFile)));
+
        return false;
 }
 
@@ -9803,7 +11152,7 @@ CheckPromoteSignal(void)
        struct stat stat_buf;
 
        if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0 ||
-               stat(FAST_PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
+               stat(FALLBACK_PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
                return true;
 
        return false;