]> granicus.if.org Git - postgresql/commitdiff
Improve scalability of WAL insertions.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 8 Jul 2013 08:23:56 +0000 (11:23 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 8 Jul 2013 08:23:56 +0000 (11:23 +0300)
This patch replaces WALInsertLock with a number of WAL insertion slots,
allowing multiple backends to insert WAL records to the WAL buffers
concurrently. This is particularly useful for parallel loading large amounts
of data on a system with many CPUs.

This has one user-visible change: switching to a new WAL segment with
pg_switch_xlog() now fills the remaining unused portion of the segment with
zeros. This potentially adds some overhead, but it has been a very common
practice by DBA's to clear the "tail" of the segment with an external
pg_clearxlogtail utility anyway, to make the WAL files compress better.
With this patch, it's no longer necessary to do that.

This patch adds a new GUC, xloginsert_slots, to tune the number of WAL
insertion slots. Performance testing suggests that the default, 8, works
pretty well for all kinds of worklods, but I left the GUC in place to allow
others with different hardware to test that easily. We might want to remove
that before release.

Reviewed by Andres Freund.

src/backend/access/transam/xlog.c
src/backend/storage/lmgr/spin.c
src/backend/utils/misc/guc.c
src/include/access/xlog.h
src/include/access/xlogdefs.h
src/include/storage/lwlock.h

index 77e5c3b5d85ad7907e9b5403bbe93141a297c468..acf0dd187619b03a2fc08387fd61301b05d6dd3c 100644 (file)
@@ -41,6 +41,7 @@
 #include "postmaster/startup.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "storage/barrier.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -83,6 +84,7 @@ int                   sync_method = DEFAULT_SYNC_METHOD;
 int                    wal_level = WAL_LEVEL_MINIMAL;
 int                    CommitDelay = 0;        /* precommit delay in microseconds */
 int                    CommitSiblings = 5; /* # concurrent xacts needed to sleep */
+int                    num_xloginsert_slots = 8;
 
 #ifdef WAL_DEBUG
 bool           XLOG_DEBUG = false;
@@ -279,8 +281,8 @@ XLogRecPtr  XactLastRecEnd = InvalidXLogRecPtr;
  * (which is almost but not quite the same as a pointer to the most recent
  * CHECKPOINT record). We update this from the shared-memory copy,
  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold the Insert lock).  See XLogInsert for details. We are also allowed
- * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
+ * hold an insertion slot).  See XLogInsert for details.  We are also allowed
+ * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
  * InitXLOGAccess.
  */
@@ -321,7 +323,10 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALInsertLock: must be held to insert a record into the WAL buffers.
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * It is only held while initializing and changing the mapping.  If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
  *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
@@ -348,17 +353,83 @@ typedef struct XLogwrtResult
        XLogRecPtr      Flush;                  /* last byte + 1 flushed */
 } XLogwrtResult;
 
