]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/xlog.c
Have multixact be truncated by checkpoint, not vacuum
[postgresql] / src / backend / access / transam / xlog.c
index 85a0ce90180eae2105535bdbed597b932550b001..e5640793eb8e09355b09d13cbdb68d3a9773ae6b 100644 (file)
@@ -23,6 +23,7 @@
 
 #include "access/clog.h"
 #include "access/multixact.h"
+#include "access/rewriteheap.h"
 #include "access/subtrans.h"
 #include "access/timeline.h"
 #include "access/transam.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/startup.h"
+#include "replication/logical.h"
 #include "replication/slot.h"
+#include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/barrier.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/large_object.h"
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/predicate.h"
@@ -86,7 +90,7 @@ int                   sync_method = DEFAULT_SYNC_METHOD;
 int                    wal_level = WAL_LEVEL_MINIMAL;
 int                    CommitDelay = 0;        /* precommit delay in microseconds */
 int                    CommitSiblings = 5; /* # concurrent xacts needed to sleep */
-int                    num_xloginsert_slots = 8;
+int                    num_xloginsert_locks = 8;
 
 #ifdef WAL_DEBUG
 bool           XLOG_DEBUG = false;
@@ -98,7 +102,7 @@ bool         XLOG_DEBUG = false;
  * future XLOG segment as long as there aren't already XLOGfileslop future
  * segments; else we'll delete it.  This could be made a separate GUC
  * variable, but at present I think it's sufficient to hardwire it as
- * 2*CheckPointSegments+1.     Under normal conditions, a checkpoint will free
+ * 2*CheckPointSegments+1.  Under normal conditions, a checkpoint will free
  * no more than 2*CheckPointSegments log segments, and we want to recycle all
  * of them; the +1 allows boundary cases to happen without wasting a
  * delete/create-segment cycle.
@@ -128,7 +132,7 @@ const struct config_enum_entry sync_method_options[] = {
 
 /*
  * Statistics for current checkpoint are collected in this global struct.
- * Because only the background writer or a stand-alone backend can perform
+ * Because only the checkpointer or a stand-alone backend can perform
  * checkpoints, this will be unused in normal backends.
  */
 CheckpointStatsData CheckpointStats;
@@ -187,7 +191,7 @@ static bool LocalHotStandbyActive = false;
  *             0: unconditionally not allowed to insert XLOG
  *             -1: must check RecoveryInProgress(); disallow until it is false
  * Most processes start with -1 and transition to 1 after seeing that recovery
- * is not in progress. But we can also force the value for special cases.
+ * is not in progress.  But we can also force the value for special cases.
  * The coding in XLogInsertAllowed() depends on the first two of these states
  * being numerically the same as bool true and false.
  */
@@ -220,7 +224,7 @@ static bool recoveryPauseAtTarget = true;
 static TransactionId recoveryTargetXid;
 static TimestampTz recoveryTargetTime;
 static char *recoveryTargetName;
-static int min_recovery_apply_delay = 0;
+static int     recovery_min_apply_delay = 0;
 static TimestampTz recoveryDelayUntilTime;
 
 /* options taken from recovery.conf for XLOG streaming */
@@ -258,7 +262,7 @@ static bool recoveryStopAfter;
  *
  * expectedTLEs: a list of TimeLineHistoryEntries for recoveryTargetTLI and the timelines of
  * its known parents, newest first (so recoveryTargetTLI is always the
- * first list member). Only these TLIs are expected to be seen in the WAL
+ * first list member).  Only these TLIs are expected to be seen in the WAL
  * segments we read, and indeed only these TLIs will be considered as
  * candidate WAL files to open at all.
  *
@@ -287,9 +291,9 @@ XLogRecPtr  XactLastRecEnd = InvalidXLogRecPtr;
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
  * (which is almost but not quite the same as a pointer to the most recent
- * CHECKPOINT record). We update this from the shared-memory copy,
+ * CHECKPOINT record).  We update this from the shared-memory copy,
  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold an insertion slot).  See XLogInsert for details.  We are also allowed
+ * hold an insertion lock).  See XLogInsert for details.  We are also allowed
  * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
  * InitXLOGAccess.
@@ -361,63 +365,51 @@ typedef struct XLogwrtResult
        XLogRecPtr      Flush;                  /* last byte + 1 flushed */
 } XLogwrtResult;
 
-
 /*
- * A slot for inserting to the WAL. This is similar to an LWLock, the main
- * difference is that there is an extra xlogInsertingAt field that is protected
- * by the same mutex. Unlike an LWLock, a slot can only be acquired in
- * exclusive mode.
- *
- * The xlogInsertingAt field is used to advertise to other processes how far
- * the slot owner has progressed in inserting the record. When a backend
- * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
- * yet know where it's going to insert the record. That's conservative
- * but correct; the new insertion is certainly going to go to a byte position
- * greater than 1. If another backend needs to flush the WAL, it will have to
- * wait for the new insertion. xlogInsertingAt is updated after finishing the
- * insert or when crossing a page boundary, which will wake up anyone waiting
- * for it, whether the wait was necessary in the first place or not.
- *
- * A process can wait on a slot in two modes: LW_EXCLUSIVE or
- * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
- * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
- * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
- * released, or xlogInsertingAt is updated. In other words, a process in
- * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
- * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
- * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
- *
- * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
- * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
- * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
- * see lwlock.c for details.
+ * Inserting to WAL is protected by a small fixed number of WAL insertion
+ * locks. To insert to the WAL, you must hold one of the locks - it doesn't
+ * matter which one. To lock out other concurrent insertions, you must hold
+ * of them. Each WAL insertion lock consists of a lightweight lock, plus an
+ * indicator of how far the insertion has progressed (insertingAt).
+ *
+ * The insertingAt values are read when a process wants to flush WAL from
+ * the in-memory buffers to disk, to check that all the insertions to the
+ * region the process is about to write out have finished. You could simply
+ * wait for all currently in-progress insertions to finish, but the
+ * insertingAt indicator allows you to ignore insertions to later in the WAL,
+ * so that you only wait for the insertions that are modifying the buffers
+ * you're about to write out.
+ *
+ * This isn't just an optimization. If all the WAL buffers are dirty, an
+ * inserter that's holding a WAL insert lock might need to evict an old WAL
+ * buffer, which requires flushing the WAL. If it's possible for an inserter
+ * to block on another inserter unnecessarily, deadlock can arise when two
+ * inserters holding a WAL insert lock wait for each other to finish their
+ * insertion.
+ *
+ * Small WAL records that don't cross a page boundary never update the value,
+ * the WAL record is just copied to the page and the lock is released. But
+ * to avoid the deadlock-scenario explained above, the indicator is always
+ * updated before sleeping while holding an insertion lock.
  */
 typedef struct
 {
-       slock_t         mutex;                  /* protects the below fields */
-       XLogRecPtr      xlogInsertingAt; /* insert has completed up to this point */
-
-       PGPROC     *owner;                      /* for debugging purposes */
-
-       bool            releaseOK;              /* T if ok to release waiters */
-       char            exclusive;              /* # of exclusive holders (0 or 1) */
-       PGPROC     *head;                       /* head of list of waiting PGPROCs */
-       PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
-       /* tail is undefined when head is NULL */
-} XLogInsertSlot;
+       LWLock          lock;
+       XLogRecPtr      insertingAt;
+} WALInsertLock;
 
 /*
- * All the slots are allocated as an array in shared memory. We force the
- * array stride to be a power of 2, which saves a few cycles in indexing, but
- * more importantly also ensures that individual slots don't cross cache line
- * boundaries. (Of course, we have to also ensure that the array start
- * address is suitably aligned.)
+ * All the WAL insertion locks are allocated as an array in shared memory. We
+ * force the array stride to be a power of 2, which saves a few cycles in
+ * indexing, but more importantly also ensures that individual slots don't
+ * cross cache line boundaries. (Of course, we have to also ensure that the
+ * array start address is suitably aligned.)
  */
-typedef union XLogInsertSlotPadded
+typedef union WALInsertLockPadded
 {
-       XLogInsertSlot slot;
+       WALInsertLock l;
        char            pad[CACHE_LINE_SIZE];
-} XLogInsertSlotPadded;
+} WALInsertLockPadded;
 
 /*
  * Shared state data for XLogInsert.
@@ -427,11 +419,11 @@ typedef struct XLogCtlInsert
        slock_t         insertpos_lck;  /* protects CurrBytePos and PrevBytePos */
 
        /*
-        * CurrBytePos is the end of reserved WAL. The next record will be inserted
-        * at that position. PrevBytePos is the start position of the previously
-        * inserted (or rather, reserved) record - it is copied to the prev-link
-        * of the next record. These are stored as "usable byte positions" rather
-        * than XLogRecPtrs (see XLogBytePosToRecPtr()).
+        * CurrBytePos is the end of reserved WAL. The next record will be
+        * inserted at that position. PrevBytePos is the start position of the
+        * previously inserted (or rather, reserved) record - it is copied to the
+        * prev-link of the next record. These are stored as "usable byte
+        * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
         */
        uint64          CurrBytePos;
        uint64          PrevBytePos;
@@ -452,8 +444,8 @@ typedef struct XLogCtlInsert
         * we must WAL-log it before it actually affects WAL-logging by backends.
         * Checkpointer sets at startup or after SIGHUP.
         *
-        * To read these fields, you must hold an insertion slot. To modify them,
-        * you must hold ALL the slots.
+        * To read these fields, you must hold an insertion lock. To modify them,
+        * you must hold ALL the locks.
         */
        XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
        bool            forcePageWrites;        /* forcing full-page writes for PITR? */
@@ -470,8 +462,12 @@ typedef struct XLogCtlInsert
        int                     nonExclusiveBackups;
        XLogRecPtr      lastBackupStart;
 
-       /* insertion slots, see XLogInsertSlot struct above for details */
-       XLogInsertSlotPadded *insertSlots;
+       /*
+        * WAL insertion locks.
+        */
+       WALInsertLockPadded *WALInsertLocks;
+       LWLockTranche WALInsertLockTranche;
+       int                     WALInsertLockTrancheId;
 } XLogCtlInsert;
 
 /*
@@ -509,10 +505,11 @@ typedef struct XLogCtlData
         * Latest initialized page in the cache (last byte position + 1).
         *
         * To change the identity of a buffer (and InitializedUpTo), you need to
-        * hold WALBufMappingLock.  To change the identity of a buffer that's still
-        * dirty, the old page needs to be written out first, and for that you
-        * need WALWriteLock, and you need to ensure that there are no in-progress
-        * insertions to the page by calling WaitXLogInsertionsToFinish().
+        * hold WALBufMappingLock.  To change the identity of a buffer that's
+        * still dirty, the old page needs to be written out first, and for that
+        * you need WALWriteLock, and you need to ensure that there are no
+        * in-progress insertions to the page by calling
+        * WaitXLogInsertionsToFinish().
         */
        XLogRecPtr      InitializedUpTo;
 
@@ -609,6 +606,9 @@ typedef struct XLogCtlData
 
 static XLogCtlData *XLogCtl = NULL;
 
+/* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
+static WALInsertLockPadded *WALInsertLocks = NULL;
+
 /*
  * We maintain an image of pg_control in shared memory.
  */
@@ -653,7 +653,7 @@ typedef enum
        XLOG_FROM_ANY = 0,                      /* request to read WAL from any source */
        XLOG_FROM_ARCHIVE,                      /* restored using restore_command */
        XLOG_FROM_PG_XLOG,                      /* existing file in pg_xlog */
-       XLOG_FROM_STREAM,                       /* streamed from master */
+       XLOG_FROM_STREAM                        /* streamed from master */
 } XLogSource;
 
 /* human-readable names for XLogSources, for debugging output */
@@ -732,9 +732,9 @@ static bool InRedo = false;
 /* Have we launched bgwriter during recovery? */
 static bool bgwriterLaunched = false;
 
-/* For WALInsertSlotAcquire/Release functions */
-static int     MySlotNo = 0;
-static bool holdingAllSlots = false;
+/* For WALInsertLockAcquire/Release functions */
+static int     MyLockNo = 0;
+static bool holdingAllLocks = false;
 
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
@@ -801,23 +801,22 @@ static void rm_redo_error_callback(void *arg);
 static int     get_sync_bit(int method);
 
 static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
-                                 XLogRecData *rdata,
-                                 XLogRecPtr StartPos, XLogRecPtr EndPos);
+                                       XLogRecData *rdata,
+                                       XLogRecPtr StartPos, XLogRecPtr EndPos);
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
                                                  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
                                  XLogRecPtr *PrevPtr);
 static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
-static void WakeupWaiters(XLogRecPtr EndPos);
 static char *GetXLogBuffer(XLogRecPtr ptr);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
 
