#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "storage/barrier.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
int wal_level = WAL_LEVEL_MINIMAL;
int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
+int num_xloginsert_slots = 8;
#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold the Insert lock). See XLogInsert for details. We are also allowed
- * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
+ * hold an insertion slot). See XLogInsert for details. We are also allowed
+ * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
* see GetRedoRecPtr. A freshly spawned backend obtains the value during
* InitXLOGAccess.
*/
* so it's a plain spinlock. The other locks are held longer (potentially
* over I/O operations), so we use LWLocks for them. These locks are:
*
- * WALInsertLock: must be held to insert a record into the WAL buffers.
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * It is only held while initializing and changing the mapping. If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
*
* WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
* XLogFlush).
XLogRecPtr Flush; /* last byte + 1 flushed */
} XLogwrtResult;
+
+/*
+ * A slot for inserting to the WAL. This is similar to an LWLock; the main
+ * difference is that there is an extra xlogInsertingAt field that is protected
+ * by the same mutex. Unlike an LWLock, a slot can only be acquired in
+ * exclusive mode.
+ *
+ * The xlogInsertingAt field is used to advertise to other processes how far
+ * the slot owner has progressed in inserting the record. When a backend
+ * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
+ * yet know where it's going to insert the record. That's conservative
+ * but correct; the new insertion is certainly going to go to a byte position
+ * greater than 1. If another backend needs to flush the WAL, it will have to
+ * wait for the new insertion. xlogInsertingAt is updated after finishing the
+ * insert or when crossing a page boundary, which will wake up anyone waiting
+ * for it, whether the wait was necessary in the first place or not.
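+ *
+ * For illustration: if an inserter is advertising xlogInsertingAt = X, a
+ * process that wants to flush WAL up to some Y <= X need not wait for it at
+ * all, while one that wants to flush up to Y > X must wait until the value
+ * advances to Y or beyond, or the slot is released. Everything before X is
+ * known to be fully copied into the WAL buffers and is safe to write out.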
+ *
+ * A process can wait on a slot in two modes: LW_EXCLUSIVE or
+ * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
+ * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
+ * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
+ * released, or xlogInsertingAt is updated. In other words, a process in
+ * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
+ * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
+ * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
+ *
+ * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
+ * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
+ * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
+ * see lwlock.c for details.
+ */
+typedef struct
+{
+ slock_t mutex; /* protects the below fields */
+ XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */
+
+ PGPROC *owner; /* for debugging purposes */
+
+ bool releaseOK; /* T if ok to release waiters */
+ char exclusive; /* # of exclusive holders (0 or 1) */
+ PGPROC *head; /* head of list of waiting PGPROCs */
+ PGPROC *tail; /* tail of list of waiting PGPROCs */
+ /* tail is undefined when head is NULL */
+} XLogInsertSlot;
+
+/*
+ * All the slots are allocated as an array in shared memory. We force the
+ * array stride to be a power of 2, which saves a few cycles in indexing, but
+ * more importantly also ensures that individual slots don't cross cache line
+ * boundaries. (Of course, we have to also ensure that the array start
+ * address is suitably aligned.)
+ */
+typedef union XLogInsertSlotPadded
+{
+ XLogInsertSlot slot;
+ char pad[64];
+} XLogInsertSlotPadded;
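+
+/*
+ * Note that the 64-byte stride is an assumption about the platform's cache
+ * line size. On a platform with wider cache lines, two neighboring slots
+ * can still end up on the same line; that costs some performance under
+ * contention, but is harmless for correctness.
+ */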
+
/*
* Shared state data for XLogInsert.
*/
typedef struct XLogCtlInsert
{
- XLogRecPtr PrevRecord; /* start of previously-inserted record */
- int curridx; /* current block index in cache */
- XLogPageHeader currpage; /* points to header of block in cache */
- char *currpos; /* current insertion point in cache */
- XLogRecPtr RedoRecPtr; /* current redo point for insertions */
- bool forcePageWrites; /* forcing full-page writes for PITR? */
+ slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */
+
+ /*
+ * CurrBytePos is the end of reserved WAL. The next record will be inserted
+ * at that position. PrevBytePos is the start position of the previously
+ * inserted (or rather, reserved) record - it is copied to the prev-
+ * link of the next record. These are stored as "usable byte positions"
+ * rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+ */
+ uint64 CurrBytePos;
+ uint64 PrevBytePos;
+
+ /* insertion slots, see above for details */
+ XLogInsertSlotPadded *insertSlots;
/*
* fullPageWrites is the master copy used by all backends to determine
* This is required because, when full_page_writes is changed by SIGHUP,
* we must WAL-log it before it actually affects WAL-logging by backends.
* Checkpointer sets at startup or after SIGHUP.
+ *
+ * To read these fields, you must hold an insertion slot. To modify them,
+ * you must hold ALL the slots.
*/
+ XLogRecPtr RedoRecPtr; /* current redo point for insertions */
+ bool forcePageWrites; /* forcing full-page writes for PITR? */
bool fullPageWrites;
/*
*/
typedef struct XLogCtlData
{
- /* Protected by WALInsertLock: */
XLogCtlInsert Insert;
/* Protected by info_lck: */
XLogwrtRqst LogwrtRqst;
+ XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */
uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
*/
XLogwrtResult LogwrtResult;
+ /*
+ * Latest initialized block index in cache.
+ *
+ * To change curridx and the identity of a buffer, you need to hold
+ * WALBufMappingLock. To change the identity of a buffer that's still
+ * dirty, the old page needs to be written out first, and for that you
+ * need WALWriteLock, and you need to ensure that there are no in-progress
+ * insertions to the page by calling WaitXLogInsertionsToFinish().
+ */
+ int curridx;
+
/*
* These values do not change after startup, although the pointed-to pages
- * and xlblocks values certainly do. Permission to read/write the pages
- * and xlblocks values depends on WALInsertLock and WALWriteLock.
+ * and xlblocks values certainly do. xlblock values are protected by
+ * WALBufMappingLock.
*/
char *pages; /* buffers for unwritten XLOG pages */
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
static ControlFileData *ControlFile = NULL;
/*
- * Macros for managing XLogInsert state. In most cases, the calling routine
- * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
- * so these are passed as parameters instead of being fetched via XLogCtl.
+ * Calculate the amount of space left on the page after 'endptr'. Beware
+ * multiple evaluation!
*/
+#define INSERT_FREESPACE(endptr) \
+ (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
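+
+/*
+ * For example, INSERT_FREESPACE evaluates to 0, not XLOG_BLCKSZ, when
+ * 'endptr' sits exactly on a page boundary: the page ending there is full,
+ * and the next record will start after the next page's header. Because the
+ * argument is evaluated more than once, it must not have side effects.
+ */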
-/* Free space remaining in the current xlog page buffer */
-#define INSERT_FREESPACE(Insert) \
- (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
+/* Macro to advance to next buffer index. */
+#define NextBufIdx(idx) \
+ (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
-/* Construct XLogRecPtr value for current insertion point */
-#define INSERT_RECPTR(recptr,Insert,curridx) \
- (recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert)
+/*
+ * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
+ * would hold if it was in cache, the page containing 'recptr'.
+ *
+ * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
+ * page is taken to mean the previous page.
+ */
+#define XLogRecPtrToBufIdx(recptr) \
+ (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
-#define PrevBufIdx(idx) \
- (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
+#define XLogRecEndPtrToBufIdx(recptr) \
+ ((((recptr) - 1) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
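+
+/*
+ * For example, a pointer that falls exactly on a page boundary is mapped by
+ * XLogRecPtrToBufIdx to the buffer of the page beginning at that address,
+ * but by XLogRecEndPtrToBufIdx to the buffer of the page ending there. The
+ * latter is the buffer you want when the pointer is the end+1 of a record.
+ */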
-#define NextBufIdx(idx) \
- (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
+/*
+ * These are the number of bytes in a WAL page and segment usable for WAL data.
+ */
+#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
+#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
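+
+/*
+ * For illustration, with the default 8 kB XLOG_BLCKSZ and 16 MB
+ * XLOG_SEG_SIZE, and 8-byte MAXALIGN (giving a 24-byte short and a 40-byte
+ * long page header; these are the assumptions used in the worked examples
+ * further down): UsableBytesInPage is 8192 - 24 = 8168, and
+ * UsableBytesInSegment is 2048 * 8168 - 16 = 16728048, the "- 16"
+ * accounting for the segment's first page carrying the long header.
+ */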
/*
* Private, possibly out-of-date copy of shared LogwrtResult.
/* Have we launched bgwriter during recovery? */
static bool bgwriterLaunched = false;
+/* For WALInsertSlotAcquire/Release functions */
+static int MySlotNo = 0;
+static bool holdingAllSlots = false;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
XLogRecPtr *lsn, BkpBlock *bkpb);
static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
char *blk, bool get_cleanup_lock, bool keep_buffer);
-static bool AdvanceXLInsertBuffer(bool new_segment);
+static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
bool find_free, int *max_advance,
bool use_lock);
static void rm_redo_error_callback(void *arg);
static int get_sync_bit(int method);
+static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+ XLogRecData *rdata,
+ XLogRecPtr StartPos, XLogRecPtr EndPos);
+static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
+ XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
+static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+ XLogRecPtr *PrevPtr);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static void WakeupWaiters(XLogRecPtr EndPos);
+static char *GetXLogBuffer(XLogRecPtr ptr);
+static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
+static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
+static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+
+static void WALInsertSlotAcquire(bool exclusive);
+static void WALInsertSlotAcquireOne(int slotno);
+static void WALInsertSlotRelease(void);
+static void WALInsertSlotReleaseOne(int slotno);
/*
* Insert an XLOG record having the specified RMID and info bytes,
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecPtr RecPtr;
- XLogRecPtr WriteRqst;
- uint32 freespace;
- int curridx;
XLogRecData *rdt;
XLogRecData *rdt_lastnormal;
Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
uint32 len,
write_len;
unsigned i;
- bool updrqst;
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+ bool inserted;
uint8 info_orig = info;
static XLogRecord *rechdr;
+ XLogRecPtr StartPos;
+ XLogRecPtr EndPos;
if (rechdr == NULL)
{
*/
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
- RecPtr = SizeOfXLogLongPHD; /* start of 1st chkpt record */
- return RecPtr;
+ EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
+ return EndPos;
}
/*
* up.
*
* We may have to loop back to here if a race condition is detected below.
- * We could prevent the race by doing all this work while holding the
- * insert lock, but it seems better to avoid doing CRC calculations while
- * holding the lock.
+ * We could prevent the race by doing all this work while holding an
+ * insertion slot, but it seems better to avoid doing CRC calculations
+ * while holding one.
*
* We add entries for backup blocks to the chain, so that they don't need
* any special treatment in the critical section where the chunks are
/*
* Decide if we need to do full-page writes in this XLOG record: true if
* full_page_writes is on or we have a PITR request for it. Since we
- * don't yet have the insert lock, fullPageWrites and forcePageWrites
- * could change under us, but we'll recheck them once we have the lock.
+ * don't yet have an insertion slot, fullPageWrites and forcePageWrites
+ * could change under us, but we'll recheck them once we have a slot.
*/
doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
COMP_CRC32(rdata_crc, rdt->data, rdt->len);
/*
- * Construct record header (prev-link and CRC are filled in later), and
- * make that the first chunk in the chain.
+ * Construct record header (prev-link is filled in later, after reserving
+ * the space for the record), and make that the first chunk in the chain.
+ *
+ * The CRC calculated for the header here doesn't include prev-link,
+ * because we don't know it yet. It will be added later.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
+ rechdr->xl_prev = InvalidXLogRecPtr;
+ COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
hdr_rdt.next = rdata;
hdr_rdt.data = (char *) rechdr;
hdr_rdt.len = SizeOfXLogRecord;
-
write_len += SizeOfXLogRecord;
+ /*----------
+ *
+ * We have now done all the preparatory work we can without holding a
+ * lock or modifying shared state. From here on, inserting the new WAL
+ * record to the shared WAL buffer cache is a two-step process:
+ *
+ * 1. Reserve the right amount of space from the WAL. The current head of
+ * reserved space is kept in Insert->CurrBytePos, and is protected by
+ * insertpos_lck.
+ *
+ * 2. Copy the record to the reserved WAL space. This involves finding the
+ * correct WAL buffer containing the reserved space, and copying the
+ * record in place. This can be done concurrently in multiple processes.
+ *
+ * To keep track of which insertions are still in-progress, each concurrent
+ * inserter allocates an "insertion slot", which tells others how far the
+ * inserter has progressed. There is a small fixed number of insertion
+ * slots, determined by the num_xloginsert_slots GUC. When an inserter
+ * finishes, it updates the xlogInsertingAt of its slot to the end of the
+ * record it inserted, to let others know that it's done. xlogInsertingAt
+ * is also updated when crossing over to a new WAL buffer, to allow the
+ * previous buffer to be flushed.
+ *
+ * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
+ * changing until the insertion is finished.
+ *
+ * Step 2 can usually be done completely in parallel. If the required WAL
+ * page is not initialized yet, you have to grab WALBufMappingLock to
+ * initialize it, but the WAL writer tries to do that ahead of insertions
+ * so that insertions rarely need to do it in the critical path.
+ *
+ *----------
+ */
START_CRIT_SECTION();
-
- /* Now wait to get insert lock */
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(isLogSwitch);
/*
* Check to see if my RedoRecPtr is out of date. If so, may have to go
* Oops, this buffer now needs to be backed up, but we
* didn't think so above. Start over.
*/
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
{
/* Oops, must redo it with full-page data. */
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
}
/*
- * If the current page is completely full, the record goes to the next
- * page, right after the page header.
+ * Reserve space for the record in the WAL. This also sets the xl_prev
+ * pointer.
+ */
+ if (isLogSwitch)
+ inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
+ else
+ {
+ ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
+ &rechdr->xl_prev);
+ inserted = true;
+ }
+
+ if (inserted)
+ {
+ /*
+ * Now that xl_prev has been filled in, finish CRC calculation of the
+ * record header.
+ */
+ COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
+ FIN_CRC32(rdata_crc);
+ rechdr->xl_crc = rdata_crc;
+
+ /*
+ * All the record data, including the header, is now ready to be
+ * inserted. Copy the record in the space reserved.
+ */
+ CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
+ }
+ else
+ {
+ /*
+ * This was an xlog-switch record, but the current insert location was
+ * already exactly at the beginning of a segment, so there was no need
+ * to do anything.
+ */
+ }
+
+ /*
+ * Done! Let others know that we're finished.
+ */
+ WALInsertSlotRelease();
+
+ END_CRIT_SECTION();
+
+ /*
+ * Update shared LogwrtRqst.Write, if we crossed page boundary.
+ */
+ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ /* advance global request to include new block(s) */
+ if (xlogctl->LogwrtRqst.Write < EndPos)
+ xlogctl->LogwrtRqst.Write = EndPos;
+ /* update local result copy while I have the chance */
+ LogwrtResult = xlogctl->LogwrtResult;
+ SpinLockRelease(&xlogctl->info_lck);
+ }
+
+ /*
+ * If this was an XLOG_SWITCH record, flush the record and the empty
+ * padding space that fills the rest of the segment, and perform
+ * end-of-segment actions (eg, notifying archiver).
+ */
+ if (isLogSwitch)
+ {
+ TRACE_POSTGRESQL_XLOG_SWITCH();
+ XLogFlush(EndPos);
+ /*
+ * Even though we reserved the rest of the segment for us, which is
+ * reflected in EndPos, we return a pointer to just the end of the
+ * xlog-switch record.
+ */
+ if (inserted)
+ {
+ EndPos = StartPos + SizeOfXLogRecord;
+ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+ {
+ if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ)
+ EndPos += SizeOfXLogLongPHD;
+ else
+ EndPos += SizeOfXLogShortPHD;
+ }
+ }
+ }
+
+#ifdef WAL_DEBUG
+ if (XLOG_DEBUG)
+ {
+ StringInfoData buf;
+
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "INSERT @ %X/%X: ",
+ (uint32) (EndPos >> 32), (uint32) EndPos);
+ xlog_outrec(&buf, rechdr);
+ if (rdata->data != NULL)
+ {
+ appendStringInfo(&buf, " - ");
+ RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
+ }
+ elog(LOG, "%s", buf.data);
+ pfree(buf.data);
+ }
+#endif
+
+ /*
+ * Update our global variables
+ */
+ ProcLastRecPtr = StartPos;
+ XactLastRecEnd = EndPos;
+
+ return EndPos;
+}
+
+/*
+ * Reserves the right amount of space for a record of given size from the WAL.
+ * *StartPos is set to the beginning of the reserved section, *EndPos to
+ * its end+1. *PrevPtr is set to the beginning of the previous record; it is
+ * used to set the xl_prev of this record.
+ *
+ * This is the performance critical part of XLogInsert that must be serialized
+ * across backends. The rest can happen mostly in parallel. Try to keep this
+ * section as short as possible; insertpos_lck can be heavily contended on a
+ * busy system.
+ *
+ * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
+ * where we actually copy the record to the reserved space.
+ */
+static void
+ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+ XLogRecPtr *PrevPtr)
+{
+ volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint64 startbytepos;
+ uint64 endbytepos;
+ uint64 prevbytepos;
+
+ size = MAXALIGN(size);
+
+ /* All (non xlog-switch) records should contain data. */
+ Assert(size > SizeOfXLogRecord);
+
+ /*
+ * The duration the spinlock needs to be held is minimized by minimizing
+ * the calculations that have to be done while holding the lock. The
+ * current tip of reserved WAL is kept in CurrBytePos, as a byte position
+ * that only counts "usable" bytes in WAL, that is, it excludes all WAL
+ * page headers. The mapping between "usable" byte positions and physical
+ * positions (XLogRecPtrs) can be done outside the locked region, and
+ * because the usable byte position doesn't include any headers, reserving
+ * X bytes from WAL is almost as simple as "CurrBytePos += X".
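+ *
+ * For example, reserving space for a 100-byte record boils down to a few
+ * loads, one addition and two stores while the spinlock is held; the
+ * division and modulo arithmetic that turns byte positions into
+ * XLogRecPtrs runs only after the lock has been released.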
+ */
+ SpinLockAcquire(&Insert->insertpos_lck);
+
+ startbytepos = Insert->CurrBytePos;
+ endbytepos = startbytepos + size;
+ prevbytepos = Insert->PrevBytePos;
+ Insert->CurrBytePos = endbytepos;
+ Insert->PrevBytePos = startbytepos;
+
+ SpinLockRelease(&Insert->insertpos_lck);
+
+ *StartPos = XLogBytePosToRecPtr(startbytepos);
+ *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+ *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
+
+ /*
+ * Check that the conversions between "usable byte positions" and
+ * XLogRecPtrs work consistently in both directions.
+ */
+ Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+ Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+ Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+}
+
+/*
+ * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
+ *
+ * A log-switch record is handled slightly differently. The rest of the
+ * segment will be reserved for this insertion, as indicated by the returned
+ * *EndPos value. However, if we are already at the beginning of the current
+ * segment, *StartPos and *EndPos are set to the current location without
+ * reserving any space, and the function returns false.
+ */
+static bool
+ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
+{
+ volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint64 startbytepos;
+ uint64 endbytepos;
+ uint64 prevbytepos;
+ uint32 size = SizeOfXLogRecord;
+ XLogRecPtr ptr;
+ uint32 segleft;
+
+ /*
+ * These calculations are a bit heavy-weight to be done while holding a
+ * spinlock, but since we're holding all the WAL insertion slots, there
+ * are no other inserters competing for it. GetXLogInsertRecPtr() does
+ * compete for it, but that's not called very frequently.
+ */
+ SpinLockAcquire(&Insert->insertpos_lck);
+
+ startbytepos = Insert->CurrBytePos;
+
+ ptr = XLogBytePosToEndRecPtr(startbytepos);
+ if (ptr % XLOG_SEG_SIZE == 0)
+ {
+ SpinLockRelease(&Insert->insertpos_lck);
+ *EndPos = *StartPos = ptr;
+ return false;
+ }
+
+ endbytepos = startbytepos + size;
+ prevbytepos = Insert->PrevBytePos;
+
+ *StartPos = XLogBytePosToRecPtr(startbytepos);
+ *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+
+ segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE);
+ if (segleft != XLOG_SEG_SIZE)
+ {
+ /* consume the rest of the segment */
+ *EndPos += segleft;
+ endbytepos = XLogRecPtrToBytePos(*EndPos);
+ }
+ Insert->CurrBytePos = endbytepos;
+ Insert->PrevBytePos = startbytepos;
+
+ SpinLockRelease(&Insert->insertpos_lck);
+
+ *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
+
+ Assert((*EndPos) % XLOG_SEG_SIZE == 0);
+ Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+ Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+ Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+
+ return true;
+}
+
+/*
+ * Subroutine of XLogInsert. Copies a WAL record to an already-reserved
+ * area in the WAL.
+ */
+static void
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+ XLogRecPtr StartPos, XLogRecPtr EndPos)
+{
+ char *currpos;
+ int freespace;
+ int written;
+ XLogRecPtr CurrPos;
+ XLogPageHeader pagehdr;
+
+ /* The first chunk is the record header */
+ Assert(rdata->len == SizeOfXLogRecord);
+
+ /*
+ * Get a pointer to the right place in the right WAL buffer to start
+ * inserting to.
+ */
+ CurrPos = StartPos;
+ currpos = GetXLogBuffer(CurrPos);
+ freespace = INSERT_FREESPACE(CurrPos);
+
+ /*
+ * There should be enough space for at least the first field (xl_tot_len)
+ * on this page.
+ */
+ Assert(freespace >= sizeof(uint32));
+
+ /* Copy record data */
+ written = 0;
+ while (rdata != NULL)
+ {
+ char *rdata_data = rdata->data;
+ int rdata_len = rdata->len;
+
+ while (rdata_len > freespace)
+ {
+ /*
+ * Write what fits on this page, and continue on the next page.
+ */
+ Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
+ memcpy(currpos, rdata_data, freespace);
+ rdata_data += freespace;
+ rdata_len -= freespace;
+ written += freespace;
+ CurrPos += freespace;
+
+ /*
+ * Get pointer to beginning of next page, and set the xlp_rem_len
+ * in the page header. Set XLP_FIRST_IS_CONTRECORD.
+ *
+ * It's safe to set the contrecord flag and xlp_rem_len without a
+ * lock on the page. All the other flags were already set when the
+ * page was initialized, in AdvanceXLInsertBuffer, and we're the
+ * only backend that needs to set the contrecord flag.
+ */
+ currpos = GetXLogBuffer(CurrPos);
+ pagehdr = (XLogPageHeader) currpos;
+ pagehdr->xlp_rem_len = write_len - written;
+ pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
+
+ /* skip over the page header */
+ if (CurrPos % XLogSegSize == 0)
+ {
+ CurrPos += SizeOfXLogLongPHD;
+ currpos += SizeOfXLogLongPHD;
+ }
+ else
+ {
+ CurrPos += SizeOfXLogShortPHD;
+ currpos += SizeOfXLogShortPHD;
+ }
+ freespace = INSERT_FREESPACE(CurrPos);
+ }
+
+ Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
+ memcpy(currpos, rdata_data, rdata_len);
+ currpos += rdata_len;
+ CurrPos += rdata_len;
+ freespace -= rdata_len;
+ written += rdata_len;
+
+ rdata = rdata->next;
+ }
+ Assert(written == write_len);
+
+ /* Align the end position, so that the next record starts aligned */
+ CurrPos = MAXALIGN(CurrPos);
+
+ /*
+ * If this was an xlog-switch, it's not enough to write the switch record,
+ * we also have to consume all the remaining space in the WAL segment.
+ * We have already reserved it for us, but we still need to make sure it's
+ * allocated and zeroed in the WAL buffers so that when the caller (or
+ * someone else) does XLogWrite(), it can really write out all the zeros.
+ */
+ if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0)
+ {
+ /* An xlog-switch record doesn't contain any data besides the header */
+ Assert(write_len == SizeOfXLogRecord);
+
+ /*
+ * We do this one page at a time, to make sure we don't deadlock
+ * against ourselves if wal_buffers < XLOG_SEG_SIZE.
+ */
+ Assert(EndPos % XLogSegSize == 0);
+
+ /* Use up all the remaining space on the first page */
+ CurrPos += freespace;
+
+ while (CurrPos < EndPos)
+ {
+ /* initialize the next page (if not initialized already) */
+ WakeupWaiters(CurrPos);
+ AdvanceXLInsertBuffer(CurrPos, false);
+ CurrPos += XLOG_BLCKSZ;
+ }
+ }
+
+ if (CurrPos != EndPos)
+ elog(PANIC, "space reserved for WAL record does not match what was written");
+}
+
+/*
+ * Allocate a slot for insertion.
+ *
+ * In exclusive mode, all slots are reserved for the current process. That
+ * blocks all concurrent insertions.
+ */
+static void
+WALInsertSlotAcquire(bool exclusive)
+{
+ int i;
+
+ if (exclusive)
+ {
+ for (i = 0; i < num_xloginsert_slots; i++)
+ WALInsertSlotAcquireOne(i);
+ holdingAllSlots = true;
+ }
+ else
+ WALInsertSlotAcquireOne(-1);
+}
+
+/*
+ * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
+ * one if slotno == -1. The index of the slot that was acquired is stored in
+ * MySlotNo.
+ *
+ * This is more or less equivalent to LWLockAcquire().
+ */
+static void
+WALInsertSlotAcquireOne(int slotno)
+{
+ volatile XLogInsertSlot *slot;
+ PGPROC *proc = MyProc;
+ bool retry = false;
+ int extraWaits = 0;
+ static int slotToTry = -1;
+
+ /*
+ * Try to use the slot we used last time. If the system isn't particularly
+ * busy, it's a good bet that it's available, and it's good to have some
+ * affinity to a particular slot so that you don't unnecessarily bounce
+ * cache lines between processes when there is no contention.
+ *
+ * If this is the first time through in this backend, pick a slot
+ * (semi-)randomly. This allows the slots to be used evenly if you have a
+ * lot of very short connections.
+ */
+ if (slotno != -1)
+ MySlotNo = slotno;
+ else
+ {
+ if (slotToTry == -1)
+ slotToTry = MyProc->pgprocno % num_xloginsert_slots;
+ MySlotNo = slotToTry;
+ }
+
+ /*
+ * We can't wait if we haven't got a PGPROC. This should only occur
+ * during bootstrap or shared memory initialization. Put an Assert here
+ * to catch unsafe coding practices.
+ */
+ Assert(MyProc != NULL);
+
+ /*
+ * Lock out cancel/die interrupts until we exit the code section protected
+ * by the slot. This ensures that interrupts will not interfere with
+ * manipulations of data structures in shared memory.
+ */
+ START_CRIT_SECTION();
+
+ /*
+ * Loop here to try to acquire slot after each time we are signaled by
+ * WALInsertSlotRelease.
+ */
+ for (;;)
+ {
+ bool mustwait;
+
+ slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&slot->mutex);
+
+ /* If retrying, allow WALInsertSlotRelease to release waiters again */
+ if (retry)
+ slot->releaseOK = true;
+
+ /* If I can get the slot, do so quickly. */
+ if (slot->exclusive == 0)
+ {
+ slot->exclusive++;
+ mustwait = false;
+ }
+ else
+ mustwait = true;
+
+ if (!mustwait)
+ break; /* got the lock */
+
+ Assert(slot->owner != MyProc);
+
+ /*
+ * Add myself to wait queue.
+ */
+ proc->lwWaiting = true;
+ proc->lwWaitMode = LW_EXCLUSIVE;
+ proc->lwWaitLink = NULL;
+ if (slot->head == NULL)
+ slot->head = proc;
+ else
+ slot->tail->lwWaitLink = proc;
+ slot->tail = proc;
+
+ /* Can release the mutex now */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Wait until awakened.
+ *
+ * Since we share the process wait semaphore with the regular lock
+ * manager and ProcWaitForSignal, and we may need to acquire a slot
+ * while one of those is pending, it is possible that we get awakened
+ * for a reason other than being signaled by WALInsertSlotRelease. If
+ * so, loop back and wait again. Once we've gotten the slot,
+ * re-increment the sema by the number of additional signals received,
+ * so that the lock manager or signal manager will see the received
+ * signal when it next waits.
+ */
+ for (;;)
+ {
+ /* "false" means cannot accept cancel/die interrupt here. */
+ PGSemaphoreLock(&proc->sem, false);
+ if (!proc->lwWaiting)
+ break;
+ extraWaits++;
+ }
+
+ /* Now loop back and try to acquire lock again. */
+ retry = true;
+ }
+
+ slot->owner = proc;
+
+ /*
+ * Normally, we initialize the xlogInsertingAt value of the slot to 1,
+ * because we don't yet know where in the WAL we're going to insert. It's
+ * not critical what it points to right now - leaving it at too small a
+ * value just means that WaitXLogInsertionsToFinish() might wait on us
+ * unnecessarily, until we update the value (when we finish the insert or
+ * move to the next page).
+ *
+ * If we're grabbing all the slots, however, stamp all but the last one
+ * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
+ * slot is the one that we will update as we proceed with the insert; the
+ * rest are held just to keep off other inserters.
+ */
+ if (slotno != -1 && slotno != num_xloginsert_slots - 1)
+ slot->xlogInsertingAt = InvalidXLogRecPtr;
+ else
+ slot->xlogInsertingAt = 1;
+
+ /* We are done updating shared state of the slot itself. */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(&proc->sem);
+
+ /*
+ * If we couldn't get the slot immediately, try another slot next time.
+ * On a system with more insertion slots than concurrent inserters, this
+ * causes all the inserters to eventually migrate to a slot that no-one
+ * else is using. On a system with more inserters than slots, it still
+ * causes the inserters to be distributed quite evenly across the slots.
+ */
+ if (slotno == -1 && retry)
+ slotToTry = (slotToTry + 1) % num_xloginsert_slots;
+}
+
+/*
+ * Wait for the given slot to become free, or for its xlogInsertingAt location
+ * to change to something other than 'waitptr'. In other words, wait for the
+ * inserter using the given slot to finish its insertion, or to at least make
+ * some progress.
+ */
+static void
+WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
+{
+ PGPROC *proc = MyProc;
+ int extraWaits = 0;
+
+ /*
+ * Lock out cancel/die interrupts while we sleep on the slot. There is
+ * no cleanup mechanism to remove us from the wait queue if we got
+ * interrupted.
+ */
+ HOLD_INTERRUPTS();
+
+ /*
+ * Loop here to recheck the slot's state after each time we are signaled.
+ */
+ for (;;)
+ {
+ bool mustwait;
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&slot->mutex);
+
+ /* If the slot is free, or has advanced past 'waitptr', we're done. */
+ if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
+ mustwait = false;
+ else
+ mustwait = true;
+
+ if (!mustwait)
+ break; /* no need to wait (any longer) */
+
+ Assert(slot->owner != MyProc);
+
+ /*
+ * Add myself to wait queue.
+ */
+ proc->lwWaiting = true;
+ proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+ proc->lwWaitLink = NULL;
+
+ /* waiters are added to the front of the queue */
+ proc->lwWaitLink = slot->head;
+ if (slot->head == NULL)
+ slot->tail = proc;
+ slot->head = proc;
+
+ /* Can release the mutex now */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Wait until awakened.
+ *
+ * Since we share the process wait semaphore with other things, like
+ * the regular lock manager and ProcWaitForSignal, and we may need to
+ * wait on the slot while one of those is pending, it is possible that
+ * we get awakened for a reason other than being signaled by
+ * WakeupWaiters or WALInsertSlotRelease. If so, loop back and wait
+ * again. Once we can proceed, re-increment the sema by the number of
+ * additional signals received, so that the lock manager or signal
+ * manager will see the received signal when it next waits.
+ */
+ for (;;)
+ {
+ /* "false" means cannot accept cancel/die interrupt here. */
+ PGSemaphoreLock(&proc->sem, false);
+ if (!proc->lwWaiting)
+ break;
+ extraWaits++;
+ }
+
+ /* Now loop back and recheck the slot's state. */
+ }
+
+ /* We are done updating shared state of the slot itself. */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(&proc->sem);
+
+ /*
+ * Now okay to allow cancel/die interrupts.
+ */
+ RESUME_INTERRUPTS();
+}
+
+/*
+ * Wake up all processes waiting for us with WaitOnSlot(). Sets our
+ * xlogInsertingAt value to EndPos, without releasing the slot.
+ */
+static void
+WakeupWaiters(XLogRecPtr EndPos)
+{
+ volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+ PGPROC *head;
+ PGPROC *proc;
+ PGPROC *next;
+
+ /*
+ * If we have already reported progress up to the same point, do nothing.
+ * No other process can modify xlogInsertingAt, so we can check this before
+ * grabbing the spinlock.
+ */
+ if (slot->xlogInsertingAt == EndPos)
+ return;
+ /* xlogInsertingAt should not go backwards */
+ Assert(slot->xlogInsertingAt < EndPos);
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&slot->mutex);
+
+ /* we should own the slot */
+ Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+ slot->xlogInsertingAt = EndPos;
+
+ /*
+ * See if there are any waiters that need to be woken up.
+ */
+ head = slot->head;
+
+ if (head != NULL)
+ {
+ proc = head;
+
+ /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */
+ next = proc->lwWaitLink;
+ while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
+ {
+ proc = next;
+ next = next->lwWaitLink;
+ }
+
+ /* proc is now the last PGPROC to be released */
+ slot->head = next;
+ proc->lwWaitLink = NULL;
+ }
+
+ /* We are done updating shared state of the slot itself. */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Awaken any waiters I removed from the queue.
+ */
+ while (head != NULL)
+ {
+ proc = head;
+ head = proc->lwWaitLink;
+ proc->lwWaitLink = NULL;
+ proc->lwWaiting = false;
+ PGSemaphoreUnlock(&proc->sem);
+ }
+}
+
+/*
+ * Release our insertion slot (or slots, if we're holding them all).
+ */
+static void
+WALInsertSlotRelease(void)
+{
+ int i;
+
+ if (holdingAllSlots)
+ {
+ for (i = 0; i < num_xloginsert_slots; i++)
+ WALInsertSlotReleaseOne(i);
+ holdingAllSlots = false;
+ }
+ else
+ WALInsertSlotReleaseOne(MySlotNo);
+}
+
+static void
+WALInsertSlotReleaseOne(int slotno)
+{
+ volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
+ PGPROC *head;
+ PGPROC *proc;
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&slot->mutex);
+
+ /* we must be holding it */
+ Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+ slot->xlogInsertingAt = InvalidXLogRecPtr;
+
+ /* Release my hold on the slot */
+ slot->exclusive = 0;
+ slot->owner = NULL;
+
+ /*
+ * See if I need to awaken any waiters.
+ */
+ head = slot->head;
+ if (head != NULL)
+ {
+ if (slot->releaseOK)
+ {
+ /*
+ * Remove the to-be-awakened PGPROCs from the queue.
+ */
+ bool releaseOK = true;
+
+ proc = head;
+
+ /*
+ * First wake up any backends that want to be woken up without
+ * acquiring the lock. These are always in the front of the queue.
+ */
+ while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
+ proc = proc->lwWaitLink;
+
+ /*
+ * Awaken the first exclusive-waiter, if any.
+ */
+ if (proc->lwWaitLink)
+ {
+ Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
+ proc = proc->lwWaitLink;
+ releaseOK = false;
+ }
+ /* proc is now the last PGPROC to be released */
+ slot->head = proc->lwWaitLink;
+ proc->lwWaitLink = NULL;
+
+ slot->releaseOK = releaseOK;
+ }
+ else
+ head = NULL;
+ }
+
+ /* We are done updating shared state of the slot itself. */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Awaken any waiters I removed from the queue.
+ */
+ while (head != NULL)
+ {
+ proc = head;
+ head = proc->lwWaitLink;
+ proc->lwWaitLink = NULL;
+ proc->lwWaiting = false;
+ PGSemaphoreUnlock(&proc->sem);
+ }
+
+ /*
+ * Now okay to allow cancel/die interrupts.
+ */
+ END_CRIT_SECTION();
+}
+
+
+/*
+ * Wait for any WAL insertions < upto to finish.
+ *
+ * Returns the location of the oldest insertion that is still in-progress.
+ * Any WAL prior to that point has been fully copied into WAL buffers, and
+ * can be flushed out to disk. Because this waits for any insertions older
+ * than 'upto' to finish, the return value is always >= 'upto'.
+ *
+ * Note: When you are about to write out WAL, you must call this function
+ * *before* acquiring WALWriteLock, to avoid deadlocks. This function might
+ * need to wait for an insertion to finish (or at least advance to the next
+ * uninitialized page), and the inserter might need to evict an old WAL buffer
+ * to make room for a new one, which in turn requires WALWriteLock.
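+ *
+ * For illustration, the expected pattern (simplified from what
+ * AdvanceXLInsertBuffer does when it must evict a dirty buffer) is:
+ *
+ *	WaitXLogInsertionsToFinish(WriteRqst.Write);
+ *	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ *	XLogWrite(WriteRqst, false);
+ *	LWLockRelease(WALWriteLock);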
+ */
+static XLogRecPtr
+WaitXLogInsertionsToFinish(XLogRecPtr upto)
+{
+ uint64 bytepos;
+ XLogRecPtr reservedUpto;
+ XLogRecPtr finishedUpto;
+ volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ int i;
+
+ if (MyProc == NULL)
+ elog(PANIC, "cannot wait without a PGPROC structure");
+
+ /* Read the current insert position */
+ SpinLockAcquire(&Insert->insertpos_lck);
+ bytepos = Insert->CurrBytePos;
+ SpinLockRelease(&Insert->insertpos_lck);
+ reservedUpto = XLogBytePosToEndRecPtr(bytepos);
+
+ /*
+ * No-one should request to flush a piece of WAL that hasn't even been
+ * reserved yet. However, it can happen if there is a block with a bogus
+ * LSN on disk, for example. XLogFlush checks for that situation and
+ * complains, but only after the flush. Here we just assume that to mean
+ * that all WAL that has been reserved needs to be finished. In this
+ * corner-case, the return value can be smaller than the 'upto' argument.
+ */
+ if (upto > reservedUpto)
+ {
+ elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
+ (uint32) (upto >> 32), (uint32) upto,
+ (uint32) (reservedUpto >> 32), (uint32) reservedUpto);
+ upto = reservedUpto;
+ }
+
+ /*
+ * finishedUpto is our return value, indicating the point upto which
+ * all the WAL insertions have been finished. Initialize it to the head
+ * of reserved WAL, and as we iterate through the insertion slots, back it
+ * out for any insertion that's still in progress.
+ */
+ finishedUpto = reservedUpto;
+
+ /*
+ * Loop through all the slots, sleeping on any in-progress insert older
+ * than 'upto'.
+ */
+ for (i = 0; i < num_xloginsert_slots; i++)
+ {
+ volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+ XLogRecPtr insertingat;
+
+ retry:
+ /*
+ * We can check if the slot is in use without grabbing the spinlock.
+ * The spinlock acquisition of insertpos_lck before this loop acts
+ * as a memory barrier. If someone acquires the slot after that, it
+ * can't possibly be inserting to anything < reservedUpto. If it was
+ * acquired before that, an unlocked test will return true.
+ */
+ if (!slot->exclusive)
+ continue;
+
+ SpinLockAcquire(&slot->mutex);
+ /* re-check now that we have the lock */
+ if (!slot->exclusive)
+ {
+ SpinLockRelease(&slot->mutex);
+ continue;
+ }
+ insertingat = slot->xlogInsertingAt;
+ SpinLockRelease(&slot->mutex);
+
+ if (insertingat == InvalidXLogRecPtr)
+ {
+ /*
+ * slot is reserved just to hold off other inserters, there is no
+ * actual insert in progress.
+ */
+ continue;
+ }
+
+ /*
+ * This insertion is still in progress. Do we need to wait for it?
+ *
+ * When an inserter acquires a slot, it initializes 'insertingat' to 1,
+ * because it doesn't yet know where in the WAL it's going to insert
+ * (see WALInsertSlotAcquireOne). The inserter updates the value as
+ * soon as it finishes the insertion, moves to the next page, or has to
+ * do I/O to flush an old dirty buffer. So when we see a slot with an
+ * insertingat value < upto, the insertion might still be in progress
+ * somewhere before 'upto', or the inserter might simply not have
+ * advertised its real position yet. Either way, we have to assume the
+ * worst, and wait.
+ */
+ if (insertingat < upto)
+ {
+ WaitOnSlot(slot, insertingat);
+ goto retry;
+ }
+ else
+ {
+ /*
+ * We don't need to wait for this insertion, but update the
+ * return value.
+ */
+ if (insertingat < finishedUpto)
+ finishedUpto = insertingat;
+ }
+ }
+ return finishedUpto;
+}
+
+/*
+ * Get a pointer to the right location in the WAL buffer containing the
+ * given XLogRecPtr.
+ *
+ * If the page is not initialized yet, it is initialized. That might require
+ * evicting an old dirty buffer from the buffer cache, which means I/O.
+ *
+ * The caller must ensure that the page containing the requested location
+ * isn't evicted yet, and won't be evicted. The way to ensure that is to
+ * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * to evict an old page from the buffer. (This means that once you call
+ * GetXLogBuffer() with a given 'ptr', you must not access anything before
+ * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
+ * later, because older buffers might be recycled already)
+ */
+static char *
+GetXLogBuffer(XLogRecPtr ptr)
+{
+ int idx;
+ XLogRecPtr endptr;
+ static uint64 cachedPage = 0;
+ static char *cachedPos = NULL;
+ XLogRecPtr expectedEndPtr;
+
+ /*
+ * Fast path for the common case that we need to access the same page as
+ * last time.
*/
- updrqst = false;
- freespace = INSERT_FREESPACE(Insert);
- if (freespace == 0)
+ if (ptr / XLOG_BLCKSZ == cachedPage)
{
- updrqst = AdvanceXLInsertBuffer(false);
- freespace = INSERT_FREESPACE(Insert);
+ Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+ Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+ return cachedPos + ptr % XLOG_BLCKSZ;
}
- /* Compute record's XLOG location */
- curridx = Insert->curridx;
- INSERT_RECPTR(RecPtr, Insert, curridx);
+ /*
+ * The XLog buffer cache is organized so that a page is always loaded
+ * to a particular buffer. That way we can easily calculate the buffer
+ * a given page must be loaded into, from the XLogRecPtr alone.
+ */
+ idx = XLogRecPtrToBufIdx(ptr);
/*
- * If the record is an XLOG_SWITCH, and we are exactly at the start of a
- * segment, we need not insert it (and don't want to because we'd like
- * consecutive switch requests to be no-ops). Instead, make sure
- * everything is written and flushed through the end of the prior segment,
- * and return the prior segment's end address.
+ * See what page is loaded in the buffer at the moment. It could be the
+ * page we're looking for, or something older. It can't be anything newer
+ * - that would imply the page we're looking for has already been written
+ * out to disk and evicted, and the caller is responsible for making sure
+ * that doesn't happen.
+ *
+ * However, we don't hold a lock while we read the value. If someone has
+ * just initialized the page, it's possible that we get a "torn read" of
+ * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
+ * that case we will see a bogus value. That's ok, we'll grab the mapping
+ * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
+ * the page we're looking for. But it means that when we do this unlocked
+ * read, we might see a value that appears to be ahead of the page we're
+ * looking for. Don't PANIC on that, until we've verified the value while
+ * holding the lock.
*/
- if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD)
+ expectedEndPtr = ptr;
+ expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+ endptr = XLogCtl->xlblocks[idx];
+ if (expectedEndPtr != endptr)
{
- /* We can release insert lock immediately */
- LWLockRelease(WALInsertLock);
+ /*
+ * Let others know that we're finished inserting the record up
+ * to the page boundary.
+ */
+ WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
- RecPtr -= SizeOfXLogLongPHD;
+ AdvanceXLInsertBuffer(ptr, false);
+ endptr = XLogCtl->xlblocks[idx];
- LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
- LogwrtResult = XLogCtl->LogwrtResult;
- if (LogwrtResult.Flush < RecPtr)
- {
- XLogwrtRqst FlushRqst;
+ if (expectedEndPtr != endptr)
+ elog(PANIC, "could not find WAL buffer for %X/%X",
+ (uint32) (ptr >> 32) , (uint32) ptr);
+ }
+ else
+ {
+ /*
+ * Make sure the initialization of the page is visible to us, and
+ * won't arrive later to overwrite the WAL data we write on the page.
+ */
+ pg_memory_barrier();
+ }
- FlushRqst.Write = RecPtr;
- FlushRqst.Flush = RecPtr;
- XLogWrite(FlushRqst, false, false);
- }
- LWLockRelease(WALWriteLock);
+ /*
+ * Found the buffer holding this page. Return a pointer to the right
+ * offset within the page.
+ */
+ cachedPage = ptr / XLOG_BLCKSZ;
+ cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
- END_CRIT_SECTION();
+ Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+ Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
- /* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
- return RecPtr;
- }
+ return cachedPos + ptr % XLOG_BLCKSZ;
+}
- /* Finish the record header */
- rechdr->xl_prev = Insert->PrevRecord;
+/*
+ * Converts a "usable byte position" to XLogRecPtr. A usable byte position
+ * is the position starting from the beginning of WAL, excluding all WAL
+ * page headers.
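+ *
+ * For illustration, under the page and header size assumptions noted above
+ * (8 kB pages, 24-byte short and 40-byte long headers): usable byte
+ * position 10000 exceeds the 8152 usable bytes of a segment's first
+ * (long-header) page by 1848 bytes, so it maps to segment offset
+ * 8192 + 24 + 1848 = 10064.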
+ */
+static XLogRecPtr
+XLogBytePosToRecPtr(uint64 bytepos)
+{
+ uint64 fullsegs;
+ uint64 fullpages;
+ uint64 bytesleft;
+ uint32 seg_offset;
+ XLogRecPtr result;
- /* Now we can finish computing the record's CRC */
- COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
- FIN_CRC32(rdata_crc);
- rechdr->xl_crc = rdata_crc;
+ fullsegs = bytepos / UsableBytesInSegment;
+ bytesleft = bytepos % UsableBytesInSegment;
-#ifdef WAL_DEBUG
- if (XLOG_DEBUG)
+ if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
{
- StringInfoData buf;
-
- initStringInfo(&buf);
- appendStringInfo(&buf, "INSERT @ %X/%X: ",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- xlog_outrec(&buf, rechdr);
- if (rdata->data != NULL)
- {
- appendStringInfo(&buf, " - ");
- RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
- }
- elog(LOG, "%s", buf.data);
- pfree(buf.data);
+ /* fits on first page of segment */
+ seg_offset = bytesleft + SizeOfXLogLongPHD;
}
-#endif
-
- /* Record begin of record in appropriate places */
- ProcLastRecPtr = RecPtr;
- Insert->PrevRecord = RecPtr;
-
- /*
- * Append the data, including backup blocks if any
- */
- rdata = &hdr_rdt;
- while (write_len)
+ else
{
- while (rdata->data == NULL)
- rdata = rdata->next;
+ /* account for the first page on segment with long header */
+ seg_offset = XLOG_BLCKSZ;
+ bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
- if (freespace > 0)
- {
- if (rdata->len > freespace)
- {
- memcpy(Insert->currpos, rdata->data, freespace);
- rdata->data += freespace;
- rdata->len -= freespace;
- write_len -= freespace;
- }
- else
- {
- memcpy(Insert->currpos, rdata->data, rdata->len);
- freespace -= rdata->len;
- write_len -= rdata->len;
- Insert->currpos += rdata->len;
- rdata = rdata->next;
- continue;
- }
- }
+ fullpages = bytesleft / UsableBytesInPage;
+ bytesleft = bytesleft % UsableBytesInPage;
- /* Use next buffer */
- updrqst = AdvanceXLInsertBuffer(false);
- curridx = Insert->curridx;
- /* Mark page header to indicate this record continues on the page */
- Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
- Insert->currpage->xlp_rem_len = write_len;
- freespace = INSERT_FREESPACE(Insert);
+ seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
}
- /* Ensure next record will be properly aligned */
- Insert->currpos = (char *) Insert->currpage +
- MAXALIGN(Insert->currpos - (char *) Insert->currpage);
- freespace = INSERT_FREESPACE(Insert);
+ XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
- /*
- * The recptr I return is the beginning of the *next* record. This will be
- * stored as LSN for changed data pages...
- */
- INSERT_RECPTR(RecPtr, Insert, curridx);
+ return result;
+}
- /*
- * If the record is an XLOG_SWITCH, we must now write and flush all the
- * existing data, and then forcibly advance to the start of the next
- * segment. It's not good to do this I/O while holding the insert lock,
- * but there seems too much risk of confusion if we try to release the
- * lock sooner. Fortunately xlog switch needn't be a high-performance
- * operation anyway...
- */
- if (isLogSwitch)
+/*
+ * Like XLogBytePosToRecPtr, but if the position is at a page boundary,
+ * returns a pointer to the beginning of the page (ie. before page header),
+ * not to where the first xlog record on that page would go. This is used
+ * when converting a pointer to the end of a record.
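+ *
+ * For example, usable byte position 8152 falls exactly on the boundary
+ * after a segment's first page: XLogBytePosToRecPtr maps it to segment
+ * offset 8192 + 24 = 8216, just past the second page's header, whereas
+ * this function maps it to offset 8192, the boundary itself.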
+ */
+static XLogRecPtr
+XLogBytePosToEndRecPtr(uint64 bytepos)
+{
+ uint64 fullsegs;
+ uint64 fullpages;
+ uint64 bytesleft;
+ uint32 seg_offset;
+ XLogRecPtr result;
+
+ fullsegs = bytepos / UsableBytesInSegment;
+ bytesleft = bytepos % UsableBytesInSegment;
+
+ if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+ {
+ /* fits on first page of segment */
+ if (bytesleft == 0)
+ seg_offset = 0;
+ else
+ seg_offset = bytesleft + SizeOfXLogLongPHD;
+ }
+ else
{
- XLogwrtRqst FlushRqst;
- XLogRecPtr OldSegEnd;
+ /* account for the first page on segment with long header */
+ seg_offset = XLOG_BLCKSZ;
+ bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
- TRACE_POSTGRESQL_XLOG_SWITCH();
+ fullpages = bytesleft / UsableBytesInPage;
+ bytesleft = bytesleft % UsableBytesInPage;
- LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ if (bytesleft == 0)
+ seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
+ else
+ seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+ }
- /*
- * Flush through the end of the page containing XLOG_SWITCH, and
- * perform end-of-segment actions (eg, notifying archiver).
- */
- WriteRqst = XLogCtl->xlblocks[curridx];
- FlushRqst.Write = WriteRqst;
- FlushRqst.Flush = WriteRqst;
- XLogWrite(FlushRqst, false, true);
+ XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
- /* Set up the next buffer as first page of next segment */
- /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
- (void) AdvanceXLInsertBuffer(true);
+ return result;
+}
- /* There should be no unwritten data */
- curridx = Insert->curridx;
- Assert(curridx == XLogCtl->Write.curridx);
+/*
+ * Convert an XLogRecPtr to a "usable byte position".
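+ *
+ * This is the inverse of XLogBytePosToRecPtr. Continuing the example above,
+ * a pointer at segment offset 10064 lies 1872 bytes into the segment's
+ * second page, so it converts back to 8152 + (1872 - 24) = 10000.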
+ */
+static uint64
+XLogRecPtrToBytePos(XLogRecPtr ptr)
+{
+ uint64 fullsegs;
+ uint32 fullpages;
+ uint32 offset;
+ uint64 result;
- /* Compute end address of old segment */
- OldSegEnd = XLogCtl->xlblocks[curridx];
- OldSegEnd -= XLOG_BLCKSZ;
+ XLByteToSeg(ptr, fullsegs);
- /* Make it look like we've written and synced all of old segment */
- LogwrtResult.Write = OldSegEnd;
- LogwrtResult.Flush = OldSegEnd;
+ fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ;
+ offset = ptr % XLOG_BLCKSZ;
- /*
- * Update shared-memory status --- this code should match XLogWrite
- */
+ if (fullpages == 0)
+ {
+ result = fullsegs * UsableBytesInSegment;
+ if (offset > 0)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->LogwrtResult = LogwrtResult;
- if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
- xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
- if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
- xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
- SpinLockRelease(&xlogctl->info_lck);
+ Assert(offset >= SizeOfXLogLongPHD);
+ result += offset - SizeOfXLogLongPHD;
}
-
- LWLockRelease(WALWriteLock);
-
- updrqst = false; /* done already */
}
else
{
- /* normal case, ie not xlog switch */
-
- /* Need to update shared LogwrtRqst if some block was filled up */
- if (freespace == 0)
- {
- /* curridx is filled and available for writing out */
- updrqst = true;
- }
- else
+ result = fullsegs * UsableBytesInSegment +
+ (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
+ (fullpages - 1) * UsableBytesInPage; /* full pages */
+ if (offset > 0)
{
- /* if updrqst already set, write through end of previous buf */
- curridx = PrevBufIdx(curridx);
+ Assert(offset >= SizeOfXLogShortPHD);
+ result += offset - SizeOfXLogShortPHD;
}
- WriteRqst = XLogCtl->xlblocks[curridx];
- }
-
- LWLockRelease(WALInsertLock);
-
- if (updrqst)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- /* advance global request to include new block(s) */
- if (xlogctl->LogwrtRqst.Write < WriteRqst)
- xlogctl->LogwrtRqst.Write = WriteRqst;
- /* update local result copy while I have the chance */
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
}
- XactLastRecEnd = RecPtr;
-
- END_CRIT_SECTION();
-
- /* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
-
- return RecPtr;
+ return result;
}
/*
}
/*
- * Advance the Insert state to the next buffer page, writing out the next
- * buffer if it still contains unwritten data.
- *
- * If new_segment is TRUE then we set up the next buffer page as the first
- * page of the next xlog segment file, possibly but not usually the next
- * consecutive file page.
- *
- * The global LogwrtRqst.Write pointer needs to be advanced to include the
- * just-filled page. If we can do this for free (without an extra lock),
- * we do so here. Otherwise the caller must do it. We return TRUE if the
- * request update still needs to be done, FALSE if we did it internally.
- *
- * Must be called with WALInsertLock held.
+ * Initialize XLOG buffers, writing out old buffers if they still contain
+ * unwritten data, up to the page containing 'upto'. Or if 'opportunistic' is
+ * true, initialize as many pages as we can without having to write out
+ * unwritten data. Any new pages are initialized to zeros, with page headers
+ * initialized properly.
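+ *
+ * For illustration of the callers: GetXLogBuffer() uses this with
+ * opportunistic = false when it finds the page it needs uninitialized,
+ * while a background process like the WAL writer can pass
+ * opportunistic = true to pre-initialize pages ahead of the insertions
+ * without blocking on I/O.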
*/
-static bool
-AdvanceXLInsertBuffer(bool new_segment)
+static void
+AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
- int nextidx = NextBufIdx(Insert->curridx);
- bool update_needed = true;
+ int nextidx;
XLogRecPtr OldPageRqstPtr;
XLogwrtRqst WriteRqst;
- XLogRecPtr NewPageEndPtr;
+ XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr;
XLogRecPtr NewPageBeginPtr;
XLogPageHeader NewPage;
+ int npages = 0;
+
+ LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
/*
- * Get ending-offset of the buffer page we need to replace (this may be
- * zero if the buffer hasn't been used yet). Fall through if it's already
- * written out.
+ * Now that we have the lock, check if someone initialized the page
+ * already.
*/
- OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
- if (LogwrtResult.Write < OldPageRqstPtr)
+ while (upto >= XLogCtl->xlblocks[XLogCtl->curridx] || opportunistic)
{
- /* nope, got work to do... */
- XLogRecPtr FinishedPageRqstPtr;
-
- FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-
- /* Before waiting, get info_lck and update LogwrtResult */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- if (xlogctl->LogwrtRqst.Write < FinishedPageRqstPtr)
- xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
- }
-
- update_needed = false; /* Did the shared-request update */
+ nextidx = NextBufIdx(XLogCtl->curridx);
/*
- * Now that we have an up-to-date LogwrtResult value, see if we still
- * need to write it or if someone else already did.
+ * Get ending-offset of the buffer page we need to replace (this may
+ * be zero if the buffer hasn't been used yet). Fall through if it's
+ * already written out.
*/
+ OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
if (LogwrtResult.Write < OldPageRqstPtr)
{
- /* Must acquire write lock */
- LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
- LogwrtResult = XLogCtl->LogwrtResult;
- if (LogwrtResult.Write >= OldPageRqstPtr)
+ /*
+ * Nope, got work to do. If we just want to pre-initialize as much
+ * as we can without flushing, give up now.
+ */
+ if (opportunistic)
+ break;
+
+ /* Before waiting, get info_lck and update LogwrtResult */
{
- /* OK, someone wrote it already */
- LWLockRelease(WALWriteLock);
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
+ xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
+ LogwrtResult = xlogctl->LogwrtResult;
+ SpinLockRelease(&xlogctl->info_lck);
}
- else
+
+ /*
+ * Now that we have an up-to-date LogwrtResult value, see if we
+ * still need to write it or if someone else already did.
+ */
+ if (LogwrtResult.Write < OldPageRqstPtr)
{
/*
- * Have to write buffers while holding insert lock. This is
- * not good, so only write as much as we absolutely must.
+ * Must acquire write lock. Release WALBufMappingLock first,
+ * to make sure that all insertions that we need to wait for
+ * can finish (up to this same position). Otherwise we risk
+ * deadlock.
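+ * (Implied lock ordering: never wait for WAL insertions to finish,
+ * and never take WALWriteLock, while holding WALBufMappingLock.)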
*/
- TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
- WriteRqst.Write = OldPageRqstPtr;
- WriteRqst.Flush = 0;
- XLogWrite(WriteRqst, false, false);
- LWLockRelease(WALWriteLock);
- TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+ LWLockRelease(WALBufMappingLock);
+
+ WaitXLogInsertionsToFinish(OldPageRqstPtr);
+
+ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+
+ LogwrtResult = XLogCtl->LogwrtResult;
+ if (LogwrtResult.Write >= OldPageRqstPtr)
+ {
+ /* OK, someone wrote it already */
+ LWLockRelease(WALWriteLock);
+ }
+ else
+ {
+ /* Have to write it ourselves */
+ TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
+ WriteRqst.Write = OldPageRqstPtr;
+ WriteRqst.Flush = 0;
+ XLogWrite(WriteRqst, false);
+ LWLockRelease(WALWriteLock);
+ TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+ }
+ /* Re-acquire WALBufMappingLock and retry */
+ LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+ continue;
}
}
- }
- /*
- * Now the next buffer slot is free and we can set it up to be the next
- * output page.
- */
- NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx];
+ /*
+ * Now the next buffer slot is free and we can set it up to be the next
+ * output page.
+ */
+ NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx];
+ NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
- if (new_segment)
- {
- /* force it to a segment start point */
- if (NewPageBeginPtr % XLogSegSize != 0)
- NewPageBeginPtr += XLogSegSize - NewPageBeginPtr % XLogSegSize;
- }
+ Assert(NewPageEndPtr % XLOG_BLCKSZ == 0);
+ Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
+ Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
- NewPageEndPtr = NewPageBeginPtr;
- NewPageEndPtr += XLOG_BLCKSZ;
- XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
- NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+ NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
- Insert->curridx = nextidx;
- Insert->currpage = NewPage;
+ /*
+ * Be sure to re-zero the buffer so that bytes beyond what we've
+ * written will look like zeroes and not valid XLOG records...
+ */
+ MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
- Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
+ /*
+ * Fill the new page's header
+ */
+ NewPage->xlp_magic = XLOG_PAGE_MAGIC;
- /*
- * Be sure to re-zero the buffer so that bytes beyond what we've written
- * will look like zeroes and not valid XLOG records...
- */
- MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+ /* NewPage->xlp_info = 0; */ /* done by memset */
+ NewPage->xlp_tli = ThisTimeLineID;
+ NewPage->xlp_pageaddr = NewPageBeginPtr;
+ /* NewPage->xlp_rem_len = 0; */ /* done by memset */
- /*
- * Fill the new page's header
- */
- NewPage ->xlp_magic = XLOG_PAGE_MAGIC;
+ /*
+ * If online backup is not in progress, mark the header to indicate
+ * that WAL records beginning in this page have removable backup
+ * blocks. This allows the WAL archiver to know whether it is safe to
+ * compress archived WAL data by transforming full-block records into
+ * the non-full-block format. It is sufficient to record this at the
+ * page level because we force a page switch (in fact a segment switch)
+ * when starting a backup, so the flag will be off before any records
+ * can be written during the backup. At the end of a backup, the last
+ * page will be marked as all unsafe when perhaps only part is unsafe,
+ * but at worst the archiver would miss the opportunity to compress a
+ * few records.
+ */
+ if (!Insert->forcePageWrites)
+ NewPage->xlp_info |= XLP_BKP_REMOVABLE;
- /* NewPage->xlp_info = 0; */ /* done by memset */
- NewPage ->xlp_tli = ThisTimeLineID;
- NewPage ->xlp_pageaddr = NewPageBeginPtr;
+ /*
+ * If first page of an XLOG segment file, make it a long header.
+ */
+ if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
+ {
+ XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
- /*
- * If online backup is not in progress, mark the header to indicate that
- * WAL records beginning in this page have removable backup blocks. This
- * allows the WAL archiver to know whether it is safe to compress archived
- * WAL data by transforming full-block records into the non-full-block
- * format. It is sufficient to record this at the page level because we
- * force a page switch (in fact a segment switch) when starting a backup,
- * so the flag will be off before any records can be written during the
- * backup. At the end of a backup, the last page will be marked as all
- * unsafe when perhaps only part is unsafe, but at worst the archiver
- * would miss the opportunity to compress a few records.
- */
- if (!Insert->forcePageWrites)
- NewPage ->xlp_info |= XLP_BKP_REMOVABLE;
+ NewLongPage->xlp_sysid = ControlFile->system_identifier;
+ NewLongPage->xlp_seg_size = XLogSegSize;
+ NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
+ NewPage->xlp_info |= XLP_LONG_HEADER;
+ }
- /*
- * If first page of an XLOG segment file, make it a long header.
- */
- if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
- {
- XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+ /*
+ * Make sure the initialization of the page becomes visible to others
+ * before the xlblocks update. GetXLogBuffer() reads xlblocks without
+ * holding a lock.
+ */
+ pg_write_barrier();
+
+ *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
- NewLongPage->xlp_sysid = ControlFile->system_identifier;
- NewLongPage->xlp_seg_size = XLogSegSize;
- NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
- NewPage ->xlp_info |= XLP_LONG_HEADER;
+ XLogCtl->curridx = nextidx;
- Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+ npages++;
}
+ LWLockRelease(WALBufMappingLock);
- return update_needed;
+#ifdef WAL_DEBUG
+ if (npages > 0)
+ {
+ elog(DEBUG1, "initialized %d pages, up to %X/%X",
+ npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
+ }
+#endif
}
/*
* This option allows us to avoid uselessly issuing multiple writes when a
* single one would do.
*
- * If xlog_switch == TRUE, we are intending an xlog segment switch, so
- * perform end-of-segment actions after writing the last page, even if
- * it's not physically the end of its segment. (NB: this will work properly
- * only if caller specifies WriteRqst == page-end and flexible == false,
- * and there is some data to write.)
- *
- * Must be called with WALWriteLock held.
+ * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
+ * must be called before grabbing the lock, to make sure the data is ready to
+ * write.
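+ *
+ * A typical call sequence is therefore (a sketch of the callers below):
+ *
+ *		WaitXLogInsertionsToFinish(WriteRqst.Write);
+ *		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ *		XLogWrite(WriteRqst, flexible);
+ *		LWLockRelease(WALWriteLock);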
*/
static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
XLogCtlWrite *Write = &XLogCtl->Write;
bool ispartialpage;
* if we're passed a bogus WriteRqst.Write that is past the end of the
* last page that's been initialized by AdvanceXLInsertBuffer.
*/
- if (LogwrtResult.Write >= XLogCtl->xlblocks[curridx])
+ XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+ if (LogwrtResult.Write >= EndPtr)
elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
(uint32) (LogwrtResult.Write >> 32),
(uint32) LogwrtResult.Write,
- (uint32) (XLogCtl->xlblocks[curridx] >> 32),
- (uint32) XLogCtl->xlblocks[curridx]);
+ (uint32) (EndPtr >> 32), (uint32) EndPtr);
/* Advance LogwrtResult.Write to end of current buffer page */
- LogwrtResult.Write = XLogCtl->xlblocks[curridx];
+ LogwrtResult.Write = EndPtr;
ispartialpage = WriteRqst.Write < LogwrtResult.Write;
if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
- * We also do this if this is the last page written for an xlog
- * switch.
- *
* This is also the right place to notify the Archiver that the
* segment is ready to copy to archival storage, and to update the
* timer for archive_timeout, and to signal for a checkpoint if
* too many logfile segments have been used since the last
* checkpoint.
*/
- if (finishing_seg || (xlog_switch && last_iteration))
+ if (finishing_seg)
{
issue_xlog_fsync(openLogFile, openLogSegNo);
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr insertpos;
/* read LogwrtResult and update local state */
SpinLockAcquire(&xlogctl->info_lck);
if (record <= LogwrtResult.Flush)
break;
+ /*
+ * Before actually performing the write, wait for all in-flight
+ * insertions to the pages we're about to write to finish.
+ */
+ insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+
/*
* Try to get the write lock. If we can't get it immediately, wait
* until it's released, and recheck if we still need to do the flush
*/
if (CommitDelay > 0 && enableFsync &&
MinimumActiveBackends(CommitSiblings))
+ {
pg_usleep(CommitDelay);
+ /*
+ * Re-check how far we can now flush the WAL. It's generally not
+ * safe to call WaitXLogInsertionsToFinish while holding
+ * WALWriteLock, because an in-progress insertion might need to
+ * also grab WALWriteLock to make progress. But we know that all
+ * the insertions up to insertpos have already finished, because
+ * that's what the earlier WaitXLogInsertionsToFinish() returned.
+ * We're only calling it again to allow insertpos to be moved
+ * further forward, not to actually wait for anyone.
+ */
+ insertpos = WaitXLogInsertionsToFinish(insertpos);
+ }
+
/* try to write/flush later additions to XLOG as well */
- if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
- {
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- uint32 freespace = INSERT_FREESPACE(Insert);
+ WriteRqst.Write = insertpos;
+ WriteRqst.Flush = insertpos;
- if (freespace == 0) /* buffer is full */
- WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- else
- {
- WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- WriteRqstPtr -= freespace;
- }
- LWLockRelease(WALInsertLock);
- WriteRqst.Write = WriteRqstPtr;
- WriteRqst.Flush = WriteRqstPtr;
- }
- else
- {
- WriteRqst.Write = WriteRqstPtr;
- WriteRqst.Flush = record;
- }
- XLogWrite(WriteRqst, false, false);
+ XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
/* done */
START_CRIT_SECTION();
- /* now wait for the write lock */
+ /* now wait for any in-progress insertions to finish and get write lock */
+ WaitXLogInsertionsToFinish(WriteRqstPtr);
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = XLogCtl->LogwrtResult;
if (WriteRqstPtr > LogwrtResult.Flush)
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = WriteRqstPtr;
- XLogWrite(WriteRqst, flexible, false);
+ XLogWrite(WriteRqst, flexible);
wrote_something = true;
}
LWLockRelease(WALWriteLock);
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests();
+ /*
+ * Great, done. To take some work off the critical path, try to initialize
+ * as many of the no-longer-needed WAL buffers for future use as we can.
+ */
+ AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+
return wrote_something;
}
/* XLogCtl */
size = sizeof(XLogCtlData);
+
+ /* xlog insertion slots, plus alignment */
+ size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
/* xlblocks array */
size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
/* extra alignment padding for XLOG I/O buffers */
- size = add_size(size, ALIGNOF_XLOG_BUFFER);
+ size = add_size(size, XLOG_BLCKSZ);
/* and the buffers themselves */
size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
bool foundCFile,
foundXLog;
char *allocptr;
+ int i;
ControlFile = (ControlFileData *)
ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
- XLogCtl = (XLogCtlData *)
- ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
+ allocptr = ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
if (foundCFile || foundXLog)
{
Assert(foundCFile && foundXLog);
return;
}
-
+ XLogCtl = (XLogCtlData *) allocptr;
memset(XLogCtl, 0, sizeof(XLogCtlData));
/*
* multiple of the alignment for same, so no extra alignment padding is
* needed here.
*/
- allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
+ allocptr += sizeof(XLogCtlData);
XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
+ /* Xlog insertion slots. Ensure they're aligned to the full padded size */
+ allocptr += sizeof(XLogInsertSlotPadded) -
+ ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
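+ /* (XLOGShmemSize() reserved one extra slot's worth of space for this) */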
+ XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
+ allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
+
/*
- * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
+ * Align the start of the page buffers to a full xlog block size boundary.
+ * This simplifies some calculations in XLOG insertion. It is also required
+ * for O_DIRECT.
*/
- allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
+ allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
XLogCtl->pages = allocptr;
memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
XLogCtl->SharedRecoveryInProgress = true;
XLogCtl->SharedHotStandbyActive = false;
XLogCtl->WalWriterSleeping = false;
- XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+
+ for (i = 0; i < num_xloginsert_slots; i++)
+ {
+ XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+ SpinLockInit(&slot->mutex);
+ slot->xlogInsertingAt = InvalidXLogRecPtr;
+ slot->owner = NULL;
+
+ slot->releaseOK = true;
+ slot->exclusive = 0;
+ slot->head = NULL;
+ slot->tail = NULL;
+ }
+
+ SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
SpinLockInit(&XLogCtl->ulsn_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
ThisTimeLineID = 1;
/* page buffer must be aligned suitably for O_DIRECT */
- buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
- page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
+ buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
+ page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
memset(page, 0, XLOG_BLCKSZ);
/*
bool backupEndRequired = false;
bool backupFromStandby = false;
DBState dbstate_at_startup;
+ int firstIdx;
XLogReaderState *xlogreader;
XLogPageReadPrivate private;
bool fast_promoted = false;
lastFullPageWrites = checkPoint.fullPageWrites;
- RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+ RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
if (RecPtr < checkPoint.redo)
ereport(PANIC,
openLogFile = XLogFileOpen(openLogSegNo);
openLogOff = 0;
Insert = &XLogCtl->Insert;
- Insert->PrevRecord = LastRec;
- XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
+ Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
+
+ firstIdx = XLogRecEndPtrToBufIdx(EndOfLog);
+ XLogCtl->curridx = firstIdx;
+
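+ /*
+ * xlblocks[] holds the end position of each buffer page, so this is
+ * EndOfLog rounded up to a page boundary.
+ */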
+ XLogCtl->xlblocks[firstIdx] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
/*
* Tricky point here: readBuf contains the *last* block that the LastRec
* record spans, not the one it starts in. The last block is indeed the
* one we want to use.
*/
- if (EndOfLog % XLOG_BLCKSZ == 0)
- {
- memset(Insert->currpage, 0, XLOG_BLCKSZ);
- }
- else
- {
- Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
- memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
- }
- Insert->currpos = (char *) Insert->currpage +
- (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
+ Assert(readOff == (XLogCtl->xlblocks[firstIdx] - XLOG_BLCKSZ) % XLogSegSize);
+ memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], xlogreader->readBuf, XLOG_BLCKSZ);
+ Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
- freespace = INSERT_FREESPACE(Insert);
+ freespace = INSERT_FREESPACE(EndOfLog);
if (freespace > 0)
{
/* Make sure rest of page is zero */
- MemSet(Insert->currpos, 0, freespace);
- XLogCtl->Write.curridx = 0;
+ MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog % XLOG_BLCKSZ, 0, freespace);
+ XLogCtl->Write.curridx = firstIdx;
}
else
{
* this is sufficient. The first actual attempt to insert a log
* record will advance the insert state.
*/
- XLogCtl->Write.curridx = NextBufIdx(0);
+ XLogCtl->Write.curridx = NextBufIdx(firstIdx);
}
/* Pre-scan prepared transactions to find out the range of XIDs present */
}
/*
- * Once spawned, a backend may update its local RedoRecPtr from
- * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
- * to do so. This is done in XLogInsert() or GetRedoRecPtr().
+ * Return the current Redo pointer from shared memory.
+ *
+ * As a side-effect, the local RedoRecPtr copy is updated.
*/
XLogRecPtr
GetRedoRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr ptr;
+ /*
+ * The possibly not up-to-date copy in XLogCtl is enough. Even if we
+ * grabbed a WAL insertion slot to read the master copy, someone might
+ * update it just after we've released the lock.
+ */
SpinLockAcquire(&xlogctl->info_lck);
- Assert(RedoRecPtr <= xlogctl->Insert.RedoRecPtr);
- RedoRecPtr = xlogctl->Insert.RedoRecPtr;
+ ptr = xlogctl->RedoRecPtr;
SpinLockRelease(&xlogctl->info_lck);
+ if (RedoRecPtr < ptr)
+ RedoRecPtr = ptr;
+
return RedoRecPtr;
}
*
* NOTE: The value *actually* returned is the position of the last full
* xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to acquire WALInsertLock which can be quite
- * heavily contended, and an approximation is enough for the current
- * usage of this function.
+ * For that, we don't need to scan through WAL insertion slots, and an
+ * approximation is enough for the current usage of this function.
*/
XLogRecPtr
GetInsertRecPtr(void)
void
CreateCheckPoint(int flags)
{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
bool shutdown;
CheckPoint checkPoint;
XLogRecPtr recptr;
XLogRecData rdata;
uint32 freespace;
XLogSegNo _logSegNo;
+ XLogRecPtr curInsert;
VirtualTransactionId *vxids;
int nvxids;
checkPoint.oldestActiveXid = InvalidTransactionId;
/*
- * We must hold WALInsertLock while examining insert state to determine
- * the checkpoint REDO pointer.
+ * We must block concurrent insertions while examining insert state to
+ * determine the checkpoint REDO pointer.
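+ * (WALInsertSlotAcquire(true) grabs all the insertion slots exclusively,
+ * so no new insertions can begin until we release them.)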
*/
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
+ curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
/*
* If this isn't a shutdown or forced checkpoint, and we have not inserted
if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
CHECKPOINT_FORCE)) == 0)
{
- XLogRecPtr curInsert;
-
- INSERT_RECPTR(curInsert, Insert, Insert->curridx);
if (curInsert == ControlFile->checkPoint +
MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
{
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
return;
* the buffer flush work. Those XLOG records are logically after the
* checkpoint, even though physically before it. Got that?
*/
- freespace = INSERT_FREESPACE(Insert);
+ freespace = INSERT_FREESPACE(curInsert);
if (freespace == 0)
{
- (void) AdvanceXLInsertBuffer(false);
- /* OK to ignore update return flag, since we will do flush anyway */
- freespace = INSERT_FREESPACE(Insert);
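+ /*
+ * The insert position is exactly at a page boundary; the first new
+ * record will actually be inserted after the next page's header.
+ */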
+ if (curInsert % XLogSegSize == 0)
+ curInsert += SizeOfXLogLongPHD;
+ else
+ curInsert += SizeOfXLogShortPHD;
}
- INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
+ checkPoint.redo = curInsert;
/*
* Here we update the shared RedoRecPtr for future XLogInsert calls; this
- * must be done while holding the insert lock AND the info_lck.
+ * must be done while holding the insertion slots.
*
* Note: if we fail to complete the checkpoint, RedoRecPtr will be left
* pointing past where it really needs to point. This is okay; the only
* XLogInserts that happen while we are dumping buffers must assume that
* their buffer changes are not included in the checkpoint.
*/
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
/*
- * Now we can release WAL insert lock, allowing other xacts to proceed
- * while we are flushing disk buffers.
+ * Now we can release the WAL insertion slots, allowing other xacts to
+ * proceed while we are flushing disk buffers.
*/
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
+
+ /* Update the info_lck-protected copy of RedoRecPtr as well */
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->RedoRecPtr = checkPoint.redo;
+ SpinLockRelease(&xlogctl->info_lck);
/*
* If enabled, log checkpoint start. We postpone this until now so as not
* we wait till he's out of his commit critical section before proceeding.
* See notes in RecordTransactionCommit().
*
- * Because we've already released WALInsertLock, this test is a bit fuzzy:
- * it is possible that we will wait for xacts we didn't really need to
- * wait for. But the delay should be short and it seems better to make
- * checkpoint take a bit longer than to hold locks longer than necessary.
+ * Because we've already released the insertion slots, this test is a bit
+ * fuzzy: it is possible that we will wait for xacts we didn't really need
+ * to wait for. But the delay should be short and it seems better to make
+ * checkpoint take a bit longer than to hold off insertions longer than
+ * necessary.
* (In fact, the whole reason we have this issue is that xact.c does
* commit record XLOG insertion and clog update as two separate steps
* protected by different locks, but again that seems best on grounds of
xlrec.end_time = time(NULL);
- LWLockAcquire(WALInsertLock, LW_SHARED);
+ WALInsertSlotAcquire(true);
xlrec.ThisTimeLineID = ThisTimeLineID;
xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
LocalSetXLogInsertAllowed();
* the number of segments replayed since last restartpoint, and request a
* restartpoint if it exceeds checkpoint_segments.
*
- * You need to hold WALInsertLock and info_lck to update it, although
- * during recovery acquiring WALInsertLock is just pro forma, because
- * there is no other processes updating Insert.RedoRecPtr.
+ * Like in CreateCheckPoint(), hold off insertions to update it, although
+ * during recovery this is just pro forma, because no WAL insertions are
+ * happening.
*/
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
- SpinLockAcquire(&xlogctl->info_lck);
+ WALInsertSlotAcquire(true);
xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+ WALInsertSlotRelease();
+
+ /* Also update the info_lck-protected copy */
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->RedoRecPtr = lastCheckPoint.redo;
SpinLockRelease(&xlogctl->info_lck);
- LWLockRelease(WALInsertLock);
/*
* Prepare to accumulate statistics.
*/
if (fullPageWrites)
{
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
Insert->fullPageWrites = true;
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
}
/*
if (!fullPageWrites)
{
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
Insert->fullPageWrites = false;
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
}
END_CRIT_SECTION();
}
* Note that forcePageWrites has no effect during an online backup from
* the standby.
*
- * We must hold WALInsertLock to change the value of forcePageWrites, to
- * ensure adequate interlocking against XLogInsert().
+ * We must hold all the insertion slots to change the value of
+ * forcePageWrites, to ensure adequate interlocking against XLogInsert().
*/
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
if (exclusive)
{
if (XLogCtl->Insert.exclusiveBackup)
{
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a backup is already in progress"),
else
XLogCtl->Insert.nonExclusiveBackups++;
XLogCtl->Insert.forcePageWrites = true;
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
/* Ensure we release forcePageWrites if fail below */
PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
* taking a checkpoint right after another is not that expensive
* either because only few buffers have been dirtied yet.
*/
- LWLockAcquire(WALInsertLock, LW_SHARED);
+ WALInsertSlotAcquire(true);
if (XLogCtl->Insert.lastBackupStart < startpoint)
{
XLogCtl->Insert.lastBackupStart = startpoint;
gotUniqueStartpoint = true;
}
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
} while (!gotUniqueStartpoint);
XLByteToSeg(startpoint, _logSegNo);
bool exclusive = DatumGetBool(arg);
/* Update backup counters and forcePageWrites on failure */
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
if (exclusive)
{
Assert(XLogCtl->Insert.exclusiveBackup);
{
XLogCtl->Insert.forcePageWrites = false;
}
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
}
/*
/*
* OK to update backup counters and forcePageWrites
*/
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
if (exclusive)
XLogCtl->Insert.exclusiveBackup = false;
else
{
XLogCtl->Insert.forcePageWrites = false;
}
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
if (exclusive)
{
void
do_pg_abort_backup(void)
{
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
XLogCtl->Insert.nonExclusiveBackups--;
{
XLogCtl->Insert.forcePageWrites = false;
}
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
}
/*
XLogRecPtr
GetXLogInsertRecPtr(void)
{
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecPtr current_recptr;
+ volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint64 current_bytepos;
- LWLockAcquire(WALInsertLock, LW_SHARED);
- INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
- LWLockRelease(WALInsertLock);
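+ /*
+ * Read the master insert position under insertpos_lck, then convert
+ * the header-less byte position back to an XLogRecPtr outside the
+ * spinlock.
+ */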
+ SpinLockAcquire(&Insert->insertpos_lck);
+ current_bytepos = Insert->CurrBytePos;
+ SpinLockRelease(&Insert->insertpos_lck);
- return current_recptr;
+ return XLogBytePosToRecPtr(current_bytepos);
}
/*