#include "access/clog.h"
#include "access/multixact.h"
+#include "access/rewriteheap.h"
#include "access/subtrans.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
+#include "replication/logical.h"
#include "replication/slot.h"
+#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/barrier.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
+#include "storage/large_object.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/predicate.h"
int wal_level = WAL_LEVEL_MINIMAL;
int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
-int num_xloginsert_slots = 8;
+int num_xloginsert_locks = 8;
#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
* future XLOG segment as long as there aren't already XLOGfileslop future
* segments; else we'll delete it. This could be made a separate GUC
* variable, but at present I think it's sufficient to hardwire it as
- * 2*CheckPointSegments+1. Under normal conditions, a checkpoint will free
+ * 2*CheckPointSegments+1. Under normal conditions, a checkpoint will free
* no more than 2*CheckPointSegments log segments, and we want to recycle all
* of them; the +1 allows boundary cases to happen without wasting a
* delete/create-segment cycle.
/*
* Statistics for current checkpoint are collected in this global struct.
- * Because only the background writer or a stand-alone backend can perform
+ * Because only the checkpointer or a stand-alone backend can perform
* checkpoints, this will be unused in normal backends.
*/
CheckpointStatsData CheckpointStats;
* 0: unconditionally not allowed to insert XLOG
* -1: must check RecoveryInProgress(); disallow until it is false
* Most processes start with -1 and transition to 1 after seeing that recovery
- * is not in progress. But we can also force the value for special cases.
+ * is not in progress. But we can also force the value for special cases.
* The coding in XLogInsertAllowed() depends on the first two of these states
* being numerically the same as bool true and false.
*/
static TransactionId recoveryTargetXid;
static TimestampTz recoveryTargetTime;
static char *recoveryTargetName;
-static int min_recovery_apply_delay = 0;
+static int recovery_min_apply_delay = 0;
static TimestampTz recoveryDelayUntilTime;
/* options taken from recovery.conf for XLOG streaming */
*
* expectedTLEs: a list of TimeLineHistoryEntries for recoveryTargetTLI and the timelines of
* its known parents, newest first (so recoveryTargetTLI is always the
- * first list member). Only these TLIs are expected to be seen in the WAL
+ * first list member). Only these TLIs are expected to be seen in the WAL
* segments we read, and indeed only these TLIs will be considered as
* candidate WAL files to open at all.
*
/*
* RedoRecPtr is this backend's local copy of the REDO record pointer
* (which is almost but not quite the same as a pointer to the most recent
- * CHECKPOINT record). We update this from the shared-memory copy,
+ * CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold an insertion slot). See XLogInsert for details. We are also allowed
+ * hold an insertion lock). See XLogInsert for details. We are also allowed
* to update from XLogCtl->RedoRecPtr if we hold the info_lck;
* see GetRedoRecPtr. A freshly spawned backend obtains the value during
* InitXLOGAccess.
XLogRecPtr Flush; /* last byte + 1 flushed */
} XLogwrtResult;
-
/*
- * A slot for inserting to the WAL. This is similar to an LWLock, the main
- * difference is that there is an extra xlogInsertingAt field that is protected
- * by the same mutex. Unlike an LWLock, a slot can only be acquired in
- * exclusive mode.
- *
- * The xlogInsertingAt field is used to advertise to other processes how far
- * the slot owner has progressed in inserting the record. When a backend
- * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
- * yet know where it's going to insert the record. That's conservative
- * but correct; the new insertion is certainly going to go to a byte position
- * greater than 1. If another backend needs to flush the WAL, it will have to
- * wait for the new insertion. xlogInsertingAt is updated after finishing the
- * insert or when crossing a page boundary, which will wake up anyone waiting
- * for it, whether the wait was necessary in the first place or not.
- *
- * A process can wait on a slot in two modes: LW_EXCLUSIVE or
- * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
- * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
- * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
- * released, or xlogInsertingAt is updated. In other words, a process in
- * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
- * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
- * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
- *
- * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
- * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
- * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
- * see lwlock.c for details.
+ * Inserting to WAL is protected by a small fixed number of WAL insertion
+ * locks. To insert to the WAL, you must hold one of the locks - it doesn't
+ * matter which one. To lock out other concurrent insertions, you must hold
+ * all of them. Each WAL insertion lock consists of a lightweight lock, plus an
+ * indicator of how far the insertion has progressed (insertingAt).
+ *
+ * The insertingAt values are read when a process wants to flush WAL from
+ * the in-memory buffers to disk, to check that all the insertions to the
+ * region the process is about to write out have finished. You could simply
+ * wait for all currently in-progress insertions to finish, but the
+ * insertingAt indicator allows you to ignore insertions to later in the WAL,
+ * so that you only wait for the insertions that are modifying the buffers
+ * you're about to write out.
+ *
+ * This isn't just an optimization. If all the WAL buffers are dirty, an
+ * inserter that's holding a WAL insert lock might need to evict an old WAL
+ * buffer, which requires flushing the WAL. If it's possible for an inserter
+ * to block on another inserter unnecessarily, deadlock can arise when two
+ * inserters holding a WAL insert lock wait for each other to finish their
+ * insertion.
+ *
+ * Small WAL records that don't cross a page boundary never update the value,
+ * the WAL record is just copied to the page and the lock is released. But
+ * to avoid the deadlock-scenario explained above, the indicator is always
+ * updated before sleeping while holding an insertion lock.
*/
typedef struct
{
- slock_t mutex; /* protects the below fields */
- XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */
-
- PGPROC *owner; /* for debugging purposes */
-
- bool releaseOK; /* T if ok to release waiters */
- char exclusive; /* # of exclusive holders (0 or 1) */
- PGPROC *head; /* head of list of waiting PGPROCs */
- PGPROC *tail; /* tail of list of waiting PGPROCs */
- /* tail is undefined when head is NULL */
-} XLogInsertSlot;
+ LWLock lock;
+ XLogRecPtr insertingAt;
+} WALInsertLock;
/*
- * All the slots are allocated as an array in shared memory. We force the
- * array stride to be a power of 2, which saves a few cycles in indexing, but
- * more importantly also ensures that individual slots don't cross cache line
- * boundaries. (Of course, we have to also ensure that the array start
- * address is suitably aligned.)
+ * All the WAL insertion locks are allocated as an array in shared memory. We
+ * force the array stride to be a power of 2, which saves a few cycles in
+ * indexing, but more importantly also ensures that individual slots don't
+ * cross cache line boundaries. (Of course, we have to also ensure that the
+ * array start address is suitably aligned.)
*/
-typedef union XLogInsertSlotPadded
+typedef union WALInsertLockPadded
{
- XLogInsertSlot slot;
+ WALInsertLock l;
char pad[CACHE_LINE_SIZE];
-} XLogInsertSlotPadded;
+} WALInsertLockPadded;
/*
* Shared state data for XLogInsert.
slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */
/*
- * CurrBytePos is the end of reserved WAL. The next record will be inserted
- * at that position. PrevBytePos is the start position of the previously
- * inserted (or rather, reserved) record - it is copied to the prev-link
- * of the next record. These are stored as "usable byte positions" rather
- * than XLogRecPtrs (see XLogBytePosToRecPtr()).
+ * CurrBytePos is the end of reserved WAL. The next record will be
+ * inserted at that position. PrevBytePos is the start position of the
+ * previously inserted (or rather, reserved) record - it is copied to the
+ * prev-link of the next record. These are stored as "usable byte
+ * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
*/
uint64 CurrBytePos;
uint64 PrevBytePos;
* we must WAL-log it before it actually affects WAL-logging by backends.
* Checkpointer sets at startup or after SIGHUP.
*
- * To read these fields, you must hold an insertion slot. To modify them,
- * you must hold ALL the slots.
+ * To read these fields, you must hold an insertion lock. To modify them,
+ * you must hold ALL the locks.
*/
XLogRecPtr RedoRecPtr; /* current redo point for insertions */
bool forcePageWrites; /* forcing full-page writes for PITR? */
int nonExclusiveBackups;
XLogRecPtr lastBackupStart;
- /* insertion slots, see XLogInsertSlot struct above for details */
- XLogInsertSlotPadded *insertSlots;
+ /*
+ * WAL insertion locks.
+ */
+ WALInsertLockPadded *WALInsertLocks;
+ LWLockTranche WALInsertLockTranche;
+ int WALInsertLockTrancheId;
} XLogCtlInsert;
/*
* Latest initialized page in the cache (last byte position + 1).
*
* To change the identity of a buffer (and InitializedUpTo), you need to
- * hold WALBufMappingLock. To change the identity of a buffer that's still
- * dirty, the old page needs to be written out first, and for that you
- * need WALWriteLock, and you need to ensure that there are no in-progress
- * insertions to the page by calling WaitXLogInsertionsToFinish().
+ * hold WALBufMappingLock. To change the identity of a buffer that's
+ * still dirty, the old page needs to be written out first, and for that
+ * you need WALWriteLock, and you need to ensure that there are no
+ * in-progress insertions to the page by calling
+ * WaitXLogInsertionsToFinish().
*/
XLogRecPtr InitializedUpTo;
static XLogCtlData *XLogCtl = NULL;
+/* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
+static WALInsertLockPadded *WALInsertLocks = NULL;
+
/*
* We maintain an image of pg_control in shared memory.
*/
XLOG_FROM_ANY = 0, /* request to read WAL from any source */
XLOG_FROM_ARCHIVE, /* restored using restore_command */
XLOG_FROM_PG_XLOG, /* existing file in pg_xlog */
- XLOG_FROM_STREAM, /* streamed from master */
+ XLOG_FROM_STREAM /* streamed from master */
} XLogSource;
/* human-readable names for XLogSources, for debugging output */
/* Have we launched bgwriter during recovery? */
static bool bgwriterLaunched = false;
-/* For WALInsertSlotAcquire/Release functions */
-static int MySlotNo = 0;
-static bool holdingAllSlots = false;
+/* For WALInsertLockAcquire/Release functions */
+static int MyLockNo = 0;
+static bool holdingAllLocks = false;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
static int get_sync_bit(int method);
static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
- XLogRecData *rdata,
- XLogRecPtr StartPos, XLogRecPtr EndPos);
+ XLogRecData *rdata,
+ XLogRecPtr StartPos, XLogRecPtr EndPos);
static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr);
static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
-static void WakeupWaiters(XLogRecPtr EndPos);
static char *GetXLogBuffer(XLogRecPtr ptr);
static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
-static void WALInsertSlotAcquire(bool exclusive);
-static void WALInsertSlotAcquireOne(int slotno);
-static void WALInsertSlotRelease(void);
-static void WALInsertSlotReleaseOne(int slotno);
+static void WALInsertLockAcquire(void);
+static void WALInsertLockAcquireExclusive(void);
+static void WALInsertLockRelease(void);
+static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
/*
* Insert an XLOG record having the specified RMID and info bytes,
if (rechdr == NULL)
{
- rechdr = malloc(SizeOfXLogRecord);
- if (rechdr == NULL)
- elog(ERROR, "out of memory");
+ static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
+
+ rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
MemSet(rechdr, 0, SizeOfXLogRecord);
}
*
* We may have to loop back to here if a race condition is detected below.
* We could prevent the race by doing all this work while holding an
- * insertion slot, but it seems better to avoid doing CRC calculations
+ * insertion lock, but it seems better to avoid doing CRC calculations
* while holding one.
*
* We add entries for backup blocks to the chain, so that they don't need
/*
* Decide if we need to do full-page writes in this XLOG record: true if
* full_page_writes is on or we have a PITR request for it. Since we
- * don't yet have an insertion slot, fullPageWrites and forcePageWrites
- * could change under us, but we'll recheck them once we have a slot.
+ * don't yet have an insertion lock, fullPageWrites and forcePageWrites
+ * could change under us, but we'll recheck them once we have a lock.
*/
doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
* record to the shared WAL buffer cache is a two-step process:
*
* 1. Reserve the right amount of space from the WAL. The current head of
- * reserved space is kept in Insert->CurrBytePos, and is protected by
- * insertpos_lck.
+ * reserved space is kept in Insert->CurrBytePos, and is protected by
+ * insertpos_lck.
*
* 2. Copy the record to the reserved WAL space. This involves finding the
- * correct WAL buffer containing the reserved space, and copying the
- * record in place. This can be done concurrently in multiple processes.
+ * correct WAL buffer containing the reserved space, and copying the
+ * record in place. This can be done concurrently in multiple processes.
*
* To keep track of which insertions are still in-progress, each concurrent
- * inserter allocates an "insertion slot", which tells others how far the
- * inserter has progressed. There is a small fixed number of insertion
- * slots, determined by the num_xloginsert_slots GUC. When an inserter
- * finishes, it updates the xlogInsertingAt of its slot to the end of the
- * record it inserted, to let others know that it's done. xlogInsertingAt
- * is also updated when crossing over to a new WAL buffer, to allow the
- * the previous buffer to be flushed.
+ * inserter acquires an insertion lock. In addition to just indicating that
+ * an insertion is in progress, the lock tells others how far the inserter
+ * has progressed. There is a small fixed number of insertion locks,
+ * determined by the num_xloginsert_locks GUC. When an inserter crosses a
+ * page boundary, it updates the value stored in the lock to how far it
+ * has inserted, to allow the previous buffer to be flushed.
*
- * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
- * changing until the insertion is finished.
+ * Holding onto an insertion lock also protects RedoRecPtr and
+ * fullPageWrites from changing until the insertion is finished.
*
* Step 2 can usually be done completely in parallel. If the required WAL
* page is not initialized yet, you have to grab WALBufMappingLock to
*----------
*/
START_CRIT_SECTION();
- WALInsertSlotAcquire(isLogSwitch);
+ if (isLogSwitch)
+ WALInsertLockAcquireExclusive();
+ else
+ WALInsertLockAcquire();
/*
* Check to see if my RedoRecPtr is out of date. If so, may have to go
* Oops, this buffer now needs to be backed up, but we
* didn't think so above. Start over.
*/
- WALInsertSlotRelease();
+ WALInsertLockRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
{
/* Oops, must redo it with full-page data. */
- WALInsertSlotRelease();
+ WALInsertLockRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
/*
* Done! Let others know that we're finished.
*/
- WALInsertSlotRelease();
+ WALInsertLockRelease();
MarkCurrentTransactionIdLoggedIfAny();
{
TRACE_POSTGRESQL_XLOG_SWITCH();
XLogFlush(EndPos);
+
/*
* Even though we reserved the rest of the segment for us, which is
* reflected in EndPos, we return a pointer to just the end of the
xlog_outrec(&buf, rechdr);
if (rdata->data != NULL)
{
+ StringInfoData recordbuf;
+
+ /*
+ * We have to piece together the WAL record data from the
+ * XLogRecData entries, so that we can pass it to the rm_desc
+ * function as one contiguous chunk. (but we can leave out any
+ * extra entries we created for backup blocks)
+ */
+ rdt_lastnormal->next = NULL;
+
+ initStringInfo(&recordbuf);
+ appendBinaryStringInfo(&recordbuf, (char *) rechdr, sizeof(XLogRecord));
+ for (; rdata != NULL; rdata = rdata->next)
+ appendBinaryStringInfo(&recordbuf, rdata->data, rdata->len);
+
appendStringInfoString(&buf, " - ");
- RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
+ RmgrTable[rechdr->xl_rmid].rm_desc(&buf, (XLogRecord *) recordbuf.data);
+ pfree(recordbuf.data);
}
elog(LOG, "%s", buf.data);
pfree(buf.data);
/*
* These calculations are a bit heavy-weight to be done while holding a
- * spinlock, but since we're holding all the WAL insertion slots, there
+ * spinlock, but since we're holding all the WAL insertion locks, there
* are no other inserters competing for it. GetXLogInsertRecPtr() does
* compete for it, but that's not called very frequently.
*/
/*
* If this was an xlog-switch, it's not enough to write the switch record,
- * we also have to consume all the remaining space in the WAL segment.
- * We have already reserved it for us, but we still need to make sure it's
+ * we also have to consume all the remaining space in the WAL segment. We
+ * have already reserved it for us, but we still need to make sure it's
* allocated and zeroed in the WAL buffers so that when the caller (or
* someone else) does XLogWrite(), it can really write out all the zeros.
*/
while (CurrPos < EndPos)
{
/* initialize the next page (if not initialized already) */
- WakeupWaiters(CurrPos);
+ WALInsertLockUpdateInsertingAt(CurrPos);
AdvanceXLInsertBuffer(CurrPos, false);
CurrPos += XLOG_BLCKSZ;
}
}
/*
- * Allocate a slot for insertion.
- *
- * In exclusive mode, all slots are reserved for the current process. That
- * blocks all concurrent insertions.
- */
-static void
-WALInsertSlotAcquire(bool exclusive)
-{
- int i;
-
- if (exclusive)
- {
- for (i = 0; i < num_xloginsert_slots; i++)
- WALInsertSlotAcquireOne(i);
- holdingAllSlots = true;
- }
- else
- WALInsertSlotAcquireOne(-1);
-}
-
-/*
- * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
- * one if slotno == -1. The index of the slot that was acquired is stored in
- * MySlotNo.
- *
- * This is more or less equivalent to LWLockAcquire().
+ * Acquire a WAL insertion lock, for inserting to WAL.
*/
static void
-WALInsertSlotAcquireOne(int slotno)
+WALInsertLockAcquire(void)
{
- volatile XLogInsertSlot *slot;
- PGPROC *proc = MyProc;
- bool retry = false;
- int extraWaits = 0;
- static int slotToTry = -1;
+ bool immed;
/*
- * Try to use the slot we used last time. If the system isn't particularly
- * busy, it's a good bet that it's available, and it's good to have some
- * affinity to a particular slot so that you don't unnecessarily bounce
- * cache lines between processes when there is no contention.
+ * It doesn't matter which of the WAL insertion locks we acquire, so try
+ * the one we used last time. If the system isn't particularly busy, it's
+ * a good bet that it's still available, and it's good to have some
+ * affinity to a particular lock so that you don't unnecessarily bounce
+ * cache lines between processes when there's no contention.
*
- * If this is the first time through in this backend, pick a slot
- * (semi-)randomly. This allows the slots to be used evenly if you have a
+ * If this is the first time through in this backend, pick a lock
+ * (semi-)randomly. This allows the locks to be used evenly if you have a
* lot of very short connections.
*/
- if (slotno != -1)
- MySlotNo = slotno;
- else
- {
- if (slotToTry == -1)
- slotToTry = MyProc->pgprocno % num_xloginsert_slots;
- MySlotNo = slotToTry;
- }
-
- /*
- * We can't wait if we haven't got a PGPROC. This should only occur
- * during bootstrap or shared memory initialization. Put an Assert here
- * to catch unsafe coding practices.
- */
- Assert(MyProc != NULL);
+ static int lockToTry = -1;
- /*
- * Lock out cancel/die interrupts until we exit the code section protected
- * by the slot. This ensures that interrupts will not interfere with
- * manipulations of data structures in shared memory. There is no cleanup
- * mechanism to release the slot if the backend dies while holding one,
- * so make this a critical section.
- */
- START_CRIT_SECTION();
+ if (lockToTry == -1)
+ lockToTry = MyProc->pgprocno % num_xloginsert_locks;
+ MyLockNo = lockToTry;
/*
- * Loop here to try to acquire slot after each time we are signaled by
- * WALInsertSlotRelease.
+ * The insertingAt value is initially set to 0, as we don't know our
+ * insert location yet.
*/
- for (;;)
+ immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock,
+ &WALInsertLocks[MyLockNo].l.insertingAt,
+ 0);
+ if (!immed)
{
- bool mustwait;
-
- slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
-
- /* Acquire mutex. Time spent holding mutex should be short! */
- SpinLockAcquire(&slot->mutex);
-
- /* If retrying, allow WALInsertSlotRelease to release waiters again */
- if (retry)
- slot->releaseOK = true;
-
- /* If I can get the slot, do so quickly. */
- if (slot->exclusive == 0)
- {
- slot->exclusive++;
- mustwait = false;
- }
- else
- mustwait = true;
-
- if (!mustwait)
- break; /* got the lock */
-
- Assert(slot->owner != MyProc);
-
- /*
- * Add myself to wait queue.
- */
- proc->lwWaiting = true;
- proc->lwWaitMode = LW_EXCLUSIVE;
- proc->lwWaitLink = NULL;
- if (slot->head == NULL)
- slot->head = proc;
- else
- slot->tail->lwWaitLink = proc;
- slot->tail = proc;
-
- /* Can release the mutex now */
- SpinLockRelease(&slot->mutex);
-
/*
- * Wait until awakened.
- *
- * Since we share the process wait semaphore with the regular lock
- * manager and ProcWaitForSignal, and we may need to acquire a slot
- * while one of those is pending, it is possible that we get awakened
- * for a reason other than being signaled by WALInsertSlotRelease. If
- * so, loop back and wait again. Once we've gotten the slot,
- * re-increment the sema by the number of additional signals received,
- * so that the lock manager or signal manager will see the received
- * signal when it next waits.
+ * If we couldn't get the lock immediately, try another lock next
+ * time. On a system with more insertion locks than concurrent
+ * inserters, this causes all the inserters to eventually migrate to a
+ * lock that no-one else is using. On a system with more inserters
+ * than locks, it still helps to distribute the inserters evenly
+ * across the locks.
*/
- for (;;)
- {
- /* "false" means cannot accept cancel/die interrupt here. */
- PGSemaphoreLock(&proc->sem, false);
- if (!proc->lwWaiting)
- break;
- extraWaits++;
- }
-
- /* Now loop back and try to acquire lock again. */
- retry = true;
+ lockToTry = (lockToTry + 1) % num_xloginsert_locks;
}
-
- slot->owner = proc;
-
- /*
- * Normally, we initialize the xlogInsertingAt value of the slot to 1,
- * because we don't yet know where in the WAL we're going to insert. It's
- * not critical what it points to right now - leaving it to a too small
- * value just means that WaitXlogInsertionsToFinish() might wait on us
- * unnecessarily, until we update the value (when we finish the insert or
- * move to next page).
- *
- * If we're grabbing all the slots, however, stamp all but the last one
- * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
- * slot is the one that we will update as we proceed with the insert, the
- * rest are held just to keep off other inserters.
- */
- if (slotno != -1 && slotno != num_xloginsert_slots - 1)
- slot->xlogInsertingAt = InvalidXLogRecPtr;
- else
- slot->xlogInsertingAt = 1;
-
- /* We are done updating shared state of the slot itself. */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Fix the process wait semaphore's count for any absorbed wakeups.
- */
- while (extraWaits-- > 0)
- PGSemaphoreUnlock(&proc->sem);
-
- /*
- * If we couldn't get the slot immediately, try another slot next time.
- * On a system with more insertion slots than concurrent inserters, this
- * causes all the inserters to eventually migrate to a slot that no-one
- * else is using. On a system with more inserters than slots, it still
- * causes the inserters to be distributed quite evenly across the slots.
- */
- if (slotno != -1 && retry)
- slotToTry = (slotToTry + 1) % num_xloginsert_slots;
}
/*
- * Wait for the given slot to become free, or for its xlogInsertingAt location
- * to change to something else than 'waitptr'. In other words, wait for the
- * inserter using the given slot to finish its insertion, or to at least make
- * some progress.
+ * Acquire all WAL insertion locks, to prevent other backends from inserting
+ * to WAL.
*/
static void
-WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
+WALInsertLockAcquireExclusive(void)
{
- PGPROC *proc = MyProc;
- int extraWaits = 0;
-
- /*
- * Lock out cancel/die interrupts while we sleep on the slot. There is
- * no cleanup mechanism to remove us from the wait queue if we got
- * interrupted.
- */
- HOLD_INTERRUPTS();
+ int i;
/*
- * Loop here to try to acquire lock after each time we are signaled.
+ * When holding all the locks, we only update the last lock's insertingAt
+ * indicator. The others are set to 0xFFFFFFFFFFFFFFFF, which is higher
+ * than any real XLogRecPtr value, to make sure that no-one blocks waiting
+ * on those.
*/
- for (;;)
+ for (i = 0; i < num_xloginsert_locks - 1; i++)
{
- bool mustwait;
-
- /* Acquire mutex. Time spent holding mutex should be short! */
- SpinLockAcquire(&slot->mutex);
-
- /* If I can get the lock, do so quickly. */
- if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
- mustwait = false;
- else
- mustwait = true;
-
- if (!mustwait)
- break; /* the lock was free */
-
- Assert(slot->owner != MyProc);
-
- /*
- * Add myself to wait queue.
- */
- proc->lwWaiting = true;
- proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
- proc->lwWaitLink = NULL;
-
- /* waiters are added to the front of the queue */
- proc->lwWaitLink = slot->head;
- if (slot->head == NULL)
- slot->tail = proc;
- slot->head = proc;
-
- /* Can release the mutex now */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Wait until awakened.
- *
- * Since we share the process wait semaphore with other things, like
- * the regular lock manager and ProcWaitForSignal, and we may need to
- * acquire an LWLock while one of those is pending, it is possible that
- * we get awakened for a reason other than being signaled by
- * LWLockRelease. If so, loop back and wait again. Once we've gotten
- * the LWLock, re-increment the sema by the number of additional
- * signals received, so that the lock manager or signal manager will
- * see the received signal when it next waits.
- */
- for (;;)
- {
- /* "false" means cannot accept cancel/die interrupt here. */
- PGSemaphoreLock(&proc->sem, false);
- if (!proc->lwWaiting)
- break;
- extraWaits++;
- }
-
- /* Now loop back and try to acquire lock again. */
+ LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
+ &WALInsertLocks[i].l.insertingAt,
+ UINT64CONST(0xFFFFFFFFFFFFFFFF));
}
+ LWLockAcquireWithVar(&WALInsertLocks[i].l.lock,
+ &WALInsertLocks[i].l.insertingAt,
+ 0);
- /* We are done updating shared state of the lock itself. */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Fix the process wait semaphore's count for any absorbed wakeups.
- */
- while (extraWaits-- > 0)
- PGSemaphoreUnlock(&proc->sem);
-
- /*
- * Now okay to allow cancel/die interrupts.
- */
- RESUME_INTERRUPTS();
+ holdingAllLocks = true;
}
/*
- * Wake up all processes waiting for us with WaitOnSlot(). Sets our
- * xlogInsertingAt value to EndPos, without releasing the slot.
+ * Release our insertion lock (or locks, if we're holding them all).
*/
static void
-WakeupWaiters(XLogRecPtr EndPos)
+WALInsertLockRelease(void)
{
- volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
- PGPROC *head;
- PGPROC *proc;
- PGPROC *next;
-
- /*
- * If we have already reported progress up to the same point, do nothing.
- * No other process can modify xlogInsertingAt, so we can check this before
- * grabbing the spinlock.
- */
- if (slot->xlogInsertingAt == EndPos)
- return;
- /* xlogInsertingAt should not go backwards */
- Assert(slot->xlogInsertingAt < EndPos);
-
- /* Acquire mutex. Time spent holding mutex should be short! */
- SpinLockAcquire(&slot->mutex);
-
- /* we should own the slot */
- Assert(slot->exclusive == 1 && slot->owner == MyProc);
-
- slot->xlogInsertingAt = EndPos;
-
- /*
- * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
- * up. They are always in the front of the queue.
- */
- head = slot->head;
-
- if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+ if (holdingAllLocks)
{
- proc = head;
- next = proc->lwWaitLink;
- while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
- {
- proc = next;
- next = next->lwWaitLink;
- }
+ int i;
- /* proc is now the last PGPROC to be released */
- slot->head = next;
- proc->lwWaitLink = NULL;
+ for (i = 0; i < num_xloginsert_locks; i++)
+ LWLockRelease(&WALInsertLocks[i].l.lock);
+
+ holdingAllLocks = false;
}
else
- head = NULL;
-
- /* We are done updating shared state of the lock itself. */
- SpinLockRelease(&slot->mutex);
-
- /*
- * Awaken any waiters I removed from the queue.
- */
- while (head != NULL)
{
- proc = head;
- head = proc->lwWaitLink;
- proc->lwWaitLink = NULL;
- proc->lwWaiting = false;
- PGSemaphoreUnlock(&proc->sem);
+ LWLockRelease(&WALInsertLocks[MyLockNo].l.lock);
}
}
/*
- * Release our insertion slot (or slots, if we're holding them all).
+ * Update our insertingAt value, to let others know that we've finished
+ * inserting up to that point.
 */