-static void WALInsertSlotAcquire(bool exclusive);
-static void WALInsertSlotAcquireOne(int slotno);
-static void WALInsertSlotRelease(void);
-static void WALInsertSlotReleaseOne(int slotno);
+static void WALInsertLockAcquire(void);
+static void WALInsertLockAcquireExclusive(void);
+static void WALInsertLockRelease(void);
+static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -862,9 +861,9 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 
        if (rechdr == NULL)
        {
-               rechdr = malloc(SizeOfXLogRecord);
-               if (rechdr == NULL)
-                       elog(ERROR, "out of memory");
+               static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
+
+               rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
                MemSet(rechdr, 0, SizeOfXLogRecord);
        }
 
@@ -894,7 +893,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
         *
         * We may have to loop back to here if a race condition is detected below.
         * We could prevent the race by doing all this work while holding an
-        * insertion slot, but it seems better to avoid doing CRC calculations
+        * insertion lock, but it seems better to avoid doing CRC calculations
         * while holding one.
         *
         * We add entries for backup blocks to the chain, so that they don't need
@@ -912,8 +911,8 @@ begin:;
        /*
         * Decide if we need to do full-page writes in this XLOG record: true if
         * full_page_writes is on or we have a PITR request for it.  Since we
-        * don't yet have an insertion slot, fullPageWrites and forcePageWrites
-        * could change under us, but we'll recheck them once we have a slot.
+        * don't yet have an insertion lock, fullPageWrites and forcePageWrites
+        * could change under us, but we'll recheck them once we have a lock.
         */
        doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
 
@@ -1079,24 +1078,23 @@ begin:;
         * record to the shared WAL buffer cache is a two-step process:
         *
         * 1. Reserve the right amount of space from the WAL. The current head of
-        *    reserved space is kept in Insert->CurrBytePos, and is protected by
-        *    insertpos_lck.
+        *        reserved space is kept in Insert->CurrBytePos, and is protected by
+        *        insertpos_lck.
         *
         * 2. Copy the record to the reserved WAL space. This involves finding the
-        *    correct WAL buffer containing the reserved space, and copying the
-        *    record in place. This can be done concurrently in multiple processes.
+        *        correct WAL buffer containing the reserved space, and copying the
+        *        record in place. This can be done concurrently in multiple processes.
         *
         * To keep track of which insertions are still in-progress, each concurrent
-        * inserter allocates an "insertion slot", which tells others how far the
-        * inserter has progressed. There is a small fixed number of insertion
-        * slots, determined by the num_xloginsert_slots GUC. When an inserter
-        * finishes, it updates the xlogInsertingAt of its slot to the end of the
-        * record it inserted, to let others know that it's done. xlogInsertingAt
-        * is also updated when crossing over to a new WAL buffer, to allow the
-        * the previous buffer to be flushed.
+        * inserter acquires an insertion lock. In addition to just indicating that
+        * an insertion is in progress, the lock tells others how far the inserter
+        * has progressed. There is a small fixed number of insertion locks,
+        * determined by the num_xloginsert_locks GUC. When an inserter crosses a
+        * page boundary, it updates the value stored in the lock to the how far it
+        * has inserted, to allow the previous buffer to be flushed.
         *
-        * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
-        * changing until the insertion is finished.
+        * Holding onto an insertion lock also protects RedoRecPtr and
+        * fullPageWrites from changing until the insertion is finished.
         *
         * Step 2 can usually be done completely in parallel. If the required WAL
         * page is not initialized yet, you have to grab WALBufMappingLock to
@@ -1106,7 +1104,10 @@ begin:;
         *----------
         */
        START_CRIT_SECTION();
-       WALInsertSlotAcquire(isLogSwitch);
+       if (isLogSwitch)
+               WALInsertLockAcquireExclusive();
+       else
+               WALInsertLockAcquire();
 
        /*
         * Check to see if my RedoRecPtr is out of date.  If so, may have to go
@@ -1135,7 +1136,7 @@ begin:;
                                         * Oops, this buffer now needs to be backed up, but we
                                         * didn't think so above.  Start over.
                                         */
-                                       WALInsertSlotRelease();
+                                       WALInsertLockRelease();
                                        END_CRIT_SECTION();
                                        rdt_lastnormal->next = NULL;
                                        info = info_orig;
@@ -1154,7 +1155,7 @@ begin:;
        if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
        {
                /* Oops, must redo it with full-page data. */
-               WALInsertSlotRelease();
+               WALInsertLockRelease();
                END_CRIT_SECTION();
                rdt_lastnormal->next = NULL;
                info = info_orig;
@@ -1202,7 +1203,7 @@ begin:;
        /*
         * Done! Let others know that we're finished.
         */
-       WALInsertSlotRelease();
+       WALInsertLockRelease();
 
        MarkCurrentTransactionIdLoggedIfAny();
 
@@ -1234,6 +1235,7 @@ begin:;
        {
                TRACE_POSTGRESQL_XLOG_SWITCH();
                XLogFlush(EndPos);
+
                /*
                 * Even though we reserved the rest of the segment for us, which is
                 * reflected in EndPos, we return a pointer to just the end of the
@@ -1263,8 +1265,24 @@ begin:;
                xlog_outrec(&buf, rechdr);
                if (rdata->data != NULL)
                {
+                       StringInfoData recordbuf;
+
+                       /*
+                        * We have to piece together the WAL record data from the
+                        * XLogRecData entries, so that we can pass it to the rm_desc
+                        * function as one contiguous chunk. (but we can leave out any
+                        * extra entries we created for backup blocks)
+                        */
+                       rdt_lastnormal->next = NULL;
+
+                       initStringInfo(&recordbuf);
+                       appendBinaryStringInfo(&recordbuf, (char *) rechdr, sizeof(XLogRecord));
+                       for (; rdata != NULL; rdata = rdata->next)
+                               appendBinaryStringInfo(&recordbuf, rdata->data, rdata->len);
+
                        appendStringInfoString(&buf, " - ");
-                       RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
+                       RmgrTable[rechdr->xl_rmid].rm_desc(&buf, (XLogRecord *) recordbuf.data);
+                       pfree(recordbuf.data);
                }
                elog(LOG, "%s", buf.data);
                pfree(buf.data);
@@ -1363,7 +1381,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 
        /*
         * These calculations are a bit heavy-weight to be done while holding a
-        * spinlock, but since we're holding all the WAL insertion slots, there
+        * spinlock, but since we're holding all the WAL insertion locks, there
         * are no other inserters competing for it. GetXLogInsertRecPtr() does
         * compete for it, but that's not called very frequently.
         */
@@ -1501,8 +1519,8 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 
        /*
         * If this was an xlog-switch, it's not enough to write the switch record,
-        * we also have to consume all the remaining space in the WAL segment.
-        * We have already reserved it for us, but we still need to make sure it's
+        * we also have to consume all the remaining space in the WAL segment. We
+        * have already reserved it for us, but we still need to make sure it's
         * allocated and zeroed in the WAL buffers so that when the caller (or
         * someone else) does XLogWrite(), it can really write out all the zeros.
         */
@@ -1523,7 +1541,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
                while (CurrPos < EndPos)
                {
                        /* initialize the next page (if not initialized already) */
-                       WakeupWaiters(CurrPos);
+                       WALInsertLockUpdateInsertingAt(CurrPos);
                        AdvanceXLInsertBuffer(CurrPos, false);
                        CurrPos += XLOG_BLCKSZ;
                }
@@ -1534,452 +1552,123 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 }
 
 /*
- * Allocate a slot for insertion.
- *
- * In exclusive mode, all slots are reserved for the current process. That
- * blocks all concurrent insertions.
- */
-static void
-WALInsertSlotAcquire(bool exclusive)
-{
-       int                     i;
-
-       if (exclusive)
-       {
-               for (i = 0; i < num_xloginsert_slots; i++)
-                       WALInsertSlotAcquireOne(i);
-               holdingAllSlots = true;
-       }
-       else
-               WALInsertSlotAcquireOne(-1);
-}
-
-/*
- * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
- * one if slotno == -1. The index of the slot that was acquired is stored in
- * MySlotNo.
- *
- * This is more or less equivalent to LWLockAcquire().
+ * Acquire a WAL insertion lock, for inserting to WAL.
  */
 static void
-WALInsertSlotAcquireOne(int slotno)
+WALInsertLockAcquire(void)
 {
-       volatile XLogInsertSlot *slot;
-       PGPROC     *proc = MyProc;
-       bool            retry = false;
-       int                     extraWaits = 0;
-       static int      slotToTry = -1;
+       bool            immed;
 
        /*
-        * Try to use the slot we used last time. If the system isn't particularly
-        * busy, it's a good bet that it's available, and it's good to have some
-        * affinity to a particular slot so that you don't unnecessarily bounce
-        * cache lines between processes when there is no contention.
+        * It doesn't matter which of the WAL insertion locks we acquire, so try
+        * the one we used last time.  If the system isn't particularly busy, it's
+        * a good bet that it's still available, and it's good to have some
+        * affinity to a particular lock so that you don't unnecessarily bounce
+        * cache lines between processes when there's no contention.
         *
-        * If this is the first time through in this backend, pick a slot
-        * (semi-)randomly. This allows the slots to be used evenly if you have a
+        * If this is the first time through in this backend, pick a lock
+        * (semi-)randomly.  This allows the locks to be used evenly if you have a
         * lot of very short connections.
         */
-       if (slotno != -1)
-               MySlotNo = slotno;
-       else
-       {
-               if (slotToTry == -1)
-                       slotToTry = MyProc->pgprocno % num_xloginsert_slots;
-               MySlotNo = slotToTry;
-       }
-
-       /*
-        * We can't wait if we haven't got a PGPROC.  This should only occur
-        * during bootstrap or shared memory initialization.  Put an Assert here
-        * to catch unsafe coding practices.
-        */
-       Assert(MyProc != NULL);
+       static int      lockToTry = -1;
 
-       /*
-        * Lock out cancel/die interrupts until we exit the code section protected
-        * by the slot.  This ensures that interrupts will not interfere with
-        * manipulations of data structures in shared memory. There is no cleanup
-        * mechanism to release the slot if the backend dies while holding one,
-        * so make this a critical section.
-        */
-       START_CRIT_SECTION();
+       if (lockToTry == -1)
+               lockToTry = MyProc->pgprocno % num_xloginsert_locks;
+       MyLockNo = lockToTry;
 
        /*
-        * Loop here to try to acquire slot after each time we are signaled by
-        * WALInsertSlotRelease.
+        * The insertingAt value is initially set to 0, as we don't know our
+        * insert location yet.
         */
-       for (;;)
+       immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock,
+                                                                &WALInsertLocks[MyLockNo].l.insertingAt,
+                                                                0);
+       if (!immed)
        {
-               bool            mustwait;
-
-               slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
-
-               /* Acquire mutex.  Time spent holding mutex should be short! */
-               SpinLockAcquire(&slot->mutex);
-
-               /* If retrying, allow WALInsertSlotRelease to release waiters again */
-               if (retry)
-                       slot->releaseOK = true;
-
-               /* If I can get the slot, do so quickly. */
-               if (slot->exclusive == 0)
-               {
-                       slot->exclusive++;
-                       mustwait = false;
-               }
-               else
-                       mustwait = true;
-
-               if (!mustwait)
-                       break;                          /* got the lock */
-
-               Assert(slot->owner != MyProc);
-
-               /*
-                * Add myself to wait queue.
-                */
-               proc->lwWaiting = true;
-               proc->lwWaitMode = LW_EXCLUSIVE;
-               proc->lwWaitLink = NULL;
-               if (slot->head == NULL)
-                       slot->head = proc;
-               else
-                       slot->tail->lwWaitLink = proc;
-               slot->tail = proc;
-
-               /* Can release the mutex now */
-               SpinLockRelease(&slot->mutex);
-
                /*
-                * Wait until awakened.
-                *
-                * Since we share the process wait semaphore with the regular lock
-                * manager and ProcWaitForSignal, and we may need to acquire a slot
-                * while one of those is pending, it is possible that we get awakened
-                * for a reason other than being signaled by WALInsertSlotRelease. If
-                * so, loop back and wait again.  Once we've gotten the slot,
-                * re-increment the sema by the number of additional signals received,
-                * so that the lock manager or signal manager will see the received
-                * signal when it next waits.
+                * If we couldn't get the lock immediately, try another lock next
+                * time.  On a system with more insertion locks than concurrent
+                * inserters, this causes all the inserters to eventually migrate to a
+                * lock that no-one else is using.  On a system with more inserters
+                * than locks, it still helps to distribute the inserters evenly
+                * across the locks.
                 */
-               for (;;)
-               {
-                       /* "false" means cannot accept cancel/die interrupt here. */
-                       PGSemaphoreLock(&proc->sem, false);
-                       if (!proc->lwWaiting)
-                               break;
-                       extraWaits++;
-               }
-
-               /* Now loop back and try to acquire lock again. */
-               retry = true;
+               lockToTry = (lockToTry + 1) % num_xloginsert_locks;
        }