+
+/*
+ * A slot for inserting to the WAL. This is similar to an LWLock, the main
+ * difference is that there is an extra xlogInsertingAt field that is protected
+ * by the same mutex. Unlike an LWLock, a slot can only be acquired in
+ * exclusive mode.
+ *
+ * The xlogInsertingAt field is used to advertise to other processes how far
+ * the slot owner has progressed in inserting the record. When a backend
+ * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
+ * yet know where it's going to insert the record. That's conservative
+ * but correct; the new insertion is certainly going to go to a byte position
+ * greater than 1. If another backend needs to flush the WAL, it will have to
+ * wait for the new insertion. xlogInsertingAt is updated after finishing the
+ * insert or when crossing a page boundary, which will wake up anyone waiting
+ * for it, whether the wait was necessary in the first place or not.
+ *
+ * A process can wait on a slot in two modes: LW_EXCLUSIVE or
+ * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
+ * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
+ * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
+ * released, or xlogInsertingAt is updated. In other words, a process in
+ * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
+ * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
+ * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
+ *
+ * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
+ * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
+ * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
+ * see lwlock.c for details.
+ */
+typedef struct
+{
+       slock_t         mutex;                  /* protects the below fields */
+       XLogRecPtr      xlogInsertingAt; /* insert has completed up to this point */
+
+       PGPROC     *owner;                      /* for debugging purposes */
+
+       bool            releaseOK;              /* T if ok to release waiters */
+       char            exclusive;              /* # of exclusive holders (0 or 1) */
+       PGPROC     *head;                       /* head of list of waiting PGPROCs */
+       PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
+       /* tail is undefined when head is NULL */
+} XLogInsertSlot;
+
+/*
+ * All the slots are allocated as an array in shared memory. We force the
+ * array stride to be a power of 2, which saves a few cycles in indexing, but
+ * more importantly also ensures that individual slots don't cross cache line
+ * boundaries. (Of course, we have to also ensure that the array start
+ * address is suitably aligned.)
+ */
+typedef union XLogInsertSlotPadded
+{
+       XLogInsertSlot slot;
+       char            pad[64];
+} XLogInsertSlotPadded;
+
 /*
  * Shared state data for XLogInsert.
  */
 typedef struct XLogCtlInsert
 {
-       XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
-       int                     curridx;                /* current block index in cache */
-       XLogPageHeader currpage;        /* points to header of block in cache */
-       char       *currpos;            /* current insertion point in cache */
-       XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
-       bool            forcePageWrites;        /* forcing full-page writes for PITR? */
+       slock_t         insertpos_lck;  /* protects CurrBytePos and PrevBytePos */
+
+       /*
+        * CurrBytePos is the end of reserved WAL. The next record will be inserted
+        * at that position. PrevBytePos is the start position of the previously
+        * inserted (or rather, reserved) record - it is copied to the the prev-
+        * link of the next record. These are stored as "usable byte positions"
+        * rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+        */
+       uint64          CurrBytePos;
+       uint64          PrevBytePos;
+
+       /* insertion slots, see above for details */
+       XLogInsertSlotPadded *insertSlots;
 
        /*
         * fullPageWrites is the master copy used by all backends to determine
@@ -366,7 +437,12 @@ typedef struct XLogCtlInsert
         * This is required because, when full_page_writes is changed by SIGHUP,
         * we must WAL-log it before it actually affects WAL-logging by backends.
         * Checkpointer sets at startup or after SIGHUP.
+        *
+        * To read these fields, you must hold an insertion slot. To modify them,
+        * you must hold ALL the slots.
         */
+       XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
+       bool            forcePageWrites;        /* forcing full-page writes for PITR? */
        bool            fullPageWrites;
 
        /*
@@ -395,11 +471,11 @@ typedef struct XLogCtlWrite
  */
 typedef struct XLogCtlData
 {
-       /* Protected by WALInsertLock: */
        XLogCtlInsert Insert;
 
        /* Protected by info_lck: */
        XLogwrtRqst LogwrtRqst;
+       XLogRecPtr      RedoRecPtr;             /* a recent copy of Insert->RedoRecPtr */
        uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
        TransactionId ckptXid;
        XLogRecPtr      asyncXactLSN;   /* LSN of newest async commit/abort */
@@ -419,10 +495,21 @@ typedef struct XLogCtlData
         */
        XLogwrtResult LogwrtResult;
 
+       /*
+        * Latest initialized block index in cache.
+        *
+        * To change curridx and the identity of a buffer, you need to hold
+        * WALBufMappingLock.  To change the identity of a buffer that's still
+        * dirty, the old page needs to be written out first, and for that you
+        * need WALWriteLock, and you need to ensure that there are no in-progress
+        * insertions to the page by calling WaitXLogInsertionsToFinish().
+        */
+       int                     curridx;
+
        /*
         * These values do not change after startup, although the pointed-to pages
-        * and xlblocks values certainly do.  Permission to read/write the pages
-        * and xlblocks values depends on WALInsertLock and WALWriteLock.
+        * and xlblocks values certainly do.  xlblock values are protected by
+        * WALBufMappingLock.
         */
        char       *pages;                      /* buffers for unwritten XLOG pages */
        XLogRecPtr *xlblocks;           /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -518,24 +605,34 @@ static XLogCtlData *XLogCtl = NULL;
 static ControlFileData *ControlFile = NULL;
 
 /*
- * Macros for managing XLogInsert state.  In most cases, the calling routine
- * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
- * so these are passed as parameters instead of being fetched via XLogCtl.
+ * Calculate the amount of space left on the page after 'endptr'. Beware
+ * multiple evaluation!
  */
+#define INSERT_FREESPACE(endptr)       \
+       (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
 
-/* Free space remaining in the current xlog page buffer */
-#define INSERT_FREESPACE(Insert)  \
-       (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
+/* Macro to advance to next buffer index. */
+#define NextBufIdx(idx)                \
+               (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
 
-/* Construct XLogRecPtr value for current insertion point */
-#define INSERT_RECPTR(recptr,Insert,curridx)  \
-               (recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert)
+/*
+ * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
+ * would hold if it was in cache, the page containing 'recptr'.
+ *
+ * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
+ * page is taken to mean the previous page.
+ */
+#define XLogRecPtrToBufIdx(recptr)     \
+       (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
 
-#define PrevBufIdx(idx)                \
-               (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
+#define XLogRecEndPtrToBufIdx(recptr)  \
+       ((((recptr) - 1) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
 
-#define NextBufIdx(idx)                \
-               (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
+/*
+ * These are the number of bytes in a WAL page and segment usable for WAL data.
+ */
+#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
+#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
 
 /*
  * Private, possibly out-of-date copy of shared LogwrtResult.
@@ -631,6 +728,9 @@ static bool InRedo = false;
 /* Have we launched bgwriter during recovery? */
 static bool bgwriterLaunched = false;
 
+/* For WALInsertSlotAcquire/Release functions */
+static int     MySlotNo = 0;
+static bool holdingAllSlots = false;
 
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
@@ -651,9 +751,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
 static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
                                                 char *blk, bool get_cleanup_lock, bool keep_buffer);
-static bool AdvanceXLInsertBuffer(bool new_segment);
+static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
                                           bool find_free, int *max_advance,
                                           bool use_lock);
@@ -693,6 +793,24 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
 static void rm_redo_error_callback(void *arg);
 static int     get_sync_bit(int method);
 
+static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+                                 XLogRecData *rdata,
+                                 XLogRecPtr StartPos, XLogRecPtr EndPos);
+static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
+                                                 XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
+static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+                                 XLogRecPtr *PrevPtr);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static void WakeupWaiters(XLogRecPtr EndPos);
+static char *GetXLogBuffer(XLogRecPtr ptr);
+static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
+static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
+static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+
+static void WALInsertSlotAcquire(bool exclusive);
+static void WALInsertSlotAcquireOne(int slotno);
+static void WALInsertSlotRelease(void);
+static void WALInsertSlotReleaseOne(int slotno);
 
 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -713,10 +831,6 @@ XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 {
        XLogCtlInsert *Insert = &XLogCtl->Insert;
-       XLogRecPtr      RecPtr;
-       XLogRecPtr      WriteRqst;
-       uint32          freespace;
-       int                     curridx;
        XLogRecData *rdt;
        XLogRecData *rdt_lastnormal;
        Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
@@ -731,11 +845,13 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        uint32          len,
                                write_len;
        unsigned        i;
-       bool            updrqst;
        bool            doPageWrites;
        bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+       bool            inserted;
        uint8           info_orig = info;
        static XLogRecord *rechdr;
+       XLogRecPtr      StartPos;
+       XLogRecPtr      EndPos;
 
        if (rechdr == NULL)
        {
@@ -761,8 +877,8 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
         */
        if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
        {
-               RecPtr = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
-               return RecPtr;
+               EndPos = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
+               return EndPos;
        }
 
        /*
@@ -770,9 +886,9 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
         * up.
         *
         * We may have to loop back to here if a race condition is detected below.
-        * We could prevent the race by doing all this work while holding the
-        * insert lock, but it seems better to avoid doing CRC calculations while
-        * holding the lock.
+        * We could prevent the race by doing all this work while holding an
+        * insertion slot, but it seems better to avoid doing CRC calculations
+        * while holding one.
         *
         * We add entries for backup blocks to the chain, so that they don't need
         * any special treatment in the critical section where the chunks are
@@ -789,8 +905,8 @@ begin:;
        /*
         * Decide if we need to do full-page writes in this XLOG record: true if
         * full_page_writes is on or we have a PITR request for it.  Since we
-        * don't yet have the insert lock, fullPageWrites and forcePageWrites
-        * could change under us, but we'll recheck them once we have the lock.
+        * don't yet have an insertion slot, fullPageWrites and forcePageWrites
+        * could change under us, but we'll recheck them once we have a slot.
         */
        doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
 
@@ -930,25 +1046,60 @@ begin:;
                COMP_CRC32(rdata_crc, rdt->data, rdt->len);
 
        /*
-        * Construct record header (prev-link and CRC are filled in later), and
-        * make that the first chunk in the chain.
+        * Construct record header (prev-link is filled in later, after reserving
+        * the space for the record), and make that the first chunk in the chain.
+        *
+        * The CRC calculated for the header here doesn't include prev-link,
+        * because we don't know it yet. It will be added later.
         */
        rechdr->xl_xid = GetCurrentTransactionIdIfAny();
        rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
        rechdr->xl_len = len;           /* doesn't include backup blocks */
        rechdr->xl_info = info;
        rechdr->xl_rmid = rmid;
+       rechdr->xl_prev = InvalidXLogRecPtr;
+       COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
 
        hdr_rdt.next = rdata;
        hdr_rdt.data = (char *) rechdr;
        hdr_rdt.len = SizeOfXLogRecord;
-
        write_len += SizeOfXLogRecord;
 
+       /*----------
+        *
+        * We have now done all the preparatory work we can without holding a
+        * lock or modifying shared state. From here on, inserting the new WAL
+        * record to the shared WAL buffer cache is a two-step process:
+        *
+        * 1. Reserve the right amount of space from the WAL. The current head of
+        *    reserved space is kept in Insert->CurrBytePos, and is protected by
+        *    insertpos_lck.
+        *
+        * 2. Copy the record to the reserved WAL space. This involves finding the
+        *    correct WAL buffer containing the reserved space, and copying the
+        *    record in place. This can be done concurrently in multiple processes.
+        *
+        * To keep track of which insertions are still in-progress, each concurrent
+        * inserter allocates an "insertion slot", which tells others how far the
+        * inserter has progressed. There is a small fixed number of insertion
+        * slots, determined by the num_xloginsert_slots GUC. When an inserter
+        * finishes, it updates the xlogInsertingAt of its slot to the end of the
+        * record it inserted, to let others know that it's done. xlogInsertingAt
+        * is also updated when crossing over to a new WAL buffer, to allow the
+        * the previous buffer to be flushed.
+        *
+        * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
+        * changing until the insertion is finished.
+        *
+        * Step 2 can usually be done completely in parallel. If the required WAL
+        * page is not initialized yet, you have to grab WALBufMappingLock to
+        * initialize it, but the WAL writer tries to do that ahead of insertions
+        * to avoid that from happening in the critical path.
+        *
+        *----------
+        */
        START_CRIT_SECTION();
-
-       /* Now wait to get insert lock */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(isLogSwitch);
 
        /*
         * Check to see if my RedoRecPtr is out of date.  If so, may have to go
@@ -977,7 +1128,7 @@ begin:;
                                         * Oops, this buffer now needs to be backed up, but we
                                         * didn't think so above.  Start over.
                                         */
-                                       LWLockRelease(WALInsertLock);
+                                       WALInsertSlotRelease();
                                        END_CRIT_SECTION();
                                        rdt_lastnormal->next = NULL;
                                        info = info_orig;
@@ -996,7 +1147,7 @@ begin:;
        if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
        {
                /* Oops, must redo it with full-page data. */
-               LWLockRelease(WALInsertLock);
+               WALInsertSlotRelease();
                END_CRIT_SECTION();
                rdt_lastnormal->next = NULL;
                info = info_orig;
@@ -1004,238 +1155,1169 @@ begin:;
        }
 
        /*
-        * If the current page is completely full, the record goes to the next
-        * page, right after the page header.
+        * Reserve space for the record in the WAL. This also sets the xl_prev
+        * pointer.
+        */
+       if (isLogSwitch)
+               inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
+       else
+       {
+               ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
+                                                                 &rechdr->xl_prev);
+               inserted = true;
+       }
+
+       if (inserted)
+       {
+               /*
+                * Now that xl_prev has been filled in, finish CRC calculation of the
+                * record header.
+                */
+               COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
+               FIN_CRC32(rdata_crc);
+               rechdr->xl_crc = rdata_crc;
+
+               /*
+                * All the record data, including the header, is now ready to be
+                * inserted. Copy the record in the space reserved.
+                */
+               CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
+       }
+       else
+       {
+               /*
+                * This was an xlog-switch record, but the current insert location was
+                * already exactly at the beginning of a segment, so there was no need
+                * to do anything.
+                */
+       }
+
+       /*
+        * Done! Let others know that we're finished.
+        */
+       WALInsertSlotRelease();
+
+       END_CRIT_SECTION();
+
+       /*
+        * Update shared LogwrtRqst.Write, if we crossed page boundary.
+        */
+       if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               /* advance global request to include new block(s) */
+               if (xlogctl->LogwrtRqst.Write < EndPos)
+                       xlogctl->LogwrtRqst.Write = EndPos;
+               /* update local result copy while I have the chance */
+               LogwrtResult = xlogctl->LogwrtResult;
+               SpinLockRelease(&xlogctl->info_lck);
+       }
+
+       /*
+        * If this was an XLOG_SWITCH record, flush the record and the empty
+        * padding space that fills the rest of the segment, and perform
+        * end-of-segment actions (eg, notifying archiver).
+        */
+       if (isLogSwitch)
+       {
+               TRACE_POSTGRESQL_XLOG_SWITCH();
+               XLogFlush(EndPos);
+               /*
+                * Even though we reserved the rest of the segment for us, which is
+                * reflected in EndPos, we return a pointer to just the end of the
+                * xlog-switch record.
+                */
+               if (inserted)
+               {
+                       EndPos = StartPos + SizeOfXLogRecord;
+                       if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+                       {
+                               if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ)
+                                       EndPos += SizeOfXLogLongPHD;
+                               else
+                                       EndPos += SizeOfXLogShortPHD;
+                       }
+               }
+       }
+
+#ifdef WAL_DEBUG
+       if (XLOG_DEBUG)
+       {
+               StringInfoData buf;
+
+               initStringInfo(&buf);
+               appendStringInfo(&buf, "INSERT @ %X/%X: ",
+                                                (uint32) (EndPos >> 32), (uint32) EndPos);
+               xlog_outrec(&buf, rechdr);
+               if (rdata->data != NULL)
+               {
+                       appendStringInfo(&buf, " - ");
+                       RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
+               }
+               elog(LOG, "%s", buf.data);
+               pfree(buf.data);
+       }
+#endif
+
+       /*
+        * Update our global variables
+        */
+       ProcLastRecPtr = StartPos;
+       XactLastRecEnd = EndPos;
+
+       return EndPos;
+}
+
+/*
+ * Reserves the right amount of space for a record of given size from the WAL.
+ * *StartPos is set to the beginning of the reserved section, *EndPos to
+ * its end+1. *PrevPtr is set to the beginning of the previous record; it is
+ * used to set the xl_prev of this record.
+ *
+ * This is the performance critical part of XLogInsert that must be serialized
+ * across backends. The rest can happen mostly in parallel. Try to keep this
+ * section as short as possible, insertpos_lck can be heavily contended on a
+ * busy system.
+ *
+ * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
+ * where we actually copy the record to the reserved space.
+ */
+static void
+ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+                                                 XLogRecPtr *PrevPtr)
+{
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          startbytepos;
+       uint64          endbytepos;
+       uint64          prevbytepos;
+
+       size = MAXALIGN(size);
+
+       /* All (non xlog-switch) records should contain data. */
+       Assert(size > SizeOfXLogRecord);
+
+       /*
+        * The duration the spinlock needs to be held is minimized by minimizing
+        * the calculations that have to be done while holding the lock. The
+        * current tip of reserved WAL is kept in CurrBytePos, as a byte position
+        * that only counts "usable" bytes in WAL, that is, it excludes all WAL
+        * page headers. The mapping between "usable" byte positions and physical
+        * positions (XLogRecPtrs) can be done outside the locked region, and
+        * because the usable byte position doesn't include any headers, reserving
+        * X bytes from WAL is almost as simple as "CurrBytePos += X".
+        */
+       SpinLockAcquire(&Insert->insertpos_lck);
+
+       startbytepos = Insert->CurrBytePos;
+       endbytepos = startbytepos + size;
+       prevbytepos = Insert->PrevBytePos;
+       Insert->CurrBytePos = endbytepos;
+       Insert->PrevBytePos = startbytepos;
+
+       SpinLockRelease(&Insert->insertpos_lck);
+
+       *StartPos = XLogBytePosToRecPtr(startbytepos);
+       *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+       *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
+
+       /*
+        * Check that the conversions between "usable byte positions" and
+        * XLogRecPtrs work consistently in both directions.
+        */
+       Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+       Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+       Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+}
+
+/*
+ * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
+ *
+ * A log-switch record is handled slightly differently. The rest of the
+ * segment will be reserved for this insertion, as indicated by the returned
+ * *EndPos_p value. However, if we are already at the beginning of the current
+ * segment, *StartPos_p and *EndPos_p are set to the current location without
+ * reserving any space, and the function returns false.
+*/
+static bool
+ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
+{
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          startbytepos;
+       uint64          endbytepos;
+       uint64          prevbytepos;
+       uint32          size = SizeOfXLogRecord;
+       XLogRecPtr      ptr;
+       uint32          segleft;
+
+       /*
+        * These calculations are a bit heavy-weight to be done while holding a
+        * spinlock, but since we're holding all the WAL insertion slots, there
+        * are no other inserters competing for it. GetXLogInsertRecPtr() does
+        * compete for it, but that's not called very frequently.
+        */
+       SpinLockAcquire(&Insert->insertpos_lck);
+
+       startbytepos = Insert->CurrBytePos;
+
+       ptr = XLogBytePosToEndRecPtr(startbytepos);
+       if (ptr % XLOG_SEG_SIZE == 0)
+       {
+               SpinLockRelease(&Insert->insertpos_lck);
+               *EndPos = *StartPos = ptr;
+               return false;
+       }
+
+       endbytepos = startbytepos + size;
+       prevbytepos = Insert->PrevBytePos;
+
+       *StartPos = XLogBytePosToRecPtr(startbytepos);
+       *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+
+       segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE);
+       if (segleft != XLOG_SEG_SIZE)
+       {
+               /* consume the rest of the segment */
+               *EndPos += segleft;
+               endbytepos = XLogRecPtrToBytePos(*EndPos);
+       }
+       Insert->CurrBytePos = endbytepos;
+       Insert->PrevBytePos = startbytepos;
+
+       SpinLockRelease(&Insert->insertpos_lck);
+
+       *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
+
+       Assert((*EndPos) % XLOG_SEG_SIZE == 0);
+       Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+       Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+       Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+
+       return true;
+}
+
+/*
+ * Subroutine of XLogInsert.  Copies a WAL record to an already-reserved
+ * area in the WAL.
+ */
+static void
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+                                       XLogRecPtr StartPos, XLogRecPtr EndPos)
+{
+       char       *currpos;
+       int                     freespace;
+       int                     written;
+       XLogRecPtr      CurrPos;
+       XLogPageHeader pagehdr;
+
+       /* The first chunk is the record header */
+       Assert(rdata->len == SizeOfXLogRecord);
+
+       /*
+        * Get a pointer to the right place in the right WAL buffer to start
+        * inserting to.
+        */
+       CurrPos = StartPos;
+       currpos = GetXLogBuffer(CurrPos);
+       freespace = INSERT_FREESPACE(CurrPos);
+
+       /*
+        * there should be enough space for at least the first field (xl_tot_len)
+        * on this page.
+        */
+       Assert(freespace >= sizeof(uint32));
+
+       /* Copy record data */
+       written = 0;
+       while (rdata != NULL)
+       {
+               char       *rdata_data = rdata->data;
+               int                     rdata_len = rdata->len;
+
+               while (rdata_len > freespace)
+               {
+                       /*
+                        * Write what fits on this page, and continue on the next page.
+                        */
+                       Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
+                       memcpy(currpos, rdata_data, freespace);
+                       rdata_data += freespace;
+                       rdata_len -= freespace;
+                       written += freespace;
+                       CurrPos += freespace;
+
+                       /*
+                        * Get pointer to beginning of next page, and set the xlp_rem_len
+                        * in the page header. Set XLP_FIRST_IS_CONTRECORD.
+                        *
+                        * It's safe to set the contrecord flag and xlp_rem_len without a
+                        * lock on the page. All the other flags were already set when the
+                        * page was initialized, in AdvanceXLInsertBuffer, and we're the
+                        * only backend that needs to set the contrecord flag.
+                        */
+                       currpos = GetXLogBuffer(CurrPos);
+                       pagehdr = (XLogPageHeader) currpos;
+                       pagehdr->xlp_rem_len = write_len - written;
+                       pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
+
+                       /* skip over the page header */
+                       if (CurrPos % XLogSegSize == 0)
+                       {
+                               CurrPos += SizeOfXLogLongPHD;
+                               currpos += SizeOfXLogLongPHD;
+                       }
+                       else
+                       {
+                               CurrPos += SizeOfXLogShortPHD;
+                               currpos += SizeOfXLogShortPHD;
+                       }
+                       freespace = INSERT_FREESPACE(CurrPos);
+               }
+
+               Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
+               memcpy(currpos, rdata_data, rdata_len);
+               currpos += rdata_len;
+               CurrPos += rdata_len;
+               freespace -= rdata_len;
+               written += rdata_len;
+
+               rdata = rdata->next;
+       }
+       Assert(written == write_len);
+
+       /* Align the end position, so that the next record starts aligned */
+       CurrPos = MAXALIGN(CurrPos);
+
+       /*
+        * If this was an xlog-switch, it's not enough to write the switch record,
+        * we also have to consume all the remaining space in the WAL segment.
+        * We have already reserved it for us, but we still need to make sure it's
+        * allocated and zeroed in the WAL buffers so that when the caller (or
+        * someone else) does XLogWrite(), it can really write out all the zeros.
+        */
+       if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0)
+       {
+               /* An xlog-switch record doesn't contain any data besides the header */
+               Assert(write_len == SizeOfXLogRecord);
+
+               /*
+                * We do this one page at a time, to make sure we don't deadlock
+                * against ourselves if wal_buffers < XLOG_SEG_SIZE.
+                */
+               Assert(EndPos % XLogSegSize == 0);
+
+               /* Use up all the remaining space on the first page */
+               CurrPos += freespace;
+
+               while (CurrPos < EndPos)
+               {
+                       /* initialize the next page (if not initialized already) */
+                       WakeupWaiters(CurrPos);
+                       AdvanceXLInsertBuffer(CurrPos, false);
+                       CurrPos += XLOG_BLCKSZ;
+               }
+       }
+
+       if (CurrPos != EndPos)
+               elog(PANIC, "space reserved for WAL record does not match what was written");
+}
+
+/*
+ * Allocate a slot for insertion.
+ *
+ * In exclusive mode, all slots are reserved for the current process. That
+ * blocks all concurrent insertions.
+ */
+static void
+WALInsertSlotAcquire(bool exclusive)
+{
+       int                     i;
+
+       if (exclusive)
+       {
+               for (i = 0; i < num_xloginsert_slots; i++)
+                       WALInsertSlotAcquireOne(i);
+               holdingAllSlots = true;
+       }
+       else
+               WALInsertSlotAcquireOne(-1);
+}
+
+/*
+ * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
+ * one if slotno == -1. The index of the slot that was acquired is stored in
+ * MySlotNo.
+ *
+ * This is more or less equivalent to LWLockAcquire().
+ */
+static void
+WALInsertSlotAcquireOne(int slotno)
+{
+       volatile XLogInsertSlot *slot;
+       PGPROC     *proc = MyProc;
+       bool            retry = false;
+       int                     extraWaits = 0;
+       static int      slotToTry = -1;
+
+       /*
+        * Try to use the slot we used last time. If the system isn't particularly
+        * busy, it's a good bet that it's available, and it's good to have some
+        * affinity to a particular slot so that you don't unnecessarily bounce
+        * cache lines between processes when there is no contention.
+        *
+        * If this is the first time through in this backend, pick a slot
+        * (semi-)randomly. This allows the slots to be used evenly if you have a
+        * lot of very short connections.
+        */
+       if (slotno != -1)
+               MySlotNo = slotno;
+       else
+       {
+               if (slotToTry == -1)
+                       slotToTry = MyProc->pgprocno % num_xloginsert_slots;
+               MySlotNo = slotToTry;
+       }
+
+       /*
+        * We can't wait if we haven't got a PGPROC.  This should only occur
+        * during bootstrap or shared memory initialization.  Put an Assert here
+        * to catch unsafe coding practices.
+        */
+       Assert(MyProc != NULL);
+
+       /*
+        * Lock out cancel/die interrupts until we exit the code section protected
+        * by the slot.  This ensures that interrupts will not interfere with
+        * manipulations of data structures in shared memory.
+        */
+       START_CRIT_SECTION();
+
+       /*
+        * Loop here to try to acquire slot after each time we are signaled by
+        * WALInsertSlotRelease.
+        */
+       for (;;)
+       {
+               bool            mustwait;
+
+               slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+
+               /* Acquire mutex.  Time spent holding mutex should be short! */
+               SpinLockAcquire(&slot->mutex);
+
+               /* If retrying, allow WALInsertSlotRelease to release waiters again */
+               if (retry)
+                       slot->releaseOK = true;
+
+               /* If I can get the slot, do so quickly. */
+               if (slot->exclusive == 0)
+               {
+                       slot->exclusive++;
+                       mustwait = false;
+               }
+               else
+                       mustwait = true;
+
+               if (!mustwait)
+                       break;                          /* got the lock */
+
+               Assert(slot->owner != MyProc);
+
+               /*
+                * Add myself to wait queue.
+                */
+               proc->lwWaiting = true;
+               proc->lwWaitMode = LW_EXCLUSIVE;
+               proc->lwWaitLink = NULL;
+               if (slot->head == NULL)
+                       slot->head = proc;
+               else
+                       slot->tail->lwWaitLink = proc;
+               slot->tail = proc;
+
+               /* Can release the mutex now */
+               SpinLockRelease(&slot->mutex);
+
+               /*
+                * Wait until awakened.
+                *
+                * Since we share the process wait semaphore with the regular lock
+                * manager and ProcWaitForSignal, and we may need to acquire a slot
+                * while one of those is pending, it is possible that we get awakened
+                * for a reason other than being signaled by WALInsertSlotRelease. If
+                * so, loop back and wait again.  Once we've gotten the slot,
+                * re-increment the sema by the number of additional signals received,
+                * so that the lock manager or signal manager will see the received
+                * signal when it next waits.
+                */
+               for (;;)
+               {
+                       /* "false" means cannot accept cancel/die interrupt here. */
+                       PGSemaphoreLock(&proc->sem, false);
+                       if (!proc->lwWaiting)
+                               break;
+                       extraWaits++;
+               }
+
+               /* Now loop back and try to acquire lock again. */
+               retry = true;
+       }
+
+       slot->owner = proc;
+
+       /*
+        * Normally, we initialize the xlogInsertingAt value of the slot to 1,
+        * because we don't yet know where in the WAL we're going to insert. It's
+        * not critical what it points to right now - leaving it to a too small
+        * value just means that WaitXlogInsertionsToFinish() might wait on us
+        * unnecessarily, until we update the value (when we finish the insert or
+        * move to next page).
+        *
+        * If we're grabbing all the slots, however, stamp all but the last one
+        * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
+        * slot is the one that we will update as we proceed with the insert, the
+        * rest are held just to keep off other inserters.
+        */
+       if (slotno != -1 && slotno != num_xloginsert_slots - 1)
+               slot->xlogInsertingAt = InvalidXLogRecPtr;
+       else
+               slot->xlogInsertingAt = 1;
+
+       /* We are done updating shared state of the slot itself. */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Fix the process wait semaphore's count for any absorbed wakeups.
+        */
+       while (extraWaits-- > 0)
+               PGSemaphoreUnlock(&proc->sem);
+
+       /*
+        * If we couldn't get the slot immediately, try another slot next time.
+        * On a system with more insertion slots than concurrent inserters, this
+        * causes all the inserters to eventually migrate to a slot that no-one
+        * else is using. On a system with more inserters than slots, it still
+        * causes the inserters to be distributed quite evenly across the slots.
+        */
+       if (slotno != -1 && retry)
+               slotToTry = (slotToTry + 1) % num_xloginsert_slots;
+}
+
+/*
+ * Wait for the given slot to become free, or for its xlogInsertingAt location
+ * to change to something else than 'waitptr'. In other words, wait for the
+ * inserter using the given slot to finish its insertion, or to at least make
+ * some progress.
+ */
+static void
+WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
+{
+       PGPROC     *proc = MyProc;
+       int                     extraWaits = 0;
+
+       /*
+        * Lock out cancel/die interrupts while we sleep on the slot. There is
+        * no cleanup mechanism to remove us from the wait queue if we got
+        * interrupted.
+        */
+       HOLD_INTERRUPTS();
+
+       /*
+        * Loop here to try to acquire lock after each time we are signaled.
+        */
+       for (;;)
+       {
+               bool            mustwait;
+
+               /* Acquire mutex.  Time spent holding mutex should be short! */
+               SpinLockAcquire(&slot->mutex);
+
+               /* If I can get the lock, do so quickly. */
+               if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
+                       mustwait = false;
+               else
+                       mustwait = true;
+
+               if (!mustwait)
+                       break;                          /* the lock was free */
+
+               Assert(slot->owner != MyProc);
+
+               /*
+                * Add myself to wait queue.
+                */
+               proc->lwWaiting = true;
+               proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+               proc->lwWaitLink = NULL;
+
+               /* waiters are added to the front of the queue */
+               proc->lwWaitLink = slot->head;
+               if (slot->head == NULL)
+                       slot->tail = proc;
+               slot->head = proc;
+
+               /* Can release the mutex now */
+               SpinLockRelease(&slot->mutex);
+
+               /*
+                * Wait until awakened.
+                *
+                * Since we share the process wait semaphore with other things, like
+                * the regular lock manager and ProcWaitForSignal, and we may need to
+                * acquire an LWLock while one of those is pending, it is possible that
+                * we get awakened for a reason other than being signaled by
+                * LWLockRelease. If so, loop back and wait again.  Once we've gotten
+                * the LWLock, re-increment the sema by the number of additional
+                * signals received, so that the lock manager or signal manager will
+                * see the received signal when it next waits.
+                */
+               for (;;)
+               {
+                       /* "false" means cannot accept cancel/die interrupt here. */
+                       PGSemaphoreLock(&proc->sem, false);
+                       if (!proc->lwWaiting)
+                               break;
+                       extraWaits++;
+               }
+
+               /* Now loop back and try to acquire lock again. */
+       }
+
+       /* We are done updating shared state of the lock itself. */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Fix the process wait semaphore's count for any absorbed wakeups.
+        */
+       while (extraWaits-- > 0)
+               PGSemaphoreUnlock(&proc->sem);
+
+       /*
+        * Now okay to allow cancel/die interrupts.
+        */
+       RESUME_INTERRUPTS();
+}
+
+/*
+ * Wake up all processes waiting for us with WaitOnSlot(). Sets our
+ * xlogInsertingAt value to EndPos, without releasing the slot.
+ */
+static void
+WakeupWaiters(XLogRecPtr EndPos)
+{
+       volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+       PGPROC     *head;
+       PGPROC     *proc;
+       PGPROC     *next;
+
+       /*
+        * If we have already reported progress up to the same point, do nothing.
+        * No other process can modify xlogInsertingAt, so we can check this before
+        * grabbing the spinlock.
+        */
+       if (slot->xlogInsertingAt == EndPos)
+               return;
+       /* xlogInsertingAt should not go backwards */
+       Assert(slot->xlogInsertingAt < EndPos);
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&slot->mutex);
+
+       /* we should own the slot */
+       Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+       slot->xlogInsertingAt = EndPos;
+
+       /*
+        * See if there are any waiters that need to be woken up.
+        */
+       head = slot->head;
+
+       if (head != NULL)
+       {
+               proc = head;
+
+               /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */
+               next = proc->lwWaitLink;
+               while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
+               {
+                       proc = next;
+                       next = next->lwWaitLink;
+               }
+
+               /* proc is now the last PGPROC to be released */
+               slot->head = next;
+               proc->lwWaitLink = NULL;
+       }
+
+       /* We are done updating shared state of the lock itself. */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Awaken any waiters I removed from the queue.
+        */
+       while (head != NULL)
+       {
+               proc = head;
+               head = proc->lwWaitLink;
+               proc->lwWaitLink = NULL;
+               proc->lwWaiting = false;
+               PGSemaphoreUnlock(&proc->sem);
+       }
+}
+
+/*
+ * Release our insertion slot (or slots, if we're holding them all).
+ */
+static void
+WALInsertSlotRelease(void)
+{
+       int                     i;
+
+       if (holdingAllSlots)
+       {
+               for (i = 0; i < num_xloginsert_slots; i++)
+                       WALInsertSlotReleaseOne(i);
+               holdingAllSlots = false;
+       }
+       else
+               WALInsertSlotReleaseOne(MySlotNo);
+}
+
+static void
+WALInsertSlotReleaseOne(int slotno)
+{
+       volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
+       PGPROC     *head;
+       PGPROC     *proc;
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&slot->mutex);
+
+       /* we must be holding it */
+       Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+       slot->xlogInsertingAt = InvalidXLogRecPtr;
+
+       /* Release my hold on the slot */
+       slot->exclusive = 0;
+       slot->owner = NULL;
+
+       /*
+        * See if I need to awaken any waiters..
+        */
+       head = slot->head;
+       if (head != NULL)
+       {
+               if (slot->releaseOK)
+               {
+                       /*
+                        * Remove the to-be-awakened PGPROCs from the queue.
+                        */
+                       bool            releaseOK = true;
+
+                       proc = head;
+
+                       /*
+                        * First wake up any backends that want to be woken up without
+                        * acquiring the lock. These are always in the front of the queue.
+                        */
+                       while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
+                               proc = proc->lwWaitLink;
+
+                       /*
+                        * Awaken the first exclusive-waiter, if any.
+                        */
+                       if (proc->lwWaitLink)
+                       {
+                               Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
+                               proc = proc->lwWaitLink;
+                               releaseOK = false;
+                       }
+                       /* proc is now the last PGPROC to be released */
+                       slot->head = proc->lwWaitLink;
+                       proc->lwWaitLink = NULL;
+
+                       slot->releaseOK = releaseOK;
+               }
+               else
+                       head = NULL;
+       }
+
+       /* We are done updating shared state of the slot itself. */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Awaken any waiters I removed from the queue.
+        */
+       while (head != NULL)
+       {
+               proc = head;
+               head = proc->lwWaitLink;
+               proc->lwWaitLink = NULL;
+               proc->lwWaiting = false;
+               PGSemaphoreUnlock(&proc->sem);
+       }
+
+       /*
+        * Now okay to allow cancel/die interrupts.
+        */
+       END_CRIT_SECTION();
+}
+
+
+/*
+ * Wait for any WAL insertions < upto to finish.
+ *
+ * Returns the location of the oldest insertion that is still in-progress.
+ * Any WAL prior to that point has been fully copied into WAL buffers, and
+ * can be flushed out to disk. Because this waits for any insertions older
+ * than 'upto' to finish, the return value is always >= 'upto'.
+ *
+ * Note: When you are about to write out WAL, you must call this function
+ * *before* acquiring WALWriteLock, to avoid deadlocks. This function might
+ * need to wait for an insertion to finish (or at least advance to next
+ * uninitialized page), and the inserter might need to evict an old WAL buffer
+ * to make room for a new one, which in turn requires WALWriteLock.
+ */
+static XLogRecPtr
+WaitXLogInsertionsToFinish(XLogRecPtr upto)
+{
+       uint64          bytepos;
+       XLogRecPtr      reservedUpto;
+       XLogRecPtr      finishedUpto;
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       int                     i;
+
+       if (MyProc == NULL)
+               elog(PANIC, "cannot wait without a PGPROC structure");
+
+       /* Read the current insert position */
+       SpinLockAcquire(&Insert->insertpos_lck);
+       bytepos = Insert->CurrBytePos;
+       SpinLockRelease(&Insert->insertpos_lck);
+       reservedUpto = XLogBytePosToEndRecPtr(bytepos);
+
+       /*
+        * No-one should request to flush a piece of WAL that hasn't even been
+        * reserved yet. However, it can happen if there is a block with a bogus
+        * LSN on disk, for example. XLogFlush checks for that situation and
+        * complains, but only after the flush. Here we just assume that to mean
+        * that all WAL that has been reserved needs to be finished. In this
+        * corner-case, the return value can be smaller than 'upto' argument.
+        */
+       if (upto > reservedUpto)
+       {
+               elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
+                        (uint32) (upto >> 32), (uint32) upto,
+                        (uint32) (reservedUpto >> 32), (uint32) reservedUpto);
+               upto = reservedUpto;
+       }
+
+       /*
+        * finishedUpto is our return value, indicating the point upto which
+        * all the WAL insertions have been finished. Initialize it to the head
+        * of reserved WAL, and as we iterate through the insertion slots, back it
+        * out for any insertion that's still in progress.
+        */
+       finishedUpto = reservedUpto;
+
+       /*
+        * Loop through all the slots, sleeping on any in-progress insert older
+        * than 'upto'.
+        */
+       for (i = 0; i < num_xloginsert_slots; i++)
+       {
+               volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+               XLogRecPtr insertingat;
+
+       retry:
+               /*
+                * We can check if the slot is in use without grabbing the spinlock.
+                * The spinlock acquisition of insertpos_lck before this loop acts
+                * as a memory barrier. If someone acquires the slot after that, it
+                * can't possibly be inserting to anything < reservedUpto. If it was
+                * acquired before that, an unlocked test will return true.
+                */
+               if (!slot->exclusive)
+                       continue;
+
+               SpinLockAcquire(&slot->mutex);
+               /* re-check now that we have the lock */
+               if (!slot->exclusive)
+               {
+                       SpinLockRelease(&slot->mutex);
+                       continue;
+               }
+               insertingat = slot->xlogInsertingAt;
+               SpinLockRelease(&slot->mutex);
+
+               if (insertingat == InvalidXLogRecPtr)
+               {
+                       /*
+                        * slot is reserved just to hold off other inserters, there is no
+                        * actual insert in progress.
+                        */
+                       continue;
+               }
+
+               /*
+                * This insertion is still in progress. Do we need to wait for it?
+                *
+                * When an inserter acquires a slot, it doesn't reset 'insertingat', so
+                * it will initially point to the old value of some already-finished
+                * insertion. The inserter will update the value as soon as it finishes
+                * the insertion, moves to the next page, or has to do I/O to flush an
+                * old dirty buffer. That means that when we see a slot with
+                * insertingat value < upto, we don't know if that insertion is still
+                * truly in progress, or if the slot is reused by a new inserter that
+                * hasn't updated the insertingat value yet. We have to assume it's the
+                * latter, and wait.
+                */
+               if (insertingat < upto)
+               {
+                       WaitOnSlot(slot, insertingat);
+                       goto retry;
+               }
+               else
+               {
+                       /*
+                        * We don't need to wait for this insertion, but update the
+                        * return value.
+                        */
+                       if (insertingat < finishedUpto)
+                               finishedUpto = insertingat;
+               }
+       }
+       return finishedUpto;
+}
+
+/*
+ * Get a pointer to the right location in the WAL buffer containing the
+ * given XLogRecPtr.
+ *
+ * If the page is not initialized yet, it is initialized. That might require
+ * evicting an old dirty buffer from the buffer cache, which means I/O.
+ *
+ * The caller must ensure that the page containing the requested location
+ * isn't evicted yet, and won't be evicted. The way to ensure that is to
+ * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * to evict an old page from the buffer. (This means that once you call
+ * GetXLogBuffer() with a given 'ptr', you must not access anything before
+ * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
+ * later, because older buffers might be recycled already)
+ */
+static char *
+GetXLogBuffer(XLogRecPtr ptr)
+{
+       int                     idx;
+       XLogRecPtr      endptr;
+       static uint64 cachedPage = 0;
+       static char *cachedPos = NULL;
+       XLogRecPtr      expectedEndPtr;
+
+       /*
+        * Fast path for the common case that we need to access again the same
+        * page as last time.
         */
-       updrqst = false;
-       freespace = INSERT_FREESPACE(Insert);
-       if (freespace == 0)
+       if (ptr / XLOG_BLCKSZ == cachedPage)
        {
-               updrqst = AdvanceXLInsertBuffer(false);
-               freespace = INSERT_FREESPACE(Insert);
+               Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+               Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+               return cachedPos + ptr % XLOG_BLCKSZ;
        }
 
-       /* Compute record's XLOG location */
-       curridx = Insert->curridx;
-       INSERT_RECPTR(RecPtr, Insert, curridx);
+       /*
+        * The XLog buffer cache is organized so that a page is always loaded
+        * to a particular buffer.  That way we can easily calculate the buffer
+        * a given page must be loaded into, from the XLogRecPtr alone.
+        */
+       idx = XLogRecPtrToBufIdx(ptr);
 
        /*
-        * If the record is an XLOG_SWITCH, and we are exactly at the start of a
-        * segment, we need not insert it (and don't want to because we'd like
-        * consecutive switch requests to be no-ops).  Instead, make sure
-        * everything is written and flushed through the end of the prior segment,
-        * and return the prior segment's end address.
+        * See what page is loaded in the buffer at the moment. It could be the
+        * page we're looking for, or something older. It can't be anything newer
+        * - that would imply the page we're looking for has already been written
+        * out to disk and evicted, and the caller is responsible for making sure
+        * that doesn't happen.
+        *
+        * However, we don't hold a lock while we read the value. If someone has
+        * just initialized the page, it's possible that we get a "torn read" of
+        * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
+        * that case we will see a bogus value. That's ok, we'll grab the mapping
+        * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
+        * the page we're looking for. But it means that when we do this unlocked
+        * read, we might see a value that appears to be ahead of the page we're
+        * looking for. Don't PANIC on that, until we've verified the value while
+        * holding the lock.
         */
-       if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD)
+       expectedEndPtr = ptr;
+       expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+       endptr = XLogCtl->xlblocks[idx];
+       if (expectedEndPtr != endptr)
        {
-               /* We can release insert lock immediately */
-               LWLockRelease(WALInsertLock);
+               /*
+                * Let others know that we're finished inserting the record up
+                * to the page boundary.
+                */
+               WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
 
-               RecPtr -= SizeOfXLogLongPHD;
+               AdvanceXLInsertBuffer(ptr, false);
+               endptr = XLogCtl->xlblocks[idx];
 
-               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-               LogwrtResult = XLogCtl->LogwrtResult;
-               if (LogwrtResult.Flush < RecPtr)
-               {
-                       XLogwrtRqst FlushRqst;
+               if (expectedEndPtr != endptr)
+                       elog(PANIC, "could not find WAL buffer for %X/%X",
+                                (uint32) (ptr >> 32) , (uint32) ptr);
+       }
+       else
+       {
+               /*
+                * Make sure the initialization of the page is visible to us, and
+                * won't arrive later to overwrite the WAL data we write on the page.
+                */
+               pg_memory_barrier();
+       }
 
-                       FlushRqst.Write = RecPtr;
-                       FlushRqst.Flush = RecPtr;
-                       XLogWrite(FlushRqst, false, false);
-               }
-               LWLockRelease(WALWriteLock);
+       /*
+        * Found the buffer holding this page. Return a pointer to the right
+        * offset within the page.
+        */
+       cachedPage = ptr / XLOG_BLCKSZ;
+       cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
 
