From: Heikki Linnakangas Date: Mon, 8 Jul 2013 08:23:56 +0000 (+0300) Subject: Improve scalability of WAL insertions. X-Git-Tag: REL9_4_BETA1~1350 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=9a20a9b2;p=postgresql Improve scalability of WAL insertions. This patch replaces WALInsertLock with a number of WAL insertion slots, allowing multiple backends to insert WAL records to the WAL buffers concurrently. This is particularly useful for parallel loading large amounts of data on a system with many CPUs. This has one user-visible change: switching to a new WAL segment with pg_switch_xlog() now fills the remaining unused portion of the segment with zeros. This potentially adds some overhead, but it has been a very common practice by DBA's to clear the "tail" of the segment with an external pg_clearxlogtail utility anyway, to make the WAL files compress better. With this patch, it's no longer necessary to do that. This patch adds a new GUC, xloginsert_slots, to tune the number of WAL insertion slots. Performance testing suggests that the default, 8, works pretty well for all kinds of worklods, but I left the GUC in place to allow others with different hardware to test that easily. We might want to remove that before release. Reviewed by Andres Freund. --- diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 77e5c3b5d8..acf0dd1876 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -41,6 +41,7 @@ #include "postmaster/startup.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "storage/barrier.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -83,6 +84,7 @@ int sync_method = DEFAULT_SYNC_METHOD; int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ +int num_xloginsert_slots = 8; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -279,8 +281,8 @@ XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; * (which is almost but not quite the same as a pointer to the most recent * CHECKPOINT record). We update this from the shared-memory copy, * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we - * hold the Insert lock). See XLogInsert for details. We are also allowed - * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck; + * hold an insertion slot). See XLogInsert for details. We are also allowed + * to update from XLogCtl->RedoRecPtr if we hold the info_lck; * see GetRedoRecPtr. A freshly spawned backend obtains the value during * InitXLOGAccess. */ @@ -321,7 +323,10 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr; * so it's a plain spinlock. The other locks are held longer (potentially * over I/O operations), so we use LWLocks for them. These locks are: * - * WALInsertLock: must be held to insert a record into the WAL buffers. + * WALBufMappingLock: must be held to replace a page in the WAL buffer cache. + * It is only held while initializing and changing the mapping. If the + * contents of the buffer being replaced haven't been written yet, the mapping + * lock is released while the write is done, and reacquired afterwards. * * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or * XLogFlush). @@ -348,17 +353,83 @@ typedef struct XLogwrtResult XLogRecPtr Flush; /* last byte + 1 flushed */ } XLogwrtResult; + +/* + * A slot for inserting to the WAL. 
This is similar to an LWLock, the main + * difference is that there is an extra xlogInsertingAt field that is protected + * by the same mutex. Unlike an LWLock, a slot can only be acquired in + * exclusive mode. + * + * The xlogInsertingAt field is used to advertise to other processes how far + * the slot owner has progressed in inserting the record. When a backend + * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't + * yet know where it's going to insert the record. That's conservative + * but correct; the new insertion is certainly going to go to a byte position + * greater than 1. If another backend needs to flush the WAL, it will have to + * wait for the new insertion. xlogInsertingAt is updated after finishing the + * insert or when crossing a page boundary, which will wake up anyone waiting + * for it, whether the wait was necessary in the first place or not. + * + * A process can wait on a slot in two modes: LW_EXCLUSIVE or + * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is + * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes + * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is + * released, or xlogInsertingAt is updated. In other words, a process in + * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress + * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to + * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end. + * + * To join the wait queue, a process must set MyProc->lwWaitMode to the mode + * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head + * or tail of the wait queue. The same mechanism is used to wait on an LWLock, + * see lwlock.c for details. + */ +typedef struct +{ + slock_t mutex; /* protects the below fields */ + XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */ + + PGPROC *owner; /* for debugging purposes */ + + bool releaseOK; /* T if ok to release waiters */ + char exclusive; /* # of exclusive holders (0 or 1) */ + PGPROC *head; /* head of list of waiting PGPROCs */ + PGPROC *tail; /* tail of list of waiting PGPROCs */ + /* tail is undefined when head is NULL */ +} XLogInsertSlot; + +/* + * All the slots are allocated as an array in shared memory. We force the + * array stride to be a power of 2, which saves a few cycles in indexing, but + * more importantly also ensures that individual slots don't cross cache line + * boundaries. (Of course, we have to also ensure that the array start + * address is suitably aligned.) + */ +typedef union XLogInsertSlotPadded +{ + XLogInsertSlot slot; + char pad[64]; +} XLogInsertSlotPadded; + /* * Shared state data for XLogInsert. */ typedef struct XLogCtlInsert { - XLogRecPtr PrevRecord; /* start of previously-inserted record */ - int curridx; /* current block index in cache */ - XLogPageHeader currpage; /* points to header of block in cache */ - char *currpos; /* current insertion point in cache */ - XLogRecPtr RedoRecPtr; /* current redo point for insertions */ - bool forcePageWrites; /* forcing full-page writes for PITR? */ + slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */ + + /* + * CurrBytePos is the end of reserved WAL. The next record will be inserted + * at that position. PrevBytePos is the start position of the previously + * inserted (or rather, reserved) record - it is copied to the the prev- + * link of the next record. 
These are stored as "usable byte positions" + * rather than XLogRecPtrs (see XLogBytePosToRecPtr()). + */ + uint64 CurrBytePos; + uint64 PrevBytePos; + + /* insertion slots, see above for details */ + XLogInsertSlotPadded *insertSlots; /* * fullPageWrites is the master copy used by all backends to determine @@ -366,7 +437,12 @@ typedef struct XLogCtlInsert * This is required because, when full_page_writes is changed by SIGHUP, * we must WAL-log it before it actually affects WAL-logging by backends. * Checkpointer sets at startup or after SIGHUP. + * + * To read these fields, you must hold an insertion slot. To modify them, + * you must hold ALL the slots. */ + XLogRecPtr RedoRecPtr; /* current redo point for insertions */ + bool forcePageWrites; /* forcing full-page writes for PITR? */ bool fullPageWrites; /* @@ -395,11 +471,11 @@ typedef struct XLogCtlWrite */ typedef struct XLogCtlData { - /* Protected by WALInsertLock: */ XLogCtlInsert Insert; /* Protected by info_lck: */ XLogwrtRqst LogwrtRqst; + XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */ uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */ TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ @@ -419,10 +495,21 @@ typedef struct XLogCtlData */ XLogwrtResult LogwrtResult; + /* + * Latest initialized block index in cache. + * + * To change curridx and the identity of a buffer, you need to hold + * WALBufMappingLock. To change the identity of a buffer that's still + * dirty, the old page needs to be written out first, and for that you + * need WALWriteLock, and you need to ensure that there are no in-progress + * insertions to the page by calling WaitXLogInsertionsToFinish(). + */ + int curridx; + /* * These values do not change after startup, although the pointed-to pages - * and xlblocks values certainly do. Permission to read/write the pages - * and xlblocks values depends on WALInsertLock and WALWriteLock. + * and xlblocks values certainly do. xlblock values are protected by + * WALBufMappingLock. */ char *pages; /* buffers for unwritten XLOG pages */ XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ @@ -518,24 +605,34 @@ static XLogCtlData *XLogCtl = NULL; static ControlFileData *ControlFile = NULL; /* - * Macros for managing XLogInsert state. In most cases, the calling routine - * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx, - * so these are passed as parameters instead of being fetched via XLogCtl. + * Calculate the amount of space left on the page after 'endptr'. Beware + * multiple evaluation! */ +#define INSERT_FREESPACE(endptr) \ + (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ)) -/* Free space remaining in the current xlog page buffer */ -#define INSERT_FREESPACE(Insert) \ - (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage)) +/* Macro to advance to next buffer index. */ +#define NextBufIdx(idx) \ + (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1)) -/* Construct XLogRecPtr value for current insertion point */ -#define INSERT_RECPTR(recptr,Insert,curridx) \ - (recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert) +/* + * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or + * would hold if it was in cache, the page containing 'recptr'. + * + * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a + * page is taken to mean the previous page. 
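+ *
+ * For example, assuming the default XLOG_BLCKSZ of 8192 and a cache of
+ * 8 buffers (XLogCtl->XLogCacheBlck + 1 == 8), a pointer that falls exactly
+ * on the boundary between the first and second page gives:
+ *
+ *   XLogRecPtrToBufIdx(8192)    == (8192 / 8192) % 8 == 1
+ *   XLogRecEndPtrToBufIdx(8192) == ((8192 - 1) / 8192) % 8 == 0
+ *
+ * i.e. as a start pointer it refers to the page that begins at the boundary,
+ * but as an end pointer it refers to the page that ends there.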
+ */ +#define XLogRecPtrToBufIdx(recptr) \ + (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1)) -#define PrevBufIdx(idx) \ - (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1)) +#define XLogRecEndPtrToBufIdx(recptr) \ + ((((recptr) - 1) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1)) -#define NextBufIdx(idx) \ - (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1)) +/* + * These are the number of bytes in a WAL page and segment usable for WAL data. + */ +#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD) +#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD)) /* * Private, possibly out-of-date copy of shared LogwrtResult. @@ -631,6 +728,9 @@ static bool InRedo = false; /* Have we launched bgwriter during recovery? */ static bool bgwriterLaunched = false; +/* For WALInsertSlotAcquire/Release functions */ +static int MySlotNo = 0; +static bool holdingAllSlots = false; static void readRecoveryCommandFile(void); static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo); @@ -651,9 +751,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, XLogRecPtr *lsn, BkpBlock *bkpb); static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk, bool get_cleanup_lock, bool keep_buffer); -static bool AdvanceXLInsertBuffer(bool new_segment); +static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic); static bool XLogCheckpointNeeded(XLogSegNo new_segno); -static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); +static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, bool find_free, int *max_advance, bool use_lock); @@ -693,6 +793,24 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc, static void rm_redo_error_callback(void *arg); static int get_sync_bit(int method); +static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch, + XLogRecData *rdata, + XLogRecPtr StartPos, XLogRecPtr EndPos); +static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, + XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); +static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, + XLogRecPtr *PrevPtr); +static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); +static void WakeupWaiters(XLogRecPtr EndPos); +static char *GetXLogBuffer(XLogRecPtr ptr); +static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); +static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); +static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr); + +static void WALInsertSlotAcquire(bool exclusive); +static void WALInsertSlotAcquireOne(int slotno); +static void WALInsertSlotRelease(void); +static void WALInsertSlotReleaseOne(int slotno); /* * Insert an XLOG record having the specified RMID and info bytes, @@ -713,10 +831,6 @@ XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) { XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogRecPtr RecPtr; - XLogRecPtr WriteRqst; - uint32 freespace; - int curridx; XLogRecData *rdt; XLogRecData *rdt_lastnormal; Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; @@ -731,11 +845,13 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) uint32 len, write_len; unsigned i; - bool updrqst; bool doPageWrites; bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); + bool inserted; uint8 info_orig = info; static XLogRecord *rechdr; + XLogRecPtr StartPos; + XLogRecPtr EndPos; if (rechdr == NULL) { @@ -761,8 +877,8 @@ XLogInsert(RmgrId rmid, 
uint8 info, XLogRecData *rdata) */ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { - RecPtr = SizeOfXLogLongPHD; /* start of 1st chkpt record */ - return RecPtr; + EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */ + return EndPos; } /* @@ -770,9 +886,9 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) * up. * * We may have to loop back to here if a race condition is detected below. - * We could prevent the race by doing all this work while holding the - * insert lock, but it seems better to avoid doing CRC calculations while - * holding the lock. + * We could prevent the race by doing all this work while holding an + * insertion slot, but it seems better to avoid doing CRC calculations + * while holding one. * * We add entries for backup blocks to the chain, so that they don't need * any special treatment in the critical section where the chunks are @@ -789,8 +905,8 @@ begin:; /* * Decide if we need to do full-page writes in this XLOG record: true if * full_page_writes is on or we have a PITR request for it. Since we - * don't yet have the insert lock, fullPageWrites and forcePageWrites - * could change under us, but we'll recheck them once we have the lock. + * don't yet have an insertion slot, fullPageWrites and forcePageWrites + * could change under us, but we'll recheck them once we have a slot. */ doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites; @@ -930,25 +1046,60 @@ begin:; COMP_CRC32(rdata_crc, rdt->data, rdt->len); /* - * Construct record header (prev-link and CRC are filled in later), and - * make that the first chunk in the chain. + * Construct record header (prev-link is filled in later, after reserving + * the space for the record), and make that the first chunk in the chain. + * + * The CRC calculated for the header here doesn't include prev-link, + * because we don't know it yet. It will be added later. */ rechdr->xl_xid = GetCurrentTransactionIdIfAny(); rechdr->xl_tot_len = SizeOfXLogRecord + write_len; rechdr->xl_len = len; /* doesn't include backup blocks */ rechdr->xl_info = info; rechdr->xl_rmid = rmid; + rechdr->xl_prev = InvalidXLogRecPtr; + COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev)); hdr_rdt.next = rdata; hdr_rdt.data = (char *) rechdr; hdr_rdt.len = SizeOfXLogRecord; - write_len += SizeOfXLogRecord; + /*---------- + * + * We have now done all the preparatory work we can without holding a + * lock or modifying shared state. From here on, inserting the new WAL + * record to the shared WAL buffer cache is a two-step process: + * + * 1. Reserve the right amount of space from the WAL. The current head of + * reserved space is kept in Insert->CurrBytePos, and is protected by + * insertpos_lck. + * + * 2. Copy the record to the reserved WAL space. This involves finding the + * correct WAL buffer containing the reserved space, and copying the + * record in place. This can be done concurrently in multiple processes. + * + * To keep track of which insertions are still in-progress, each concurrent + * inserter allocates an "insertion slot", which tells others how far the + * inserter has progressed. There is a small fixed number of insertion + * slots, determined by the num_xloginsert_slots GUC. When an inserter + * finishes, it updates the xlogInsertingAt of its slot to the end of the + * record it inserted, to let others know that it's done. xlogInsertingAt + * is also updated when crossing over to a new WAL buffer, to allow the + * the previous buffer to be flushed. 
+ * + * Holding onto a slot also protects RedoRecPtr and fullPageWrites from + * changing until the insertion is finished. + * + * Step 2 can usually be done completely in parallel. If the required WAL + * page is not initialized yet, you have to grab WALBufMappingLock to + * initialize it, but the WAL writer tries to do that ahead of insertions + * to avoid that from happening in the critical path. + * + *---------- + */ START_CRIT_SECTION(); - - /* Now wait to get insert lock */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(isLogSwitch); /* * Check to see if my RedoRecPtr is out of date. If so, may have to go @@ -977,7 +1128,7 @@ begin:; * Oops, this buffer now needs to be backed up, but we * didn't think so above. Start over. */ - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); END_CRIT_SECTION(); rdt_lastnormal->next = NULL; info = info_orig; @@ -996,7 +1147,7 @@ begin:; if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites) { /* Oops, must redo it with full-page data. */ - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); END_CRIT_SECTION(); rdt_lastnormal->next = NULL; info = info_orig; @@ -1004,238 +1155,1169 @@ begin:; } /* - * If the current page is completely full, the record goes to the next - * page, right after the page header. + * Reserve space for the record in the WAL. This also sets the xl_prev + * pointer. + */ + if (isLogSwitch) + inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev); + else + { + ReserveXLogInsertLocation(write_len, &StartPos, &EndPos, + &rechdr->xl_prev); + inserted = true; + } + + if (inserted) + { + /* + * Now that xl_prev has been filled in, finish CRC calculation of the + * record header. + */ + COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr)); + FIN_CRC32(rdata_crc); + rechdr->xl_crc = rdata_crc; + + /* + * All the record data, including the header, is now ready to be + * inserted. Copy the record in the space reserved. + */ + CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos); + } + else + { + /* + * This was an xlog-switch record, but the current insert location was + * already exactly at the beginning of a segment, so there was no need + * to do anything. + */ + } + + /* + * Done! Let others know that we're finished. + */ + WALInsertSlotRelease(); + + END_CRIT_SECTION(); + + /* + * Update shared LogwrtRqst.Write, if we crossed page boundary. + */ + if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) + { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + /* advance global request to include new block(s) */ + if (xlogctl->LogwrtRqst.Write < EndPos) + xlogctl->LogwrtRqst.Write = EndPos; + /* update local result copy while I have the chance */ + LogwrtResult = xlogctl->LogwrtResult; + SpinLockRelease(&xlogctl->info_lck); + } + + /* + * If this was an XLOG_SWITCH record, flush the record and the empty + * padding space that fills the rest of the segment, and perform + * end-of-segment actions (eg, notifying archiver). + */ + if (isLogSwitch) + { + TRACE_POSTGRESQL_XLOG_SWITCH(); + XLogFlush(EndPos); + /* + * Even though we reserved the rest of the segment for us, which is + * reflected in EndPos, we return a pointer to just the end of the + * xlog-switch record. 
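+ *
+ * If the switch record itself crossed onto a new page, the returned end
+ * position must also account for that page's header. For example, assuming
+ * 8192-byte pages, a 24-byte short page header and a 32-byte record header,
+ * a switch record starting 16 bytes before a page boundary in the middle of
+ * a segment ends 16 bytes into the next page, so the short page header is
+ * added on top: the result is StartPos + 32 + 24 rather than StartPos + 32.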
+ */ + if (inserted) + { + EndPos = StartPos + SizeOfXLogRecord; + if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) + { + if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ) + EndPos += SizeOfXLogLongPHD; + else + EndPos += SizeOfXLogShortPHD; + } + } + } + +#ifdef WAL_DEBUG + if (XLOG_DEBUG) + { + StringInfoData buf; + + initStringInfo(&buf); + appendStringInfo(&buf, "INSERT @ %X/%X: ", + (uint32) (EndPos >> 32), (uint32) EndPos); + xlog_outrec(&buf, rechdr); + if (rdata->data != NULL) + { + appendStringInfo(&buf, " - "); + RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data); + } + elog(LOG, "%s", buf.data); + pfree(buf.data); + } +#endif + + /* + * Update our global variables + */ + ProcLastRecPtr = StartPos; + XactLastRecEnd = EndPos; + + return EndPos; +} + +/* + * Reserves the right amount of space for a record of given size from the WAL. + * *StartPos is set to the beginning of the reserved section, *EndPos to + * its end+1. *PrevPtr is set to the beginning of the previous record; it is + * used to set the xl_prev of this record. + * + * This is the performance critical part of XLogInsert that must be serialized + * across backends. The rest can happen mostly in parallel. Try to keep this + * section as short as possible, insertpos_lck can be heavily contended on a + * busy system. + * + * NB: The space calculation here must match the code in CopyXLogRecordToWAL, + * where we actually copy the record to the reserved space. + */ +static void +ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, + XLogRecPtr *PrevPtr) +{ + volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + uint64 startbytepos; + uint64 endbytepos; + uint64 prevbytepos; + + size = MAXALIGN(size); + + /* All (non xlog-switch) records should contain data. */ + Assert(size > SizeOfXLogRecord); + + /* + * The duration the spinlock needs to be held is minimized by minimizing + * the calculations that have to be done while holding the lock. The + * current tip of reserved WAL is kept in CurrBytePos, as a byte position + * that only counts "usable" bytes in WAL, that is, it excludes all WAL + * page headers. The mapping between "usable" byte positions and physical + * positions (XLogRecPtrs) can be done outside the locked region, and + * because the usable byte position doesn't include any headers, reserving + * X bytes from WAL is almost as simple as "CurrBytePos += X". + */ + SpinLockAcquire(&Insert->insertpos_lck); + + startbytepos = Insert->CurrBytePos; + endbytepos = startbytepos + size; + prevbytepos = Insert->PrevBytePos; + Insert->CurrBytePos = endbytepos; + Insert->PrevBytePos = startbytepos; + + SpinLockRelease(&Insert->insertpos_lck); + + *StartPos = XLogBytePosToRecPtr(startbytepos); + *EndPos = XLogBytePosToEndRecPtr(endbytepos); + *PrevPtr = XLogBytePosToRecPtr(prevbytepos); + + /* + * Check that the conversions between "usable byte positions" and + * XLogRecPtrs work consistently in both directions. + */ + Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos); + Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos); + Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos); +} + +/* + * Like ReserveXLogInsertLocation(), but for an xlog-switch record. + * + * A log-switch record is handled slightly differently. The rest of the + * segment will be reserved for this insertion, as indicated by the returned + * *EndPos_p value. 
However, if we are already at the beginning of the current + * segment, *StartPos_p and *EndPos_p are set to the current location without + * reserving any space, and the function returns false. +*/ +static bool +ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) +{ + volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + uint64 startbytepos; + uint64 endbytepos; + uint64 prevbytepos; + uint32 size = SizeOfXLogRecord; + XLogRecPtr ptr; + uint32 segleft; + + /* + * These calculations are a bit heavy-weight to be done while holding a + * spinlock, but since we're holding all the WAL insertion slots, there + * are no other inserters competing for it. GetXLogInsertRecPtr() does + * compete for it, but that's not called very frequently. + */ + SpinLockAcquire(&Insert->insertpos_lck); + + startbytepos = Insert->CurrBytePos; + + ptr = XLogBytePosToEndRecPtr(startbytepos); + if (ptr % XLOG_SEG_SIZE == 0) + { + SpinLockRelease(&Insert->insertpos_lck); + *EndPos = *StartPos = ptr; + return false; + } + + endbytepos = startbytepos + size; + prevbytepos = Insert->PrevBytePos; + + *StartPos = XLogBytePosToRecPtr(startbytepos); + *EndPos = XLogBytePosToEndRecPtr(endbytepos); + + segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE); + if (segleft != XLOG_SEG_SIZE) + { + /* consume the rest of the segment */ + *EndPos += segleft; + endbytepos = XLogRecPtrToBytePos(*EndPos); + } + Insert->CurrBytePos = endbytepos; + Insert->PrevBytePos = startbytepos; + + SpinLockRelease(&Insert->insertpos_lck); + + *PrevPtr = XLogBytePosToRecPtr(prevbytepos); + + Assert((*EndPos) % XLOG_SEG_SIZE == 0); + Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos); + Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos); + Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos); + + return true; +} + +/* + * Subroutine of XLogInsert. Copies a WAL record to an already-reserved + * area in the WAL. + */ +static void +CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, + XLogRecPtr StartPos, XLogRecPtr EndPos) +{ + char *currpos; + int freespace; + int written; + XLogRecPtr CurrPos; + XLogPageHeader pagehdr; + + /* The first chunk is the record header */ + Assert(rdata->len == SizeOfXLogRecord); + + /* + * Get a pointer to the right place in the right WAL buffer to start + * inserting to. + */ + CurrPos = StartPos; + currpos = GetXLogBuffer(CurrPos); + freespace = INSERT_FREESPACE(CurrPos); + + /* + * there should be enough space for at least the first field (xl_tot_len) + * on this page. + */ + Assert(freespace >= sizeof(uint32)); + + /* Copy record data */ + written = 0; + while (rdata != NULL) + { + char *rdata_data = rdata->data; + int rdata_len = rdata->len; + + while (rdata_len > freespace) + { + /* + * Write what fits on this page, and continue on the next page. + */ + Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0); + memcpy(currpos, rdata_data, freespace); + rdata_data += freespace; + rdata_len -= freespace; + written += freespace; + CurrPos += freespace; + + /* + * Get pointer to beginning of next page, and set the xlp_rem_len + * in the page header. Set XLP_FIRST_IS_CONTRECORD. + * + * It's safe to set the contrecord flag and xlp_rem_len without a + * lock on the page. All the other flags were already set when the + * page was initialized, in AdvanceXLInsertBuffer, and we're the + * only backend that needs to set the contrecord flag. 
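+ *
+ * For example, if 60 bytes of a 100-byte record fit on the current page
+ * ('written' is 60 at this point), the next page's header gets
+ * xlp_rem_len = 40 and XLP_FIRST_IS_CONTRECORD, telling readers that the
+ * first 40 bytes of that page's data area are the tail of a record that
+ * began on an earlier page.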
+ */ + currpos = GetXLogBuffer(CurrPos); + pagehdr = (XLogPageHeader) currpos; + pagehdr->xlp_rem_len = write_len - written; + pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD; + + /* skip over the page header */ + if (CurrPos % XLogSegSize == 0) + { + CurrPos += SizeOfXLogLongPHD; + currpos += SizeOfXLogLongPHD; + } + else + { + CurrPos += SizeOfXLogShortPHD; + currpos += SizeOfXLogShortPHD; + } + freespace = INSERT_FREESPACE(CurrPos); + } + + Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0); + memcpy(currpos, rdata_data, rdata_len); + currpos += rdata_len; + CurrPos += rdata_len; + freespace -= rdata_len; + written += rdata_len; + + rdata = rdata->next; + } + Assert(written == write_len); + + /* Align the end position, so that the next record starts aligned */ + CurrPos = MAXALIGN(CurrPos); + + /* + * If this was an xlog-switch, it's not enough to write the switch record, + * we also have to consume all the remaining space in the WAL segment. + * We have already reserved it for us, but we still need to make sure it's + * allocated and zeroed in the WAL buffers so that when the caller (or + * someone else) does XLogWrite(), it can really write out all the zeros. + */ + if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0) + { + /* An xlog-switch record doesn't contain any data besides the header */ + Assert(write_len == SizeOfXLogRecord); + + /* + * We do this one page at a time, to make sure we don't deadlock + * against ourselves if wal_buffers < XLOG_SEG_SIZE. + */ + Assert(EndPos % XLogSegSize == 0); + + /* Use up all the remaining space on the first page */ + CurrPos += freespace; + + while (CurrPos < EndPos) + { + /* initialize the next page (if not initialized already) */ + WakeupWaiters(CurrPos); + AdvanceXLInsertBuffer(CurrPos, false); + CurrPos += XLOG_BLCKSZ; + } + } + + if (CurrPos != EndPos) + elog(PANIC, "space reserved for WAL record does not match what was written"); +} + +/* + * Allocate a slot for insertion. + * + * In exclusive mode, all slots are reserved for the current process. That + * blocks all concurrent insertions. + */ +static void +WALInsertSlotAcquire(bool exclusive) +{ + int i; + + if (exclusive) + { + for (i = 0; i < num_xloginsert_slots; i++) + WALInsertSlotAcquireOne(i); + holdingAllSlots = true; + } + else + WALInsertSlotAcquireOne(-1); +} + +/* + * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary + * one if slotno == -1. The index of the slot that was acquired is stored in + * MySlotNo. + * + * This is more or less equivalent to LWLockAcquire(). + */ +static void +WALInsertSlotAcquireOne(int slotno) +{ + volatile XLogInsertSlot *slot; + PGPROC *proc = MyProc; + bool retry = false; + int extraWaits = 0; + static int slotToTry = -1; + + /* + * Try to use the slot we used last time. If the system isn't particularly + * busy, it's a good bet that it's available, and it's good to have some + * affinity to a particular slot so that you don't unnecessarily bounce + * cache lines between processes when there is no contention. + * + * If this is the first time through in this backend, pick a slot + * (semi-)randomly. This allows the slots to be used evenly if you have a + * lot of very short connections. + */ + if (slotno != -1) + MySlotNo = slotno; + else + { + if (slotToTry == -1) + slotToTry = MyProc->pgprocno % num_xloginsert_slots; + MySlotNo = slotToTry; + } + + /* + * We can't wait if we haven't got a PGPROC. This should only occur + * during bootstrap or shared memory initialization. 
Put an Assert here + * to catch unsafe coding practices. + */ + Assert(MyProc != NULL); + + /* + * Lock out cancel/die interrupts until we exit the code section protected + * by the slot. This ensures that interrupts will not interfere with + * manipulations of data structures in shared memory. + */ + START_CRIT_SECTION(); + + /* + * Loop here to try to acquire slot after each time we are signaled by + * WALInsertSlotRelease. + */ + for (;;) + { + bool mustwait; + + slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&slot->mutex); + + /* If retrying, allow WALInsertSlotRelease to release waiters again */ + if (retry) + slot->releaseOK = true; + + /* If I can get the slot, do so quickly. */ + if (slot->exclusive == 0) + { + slot->exclusive++; + mustwait = false; + } + else + mustwait = true; + + if (!mustwait) + break; /* got the lock */ + + Assert(slot->owner != MyProc); + + /* + * Add myself to wait queue. + */ + proc->lwWaiting = true; + proc->lwWaitMode = LW_EXCLUSIVE; + proc->lwWaitLink = NULL; + if (slot->head == NULL) + slot->head = proc; + else + slot->tail->lwWaitLink = proc; + slot->tail = proc; + + /* Can release the mutex now */ + SpinLockRelease(&slot->mutex); + + /* + * Wait until awakened. + * + * Since we share the process wait semaphore with the regular lock + * manager and ProcWaitForSignal, and we may need to acquire a slot + * while one of those is pending, it is possible that we get awakened + * for a reason other than being signaled by WALInsertSlotRelease. If + * so, loop back and wait again. Once we've gotten the slot, + * re-increment the sema by the number of additional signals received, + * so that the lock manager or signal manager will see the received + * signal when it next waits. + */ + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + + /* Now loop back and try to acquire lock again. */ + retry = true; + } + + slot->owner = proc; + + /* + * Normally, we initialize the xlogInsertingAt value of the slot to 1, + * because we don't yet know where in the WAL we're going to insert. It's + * not critical what it points to right now - leaving it to a too small + * value just means that WaitXlogInsertionsToFinish() might wait on us + * unnecessarily, until we update the value (when we finish the insert or + * move to next page). + * + * If we're grabbing all the slots, however, stamp all but the last one + * with InvalidXLogRecPtr, meaning there is no insert in progress. The last + * slot is the one that we will update as we proceed with the insert, the + * rest are held just to keep off other inserters. + */ + if (slotno != -1 && slotno != num_xloginsert_slots - 1) + slot->xlogInsertingAt = InvalidXLogRecPtr; + else + slot->xlogInsertingAt = 1; + + /* We are done updating shared state of the slot itself. */ + SpinLockRelease(&slot->mutex); + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); + + /* + * If we couldn't get the slot immediately, try another slot next time. + * On a system with more insertion slots than concurrent inserters, this + * causes all the inserters to eventually migrate to a slot that no-one + * else is using. On a system with more inserters than slots, it still + * causes the inserters to be distributed quite evenly across the slots. 
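+ *
+ * For example, with the default num_xloginsert_slots = 8, a backend with
+ * pgprocno 13 starts out trying slot 13 % 8 = 5, and each time it had to
+ * wait for that slot it moves on to slot 6, 7, 0, and so on.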
+ */ + if (slotno != -1 && retry) + slotToTry = (slotToTry + 1) % num_xloginsert_slots; +} + +/* + * Wait for the given slot to become free, or for its xlogInsertingAt location + * to change to something else than 'waitptr'. In other words, wait for the + * inserter using the given slot to finish its insertion, or to at least make + * some progress. + */ +static void +WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr) +{ + PGPROC *proc = MyProc; + int extraWaits = 0; + + /* + * Lock out cancel/die interrupts while we sleep on the slot. There is + * no cleanup mechanism to remove us from the wait queue if we got + * interrupted. + */ + HOLD_INTERRUPTS(); + + /* + * Loop here to try to acquire lock after each time we are signaled. + */ + for (;;) + { + bool mustwait; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&slot->mutex); + + /* If I can get the lock, do so quickly. */ + if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr) + mustwait = false; + else + mustwait = true; + + if (!mustwait) + break; /* the lock was free */ + + Assert(slot->owner != MyProc); + + /* + * Add myself to wait queue. + */ + proc->lwWaiting = true; + proc->lwWaitMode = LW_WAIT_UNTIL_FREE; + proc->lwWaitLink = NULL; + + /* waiters are added to the front of the queue */ + proc->lwWaitLink = slot->head; + if (slot->head == NULL) + slot->tail = proc; + slot->head = proc; + + /* Can release the mutex now */ + SpinLockRelease(&slot->mutex); + + /* + * Wait until awakened. + * + * Since we share the process wait semaphore with other things, like + * the regular lock manager and ProcWaitForSignal, and we may need to + * acquire an LWLock while one of those is pending, it is possible that + * we get awakened for a reason other than being signaled by + * LWLockRelease. If so, loop back and wait again. Once we've gotten + * the LWLock, re-increment the sema by the number of additional + * signals received, so that the lock manager or signal manager will + * see the received signal when it next waits. + */ + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + + /* Now loop back and try to acquire lock again. */ + } + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&slot->mutex); + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); + + /* + * Now okay to allow cancel/die interrupts. + */ + RESUME_INTERRUPTS(); +} + +/* + * Wake up all processes waiting for us with WaitOnSlot(). Sets our + * xlogInsertingAt value to EndPos, without releasing the slot. + */ +static void +WakeupWaiters(XLogRecPtr EndPos) +{ + volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; + PGPROC *head; + PGPROC *proc; + PGPROC *next; + + /* + * If we have already reported progress up to the same point, do nothing. + * No other process can modify xlogInsertingAt, so we can check this before + * grabbing the spinlock. + */ + if (slot->xlogInsertingAt == EndPos) + return; + /* xlogInsertingAt should not go backwards */ + Assert(slot->xlogInsertingAt < EndPos); + + /* Acquire mutex. Time spent holding mutex should be short! 
*/ + SpinLockAcquire(&slot->mutex); + + /* we should own the slot */ + Assert(slot->exclusive == 1 && slot->owner == MyProc); + + slot->xlogInsertingAt = EndPos; + + /* + * See if there are any waiters that need to be woken up. + */ + head = slot->head; + + if (head != NULL) + { + proc = head; + + /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */ + next = proc->lwWaitLink; + while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE) + { + proc = next; + next = next->lwWaitLink; + } + + /* proc is now the last PGPROC to be released */ + slot->head = next; + proc->lwWaitLink = NULL; + } + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&slot->mutex); + + /* + * Awaken any waiters I removed from the queue. + */ + while (head != NULL) + { + proc = head; + head = proc->lwWaitLink; + proc->lwWaitLink = NULL; + proc->lwWaiting = false; + PGSemaphoreUnlock(&proc->sem); + } +} + +/* + * Release our insertion slot (or slots, if we're holding them all). + */ +static void +WALInsertSlotRelease(void) +{ + int i; + + if (holdingAllSlots) + { + for (i = 0; i < num_xloginsert_slots; i++) + WALInsertSlotReleaseOne(i); + holdingAllSlots = false; + } + else + WALInsertSlotReleaseOne(MySlotNo); +} + +static void +WALInsertSlotReleaseOne(int slotno) +{ + volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot; + PGPROC *head; + PGPROC *proc; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&slot->mutex); + + /* we must be holding it */ + Assert(slot->exclusive == 1 && slot->owner == MyProc); + + slot->xlogInsertingAt = InvalidXLogRecPtr; + + /* Release my hold on the slot */ + slot->exclusive = 0; + slot->owner = NULL; + + /* + * See if I need to awaken any waiters.. + */ + head = slot->head; + if (head != NULL) + { + if (slot->releaseOK) + { + /* + * Remove the to-be-awakened PGPROCs from the queue. + */ + bool releaseOK = true; + + proc = head; + + /* + * First wake up any backends that want to be woken up without + * acquiring the lock. These are always in the front of the queue. + */ + while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink) + proc = proc->lwWaitLink; + + /* + * Awaken the first exclusive-waiter, if any. + */ + if (proc->lwWaitLink) + { + Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE); + proc = proc->lwWaitLink; + releaseOK = false; + } + /* proc is now the last PGPROC to be released */ + slot->head = proc->lwWaitLink; + proc->lwWaitLink = NULL; + + slot->releaseOK = releaseOK; + } + else + head = NULL; + } + + /* We are done updating shared state of the slot itself. */ + SpinLockRelease(&slot->mutex); + + /* + * Awaken any waiters I removed from the queue. + */ + while (head != NULL) + { + proc = head; + head = proc->lwWaitLink; + proc->lwWaitLink = NULL; + proc->lwWaiting = false; + PGSemaphoreUnlock(&proc->sem); + } + + /* + * Now okay to allow cancel/die interrupts. + */ + END_CRIT_SECTION(); +} + + +/* + * Wait for any WAL insertions < upto to finish. + * + * Returns the location of the oldest insertion that is still in-progress. + * Any WAL prior to that point has been fully copied into WAL buffers, and + * can be flushed out to disk. Because this waits for any insertions older + * than 'upto' to finish, the return value is always >= 'upto'. + * + * Note: When you are about to write out WAL, you must call this function + * *before* acquiring WALWriteLock, to avoid deadlocks. 
This function might + * need to wait for an insertion to finish (or at least advance to next + * uninitialized page), and the inserter might need to evict an old WAL buffer + * to make room for a new one, which in turn requires WALWriteLock. + */ +static XLogRecPtr +WaitXLogInsertionsToFinish(XLogRecPtr upto) +{ + uint64 bytepos; + XLogRecPtr reservedUpto; + XLogRecPtr finishedUpto; + volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + int i; + + if (MyProc == NULL) + elog(PANIC, "cannot wait without a PGPROC structure"); + + /* Read the current insert position */ + SpinLockAcquire(&Insert->insertpos_lck); + bytepos = Insert->CurrBytePos; + SpinLockRelease(&Insert->insertpos_lck); + reservedUpto = XLogBytePosToEndRecPtr(bytepos); + + /* + * No-one should request to flush a piece of WAL that hasn't even been + * reserved yet. However, it can happen if there is a block with a bogus + * LSN on disk, for example. XLogFlush checks for that situation and + * complains, but only after the flush. Here we just assume that to mean + * that all WAL that has been reserved needs to be finished. In this + * corner-case, the return value can be smaller than 'upto' argument. + */ + if (upto > reservedUpto) + { + elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X", + (uint32) (upto >> 32), (uint32) upto, + (uint32) (reservedUpto >> 32), (uint32) reservedUpto); + upto = reservedUpto; + } + + /* + * finishedUpto is our return value, indicating the point upto which + * all the WAL insertions have been finished. Initialize it to the head + * of reserved WAL, and as we iterate through the insertion slots, back it + * out for any insertion that's still in progress. + */ + finishedUpto = reservedUpto; + + /* + * Loop through all the slots, sleeping on any in-progress insert older + * than 'upto'. + */ + for (i = 0; i < num_xloginsert_slots; i++) + { + volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot; + XLogRecPtr insertingat; + + retry: + /* + * We can check if the slot is in use without grabbing the spinlock. + * The spinlock acquisition of insertpos_lck before this loop acts + * as a memory barrier. If someone acquires the slot after that, it + * can't possibly be inserting to anything < reservedUpto. If it was + * acquired before that, an unlocked test will return true. + */ + if (!slot->exclusive) + continue; + + SpinLockAcquire(&slot->mutex); + /* re-check now that we have the lock */ + if (!slot->exclusive) + { + SpinLockRelease(&slot->mutex); + continue; + } + insertingat = slot->xlogInsertingAt; + SpinLockRelease(&slot->mutex); + + if (insertingat == InvalidXLogRecPtr) + { + /* + * slot is reserved just to hold off other inserters, there is no + * actual insert in progress. + */ + continue; + } + + /* + * This insertion is still in progress. Do we need to wait for it? + * + * When an inserter acquires a slot, it doesn't reset 'insertingat', so + * it will initially point to the old value of some already-finished + * insertion. The inserter will update the value as soon as it finishes + * the insertion, moves to the next page, or has to do I/O to flush an + * old dirty buffer. That means that when we see a slot with + * insertingat value < upto, we don't know if that insertion is still + * truly in progress, or if the slot is reused by a new inserter that + * hasn't updated the insertingat value yet. We have to assume it's the + * latter, and wait. 
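+ *
+ * For example, with upto = 0/5000 (a hypothetical position), a slot
+ * advertising insertingat = 0/4800 makes us sleep in WaitOnSlot() until the
+ * value changes, and then re-check; a slot advertising 0/5200 needs no
+ * waiting, but still pulls the returned finishedUpto down to 0/5200 if that
+ * is the oldest such position we have seen.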
+ */ + if (insertingat < upto) + { + WaitOnSlot(slot, insertingat); + goto retry; + } + else + { + /* + * We don't need to wait for this insertion, but update the + * return value. + */ + if (insertingat < finishedUpto) + finishedUpto = insertingat; + } + } + return finishedUpto; +} + +/* + * Get a pointer to the right location in the WAL buffer containing the + * given XLogRecPtr. + * + * If the page is not initialized yet, it is initialized. That might require + * evicting an old dirty buffer from the buffer cache, which means I/O. + * + * The caller must ensure that the page containing the requested location + * isn't evicted yet, and won't be evicted. The way to ensure that is to + * hold onto an XLogInsertSlot with the xlogInsertingAt position set to + * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs + * to evict an old page from the buffer. (This means that once you call + * GetXLogBuffer() with a given 'ptr', you must not access anything before + * that point anymore, and must not call GetXLogBuffer() with an older 'ptr' + * later, because older buffers might be recycled already) + */ +static char * +GetXLogBuffer(XLogRecPtr ptr) +{ + int idx; + XLogRecPtr endptr; + static uint64 cachedPage = 0; + static char *cachedPos = NULL; + XLogRecPtr expectedEndPtr; + + /* + * Fast path for the common case that we need to access again the same + * page as last time. */ - updrqst = false; - freespace = INSERT_FREESPACE(Insert); - if (freespace == 0) + if (ptr / XLOG_BLCKSZ == cachedPage) { - updrqst = AdvanceXLInsertBuffer(false); - freespace = INSERT_FREESPACE(Insert); + Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC); + Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ)); + return cachedPos + ptr % XLOG_BLCKSZ; } - /* Compute record's XLOG location */ - curridx = Insert->curridx; - INSERT_RECPTR(RecPtr, Insert, curridx); + /* + * The XLog buffer cache is organized so that a page is always loaded + * to a particular buffer. That way we can easily calculate the buffer + * a given page must be loaded into, from the XLogRecPtr alone. + */ + idx = XLogRecPtrToBufIdx(ptr); /* - * If the record is an XLOG_SWITCH, and we are exactly at the start of a - * segment, we need not insert it (and don't want to because we'd like - * consecutive switch requests to be no-ops). Instead, make sure - * everything is written and flushed through the end of the prior segment, - * and return the prior segment's end address. + * See what page is loaded in the buffer at the moment. It could be the + * page we're looking for, or something older. It can't be anything newer + * - that would imply the page we're looking for has already been written + * out to disk and evicted, and the caller is responsible for making sure + * that doesn't happen. + * + * However, we don't hold a lock while we read the value. If someone has + * just initialized the page, it's possible that we get a "torn read" of + * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In + * that case we will see a bogus value. That's ok, we'll grab the mapping + * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than + * the page we're looking for. But it means that when we do this unlocked + * read, we might see a value that appears to be ahead of the page we're + * looking for. Don't PANIC on that, until we've verified the value while + * holding the lock. 
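+ *
+ * For example, assuming 8192-byte pages, for ptr = 12000 the containing
+ * page ends at 16384, so we expect xlblocks[idx] to read exactly 16384.
+ * Any other value means the buffer does not (yet) hold the page we need,
+ * and we go through AdvanceXLInsertBuffer() to initialize it.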
*/ - if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD) + expectedEndPtr = ptr; + expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ; + + endptr = XLogCtl->xlblocks[idx]; + if (expectedEndPtr != endptr) { - /* We can release insert lock immediately */ - LWLockRelease(WALInsertLock); + /* + * Let others know that we're finished inserting the record up + * to the page boundary. + */ + WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ); - RecPtr -= SizeOfXLogLongPHD; + AdvanceXLInsertBuffer(ptr, false); + endptr = XLogCtl->xlblocks[idx]; - LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - LogwrtResult = XLogCtl->LogwrtResult; - if (LogwrtResult.Flush < RecPtr) - { - XLogwrtRqst FlushRqst; + if (expectedEndPtr != endptr) + elog(PANIC, "could not find WAL buffer for %X/%X", + (uint32) (ptr >> 32) , (uint32) ptr); + } + else + { + /* + * Make sure the initialization of the page is visible to us, and + * won't arrive later to overwrite the WAL data we write on the page. + */ + pg_memory_barrier(); + } - FlushRqst.Write = RecPtr; - FlushRqst.Flush = RecPtr; - XLogWrite(FlushRqst, false, false); - } - LWLockRelease(WALWriteLock); + /* + * Found the buffer holding this page. Return a pointer to the right + * offset within the page. + */ + cachedPage = ptr / XLOG_BLCKSZ; + cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ; - END_CRIT_SECTION(); + Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC); + Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ)); - /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); - return RecPtr; - } + return cachedPos + ptr % XLOG_BLCKSZ; +} - /* Finish the record header */ - rechdr->xl_prev = Insert->PrevRecord; +/* + * Converts a "usable byte position" to XLogRecPtr. A usable byte position + * is the position starting from the beginning of WAL, excluding all WAL + * page headers. 
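+ *
+ * For example, with 8192-byte pages, 16 MB segments, and the usual 24/40-byte
+ * short/long page headers: usable position 0 maps to byte 40 of the first
+ * segment (just past the long header on its first page), usable position
+ * 8152 (the first byte that no longer fits on that page) maps to byte 8216
+ * (just past the second page's short header), and one segment holds
+ * 2048 * 8168 - 16 = 16728048 usable bytes.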
+ */ +static XLogRecPtr +XLogBytePosToRecPtr(uint64 bytepos) +{ + uint64 fullsegs; + uint64 fullpages; + uint64 bytesleft; + uint32 seg_offset; + XLogRecPtr result; - /* Now we can finish computing the record's CRC */ - COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc)); - FIN_CRC32(rdata_crc); - rechdr->xl_crc = rdata_crc; + fullsegs = bytepos / UsableBytesInSegment; + bytesleft = bytepos % UsableBytesInSegment; -#ifdef WAL_DEBUG - if (XLOG_DEBUG) + if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD) { - StringInfoData buf; - - initStringInfo(&buf); - appendStringInfo(&buf, "INSERT @ %X/%X: ", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - xlog_outrec(&buf, rechdr); - if (rdata->data != NULL) - { - appendStringInfo(&buf, " - "); - RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data); - } - elog(LOG, "%s", buf.data); - pfree(buf.data); + /* fits on first page of segment */ + seg_offset = bytesleft + SizeOfXLogLongPHD; } -#endif - - /* Record begin of record in appropriate places */ - ProcLastRecPtr = RecPtr; - Insert->PrevRecord = RecPtr; - - /* - * Append the data, including backup blocks if any - */ - rdata = &hdr_rdt; - while (write_len) + else { - while (rdata->data == NULL) - rdata = rdata->next; + /* account for the first page on segment with long header */ + seg_offset = XLOG_BLCKSZ; + bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD; - if (freespace > 0) - { - if (rdata->len > freespace) - { - memcpy(Insert->currpos, rdata->data, freespace); - rdata->data += freespace; - rdata->len -= freespace; - write_len -= freespace; - } - else - { - memcpy(Insert->currpos, rdata->data, rdata->len); - freespace -= rdata->len; - write_len -= rdata->len; - Insert->currpos += rdata->len; - rdata = rdata->next; - continue; - } - } + fullpages = bytesleft / UsableBytesInPage; + bytesleft = bytesleft % UsableBytesInPage; - /* Use next buffer */ - updrqst = AdvanceXLInsertBuffer(false); - curridx = Insert->curridx; - /* Mark page header to indicate this record continues on the page */ - Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; - Insert->currpage->xlp_rem_len = write_len; - freespace = INSERT_FREESPACE(Insert); + seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD; } - /* Ensure next record will be properly aligned */ - Insert->currpos = (char *) Insert->currpage + - MAXALIGN(Insert->currpos - (char *) Insert->currpage); - freespace = INSERT_FREESPACE(Insert); + XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result); - /* - * The recptr I return is the beginning of the *next* record. This will be - * stored as LSN for changed data pages... - */ - INSERT_RECPTR(RecPtr, Insert, curridx); + return result; +} - /* - * If the record is an XLOG_SWITCH, we must now write and flush all the - * existing data, and then forcibly advance to the start of the next - * segment. It's not good to do this I/O while holding the insert lock, - * but there seems too much risk of confusion if we try to release the - * lock sooner. Fortunately xlog switch needn't be a high-performance - * operation anyway... - */ - if (isLogSwitch) +/* + * Like XLogBytePosToRecPtr, but if the position is at a page boundary, + * returns a pointer to the beginning of the page (ie. before page header), + * not to where the first xlog record on that page would go to. This is used + * when converting a pointer to the end of a record. 
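+ *
+ * Continuing the example above (same page and segment sizes): for usable
+ * position 8152, XLogBytePosToRecPtr() returns byte 8216 of the segment
+ * (where the next record would start, after the second page's header),
+ * whereas this function returns 8192, the page boundary itself, which is
+ * the natural end+1 of a record that exactly filled the first page.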
+ */ +static XLogRecPtr +XLogBytePosToEndRecPtr(uint64 bytepos) +{ + uint64 fullsegs; + uint64 fullpages; + uint64 bytesleft; + uint32 seg_offset; + XLogRecPtr result; + + fullsegs = bytepos / UsableBytesInSegment; + bytesleft = bytepos % UsableBytesInSegment; + + if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD) + { + /* fits on first page of segment */ + if (bytesleft == 0) + seg_offset = 0; + else + seg_offset = bytesleft + SizeOfXLogLongPHD; + } + else { - XLogwrtRqst FlushRqst; - XLogRecPtr OldSegEnd; + /* account for the first page on segment with long header */ + seg_offset = XLOG_BLCKSZ; + bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD; - TRACE_POSTGRESQL_XLOG_SWITCH(); + fullpages = bytesleft / UsableBytesInPage; + bytesleft = bytesleft % UsableBytesInPage; - LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + if (bytesleft == 0) + seg_offset += fullpages * XLOG_BLCKSZ + bytesleft; + else + seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD; + } - /* - * Flush through the end of the page containing XLOG_SWITCH, and - * perform end-of-segment actions (eg, notifying archiver). - */ - WriteRqst = XLogCtl->xlblocks[curridx]; - FlushRqst.Write = WriteRqst; - FlushRqst.Flush = WriteRqst; - XLogWrite(FlushRqst, false, true); + XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result); - /* Set up the next buffer as first page of next segment */ - /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */ - (void) AdvanceXLInsertBuffer(true); + return result; +} - /* There should be no unwritten data */ - curridx = Insert->curridx; - Assert(curridx == XLogCtl->Write.curridx); +/* + * Convert an XLogRecPtr to a "usable byte position". + */ +static uint64 +XLogRecPtrToBytePos(XLogRecPtr ptr) +{ + uint64 fullsegs; + uint32 fullpages; + uint32 offset; + uint64 result; - /* Compute end address of old segment */ - OldSegEnd = XLogCtl->xlblocks[curridx]; - OldSegEnd -= XLOG_BLCKSZ; + XLByteToSeg(ptr, fullsegs); - /* Make it look like we've written and synced all of old segment */ - LogwrtResult.Write = OldSegEnd; - LogwrtResult.Flush = OldSegEnd; + fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ; + offset = ptr % XLOG_BLCKSZ; - /* - * Update shared-memory status --- this code should match XLogWrite - */ + if (fullpages == 0) + { + result = fullsegs * UsableBytesInSegment; + if (offset > 0) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->LogwrtResult = LogwrtResult; - if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write) - xlogctl->LogwrtRqst.Write = LogwrtResult.Write; - if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush) - xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush; - SpinLockRelease(&xlogctl->info_lck); + Assert(offset >= SizeOfXLogLongPHD); + result += offset - SizeOfXLogLongPHD; } - - LWLockRelease(WALWriteLock); - - updrqst = false; /* done already */ } else { - /* normal case, ie not xlog switch */ - - /* Need to update shared LogwrtRqst if some block was filled up */ - if (freespace == 0) - { - /* curridx is filled and available for writing out */ - updrqst = true; - } - else + result = fullsegs * UsableBytesInSegment + + (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */ + (fullpages - 1) * UsableBytesInPage; /* full pages */ + if (offset > 0) { - /* if updrqst already set, write through end of previous buf */ - curridx = PrevBufIdx(curridx); + Assert(offset >= SizeOfXLogShortPHD); + result += offset - SizeOfXLogShortPHD; } - WriteRqst = 
XLogCtl->xlblocks[curridx]; - } - - LWLockRelease(WALInsertLock); - - if (updrqst) - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - /* advance global request to include new block(s) */ - if (xlogctl->LogwrtRqst.Write < WriteRqst) - xlogctl->LogwrtRqst.Write = WriteRqst; - /* update local result copy while I have the chance */ - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); } - XactLastRecEnd = RecPtr; - - END_CRIT_SECTION(); - - /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); - - return RecPtr; + return result; } /* @@ -1303,158 +2385,181 @@ XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, } /* - * Advance the Insert state to the next buffer page, writing out the next - * buffer if it still contains unwritten data. - * - * If new_segment is TRUE then we set up the next buffer page as the first - * page of the next xlog segment file, possibly but not usually the next - * consecutive file page. - * - * The global LogwrtRqst.Write pointer needs to be advanced to include the - * just-filled page. If we can do this for free (without an extra lock), - * we do so here. Otherwise the caller must do it. We return TRUE if the - * request update still needs to be done, FALSE if we did it internally. - * - * Must be called with WALInsertLock held. + * Initialize XLOG buffers, writing out old buffers if they still contain + * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is + * true, initialize as many pages as we can without having to write out + * unwritten data. Any new pages are initialized to zeros, with pages headers + * initialized properly. */ -static bool -AdvanceXLInsertBuffer(bool new_segment) +static void +AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) { XLogCtlInsert *Insert = &XLogCtl->Insert; - int nextidx = NextBufIdx(Insert->curridx); - bool update_needed = true; + int nextidx; XLogRecPtr OldPageRqstPtr; XLogwrtRqst WriteRqst; - XLogRecPtr NewPageEndPtr; + XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr; XLogRecPtr NewPageBeginPtr; XLogPageHeader NewPage; + int npages = 0; + + LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); /* - * Get ending-offset of the buffer page we need to replace (this may be - * zero if the buffer hasn't been used yet). Fall through if it's already - * written out. + * Now that we have the lock, check if someone initialized the page + * already. */ - OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; - if (LogwrtResult.Write < OldPageRqstPtr) + while (upto >= XLogCtl->xlblocks[XLogCtl->curridx] || opportunistic) { - /* nope, got work to do... */ - XLogRecPtr FinishedPageRqstPtr; - - FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx]; - - /* Before waiting, get info_lck and update LogwrtResult */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - if (xlogctl->LogwrtRqst.Write < FinishedPageRqstPtr) - xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr; - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); - } - - update_needed = false; /* Did the shared-request update */ + nextidx = NextBufIdx(XLogCtl->curridx); /* - * Now that we have an up-to-date LogwrtResult value, see if we still - * need to write it or if someone else already did. 
+ * Get ending-offset of the buffer page we need to replace (this may + * be zero if the buffer hasn't been used yet). Fall through if it's + * already written out. */ + OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; if (LogwrtResult.Write < OldPageRqstPtr) { - /* Must acquire write lock */ - LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - LogwrtResult = XLogCtl->LogwrtResult; - if (LogwrtResult.Write >= OldPageRqstPtr) + /* + * Nope, got work to do. If we just want to pre-initialize as much + * as we can without flushing, give up now. + */ + if (opportunistic) + break; + + /* Before waiting, get info_lck and update LogwrtResult */ { - /* OK, someone wrote it already */ - LWLockRelease(WALWriteLock); + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr) + xlogctl->LogwrtRqst.Write = OldPageRqstPtr; + LogwrtResult = xlogctl->LogwrtResult; + SpinLockRelease(&xlogctl->info_lck); } - else + + /* + * Now that we have an up-to-date LogwrtResult value, see if we + * still need to write it or if someone else already did. + */ + if (LogwrtResult.Write < OldPageRqstPtr) { /* - * Have to write buffers while holding insert lock. This is - * not good, so only write as much as we absolutely must. + * Must acquire write lock. Release WALBufMappingLock first, + * to make sure that all insertions that we need to wait for + * can finish (up to this same position). Otherwise we risk + * deadlock. */ - TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START(); - WriteRqst.Write = OldPageRqstPtr; - WriteRqst.Flush = 0; - XLogWrite(WriteRqst, false, false); - LWLockRelease(WALWriteLock); - TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE(); + LWLockRelease(WALBufMappingLock); + + WaitXLogInsertionsToFinish(OldPageRqstPtr); + + LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + + LogwrtResult = XLogCtl->LogwrtResult; + if (LogwrtResult.Write >= OldPageRqstPtr) + { + /* OK, someone wrote it already */ + LWLockRelease(WALWriteLock); + } + else + { + /* Have to write it ourselves */ + TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START(); + WriteRqst.Write = OldPageRqstPtr; + WriteRqst.Flush = 0; + XLogWrite(WriteRqst, false); + LWLockRelease(WALWriteLock); + TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE(); + } + /* Re-acquire WALBufMappingLock and retry */ + LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); + continue; } } - } - /* - * Now the next buffer slot is free and we can set it up to be the next - * output page. - */ - NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx]; + /* + * Now the next buffer slot is free and we can set it up to be the next + * output page. 
+ */ + NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx]; + NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ; - if (new_segment) - { - /* force it to a segment start point */ - if (NewPageBeginPtr % XLogSegSize != 0) - NewPageBeginPtr += XLogSegSize - NewPageBeginPtr % XLogSegSize; - } + Assert(NewPageEndPtr % XLOG_BLCKSZ == 0); + Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx); + Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx); - NewPageEndPtr = NewPageBeginPtr; - NewPageEndPtr += XLOG_BLCKSZ; - XLogCtl->xlblocks[nextidx] = NewPageEndPtr; - NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); + NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); - Insert->curridx = nextidx; - Insert->currpage = NewPage; + /* + * Be sure to re-zero the buffer so that bytes beyond what we've + * written will look like zeroes and not valid XLOG records... + */ + MemSet((char *) NewPage, 0, XLOG_BLCKSZ); - Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD; + /* + * Fill the new page's header + */ + NewPage ->xlp_magic = XLOG_PAGE_MAGIC; - /* - * Be sure to re-zero the buffer so that bytes beyond what we've written - * will look like zeroes and not valid XLOG records... - */ - MemSet((char *) NewPage, 0, XLOG_BLCKSZ); + /* NewPage->xlp_info = 0; */ /* done by memset */ + NewPage ->xlp_tli = ThisTimeLineID; + NewPage ->xlp_pageaddr = NewPageBeginPtr; + /* NewPage->xlp_rem_len = 0; */ /* done by memset */ - /* - * Fill the new page's header - */ - NewPage ->xlp_magic = XLOG_PAGE_MAGIC; + /* + * If online backup is not in progress, mark the header to indicate + * that* WAL records beginning in this page have removable backup + * blocks. This allows the WAL archiver to know whether it is safe to + * compress archived WAL data by transforming full-block records into + * the non-full-block format. It is sufficient to record this at the + * page level because we force a page switch (in fact a segment switch) + * when starting a backup, so the flag will be off before any records + * can be written during the backup. At the end of a backup, the last + * page will be marked as all unsafe when perhaps only part is unsafe, + * but at worst the archiver would miss the opportunity to compress a + * few records. + */ + if (!Insert->forcePageWrites) + NewPage ->xlp_info |= XLP_BKP_REMOVABLE; - /* NewPage->xlp_info = 0; */ /* done by memset */ - NewPage ->xlp_tli = ThisTimeLineID; - NewPage ->xlp_pageaddr = NewPageBeginPtr; + /* + * If first page of an XLOG segment file, make it a long header. + */ + if ((NewPage->xlp_pageaddr % XLogSegSize) == 0) + { + XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; - /* - * If online backup is not in progress, mark the header to indicate that - * WAL records beginning in this page have removable backup blocks. This - * allows the WAL archiver to know whether it is safe to compress archived - * WAL data by transforming full-block records into the non-full-block - * format. It is sufficient to record this at the page level because we - * force a page switch (in fact a segment switch) when starting a backup, - * so the flag will be off before any records can be written during the - * backup. At the end of a backup, the last page will be marked as all - * unsafe when perhaps only part is unsafe, but at worst the archiver - * would miss the opportunity to compress a few records. 
- */ - if (!Insert->forcePageWrites) - NewPage ->xlp_info |= XLP_BKP_REMOVABLE; + NewLongPage->xlp_sysid = ControlFile->system_identifier; + NewLongPage->xlp_seg_size = XLogSegSize; + NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ; + NewPage ->xlp_info |= XLP_LONG_HEADER; + } - /* - * If first page of an XLOG segment file, make it a long header. - */ - if ((NewPage->xlp_pageaddr % XLogSegSize) == 0) - { - XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; + /* + * Make sure the initialization of the page becomes visible to others + * before the xlblocks update. GetXLogBuffer() reads xlblocks without + * holding a lock. + */ + pg_write_barrier(); + + *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr; - NewLongPage->xlp_sysid = ControlFile->system_identifier; - NewLongPage->xlp_seg_size = XLogSegSize; - NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ; - NewPage ->xlp_info |= XLP_LONG_HEADER; + XLogCtl->curridx = nextidx; - Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD; + npages++; } + LWLockRelease(WALBufMappingLock); - return update_needed; +#ifdef WAL_DEBUG + if (npages > 0) + { + elog(DEBUG1, "initialized %d pages, upto %X/%X", + npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr); + } +#endif } /* @@ -1486,16 +2591,12 @@ XLogCheckpointNeeded(XLogSegNo new_segno) * This option allows us to avoid uselessly issuing multiple writes when a * single one would do. * - * If xlog_switch == TRUE, we are intending an xlog segment switch, so - * perform end-of-segment actions after writing the last page, even if - * it's not physically the end of its segment. (NB: this will work properly - * only if caller specifies WriteRqst == page-end and flexible == false, - * and there is some data to write.) - * - * Must be called with WALWriteLock held. + * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst) + * must be called before grabbing the lock, to make sure the data is ready to + * write. */ static void -XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) +XLogWrite(XLogwrtRqst WriteRqst, bool flexible) { XLogCtlWrite *Write = &XLogCtl->Write; bool ispartialpage; @@ -1544,15 +2645,15 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) * if we're passed a bogus WriteRqst.Write that is past the end of the * last page that's been initialized by AdvanceXLInsertBuffer. */ - if (LogwrtResult.Write >= XLogCtl->xlblocks[curridx]) + XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx]; + if (LogwrtResult.Write >= EndPtr) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write, - (uint32) (XLogCtl->xlblocks[curridx] >> 32), - (uint32) XLogCtl->xlblocks[curridx]); + (uint32) (EndPtr >> 32), (uint32) EndPtr); /* Advance LogwrtResult.Write to end of current buffer page */ - LogwrtResult.Write = XLogCtl->xlblocks[curridx]; + LogwrtResult.Write = EndPtr; ispartialpage = WriteRqst.Write < LogwrtResult.Write; if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo)) @@ -1656,16 +2757,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) * later. Doing it here ensures that one and only one backend will * perform this fsync. * - * We also do this if this is the last page written for an xlog - * switch. 
- * * This is also the right place to notify the Archiver that the * segment is ready to copy to archival storage, and to update the * timer for archive_timeout, and to signal for a checkpoint if * too many logfile segments have been used since the last * checkpoint. */ - if (finishing_seg || (xlog_switch && last_iteration)) + if (finishing_seg) { issue_xlog_fsync(openLogFile, openLogSegNo); @@ -1949,6 +3047,7 @@ XLogFlush(XLogRecPtr record) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr insertpos; /* read LogwrtResult and update local state */ SpinLockAcquire(&xlogctl->info_lck); @@ -1961,6 +3060,12 @@ XLogFlush(XLogRecPtr record) if (record <= LogwrtResult.Flush) break; + /* + * Before actually performing the write, wait for all in-flight + * insertions to the pages we're about to write to finish. + */ + insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr); + /* * Try to get the write lock. If we can't get it immediately, wait * until it's released, and recheck if we still need to do the flush @@ -1997,31 +3102,27 @@ XLogFlush(XLogRecPtr record) */ if (CommitDelay > 0 && enableFsync && MinimumActiveBackends(CommitSiblings)) + { pg_usleep(CommitDelay); + /* + * Re-check how far we can now flush the WAL. It's generally not + * safe to call WaitXLogInsetionsToFinish while holding + * WALWriteLock, because an in-progress insertion might need to + * also grab WALWriteLock to make progress. But we know that all + * the insertions up to insertpos have already finished, because + * that's what the earlier WaitXLogInsertionsToFinish() returned. + * We're only calling it again to allow insertpos to be moved + * further forward, not to actually wait for anyone. + */ + insertpos = WaitXLogInsertionsToFinish(insertpos); + } + /* try to write/flush later additions to XLOG as well */ - if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE)) - { - XLogCtlInsert *Insert = &XLogCtl->Insert; - uint32 freespace = INSERT_FREESPACE(Insert); + WriteRqst.Write = insertpos; + WriteRqst.Flush = insertpos; - if (freespace == 0) /* buffer is full */ - WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; - else - { - WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; - WriteRqstPtr -= freespace; - } - LWLockRelease(WALInsertLock); - WriteRqst.Write = WriteRqstPtr; - WriteRqst.Flush = WriteRqstPtr; - } - else - { - WriteRqst.Write = WriteRqstPtr; - WriteRqst.Flush = record; - } - XLogWrite(WriteRqst, false, false); + XLogWrite(WriteRqst, false); LWLockRelease(WALWriteLock); /* done */ @@ -2142,7 +3243,8 @@ XLogBackgroundFlush(void) START_CRIT_SECTION(); - /* now wait for the write lock */ + /* now wait for any in-progress insertions to finish and get write lock */ + WaitXLogInsertionsToFinish(WriteRqstPtr); LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = XLogCtl->LogwrtResult; if (WriteRqstPtr > LogwrtResult.Flush) @@ -2151,7 +3253,7 @@ XLogBackgroundFlush(void) WriteRqst.Write = WriteRqstPtr; WriteRqst.Flush = WriteRqstPtr; - XLogWrite(WriteRqst, flexible, false); + XLogWrite(WriteRqst, flexible); wrote_something = true; } LWLockRelease(WALWriteLock); @@ -2161,6 +3263,12 @@ XLogBackgroundFlush(void) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(); + /* + * Great, done. To take some work off the critical path, try to initialize + * as many of the no-longer-needed WAL buffers for future use as we can. 
+ */ + AdvanceXLInsertBuffer(InvalidXLogRecPtr, true); + return wrote_something; } @@ -3937,10 +5045,13 @@ XLOGShmemSize(void) /* XLogCtl */ size = sizeof(XLogCtlData); + + /* xlog insertion slots, plus alignment */ + size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1)); /* xlblocks array */ size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers)); /* extra alignment padding for XLOG I/O buffers */ - size = add_size(size, ALIGNOF_XLOG_BUFFER); + size = add_size(size, XLOG_BLCKSZ); /* and the buffers themselves */ size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers)); @@ -3959,11 +5070,11 @@ XLOGShmemInit(void) bool foundCFile, foundXLog; char *allocptr; + int i; ControlFile = (ControlFileData *) ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile); - XLogCtl = (XLogCtlData *) - ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog); + allocptr = ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog); if (foundCFile || foundXLog) { @@ -3971,7 +5082,7 @@ XLOGShmemInit(void) Assert(foundCFile && foundXLog); return; } - + XLogCtl = (XLogCtlData *) allocptr; memset(XLogCtl, 0, sizeof(XLogCtlData)); /* @@ -3979,15 +5090,23 @@ XLOGShmemInit(void) * multiple of the alignment for same, so no extra alignment padding is * needed here. */ - allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData); + allocptr += sizeof(XLogCtlData); XLogCtl->xlblocks = (XLogRecPtr *) allocptr; memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); allocptr += sizeof(XLogRecPtr) * XLOGbuffers; + /* Xlog insertion slots. Ensure they're aligned to the full padded size */ + allocptr += sizeof(XLogInsertSlotPadded) - + ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded); + XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr; + allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots; + /* - * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary. + * Align the start of the page buffers to a full xlog block size boundary. + * This simplifies some calculations in XLOG insertion. It is also required + * for O_DIRECT. 
*/ - allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr); + allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr); XLogCtl->pages = allocptr; memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers); @@ -3999,7 +5118,21 @@ XLOGShmemInit(void) XLogCtl->SharedRecoveryInProgress = true; XLogCtl->SharedHotStandbyActive = false; XLogCtl->WalWriterSleeping = false; - XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); + + for (i = 0; i < num_xloginsert_slots; i++) + { + XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot; + SpinLockInit(&slot->mutex); + slot->xlogInsertingAt = InvalidXLogRecPtr; + slot->owner = NULL; + + slot->releaseOK = true; + slot->exclusive = 0; + slot->head = NULL; + slot->tail = NULL; + } + + SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); SpinLockInit(&XLogCtl->ulsn_lck); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); @@ -4050,8 +5183,8 @@ BootStrapXLOG(void) ThisTimeLineID = 1; /* page buffer must be aligned suitably for O_DIRECT */ - buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER); - page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer); + buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ); + page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer); memset(page, 0, XLOG_BLCKSZ); /* @@ -4893,6 +6026,7 @@ StartupXLOG(void) bool backupEndRequired = false; bool backupFromStandby = false; DBState dbstate_at_startup; + int firstIdx; XLogReaderState *xlogreader; XLogPageReadPrivate private; bool fast_promoted = false; @@ -5257,7 +6391,7 @@ StartupXLOG(void) lastFullPageWrites = checkPoint.fullPageWrites; - RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; + RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; if (RecPtr < checkPoint.redo) ereport(PANIC, @@ -5899,25 +7033,21 @@ StartupXLOG(void) openLogFile = XLogFileOpen(openLogSegNo); openLogOff = 0; Insert = &XLogCtl->Insert; - Insert->PrevRecord = LastRec; - XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ; + Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec); + + firstIdx = XLogRecEndPtrToBufIdx(EndOfLog); + XLogCtl->curridx = firstIdx; + + XLogCtl->xlblocks[firstIdx] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ; /* * Tricky point here: readBuf contains the *last* block that the LastRec * record spans, not the one it starts in. The last block is indeed the * one we want to use. 
*/ - if (EndOfLog % XLOG_BLCKSZ == 0) - { - memset(Insert->currpage, 0, XLOG_BLCKSZ); - } - else - { - Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize); - memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ); - } - Insert->currpos = (char *) Insert->currpage + - (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]); + Assert(readOff == (XLogCtl->xlblocks[firstIdx] - XLOG_BLCKSZ) % XLogSegSize); + memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], xlogreader->readBuf, XLOG_BLCKSZ); + Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog); LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; @@ -5926,12 +7056,12 @@ StartupXLOG(void) XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; - freespace = INSERT_FREESPACE(Insert); + freespace = INSERT_FREESPACE(EndOfLog); if (freespace > 0) { /* Make sure rest of page is zero */ - MemSet(Insert->currpos, 0, freespace); - XLogCtl->Write.curridx = 0; + MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog % XLOG_BLCKSZ, 0, freespace); + XLogCtl->Write.curridx = firstIdx; } else { @@ -5943,7 +7073,7 @@ StartupXLOG(void) * this is sufficient. The first actual attempt to insert a log * record will advance the insert state. */ - XLogCtl->Write.curridx = NextBufIdx(0); + XLogCtl->Write.curridx = NextBufIdx(firstIdx); } /* Pre-scan prepared transactions to find out the range of XIDs present */ @@ -6504,21 +7634,29 @@ InitXLOGAccess(void) } /* - * Once spawned, a backend may update its local RedoRecPtr from - * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck - * to do so. This is done in XLogInsert() or GetRedoRecPtr(). + * Return the current Redo pointer from shared memory. + * + * As a side-effect, the local RedoRecPtr copy is updated. */ XLogRecPtr GetRedoRecPtr(void) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr ptr; + /* + * The possibly not up-to-date copy in XlogCtl is enough. Even if we + * grabbed a WAL insertion slot to read the master copy, someone might + * update it just after we've released the lock. + */ SpinLockAcquire(&xlogctl->info_lck); - Assert(RedoRecPtr <= xlogctl->Insert.RedoRecPtr); - RedoRecPtr = xlogctl->Insert.RedoRecPtr; + ptr = xlogctl->RedoRecPtr; SpinLockRelease(&xlogctl->info_lck); + if (RedoRecPtr < ptr) + RedoRecPtr = ptr; + return RedoRecPtr; } @@ -6527,9 +7665,8 @@ GetRedoRecPtr(void) * * NOTE: The value *actually* returned is the position of the last full * xlog page. It lags behind the real insert position by at most 1 page. - * For that, we don't need to acquire WALInsertLock which can be quite - * heavily contended, and an approximation is enough for the current - * usage of this function. + * For that, we don't need to scan through WAL insertion slots, and an + * approximation is enough for the current usage of this function. 
*/ XLogRecPtr GetInsertRecPtr(void) @@ -6806,6 +7943,8 @@ LogCheckpointEnd(bool restartpoint) void CreateCheckPoint(int flags) { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; bool shutdown; CheckPoint checkPoint; XLogRecPtr recptr; @@ -6813,6 +7952,7 @@ CreateCheckPoint(int flags) XLogRecData rdata; uint32 freespace; XLogSegNo _logSegNo; + XLogRecPtr curInsert; VirtualTransactionId *vxids; int nvxids; @@ -6883,10 +8023,11 @@ CreateCheckPoint(int flags) checkPoint.oldestActiveXid = InvalidTransactionId; /* - * We must hold WALInsertLock while examining insert state to determine - * the checkpoint REDO pointer. + * We must block concurrent insertions while examining insert state to + * determine the checkpoint REDO pointer. */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); + curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos); /* * If this isn't a shutdown or forced checkpoint, and we have not inserted @@ -6906,14 +8047,11 @@ CreateCheckPoint(int flags) if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_FORCE)) == 0) { - XLogRecPtr curInsert; - - INSERT_RECPTR(curInsert, Insert, Insert->curridx); if (curInsert == ControlFile->checkPoint + MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) && ControlFile->checkPoint == ControlFile->checkPointCopy.redo) { - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); LWLockRelease(CheckpointLock); END_CRIT_SECTION(); return; @@ -6945,18 +8083,19 @@ CreateCheckPoint(int flags) * the buffer flush work. Those XLOG records are logically after the * checkpoint, even though physically before it. Got that? */ - freespace = INSERT_FREESPACE(Insert); + freespace = INSERT_FREESPACE(curInsert); if (freespace == 0) { - (void) AdvanceXLInsertBuffer(false); - /* OK to ignore update return flag, since we will do flush anyway */ - freespace = INSERT_FREESPACE(Insert); + if (curInsert % XLogSegSize == 0) + curInsert += SizeOfXLogLongPHD; + else + curInsert += SizeOfXLogShortPHD; } - INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx); + checkPoint.redo = curInsert; /* * Here we update the shared RedoRecPtr for future XLogInsert calls; this - * must be done while holding the insert lock AND the info_lck. + * must be done while holding the insertion slots. * * Note: if we fail to complete the checkpoint, RedoRecPtr will be left * pointing past where it really needs to point. This is okay; the only @@ -6965,20 +8104,18 @@ CreateCheckPoint(int flags) * XLogInserts that happen while we are dumping buffers must assume that * their buffer changes are not included in the checkpoint. */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo; - SpinLockRelease(&xlogctl->info_lck); - } + RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo; /* - * Now we can release WAL insert lock, allowing other xacts to proceed - * while we are flushing disk buffers. + * Now we can release the WAL insertion slots, allowing other xacts to + * proceed while we are flushing disk buffers. */ - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); + + /* Update the info_lck-protected copy of RedoRecPtr as well */ + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->RedoRecPtr = checkPoint.redo; + SpinLockRelease(&xlogctl->info_lck); /* * If enabled, log checkpoint start. 
We postpone this until now so as not @@ -7003,10 +8140,11 @@ CreateCheckPoint(int flags) * we wait till he's out of his commit critical section before proceeding. * See notes in RecordTransactionCommit(). * - * Because we've already released WALInsertLock, this test is a bit fuzzy: - * it is possible that we will wait for xacts we didn't really need to - * wait for. But the delay should be short and it seems better to make - * checkpoint take a bit longer than to hold locks longer than necessary. + * Because we've already released the insertion slots, this test is a bit + * fuzzy: it is possible that we will wait for xacts we didn't really need + * to wait for. But the delay should be short and it seems better to make + * checkpoint take a bit longer than to hold off insertions longer than + * necessary. * (In fact, the whole reason we have this issue is that xact.c does * commit record XLOG insertion and clog update as two separate steps * protected by different locks, but again that seems best on grounds of @@ -7233,10 +8371,10 @@ CreateEndOfRecoveryRecord(void) xlrec.end_time = time(NULL); - LWLockAcquire(WALInsertLock, LW_SHARED); + WALInsertSlotAcquire(true); xlrec.ThisTimeLineID = ThisTimeLineID; xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID; - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); LocalSetXLogInsertAllowed(); @@ -7437,15 +8575,18 @@ CreateRestartPoint(int flags) * the number of segments replayed since last restartpoint, and request a * restartpoint if it exceeds checkpoint_segments. * - * You need to hold WALInsertLock and info_lck to update it, although - * during recovery acquiring WALInsertLock is just pro forma, because - * there is no other processes updating Insert.RedoRecPtr. + * Like in CreateCheckPoint(), hold off insertions to update it, although + * during recovery this is just pro forma, because no WAL insertions are + * happening. */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); - SpinLockAcquire(&xlogctl->info_lck); + WALInsertSlotAcquire(true); xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo; + WALInsertSlotRelease(); + + /* Also update the info_lck-protected copy */ + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->RedoRecPtr = lastCheckPoint.redo; SpinLockRelease(&xlogctl->info_lck); - LWLockRelease(WALInsertLock); /* * Prepare to accumulate statistics. @@ -7863,9 +9004,9 @@ UpdateFullPageWrites(void) */ if (fullPageWrites) { - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); Insert->fullPageWrites = true; - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } /* @@ -7886,9 +9027,9 @@ UpdateFullPageWrites(void) if (!fullPageWrites) { - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); Insert->fullPageWrites = false; - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } END_CRIT_SECTION(); } @@ -8520,15 +9661,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * Note that forcePageWrites has no effect during an online backup from * the standby. * - * We must hold WALInsertLock to change the value of forcePageWrites, to - * ensure adequate interlocking against XLogInsert(). + * We must hold all the insertion slots to change the value of + * forcePageWrites, to ensure adequate interlocking against XLogInsert(). 
*/ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); if (exclusive) { if (XLogCtl->Insert.exclusiveBackup) { - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("a backup is already in progress"), @@ -8539,7 +9680,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, else XLogCtl->Insert.nonExclusiveBackups++; XLogCtl->Insert.forcePageWrites = true; - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); /* Ensure we release forcePageWrites if fail below */ PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive)); @@ -8654,13 +9795,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * taking a checkpoint right after another is not that expensive * either because only few buffers have been dirtied yet. */ - LWLockAcquire(WALInsertLock, LW_SHARED); + WALInsertSlotAcquire(true); if (XLogCtl->Insert.lastBackupStart < startpoint) { XLogCtl->Insert.lastBackupStart = startpoint; gotUniqueStartpoint = true; } - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } while (!gotUniqueStartpoint); XLByteToSeg(startpoint, _logSegNo); @@ -8750,7 +9891,7 @@ pg_start_backup_callback(int code, Datum arg) bool exclusive = DatumGetBool(arg); /* Update backup counters and forcePageWrites on failure */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); if (exclusive) { Assert(XLogCtl->Insert.exclusiveBackup); @@ -8767,7 +9908,7 @@ pg_start_backup_callback(int code, Datum arg) { XLogCtl->Insert.forcePageWrites = false; } - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } /* @@ -8838,7 +9979,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) /* * OK to update backup counters and forcePageWrites */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); if (exclusive) XLogCtl->Insert.exclusiveBackup = false; else @@ -8858,7 +9999,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) { XLogCtl->Insert.forcePageWrites = false; } - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); if (exclusive) { @@ -9143,7 +10284,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) void do_pg_abort_backup(void) { - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); Assert(XLogCtl->Insert.nonExclusiveBackups > 0); XLogCtl->Insert.nonExclusiveBackups--; @@ -9152,7 +10293,7 @@ do_pg_abort_backup(void) { XLogCtl->Insert.forcePageWrites = false; } - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } /* @@ -9184,14 +10325,14 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI) XLogRecPtr GetXLogInsertRecPtr(void) { - XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogRecPtr current_recptr; + volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + uint64 current_bytepos; - LWLockAcquire(WALInsertLock, LW_SHARED); - INSERT_RECPTR(current_recptr, Insert, Insert->curridx); - LWLockRelease(WALInsertLock); + SpinLockAcquire(&Insert->insertpos_lck); + current_bytepos = Insert->CurrBytePos; + SpinLockRelease(&Insert->insertpos_lck); - return current_recptr; + return XLogBytePosToRecPtr(current_bytepos); } /* diff --git a/src/backend/storage/lmgr/spin.c b/src/backend/storage/lmgr/spin.c index 5503925788..f054be8806 100644 --- a/src/backend/storage/lmgr/spin.c +++ b/src/backend/storage/lmgr/spin.c @@ -22,6 +22,7 @@ */ #include "postgres.h" +#include "access/xlog.h" #include 
"miscadmin.h" #include "replication/walsender.h" #include "storage/lwlock.h" @@ -63,6 +64,7 @@ SpinlockSemas(void) nsemas = NumLWLocks(); /* one for each lwlock */ nsemas += NBuffers; /* one for each buffer header */ nsemas += max_wal_senders; /* one for each wal sender process */ + nsemas += num_xloginsert_slots; /* one for each WAL insertion slot */ nsemas += 30; /* plus a bunch for other small-scale use */ return nsemas; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index d6200616de..5aefd1b62c 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2037,6 +2037,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"xloginsert_slots", PGC_POSTMASTER, WAL_SETTINGS, + gettext_noop("Sets the number of slots for concurrent xlog insertions."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &num_xloginsert_slots, + 8, 1, 1000, + NULL, NULL, NULL + }, + { /* see max_connections */ {"max_wal_senders", PGC_POSTMASTER, REPLICATION_SENDING, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 83e583259d..002862cca5 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -190,6 +190,7 @@ extern char *XLogArchiveCommand; extern bool EnableHotStandby; extern bool fullPageWrites; extern bool log_checkpoints; +extern int num_xloginsert_slots; /* WAL levels */ typedef enum WalLevel diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index fa6497adad..bca166ebdc 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -93,14 +93,4 @@ typedef uint32 TimeLineID; #define DEFAULT_SYNC_METHOD SYNC_METHOD_FSYNC #endif -/* - * Limitation of buffer-alignment for direct IO depends on OS and filesystem, - * but XLOG_BLCKSZ is assumed to be enough for it. - */ -#ifdef O_DIRECT -#define ALIGNOF_XLOG_BUFFER XLOG_BLCKSZ -#else -#define ALIGNOF_XLOG_BUFFER ALIGNOF_BUFFER -#endif - #endif /* XLOG_DEFS_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index d8f7e9d64a..85dc4ffdaa 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -53,7 +53,7 @@ typedef enum LWLockId ProcArrayLock, SInvalReadLock, SInvalWriteLock, - WALInsertLock, + WALBufMappingLock, WALWriteLock, ControlFileLock, CheckpointLock,
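The XLogBytePosToRecPtr() and XLogRecPtrToBytePos() functions earlier in this patch map between "usable" byte positions, which count only the bytes available for WAL data and skip page and segment headers, and ordinary XLogRecPtr positions. The standalone sketch below models that arithmetic with hard-coded 8 kB pages, 16 MB segments and illustrative header sizes; the macro and function names are simplified stand-ins for the real definitions rather than the patch's own code, and the main() driver merely asserts that converting a byte position to a pointer and back returns the original value.

/*
 * Simplified model of the usable-byte-position arithmetic (illustrative
 * constants; the real code derives them from XLOG_BLCKSZ, XLOG_SEG_SIZE,
 * SizeOfXLogLongPHD and SizeOfXLogShortPHD).
 */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define BLCKSZ              8192
#define SEG_SIZE            (16 * 1024 * 1024)
#define LONG_PHD            40      /* long page header, first page of a segment */
#define SHORT_PHD           24      /* short page header, all other pages */

#define USABLE_PER_PAGE     (BLCKSZ - SHORT_PHD)
#define PAGES_PER_SEGMENT   (SEG_SIZE / BLCKSZ)
#define USABLE_PER_SEGMENT  ((USABLE_PER_PAGE * PAGES_PER_SEGMENT) - (LONG_PHD - SHORT_PHD))

/* usable byte position -> physical position (segment number * size + offset) */
static uint64_t
byte_pos_to_ptr(uint64_t bytepos)
{
    uint64_t    fullsegs = bytepos / USABLE_PER_SEGMENT;
    uint64_t    bytesleft = bytepos % USABLE_PER_SEGMENT;
    uint64_t    seg_offset;

    if (bytesleft < BLCKSZ - LONG_PHD)
    {
        /* fits on the first page of the segment */
        seg_offset = bytesleft + LONG_PHD;
    }
    else
    {
        uint64_t    fullpages;

        /* account for the first page, which carries the long header */
        bytesleft -= BLCKSZ - LONG_PHD;
        fullpages = bytesleft / USABLE_PER_PAGE;
        bytesleft = bytesleft % USABLE_PER_PAGE;
        seg_offset = BLCKSZ + fullpages * BLCKSZ + bytesleft + SHORT_PHD;
    }
    return fullsegs * SEG_SIZE + seg_offset;
}

/* physical byte position -> usable byte position */
static uint64_t
ptr_to_byte_pos(uint64_t ptr)
{
    uint64_t    fullsegs = ptr / SEG_SIZE;
    uint64_t    fullpages = (ptr % SEG_SIZE) / BLCKSZ;
    uint64_t    offset = ptr % BLCKSZ;
    uint64_t    result;

    if (fullpages == 0)
    {
        result = fullsegs * USABLE_PER_SEGMENT;
        if (offset > 0)
        {
            assert(offset >= LONG_PHD);
            result += offset - LONG_PHD;
        }
    }
    else
    {
        result = fullsegs * USABLE_PER_SEGMENT +
            (BLCKSZ - LONG_PHD) +               /* first page of the segment */
            (fullpages - 1) * USABLE_PER_PAGE;  /* full pages in between */
        if (offset > 0)
        {
            assert(offset >= SHORT_PHD);
            result += offset - SHORT_PHD;
        }
    }
    return result;
}

int
main(void)
{
    uint64_t    pos;

    /* round-trip a range of positions to show the conversion is lossless */
    for (pos = 0; pos < 3 * (uint64_t) USABLE_PER_SEGMENT; pos += 999)
        assert(ptr_to_byte_pos(byte_pos_to_ptr(pos)) == pos);
    printf("round trip ok\n");
    return 0;
}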
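The XLogInsertSlotPadded union and the pointer fix-up in XLOGShmemInit() keep each insertion slot on its own cache line: the union forces the array stride to a power of two, XLOGShmemSize() reserves one extra slot's worth of space, and the init code then advances the allocation pointer to the next multiple of the stride. The fragment below is a minimal, self-contained illustration of that trick; the type names are made up and ordinary malloc() stands in for the shared-memory allocator.

/*
 * Pad an array of small mutex-protected structs so that each element sits
 * on its own cache line (names are illustrative only).
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define CACHE_LINE  64              /* matches the pad size used by the patch */

typedef struct
{
    volatile int    mutex;          /* stand-in for slock_t */
    uint64_t        insertingAt;    /* stand-in for xlogInsertingAt */
} Slot;

typedef union
{
    Slot    slot;
    char    pad[CACHE_LINE];        /* forces the array stride to a power of 2 */
} SlotPadded;

int
main(void)
{
    int         nslots = 8;
    char       *raw;
    uintptr_t   p;
    SlotPadded *slots;

    /* over-allocate by one stride so the start address can be rounded up */
    raw = malloc((nslots + 1) * sizeof(SlotPadded));
    if (raw == NULL)
        return 1;

    /* advance to the next multiple of the padded stride */
    p = (uintptr_t) raw;
    p += sizeof(SlotPadded) - p % sizeof(SlotPadded);
    slots = (SlotPadded *) p;

    /* each element now occupies exactly one cache line */
    printf("stride %zu bytes, first slot at %p\n",
           sizeof(SlotPadded), (void *) &slots[0]);

    free(raw);
    return 0;
}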
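In AdvanceXLInsertBuffer(), the new page is zeroed and its header filled in first, pg_write_barrier() is issued, and only then is the page's end pointer stored into xlblocks, because GetXLogBuffer() reads xlblocks without holding WALBufMappingLock. The sketch below shows the generic initialize-then-publish pattern behind that ordering; it substitutes C11 release/acquire atomics for the patch's pg_write_barrier() and volatile accesses, so the primitives and names here are illustrative rather than what the patch actually uses.

/*
 * Initialize-then-publish: the page contents must become visible to other
 * processes no later than the pointer that advertises them.
 */
#include <stdatomic.h>
#include <stdint.h>
#include <string.h>

#define BLCKSZ  8192

static char             page[BLCKSZ];   /* the buffer being recycled */
static _Atomic uint64_t page_end_ptr;   /* stand-in for xlblocks[nextidx] */

/* writer side: in the real code this runs under WALBufMappingLock */
void
publish_page(uint64_t new_end_ptr)
{
    memset(page, 0, BLCKSZ);            /* re-zero and initialize the page */

    /* release store: page contents become visible before the new end pointer */
    atomic_store_explicit(&page_end_ptr, new_end_ptr, memory_order_release);
}

/* reader side: a lock-free check in the style of GetXLogBuffer() */
int
page_covers(uint64_t ptr)
{
    uint64_t    end = atomic_load_explicit(&page_end_ptr, memory_order_acquire);

    /* if the advertised end pointer covers ptr, the page init is visible too */
    return ptr < end;
}

int
main(void)
{
    publish_page(BLCKSZ);
    return page_covers(100) ? 0 : 1;
}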