-
-       slot->owner = proc;
-
-       /*
-        * Normally, we initialize the xlogInsertingAt value of the slot to 1,
-        * because we don't yet know where in the WAL we're going to insert. It's
-        * not critical what it points to right now - leaving it to a too small
-        * value just means that WaitXlogInsertionsToFinish() might wait on us
-        * unnecessarily, until we update the value (when we finish the insert or
-        * move to next page).
-        *
-        * If we're grabbing all the slots, however, stamp all but the last one
-        * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
-        * slot is the one that we will update as we proceed with the insert, the
-        * rest are held just to keep off other inserters.
-        */
-       if (slotno != -1 && slotno != num_xloginsert_slots - 1)
-               slot->xlogInsertingAt = InvalidXLogRecPtr;
-       else
-               slot->xlogInsertingAt = 1;
-
-       /* We are done updating shared state of the slot itself. */
-       SpinLockRelease(&slot->mutex);
-
-       /*
-        * Fix the process wait semaphore's count for any absorbed wakeups.
-        */
-       while (extraWaits-- > 0)
-               PGSemaphoreUnlock(&proc->sem);
-
-       /*
-        * If we couldn't get the slot immediately, try another slot next time.
-        * On a system with more insertion slots than concurrent inserters, this
-        * causes all the inserters to eventually migrate to a slot that no-one
-        * else is using. On a system with more inserters than slots, it still
-        * causes the inserters to be distributed quite evenly across the slots.
-        */
-       if (slotno != -1 && retry)
-               slotToTry = (slotToTry + 1) % num_xloginsert_slots;
 }
 
 /*
- * Wait for the given slot to become free, or for its xlogInsertingAt location
- * to change to something else than 'waitptr'. In other words, wait for the
- * inserter using the given slot to finish its insertion, or to at least make
- * some progress.
+ * Acquire all WAL insertion locks, to prevent other backends from inserting
+ * to WAL.
  */
 static void
-WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
+WALInsertLockAcquireExclusive(void)
 {
-       PGPROC     *proc = MyProc;
-       int                     extraWaits = 0;
-
-       /*
-        * Lock out cancel/die interrupts while we sleep on the slot. There is
-        * no cleanup mechanism to remove us from the wait queue if we got
-        * interrupted.
-        */
-       HOLD_INTERRUPTS();
+       int                     i;
 
        /*
-        * Loop here to try to acquire lock after each time we are signaled.
+        * When holding all the locks, we only update the last lock's insertingAt
+        * indicator.  The others are set to 0xFFFFFFFFFFFFFFFF, which is higher
+        * than any real XLogRecPtr value, to make sure that no-one blocks waiting
+        * on those.
         */
-       for (;;)
+       for (i = 0; i < num_xloginsert_locks - 1; i++)
        {
-               bool            mustwait;
-
-               /* Acquire mutex.  Time spent holding mutex should be short! */
-               SpinLockAcquire(&slot->mutex);
-
-               /* If I can get the lock, do so quickly. */
-               if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
-                       mustwait = false;
-               else
-                       mustwait = true;
-
-               if (!mustwait)
-                       break;                          /* the lock was free */
-
-               Assert(slot->owner != MyProc);
-
-               /*
-                * Add myself to wait queue.
-                */
-               proc->lwWaiting = true;
-               proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-               proc->lwWaitLink = NULL;
-
-               /* waiters are added to the front of the queue */
-               proc->lwWaitLink = slot->head;
-               if (slot->head == NULL)
-                       slot->tail = proc;
-               slot->head = proc;
-
-               /* Can release the mutex now */
-               SpinLockRelease(&slot->mutex);
-
-               /*
-                * Wait until awakened.
-                *
-                * Since we share the process wait semaphore with other things, like
-                * the regular lock manager and ProcWaitForSignal, and we may need to
-                * acquire an LWLock while one of those is pending, it is possible that
-                * we get awakened for a reason other than being signaled by
-                * LWLockRelease. If so, loop back and wait again.  Once we've gotten
-                * the LWLock, re-increment the sema by the number of additional
-                * signals received, so that the lock manager or signal manager will
-                * see the received signal when it next waits.
-                */
-               for (;;)
-               {
-                       /* "false" means cannot accept cancel/die interrupt here. */
-                       PGSemaphoreLock(&proc->sem, false);
-                       if (!proc->lwWaiting)
-                               break;
-                       extraWaits++;
-               }
-
-               /* Now loop back and try to acquire lock again. */
+               LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
+                                                        &WALInsertLocks[i].l.insertingAt,
+                                                        UINT64CONST(0xFFFFFFFFFFFFFFFF));
        }
+       LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
+                                                &WALInsertLocks[i].l.insertingAt,
+                                                0);
 
-       /* We are done updating shared state of the lock itself. */
-       SpinLockRelease(&slot->mutex);
-
-       /*
-        * Fix the process wait semaphore's count for any absorbed wakeups.
-        */
-       while (extraWaits-- > 0)
-               PGSemaphoreUnlock(&proc->sem);
-
-       /*
-        * Now okay to allow cancel/die interrupts.
-        */
-       RESUME_INTERRUPTS();
+       holdingAllLocks = true;
 }
 
 /*
- * Wake up all processes waiting for us with WaitOnSlot(). Sets our
- * xlogInsertingAt value to EndPos, without releasing the slot.
+ * Release our insertion lock (or locks, if we're holding them all).
  */
 static void
-WakeupWaiters(XLogRecPtr EndPos)
+WALInsertLockRelease(void)
 {
-       volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
-       PGPROC     *head;
-       PGPROC     *proc;
-       PGPROC     *next;
-
-       /*
-        * If we have already reported progress up to the same point, do nothing.
-        * No other process can modify xlogInsertingAt, so we can check this before
-        * grabbing the spinlock.
-        */
-       if (slot->xlogInsertingAt == EndPos)
-               return;
-       /* xlogInsertingAt should not go backwards */
-       Assert(slot->xlogInsertingAt < EndPos);
-
-       /* Acquire mutex.  Time spent holding mutex should be short! */
-       SpinLockAcquire(&slot->mutex);
-
-       /* we should own the slot */
-       Assert(slot->exclusive == 1 && slot->owner == MyProc);
-
-       slot->xlogInsertingAt = EndPos;
-
-       /*
-        * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
-        * up. They are always in the front of the queue.
-        */
-       head = slot->head;
-
-       if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+       if (holdingAllLocks)
        {
-               proc = head;
-               next = proc->lwWaitLink;
-               while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
-               {
-                       proc = next;
-                       next = next->lwWaitLink;
-               }
+               int                     i;
 
-               /* proc is now the last PGPROC to be released */
-               slot->head = next;
-               proc->lwWaitLink = NULL;
+               for (i = 0; i < num_xloginsert_locks; i++)
+                       LWLockRelease(&WALInsertLocks[i].l.lock);
+
+               holdingAllLocks = false;
        }
        else
-               head = NULL;
-
-       /* We are done updating shared state of the lock itself. */
-       SpinLockRelease(&slot->mutex);
-
-       /*
-        * Awaken any waiters I removed from the queue.
-        */
-       while (head != NULL)
        {
-               proc = head;
-               head = proc->lwWaitLink;
-               proc->lwWaitLink = NULL;
-               proc->lwWaiting = false;
-               PGSemaphoreUnlock(&proc->sem);
+               LWLockRelease(&WALInsertLocks[MyLockNo].l.lock);
        }
 }
 
 /*
- * Release our insertion slot (or slots, if we're holding them all).
+ * Update our insertingAt value, to let others know that we've finished
+ * inserting up to that point.
  */
 static void
-WALInsertSlotRelease(void)
+WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
 {
-       int                     i;
-
-       if (holdingAllSlots)
+       if (holdingAllLocks)
        {
-               for (i = 0; i < num_xloginsert_slots; i++)
-                       WALInsertSlotReleaseOne(i);
-               holdingAllSlots = false;
+               /*
+                * We use the last lock to mark our actual position, see comments in
+                * WALInsertLockAcquireExclusive.
+                */
+               LWLockUpdateVar(&WALInsertLocks[num_xloginsert_locks - 1].l.lock,
+                                        &WALInsertLocks[num_xloginsert_locks - 1].l.insertingAt,
+                                               insertingAt);
        }
        else
-               WALInsertSlotReleaseOne(MySlotNo);
-}
-
-static void
-WALInsertSlotReleaseOne(int slotno)
-{
-       volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
-       PGPROC     *head;
-       PGPROC     *proc;
-
-       /* Acquire mutex.  Time spent holding mutex should be short! */
-       SpinLockAcquire(&slot->mutex);
-
-       /* we must be holding it */
-       Assert(slot->exclusive == 1 && slot->owner == MyProc);
-
-       slot->xlogInsertingAt = InvalidXLogRecPtr;
-
-       /* Release my hold on the slot */
-       slot->exclusive = 0;
-       slot->owner = NULL;
-
-       /*
-        * See if I need to awaken any waiters..
-        */
-       head = slot->head;
-       if (head != NULL)
-       {
-               if (slot->releaseOK)
-               {
-                       /*
-                        * Remove the to-be-awakened PGPROCs from the queue.
-                        */
-                       bool            releaseOK = true;
-
-                       proc = head;
-
-                       /*
-                        * First wake up any backends that want to be woken up without
-                        * acquiring the lock. These are always in the front of the queue.
-                        */
-                       while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-                               proc = proc->lwWaitLink;
-
-                       /*
-                        * Awaken the first exclusive-waiter, if any.
-                        */
-                       if (proc->lwWaitLink)
-                       {
-                               Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
-                               proc = proc->lwWaitLink;
-                               releaseOK = false;
-                       }
-                       /* proc is now the last PGPROC to be released */
-                       slot->head = proc->lwWaitLink;
-                       proc->lwWaitLink = NULL;
-
-                       slot->releaseOK = releaseOK;
-               }
-               else
-                       head = NULL;
-       }
-
-       /* We are done updating shared state of the slot itself. */
-       SpinLockRelease(&slot->mutex);
-
-       /*
-        * Awaken any waiters I removed from the queue.
-        */
-       while (head != NULL)
-       {
-               proc = head;
-               head = proc->lwWaitLink;
-               proc->lwWaitLink = NULL;
-               proc->lwWaiting = false;
-               PGSemaphoreUnlock(&proc->sem);
-       }
-
-       /*
-        * Now okay to allow cancel/die interrupts.
-        */
-       END_CRIT_SECTION();
+               LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
+                                               &WALInsertLocks[MyLockNo].l.insertingAt,
+                                               insertingAt);
 }
 
-
 /*
  * Wait for any WAL insertions < upto to finish.
  *
@@ -2029,79 +1718,50 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
        }
 
        /*
-        * finishedUpto is our return value, indicating the point upto which
-        * all the WAL insertions have been finished. Initialize it to the head
-        * of reserved WAL, and as we iterate through the insertion slots, back it
+        * Loop through all the locks, sleeping on any in-progress insert older
+        * than 'upto'.
+        *
+        * finishedUpto is our return value, indicating the point upto which all
+        * the WAL insertions have been finished. Initialize it to the head of
+        * reserved WAL, and as we iterate through the insertion locks, back it
         * out for any insertion that's still in progress.
         */
        finishedUpto = reservedUpto;
-
-       /*
-        * Loop through all the slots, sleeping on any in-progress insert older
-        * than 'upto'.
-        */
-       for (i = 0; i < num_xloginsert_slots; i++)
+       for (i = 0; i < num_xloginsert_locks; i++)
        {
-               volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
-               XLogRecPtr insertingat;
+               XLogRecPtr      insertingat = InvalidXLogRecPtr;
 
-       retry:
-               /*
-                * We can check if the slot is in use without grabbing the spinlock.
-                * The spinlock acquisition of insertpos_lck before this loop acts
-                * as a memory barrier. If someone acquires the slot after that, it
-                * can't possibly be inserting to anything < reservedUpto. If it was
-                * acquired before that, an unlocked test will return true.
-                */
-               if (!slot->exclusive)
-                       continue;
-
-               SpinLockAcquire(&slot->mutex);
-               /* re-check now that we have the lock */
-               if (!slot->exclusive)
-               {
-                       SpinLockRelease(&slot->mutex);
-                       continue;
-               }
-               insertingat = slot->xlogInsertingAt;
-               SpinLockRelease(&slot->mutex);
-
-               if (insertingat == InvalidXLogRecPtr)
+               do
                {
                        /*
-                        * slot is reserved just to hold off other inserters, there is no
-                        * actual insert in progress.
+                        * See if this insertion is in progress. LWLockWait will wait for
+                        * the lock to be released, or for the 'value' to be set by a
+                        * LWLockUpdateVar call.  When a lock is initially acquired, its
+                        * value is 0 (InvalidXLogRecPtr), which means that we don't know
+                        * where it's inserting yet.  We will have to wait for it.  If
+                        * it's a small insertion, the record will most likely fit on the
+                        * same page and the inserter will release the lock without ever
+                        * calling LWLockUpdateVar.  But if it has to sleep, it will
+                        * advertise the insertion point with LWLockUpdateVar before
+                        * sleeping.
                         */
-                       continue;
-               }
+                       if (LWLockWaitForVar(&WALInsertLocks[i].l.lock,
+                                                                &WALInsertLocks[i].l.insertingAt,
+                                                                insertingat, &insertingat))
+                       {
+                               /* the lock was free, so no insertion in progress */
+                               insertingat = InvalidXLogRecPtr;
+                               break;
+                       }
 
-               /*
-                * This insertion is still in progress. Do we need to wait for it?
-                *
-                * When an inserter acquires a slot, it doesn't reset 'insertingat', so
-                * it will initially point to the old value of some already-finished
-                * insertion. The inserter will update the value as soon as it finishes
-                * the insertion, moves to the next page, or has to do I/O to flush an
-                * old dirty buffer. That means that when we see a slot with
-                * insertingat value < upto, we don't know if that insertion is still
-                * truly in progress, or if the slot is reused by a new inserter that
-                * hasn't updated the insertingat value yet. We have to assume it's the
-                * latter, and wait.
-                */
-               if (insertingat < upto)
-               {
-                       WaitOnSlot(slot, insertingat);
-                       goto retry;
-               }
-               else
-               {
                        /*
-                        * We don't need to wait for this insertion, but update the
-                        * return value.
+                        * This insertion is still in progress. Have to wait, unless the
+                        * inserter has proceeded past 'upto'.
                         */
-                       if (insertingat < finishedUpto)
-                               finishedUpto = insertingat;
-               }
+               } while (insertingat < upto);
+
+               if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
+                       finishedUpto = insertingat;
        }
        return finishedUpto;
 }