-               END_CRIT_SECTION();
+       Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+       Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
 
-               /* wake up walsenders now that we've released heavily contended locks */
-               WalSndWakeupProcessRequests();
-               return RecPtr;
-       }
+       return cachedPos + ptr % XLOG_BLCKSZ;
+}
 
-       /* Finish the record header */
-       rechdr->xl_prev = Insert->PrevRecord;
+/*
+ * Converts a "usable byte position" to XLogRecPtr. A usable byte position
+ * is the position starting from the beginning of WAL, excluding all WAL
+ * page headers.
+ */
+static XLogRecPtr
+XLogBytePosToRecPtr(uint64 bytepos)
+{
+       uint64          fullsegs;
+       uint64          fullpages;
+       uint64          bytesleft;
+       uint32          seg_offset;
+       XLogRecPtr      result;
 
-       /* Now we can finish computing the record's CRC */
-       COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
-       FIN_CRC32(rdata_crc);
-       rechdr->xl_crc = rdata_crc;
+       fullsegs = bytepos / UsableBytesInSegment;
+       bytesleft = bytepos % UsableBytesInSegment;
 
-#ifdef WAL_DEBUG
-       if (XLOG_DEBUG)
+       if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
        {
-               StringInfoData buf;
-
-               initStringInfo(&buf);
-               appendStringInfo(&buf, "INSERT @ %X/%X: ",
-                                                (uint32) (RecPtr >> 32), (uint32) RecPtr);
-               xlog_outrec(&buf, rechdr);
-               if (rdata->data != NULL)
-               {
-                       appendStringInfo(&buf, " - ");
-                       RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
-               }
-               elog(LOG, "%s", buf.data);
-               pfree(buf.data);
+               /* fits on first page of segment */
+               seg_offset = bytesleft + SizeOfXLogLongPHD;
        }
-#endif
-
-       /* Record begin of record in appropriate places */
-       ProcLastRecPtr = RecPtr;
-       Insert->PrevRecord = RecPtr;
-
-       /*
-        * Append the data, including backup blocks if any
-        */
-       rdata = &hdr_rdt;
-       while (write_len)
+       else
        {
-               while (rdata->data == NULL)
-                       rdata = rdata->next;
+               /* account for the first page on segment with long header */
+               seg_offset = XLOG_BLCKSZ;
+               bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
 
-               if (freespace > 0)
-               {
-                       if (rdata->len > freespace)
-                       {
-                               memcpy(Insert->currpos, rdata->data, freespace);
-                               rdata->data += freespace;
-                               rdata->len -= freespace;
-                               write_len -= freespace;
-                       }
-                       else
-                       {
-                               memcpy(Insert->currpos, rdata->data, rdata->len);
-                               freespace -= rdata->len;
-                               write_len -= rdata->len;
-                               Insert->currpos += rdata->len;
-                               rdata = rdata->next;
-                               continue;
-                       }
-               }
+               fullpages = bytesleft / UsableBytesInPage;
+               bytesleft = bytesleft % UsableBytesInPage;
 
-               /* Use next buffer */
-               updrqst = AdvanceXLInsertBuffer(false);
-               curridx = Insert->curridx;
-               /* Mark page header to indicate this record continues on the page */
-               Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
-               Insert->currpage->xlp_rem_len = write_len;
-               freespace = INSERT_FREESPACE(Insert);
+               seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
        }
 
-       /* Ensure next record will be properly aligned */
-       Insert->currpos = (char *) Insert->currpage +
-               MAXALIGN(Insert->currpos - (char *) Insert->currpage);
-       freespace = INSERT_FREESPACE(Insert);
+       XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
 
-       /*
-        * The recptr I return is the beginning of the *next* record. This will be
-        * stored as LSN for changed data pages...
-        */
-       INSERT_RECPTR(RecPtr, Insert, curridx);
+       return result;
+}
 