static void
-WALInsertSlotRelease(void)
+WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
{
-	int i;
-
-	if (holdingAllSlots)
+	if (holdingAllLocks)
	{
-		for (i = 0; i < num_xloginsert_slots; i++)
-			WALInsertSlotReleaseOne(i);
-		holdingAllSlots = false;
+		/*
+		 * We use the last lock to mark our actual position, see comments in
+		 * WALInsertLockAcquireExclusive.
+		 */
+		LWLockUpdateVar(&WALInsertLocks[num_xloginsert_locks - 1].l.lock,
+						&WALInsertLocks[num_xloginsert_locks - 1].l.insertingAt,
+						insertingAt);
	}
	else
-		WALInsertSlotReleaseOne(MySlotNo);
-}
-
-static void
-WALInsertSlotReleaseOne(int slotno)
-{
-	volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
-	PGPROC *head;
-	PGPROC *proc;
-
-	/* Acquire mutex. Time spent holding mutex should be short! */
-	SpinLockAcquire(&slot->mutex);
-
-	/* we must be holding it */
-	Assert(slot->exclusive == 1 && slot->owner == MyProc);
-
-	slot->xlogInsertingAt = InvalidXLogRecPtr;
-
-	/* Release my hold on the slot */
-	slot->exclusive = 0;
-	slot->owner = NULL;
-
-	/*
-	 * See if I need to awaken any waiters..
-	 */
-	head = slot->head;
-	if (head != NULL)
-	{
-		if (slot->releaseOK)
-		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool releaseOK = true;
-
-			proc = head;
-
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock. These are always in the front of the queue.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
-
-			/*
-			 * Awaken the first exclusive-waiter, if any.
-			 */
-			if (proc->lwWaitLink)
-			{
-				Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
-				proc = proc->lwWaitLink;
-				releaseOK = false;
-			}
-			/* proc is now the last PGPROC to be released */
-			slot->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
-
-			slot->releaseOK = releaseOK;
-		}
-		else
-			head = NULL;
-	}
-
-	/* We are done updating shared state of the slot itself. */
-	SpinLockRelease(&slot->mutex);
-
-	/*
-	 * Awaken any waiters I removed from the queue.
-	 */
-	while (head != NULL)
-	{
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
-	}
-
-	/*
-	 * Now okay to allow cancel/die interrupts.
-	 */
-	END_CRIT_SECTION();
+		/* Holding a single lock: publish insertingAt via that lock only. */
+		LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
+						&WALInsertLocks[MyLockNo].l.insertingAt,
+						insertingAt);
}
-
/*
* Wait for any WAL insertions < upto to finish.
*
}
/*
- * finishedUpto is our return value, indicating the point upto which
- * all the WAL insertions have been finished. Initialize it to the head
- * of reserved WAL, and as we iterate through the insertion slots, back it
+ * Loop through all the locks, sleeping on any in-progress insert older
+ * than 'upto'.
+ *
+ * finishedUpto is our return value, indicating the point upto which all
+ * the WAL insertions have been finished. Initialize it to the head of
+ * reserved WAL, and as we iterate through the insertion locks, back it
* out for any insertion that's still in progress.
*/
finishedUpto = reservedUpto;
-
- /*
- * Loop through all the slots, sleeping on any in-progress insert older
- * than 'upto'.
- */
- for (i = 0; i < num_xloginsert_slots; i++)
+ for (i = 0; i < num_xloginsert_locks; i++)
{
- volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
- XLogRecPtr insertingat;
+ XLogRecPtr insertingat = InvalidXLogRecPtr;
- retry:
- /*
- * We can check if the slot is in use without grabbing the spinlock.
- * The spinlock acquisition of insertpos_lck before this loop acts
- * as a memory barrier. If someone acquires the slot after that, it
- * can't possibly be inserting to anything < reservedUpto. If it was
- * acquired before that, an unlocked test will return true.
- */
- if (!slot->exclusive)
- continue;
-
- SpinLockAcquire(&slot->mutex);
- /* re-check now that we have the lock */
- if (!slot->exclusive)
- {
- SpinLockRelease(&slot->mutex);
- continue;
- }
- insertingat = slot->xlogInsertingAt;
- SpinLockRelease(&slot->mutex);
-
- if (insertingat == InvalidXLogRecPtr)
+ do
{
/*
- * slot is reserved just to hold off other inserters, there is no
- * actual insert in progress.
+ * See if this insertion is in progress. LWLockWait will wait for
+ * the lock to be released, or for the 'value' to be set by a
+ * LWLockUpdateVar call. When a lock is initially acquired, its
+ * value is 0 (InvalidXLogRecPtr), which means that we don't know
+ * where it's inserting yet. We will have to wait for it. If
+ * it's a small insertion, the record will most likely fit on the
+ * same page and the inserter will release the lock without ever
+ * calling LWLockUpdateVar. But if it has to sleep, it will
+ * advertise the insertion point with LWLockUpdateVar before
+ * sleeping.
*/
- continue;
- }
+ if (LWLockWaitForVar(&WALInsertLocks[i].l.lock,
+ &WALInsertLocks[i].l.insertingAt,
+ insertingat, &insertingat))
+ {
+ /* the lock was free, so no insertion in progress */
+ insertingat = InvalidXLogRecPtr;
+ break;
+ }
- /*
- * This insertion is still in progress. Do we need to wait for it?
- *
- * When an inserter acquires a slot, it doesn't reset 'insertingat', so
- * it will initially point to the old value of some already-finished
- * insertion. The inserter will update the value as soon as it finishes
- * the insertion, moves to the next page, or has to do I/O to flush an
- * old dirty buffer. That means that when we see a slot with
- * insertingat value < upto, we don't know if that insertion is still
- * truly in progress, or if the slot is reused by a new inserter that
- * hasn't updated the insertingat value yet. We have to assume it's the
- * latter, and wait.
- */
- if (insertingat < upto)
- {
- WaitOnSlot(slot, insertingat);
- goto retry;
- }
- else
- {
/*
- * We don't need to wait for this insertion, but update the
- * return value.
+ * This insertion is still in progress. Have to wait, unless the
+ * inserter has proceeded past 'upto'.
*/
- if (insertingat < finishedUpto)
- finishedUpto = insertingat;
- }
+ } while (insertingat < upto);
+
+ if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
+ finishedUpto = insertingat;
}
return finishedUpto;
}
*
* The caller must ensure that the page containing the requested location
* isn't evicted yet, and won't be evicted. The way to ensure that is to
- * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
- * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * hold onto a WAL insertion lock with the insertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update insertingAt if it needs
* to evict an old page from the buffer. (This means that once you call
* GetXLogBuffer() with a given 'ptr', you must not access anything before
* that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
}
/*
- * The XLog buffer cache is organized so that a page is always loaded
- * to a particular buffer. That way we can easily calculate the buffer
- * a given page must be loaded into, from the XLogRecPtr alone.
+ * The XLog buffer cache is organized so that a page is always loaded to a
+ * particular buffer. That way we can easily calculate the buffer a given
+ * page must be loaded into, from the XLogRecPtr alone.
*/
idx = XLogRecPtrToBufIdx(ptr);
if (expectedEndPtr != endptr)
{
/*
- * Let others know that we're finished inserting the record up
- * to the page boundary.
+ * Let others know that we're finished inserting the record up to the
+ * page boundary.
*/
- WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
+ WALInsertLockUpdateInsertingAt(expectedEndPtr - XLOG_BLCKSZ);
AdvanceXLInsertBuffer(ptr, false);
endptr = XLogCtl->xlblocks[idx];
if (expectedEndPtr != endptr)
elog(PANIC, "could not find WAL buffer for %X/%X",
- (uint32) (ptr >> 32) , (uint32) ptr);
+ (uint32) (ptr >> 32), (uint32) ptr);
}
else
{
else
{
result = fullsegs * UsableBytesInSegment +
- (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
- (fullpages - 1) * UsableBytesInPage; /* full pages */
+ (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
+ (fullpages - 1) * UsableBytesInPage; /* full pages */
if (offset > 0)
{
Assert(offset >= SizeOfXLogShortPHD);
return result;
}
+/*
+ * Determine whether the buffer referenced has to be backed up.
+ *
+ * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
+ * could change later, so the result should be used for optimization purposes
+ * only.
+ */
+bool
+XLogCheckBufferNeedsBackup(Buffer buffer)
+{
+	bool doPageWrites;
+	Page page;
+
+	page = BufferGetPage(buffer);
+
+	/* Unlocked read of shared flags; advisory only, per the note above. */
+	doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
+
+	/* Backup needed if the page's LSN is not newer than the redo pointer. */
+	if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
+		return true;			/* buffer requires backup */
+
+	return false;				/* buffer does not need to be backed up */
+}
+
/*
* Determine whether the buffer referenced by an XLogRecData item has to
* be backed up, and if so fill a BkpBlock struct for it. In any case
}
/*
- * Now the next buffer slot is free and we can set it up to be the next
- * output page.
+ * Now the next buffer slot is free and we can set it up to be the
+ * next output page.
*/
NewPageBeginPtr = XLogCtl->InitializedUpTo;
NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
/* NewPage->xlp_info = 0; */ /* done by memset */
NewPage ->xlp_tli = ThisTimeLineID;
NewPage ->xlp_pageaddr = NewPageBeginPtr;
- /* NewPage->xlp_rem_len = 0; */ /* done by memset */
+
+ /* NewPage->xlp_rem_len = 0; */ /* done by memset */
/*
* If online backup is not in progress, mark the header to indicate
* blocks. This allows the WAL archiver to know whether it is safe to
* compress archived WAL data by transforming full-block records into
* the non-full-block format. It is sufficient to record this at the
- * page level because we force a page switch (in fact a segment switch)
- * when starting a backup, so the flag will be off before any records
- * can be written during the backup. At the end of a backup, the last
- * page will be marked as all unsafe when perhaps only part is unsafe,
- * but at worst the archiver would miss the opportunity to compress a
- * few records.
+ * page level because we force a page switch (in fact a segment
+ * switch) when starting a backup, so the flag will be off before any
+ * records can be written during the backup. At the end of a backup,
+ * the last page will be marked as all unsafe when perhaps only part
+ * is unsafe, but at worst the archiver would miss the opportunity to
+ * compress a few records.
*/
if (!Insert->forcePageWrites)
NewPage ->xlp_info |= XLP_BKP_REMOVABLE;
* if we're passed a bogus WriteRqst.Write that is past the end of the
* last page that's been initialized by AdvanceXLInsertBuffer.
*/
- XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+ XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+
if (LogwrtResult.Write >= EndPtr)
elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
(uint32) (LogwrtResult.Write >> 32),
do
{
errno = 0;
- written = write(openLogFile, from, nleft);
+ written = write(openLogFile, from, nleft);
if (written <= 0)
{
if (errno == EINTR)
(errcode_for_file_access(),
errmsg("could not write to log file %s "
"at offset %u, length %zu: %m",
- XLogFileNameP(ThisTimeLineID, openLogSegNo),
+ XLogFileNameP(ThisTimeLineID, openLogSegNo),
openLogOff, nbytes)));
}
nleft -= written;
{
/*
* Could get here without iterating above loop, in which case we might
- * have no open file or the wrong one. However, we do not need to
+ * have no open file or the wrong one. However, we do not need to
* fsync more than one file.
*/
if (sync_method != SYNC_METHOD_OPEN &&
/*
* If the WALWriter is sleeping, we should kick it to make it come out of
- * low-power mode. Otherwise, determine whether there's a full page of
+ * low-power mode. Otherwise, determine whether there's a full page of
* WAL available to write.
*/
if (!sleeping)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
- XLogRecPtr retval;
+ XLogRecPtr retval;
+
SpinLockAcquire(&xlogctl->info_lck);
retval = xlogctl->replicationSlotMinLSN;
SpinLockRelease(&xlogctl->info_lck);
* We normally flush only completed blocks; but if there is nothing to do on
* that basis, we check for unflushed async commits in the current incomplete
* block, and flush through the latest one of those. Thus, if async commits
- * are not being used, we will flush complete blocks only. We can guarantee
+ * are not being used, we will flush complete blocks only. We can guarantee
* that async commits reach disk after at most three cycles; normally only
- * one or two. (When flushing complete blocks, we allow XLogWrite to write
+ * one or two. (When flushing complete blocks, we allow XLogWrite to write
* "flexibly", meaning it can stop at the end of the buffer ring; this makes a
* difference only with very high load or long wal_writer_delay, but imposes
* one extra cycle for the worst case for async commits.)
* log, seg: identify segment to be created/opened.
*
* *use_existent: if TRUE, OK to use a pre-existing file (else, any
- * pre-existing file will be deleted). On return, TRUE if a pre-existing
+ * pre-existing file will be deleted). On return, TRUE if a pre-existing
* file was used.
*
* use_lock: if TRUE, acquire ControlFileLock while moving file into
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
+ char zbuffer_raw[XLOG_BLCKSZ + MAXIMUM_ALIGNOF];
char *zbuffer;
XLogSegNo installed_segno;
int max_advance;
unlink(tmppath);
- /*
- * Allocate a buffer full of zeros. This is done before opening the file
- * so that we don't leak the file descriptor if palloc fails.
- *
- * Note: palloc zbuffer, instead of just using a local char array, to
- * ensure it is reasonably well-aligned; this may save a few cycles
- * transferring data to the kernel.
- */
- zbuffer = (char *) palloc0(XLOG_BLCKSZ);
-
/* do not use get_sync_bit() here --- want to fsync only at end of fill */
fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
S_IRUSR | S_IWUSR);
errmsg("could not create file \"%s\": %m", tmppath)));
/*
- * Zero-fill the file. We have to do this the hard way to ensure that all
+ * Zero-fill the file. We have to do this the hard way to ensure that all
* the file space has really been allocated --- on platforms that allow
* "holes" in files, just seeking to the end doesn't allocate intermediate
* space. This way, we know that we have all the space and (after the
- * fsync below) that all the indirect blocks are down on disk. Therefore,
+ * fsync below) that all the indirect blocks are down on disk. Therefore,
* fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
* log file.
+ *
+ * Note: ensure the buffer is reasonably well-aligned; this may save a few
+ * cycles transferring data to the kernel.
*/
+ zbuffer = (char *) MAXALIGN(zbuffer_raw);
+ memset(zbuffer, 0, XLOG_BLCKSZ);
for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
{
errno = 0;
errmsg("could not write to file \"%s\": %m", tmppath)));
}
}
- pfree(zbuffer);
if (pg_fsync(fd) != 0)
{
* a different timeline)
*
* Currently this is only used during recovery, and so there are no locking
- * considerations. But we should be just as tense as XLogFileInit to avoid
+ * considerations. But we should be just as tense as XLogFileInit to avoid
* emplacing a bogus file.
*/
static void
if (fd < 0)
ereport(PANIC,
(errcode_for_file_access(),
- errmsg("could not open transaction log file \"%s\": %m", path)));
+ errmsg("could not open transaction log file \"%s\": %m", path)));
return fd;
}
* the timelines listed in expectedTLEs.
*
* We expect curFileTLI on entry to be the TLI of the preceding file in
- * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
+ * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
* to go backwards; this prevents us from picking up the wrong file when a
* parent timeline extends to higher segment numbers than the child we
* want to read.
*
* If we haven't read the timeline history file yet, read it now, so that
- * we know which TLIs to scan. We don't save the list in expectedTLEs,
+ * we know which TLIs to scan. We don't save the list in expectedTLEs,
* however, unless we actually find a valid segment. That way if there is
* neither a timeline history file nor a WAL segment in the archive, and
* streaming replication is set up, we'll read the timeline history file
/*
* WAL segment files will not be re-read in normal operation, so we advise
- * the OS to release any cached pages. But do not do so if WAL archiving
+ * the OS to release any cached pages. But do not do so if WAL archiving
* or streaming is active, because archiver and walsender process could
* use the cache to read the WAL segment.
*/
}
}
+/*
+ * Return the last WAL segment removed, or 0 if no segment has been removed
+ * since startup.
+ *
+ * NB: the result can be out of date arbitrarily fast, the caller has to deal
+ * with that.
+ */
+XLogSegNo
+XLogGetLastRemovedSegno(void)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile XLogCtlData *xlogctl = XLogCtl;
+	XLogSegNo lastRemovedSegNo;
+
+	/* lastRemovedSegNo is read under info_lck; copy it out while holding it. */
+	SpinLockAcquire(&xlogctl->info_lck);
+	lastRemovedSegNo = xlogctl->lastRemovedSegNo;
+	SpinLockRelease(&xlogctl->info_lck);
+
+	return lastRemovedSegNo;
+}
+
/*
* Update the last removed segno pointer in shared memory, to reflect
* that the given XLOG file has been removed.
{
/*
* We ignore the timeline part of the XLOG segment identifiers in
- * deciding whether a segment is still needed. This ensures that we
+ * deciding whether a segment is still needed. This ensures that we
* won't prematurely remove a segment from a parent timeline. We could
* probably be a little more proactive about removing segments of
* non-parent timelines, but that would be a whole lot more
* I/O routines for pg_control
*
* *ControlFile is a buffer in shared memory that holds an image of the
- * contents of pg_control. WriteControlFile() initializes pg_control
+ * contents of pg_control. WriteControlFile() initializes pg_control
* given a preloaded buffer, ReadControlFile() loads the buffer from
* the pg_control file (during postmaster or standalone-backend startup),
* and UpdateControlFile() rewrites pg_control after we modify xlog state.
ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
+ ControlFile->loblksize = LOBLKSIZE;
#ifdef HAVE_INT64_TIMESTAMP
ControlFile->enableIntTimes = true;
" but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
errhint("It looks like you need to recompile or initdb.")));
+ if (ControlFile->loblksize != LOBLKSIZE)
+ ereport(FATAL,
+ (errmsg("database files are incompatible with server"),
+ errdetail("The database cluster was initialized with LOBLKSIZE %d,"
+ " but the server was compiled with LOBLKSIZE %d.",
+ ControlFile->loblksize, (int) LOBLKSIZE),
+ errhint("It looks like you need to recompile or initdb.")));
#ifdef HAVE_INT64_TIMESTAMP
if (ControlFile->enableIntTimes != true)
errhint("It looks like you need to recompile or initdb.")));
#endif
- /* Make the fixed settings visible as GUC variables, too */
+ /* Make the initdb settings visible as GUC variables, too */
SetConfigOption("data_checksums", DataChecksumsEnabled() ? "yes" : "no",
PGC_INTERNAL, PGC_S_OVERRIDE);
}
{
/*
* If we haven't yet changed the boot_val default of -1, just let it
- * be. We'll fix it when XLOGShmemSize is called.
+ * be. We'll fix it when XLOGShmemSize is called.
*/
if (XLOGbuffers == -1)
return true;
/* XLogCtl */
size = sizeof(XLogCtlData);
- /* xlog insertion slots, plus alignment */
- size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
+ /* WAL insertion locks, plus alignment */
+ size = add_size(size, mul_size(sizeof(WALInsertLockPadded), num_xloginsert_locks + 1));
/* xlblocks array */
size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
/* extra alignment padding for XLOG I/O buffers */
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
- /* Xlog insertion slots. Ensure they're aligned to the full padded size */
- allocptr += sizeof(XLogInsertSlotPadded) -
- ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
- XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
- allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
+
+ /* WAL insertion locks. Ensure they're aligned to the full padded size */
+ allocptr += sizeof(WALInsertLockPadded) -
+ ((uintptr_t) allocptr) %sizeof(WALInsertLockPadded);
+ WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
+ (WALInsertLockPadded *) allocptr;
+ allocptr += sizeof(WALInsertLockPadded) * num_xloginsert_locks;
+
+ XLogCtl->Insert.WALInsertLockTrancheId = LWLockNewTrancheId();
+
+ XLogCtl->Insert.WALInsertLockTranche.name = "WALInsertLocks";
+ XLogCtl->Insert.WALInsertLockTranche.array_base = WALInsertLocks;
+ XLogCtl->Insert.WALInsertLockTranche.array_stride = sizeof(WALInsertLockPadded);
+
+ LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, &XLogCtl->Insert.WALInsertLockTranche);
+ for (i = 0; i < num_xloginsert_locks; i++)
+ {
+ LWLockInitialize(&WALInsertLocks[i].l.lock,
+ XLogCtl->Insert.WALInsertLockTrancheId);
+ WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
+ }
/*
* Align the start of the page buffers to a full xlog block size boundary.
- * This simplifies some calculations in XLOG insertion. It is also required
- * for O_DIRECT.
+ * This simplifies some calculations in XLOG insertion. It is also
+ * required for O_DIRECT.
*/
allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
XLogCtl->pages = allocptr;
XLogCtl->SharedHotStandbyActive = false;
XLogCtl->WalWriterSleeping = false;
- for (i = 0; i < num_xloginsert_slots; i++)
- {
- XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
- SpinLockInit(&slot->mutex);
- slot->xlogInsertingAt = InvalidXLogRecPtr;
- slot->owner = NULL;
-
- slot->releaseOK = true;
- slot->exclusive = 0;
- slot->head = NULL;
- slot->tail = NULL;
- }
-
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
SpinLockInit(&XLogCtl->ulsn_lck);
* field, as being about as unique as we can easily get. (Think not to
* use random(), since it hasn't been seeded and there's no portable way
* to seed it other than the system clock value...) The upper half of the
- * uint64 value is just the tv_sec part, while the lower half is the XOR
- * of tv_sec and tv_usec. This is to ensure that we don't lose uniqueness
- * unnecessarily if "uint64" is really only 32 bits wide. A person
- * knowing this encoding can determine the initialization time of the
- * installation, which could perhaps be useful sometimes.
+ * uint64 value is just the tv_sec part, while the lower half contains the
+ * tv_usec part (which must fit in 20 bits), plus 12 bits from our current
+ * PID for a little extra uniqueness. A person knowing this encoding can
+ * determine the initialization time of the installation, which could
+ * perhaps be useful sometimes.
*/
gettimeofday(&tv, NULL);
sysidentifier = ((uint64) tv.tv_sec) << 32;
- sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
+ sysidentifier |= ((uint64) tv.tv_usec) << 12;
+ sysidentifier |= getpid() & 0xFFF;
/* First timeline ID is always 1 */
ThisTimeLineID = 1;
(errmsg_internal("primary_conninfo = '%s'",
PrimaryConnInfo)));
}
- else if (strcmp(item->name, "primary_slotname") == 0)
+ else if (strcmp(item->name, "primary_slot_name") == 0)
{
ReplicationSlotValidateName(item->value, ERROR);
PrimarySlotName = pstrdup(item->value);
ereport(DEBUG2,
- (errmsg_internal("primary_slotname = '%s'",
+ (errmsg_internal("primary_slot_name = '%s'",
PrimarySlotName)));
}
else if (strcmp(item->name, "trigger_file") == 0)
(errmsg_internal("trigger_file = '%s'",
TriggerFile)));
}
- else if (strcmp(item->name, "min_recovery_apply_delay") == 0)
+ else if (strcmp(item->name, "recovery_min_apply_delay") == 0)
{
const char *hintmsg;
- if (!parse_int(item->value, &min_recovery_apply_delay, GUC_UNIT_MS,
- &hintmsg))
+ if (!parse_int(item->value, &recovery_min_apply_delay, GUC_UNIT_MS,
+ &hintmsg))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("parameter \"%s\" requires a temporal value", "min_recovery_apply_delay"),
+ errmsg("parameter \"%s\" requires a temporal value",
+ "recovery_min_apply_delay"),
hintmsg ? errhint("%s", _(hintmsg)) : 0));
ereport(DEBUG2,
- (errmsg("min_recovery_apply_delay = '%s'", item->value)));
+ (errmsg("recovery_min_apply_delay = '%s'", item->value)));
}
else
ereport(FATAL,
/*
* If user specified recovery_target_timeline, validate it or compute the
- * "latest" value. We can't do this until after we've gotten the restore
+ * "latest" value. We can't do this until after we've gotten the restore
* command and set InArchiveRecovery, because we need to fetch timeline
* history files from the archive.
*/
*
* when testing for an xid, we MUST test for equality only, since
* transactions are numbered in the order they start, not the order
- * they complete. A higher numbered xid will complete before you
- * about 50% of the time...
+ * they complete. A higher numbered xid will complete before you about
+ * 50% of the time...
*/
stopsHere = (record->xl_xid == recoveryTargetXid);
}
record_info = record->xl_info & ~XLR_INFO_MASK;
/*
- * There can be many restore points that share the same name; we stop
- * at the first one.
+ * There can be many restore points that share the same name; we stop at
+ * the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
recoveryStopAfter = true;
recoveryStopXid = InvalidTransactionId;
(void) getRecordTimestamp(record, &recoveryStopTime);
- strncpy(recoveryStopName, recordRestorePointData->rp_name, MAXFNAMELEN);
+ strlcpy(recoveryStopName, recordRestorePointData->rp_name, MAXFNAMELEN);
ereport(LOG,
- (errmsg("recovery stopping at restore point \"%s\", time %s",
- recoveryStopName,
- timestamptz_to_str(recoveryStopTime))));
+ (errmsg("recovery stopping at restore point \"%s\", time %s",
+ recoveryStopName,
+ timestamptz_to_str(recoveryStopTime))));
return true;
}
}
}
/*
- * When min_recovery_apply_delay is set, we wait long enough to make sure
+ * When recovery_min_apply_delay is set, we wait long enough to make sure
* certain record types are applied at least that interval behind the master.
*
* Returns true if we waited.
int microsecs;
/* nothing to do if no delay configured */
- if (min_recovery_apply_delay == 0)
+ if (recovery_min_apply_delay == 0)
return false;
/*
* Is it a COMMIT record?
*
- * We deliberately choose not to delay aborts since they have no effect
- * on MVCC. We already allow replay of records that don't have a
- * timestamp, so there is already opportunity for issues caused by early
- * conflicts on standbys.
+ * We deliberately choose not to delay aborts since they have no effect on
+ * MVCC. We already allow replay of records that don't have a timestamp,
+ * so there is already opportunity for issues caused by early conflicts on
+ * standbys.
*/
record_info = record->xl_info & ~XLR_INFO_MASK;
if (!(record->xl_rmid == RM_XACT_ID &&
return false;
recoveryDelayUntilTime =
- TimestampTzPlusMilliseconds(xtime, min_recovery_apply_delay);
+ TimestampTzPlusMilliseconds(xtime, recovery_min_apply_delay);
/*
* Exit without arming the latch if it's already past time to apply this
*/
TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
&secs, µsecs);
- if (secs <= 0 && microsecs <=0)
+ if (secs <= 0 && microsecs <= 0)
return false;
while (true)
TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
&secs, µsecs);
- if (secs <= 0 && microsecs <=0)
+ if (secs <= 0 && microsecs <= 0)
break;
elog(DEBUG2, "recovery apply delay %ld seconds, %d milliseconds",
- secs, microsecs / 1000);
+ secs, microsecs / 1000);
WaitLatch(&XLogCtl->recoveryWakeupLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- secs * 1000L + microsecs / 1000);
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ secs * 1000L + microsecs / 1000);
}
return true;
}
* For archive recovery, the WAL must be generated with at least 'archive'
* wal_level.
*/
- if (InArchiveRecovery && ControlFile->wal_level == WAL_LEVEL_MINIMAL)
+ if (ArchiveRecoveryRequested && ControlFile->wal_level == WAL_LEVEL_MINIMAL)
{
ereport(WARNING,
(errmsg("WAL was generated with wal_level=minimal, data may be missing"),
* For Hot Standby, the WAL must be generated with 'hot_standby' mode, and
* we must have at least as many backend slots as the primary.
*/
- if (InArchiveRecovery && EnableHotStandby)
+ if (ArchiveRecoveryRequested && EnableHotStandby)
{
if (ControlFile->wal_level < WAL_LEVEL_HOT_STANDBY)
ereport(ERROR,
ValidateXLOGDirectoryStructure();
/*
- * Clear out any old relcache cache files. This is *necessary* if we do
+ * Clear out any old relcache cache files. This is *necessary* if we do
* any WAL replay, since that would probably result in the cache files
* being out of sync with database reality. In theory we could leave them
* in place if the database had been cleanly shut down, but it seems
* Save archive_cleanup_command in shared memory so that other processes
* can see it.
*/
- strncpy(XLogCtl->archiveCleanupCommand,
+ strlcpy(XLogCtl->archiveCleanupCommand,
archiveCleanupCommand ? archiveCleanupCommand : "",
sizeof(XLogCtl->archiveCleanupCommand));
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"),
- errdetail("Failed while allocating an XLog reading processor.")));
+ errdetail("Failed while allocating an XLog reading processor.")));
xlogreader->system_identifier = ControlFile->system_identifier;
if (read_backup_label(&checkPointLoc, &backupEndRequired,
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+ MultiXactSetSafeTruncate(checkPoint.oldestMulti);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
* Initialize replication slots, before there's a chance to remove
* required resources.
*/
- StartupReplicationSlots(checkPoint.redo);
+ StartupReplicationSlots();
+
+ /*
+ * Startup logical state, needs to be setup now so we have proper data
+ * during crash recovery.
+ */
+ StartupReorderBuffer();
/*
- * Startup MultiXact. We need to do this early for two reasons: one
- * is that we might try to access multixacts when we do tuple freezing,
- * and the other is we need its state initialized because we attempt
+ * Startup MultiXact. We need to do this early for two reasons: one is
+ * that we might try to access multixacts when we do tuple freezing, and
+ * the other is we need its state initialized because we attempt
* truncation during restartpoints.
*/
StartupMultiXact();
}
/*
- * Initialize shared variables for tracking progress of WAL replay,
- * as if we had just replayed the record before the REDO location.
+ * Initialize shared variables for tracking progress of WAL replay, as
+ * if we had just replayed the record before the REDO location (or the
+ * checkpoint record itself, if it's a shutdown checkpoint).
*/
SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->replayEndRecPtr = checkPoint.redo;
+ if (checkPoint.redo < RecPtr)
+ xlogctl->replayEndRecPtr = checkPoint.redo;
+ else
+ xlogctl->replayEndRecPtr = EndRecPtr;
xlogctl->replayEndTLI = ThisTimeLineID;
- xlogctl->lastReplayedEndRecPtr = checkPoint.redo;
- xlogctl->lastReplayedTLI = ThisTimeLineID;
+ xlogctl->lastReplayedEndRecPtr = xlogctl->replayEndRecPtr;
+ xlogctl->lastReplayedTLI = xlogctl->replayEndTLI;
xlogctl->recoveryLastXTime = 0;
xlogctl->currentChunkStartTime = 0;
xlogctl->recoveryPause = false;
(uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
xlog_outrec(&buf, record);
appendStringInfoString(&buf, " - ");
- RmgrTable[record->xl_rmid].rm_desc(&buf,
- record->xl_info,
- XLogRecGetData(record));
+ RmgrTable[record->xl_rmid].rm_desc(&buf, record);
elog(LOG, "%s", buf.data);
pfree(buf.data);
}
}
/*
- * If we've been asked to lag the master, wait on
- * latch until enough time has passed.
+ * If we've been asked to lag the master, wait on latch until
+ * enough time has passed.
*/
if (recoveryApplyDelay(record))
{
/*
- * We test for paused recovery again here. If
- * user sets delayed apply, it may be because
- * they expect to pause recovery in case of
- * problems, so we must test again here otherwise
- * pausing during the delay-wait wouldn't work.
+ * We test for paused recovery again here. If user sets
+ * delayed apply, it may be because they expect to pause
+ * recovery in case of problems, so we must test again
+ * here otherwise pausing during the delay-wait wouldn't
+ * work.
*/
if (xlogctl->recoveryPause)
recoveryPausesHere();
recoveryPausesHere();
}
+ /* Allow resource managers to do any required cleanup. */
+ for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+ {
+ if (RmgrTable[rmid].rm_cleanup != NULL)
+ RmgrTable[rmid].rm_cleanup();
+ }
+
ereport(LOG,
(errmsg("redo done at %X/%X",
(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
/*
* Consider whether we need to assign a new timeline ID.
*
- * If we are doing an archive recovery, we always assign a new ID. This
- * handles a couple of issues. If we stopped short of the end of WAL
+ * If we are doing an archive recovery, we always assign a new ID. This
+ * handles a couple of issues. If we stopped short of the end of WAL
* during recovery, then we are clearly generating a new timeline and must
* assign it a unique new ID. Even if we ran to the end, modifying the
* current last segment is problematic because it may result in trying to
/*
* Tricky point here: readBuf contains the *last* block that the LastRec
- * record spans, not the one it starts in. The last block is indeed the
+ * record spans, not the one it starts in. The last block is indeed the
* one we want to use.
*/
if (EndOfLog % XLOG_BLCKSZ != 0)
else
{
/*
- * There is no partial block to copy. Just set InitializedUpTo,
- * and let the first attempt to insert a log record to initialize
- * the next buffer.
+ * There is no partial block to copy. Just set InitializedUpTo, and
+ * let the first attempt to insert a log record to initialize the next
+ * buffer.
*/
XLogCtl->InitializedUpTo = EndOfLog;
}
if (InRecovery)
{
- int rmid;
-
- /*
- * Resource managers might need to write WAL records, eg, to record
- * index cleanup actions. So temporarily enable XLogInsertAllowed in
- * this process only.
- */
- LocalSetXLogInsertAllowed();
-
- /*
- * Allow resource managers to do any required cleanup.
- */
- for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
- {
- if (RmgrTable[rmid].rm_cleanup != NULL)
- RmgrTable[rmid].rm_cleanup();
- }
-
- /* Disallow XLogInsert again */
- LocalXLogInsertAllowed = -1;
-
/*
* Perform a checkpoint to update all our recovery activity to disk.
*
XLogReportParameters();
/*
- * All done. Allow backends to write WAL. (Although the bool flag is
+ * All done. Allow backends to write WAL. (Although the bool flag is
* probably atomic in itself, we use the info_lck here to ensure that
* there are no race conditions concerning visibility of other recent
* updates to shared memory.)
static void
CheckRecoveryConsistency(void)
{
- XLogRecPtr lastReplayedEndRecPtr;
+ XLogRecPtr lastReplayedEndRecPtr;
/*
* During crash recovery, we don't reach a consistent state until we've
/*
* Initialize TimeLineID and RedoRecPtr when we discover that recovery
* is finished. InitPostgres() relies upon this behaviour to ensure
- * that InitXLOGAccess() is called at backend startup. (If you change
+ * that InitXLOGAccess() is called at backend startup. (If you change
* this, see also LocalSetXLogInsertAllowed.)
*/
if (!LocalRecoveryInProgress)
pg_memory_barrier();
InitXLOGAccess();
}
+
/*
* Note: We don't need a memory barrier when we're still in recovery.
* We might exit recovery immediately after return, so the caller
ThisTimeLineID = XLogCtl->ThisTimeLineID;
Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
+ /* Initialize our copy of WALInsertLocks and register the tranche */
+ WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
+ LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId,
+ &XLogCtl->Insert.WALInsertLockTranche);
+
/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
(void) GetRedoRecPtr();
}
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
- XLogRecPtr ptr;
+ XLogRecPtr ptr;
/*
* The possibly not up-to-date copy in XlogCtl is enough. Even if we
- * grabbed a WAL insertion slot to read the master copy, someone might
+ * grabbed a WAL insertion lock to read the master copy, someone might
* update it just after we've released the lock.
*/
SpinLockAcquire(&xlogctl->info_lck);
*
* NOTE: The value *actually* returned is the position of the last full
* xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to scan through WAL insertion slots, and an
+ * For that, we don't need to scan through WAL insertion locks, and an
* approximation is enough for the current usage of this function.
*/
XLogRecPtr
* We must block concurrent insertions while examining insert state to
* determine the checkpoint REDO pointer.
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
/*
* If this isn't a shutdown or forced checkpoint, and we have not inserted
* any XLOG records since the start of the last checkpoint, skip the
- * checkpoint. The idea here is to avoid inserting duplicate checkpoints
+ * checkpoint. The idea here is to avoid inserting duplicate checkpoints
* when the system is idle. That wastes log space, and more importantly it
* exposes us to possible loss of both current and previous checkpoint
* records if the machine crashes just as we're writing the update.
MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
{
- WALInsertSlotRelease();
+ WALInsertLockRelease();
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
return;
/*
* Here we update the shared RedoRecPtr for future XLogInsert calls; this
- * must be done while holding the insertion slots.
+ * must be done while holding all the insertion locks.
*
* Note: if we fail to complete the checkpoint, RedoRecPtr will be left
* pointing past where it really needs to point. This is okay; the only
RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
/*
- * Now we can release the WAL insertion slots, allowing other xacts to
+ * Now we can release the WAL insertion locks, allowing other xacts to
* proceed while we are flushing disk buffers.
*/
- WALInsertSlotRelease();
+ WALInsertLockRelease();
/* Update the info_lck-protected copy of RedoRecPtr as well */
SpinLockAcquire(&xlogctl->info_lck);
TRACE_POSTGRESQL_CHECKPOINT_START(flags);
- /*
- * In some cases there are groups of actions that must all occur on one
- * side or the other of a checkpoint record. Before flushing the
- * checkpoint record we must explicitly wait for any backend currently
- * performing those groups of actions.
- *
- * One example is end of transaction, so we must wait for any transactions
- * that are currently in commit critical sections. If an xact inserted
- * its commit record into XLOG just before the REDO point, then a crash
- * restart from the REDO point would not replay that record, which means
- * that our flushing had better include the xact's update of pg_clog. So
- * we wait till he's out of his commit critical section before proceeding.
- * See notes in RecordTransactionCommit().
- *
- * Because we've already released the insertion slots, this test is a bit
- * fuzzy: it is possible that we will wait for xacts we didn't really need
- * to wait for. But the delay should be short and it seems better to make
- * checkpoint take a bit longer than to hold off insertions longer than
- * necessary.
- * (In fact, the whole reason we have this issue is that xact.c does
- * commit record XLOG insertion and clog update as two separate steps
- * protected by different locks, but again that seems best on grounds of
- * minimizing lock contention.)
- *
- * A transaction that has not yet set delayChkpt when we look cannot be at
- * risk, since he's not inserted his commit record yet; and one that's
- * already cleared it is not at risk either, since he's done fixing clog
- * and we will correctly flush the update below. So we cannot miss any
- * xacts we need to wait for.
- */
- vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
- if (nvxids > 0)
- {
- do
- {
- pg_usleep(10000L); /* wait for 10 msec */
- } while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
- }
- pfree(vxids);
-
/*
* Get the other info we need for the checkpoint record.
*/
*/
END_CRIT_SECTION();
+ /*
+ * In some cases there are groups of actions that must all occur on one
+ * side or the other of a checkpoint record. Before flushing the
+ * checkpoint record we must explicitly wait for any backend currently
+ * performing those groups of actions.
+ *
+ * One example is end of transaction, so we must wait for any transactions
+ * that are currently in commit critical sections. If an xact inserted
+ * its commit record into XLOG just before the REDO point, then a crash
+ * restart from the REDO point would not replay that record, which means
+ * that our flushing had better include the xact's update of pg_clog. So
+ * we wait till he's out of his commit critical section before proceeding.
+ * See notes in RecordTransactionCommit().
+ *
+ * Because we've already released the insertion locks, this test is a bit
+ * fuzzy: it is possible that we will wait for xacts we didn't really need
+ * to wait for. But the delay should be short and it seems better to make
+ * checkpoint take a bit longer than to hold off insertions longer than
+ * necessary. (In fact, the whole reason we have this issue is that xact.c
+ * does commit record XLOG insertion and clog update as two separate steps
+ * protected by different locks, but again that seems best on grounds of
+ * minimizing lock contention.)
+ *
+ * A transaction that has not yet set delayChkpt when we look cannot be at
+ * risk, since he's not inserted his commit record yet; and one that's
+ * already cleared it is not at risk either, since he's done fixing clog
+ * and we will correctly flush the update below. So we cannot miss any
+ * xacts we need to wait for.
+ */
+ vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
+ if (nvxids > 0)
+ {
+ do
+ {
+ pg_usleep(10000L); /* wait for 10 msec */
+ } while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
+ }
+ pfree(vxids);
+
CheckPointGuts(checkPoint.redo, flags);
/*
*/
END_CRIT_SECTION();
+ /*
+ * Now that the checkpoint is safely on disk, we can update the point to
+ * which multixact can be truncated.
+ */
+ MultiXactSetSafeTruncate(checkPoint.oldestMulti);
+
/*
* Let smgr do post-checkpoint cleanup (eg, deleting old files).
*/
/*
* Truncate pg_subtrans if possible. We can throw away all data before
- * the oldest XMIN of any running transaction. No future transaction will
+ * the oldest XMIN of any running transaction. No future transaction will
* attempt to reference any pg_subtrans entry older than that (see Asserts
- * in subtrans.c). During recovery, though, we mustn't do this because
+ * in subtrans.c). During recovery, though, we mustn't do this because
* StartupSUBTRANS hasn't been called yet.
*/
if (!RecoveryInProgress())
- TruncateSUBTRANS(GetOldestXmin(true, false));
+ TruncateSUBTRANS(GetOldestXmin(NULL, false));
+
+ /*
+ * Truncate pg_multixact too.
+ */
+ TruncateMultiXact();
/* Real work is done, but log and update stats before releasing lock. */
LogCheckpointEnd(false);
* CreateRestartPoint() allows for the case where recovery may end before
* the restartpoint completes so there is no concern of concurrent behaviour.
*/
-void
+static void
CreateEndOfRecoveryRecord(void)
{
xl_end_of_recovery xlrec;
xlrec.end_time = time(NULL);
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
xlrec.ThisTimeLineID = ThisTimeLineID;
xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
LocalSetXLogInsertAllowed();
CheckPointPredicate();
CheckPointRelationMap();
CheckPointReplicationSlots();
+ CheckPointSnapBuild();
+ CheckPointLogicalRewriteHeap();
CheckPointBuffers(flags); /* performs all required fsyncs */
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
static void
RecoveryRestartPoint(const CheckPoint *checkPoint)
{
- int rmid;
-
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
- /*
- * Is it safe to restartpoint? We must ask each of the resource managers
- * whether they have any partial state information that might prevent a
- * correct restart from this point. If so, we skip this opportunity, but
- * return at the next checkpoint record for another try.
- */
- for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
- {
- if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
- if (!(RmgrTable[rmid].rm_safe_restartpoint()))
- {
- elog(trace_recovery(DEBUG2),
- "RM %d not safe to record restart point at %X/%X",
- rmid,
- (uint32) (checkPoint->redo >> 32),
- (uint32) checkPoint->redo);
- return;
- }
- }
-
/*
* Also refrain from creating a restartpoint if we have seen any
* references to non-existent pages. Restarting recovery from the
* during recovery this is just pro forma, because no WAL insertions are
* happening.
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
/* Also update the info_lck-protected copy */
SpinLockAcquire(&xlogctl->info_lck);
}
LWLockRelease(ControlFileLock);
- /*
- * Due to an historical accident multixact truncations are not WAL-logged,
- * but just performed everytime the mxact horizon is increased. So, unless
- * we explicitly execute truncations on a standby it will never clean out
- * /pg_multixact which obviously is bad, both because it uses space and
- * because we can wrap around into pre-existing data...
- *
- * We can only do the truncation here, after the UpdateControlFile()
- * above, because we've now safely established a restart point, that
- * guarantees we will not need need to access those multis.
- *
- * It's probably worth improving this.
- */
- TruncateMultiXact(lastCheckPoint.oldestMulti);
-
/*
* Delete old log files (those no longer needed even for previous
* checkpoint/restartpoint) to prevent the disk holding the xlog from
_logSegNo--;
/*
- * Try to recycle segments on a useful timeline. If we've been promoted
- * since the beginning of this restartpoint, use the new timeline
- * chosen at end of recovery (RecoveryInProgress() sets ThisTimeLineID
- * in that case). If we're still in recovery, use the timeline we're
- * currently replaying.
+ * Try to recycle segments on a useful timeline. If we've been
+ * promoted since the beginning of this restartpoint, use the new
+ * timeline chosen at end of recovery (RecoveryInProgress() sets
+ * ThisTimeLineID in that case). If we're still in recovery, use the
+ * timeline we're currently replaying.
*
* There is no guarantee that the WAL segments will be useful on the
* current timeline; if recovery proceeds to a new timeline right
ThisTimeLineID = 0;
}
+ /*
+	 * Due to a historical accident multixact truncations are not WAL-logged,
+	 * but just performed every time the mxact horizon is increased. So, unless
+ * we explicitly execute truncations on a standby it will never clean out
+ * /pg_multixact which obviously is bad, both because it uses space and
+ * because we can wrap around into pre-existing data...
+ *
+ * We can only do the truncation here, after the UpdateControlFile()
+ * above, because we've now safely established a restart point. That
+ * guarantees we will not need to access those multis.
+ *
+ * It's probably worth improving this.
+ */
+ TruncateMultiXact();
+
/*
* Truncate pg_subtrans if possible. We can throw away all data before
- * the oldest XMIN of any running transaction. No future transaction will
+ * the oldest XMIN of any running transaction. No future transaction will
* attempt to reference any pg_subtrans entry older than that (see Asserts
- * in subtrans.c). When hot standby is disabled, though, we mustn't do
+ * in subtrans.c). When hot standby is disabled, though, we mustn't do
* this because StartupSUBTRANS hasn't been called yet.
*/
if (EnableHotStandby)
- TruncateSUBTRANS(GetOldestXmin(true, false));
+ TruncateSUBTRANS(GetOldestXmin(NULL, false));
/* Real work is done, but log and update before releasing lock. */
LogCheckpointEnd(true);
/* then check whether slots limit removal further */
if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
{
- XLogRecPtr slotSegNo;
+ XLogRecPtr slotSegNo;
XLByteToSeg(keep, slotSegNo);
* We need not flush the NEXTOID record immediately, because any of the
* just-allocated OIDs could only reach disk as part of a tuple insert or
* update that would have its own XLOG record that must follow the NEXTOID
- * record. Therefore, the standard buffer LSN interlock applied to those
+ * record. Therefore, the standard buffer LSN interlock applied to those
* records will ensure no such OID reaches disk before the NEXTOID record
* does.
*
xl_restore_point xlrec;
xlrec.rp_time = GetCurrentTimestamp();
- strncpy(xlrec.rp_name, rpName, MAXFNAMELEN);
+ strlcpy(xlrec.rp_name, rpName, MAXFNAMELEN);
rdata.buffer = InvalidBuffer;
rdata.data = (char *) &xlrec;
* lsn updates. We assume pd_lower/upper cannot be changed without an
* exclusive lock, so the contents bkp are not racy.
*
- * With buffer_std set to false, XLogCheckBuffer() sets hole_length and
- * hole_offset to 0; so the following code is safe for either case.
+ * With buffer_std set to false, XLogCheckBuffer() sets hole_length
+ * and hole_offset to 0; so the following code is safe for either
+ * case.
*/
memcpy(copied_buffer, origdata, bkpb.hole_offset);
memcpy(copied_buffer + bkpb.hole_offset,
{
XLogRecData rdata;
xl_parameter_change xlrec;
+ XLogRecPtr recptr;
xlrec.MaxConnections = MaxConnections;
xlrec.max_worker_processes = max_worker_processes;
rdata.len = sizeof(xlrec);
rdata.next = NULL;
- XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata);
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata);
+ XLogFlush(recptr);
}
ControlFile->MaxConnections = MaxConnections;
*/
if (fullPageWrites)
{
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
Insert->fullPageWrites = true;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
}
/*
if (!fullPageWrites)
{
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
Insert->fullPageWrites = false;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
}
END_CRIT_SECTION();
}
/*
* We used to try to take the maximum of ShmemVariableCache->nextOid
* and the recorded nextOid, but that fails if the OID counter wraps
- * around. Since no OID allocation should be happening during replay
+ * around. Since no OID allocation should be happening during replay
* anyway, better to just believe the record exactly. We still take
* OidGenLock while setting the variable, just in case.
*/
checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+ MultiXactSetSafeTruncate(checkPoint.oldestMulti);
/*
* If we see a shutdown checkpoint while waiting for an end-of-backup
checkPoint.oldestXidDB);
MultiXactAdvanceOldest(checkPoint.oldestMulti,
checkPoint.oldestMultiDB);
+ MultiXactSetSafeTruncate(checkPoint.oldestMulti);
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
BkpBlock bkpb;
/*
- * Full-page image (FPI) records contain a backup block stored "inline"
- * in the normal data since the locking when writing hint records isn't
- * sufficient to use the normal backup block mechanism, which assumes
- * exclusive lock on the buffer supplied.
+ * Full-page image (FPI) records contain a backup block stored
+ * "inline" in the normal data since the locking when writing hint
+ * records isn't sufficient to use the normal backup block mechanism,
+ * which assumes exclusive lock on the buffer supplied.
*
* Since the only change in these backup block are hint bits, there
* are no recovery conflicts generated.
/*
* Optimize writes by bypassing kernel cache with O_DIRECT when using
- * O_SYNC/O_FSYNC and O_DSYNC. But only if archiving and streaming are
+ * O_SYNC/O_FSYNC and O_DSYNC. But only if archiving and streaming are
* disabled, otherwise the archive command or walsender process will read
* the WAL soon after writing it, which is guaranteed to cause a physical
* read if we bypassed the kernel cache. We also skip the
* during an on-line backup even if not doing so at other times, because
* it's quite possible for the backup dump to obtain a "torn" (partially
* written) copy of a database page if it reads the page concurrently with
- * our write to the same page. This can be fixed as long as the first
+ * our write to the same page. This can be fixed as long as the first
* write to the page in the WAL sequence is a full-page write. Hence, we
* turn on forcePageWrites and then force a CHECKPOINT, to ensure there
* are no dirty pages in shared memory that might get dumped while the
* Note that forcePageWrites has no effect during an online backup from
* the standby.
*
- * We must hold all the insertion slots to change the value of
+ * We must hold all the insertion locks to change the value of
* forcePageWrites, to ensure adequate interlocking against XLogInsert().
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
if (exclusive)
{
if (XLogCtl->Insert.exclusiveBackup)
{
- WALInsertSlotRelease();
+ WALInsertLockRelease();
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a backup is already in progress"),
else
XLogCtl->Insert.nonExclusiveBackups++;
XLogCtl->Insert.forcePageWrites = true;
- WALInsertSlotRelease();
+ WALInsertLockRelease();
/* Ensure we release forcePageWrites if fail below */
PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
* old timeline IDs. That would otherwise happen if you called
* pg_start_backup() right after restoring from a PITR archive: the
* first WAL segment containing the startup checkpoint has pages in
- * the beginning with the old timeline ID. That can cause trouble at
+ * the beginning with the old timeline ID. That can cause trouble at
* recovery: we won't have a history file covering the old timeline if
* pg_xlog directory was not included in the base backup and the WAL
* archive was cleared too before starting the backup.
bool checkpointfpw;
/*
- * Force a CHECKPOINT. Aside from being necessary to prevent torn
+ * Force a CHECKPOINT. Aside from being necessary to prevent torn
* page problems, this guarantees that two successive backup runs
* will have different checkpoint positions and hence different
* history file names, even if nothing happened in between.
* taking a checkpoint right after another is not that expensive
* either because only few buffers have been dirtied yet.
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
if (XLogCtl->Insert.lastBackupStart < startpoint)
{
XLogCtl->Insert.lastBackupStart = startpoint;
gotUniqueStartpoint = true;
}
- WALInsertSlotRelease();
+ WALInsertLockRelease();
} while (!gotUniqueStartpoint);
XLByteToSeg(startpoint, _logSegNo);
bool exclusive = DatumGetBool(arg);
/* Update backup counters and forcePageWrites on failure */
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
if (exclusive)
{
Assert(XLogCtl->Insert.exclusiveBackup);
{
XLogCtl->Insert.forcePageWrites = false;
}
- WALInsertSlotRelease();
+ WALInsertLockRelease();
}
/*
/*
* OK to update backup counters and forcePageWrites
*/
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
if (exclusive)
XLogCtl->Insert.exclusiveBackup = false;
else
{
XLogCtl->Insert.forcePageWrites = false;
}
- WALInsertSlotRelease();
+ WALInsertLockRelease();
if (exclusive)
{
* an error handler.
*
* NB: This is only for aborting a non-exclusive backup that doesn't write
- * backup_label. A backup started with pg_stop_backup() needs to be finished
+ * backup_label. A backup started with pg_start_backup() needs to be finished
* with pg_stop_backup().
*/
void
do_pg_abort_backup(void)
{
- WALInsertSlotAcquire(true);
+ WALInsertLockAcquireExclusive();
Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
XLogCtl->Insert.nonExclusiveBackups--;
{
XLogCtl->Insert.forcePageWrites = false;
}
- WALInsertSlotRelease();
+ WALInsertLockRelease();
}
/*
*
* If we see a backup_label during recovery, we assume that we are recovering
* from a backup dump file, and we therefore roll forward from the checkpoint
- * identified by the label file, NOT what pg_control says. This avoids the
+ * identified by the label file, NOT what pg_control says. This avoids the
* problem that pg_control might have been archived one or more checkpoints
* later than the start of the dump, and so if we rely on it as the start
* point, we will fail to restore a consistent database state.
StringInfoData buf;
initStringInfo(&buf);
- RmgrTable[record->xl_rmid].rm_desc(&buf,
- record->xl_info,
- XLogRecGetData(record));
+ RmgrTable[record->xl_rmid].rm_desc(&buf, record);
/* don't bother emitting empty description */
if (buf.len > 0)
* Standby mode is implemented by a state machine:
*
* 1. Read from either archive or pg_xlog (XLOG_FROM_ARCHIVE), or just
- * pg_xlog (XLOG_FROM_XLOG)
+ * pg_xlog (XLOG_FROM_XLOG)
* 2. Check trigger file
* 3. Read from primary server via walreceiver (XLOG_FROM_STREAM)
* 4. Rescan timelines
* file from pg_xlog.
*/
readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
- currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
- currentSource);
+ currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
+ currentSource);
if (readFile >= 0)
return true; /* success! */
if (havedata)
{
/*
- * Great, streamed far enough. Open the file if it's
+ * Great, streamed far enough. Open the file if it's
* not open already. Also read the timeline history
* file if we haven't initialized timeline history
* yet; it should be streamed over and present in
- * pg_xlog by now. Use XLOG_FROM_STREAM so that
+ * pg_xlog by now. Use XLOG_FROM_STREAM so that
* source info is set correctly and XLogReceiptTime
* isn't changed.
*/
* process.
*/
HandleStartupProcInterrupts();
- } while (StandbyMode);
+ }
- return false;
+ return false; /* not reached */
}
/*
* in the current WAL page, previously read by XLogPageRead().
*
* 'emode' is the error mode that would be used to report a file-not-found
- * or legitimate end-of-WAL situation. Generally, we use it as-is, but if
+ * or legitimate end-of-WAL situation. Generally, we use it as-is, but if
* we're retrying the exact same record that we've tried previously, only
- * complain the first time to keep the noise down. However, we only do when
+ * complain the first time to keep the noise down. However, we only do when
* reading from pg_xlog, because we don't expect any invalid records in archive
* or in records streamed from master. Files in the archive should be complete,
* and we should never hit the end of WAL because we stop and wait for more WAL
fast_promote = true;
return true;
}
+ else if (errno != ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat trigger file \"%s\": %m",
+ TriggerFile)));
+
return false;
}