@@ -2115,8 +1775,8 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
  *
  * The caller must ensure that the page containing the requested location
  * isn't evicted yet, and won't be evicted. The way to ensure that is to
- * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
- * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * hold onto a WAL insertion lock with the insertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update insertingAt if it needs
  * to evict an old page from the buffer. (This means that once you call
  * GetXLogBuffer() with a given 'ptr', you must not access anything before
  * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
@@ -2143,9 +1803,9 @@ GetXLogBuffer(XLogRecPtr ptr)
        }
 
        /*
-        * The XLog buffer cache is organized so that a page is always loaded
-        * to a particular buffer.  That way we can easily calculate the buffer
-        * a given page must be loaded into, from the XLogRecPtr alone.
+        * The XLog buffer cache is organized so that a page is always loaded to a
+        * particular buffer.  That way we can easily calculate the buffer a given
+        * page must be loaded into, from the XLogRecPtr alone.
         */
        idx = XLogRecPtrToBufIdx(ptr);
 
@@ -2173,17 +1833,17 @@ GetXLogBuffer(XLogRecPtr ptr)
        if (expectedEndPtr != endptr)
        {
                /*
-                * Let others know that we're finished inserting the record up
-                * to the page boundary.
+                * Let others know that we're finished inserting the record up to the
+                * page boundary.
                 */
-               WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
+               WALInsertLockUpdateInsertingAt(expectedEndPtr - XLOG_BLCKSZ);
 
                AdvanceXLInsertBuffer(ptr, false);
                endptr = XLogCtl->xlblocks[idx];
 
                if (expectedEndPtr != endptr)
                        elog(PANIC, "could not find WAL buffer for %X/%X",
-                                (uint32) (ptr >> 32) , (uint32) ptr);
+                                (uint32) (ptr >> 32), (uint32) ptr);
        }
        else
        {
@@ -2320,8 +1980,8 @@ XLogRecPtrToBytePos(XLogRecPtr ptr)
        else
        {
                result = fullsegs * UsableBytesInSegment +
-                       (XLOG_BLCKSZ - SizeOfXLogLongPHD) +  /* account for first page */
-                       (fullpages - 1) * UsableBytesInPage; /* full pages */
+                       (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
+                       (fullpages - 1) * UsableBytesInPage;            /* full pages */
                if (offset > 0)
                {
                        Assert(offset >= SizeOfXLogShortPHD);
@@ -2332,6 +1992,29 @@ XLogRecPtrToBytePos(XLogRecPtr ptr)
        return result;
 }
 
+/*
+ * Determine whether the buffer referenced has to be backed up.
+ *
+ * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
+ * could change later, so the result should be used for optimization purposes
+ * only.
+ */
+bool
+XLogCheckBufferNeedsBackup(Buffer buffer)
+{
+       bool            doPageWrites;
+       Page            page;
+
+       page = BufferGetPage(buffer);
+
+       doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
+
+       if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
+               return true;                    /* buffer requires backup */
+
+       return false;                           /* buffer does not need to be backed up */
+}
+
 /*
  * Determine whether the buffer referenced by an XLogRecData item has to
  * be backed up, and if so fill a BkpBlock struct for it.  In any case
@@ -2493,8 +2176,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
                }
 
                /*
-                * Now the next buffer slot is free and we can set it up to be the next
-                * output page.
+                * Now the next buffer slot is free and we can set it up to be the
+                * next output page.
                 */
                NewPageBeginPtr = XLogCtl->InitializedUpTo;
                NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
@@ -2517,7 +2200,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
                /* NewPage->xlp_info = 0; */    /* done by memset */
                NewPage   ->xlp_tli = ThisTimeLineID;
                NewPage   ->xlp_pageaddr = NewPageBeginPtr;
-               /* NewPage->xlp_rem_len = 0; */         /* done by memset */
+
+               /* NewPage->xlp_rem_len = 0; */ /* done by memset */
 
                /*
                 * If online backup is not in progress, mark the header to indicate
@@ -2525,12 +2209,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
                 * blocks.  This allows the WAL archiver to know whether it is safe to
                 * compress archived WAL data by transforming full-block records into
                 * the non-full-block format.  It is sufficient to record this at the
-                * page level because we force a page switch (in fact a segment switch)
-                * when starting a backup, so the flag will be off before any records
-                * can be written during the backup.  At the end of a backup, the last
-                * page will be marked as all unsafe when perhaps only part is unsafe,
-                * but at worst the archiver would miss the opportunity to compress a
-                * few records.
+                * page level because we force a page switch (in fact a segment
+                * switch) when starting a backup, so the flag will be off before any
+                * records can be written during the backup.  At the end of a backup,
+                * the last page will be marked as all unsafe when perhaps only part
+                * is unsafe, but at worst the archiver would miss the opportunity to
+                * compress a few records.
                 */
                if (!Insert->forcePageWrites)
                        NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
@@ -2652,7 +2336,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                 * if we're passed a bogus WriteRqst.Write that is past the end of the
                 * last page that's been initialized by AdvanceXLInsertBuffer.
                 */
-               XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+               XLogRecPtr      EndPtr = XLogCtl->xlblocks[curridx];
+
                if (LogwrtResult.Write >= EndPtr)
                        elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
                                 (uint32) (LogwrtResult.Write >> 32),
@@ -2736,7 +2421,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                        do
                        {
                                errno = 0;
-                               written  = write(openLogFile, from, nleft);
+                               written = write(openLogFile, from, nleft);
                                if (written <= 0)
                                {
                                        if (errno == EINTR)
@@ -2745,7 +2430,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                                                        (errcode_for_file_access(),
                                                         errmsg("could not write to log file %s "
                                                                        "at offset %u, length %zu: %m",
-                                                                       XLogFileNameP(ThisTimeLineID, openLogSegNo),
+                                                                XLogFileNameP(ThisTimeLineID, openLogSegNo),
                                                                        openLogOff, nbytes)));
                                }
                                nleft -= written;
@@ -2823,7 +2508,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
        {
                /*
                 * Could get here without iterating above loop, in which case we might
-                * have no open file or the wrong one.  However, we do not need to
+                * have no open file or the wrong one.  However, we do not need to
                 * fsync more than one file.
                 */
                if (sync_method != SYNC_METHOD_OPEN &&
@@ -2892,7 +2577,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 
        /*
         * If the WALWriter is sleeping, we should kick it to make it come out of
-        * low-power mode.      Otherwise, determine whether there's a full page of
+        * low-power mode.  Otherwise, determine whether there's a full page of
         * WAL available to write.
         */
        if (!sleeping)
@@ -2939,7 +2624,8 @@ XLogGetReplicationSlotMinimumLSN(void)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
-       XLogRecPtr              retval;
+       XLogRecPtr      retval;
+
        SpinLockAcquire(&xlogctl->info_lck);
        retval = xlogctl->replicationSlotMinLSN;
        SpinLockRelease(&xlogctl->info_lck);
@@ -3206,9 +2892,9 @@ XLogFlush(XLogRecPtr record)
  * We normally flush only completed blocks; but if there is nothing to do on
  * that basis, we check for unflushed async commits in the current incomplete
  * block, and flush through the latest one of those.  Thus, if async commits
- * are not being used, we will flush complete blocks only.     We can guarantee
+ * are not being used, we will flush complete blocks only.  We can guarantee
  * that async commits reach disk after at most three cycles; normally only
- * one or two. (When flushing complete blocks, we allow XLogWrite to write
+ * one or two.  (When flushing complete blocks, we allow XLogWrite to write
  * "flexibly", meaning it can stop at the end of the buffer ring; this makes a
  * difference only with very high load or long wal_writer_delay, but imposes
  * one extra cycle for the worst case for async commits.)
@@ -3383,7 +3069,7 @@ XLogNeedsFlush(XLogRecPtr record)
  * log, seg: identify segment to be created/opened.
  *
  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
- * pre-existing file will be deleted). On return, TRUE if a pre-existing
+ * pre-existing file will be deleted).  On return, TRUE if a pre-existing
  * file was used.
  *
  * use_lock: if TRUE, acquire ControlFileLock while moving file into
@@ -3402,6 +3088,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 {
        char            path[MAXPGPATH];
        char            tmppath[MAXPGPATH];
+       char            zbuffer_raw[XLOG_BLCKSZ + MAXIMUM_ALIGNOF];
        char       *zbuffer;
        XLogSegNo       installed_segno;
        int                     max_advance;
@@ -3440,16 +3127,6 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 
        unlink(tmppath);
 
-       /*
-        * Allocate a buffer full of zeros. This is done before opening the file
-        * so that we don't leak the file descriptor if palloc fails.
-        *
-        * Note: palloc zbuffer, instead of just using a local char array, to
-        * ensure it is reasonably well-aligned; this may save a few cycles
-        * transferring data to the kernel.
-        */
-       zbuffer = (char *) palloc0(XLOG_BLCKSZ);
-
        /* do not use get_sync_bit() here --- want to fsync only at end of fill */
        fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
                                           S_IRUSR | S_IWUSR);
@@ -3459,14 +3136,19 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
                                 errmsg("could not create file \"%s\": %m", tmppath)));
 
        /*
-        * Zero-fill the file.  We have to do this the hard way to ensure that all
+        * Zero-fill the file.  We have to do this the hard way to ensure that all
         * the file space has really been allocated --- on platforms that allow
         * "holes" in files, just seeking to the end doesn't allocate intermediate
         * space.  This way, we know that we have all the space and (after the
-        * fsync below) that all the indirect blocks are down on disk.  Therefore,
+        * fsync below) that all the indirect blocks are down on disk.  Therefore,
         * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
         * log file.
+        *
+        * Note: ensure the buffer is reasonably well-aligned; this may save a few
+        * cycles transferring data to the kernel.
         */
+       zbuffer = (char *) MAXALIGN(zbuffer_raw);
+       memset(zbuffer, 0, XLOG_BLCKSZ);
        for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
        {
                errno = 0;
@@ -3489,7 +3171,6 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
                                         errmsg("could not write to file \"%s\": %m", tmppath)));
                }
        }
-       pfree(zbuffer);
 
        if (pg_fsync(fd) != 0)
        {
@@ -3551,7 +3232,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
  *             a different timeline)
  *
  * Currently this is only used during recovery, and so there are no locking
- * considerations.     But we should be just as tense as XLogFileInit to avoid
+ * considerations.  But we should be just as tense as XLogFileInit to avoid
  * emplacing a bogus file.
  */
 static void
@@ -3762,7 +3443,7 @@ XLogFileOpen(XLogSegNo segno)
        if (fd < 0)
                ereport(PANIC,
                                (errcode_for_file_access(),
-                                errmsg("could not open transaction log file \"%s\": %m", path)));
+                       errmsg("could not open transaction log file \"%s\": %m", path)));
 
        return fd;
 }
@@ -3869,13 +3550,13 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
         * the timelines listed in expectedTLEs.
         *
         * We expect curFileTLI on entry to be the TLI of the preceding file in
-        * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
+        * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
         * to go backwards; this prevents us from picking up the wrong file when a
         * parent timeline extends to higher segment numbers than the child we
         * want to read.
         *
         * If we haven't read the timeline history file yet, read it now, so that