-       /*
-        * If the record is an XLOG_SWITCH, we must now write and flush all the
-        * existing data, and then forcibly advance to the start of the next
-        * segment.  It's not good to do this I/O while holding the insert lock,
-        * but there seems too much risk of confusion if we try to release the
-        * lock sooner.  Fortunately xlog switch needn't be a high-performance
-        * operation anyway...
-        */
-       if (isLogSwitch)
+/*
+ * Like XLogBytePosToRecPtr, but if the position is at a page boundary,
+ * returns a pointer to the beginning of the page (ie. before page header),
+ * not to where the first xlog record on that page would go to. This is used
+ * when converting a pointer to the end of a record.
+ */
+static XLogRecPtr
+XLogBytePosToEndRecPtr(uint64 bytepos)
+{
+       uint64          fullsegs;
+       uint64          fullpages;
+       uint64          bytesleft;
+       uint32          seg_offset;
+       XLogRecPtr      result;
+
+       fullsegs = bytepos / UsableBytesInSegment;
+       bytesleft = bytepos % UsableBytesInSegment;
+
+       if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+       {
+               /* fits on first page of segment */
+               if (bytesleft == 0)
+                       seg_offset = 0;
+               else
+                       seg_offset = bytesleft + SizeOfXLogLongPHD;
+       }
+       else
        {
-               XLogwrtRqst FlushRqst;
-               XLogRecPtr      OldSegEnd;
+               /* account for the first page on segment with long header */
+               seg_offset = XLOG_BLCKSZ;
+               bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
 
-               TRACE_POSTGRESQL_XLOG_SWITCH();
+               fullpages = bytesleft / UsableBytesInPage;
+               bytesleft = bytesleft % UsableBytesInPage;
 
-               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+               if (bytesleft == 0)
+                       seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
+               else
+                       seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+       }
 
-               /*
-                * Flush through the end of the page containing XLOG_SWITCH, and
-                * perform end-of-segment actions (eg, notifying archiver).
-                */
-               WriteRqst = XLogCtl->xlblocks[curridx];
-               FlushRqst.Write = WriteRqst;
-               FlushRqst.Flush = WriteRqst;
-               XLogWrite(FlushRqst, false, true);
+       XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
 
-               /* Set up the next buffer as first page of next segment */
-               /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
-               (void) AdvanceXLInsertBuffer(true);
+       return result;
+}
 
-               /* There should be no unwritten data */
-               curridx = Insert->curridx;
-               Assert(curridx == XLogCtl->Write.curridx);
+/*
+ * Convert an XLogRecPtr to a "usable byte position".
+ */
+static uint64
+XLogRecPtrToBytePos(XLogRecPtr ptr)
+{
+       uint64          fullsegs;
+       uint32          fullpages;
+       uint32          offset;
+       uint64          result;
 
-               /* Compute end address of old segment */
-               OldSegEnd = XLogCtl->xlblocks[curridx];
-               OldSegEnd -= XLOG_BLCKSZ;
+       XLByteToSeg(ptr, fullsegs);
 
-               /* Make it look like we've written and synced all of old segment */
-               LogwrtResult.Write = OldSegEnd;
-               LogwrtResult.Flush = OldSegEnd;
+       fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ;
+       offset = ptr % XLOG_BLCKSZ;
 
-               /*
-                * Update shared-memory status --- this code should match XLogWrite
-                */
+       if (fullpages == 0)
+       {
+               result = fullsegs * UsableBytesInSegment;
+               if (offset > 0)
                {
-                       /* use volatile pointer to prevent code rearrangement */
-                       volatile XLogCtlData *xlogctl = XLogCtl;
-
-                       SpinLockAcquire(&xlogctl->info_lck);
-                       xlogctl->LogwrtResult = LogwrtResult;
-                       if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
-                               xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
-                       if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
-                               xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
-                       SpinLockRelease(&xlogctl->info_lck);
+                       Assert(offset >= SizeOfXLogLongPHD);
+                       result += offset - SizeOfXLogLongPHD;
                }
-
-               LWLockRelease(WALWriteLock);
-
-               updrqst = false;                /* done already */
        }
        else
        {
-               /* normal case, ie not xlog switch */
-
-               /* Need to update shared LogwrtRqst if some block was filled up */
-               if (freespace == 0)
-               {
-                       /* curridx is filled and available for writing out */
-                       updrqst = true;
-               }
-               else
+               result = fullsegs * UsableBytesInSegment +
+                       (XLOG_BLCKSZ - SizeOfXLogLongPHD) +  /* account for first page */
+                       (fullpages - 1) * UsableBytesInPage; /* full pages */
+               if (offset > 0)
                {
-                       /* if updrqst already set, write through end of previous buf */
-                       curridx = PrevBufIdx(curridx);
+                       Assert(offset >= SizeOfXLogShortPHD);
+                       result += offset - SizeOfXLogShortPHD;
                }
-               WriteRqst = XLogCtl->xlblocks[curridx];
-       }
-
-       LWLockRelease(WALInsertLock);
-
-       if (updrqst)
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile XLogCtlData *xlogctl = XLogCtl;
-
-               SpinLockAcquire(&xlogctl->info_lck);
-               /* advance global request to include new block(s) */
-               if (xlogctl->LogwrtRqst.Write < WriteRqst)
-                       xlogctl->LogwrtRqst.Write = WriteRqst;
-               /* update local result copy while I have the chance */
-               LogwrtResult = xlogctl->LogwrtResult;
-               SpinLockRelease(&xlogctl->info_lck);
        }
 
-       XactLastRecEnd = RecPtr;
-
-       END_CRIT_SECTION();
-
-       /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
-
-       return RecPtr;
+       return result;
 }
 
 /*
@@ -1303,158 +2385,181 @@ XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
 }
 
 /*
- * Advance the Insert state to the next buffer page, writing out the next
- * buffer if it still contains unwritten data.
- *
- * If new_segment is TRUE then we set up the next buffer page as the first
- * page of the next xlog segment file, possibly but not usually the next
- * consecutive file page.
- *
- * The global LogwrtRqst.Write pointer needs to be advanced to include the
- * just-filled page.  If we can do this for free (without an extra lock),
- * we do so here.  Otherwise the caller must do it.  We return TRUE if the
- * request update still needs to be done, FALSE if we did it internally.
- *
- * Must be called with WALInsertLock held.
+ * Initialize XLOG buffers, writing out old buffers if they still contain
+ * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
+ * true, initialize as many pages as we can without having to write out
+ * unwritten data. Any new pages are initialized to zeros, with pages headers
+ * initialized properly.
  */