-        * we know which TLIs to scan.  We don't save the list in expectedTLEs,
+        * we know which TLIs to scan.  We don't save the list in expectedTLEs,
         * however, unless we actually find a valid segment.  That way if there is
         * neither a timeline history file nor a WAL segment in the archive, and
         * streaming replication is set up, we'll read the timeline history file
@@ -3939,7 +3620,7 @@ XLogFileClose(void)
 
        /*
         * WAL segment files will not be re-read in normal operation, so we advise
-        * the OS to release any cached pages.  But do not do so if WAL archiving
+        * the OS to release any cached pages.  But do not do so if WAL archiving
         * or streaming is active, because archiver and walsender process could
         * use the cache to read the WAL segment.
         */
@@ -4015,6 +3696,27 @@ CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
        }
 }
 
+/*
+ * Return the last WAL segment removed, or 0 if no segment has been removed
+ * since startup.
+ *
+ * NB: the result can be out of date arbitrarily fast, the caller has to deal
+ * with that.
+ */
+XLogSegNo
+XLogGetLastRemovedSegno(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogSegNo       lastRemovedSegNo;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       lastRemovedSegNo = xlogctl->lastRemovedSegNo;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return lastRemovedSegNo;
+}
+
 /*
  * Update the last removed segno pointer in shared memory, to reflect
  * that the given XLOG file has been removed.
@@ -4084,7 +3786,7 @@ RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr endptr)
        {
                /*
                 * We ignore the timeline part of the XLOG segment identifiers in
-                * deciding whether a segment is still needed.  This ensures that we
+                * deciding whether a segment is still needed.  This ensures that we
                 * won't prematurely remove a segment from a parent timeline. We could
                 * probably be a little more proactive about removing segments of
                 * non-parent timelines, but that would be a whole lot more
@@ -4618,7 +4320,7 @@ rescanLatestTimeLine(void)
  * I/O routines for pg_control
  *
  * *ControlFile is a buffer in shared memory that holds an image of the
- * contents of pg_control.     WriteControlFile() initializes pg_control
+ * contents of pg_control.  WriteControlFile() initializes pg_control
  * given a preloaded buffer, ReadControlFile() loads the buffer from
  * the pg_control file (during postmaster or standalone-backend startup),
  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
@@ -4652,6 +4354,7 @@ WriteControlFile(void)
        ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
 
        ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
+       ControlFile->loblksize = LOBLKSIZE;
 
 #ifdef HAVE_INT64_TIMESTAMP
        ControlFile->enableIntTimes = true;
@@ -4845,6 +4548,13 @@ ReadControlFile(void)
                                " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
                          ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
                                 errhint("It looks like you need to recompile or initdb.")));
+       if (ControlFile->loblksize != LOBLKSIZE)
+               ereport(FATAL,
+                               (errmsg("database files are incompatible with server"),
+                 errdetail("The database cluster was initialized with LOBLKSIZE %d,"
+                                       " but the server was compiled with LOBLKSIZE %d.",
+                                       ControlFile->loblksize, (int) LOBLKSIZE),
+                                errhint("It looks like you need to recompile or initdb.")));
 
 #ifdef HAVE_INT64_TIMESTAMP
        if (ControlFile->enableIntTimes != true)
@@ -4894,7 +4604,7 @@ ReadControlFile(void)
                                 errhint("It looks like you need to recompile or initdb.")));
 #endif
 
-       /* Make the fixed  settings visible as GUC variables, too */
+       /* Make the initdb settings visible as GUC variables, too */
        SetConfigOption("data_checksums", DataChecksumsEnabled() ? "yes" : "no",
                                        PGC_INTERNAL, PGC_S_OVERRIDE);
 }
@@ -5023,7 +4733,7 @@ check_wal_buffers(int *newval, void **extra, GucSource source)
        {
                /*
                 * If we haven't yet changed the boot_val default of -1, just let it
-                * be.  We'll fix it when XLOGShmemSize is called.
+                * be.  We'll fix it when XLOGShmemSize is called.
                 */
                if (XLOGbuffers == -1)
                        return true;
@@ -5071,8 +4781,8 @@ XLOGShmemSize(void)
        /* XLogCtl */
        size = sizeof(XLogCtlData);
 
-       /* xlog insertion slots, plus alignment */
-       size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
+       /* WAL insertion locks, plus alignment */
+       size = add_size(size, mul_size(sizeof(WALInsertLockPadded), num_xloginsert_locks + 1));
        /* xlblocks array */
        size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
        /* extra alignment padding for XLOG I/O buffers */
@@ -5120,16 +4830,32 @@ XLOGShmemInit(void)
        memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
        allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
 
-       /* Xlog insertion slots. Ensure they're aligned to the full padded size */
-       allocptr += sizeof(XLogInsertSlotPadded) -
-               ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
-       XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
-       allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
+
+       /* WAL insertion locks. Ensure they're aligned to the full padded size */
+       allocptr += sizeof(WALInsertLockPadded) -
+               ((uintptr_t) allocptr) %sizeof(WALInsertLockPadded);
+       WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
+               (WALInsertLockPadded *) allocptr;
+       allocptr += sizeof(WALInsertLockPadded) * num_xloginsert_locks;
+
+       XLogCtl->Insert.WALInsertLockTrancheId = LWLockNewTrancheId();
+
+       XLogCtl->Insert.WALInsertLockTranche.name = "WALInsertLocks";
+       XLogCtl->Insert.WALInsertLockTranche.array_base = WALInsertLocks;
+       XLogCtl->Insert.WALInsertLockTranche.array_stride = sizeof(WALInsertLockPadded);
+
+       LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, &XLogCtl->Insert.WALInsertLockTranche);
+       for (i = 0; i < num_xloginsert_locks; i++)
+       {
+               LWLockInitialize(&WALInsertLocks[i].l.lock,
+                                                XLogCtl->Insert.WALInsertLockTrancheId);
+               WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
+       }
 
        /*
         * Align the start of the page buffers to a full xlog block size boundary.
-        * This simplifies some calculations in XLOG insertion. It is also required
-        * for O_DIRECT.
+        * This simplifies some calculations in XLOG insertion. It is also
+        * required for O_DIRECT.
         */
        allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
        XLogCtl->pages = allocptr;
@@ -5144,19 +4870,6 @@ XLOGShmemInit(void)
        XLogCtl->SharedHotStandbyActive = false;
        XLogCtl->WalWriterSleeping = false;
 
-       for (i = 0; i < num_xloginsert_slots; i++)
-       {
-               XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
-               SpinLockInit(&slot->mutex);
-               slot->xlogInsertingAt = InvalidXLogRecPtr;
-               slot->owner = NULL;
-
-               slot->releaseOK = true;
-               slot->exclusive = 0;
-               slot->head = NULL;
-               slot->tail = NULL;
-       }
-
        SpinLockInit(&XLogCtl->Insert.insertpos_lck);
        SpinLockInit(&XLogCtl->info_lck);
        SpinLockInit(&XLogCtl->ulsn_lck);
@@ -5194,15 +4907,16 @@ BootStrapXLOG(void)
         * field, as being about as unique as we can easily get.  (Think not to
         * use random(), since it hasn't been seeded and there's no portable way
         * to seed it other than the system clock value...)  The upper half of the
-        * uint64 value is just the tv_sec part, while the lower half is the XOR
-        * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
-        * unnecessarily if "uint64" is really only 32 bits wide.  A person
-        * knowing this encoding can determine the initialization time of the
-        * installation, which could perhaps be useful sometimes.
+        * uint64 value is just the tv_sec part, while the lower half contains the
+        * tv_usec part (which must fit in 20 bits), plus 12 bits from our current
+        * PID for a little extra uniqueness.  A person knowing this encoding can
+        * determine the initialization time of the installation, which could
+        * perhaps be useful sometimes.
         */
        gettimeofday(&tv, NULL);
        sysidentifier = ((uint64) tv.tv_sec) << 32;
-       sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
+       sysidentifier |= ((uint64) tv.tv_usec) << 12;
+       sysidentifier |= getpid() & 0xFFF;
 
        /* First timeline ID is always 1 */
        ThisTimeLineID = 1;
@@ -5517,12 +5231,12 @@ readRecoveryCommandFile(void)
                                        (errmsg_internal("primary_conninfo = '%s'",
                                                                         PrimaryConnInfo)));
                }
-               else if (strcmp(item->name, "primary_slotname") == 0)
+               else if (strcmp(item->name, "primary_slot_name") == 0)
                {
                        ReplicationSlotValidateName(item->value, ERROR);
                        PrimarySlotName = pstrdup(item->value);
                        ereport(DEBUG2,
-                                       (errmsg_internal("primary_slotname = '%s'",
+                                       (errmsg_internal("primary_slot_name = '%s'",
                                                                         PrimarySlotName)));
                }
                else if (strcmp(item->name, "trigger_file") == 0)
@@ -5532,18 +5246,19 @@ readRecoveryCommandFile(void)
                                        (errmsg_internal("trigger_file = '%s'",
                                                                         TriggerFile)));
                }
-               else if (strcmp(item->name, "min_recovery_apply_delay") == 0)
+               else if (strcmp(item->name, "recovery_min_apply_delay") == 0)
                {
                        const char *hintmsg;
 
-                       if (!parse_int(item->value, &min_recovery_apply_delay, GUC_UNIT_MS,
-                                       &hintmsg))
+                       if (!parse_int(item->value, &recovery_min_apply_delay, GUC_UNIT_MS,
+                                                  &hintmsg))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                                errmsg("parameter \"%s\" requires a temporal value", "min_recovery_apply_delay"),
+                                                errmsg("parameter \"%s\" requires a temporal value",
+                                                               "recovery_min_apply_delay"),
                                                 hintmsg ? errhint("%s", _(hintmsg)) : 0));
                        ereport(DEBUG2,
-                                       (errmsg("min_recovery_apply_delay = '%s'", item->value)));
+                                       (errmsg("recovery_min_apply_delay = '%s'", item->value)));
                }
                else
                        ereport(FATAL,
@@ -5575,7 +5290,7 @@ readRecoveryCommandFile(void)
 
        /*
         * If user specified recovery_target_timeline, validate it or compute the
-        * "latest" value.      We can't do this until after we've gotten the restore
+        * "latest" value.  We can't do this until after we've gotten the restore
         * command and set InArchiveRecovery, because we need to fetch timeline
         * history files from the archive.
         */
@@ -5768,8 +5483,8 @@ recoveryStopsBefore(XLogRecord *record)
                 *
                 * when testing for an xid, we MUST test for equality only, since
                 * transactions are numbered in the order they start, not the order
-                * they complete. A higher numbered xid will complete before you
-                * about 50% of the time...
+                * they complete. A higher numbered xid will complete before you about
+                * 50% of the time...
                 */
                stopsHere = (record->xl_xid == recoveryTargetXid);
        }
@@ -5829,8 +5544,8 @@ recoveryStopsAfter(XLogRecord *record)
        record_info = record->xl_info & ~XLR_INFO_MASK;
 
        /*
-        * There can be many restore points that share the same name; we stop
-        * at the first one.
+        * There can be many restore points that share the same name; we stop at
+        * the first one.
         */
        if (recoveryTarget == RECOVERY_TARGET_NAME &&
                record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
@@ -5844,12 +5559,12 @@ recoveryStopsAfter(XLogRecord *record)
                        recoveryStopAfter = true;
                        recoveryStopXid = InvalidTransactionId;
                        (void) getRecordTimestamp(record, &recoveryStopTime);
-                       strncpy(recoveryStopName, recordRestorePointData->rp_name, MAXFNAMELEN);
+                       strlcpy(recoveryStopName, recordRestorePointData->rp_name, MAXFNAMELEN);
 
                        ereport(LOG,
-                                       (errmsg("recovery stopping at restore point \"%s\", time %s",
-                                                       recoveryStopName,
-                                                       timestamptz_to_str(recoveryStopTime))));
+                               (errmsg("recovery stopping at restore point \"%s\", time %s",
+                                               recoveryStopName,
+                                               timestamptz_to_str(recoveryStopTime))));
                        return true;
                }
        }
@@ -5965,7 +5680,7 @@ SetRecoveryPause(bool recoveryPause)
 }
 
 /*
- * When min_recovery_apply_delay is set, we wait long enough to make sure
+ * When recovery_min_apply_delay is set, we wait long enough to make sure
  * certain record types are applied at least that interval behind the master.
  *
  * Returns true if we waited.
@@ -5986,16 +5701,16 @@ recoveryApplyDelay(XLogRecord *record)
        int                     microsecs;
 
        /* nothing to do if no delay configured */