-static bool
-AdvanceXLInsertBuffer(bool new_segment)
+static void
+AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 {
        XLogCtlInsert *Insert = &XLogCtl->Insert;
-       int                     nextidx = NextBufIdx(Insert->curridx);
-       bool            update_needed = true;
+       int                     nextidx;
        XLogRecPtr      OldPageRqstPtr;
        XLogwrtRqst WriteRqst;
-       XLogRecPtr      NewPageEndPtr;
+       XLogRecPtr      NewPageEndPtr = InvalidXLogRecPtr;
        XLogRecPtr      NewPageBeginPtr;
        XLogPageHeader NewPage;
+       int                     npages = 0;
+
+       LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
 
        /*
-        * Get ending-offset of the buffer page we need to replace (this may be
-        * zero if the buffer hasn't been used yet).  Fall through if it's already
-        * written out.
+        * Now that we have the lock, check if someone initialized the page
+        * already.
         */
-       OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
-       if (LogwrtResult.Write < OldPageRqstPtr)
+       while (upto >= XLogCtl->xlblocks[XLogCtl->curridx] || opportunistic)
        {
-               /* nope, got work to do... */
-               XLogRecPtr      FinishedPageRqstPtr;
-
-               FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-
-               /* Before waiting, get info_lck and update LogwrtResult */
-               {
-                       /* use volatile pointer to prevent code rearrangement */
-                       volatile XLogCtlData *xlogctl = XLogCtl;
-
-                       SpinLockAcquire(&xlogctl->info_lck);
-                       if (xlogctl->LogwrtRqst.Write < FinishedPageRqstPtr)
-                               xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
-                       LogwrtResult = xlogctl->LogwrtResult;
-                       SpinLockRelease(&xlogctl->info_lck);
-               }
-
-               update_needed = false;  /* Did the shared-request update */
+               nextidx = NextBufIdx(XLogCtl->curridx);
 
                /*
-                * Now that we have an up-to-date LogwrtResult value, see if we still
-                * need to write it or if someone else already did.
+                * Get ending-offset of the buffer page we need to replace (this may
+                * be zero if the buffer hasn't been used yet).  Fall through if it's
+                * already written out.
                 */
+               OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
                if (LogwrtResult.Write < OldPageRqstPtr)
                {
-                       /* Must acquire write lock */
-                       LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-                       LogwrtResult = XLogCtl->LogwrtResult;
-                       if (LogwrtResult.Write >= OldPageRqstPtr)
+                       /*
+                        * Nope, got work to do. If we just want to pre-initialize as much
+                        * as we can without flushing, give up now.
+                        */
+                       if (opportunistic)
+                               break;
+
+                       /* Before waiting, get info_lck and update LogwrtResult */
                        {
-                               /* OK, someone wrote it already */
-                               LWLockRelease(WALWriteLock);
+                               /* use volatile pointer to prevent code rearrangement */
+                               volatile XLogCtlData *xlogctl = XLogCtl;
+
+                               SpinLockAcquire(&xlogctl->info_lck);
+                               if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
+                                       xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
+                               LogwrtResult = xlogctl->LogwrtResult;
+                               SpinLockRelease(&xlogctl->info_lck);
                        }
-                       else
+
+                       /*
+                        * Now that we have an up-to-date LogwrtResult value, see if we
+                        * still need to write it or if someone else already did.
+                        */
+                       if (LogwrtResult.Write < OldPageRqstPtr)
                        {
                                /*
-                                * Have to write buffers while holding insert lock. This is
-                                * not good, so only write as much as we absolutely must.
+                                * Must acquire write lock. Release WALBufMappingLock first,
+                                * to make sure that all insertions that we need to wait for
+                                * can finish (up to this same position). Otherwise we risk
+                                * deadlock.
                                 */
-                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
-                               WriteRqst.Write = OldPageRqstPtr;
-                               WriteRqst.Flush = 0;
-                               XLogWrite(WriteRqst, false, false);
-                               LWLockRelease(WALWriteLock);
-                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+                               LWLockRelease(WALBufMappingLock);
+
+                               WaitXLogInsertionsToFinish(OldPageRqstPtr);
+
+                               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+
+                               LogwrtResult = XLogCtl->LogwrtResult;
+                               if (LogwrtResult.Write >= OldPageRqstPtr)
+                               {
+                                       /* OK, someone wrote it already */
+                                       LWLockRelease(WALWriteLock);
+                               }
+                               else
+                               {
+                                       /* Have to write it ourselves */
+                                       TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
+                                       WriteRqst.Write = OldPageRqstPtr;
+                                       WriteRqst.Flush = 0;
+                                       XLogWrite(WriteRqst, false);
+                                       LWLockRelease(WALWriteLock);
+                                       TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+                               }
+                               /* Re-acquire WALBufMappingLock and retry */
+                               LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+                               continue;
                        }
                }
-       }
 
-       /*
-        * Now the next buffer slot is free and we can set it up to be the next
-        * output page.
-        */
-       NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx];
+               /*
+                * Now the next buffer slot is free and we can set it up to be the next
+                * output page.
+                */
+               NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx];
+               NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 
-       if (new_segment)
-       {
-               /* force it to a segment start point */
-               if (NewPageBeginPtr % XLogSegSize != 0)
-                       NewPageBeginPtr += XLogSegSize - NewPageBeginPtr % XLogSegSize;
-       }
+               Assert(NewPageEndPtr % XLOG_BLCKSZ == 0);
+               Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
+               Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
-       NewPageEndPtr = NewPageBeginPtr;
-       NewPageEndPtr += XLOG_BLCKSZ;
-       XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
-       NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+               NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
-       Insert->curridx = nextidx;
-       Insert->currpage = NewPage;
+               /*
+                * Be sure to re-zero the buffer so that bytes beyond what we've
+                * written will look like zeroes and not valid XLOG records...
+                */
+               MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
 
-       Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
+               /*
+                * Fill the new page's header
+                */
+               NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
 
-       /*
-        * Be sure to re-zero the buffer so that bytes beyond what we've written
-        * will look like zeroes and not valid XLOG records...
-        */
-       MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+               /* NewPage->xlp_info = 0; */    /* done by memset */
+               NewPage   ->xlp_tli = ThisTimeLineID;
+               NewPage   ->xlp_pageaddr = NewPageBeginPtr;
+               /* NewPage->xlp_rem_len = 0; */         /* done by memset */
 
-       /*
-        * Fill the new page's header
-        */
-       NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
+               /*
+                * If online backup is not in progress, mark the header to indicate
+                * that* WAL records beginning in this page have removable backup
+                * blocks.  This allows the WAL archiver to know whether it is safe to
+                * compress archived WAL data by transforming full-block records into
+                * the non-full-block format.  It is sufficient to record this at the
+                * page level because we force a page switch (in fact a segment switch)
+                * when starting a backup, so the flag will be off before any records
+                * can be written during the backup.  At the end of a backup, the last
+                * page will be marked as all unsafe when perhaps only part is unsafe,
+                * but at worst the archiver would miss the opportunity to compress a
+                * few records.
+                */
+               if (!Insert->forcePageWrites)
+                       NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
 
-       /* NewPage->xlp_info = 0; */    /* done by memset */
-       NewPage   ->xlp_tli = ThisTimeLineID;
-       NewPage   ->xlp_pageaddr = NewPageBeginPtr;
+               /*
+                * If first page of an XLOG segment file, make it a long header.
+                */
+               if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
+               {
+                       XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
 
-       /*
-        * If online backup is not in progress, mark the header to indicate that
-        * WAL records beginning in this page have removable backup blocks.  This
-        * allows the WAL archiver to know whether it is safe to compress archived
-        * WAL data by transforming full-block records into the non-full-block
-        * format.      It is sufficient to record this at the page level because we
-        * force a page switch (in fact a segment switch) when starting a backup,
-        * so the flag will be off before any records can be written during the
-        * backup.      At the end of a backup, the last page will be marked as all
-        * unsafe when perhaps only part is unsafe, but at worst the archiver
-        * would miss the opportunity to compress a few records.
-        */
-       if (!Insert->forcePageWrites)
-               NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
+                       NewLongPage->xlp_sysid = ControlFile->system_identifier;
+                       NewLongPage->xlp_seg_size = XLogSegSize;
+                       NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
+                       NewPage   ->xlp_info |= XLP_LONG_HEADER;
+               }
 
-       /*
-        * If first page of an XLOG segment file, make it a long header.
-        */
-       if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
-       {
-               XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+               /*
+                * Make sure the initialization of the page becomes visible to others
+                * before the xlblocks update. GetXLogBuffer() reads xlblocks without
+                * holding a lock.
+                */
+               pg_write_barrier();
+
+               *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
 
-               NewLongPage->xlp_sysid = ControlFile->system_identifier;
-               NewLongPage->xlp_seg_size = XLogSegSize;
-               NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
-               NewPage   ->xlp_info |= XLP_LONG_HEADER;
+               XLogCtl->curridx = nextidx;
 
-               Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+               npages++;
        }
+       LWLockRelease(WALBufMappingLock);
 
-       return update_needed;
+#ifdef WAL_DEBUG
+       if (npages > 0)
+       {
+               elog(DEBUG1, "initialized %d pages, upto %X/%X",
+                        npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
+       }
+#endif
 }
 
 /*
@@ -1486,16 +2591,12 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
  * This option allows us to avoid uselessly issuing multiple writes when a
  * single one would do.
  *
- * If xlog_switch == TRUE, we are intending an xlog segment switch, so
- * perform end-of-segment actions after writing the last page, even if
- * it's not physically the end of its segment.  (NB: this will work properly
- * only if caller specifies WriteRqst == page-end and flexible == false,
- * and there is some data to write.)
- *
- * Must be called with WALWriteLock held.
+ * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
+ * must be called before grabbing the lock, to make sure the data is ready to
+ * write.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
        XLogCtlWrite *Write = &XLogCtl->Write;
        bool            ispartialpage;
@@ -1544,15 +2645,15 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                 * if we're passed a bogus WriteRqst.Write that is past the end of the
                 * last page that's been initialized by AdvanceXLInsertBuffer.
                 */
-               if (LogwrtResult.Write >= XLogCtl->xlblocks[curridx])
+               XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+               if (LogwrtResult.Write >= EndPtr)
                        elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
                                 (uint32) (LogwrtResult.Write >> 32),
                                 (uint32) LogwrtResult.Write,
-                                (uint32) (XLogCtl->xlblocks[curridx] >> 32),
-                                (uint32) XLogCtl->xlblocks[curridx]);
+                                (uint32) (EndPtr >> 32), (uint32) EndPtr);
 
                /* Advance LogwrtResult.Write to end of current buffer page */
-               LogwrtResult.Write = XLogCtl->xlblocks[curridx];
+               LogwrtResult.Write = EndPtr;
                ispartialpage = WriteRqst.Write < LogwrtResult.Write;
 
                if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
@@ -1656,16 +2757,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                         * later. Doing it here ensures that one and only one backend will
                         * perform this fsync.
                         *
-                        * We also do this if this is the last page written for an xlog
-                        * switch.
-                        *
                         * This is also the right place to notify the Archiver that the
                         * segment is ready to copy to archival storage, and to update the
                         * timer for archive_timeout, and to signal for a checkpoint if
                         * too many logfile segments have been used since the last
                         * checkpoint.
                         */
-                       if (finishing_seg || (xlog_switch && last_iteration))
+                       if (finishing_seg)
                        {
                                issue_xlog_fsync(openLogFile, openLogSegNo);
 
@@ -1949,6 +3047,7 @@ XLogFlush(XLogRecPtr record)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile XLogCtlData *xlogctl = XLogCtl;
+               XLogRecPtr      insertpos;
 
                /* read LogwrtResult and update local state */
                SpinLockAcquire(&xlogctl->info_lck);
@@ -1961,6 +3060,12 @@ XLogFlush(XLogRecPtr record)
                if (record <= LogwrtResult.Flush)
                        break;
 
+               /*
+                * Before actually performing the write, wait for all in-flight
+                * insertions to the pages we're about to write to finish.
+                */
+               insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+
                /*
                 * Try to get the write lock. If we can't get it immediately, wait
                 * until it's released, and recheck if we still need to do the flush
@@ -1997,31 +3102,27 @@ XLogFlush(XLogRecPtr record)
                 */
                if (CommitDelay > 0 && enableFsync &&
                        MinimumActiveBackends(CommitSiblings))
+               {
                        pg_usleep(CommitDelay);
 
+                       /*
+                        * Re-check how far we can now flush the WAL. It's generally not
+                        * safe to call WaitXLogInsetionsToFinish while holding
+                        * WALWriteLock, because an in-progress insertion might need to
+                        * also grab WALWriteLock to make progress. But we know that all
+                        * the insertions up to insertpos have already finished, because
+                        * that's what the earlier WaitXLogInsertionsToFinish() returned.
+                        * We're only calling it again to allow insertpos to be moved
+                        * further forward, not to actually wait for anyone.
+                        */
+                       insertpos = WaitXLogInsertionsToFinish(insertpos);
+               }
+
                /* try to write/flush later additions to XLOG as well */
-               if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
-               {
-                       XLogCtlInsert *Insert = &XLogCtl->Insert;
-                       uint32          freespace = INSERT_FREESPACE(Insert);
+               WriteRqst.Write = insertpos;
+               WriteRqst.Flush = insertpos;
 
-                       if (freespace == 0) /* buffer is full */
-                               WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-                       else
-                       {
-                               WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-                               WriteRqstPtr -= freespace;
-                       }
-                       LWLockRelease(WALInsertLock);
-                       WriteRqst.Write = WriteRqstPtr;
-                       WriteRqst.Flush = WriteRqstPtr;
-               }
-               else
-               {
-                       WriteRqst.Write = WriteRqstPtr;
-                       WriteRqst.Flush = record;
-               }
-               XLogWrite(WriteRqst, false, false);
+               XLogWrite(WriteRqst, false);
 
                LWLockRelease(WALWriteLock);
                /* done */
@@ -2142,7 +3243,8 @@ XLogBackgroundFlush(void)
 
        START_CRIT_SECTION();
 
-       /* now wait for the write lock */
+       /* now wait for any in-progress insertions to finish and get write lock */
+       WaitXLogInsertionsToFinish(WriteRqstPtr);
        LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
        LogwrtResult = XLogCtl->LogwrtResult;
        if (WriteRqstPtr > LogwrtResult.Flush)
@@ -2151,7 +3253,7 @@ XLogBackgroundFlush(void)
 
                WriteRqst.Write = WriteRqstPtr;
                WriteRqst.Flush = WriteRqstPtr;
-               XLogWrite(WriteRqst, flexible, false);
+               XLogWrite(WriteRqst, flexible);
                wrote_something = true;
        }
        LWLockRelease(WALWriteLock);
@@ -2161,6 +3263,12 @@ XLogBackgroundFlush(void)
        /* wake up walsenders now that we've released heavily contended locks */
        WalSndWakeupProcessRequests();
 
+       /*
+        * Great, done. To take some work off the critical path, try to initialize
+        * as many of the no-longer-needed WAL buffers for future use as we can.
+        */
+       AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+
        return wrote_something;
 }
 
@@ -3937,10 +5045,13 @@ XLOGShmemSize(void)
 
        /* XLogCtl */
        size = sizeof(XLogCtlData);
+
+       /* xlog insertion slots, plus alignment */
+       size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
        /* xlblocks array */
        size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
        /* extra alignment padding for XLOG I/O buffers */
-       size = add_size(size, ALIGNOF_XLOG_BUFFER);
+       size = add_size(size, XLOG_BLCKSZ);
        /* and the buffers themselves */
        size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
 
@@ -3959,11 +5070,11 @@ XLOGShmemInit(void)
        bool            foundCFile,
                                foundXLog;
        char       *allocptr;
+       int                     i;
 
        ControlFile = (ControlFileData *)
                ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
-       XLogCtl = (XLogCtlData *)
-               ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
+       allocptr = ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
 
        if (foundCFile || foundXLog)
        {
@@ -3971,7 +5082,7 @@ XLOGShmemInit(void)
                Assert(foundCFile && foundXLog);
                return;
        }
-
+       XLogCtl = (XLogCtlData *) allocptr;
        memset(XLogCtl, 0, sizeof(XLogCtlData));
 
        /*
@@ -3979,15 +5090,23 @@ XLOGShmemInit(void)
         * multiple of the alignment for same, so no extra alignment padding is
         * needed here.
         */
-       allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
+       allocptr += sizeof(XLogCtlData);
        XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
        memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
        allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
 
+       /* Xlog insertion slots. Ensure they're aligned to the full padded size */
+       allocptr += sizeof(XLogInsertSlotPadded) -
+               ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
+       XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
+       allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
+
        /*
-        * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
+        * Align the start of the page buffers to a full xlog block size boundary.
+        * This simplifies some calculations in XLOG insertion. It is also required
+        * for O_DIRECT.
         */
-       allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
+       allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
        XLogCtl->pages = allocptr;
        memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
 
@@ -3999,7 +5118,21 @@ XLOGShmemInit(void)
        XLogCtl->SharedRecoveryInProgress = true;
        XLogCtl->SharedHotStandbyActive = false;
        XLogCtl->WalWriterSleeping = false;
-       XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+
+       for (i = 0; i < num_xloginsert_slots; i++)
+       {
+               XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+               SpinLockInit(&slot->mutex);
+               slot->xlogInsertingAt = InvalidXLogRecPtr;
+               slot->owner = NULL;
+
+               slot->releaseOK = true;
+               slot->exclusive = 0;
+               slot->head = NULL;
+               slot->tail = NULL;
+       }
+
+       SpinLockInit(&XLogCtl->Insert.insertpos_lck);
        SpinLockInit(&XLogCtl->info_lck);
        SpinLockInit(&XLogCtl->ulsn_lck);
        InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
@@ -4050,8 +5183,8 @@ BootStrapXLOG(void)
        ThisTimeLineID = 1;
 
        /* page buffer must be aligned suitably for O_DIRECT */
-       buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
-       page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
+       buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
+       page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
        memset(page, 0, XLOG_BLCKSZ);
 
        /*
@@ -4893,6 +6026,7 @@ StartupXLOG(void)
        bool            backupEndRequired = false;
        bool            backupFromStandby = false;
        DBState         dbstate_at_startup;
+       int                     firstIdx;
        XLogReaderState *xlogreader;
        XLogPageReadPrivate private;
        bool            fast_promoted = false;
@@ -5257,7 +6391,7 @@ StartupXLOG(void)
 
        lastFullPageWrites = checkPoint.fullPageWrites;
 
-       RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+       RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
 
        if (RecPtr < checkPoint.redo)
                ereport(PANIC,
@@ -5899,25 +7033,21 @@ StartupXLOG(void)
        openLogFile = XLogFileOpen(openLogSegNo);
        openLogOff = 0;
        Insert = &XLogCtl->Insert;
-       Insert->PrevRecord = LastRec;
-       XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
+       Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
+
+       firstIdx = XLogRecEndPtrToBufIdx(EndOfLog);
+       XLogCtl->curridx = firstIdx;
+
+       XLogCtl->xlblocks[firstIdx] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
 
        /*
         * Tricky point here: readBuf contains the *last* block that the LastRec
         * record spans, not the one it starts in.      The last block is indeed the
         * one we want to use.
         */
-       if (EndOfLog % XLOG_BLCKSZ == 0)
-       {
-               memset(Insert->currpage, 0, XLOG_BLCKSZ);
-       }
-       else
-       {
-               Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
-               memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
-       }
-       Insert->currpos = (char *) Insert->currpage +
-               (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
+       Assert(readOff == (XLogCtl->xlblocks[firstIdx] - XLOG_BLCKSZ) % XLogSegSize);
+       memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], xlogreader->readBuf, XLOG_BLCKSZ);
+       Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
        LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
@@ -5926,12 +7056,12 @@ StartupXLOG(void)
        XLogCtl->LogwrtRqst.Write = EndOfLog;
        XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
-       freespace = INSERT_FREESPACE(Insert);
+       freespace = INSERT_FREESPACE(EndOfLog);
        if (freespace > 0)
        {
                /* Make sure rest of page is zero */
-               MemSet(Insert->currpos, 0, freespace);
-               XLogCtl->Write.curridx = 0;
+               MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog % XLOG_BLCKSZ, 0, freespace);
+               XLogCtl->Write.curridx = firstIdx;
        }
        else
        {
@@ -5943,7 +7073,7 @@ StartupXLOG(void)
                 * this is sufficient.  The first actual attempt to insert a log
                 * record will advance the insert state.
                 */
-               XLogCtl->Write.curridx = NextBufIdx(0);
+               XLogCtl->Write.curridx = NextBufIdx(firstIdx);
        }
 
        /* Pre-scan prepared transactions to find out the range of XIDs present */
@@ -6504,21 +7634,29 @@ InitXLOGAccess(void)
 }
 
 /*
- * Once spawned, a backend may update its local RedoRecPtr from
- * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
- * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
+ * Return the current Redo pointer from shared memory.
+ *
+ * As a side-effect, the local RedoRecPtr copy is updated.
  */
 XLogRecPtr
 GetRedoRecPtr(void)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr ptr;
 
+       /*
+        * The possibly not up-to-date copy in XlogCtl is enough. Even if we
+        * grabbed a WAL insertion slot to read the master copy, someone might
+        * update it just after we've released the lock.
+        */
        SpinLockAcquire(&xlogctl->info_lck);
-       Assert(RedoRecPtr <= xlogctl->Insert.RedoRecPtr);
-       RedoRecPtr = xlogctl->Insert.RedoRecPtr;
+       ptr = xlogctl->RedoRecPtr;
        SpinLockRelease(&xlogctl->info_lck);
 
+       if (RedoRecPtr < ptr)
+               RedoRecPtr = ptr;
+
        return RedoRecPtr;
 }
 
@@ -6527,9 +7665,8 @@ GetRedoRecPtr(void)
  *
  * NOTE: The value *actually* returned is the position of the last full
  * xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to acquire WALInsertLock which can be quite
- * heavily contended, and an approximation is enough for the current
- * usage of this function.
+ * For that, we don't need to scan through WAL insertion slots, and an
+ * approximation is enough for the current usage of this function.
  */
 XLogRecPtr
 GetInsertRecPtr(void)