-       if (min_recovery_apply_delay == 0)
+       if (recovery_min_apply_delay == 0)
                return false;
 
        /*
         * Is it a COMMIT record?
         *
-        * We deliberately choose not to delay aborts since they have no effect
-        * on MVCC. We already allow replay of records that don't have a
-        * timestamp, so there is already opportunity for issues caused by early
-        * conflicts on standbys.
+        * We deliberately choose not to delay aborts since they have no effect on
+        * MVCC. We already allow replay of records that don't have a timestamp,
+        * so there is already opportunity for issues caused by early conflicts on
+        * standbys.
         */
        record_info = record->xl_info & ~XLR_INFO_MASK;
        if (!(record->xl_rmid == RM_XACT_ID &&
@@ -6007,7 +5722,7 @@ recoveryApplyDelay(XLogRecord *record)
                return false;
 
        recoveryDelayUntilTime =
-               TimestampTzPlusMilliseconds(xtime, min_recovery_apply_delay);
+               TimestampTzPlusMilliseconds(xtime, recovery_min_apply_delay);
 
        /*
         * Exit without arming the latch if it's already past time to apply this
@@ -6015,7 +5730,7 @@ recoveryApplyDelay(XLogRecord *record)
         */
        TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
                                                &secs, &microsecs);
-       if (secs <= 0 && microsecs <=0)
+       if (secs <= 0 && microsecs <= 0)
                return false;
 
        while (true)
@@ -6035,15 +5750,15 @@ recoveryApplyDelay(XLogRecord *record)
                TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
                                                        &secs, &microsecs);
 
-               if (secs <= 0 && microsecs <=0)
+               if (secs <= 0 && microsecs <= 0)
                        break;
 
                elog(DEBUG2, "recovery apply delay %ld seconds, %d milliseconds",
-                       secs, microsecs / 1000);
+                        secs, microsecs / 1000);
 
                WaitLatch(&XLogCtl->recoveryWakeupLatch,
-                                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-                                       secs * 1000L + microsecs / 1000);
+                                 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                 secs * 1000L + microsecs / 1000);
        }
        return true;
 }
@@ -6163,7 +5878,7 @@ CheckRequiredParameterValues(void)
         * For archive recovery, the WAL must be generated with at least 'archive'
         * wal_level.
         */