@@ -6806,6 +7943,8 @@ LogCheckpointEnd(bool restartpoint)
 void
 CreateCheckPoint(int flags)
 {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
        bool            shutdown;
        CheckPoint      checkPoint;
        XLogRecPtr      recptr;
@@ -6813,6 +7952,7 @@ CreateCheckPoint(int flags)
        XLogRecData rdata;
        uint32          freespace;
        XLogSegNo       _logSegNo;
+       XLogRecPtr      curInsert;
        VirtualTransactionId *vxids;
        int                     nvxids;
 
@@ -6883,10 +8023,11 @@ CreateCheckPoint(int flags)
                checkPoint.oldestActiveXid = InvalidTransactionId;
 
        /*
-        * We must hold WALInsertLock while examining insert state to determine
-        * the checkpoint REDO pointer.
+        * We must block concurrent insertions while examining insert state to
+        * determine the checkpoint REDO pointer.
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
+       curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
 
        /*
         * If this isn't a shutdown or forced checkpoint, and we have not inserted
@@ -6906,14 +8047,11 @@ CreateCheckPoint(int flags)
        if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
                                  CHECKPOINT_FORCE)) == 0)
        {
-               XLogRecPtr      curInsert;
-
-               INSERT_RECPTR(curInsert, Insert, Insert->curridx);
                if (curInsert == ControlFile->checkPoint +
                        MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
                        ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
                {
-                       LWLockRelease(WALInsertLock);
+                       WALInsertSlotRelease();
                        LWLockRelease(CheckpointLock);
                        END_CRIT_SECTION();
                        return;
@@ -6945,18 +8083,19 @@ CreateCheckPoint(int flags)
         * the buffer flush work.  Those XLOG records are logically after the
         * checkpoint, even though physically before it.  Got that?
         */
-       freespace = INSERT_FREESPACE(Insert);
+       freespace = INSERT_FREESPACE(curInsert);
        if (freespace == 0)
        {
-               (void) AdvanceXLInsertBuffer(false);
-               /* OK to ignore update return flag, since we will do flush anyway */
-               freespace = INSERT_FREESPACE(Insert);
+               if (curInsert % XLogSegSize == 0)
+                       curInsert += SizeOfXLogLongPHD;
+               else
+                       curInsert += SizeOfXLogShortPHD;
        }
-       INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
+       checkPoint.redo = curInsert;
 
        /*
         * Here we update the shared RedoRecPtr for future XLogInsert calls; this
-        * must be done while holding the insert lock AND the info_lck.
+        * must be done while holding the insertion slots.
         *
         * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
         * pointing past where it really needs to point.  This is okay; the only
@@ -6965,20 +8104,18 @@ CreateCheckPoint(int flags)
         * XLogInserts that happen while we are dumping buffers must assume that
         * their buffer changes are not included in the checkpoint.
         */
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile XLogCtlData *xlogctl = XLogCtl;
-
-               SpinLockAcquire(&xlogctl->info_lck);
-               RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
-               SpinLockRelease(&xlogctl->info_lck);
-       }
+       RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
 
        /*
-        * Now we can release WAL insert lock, allowing other xacts to proceed
-        * while we are flushing disk buffers.
+        * Now we can release the WAL insertion slots, allowing other xacts to
+        * proceed while we are flushing disk buffers.
         */
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
+
+       /* Update the info_lck-protected copy of RedoRecPtr as well */
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->RedoRecPtr = checkPoint.redo;
+       SpinLockRelease(&xlogctl->info_lck);
 
        /*
         * If enabled, log checkpoint start.  We postpone this until now so as not
@@ -7003,10 +8140,11 @@ CreateCheckPoint(int flags)
         * we wait till he's out of his commit critical section before proceeding.
         * See notes in RecordTransactionCommit().
         *
-        * Because we've already released WALInsertLock, this test is a bit fuzzy:
-        * it is possible that we will wait for xacts we didn't really need to
-        * wait for.  But the delay should be short and it seems better to make
-        * checkpoint take a bit longer than to hold locks longer than necessary.
+        * Because we've already released the insertion slots, this test is a bit
+        * fuzzy: it is possible that we will wait for xacts we didn't really need
+        * to wait for.  But the delay should be short and it seems better to make
+        * checkpoint take a bit longer than to hold off insertions longer than
+        * necessary.
         * (In fact, the whole reason we have this issue is that xact.c does
         * commit record XLOG insertion and clog update as two separate steps
         * protected by different locks, but again that seems best on grounds of
@@ -7233,10 +8371,10 @@ CreateEndOfRecoveryRecord(void)
 
        xlrec.end_time = time(NULL);
 
-       LWLockAcquire(WALInsertLock, LW_SHARED);
+       WALInsertSlotAcquire(true);
        xlrec.ThisTimeLineID = ThisTimeLineID;
        xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 
        LocalSetXLogInsertAllowed();
 
@@ -7437,15 +8575,18 @@ CreateRestartPoint(int flags)
         * the number of segments replayed since last restartpoint, and request a
         * restartpoint if it exceeds checkpoint_segments.
         *
-        * You need to hold WALInsertLock and info_lck to update it, although
-        * during recovery acquiring WALInsertLock is just pro forma, because
-        * there is no other processes updating Insert.RedoRecPtr.
+        * Like in CreateCheckPoint(), hold off insertions to update it, although
+        * during recovery this is just pro forma, because no WAL insertions are
+        * happening.
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-       SpinLockAcquire(&xlogctl->info_lck);
+       WALInsertSlotAcquire(true);
        xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+       WALInsertSlotRelease();
+
+       /* Also update the info_lck-protected copy */
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->RedoRecPtr = lastCheckPoint.redo;
        SpinLockRelease(&xlogctl->info_lck);
-       LWLockRelease(WALInsertLock);
 
        /*
         * Prepare to accumulate statistics.
@@ -7863,9 +9004,9 @@ UpdateFullPageWrites(void)
         */
        if (fullPageWrites)
        {
-               LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+               WALInsertSlotAcquire(true);
                Insert->fullPageWrites = true;
-               LWLockRelease(WALInsertLock);
+               WALInsertSlotRelease();
        }
 
        /*
@@ -7886,9 +9027,9 @@ UpdateFullPageWrites(void)
 
        if (!fullPageWrites)
        {
-               LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+               WALInsertSlotAcquire(true);
                Insert->fullPageWrites = false;
-               LWLockRelease(WALInsertLock);
+               WALInsertSlotRelease();
        }
        END_CRIT_SECTION();
 }
@@ -8520,15 +9661,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
         * Note that forcePageWrites has no effect during an online backup from
         * the standby.
         *
-        * We must hold WALInsertLock to change the value of forcePageWrites, to
-        * ensure adequate interlocking against XLogInsert().
+        * We must hold all the insertion slots to change the value of
+        * forcePageWrites, to ensure adequate interlocking against XLogInsert().
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        if (exclusive)
        {
                if (XLogCtl->Insert.exclusiveBackup)
                {
-                       LWLockRelease(WALInsertLock);
+                       WALInsertSlotRelease();
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         errmsg("a backup is already in progress"),
@@ -8539,7 +9680,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
        else
                XLogCtl->Insert.nonExclusiveBackups++;
        XLogCtl->Insert.forcePageWrites = true;
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 
        /* Ensure we release forcePageWrites if fail below */
        PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
@@ -8654,13 +9795,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                         * taking a checkpoint right after another is not that expensive
                         * either because only few buffers have been dirtied yet.
                         */
-                       LWLockAcquire(WALInsertLock, LW_SHARED);
+                       WALInsertSlotAcquire(true);
                        if (XLogCtl->Insert.lastBackupStart < startpoint)
                        {
                                XLogCtl->Insert.lastBackupStart = startpoint;
                                gotUniqueStartpoint = true;
                        }
-                       LWLockRelease(WALInsertLock);
+                       WALInsertSlotRelease();
                } while (!gotUniqueStartpoint);
 
                XLByteToSeg(startpoint, _logSegNo);
@@ -8750,7 +9891,7 @@ pg_start_backup_callback(int code, Datum arg)
        bool            exclusive = DatumGetBool(arg);
 
        /* Update backup counters and forcePageWrites on failure */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        if (exclusive)
        {
                Assert(XLogCtl->Insert.exclusiveBackup);
@@ -8767,7 +9908,7 @@ pg_start_backup_callback(int code, Datum arg)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 }
 
 /*
@@ -8838,7 +9979,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
        /*
         * OK to update backup counters and forcePageWrites
         */
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        if (exclusive)
                XLogCtl->Insert.exclusiveBackup = false;
        else
@@ -8858,7 +9999,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 
        if (exclusive)
        {
@@ -9143,7 +10284,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 void
 do_pg_abort_backup(void)
 {
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
        XLogCtl->Insert.nonExclusiveBackups--;
 
@@ -9152,7 +10293,7 @@ do_pg_abort_backup(void)
        {
                XLogCtl->Insert.forcePageWrites = false;
        }
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
 }
 
 /*
@@ -9184,14 +10325,14 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-       XLogCtlInsert *Insert = &XLogCtl->Insert;
-       XLogRecPtr      current_recptr;
+       volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+       uint64          current_bytepos;
 
-       LWLockAcquire(WALInsertLock, LW_SHARED);
-       INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
-       LWLockRelease(WALInsertLock);
+       SpinLockAcquire(&Insert->insertpos_lck);
+       current_bytepos = Insert->CurrBytePos;
+       SpinLockRelease(&Insert->insertpos_lck);
 
-       return current_recptr;
+       return XLogBytePosToRecPtr(current_bytepos);
 }
 
 /*
index 5503925788e400c436266ee15856cb34fd66b5c6..f054be8806f3bee08d7ec0c659925243df92a50b 100644 (file)
@@ -22,6 +22,7 @@
  */
 #include "postgres.h"
 
+#include "access/xlog.h"
 #include "miscadmin.h"
 #include "replication/walsender.h"
 #include "storage/lwlock.h"
@@ -63,6 +64,7 @@ SpinlockSemas(void)
        nsemas = NumLWLocks();          /* one for each lwlock */
        nsemas += NBuffers;                     /* one for each buffer header */
        nsemas += max_wal_senders;      /* one for each wal sender process */
+       nsemas += num_xloginsert_slots; /* one for each WAL insertion slot */
        nsemas += 30;                           /* plus a bunch for other small-scale use */
 
        return nsemas;
index d6200616de8df517096fa9f9540887ad1a476439..5aefd1b62c56056d9705a027a78a23f222a7ad36 100644 (file)
@@ -2037,6 +2037,17 @@ static struct config_int ConfigureNamesInt[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"xloginsert_slots", PGC_POSTMASTER, WAL_SETTINGS,
+                       gettext_noop("Sets the number of slots for concurrent xlog insertions."),
+                       NULL,
+                       GUC_NOT_IN_SAMPLE
+               },
+               &num_xloginsert_slots,
+               8, 1, 1000,
+               NULL, NULL, NULL
+       },
+
        {
                /* see max_connections */
                {"max_wal_senders", PGC_POSTMASTER, REPLICATION_SENDING,
index 83e583259dd39ec0ad8fceacade42e082efeecc0..002862cca50c6028a0dd18d3a591756018d786da 100644 (file)
@@ -190,6 +190,7 @@ extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool log_checkpoints;
+extern int     num_xloginsert_slots;
 
 /* WAL levels */
 typedef enum WalLevel
index fa6497adaddc830cfcb4dbaab4deb43a464cea35..bca166ebdcd9ff4f001403bcb960e73f17912d4c 100644 (file)
@@ -93,14 +93,4 @@ typedef uint32 TimeLineID;
 #define DEFAULT_SYNC_METHOD            SYNC_METHOD_FSYNC
 #endif
 
-/*
- * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
- * but XLOG_BLCKSZ is assumed to be enough for it.
- */
-#ifdef O_DIRECT
-#define ALIGNOF_XLOG_BUFFER            XLOG_BLCKSZ
-#else
-#define ALIGNOF_XLOG_BUFFER            ALIGNOF_BUFFER
-#endif
-
 #endif   /* XLOG_DEFS_H */
index d8f7e9d64a080da690826edcc581659bf27aff47..85dc4ffdaa2ece93af5cc23b69dae251e5cf205e 100644 (file)
@@ -53,7 +53,7 @@ typedef enum LWLockId
        ProcArrayLock,
        SInvalReadLock,
        SInvalWriteLock,
-       WALInsertLock,
+       WALBufMappingLock,
        WALWriteLock,
        ControlFileLock,
        CheckpointLock,