-       if (InArchiveRecovery && ControlFile->wal_level == WAL_LEVEL_MINIMAL)
+       if (ArchiveRecoveryRequested && ControlFile->wal_level == WAL_LEVEL_MINIMAL)
        {
                ereport(WARNING,
                                (errmsg("WAL was generated with wal_level=minimal, data may be missing"),
@@ -6174,7 +5889,7 @@ CheckRequiredParameterValues(void)
         * For Hot Standby, the WAL must be generated with 'hot_standby' mode, and
         * we must have at least as many backend slots as the primary.
         */
-       if (InArchiveRecovery && EnableHotStandby)
+       if (ArchiveRecoveryRequested && EnableHotStandby)
        {
                if (ControlFile->wal_level < WAL_LEVEL_HOT_STANDBY)
                        ereport(ERROR,
@@ -6282,7 +5997,7 @@ StartupXLOG(void)
        ValidateXLOGDirectoryStructure();
 
        /*
-        * Clear out any old relcache cache files.      This is *necessary* if we do
+        * Clear out any old relcache cache files.  This is *necessary* if we do
         * any WAL replay, since that would probably result in the cache files
         * being out of sync with database reality.  In theory we could leave them
         * in place if the database had been cleanly shut down, but it seems
@@ -6311,7 +6026,7 @@ StartupXLOG(void)
         * Save archive_cleanup_command in shared memory so that other processes
         * can see it.
         */
-       strncpy(XLogCtl->archiveCleanupCommand,
+       strlcpy(XLogCtl->archiveCleanupCommand,
                        archiveCleanupCommand ? archiveCleanupCommand : "",
                        sizeof(XLogCtl->archiveCleanupCommand));
 
@@ -6354,7 +6069,7 @@ StartupXLOG(void)
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
                                 errmsg("out of memory"),
-                       errdetail("Failed while allocating an XLog reading processor.")));
+                  errdetail("Failed while allocating an XLog reading processor.")));
        xlogreader->system_identifier = ControlFile->system_identifier;
 
        if (read_backup_label(&checkPointLoc, &backupEndRequired,
@@ -6549,6 +6264,7 @@ StartupXLOG(void)
        MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
        SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
        SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+       MultiXactSetSafeTruncate(checkPoint.oldestMulti);
        XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
        XLogCtl->ckptXid = checkPoint.nextXid;
 
@@ -6556,12 +6272,18 @@ StartupXLOG(void)
         * Initialize replication slots, before there's a chance to remove
         * required resources.
         */
-       StartupReplicationSlots(checkPoint.redo);
+       StartupReplicationSlots();
+
+       /*
+        * Startup logical state, needs to be setup now so we have proper data
+        * during crash recovery.
+        */
+       StartupReorderBuffer();
 
        /*
-        * Startup MultiXact.  We need to do this early for two reasons: one
-        * is that we might try to access multixacts when we do tuple freezing,
-        * and the other is we need its state initialized because we attempt
+        * Startup MultiXact.  We need to do this early for two reasons: one is
+        * that we might try to access multixacts when we do tuple freezing, and
+        * the other is we need its state initialized because we attempt
         * truncation during restartpoints.
         */
        StartupMultiXact();
@@ -6815,14 +6537,18 @@ StartupXLOG(void)
                }
 
                /*
-                * Initialize shared variables for tracking progress of WAL replay,
-                * as if we had just replayed the record before the REDO location.
+                * Initialize shared variables for tracking progress of WAL replay, as
+                * if we had just replayed the record before the REDO location (or the
+                * checkpoint record itself, if it's a shutdown checkpoint).
                 */
                SpinLockAcquire(&xlogctl->info_lck);
-               xlogctl->replayEndRecPtr = checkPoint.redo;
+               if (checkPoint.redo < RecPtr)
+                       xlogctl->replayEndRecPtr = checkPoint.redo;
+               else
+                       xlogctl->replayEndRecPtr = EndRecPtr;
                xlogctl->replayEndTLI = ThisTimeLineID;
-               xlogctl->lastReplayedEndRecPtr = checkPoint.redo;
-               xlogctl->lastReplayedTLI = ThisTimeLineID;
+               xlogctl->lastReplayedEndRecPtr = xlogctl->replayEndRecPtr;
+               xlogctl->lastReplayedTLI = xlogctl->replayEndTLI;
                xlogctl->recoveryLastXTime = 0;
                xlogctl->currentChunkStartTime = 0;
                xlogctl->recoveryPause = false;
@@ -6903,9 +6629,7 @@ StartupXLOG(void)
                                                         (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
                                        xlog_outrec(&buf, record);
                                        appendStringInfoString(&buf, " - ");
-                                       RmgrTable[record->xl_rmid].rm_desc(&buf,
-                                                                                                          record->xl_info,
-                                                                                                        XLogRecGetData(record));
+                                       RmgrTable[record->xl_rmid].rm_desc(&buf, record);
                                        elog(LOG, "%s", buf.data);
                                        pfree(buf.data);
                                }
@@ -6940,17 +6664,17 @@ StartupXLOG(void)
                                }
 
                                /*
-                                * If we've been asked to lag the master, wait on
-                                * latch until enough time has passed.
+                                * If we've been asked to lag the master, wait on latch until
+                                * enough time has passed.
                                 */
                                if (recoveryApplyDelay(record))
                                {
                                        /*
-                                        * We test for paused recovery again here. If
-                                        * user sets delayed apply, it may be because
-                                        * they expect to pause recovery in case of
-                                        * problems, so we must test again here otherwise
-                                        * pausing during the delay-wait wouldn't work.
+                                        * We test for paused recovery again here. If user sets
+                                        * delayed apply, it may be because they expect to pause
+                                        * recovery in case of problems, so we must test again
+                                        * here otherwise pausing during the delay-wait wouldn't
+                                        * work.
                                         */
                                        if (xlogctl->recoveryPause)
                                                recoveryPausesHere();
@@ -7087,6 +6811,13 @@ StartupXLOG(void)
                                recoveryPausesHere();
                        }
 
+                       /* Allow resource managers to do any required cleanup. */
+                       for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+                       {
+                               if (RmgrTable[rmid].rm_cleanup != NULL)
+                                       RmgrTable[rmid].rm_cleanup();
+                       }
+
                        ereport(LOG,
                                        (errmsg("redo done at %X/%X",
                                                 (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
@@ -7180,8 +6911,8 @@ StartupXLOG(void)
        /*
         * Consider whether we need to assign a new timeline ID.
         *
-        * If we are doing an archive recovery, we always assign a new ID.      This
-        * handles a couple of issues.  If we stopped short of the end of WAL
+        * If we are doing an archive recovery, we always assign a new ID.  This
+        * handles a couple of issues.  If we stopped short of the end of WAL
         * during recovery, then we are clearly generating a new timeline and must
         * assign it a unique new ID.  Even if we ran to the end, modifying the
         * current last segment is problematic because it may result in trying to
@@ -7256,7 +6987,7 @@ StartupXLOG(void)
 
        /*
         * Tricky point here: readBuf contains the *last* block that the LastRec
-        * record spans, not the one it starts in.      The last block is indeed the
+        * record spans, not the one it starts in.  The last block is indeed the
         * one we want to use.
         */
        if (EndOfLog % XLOG_BLCKSZ != 0)
@@ -7283,9 +7014,9 @@ StartupXLOG(void)
        else
        {
                /*
-                * There is no partial block to copy. Just set InitializedUpTo,
-                * and let the first attempt to insert a log record to initialize
-                * the next buffer.
+                * There is no partial block to copy. Just set InitializedUpTo, and
+                * let the first attempt to insert a log record to initialize the next
+                * buffer.
                 */
                XLogCtl->InitializedUpTo = EndOfLog;
        }
@@ -7312,27 +7043,6 @@ StartupXLOG(void)
 
        if (InRecovery)
        {
-               int                     rmid;
-
-               /*
-                * Resource managers might need to write WAL records, eg, to record
-                * index cleanup actions.  So temporarily enable XLogInsertAllowed in
-                * this process only.
-                */
-               LocalSetXLogInsertAllowed();
-
-               /*
-                * Allow resource managers to do any required cleanup.
-                */
-               for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-               {
-                       if (RmgrTable[rmid].rm_cleanup != NULL)
-                               RmgrTable[rmid].rm_cleanup();
-               }
-
-               /* Disallow XLogInsert again */
-               LocalXLogInsertAllowed = -1;
-
                /*
                 * Perform a checkpoint to update all our recovery activity to disk.
                 *
@@ -7470,7 +7180,7 @@ StartupXLOG(void)
        XLogReportParameters();
 
        /*
-        * All done.  Allow backends to write WAL.      (Although the bool flag is
+        * All done.  Allow backends to write WAL.  (Although the bool flag is
         * probably atomic in itself, we use the info_lck here to ensure that
         * there are no race conditions concerning visibility of other recent
         * updates to shared memory.)
@@ -7508,7 +7218,7 @@ StartupXLOG(void)
 static void
 CheckRecoveryConsistency(void)
 {
-       XLogRecPtr lastReplayedEndRecPtr;
+       XLogRecPtr      lastReplayedEndRecPtr;
 
        /*
         * During crash recovery, we don't reach a consistent state until we've
@@ -7630,7 +7340,7 @@ RecoveryInProgress(void)
                /*
                 * Initialize TimeLineID and RedoRecPtr when we discover that recovery
                 * is finished. InitPostgres() relies upon this behaviour to ensure
-                * that InitXLOGAccess() is called at backend startup.  (If you change
+                * that InitXLOGAccess() is called at backend startup.  (If you change
                 * this, see also LocalSetXLogInsertAllowed.)
                 */
                if (!LocalRecoveryInProgress)
@@ -7643,6 +7353,7 @@ RecoveryInProgress(void)
                        pg_memory_barrier();
                        InitXLOGAccess();
                }
+
                /*
                 * Note: We don't need a memory barrier when we're still in recovery.
                 * We might exit recovery immediately after return, so the caller
@@ -7883,6 +7594,11 @@ InitXLOGAccess(void)
        ThisTimeLineID = XLogCtl->ThisTimeLineID;
        Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
 
+       /* Initialize our copy of WALInsertLocks and register the tranche */
+       WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
+       LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId,
+                                                 &XLogCtl->Insert.WALInsertLockTranche);
+
        /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
        (void) GetRedoRecPtr();
 }
@@ -7897,11 +7613,11 @@ GetRedoRecPtr(void)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
-       XLogRecPtr ptr;
+       XLogRecPtr      ptr;
 
        /*
         * The possibly not up-to-date copy in XlogCtl is enough. Even if we
-        * grabbed a WAL insertion slot to read the master copy, someone might
+        * grabbed a WAL insertion lock to read the master copy, someone might
         * update it just after we've released the lock.
         */
        SpinLockAcquire(&xlogctl->info_lck);
@@ -7919,7 +7635,7 @@ GetRedoRecPtr(void)
  *
  * NOTE: The value *actually* returned is the position of the last full
  * xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to scan through WAL insertion slots, and an
+ * For that, we don't need to scan through WAL insertion locks, and an
  * approximation is enough for the current usage of this function.
  */
 XLogRecPtr
@@ -8280,13 +7996,13 @@ CreateCheckPoint(int flags)
         * We must block concurrent insertions while examining insert state to
         * determine the checkpoint REDO pointer.
         */
-       WALInsertSlotAcquire(true);
+       WALInsertLockAcquireExclusive();
        curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
 
        /*
         * If this isn't a shutdown or forced checkpoint, and we have not inserted
         * any XLOG records since the start of the last checkpoint, skip the
-        * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
+        * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
         * when the system is idle. That wastes log space, and more importantly it
         * exposes us to possible loss of both current and previous checkpoint
         * records if the machine crashes just as we're writing the update.
@@ -8305,7 +8021,7 @@ CreateCheckPoint(int flags)
                        MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
                        ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
                {
-                       WALInsertSlotRelease();
+                       WALInsertLockRelease();
                        LWLockRelease(CheckpointLock);
                        END_CRIT_SECTION();
                        return;
@@ -8349,7 +8065,7 @@ CreateCheckPoint(int flags)
 
        /*
         * Here we update the shared RedoRecPtr for future XLogInsert calls; this
-        * must be done while holding the insertion slots.
+        * must be done while holding all the insertion locks.
         *
         * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
         * pointing past where it really needs to point.  This is okay; the only
@@ -8361,10 +8077,10 @@ CreateCheckPoint(int flags)
        RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
 
        /*
-        * Now we can release the WAL insertion slots, allowing other xacts to
+        * Now we can release the WAL insertion locks, allowing other xacts to
         * proceed while we are flushing disk buffers.
         */
-       WALInsertSlotRelease();
+       WALInsertLockRelease();
 
        /* Update the info_lck-protected copy of RedoRecPtr as well */
        SpinLockAcquire(&xlogctl->info_lck);
@@ -8380,46 +8096,6 @@ CreateCheckPoint(int flags)
 
        TRACE_POSTGRESQL_CHECKPOINT_START(flags);
 
-       /*
-        * In some cases there are groups of actions that must all occur on one
-        * side or the other of a checkpoint record. Before flushing the
-        * checkpoint record we must explicitly wait for any backend currently
-        * performing those groups of actions.
-        *
-        * One example is end of transaction, so we must wait for any transactions
-        * that are currently in commit critical sections.      If an xact inserted
-        * its commit record into XLOG just before the REDO point, then a crash
-        * restart from the REDO point would not replay that record, which means
-        * that our flushing had better include the xact's update of pg_clog.  So
-        * we wait till he's out of his commit critical section before proceeding.
-        * See notes in RecordTransactionCommit().
-        *
-        * Because we've already released the insertion slots, this test is a bit
-        * fuzzy: it is possible that we will wait for xacts we didn't really need
-        * to wait for.  But the delay should be short and it seems better to make
-        * checkpoint take a bit longer than to hold off insertions longer than
-        * necessary.
-        * (In fact, the whole reason we have this issue is that xact.c does
-        * commit record XLOG insertion and clog update as two separate steps
-        * protected by different locks, but again that seems best on grounds of
-        * minimizing lock contention.)
-        *
-        * A transaction that has not yet set delayChkpt when we look cannot be at
-        * risk, since he's not inserted his commit record yet; and one that's
-        * already cleared it is not at risk either, since he's done fixing clog
-        * and we will correctly flush the update below.  So we cannot miss any
-        * xacts we need to wait for.
-        */
-       vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
-       if (nvxids > 0)
-       {
-               do
-               {
-                       pg_usleep(10000L);      /* wait for 10 msec */
-               } while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
-       }
-       pfree(vxids);
-
        /*
         * Get the other info we need for the checkpoint record.
         */
@@ -8456,6 +8132,45 @@ CreateCheckPoint(int flags)
         */
        END_CRIT_SECTION();
 
+       /*
+        * In some cases there are groups of actions that must all occur on one
+        * side or the other of a checkpoint record. Before flushing the
+        * checkpoint record we must explicitly wait for any backend currently
+        * performing those groups of actions.
+        *
+        * One example is end of transaction, so we must wait for any transactions
+        * that are currently in commit critical sections.  If an xact inserted
+        * its commit record into XLOG just before the REDO point, then a crash
+        * restart from the REDO point would not replay that record, which means
+        * that our flushing had better include the xact's update of pg_clog.  So
+        * we wait till he's out of his commit critical section before proceeding.
+        * See notes in RecordTransactionCommit().
+        *
+        * Because we've already released the insertion locks, this test is a bit
+        * fuzzy: it is possible that we will wait for xacts we didn't really need
+        * to wait for.  But the delay should be short and it seems better to make
+        * checkpoint take a bit longer than to hold off insertions longer than
+        * necessary. (In fact, the whole reason we have this issue is that xact.c
+        * does commit record XLOG insertion and clog update as two separate steps
+        * protected by different locks, but again that seems best on grounds of
+        * minimizing lock contention.)
+        *
+        * A transaction that has not yet set delayChkpt when we look cannot be at
+        * risk, since he's not inserted his commit record yet; and one that's
+        * already cleared it is not at risk either, since he's done fixing clog
+        * and we will correctly flush the update below.  So we cannot miss any
+        * xacts we need to wait for.
+        */
+       vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
+       if (nvxids > 0)
+       {
+               do
+               {
+                       pg_usleep(10000L);      /* wait for 10 msec */
+               } while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
+       }
+       pfree(vxids);
+
        CheckPointGuts(checkPoint.redo, flags);
 
        /*
@@ -8558,6 +8273,12 @@ CreateCheckPoint(int flags)
         */
        END_CRIT_SECTION();
 
+       /*
+        * Now that the checkpoint is safely on disk, we can update the point to
+        * which multixact can be truncated.
+        */
+       MultiXactSetSafeTruncate(checkPoint.oldestMulti);
+
        /*
         * Let smgr do post-checkpoint cleanup (eg, deleting old files).
         */
@@ -8583,13 +8304,18 @@ CreateCheckPoint(int flags)
 
        /*
         * Truncate pg_subtrans if possible.  We can throw away all data before
-        * the oldest XMIN of any running transaction.  No future transaction will
+        * the oldest XMIN of any running transaction.  No future transaction will
         * attempt to reference any pg_subtrans entry older than that (see Asserts
-        * in subtrans.c).      During recovery, though, we mustn't do this because
+        * in subtrans.c).  During recovery, though, we mustn't do this because
         * StartupSUBTRANS hasn't been called yet.
         */
        if (!RecoveryInProgress())
-               TruncateSUBTRANS(GetOldestXmin(true, false));
+               TruncateSUBTRANS(GetOldestXmin(NULL, false));
+
+       /*
+        * Truncate pg_multixact too.
+        */
+       TruncateMultiXact();
 
        /* Real work is done, but log and update stats before releasing lock. */
        LogCheckpointEnd(false);
@@ -8612,7 +8338,7 @@ CreateCheckPoint(int flags)
  * CreateRestartPoint() allows for the case where recovery may end before
  * the restartpoint completes so there is no concern of concurrent behaviour.
  */
-void
+static void
 CreateEndOfRecoveryRecord(void)
 {
        xl_end_of_recovery xlrec;
@@ -8625,10 +8351,10 @@ CreateEndOfRecoveryRecord(void)
 
        xlrec.end_time = time(NULL);
 
-       WALInsertSlotAcquire(true);
+       WALInsertLockAcquireExclusive();
        xlrec.ThisTimeLineID = ThisTimeLineID;
        xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
-       WALInsertSlotRelease();
+       WALInsertLockRelease();
 
        LocalSetXLogInsertAllowed();
 
@@ -8674,6 +8400,8 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
        CheckPointPredicate();
        CheckPointRelationMap();
        CheckPointReplicationSlots();
+       CheckPointSnapBuild();
+       CheckPointLogicalRewriteHeap();
        CheckPointBuffers(flags);       /* performs all required fsyncs */
        /* We deliberately delay 2PC checkpointing as long as possible */
        CheckPointTwoPhase(checkPointRedo);
@@ -8692,31 +8420,9 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 static void
 RecoveryRestartPoint(const CheckPoint *checkPoint)
 {
-       int                     rmid;
-
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
 
-       /*
-        * Is it safe to restartpoint?  We must ask each of the resource managers
-        * whether they have any partial state information that might prevent a
-        * correct restart from this point.  If so, we skip this opportunity, but
-        * return at the next checkpoint record for another try.
-        */
-       for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-       {
-               if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
-                       if (!(RmgrTable[rmid].rm_safe_restartpoint()))
-                       {
-                               elog(trace_recovery(DEBUG2),
-                                        "RM %d not safe to record restart point at %X/%X",
-                                        rmid,
-                                        (uint32) (checkPoint->redo >> 32),
-                                        (uint32) checkPoint->redo);
-                               return;
-                       }
-       }
-
        /*
         * Also refrain from creating a restartpoint if we have seen any
         * references to non-existent pages. Restarting recovery from the
@@ -8834,9 +8540,9 @@ CreateRestartPoint(int flags)
         * during recovery this is just pro forma, because no WAL insertions are
         * happening.
         */
-       WALInsertSlotAcquire(true);
+       WALInsertLockAcquireExclusive();
        xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
-       WALInsertSlotRelease();
+       WALInsertLockRelease();
 
        /* Also update the info_lck-protected copy */
        SpinLockAcquire(&xlogctl->info_lck);
@@ -8884,21 +8590,6 @@ CreateRestartPoint(int flags)
        }
        LWLockRelease(ControlFileLock);
 
-       /*
-        * Due to an historical accident multixact truncations are not WAL-logged,
-        * but just performed everytime the mxact horizon is increased. So, unless
-        * we explicitly execute truncations on a standby it will never clean out
-        * /pg_multixact which obviously is bad, both because it uses space and
-        * because we can wrap around into pre-existing data...
-        *
-        * We can only do the truncation here, after the UpdateControlFile()
-        * above, because we've now safely established a restart point, that
-        * guarantees we will not need need to access those multis.
-        *
-        * It's probably worth improving this.
-        */
-       TruncateMultiXact(lastCheckPoint.oldestMulti);
-
        /*
         * Delete old log files (those no longer needed even for previous
         * checkpoint/restartpoint) to prevent the disk holding the xlog from
@@ -8923,11 +8614,11 @@ CreateRestartPoint(int flags)
                _logSegNo--;
 
                /*
-                * Try to recycle segments on a useful timeline. If we've been promoted
-                * since the beginning of this restartpoint, use the new timeline
-                * chosen at end of recovery (RecoveryInProgress() sets ThisTimeLineID
-                * in that case). If we're still in recovery, use the timeline we're
-                * currently replaying.
+                * Try to recycle segments on a useful timeline. If we've been
+                * promoted since the beginning of this restartpoint, use the new
+                * timeline chosen at end of recovery (RecoveryInProgress() sets
+                * ThisTimeLineID in that case). If we're still in recovery, use the
+                * timeline we're currently replaying.
                 *
                 * There is no guarantee that the WAL segments will be useful on the
                 * current timeline; if recovery proceeds to a new timeline right
@@ -8957,15 +8648,30 @@ CreateRestartPoint(int flags)
                        ThisTimeLineID = 0;
        }
 
+       /*
+        * Due to an historical accident multixact truncations are not WAL-logged,
+        * but just performed everytime the mxact horizon is increased. So, unless
+        * we explicitly execute truncations on a standby it will never clean out
+        * /pg_multixact which obviously is bad, both because it uses space and
+        * because we can wrap around into pre-existing data...
+        *
+        * We can only do the truncation here, after the UpdateControlFile()
+        * above, because we've now safely established a restart point.  That
+        * guarantees we will not need to access those multis.
+        *
+        * It's probably worth improving this.
+        */
+       TruncateMultiXact();
+
        /*
         * Truncate pg_subtrans if possible.  We can throw away all data before
-        * the oldest XMIN of any running transaction.  No future transaction will
+        * the oldest XMIN of any running transaction.  No future transaction will
         * attempt to reference any pg_subtrans entry older than that (see Asserts
-        * in subtrans.c).      When hot standby is disabled, though, we mustn't do
+        * in subtrans.c).  When hot standby is disabled, though, we mustn't do
         * this because StartupSUBTRANS hasn't been called yet.
         */
        if (EnableHotStandby)
-               TruncateSUBTRANS(GetOldestXmin(true, false));
+               TruncateSUBTRANS(GetOldestXmin(NULL, false));
 
        /* Real work is done, but log and update before releasing lock. */
        LogCheckpointEnd(true);
@@ -9020,7 +8726,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
        /* then check whether slots limit removal further */
        if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
        {
-               XLogRecPtr slotSegNo;
+               XLogRecPtr      slotSegNo;
 
                XLByteToSeg(keep, slotSegNo);
 
@@ -9053,7 +8759,7 @@ XLogPutNextOid(Oid nextOid)
         * We need not flush the NEXTOID record immediately, because any of the
         * just-allocated OIDs could only reach disk as part of a tuple insert or
         * update that would have its own XLOG record that must follow the NEXTOID
-        * record.      Therefore, the standard buffer LSN interlock applied to those
+        * record.  Therefore, the standard buffer LSN interlock applied to those
         * records will ensure no such OID reaches disk before the NEXTOID record
         * does.
         *
@@ -9107,7 +8813,7 @@ XLogRestorePoint(const char *rpName)
        xl_restore_point xlrec;
 
        xlrec.rp_time = GetCurrentTimestamp();
-       strncpy(xlrec.rp_name, rpName, MAXFNAMELEN);
+       strlcpy(xlrec.rp_name, rpName, MAXFNAMELEN);
 
        rdata.buffer = InvalidBuffer;
        rdata.data = (char *) &xlrec;
@@ -9182,8 +8888,9 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
                 * lsn updates. We assume pd_lower/upper cannot be changed without an
                 * exclusive lock, so the contents bkp are not racy.
                 *
-                * With buffer_std set to false, XLogCheckBuffer() sets hole_length and
-                * hole_offset to 0; so the following code is safe for either case.
+                * With buffer_std set to false, XLogCheckBuffer() sets hole_length
+                * and hole_offset to 0; so the following code is safe for either
+                * case.
                 */
                memcpy(copied_buffer, origdata, bkpb.hole_offset);
                memcpy(copied_buffer + bkpb.hole_offset,
@@ -9237,6 +8944,7 @@ XLogReportParameters(void)
                {
                        XLogRecData rdata;
                        xl_parameter_change xlrec;
+                       XLogRecPtr      recptr;
 
                        xlrec.MaxConnections = MaxConnections;
                        xlrec.max_worker_processes = max_worker_processes;
@@ -9250,7 +8958,8 @@ XLogReportParameters(void)
                        rdata.len = sizeof(xlrec);
                        rdata.next = NULL;
 
-                       XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata);
+                       recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata);
+                       XLogFlush(recptr);
                }
 
                ControlFile->MaxConnections = MaxConnections;
@@ -9296,9 +9005,9 @@ UpdateFullPageWrites(void)
         */
        if (fullPageWrites)
        {
-               WALInsertSlotAcquire(true);
+               WALInsertLockAcquireExclusive();
                Insert->fullPageWrites = true;
-               WALInsertSlotRelease();
+               WALInsertLockRelease();
        }
 
        /*
@@ -9319,9 +9028,9 @@ UpdateFullPageWrites(void)
 
        if (!fullPageWrites)
        {
-               WALInsertSlotAcquire(true);
+               WALInsertLockAcquireExclusive();
                Insert->fullPageWrites = false;
-               WALInsertSlotRelease();
+               WALInsertLockRelease();
        }
        END_CRIT_SECTION();
 }
@@ -9393,7 +9102,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                /*
                 * We used to try to take the maximum of ShmemVariableCache->nextOid
                 * and the recorded nextOid, but that fails if the OID counter wraps
-                * around.      Since no OID allocation should be happening during replay
+                * around.  Since no OID allocation should be happening during replay
                 * anyway, better to just believe the record exactly.  We still take
                 * OidGenLock while setting the variable, just in case.
                 */
@@ -9420,6 +9129,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                                                          checkPoint.nextMultiOffset);
                SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
                SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+               MultiXactSetSafeTruncate(checkPoint.oldestMulti);
 
                /*
                 * If we see a shutdown checkpoint while waiting for an end-of-backup
@@ -9520,6 +9230,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                                                                  checkPoint.oldestXidDB);
                MultiXactAdvanceOldest(checkPoint.oldestMulti,
                                                           checkPoint.oldestMultiDB);
+               MultiXactSetSafeTruncate(checkPoint.oldestMulti);
 
                /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
                ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
@@ -9583,10 +9294,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                BkpBlock        bkpb;
 
                /*
-                * Full-page image (FPI) records contain a backup block stored "inline"
-                * in the normal data since the locking when writing hint records isn't
-                * sufficient to use the normal backup block mechanism, which assumes
-                * exclusive lock on the buffer supplied.
+                * Full-page image (FPI) records contain a backup block stored
+                * "inline" in the normal data since the locking when writing hint
+                * records isn't sufficient to use the normal backup block mechanism,
+                * which assumes exclusive lock on the buffer supplied.
                 *
                 * Since the only change in these backup block are hint bits, there
                 * are no recovery conflicts generated.
@@ -9736,7 +9447,7 @@ get_sync_bit(int method)
 
        /*
         * Optimize writes by bypassing kernel cache with O_DIRECT when using
-        * O_SYNC/O_FSYNC and O_DSYNC.  But only if archiving and streaming are
+        * O_SYNC/O_FSYNC and O_DSYNC.  But only if archiving and streaming are
         * disabled, otherwise the archive command or walsender process will read
         * the WAL soon after writing it, which is guaranteed to cause a physical
         * read if we bypassed the kernel cache. We also skip the
@@ -9940,7 +9651,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
         * during an on-line backup even if not doing so at other times, because
         * it's quite possible for the backup dump to obtain a "torn" (partially
         * written) copy of a database page if it reads the page concurrently with
-        * our write to the same page.  This can be fixed as long as the first
+        * our write to the same page.  This can be fixed as long as the first
         * write to the page in the WAL sequence is a full-page write. Hence, we
         * turn on forcePageWrites and then force a CHECKPOINT, to ensure there
         * are no dirty pages in shared memory that might get dumped while the
@@ -9952,15 +9663,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
         * Note that forcePageWrites has no effect during an online backup from
         * the standby.
         *
-        * We must hold all the insertion slots to change the value of
+        * We must hold all the insertion locks to change the value of
         * forcePageWrites, to ensure adequate interlocking against XLogInsert().
         */
-       WALInsertSlotAcquire(true);
+       WALInsertLockAcquireExclusive();
        if (exclusive)
        {
                if (XLogCtl->Insert.exclusiveBackup)
                {
-                       WALInsertSlotRelease();
+                       WALInsertLockRelease();
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         errmsg("a backup is already in progress"),
@@ -9971,7 +9682,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
        else
                XLogCtl->Insert.nonExclusiveBackups++;
        XLogCtl->Insert.forcePageWrites = true;
-       WALInsertSlotRelease();
+       WALInsertLockRelease();
 
        /* Ensure we release forcePageWrites if fail below */
        PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
@@ -9984,7 +9695,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                 * old timeline IDs.  That would otherwise happen if you called
                 * pg_start_backup() right after restoring from a PITR archive: the
                 * first WAL segment containing the startup checkpoint has pages in
-                * the beginning with the old timeline ID.      That can cause trouble at
+                * the beginning with the old timeline ID.  That can cause trouble at
                 * recovery: we won't have a history file covering the old timeline if
                 * pg_xlog directory was not included in the base backup and the WAL
                 * archive was cleared too before starting the backup.
@@ -10007,7 +9718,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                        bool            checkpointfpw;
 
                        /*
-                        * Force a CHECKPOINT.  Aside from being necessary to prevent torn
+                        * Force a CHECKPOINT.  Aside from being necessary to prevent torn
                         * page problems, this guarantees that two successive backup runs
                         * will have different checkpoint positions and hence different
                         * history file names, even if nothing happened in between.
@@ -10086,13 +9797,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                         * taking a checkpoint right after another is not that expensive
                         * either because only few buffers have been dirtied yet.
                         */
-                       WALInsertSlotAcquire(true);
+                       WALInsertLockAcquireExclusive();
                        if (XLogCtl->Insert.lastBackupStart < startpoint)
                        {
                                XLogCtl->Insert.lastBackupStart = startpoint;
                                gotUniqueStartpoint = true;
                        }
-                       WALInsertSlotRelease();
+                       WALInsertLockRelease();
                } while (!gotUniqueStartpoint);
 
                XLByteToSeg(startpoint, _logSegNo);
@@ -10182,7 +9893,7 @@ pg_start_backup_callback(int code, Datum arg)
        bool            exclusive = DatumGetBool(arg);
 
        /* Update backup counters and forcePageWrites on failure */
-       WALInsertSlotAcquire(true);
+       WALInsertLockAcquireExclusive();
        if (exclusive)
        {
                Assert(XLogCtl->Insert.exclusiveBackup);
@@ -10199,7 +9910,7 @@ pg_start_backup_callback(int code, Datum arg)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       WALInsertSlotRelease();
+       WALInsertLockRelease();
 }
 
 /*
@@ -10268,7 +9979,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
        /*
         * OK to update backup counters and forcePageWrites
         */
-       WALInsertSlotAcquire(true);
+       WALInsertLockAcquireExclusive();
        if (exclusive)
                XLogCtl->Insert.exclusiveBackup = false;
        else
@@ -10288,7 +9999,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       WALInsertSlotRelease();
+       WALInsertLockRelease();
 
        if (exclusive)
        {
@@ -10567,13 +10278,13 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
  * an error handler.
  *
  * NB: This is only for aborting a non-exclusive backup that doesn't write
- * backup_label. A backup started with pg_stop_backup() needs to be finished
+ * backup_label. A backup started with pg_start_backup() needs to be finished
  * with pg_stop_backup().
  */
 void
 do_pg_abort_backup(void)
 {
-       WALInsertSlotAcquire(true);
+       WALInsertLockAcquireExclusive();
        Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
        XLogCtl->Insert.nonExclusiveBackups--;
 
@@ -10582,7 +10293,7 @@ do_pg_abort_backup(void)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       WALInsertSlotRelease();
+       WALInsertLockRelease();
 }
 
 /*
@@ -10660,7 +10371,7 @@ GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli)
  *
  * If we see a backup_label during recovery, we assume that we are recovering
  * from a backup dump file, and we therefore roll forward from the checkpoint
- * identified by the label file, NOT what pg_control says.     This avoids the
+ * identified by the label file, NOT what pg_control says.  This avoids the
  * problem that pg_control might have been archived one or more checkpoints
  * later than the start of the dump, and so if we rely on it as the start
  * point, we will fail to restore a consistent database state.
@@ -10755,9 +10466,7 @@ rm_redo_error_callback(void *arg)
        StringInfoData buf;
 
        initStringInfo(&buf);
-       RmgrTable[record->xl_rmid].rm_desc(&buf,
-                                                                          record->xl_info,
-                                                                          XLogRecGetData(record));
+       RmgrTable[record->xl_rmid].rm_desc(&buf, record);
 
        /* don't bother emitting empty description */
        if (buf.len > 0)
@@ -11007,7 +10716,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
         * Standby mode is implemented by a state machine:
         *
         * 1. Read from either archive or pg_xlog (XLOG_FROM_ARCHIVE), or just
-        *    pg_xlog (XLOG_FROM_XLOG)
+        *        pg_xlog (XLOG_FROM_XLOG)
         * 2. Check trigger file
         * 3. Read from primary server via walreceiver (XLOG_FROM_STREAM)
         * 4. Rescan timelines
@@ -11208,8 +10917,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                 * file from pg_xlog.
                                 */
                                readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
-                                               currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
-                                                                                currentSource);
+                                                currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
+                                                                                         currentSource);
                                if (readFile >= 0)
                                        return true;    /* success! */
 
@@ -11266,11 +10975,11 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                        if (havedata)
                                        {
                                                /*
-                                                * Great, streamed far enough.  Open the file if it's
+                                                * Great, streamed far enough.  Open the file if it's
                                                 * not open already.  Also read the timeline history
                                                 * file if we haven't initialized timeline history
                                                 * yet; it should be streamed over and present in
-                                                * pg_xlog by now.      Use XLOG_FROM_STREAM so that
+                                                * pg_xlog by now.  Use XLOG_FROM_STREAM so that
                                                 * source info is set correctly and XLogReceiptTime
                                                 * isn't changed.
                                                 */
@@ -11333,9 +11042,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                 * process.
                 */
                HandleStartupProcInterrupts();
-       } while (StandbyMode);
+       }
 
-       return false;
+       return false;                           /* not reached */
 }
 
 /*
@@ -11343,9 +11052,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
  * in the current WAL page, previously read by XLogPageRead().
  *
  * 'emode' is the error mode that would be used to report a file-not-found
- * or legitimate end-of-WAL situation.  Generally, we use it as-is, but if
+ * or legitimate end-of-WAL situation.   Generally, we use it as-is, but if
  * we're retrying the exact same record that we've tried previously, only
- * complain the first time to keep the noise down.     However, we only do when
+ * complain the first time to keep the noise down.  However, we only do when
  * reading from pg_xlog, because we don't expect any invalid records in archive
  * or in records streamed from master. Files in the archive should be complete,
  * and we should never hit the end of WAL because we stop and wait for more WAL
@@ -11424,6 +11133,12 @@ CheckForStandbyTrigger(void)
                fast_promote = true;
                return true;
        }
+       else if (errno != ENOENT)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not stat trigger file \"%s\": %m",
+                                               TriggerFile)));
+
        return false;
 }