* PostgreSQL transaction log manager
*
*
- * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/backend/access/transam/xlog.c
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "catalog/pg_database.h"
-#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "storage/barrier.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "utils/timestamp.h"
#include "pg_trace.h"
+extern uint32 bootstrap_data_checksum_version;
/* File path names (all relative to $PGDATA) */
#define RECOVERY_COMMAND_FILE "recovery.conf"
#define RECOVERY_COMMAND_DONE "recovery.done"
-#define PROMOTE_SIGNAL_FILE "promote"
+#define PROMOTE_SIGNAL_FILE "promote"
+#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
/* User-settable parameters */
int wal_level = WAL_LEVEL_MINIMAL;
int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
+int num_xloginsert_slots = 8;
#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
*/
static int LocalXLogInsertAllowed = -1;
-/* Are we recovering using offline XLOG archives? (only valid in the startup process) */
-bool InArchiveRecovery = false;
+/*
+ * When ArchiveRecoveryRequested is set, archive recovery was requested,
+ * i.e. a recovery.conf file was present. When InArchiveRecovery is set, we
+ * are currently recovering using offline XLOG archives. These variables are
+ * only valid in the startup process.
+ *
+ * When ArchiveRecoveryRequested is true, but InArchiveRecovery is false, we're
+ * currently performing crash recovery using only XLOG files in pg_xlog, but
+ * will switch to using offline XLOG archives as soon as we reach the end of
+ * WAL in pg_xlog.
+ */
+bool ArchiveRecoveryRequested = false;
+bool InArchiveRecovery = false;
/* Was the last xlog file restored from archive, or local? */
static bool restoredFromArchive = false;
/* options taken from recovery.conf for archive recovery */
-char *recoveryRestoreCommand = NULL;
+char *recoveryRestoreCommand = NULL;
static char *recoveryEndCommand = NULL;
static char *archiveCleanupCommand = NULL;
static RecoveryTargetType recoveryTarget = RECOVERY_TARGET_UNSET;
static char *recoveryTargetName;
/* options taken from recovery.conf for XLOG streaming */
-bool StandbyMode = false;
+static bool StandbyModeRequested = false;
static char *PrimaryConnInfo = NULL;
static char *TriggerFile = NULL;
+/* are we currently in standby mode? */
+bool StandbyMode = false;
+
+/* whether a request for fast promotion has been made yet */
+static bool fast_promote = false;
+
/* if recoveryStopsHere returns true, it saves actual stop xid/time/name here */
static TransactionId recoveryStopXid;
static TimestampTz recoveryStopTime;
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold the Insert lock). See XLogInsert for details. We are also allowed
- * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
+ * hold an insertion slot). See XLogInsert for details. We are also allowed
+ * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
* see GetRedoRecPtr. A freshly spawned backend obtains the value during
* InitXLOGAccess.
*/
* so it's a plain spinlock. The other locks are held longer (potentially
* over I/O operations), so we use LWLocks for them. These locks are:
*
- * WALInsertLock: must be held to insert a record into the WAL buffers.
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * It is only held while initializing and changing the mapping. If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
*
* WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
* XLogFlush).
XLogRecPtr Flush; /* last byte + 1 flushed */
} XLogwrtResult;
+
+/*
+ * A slot for inserting into the WAL. This is similar to an LWLock; the main
+ * difference is that there is an extra xlogInsertingAt field, protected
+ * by the same mutex. Unlike an LWLock, a slot can only be acquired in
+ * exclusive mode.
+ *
+ * The xlogInsertingAt field is used to advertise to other processes how far
+ * the slot owner has progressed in inserting the record. When a backend
+ * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
+ * yet know where it's going to insert the record. That's conservative
+ * but correct; the new insertion is certainly going to go to a byte position
+ * greater than 1. If another backend needs to flush the WAL, it will have to
+ * wait for the new insertion. xlogInsertingAt is updated after finishing the
+ * insert or when crossing a page boundary, which will wake up anyone waiting
+ * for it, whether the wait was necessary in the first place or not.
+ *
+ * A process can wait on a slot in two modes: LW_EXCLUSIVE or
+ * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
+ * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
+ * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
+ * released, or xlogInsertingAt is updated. In other words, a process in
+ * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
+ * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
+ * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
+ *
+ * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
+ * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
+ * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
+ * see lwlock.c for details.
+ */
+typedef struct
+{
+ slock_t mutex; /* protects the below fields */
+ XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */
+
+ PGPROC *owner; /* for debugging purposes */
+
+ bool releaseOK; /* T if ok to release waiters */
+ char exclusive; /* # of exclusive holders (0 or 1) */
+ PGPROC *head; /* head of list of waiting PGPROCs */
+ PGPROC *tail; /* tail of list of waiting PGPROCs */
+ /* tail is undefined when head is NULL */
+} XLogInsertSlot;
+
+/*
+ * All the slots are allocated as an array in shared memory. We force the
+ * array stride to be a power of 2, which saves a few cycles in indexing, but
+ * more importantly also ensures that individual slots don't cross cache line
+ * boundaries. (Of course, we have to also ensure that the array start
+ * address is suitably aligned.)
+ */
+typedef union XLogInsertSlotPadded
+{
+ XLogInsertSlot slot;
+ char pad[CACHE_LINE_SIZE];
+} XLogInsertSlotPadded;
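+
+/*
+ * Illustrative sketch only (the actual shared-memory setup happens in XLOG
+ * initialization, outside this hunk): to satisfy the alignment requirement
+ * described above, the slot array could be carved out of shared memory
+ * along these lines, using the existing ShmemAlloc() and TYPEALIGN()
+ * facilities:
+ *
+ *	char *allocptr = ShmemAlloc(sizeof(XLogInsertSlotPadded) *
+ *								(num_xloginsert_slots + 1));
+ *
+ *	Insert->insertSlots = (XLogInsertSlotPadded *)
+ *		TYPEALIGN(CACHE_LINE_SIZE, allocptr);
+ *
+ * Over-allocating by one padded slot leaves room to round the start address
+ * up to a cache line boundary.
+ */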
+
/*
* Shared state data for XLogInsert.
*/
typedef struct XLogCtlInsert
{
- XLogRecPtr PrevRecord; /* start of previously-inserted record */
- int curridx; /* current block index in cache */
- XLogPageHeader currpage; /* points to header of block in cache */
- char *currpos; /* current insertion point in cache */
- XLogRecPtr RedoRecPtr; /* current redo point for insertions */
- bool forcePageWrites; /* forcing full-page writes for PITR? */
+ slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */
+
+ /*
+ * CurrBytePos is the end of reserved WAL. The next record will be inserted
+ * at that position. PrevBytePos is the start position of the previously
+ * inserted (or rather, reserved) record - it is copied to the prev-
+ * link of the next record. These are stored as "usable byte positions"
+ * rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+ */
+ uint64 CurrBytePos;
+ uint64 PrevBytePos;
+
+ /*
+ * Make sure the above heavily-contended spinlock and byte positions are
+ * on their own cache line. In particular, the RedoRecPtr and full page
+ * write variables below should be on a different cache line. They are
+ * read on every WAL insertion, but updated rarely, and we don't want
+ * those reads to steal the cache line containing Curr/PrevBytePos.
+ */
+ char pad[CACHE_LINE_SIZE];
/*
* fullPageWrites is the master copy used by all backends to determine
* This is required because, when full_page_writes is changed by SIGHUP,
* we must WAL-log it before it actually affects WAL-logging by backends.
* Checkpointer sets at startup or after SIGHUP.
+ *
+ * To read these fields, you must hold an insertion slot. To modify them,
+ * you must hold ALL the slots.
*/
+ XLogRecPtr RedoRecPtr; /* current redo point for insertions */
+ bool forcePageWrites; /* forcing full-page writes for PITR? */
bool fullPageWrites;
/*
bool exclusiveBackup;
int nonExclusiveBackups;
XLogRecPtr lastBackupStart;
-} XLogCtlInsert;
-/*
- * Shared state data for XLogWrite/XLogFlush.
- */
-typedef struct XLogCtlWrite
-{
- int curridx; /* cache index of next block to write */
- pg_time_t lastSegSwitchTime; /* time of last xlog segment switch */
-} XLogCtlWrite;
+ /* insertion slots, see XLogInsertSlot struct above for details */
+ XLogInsertSlotPadded *insertSlots;
+} XLogCtlInsert;
/*
* Total shared-memory state for XLOG.
*/
typedef struct XLogCtlData
{
- /* Protected by WALInsertLock: */
XLogCtlInsert Insert;
/* Protected by info_lck: */
XLogwrtRqst LogwrtRqst;
+ XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */
uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
- XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */
+ XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG
+ * segment */
- /* Protected by WALWriteLock: */
- XLogCtlWrite Write;
+ /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
+ XLogRecPtr unloggedLSN;
+ slock_t ulsn_lck;
+
+ /* Time of last xlog segment switch. Protected by WALWriteLock. */
+ pg_time_t lastSegSwitchTime;
/*
* Protected by info_lck and WALWriteLock (you must hold either lock to
*/
XLogwrtResult LogwrtResult;
+ /*
+ * Latest initialized page in the cache (last byte position + 1).
+ *
+ * To change the identity of a buffer (and InitializedUpTo), you need to
+ * hold WALBufMappingLock. To change the identity of a buffer that's still
+ * dirty, the old page needs to be written out first, and for that you
+ * need WALWriteLock, and you need to ensure that there are no in-progress
+ * insertions to the page by calling WaitXLogInsertionsToFinish().
+ */
+ XLogRecPtr InitializedUpTo;
+
/*
* These values do not change after startup, although the pointed-to pages
- * and xlblocks values certainly do. Permission to read/write the pages
- * and xlblocks values depends on WALInsertLock and WALWriteLock.
+ * and xlblocks values certainly do. xlblock values are protected by
+ * WALBufMappingLock.
*/
char *pages; /* buffers for unwritten XLOG pages */
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index */
+
+ /*
+ * Shared copy of ThisTimeLineID. Does not change after end-of-recovery.
+ * If we created a new timeline when the system was started up,
+ * PrevTimeLineID is the old timeline's ID that we forked off from.
+ * Otherwise it's equal to ThisTimeLineID.
+ */
TimeLineID ThisTimeLineID;
+ TimeLineID PrevTimeLineID;
/*
* archiveCleanupCommand is read from recovery.conf but needs to be in
static ControlFileData *ControlFile = NULL;
/*
- * Macros for managing XLogInsert state. In most cases, the calling routine
- * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
- * so these are passed as parameters instead of being fetched via XLogCtl.
+ * Calculate the amount of space left on the page after 'endptr'. Beware
+ * multiple evaluation!
*/
+#define INSERT_FREESPACE(endptr) \
+ (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
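+
+/*
+ * For example, assuming the default XLOG_BLCKSZ of 8192: an endptr that is
+ * 16 bytes into a page yields 8192 - 16 = 8176 bytes of free space, while
+ * an endptr exactly on a page boundary yields 0. Because the macro evaluates
+ * 'endptr' more than once, don't pass an expression with side effects, such
+ * as INSERT_FREESPACE(ptr++).
+ */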
-/* Free space remaining in the current xlog page buffer */
-#define INSERT_FREESPACE(Insert) \
- (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
-
-/* Construct XLogRecPtr value for current insertion point */
-#define INSERT_RECPTR(recptr,Insert,curridx) \
- (recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert)
-
-#define PrevBufIdx(idx) \
- (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
-
+/* Macro to advance to next buffer index. */
#define NextBufIdx(idx) \
(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
+/*
+ * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
+ * would hold if it was in cache, the page containing 'recptr'.
+ */
+#define XLogRecPtrToBufIdx(recptr) \
+ (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
+
+/*
+ * The number of bytes in a WAL page and in a WAL segment, respectively, that
+ * are usable for WAL data (i.e. excluding page headers).
+ */
+#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
+#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
+
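+/*
+ * As a worked example (assuming the common defaults of XLOG_BLCKSZ = 8192,
+ * a 16 MB XLOG_SEG_SIZE, and 8-byte MAXALIGN, which make
+ * SizeOfXLogShortPHD = 24 and SizeOfXLogLongPHD = 40):
+ *
+ *	UsableBytesInPage    = 8192 - 24 = 8168
+ *	UsableBytesInSegment = (16777216 / 8192) * 8168 - (40 - 24)
+ *	                     = 2048 * 8168 - 16 = 16728048
+ *
+ * The extra 16 bytes account for the first page of each segment carrying a
+ * long header rather than a short one.
+ */
+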
/*
* Private, possibly out-of-date copy of shared LogwrtResult.
* See discussion above.
*/
typedef enum
{
- XLOG_FROM_ANY = 0, /* request to read WAL from any source */
- XLOG_FROM_ARCHIVE, /* restored using restore_command */
- XLOG_FROM_PG_XLOG, /* existing file in pg_xlog */
- XLOG_FROM_STREAM, /* streamed from master */
+ XLOG_FROM_ANY = 0, /* request to read WAL from any source */
+ XLOG_FROM_ARCHIVE, /* restored using restore_command */
+ XLOG_FROM_PG_XLOG, /* existing file in pg_xlog */
+ XLOG_FROM_STREAM, /* streamed from master */
} XLogSource;
/* human-readable names for XLogSources, for debugging output */
-static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" };
+static const char *xlogSourceNames[] = {"any", "archive", "pg_xlog", "stream"};
/*
* openLogFile is -1 or a kernel FD for an open log file segment.
static XLogSegNo readSegNo = 0;
static uint32 readOff = 0;
static uint32 readLen = 0;
-static bool readFileHeaderValidated = false;
static XLogSource readSource = 0; /* XLOG_FROM_* code */
/*
* next.
*/
static XLogSource currentSource = 0; /* XLOG_FROM_* code */
-static bool lastSourceFailed = false;
+static bool lastSourceFailed = false;
+
+typedef struct XLogPageReadPrivate
+{
+ int emode;
+ bool fetching_ckpt; /* are we fetching a checkpoint record? */
+ bool randAccess;
+} XLogPageReadPrivate;
/*
* These variables track when we last obtained some WAL data to process,
* XLogReceiptSource tracks where we last successfully read some WAL.)
*/
static TimestampTz XLogReceiptTime = 0;
-static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */
-
-/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
-static char *readBuf = NULL;
-
-/* Buffer for current ReadRecord result (expandable) */
-static char *readRecordBuf = NULL;
-static uint32 readRecordBufSize = 0;
+static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */
/* State information for XLOG reading */
static XLogRecPtr ReadRecPtr; /* start of last record read */
static XLogRecPtr EndRecPtr; /* end+1 of last record read */
-static TimeLineID lastPageTLI = 0;
-static TimeLineID lastSegmentTLI = 0;
static XLogRecPtr minRecoveryPoint; /* local copy of
* ControlFile->minRecoveryPoint */
/* Have we launched bgwriter during recovery? */
static bool bgwriterLaunched = false;
+/* For WALInsertSlotAcquire/Release functions */
+static int MySlotNo = 0;
+static bool holdingAllSlots = false;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
-static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI);
+static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
+ TimeLineID prevTLI);
static void LocalSetXLogInsertAllowed(void);
+static void CreateEndOfRecoveryRecord(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
-static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
+static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
XLogRecPtr *lsn, BkpBlock *bkpb);
-static bool AdvanceXLInsertBuffer(bool new_segment);
+static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
+ char *blk, bool get_cleanup_lock, bool keep_buffer);
+static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
bool find_free, int *max_advance,
bool use_lock);
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
int source, bool notexistOk);
-static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
-static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
- bool randAccess);
+static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
+static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
+ TimeLineID *readTLI);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
- bool fetching_ckpt);
+ bool fetching_ckpt, XLogRecPtr tliRecPtr);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
static void XLogFileClose(void);
static void PreallocXlogFiles(XLogRecPtr endptr);
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+ int emode, bool fetching_ckpt);
static void CheckRecoveryConsistency(void);
-static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly);
-static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
- int emode, bool randAccess);
-static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
+static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
+ XLogRecPtr RecPtr, int whichChkpt, bool report);
static bool rescanLatestTimeLine(void);
static void WriteControlFile(void);
static void ReadControlFile(void);
static void rm_redo_error_callback(void *arg);
static int get_sync_bit(int method);
+static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+ XLogRecData *rdata,
+ XLogRecPtr StartPos, XLogRecPtr EndPos);
+static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
+ XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
+static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+ XLogRecPtr *PrevPtr);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static void WakeupWaiters(XLogRecPtr EndPos);
+static char *GetXLogBuffer(XLogRecPtr ptr);
+static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
+static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
+static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+
+static void WALInsertSlotAcquire(bool exclusive);
+static void WALInsertSlotAcquireOne(int slotno);
+static void WALInsertSlotRelease(void);
+static void WALInsertSlotReleaseOne(int slotno);
/*
* Insert an XLOG record having the specified RMID and info bytes,
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecPtr RecPtr;
- XLogRecPtr WriteRqst;
- uint32 freespace;
- int curridx;
XLogRecData *rdt;
XLogRecData *rdt_lastnormal;
Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
uint32 len,
write_len;
unsigned i;
- bool updrqst;
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+ bool inserted;
uint8 info_orig = info;
static XLogRecord *rechdr;
+ XLogRecPtr StartPos;
+ XLogRecPtr EndPos;
if (rechdr == NULL)
{
*/
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
- RecPtr = SizeOfXLogLongPHD; /* start of 1st chkpt record */
- return RecPtr;
+ EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
+ return EndPos;
}
/*
* up.
*
* We may have to loop back to here if a race condition is detected below.
- * We could prevent the race by doing all this work while holding the
- * insert lock, but it seems better to avoid doing CRC calculations while
- * holding the lock.
+ * We could prevent the race by doing all this work while holding an
+ * insertion slot, but it seems better to avoid doing CRC calculations
+ * while holding one.
*
* We add entries for backup blocks to the chain, so that they don't need
* any special treatment in the critical section where the chunks are
/*
* Decide if we need to do full-page writes in this XLOG record: true if
* full_page_writes is on or we have a PITR request for it. Since we
- * don't yet have the insert lock, fullPageWrites and forcePageWrites
- * could change under us, but we'll recheck them once we have the lock.
+ * don't yet have an insertion slot, fullPageWrites and forcePageWrites
+ * could change under us, but we'll recheck them once we have a slot.
*/
doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
{
/* OK, put it in this slot */
dtbuf[i] = rdt->buffer;
- if (XLogCheckBuffer(rdt, doPageWrites,
- &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
+ if (doPageWrites && XLogCheckBuffer(rdt, true,
+ &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
{
dtbuf_bkp[i] = true;
rdt->data = NULL;
COMP_CRC32(rdata_crc, rdt->data, rdt->len);
/*
- * Construct record header (prev-link and CRC are filled in later), and
- * make that the first chunk in the chain.
+ * Construct record header (prev-link is filled in later, after reserving
+ * the space for the record), and make that the first chunk in the chain.
+ *
+ * The CRC calculated for the header here doesn't include prev-link,
+ * because we don't know it yet. It will be added later.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
+ rechdr->xl_prev = InvalidXLogRecPtr;
+ COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
hdr_rdt.next = rdata;
hdr_rdt.data = (char *) rechdr;
hdr_rdt.len = SizeOfXLogRecord;
-
write_len += SizeOfXLogRecord;
+ /*----------
+ *
+ * We have now done all the preparatory work we can without holding a
+ * lock or modifying shared state. From here on, inserting the new WAL
+ * record to the shared WAL buffer cache is a two-step process:
+ *
+ * 1. Reserve the right amount of space from the WAL. The current head of
+ * reserved space is kept in Insert->CurrBytePos, and is protected by
+ * insertpos_lck.
+ *
+ * 2. Copy the record to the reserved WAL space. This involves finding the
+ * correct WAL buffer containing the reserved space, and copying the
+ * record in place. This can be done concurrently in multiple processes.
+ *
+ * To keep track of which insertions are still in-progress, each concurrent
+ * inserter allocates an "insertion slot", which tells others how far the
+ * inserter has progressed. There is a small fixed number of insertion
+ * slots, determined by the num_xloginsert_slots GUC. When an inserter
+ * finishes, it updates the xlogInsertingAt of its slot to the end of the
+ * record it inserted, to let others know that it's done. xlogInsertingAt
+ * is also updated when crossing over to a new WAL buffer, to allow the
+ * previous buffer to be flushed.
+ *
+ * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
+ * changing until the insertion is finished.
+ *
+ * Step 2 can usually be done completely in parallel. If the required WAL
+ * page is not initialized yet, you have to grab WALBufMappingLock to
+ * initialize it, but the WAL writer tries to do that ahead of insertions
+ * so that it doesn't happen in the critical path.
+ *
+ *----------
+ */
START_CRIT_SECTION();
-
- /* Now wait to get insert lock */
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(isLogSwitch);
/*
* Check to see if my RedoRecPtr is out of date. If so, may have to go
* affect the contents of the XLOG record, so we'll update our local copy
* but not force a recomputation.
*/
- if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
+ if (RedoRecPtr != Insert->RedoRecPtr)
{
- Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
+ Assert(RedoRecPtr < Insert->RedoRecPtr);
RedoRecPtr = Insert->RedoRecPtr;
if (doPageWrites)
if (dtbuf[i] == InvalidBuffer)
continue;
if (dtbuf_bkp[i] == false &&
- XLByteLE(dtbuf_lsn[i], RedoRecPtr))
+ dtbuf_lsn[i] <= RedoRecPtr)
{
/*
* Oops, this buffer now needs to be backed up, but we
* didn't think so above. Start over.
*/
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
{
/* Oops, must redo it with full-page data. */
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
END_CRIT_SECTION();
rdt_lastnormal->next = NULL;
info = info_orig;
}
/*
- * If the current page is completely full, the record goes to the next
- * page, right after the page header.
+ * Reserve space for the record in the WAL. This also sets the xl_prev
+ * pointer.
*/
- updrqst = false;
- freespace = INSERT_FREESPACE(Insert);
- if (freespace == 0)
+ if (isLogSwitch)
+ inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
+ else
+ {
+ ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
+ &rechdr->xl_prev);
+ inserted = true;
+ }
+
+ if (inserted)
+ {
+ /*
+ * Now that xl_prev has been filled in, finish CRC calculation of the
+ * record header.
+ */
+ COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
+ FIN_CRC32(rdata_crc);
+ rechdr->xl_crc = rdata_crc;
+
+ /*
+ * All the record data, including the header, is now ready to be
+ * inserted. Copy the record in the space reserved.
+ */
+ CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
+ }
+ else
{
- updrqst = AdvanceXLInsertBuffer(false);
- freespace = INSERT_FREESPACE(Insert);
+ /*
+ * This was an xlog-switch record, but the current insert location was
+ * already exactly at the beginning of a segment, so there was no need
+ * to do anything.
+ */
}
- /* Compute record's XLOG location */
- curridx = Insert->curridx;
- INSERT_RECPTR(RecPtr, Insert, curridx);
+ /*
+ * Done! Let others know that we're finished.
+ */
+ WALInsertSlotRelease();
+
+ END_CRIT_SECTION();
/*
- * If the record is an XLOG_SWITCH, and we are exactly at the start of a
- * segment, we need not insert it (and don't want to because we'd like
- * consecutive switch requests to be no-ops). Instead, make sure
- * everything is written and flushed through the end of the prior segment,
- * and return the prior segment's end address.
+ * Update shared LogwrtRqst.Write, if we crossed page boundary.
*/
- if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD)
+ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
- /* We can release insert lock immediately */
- LWLockRelease(WALInsertLock);
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
- RecPtr -= SizeOfXLogLongPHD;
+ SpinLockAcquire(&xlogctl->info_lck);
+ /* advance global request to include new block(s) */
+ if (xlogctl->LogwrtRqst.Write < EndPos)
+ xlogctl->LogwrtRqst.Write = EndPos;
+ /* update local result copy while I have the chance */
+ LogwrtResult = xlogctl->LogwrtResult;
+ SpinLockRelease(&xlogctl->info_lck);
+ }
- LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
- LogwrtResult = XLogCtl->LogwrtResult;
- if (!XLByteLE(RecPtr, LogwrtResult.Flush))
+ /*
+ * If this was an XLOG_SWITCH record, flush the record and the empty
+ * padding space that fills the rest of the segment, and perform
+ * end-of-segment actions (eg, notifying archiver).
+ */
+ if (isLogSwitch)
+ {
+ TRACE_POSTGRESQL_XLOG_SWITCH();
+ XLogFlush(EndPos);
+ /*
+ * Even though we reserved the rest of the segment for us, which is
+ * reflected in EndPos, we return a pointer to just the end of the
+ * xlog-switch record.
+ */
+ if (inserted)
{
- XLogwrtRqst FlushRqst;
-
- FlushRqst.Write = RecPtr;
- FlushRqst.Flush = RecPtr;
- XLogWrite(FlushRqst, false, false);
+ EndPos = StartPos + SizeOfXLogRecord;
+ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+ {
+ if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ)
+ EndPos += SizeOfXLogLongPHD;
+ else
+ EndPos += SizeOfXLogShortPHD;
+ }
}
- LWLockRelease(WALWriteLock);
-
- END_CRIT_SECTION();
-
- /* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
- return RecPtr;
}
- /* Finish the record header */
- rechdr->xl_prev = Insert->PrevRecord;
-
- /* Now we can finish computing the record's CRC */
- COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
- FIN_CRC32(rdata_crc);
- rechdr->xl_crc = rdata_crc;
-
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
{
initStringInfo(&buf);
appendStringInfo(&buf, "INSERT @ %X/%X: ",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
+ (uint32) (EndPos >> 32), (uint32) EndPos);
xlog_outrec(&buf, rechdr);
if (rdata->data != NULL)
{
}
#endif
- /* Record begin of record in appropriate places */
- ProcLastRecPtr = RecPtr;
- Insert->PrevRecord = RecPtr;
-
/*
- * Append the data, including backup blocks if any
+ * Update our global variables
*/
- rdata = &hdr_rdt;
- while (write_len)
- {
- while (rdata->data == NULL)
- rdata = rdata->next;
+ ProcLastRecPtr = StartPos;
+ XactLastRecEnd = EndPos;
- if (freespace > 0)
- {
- if (rdata->len > freespace)
- {
- memcpy(Insert->currpos, rdata->data, freespace);
- rdata->data += freespace;
- rdata->len -= freespace;
- write_len -= freespace;
- }
- else
- {
- memcpy(Insert->currpos, rdata->data, rdata->len);
- freespace -= rdata->len;
- write_len -= rdata->len;
- Insert->currpos += rdata->len;
- rdata = rdata->next;
- continue;
- }
- }
+ return EndPos;
+}
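+
+/*
+ * For illustration, a minimal caller of XLogInsert looks roughly like this
+ * (a sketch only: 'xlrec' stands for some rmgr-specific record struct, and
+ * the rmgr id and info byte are examples, not taken from this patch):
+ *
+ *	XLogRecData rdata;
+ *	XLogRecPtr	recptr;
+ *
+ *	rdata.data = (char *) &xlrec;
+ *	rdata.len = sizeof(xlrec);
+ *	rdata.buffer = InvalidBuffer;	(no buffer, so no full-page image)
+ *	rdata.next = NULL;
+ *
+ *	recptr = XLogInsert(RM_XLOG_ID, XLOG_NOOP, &rdata);
+ *
+ * The returned position (EndPos above) is typically stamped on the modified
+ * data page as its LSN.
+ */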
- /* Use next buffer */
- updrqst = AdvanceXLInsertBuffer(false);
- curridx = Insert->curridx;
- /* Mark page header to indicate this record continues on the page */
- Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
- Insert->currpage->xlp_rem_len = write_len;
- freespace = INSERT_FREESPACE(Insert);
- }
+/*
+ * Reserves the right amount of space for a record of given size from the WAL.
+ * *StartPos is set to the beginning of the reserved section, *EndPos to
+ * its end+1. *PrevPtr is set to the beginning of the previous record; it is
+ * used to set the xl_prev of this record.
+ *
+ * This is the performance critical part of XLogInsert that must be serialized
+ * across backends. The rest can happen mostly in parallel. Try to keep this
+ * section as short as possible; insertpos_lck can be heavily contended on a
+ * busy system.
+ *
+ * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
+ * where we actually copy the record to the reserved space.
+ */
+static void
+ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+ XLogRecPtr *PrevPtr)
+{
+ volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint64 startbytepos;
+ uint64 endbytepos;
+ uint64 prevbytepos;
- /* Ensure next record will be properly aligned */
- Insert->currpos = (char *) Insert->currpage +
- MAXALIGN(Insert->currpos - (char *) Insert->currpage);
- freespace = INSERT_FREESPACE(Insert);
+ size = MAXALIGN(size);
- /*
- * The recptr I return is the beginning of the *next* record. This will be
- * stored as LSN for changed data pages...
- */
- INSERT_RECPTR(RecPtr, Insert, curridx);
+ /* All (non xlog-switch) records should contain data. */
+ Assert(size > SizeOfXLogRecord);
/*
- * If the record is an XLOG_SWITCH, we must now write and flush all the
- * existing data, and then forcibly advance to the start of the next
- * segment. It's not good to do this I/O while holding the insert lock,
- * but there seems too much risk of confusion if we try to release the
- * lock sooner. Fortunately xlog switch needn't be a high-performance
- * operation anyway...
+ * The duration the spinlock needs to be held is minimized by minimizing
+ * the calculations that have to be done while holding the lock. The
+ * current tip of reserved WAL is kept in CurrBytePos, as a byte position
+ * that only counts "usable" bytes in WAL, that is, it excludes all WAL
+ * page headers. The mapping between "usable" byte positions and physical
+ * positions (XLogRecPtrs) can be done outside the locked region, and
+ * because the usable byte position doesn't include any headers, reserving
+ * X bytes from WAL is almost as simple as "CurrBytePos += X".
*/
- if (isLogSwitch)
- {
- XLogwrtRqst FlushRqst;
- XLogRecPtr OldSegEnd;
-
- TRACE_POSTGRESQL_XLOG_SWITCH();
-
- LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-
- /*
- * Flush through the end of the page containing XLOG_SWITCH, and
- * perform end-of-segment actions (eg, notifying archiver).
- */
- WriteRqst = XLogCtl->xlblocks[curridx];
- FlushRqst.Write = WriteRqst;
- FlushRqst.Flush = WriteRqst;
- XLogWrite(FlushRqst, false, true);
+ SpinLockAcquire(&Insert->insertpos_lck);
- /* Set up the next buffer as first page of next segment */
- /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
- (void) AdvanceXLInsertBuffer(true);
+ startbytepos = Insert->CurrBytePos;
+ endbytepos = startbytepos + size;
+ prevbytepos = Insert->PrevBytePos;
+ Insert->CurrBytePos = endbytepos;
+ Insert->PrevBytePos = startbytepos;
- /* There should be no unwritten data */
- curridx = Insert->curridx;
- Assert(curridx == XLogCtl->Write.curridx);
+ SpinLockRelease(&Insert->insertpos_lck);
- /* Compute end address of old segment */
- OldSegEnd = XLogCtl->xlblocks[curridx];
- OldSegEnd -= XLOG_BLCKSZ;
+ *StartPos = XLogBytePosToRecPtr(startbytepos);
+ *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+ *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
- /* Make it look like we've written and synced all of old segment */
- LogwrtResult.Write = OldSegEnd;
- LogwrtResult.Flush = OldSegEnd;
+ /*
+ * Check that the conversions between "usable byte positions" and
+ * XLogRecPtrs work consistently in both directions.
+ */
+ Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+ Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+ Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+}
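+
+/*
+ * To make the "usable byte position" concept concrete, here is a sketch of
+ * the arithmetic XLogBytePosToRecPtr() has to perform (the real routine is
+ * defined further down in this file; this is only an outline):
+ *
+ *	fullsegs  = bytepos / UsableBytesInSegment;
+ *	bytesleft = bytepos % UsableBytesInSegment;
+ *	if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+ *		seg_offset = bytesleft + SizeOfXLogLongPHD;	(still on first page)
+ *	else
+ *	{
+ *		bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+ *		seg_offset = XLOG_BLCKSZ +
+ *			(bytesleft / UsableBytesInPage) * XLOG_BLCKSZ +
+ *			bytesleft % UsableBytesInPage + SizeOfXLogShortPHD;
+ *	}
+ *	XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
+ *
+ * That is, the page headers that usable byte positions skip over are added
+ * back in to arrive at a physical XLogRecPtr.
+ */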
- /*
- * Update shared-memory status --- this code should match XLogWrite
- */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
+/*
+ * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
+ *
+ * A log-switch record is handled slightly differently. The rest of the
+ * segment will be reserved for this insertion, as indicated by the returned
+ * *EndPos value. However, if we are already at the beginning of the current
+ * segment, *StartPos and *EndPos are set to the current location without
+ * reserving any space, and the function returns false.
+ */
+static bool
+ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
+{
+ volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint64 startbytepos;
+ uint64 endbytepos;
+ uint64 prevbytepos;
+ uint32 size = SizeOfXLogRecord;
+ XLogRecPtr ptr;
+ uint32 segleft;
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->LogwrtResult = LogwrtResult;
- if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
- xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
- if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
- xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ /*
+ * These calculations are a bit heavy-weight to be done while holding a
+ * spinlock, but since we're holding all the WAL insertion slots, there
+ * are no other inserters competing for it. GetXLogInsertRecPtr() does
+ * compete for it, but that's not called very frequently.
+ */
+ SpinLockAcquire(&Insert->insertpos_lck);
- LWLockRelease(WALWriteLock);
+ startbytepos = Insert->CurrBytePos;
- updrqst = false; /* done already */
- }
- else
+ ptr = XLogBytePosToEndRecPtr(startbytepos);
+ if (ptr % XLOG_SEG_SIZE == 0)
{
- /* normal case, ie not xlog switch */
-
- /* Need to update shared LogwrtRqst if some block was filled up */
- if (freespace == 0)
- {
- /* curridx is filled and available for writing out */
- updrqst = true;
- }
- else
- {
- /* if updrqst already set, write through end of previous buf */
- curridx = PrevBufIdx(curridx);
- }
- WriteRqst = XLogCtl->xlblocks[curridx];
+ SpinLockRelease(&Insert->insertpos_lck);
+ *EndPos = *StartPos = ptr;
+ return false;
}
- LWLockRelease(WALInsertLock);
+ endbytepos = startbytepos + size;
+ prevbytepos = Insert->PrevBytePos;
- if (updrqst)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
+ *StartPos = XLogBytePosToRecPtr(startbytepos);
+ *EndPos = XLogBytePosToEndRecPtr(endbytepos);
- SpinLockAcquire(&xlogctl->info_lck);
- /* advance global request to include new block(s) */
- if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
- xlogctl->LogwrtRqst.Write = WriteRqst;
- /* update local result copy while I have the chance */
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
+ segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE);
+ if (segleft != XLOG_SEG_SIZE)
+ {
+ /* consume the rest of the segment */
+ *EndPos += segleft;
+ endbytepos = XLogRecPtrToBytePos(*EndPos);
}
+ Insert->CurrBytePos = endbytepos;
+ Insert->PrevBytePos = startbytepos;
- XactLastRecEnd = RecPtr;
+ SpinLockRelease(&Insert->insertpos_lck);
- END_CRIT_SECTION();
+ *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
- /* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
+ Assert((*EndPos) % XLOG_SEG_SIZE == 0);
+ Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+ Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+ Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
- return RecPtr;
+ return true;
}
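+
+/*
+ * For example, with default 16 MB segments: if the next insert position maps
+ * to 0/1F00028, an xlog-switch reserves everything up to the segment
+ * boundary, so *EndPos becomes 0/2000000. A second switch requested right
+ * after that finds the insert position exactly on the boundary and returns
+ * false, which is how consecutive switch requests become no-ops.
+ */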
/*
- * Determine whether the buffer referenced by an XLogRecData item has to
- * be backed up, and if so fill a BkpBlock struct for it. In any case
- * save the buffer's LSN at *lsn.
+ * Subroutine of XLogInsert. Copies a WAL record to an already-reserved
+ * area in the WAL.
*/
-static bool
-XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
- XLogRecPtr *lsn, BkpBlock *bkpb)
+static void
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+ XLogRecPtr StartPos, XLogRecPtr EndPos)
{
- Page page;
+ char *currpos;
+ int freespace;
+ int written;
+ XLogRecPtr CurrPos;
+ XLogPageHeader pagehdr;
- page = BufferGetPage(rdata->buffer);
+ /* The first chunk is the record header */
+ Assert(rdata->len == SizeOfXLogRecord);
+
+ /*
+ * Get a pointer to the right place in the right WAL buffer to start
+ * inserting to.
+ */
+ CurrPos = StartPos;
+ currpos = GetXLogBuffer(CurrPos);
+ freespace = INSERT_FREESPACE(CurrPos);
/*
- * XXX We assume page LSN is first data on *every* page that can be passed
- * to XLogInsert, whether it otherwise has the standard page layout or
- * not. We don't need the buffer header lock for PageGetLSN because we
- * have exclusive lock on the page and/or the relation.
+ * there should be enough space for at least the first field (xl_tot_len)
+ * on this page.
*/
- *lsn = PageGetLSN(page);
+ Assert(freespace >= sizeof(uint32));
- if (doPageWrites &&
- XLByteLE(PageGetLSN(page), RedoRecPtr))
+ /* Copy record data */
+ written = 0;
+ while (rdata != NULL)
{
- /*
- * The page needs to be backed up, so set up *bkpb
- */
- BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
+ char *rdata_data = rdata->data;
+ int rdata_len = rdata->len;
- if (rdata->buffer_std)
+ while (rdata_len > freespace)
{
- /* Assume we can omit data between pd_lower and pd_upper */
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
+ /*
+ * Write what fits on this page, and continue on the next page.
+ */
+ Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
+ memcpy(currpos, rdata_data, freespace);
+ rdata_data += freespace;
+ rdata_len -= freespace;
+ written += freespace;
+ CurrPos += freespace;
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
+ /*
+ * Get pointer to beginning of next page, and set the xlp_rem_len
+ * in the page header. Set XLP_FIRST_IS_CONTRECORD.
+ *
+ * It's safe to set the contrecord flag and xlp_rem_len without a
+ * lock on the page. All the other flags were already set when the
+ * page was initialized, in AdvanceXLInsertBuffer, and we're the
+ * only backend that needs to set the contrecord flag.
+ */
+ currpos = GetXLogBuffer(CurrPos);
+ pagehdr = (XLogPageHeader) currpos;
+ pagehdr->xlp_rem_len = write_len - written;
+ pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
+
+ /* skip over the page header */
+ if (CurrPos % XLogSegSize == 0)
{
- bkpb->hole_offset = lower;
- bkpb->hole_length = upper - lower;
+ CurrPos += SizeOfXLogLongPHD;
+ currpos += SizeOfXLogLongPHD;
}
else
{
- /* No "hole" to compress out */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
+ CurrPos += SizeOfXLogShortPHD;
+ currpos += SizeOfXLogShortPHD;
}
- }
- else
- {
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
+ freespace = INSERT_FREESPACE(CurrPos);
}
- return true; /* buffer requires backup */
- }
+ Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
+ memcpy(currpos, rdata_data, rdata_len);
+ currpos += rdata_len;
+ CurrPos += rdata_len;
+ freespace -= rdata_len;
+ written += rdata_len;
- return false; /* buffer does not need to be backed up */
-}
+ rdata = rdata->next;
+ }
+ Assert(written == write_len);
-/*
- * Advance the Insert state to the next buffer page, writing out the next
- * buffer if it still contains unwritten data.
- *
- * If new_segment is TRUE then we set up the next buffer page as the first
- * page of the next xlog segment file, possibly but not usually the next
- * consecutive file page.
- *
- * The global LogwrtRqst.Write pointer needs to be advanced to include the
- * just-filled page. If we can do this for free (without an extra lock),
- * we do so here. Otherwise the caller must do it. We return TRUE if the
- * request update still needs to be done, FALSE if we did it internally.
- *
- * Must be called with WALInsertLock held.
- */
-static bool
-AdvanceXLInsertBuffer(bool new_segment)
-{
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- int nextidx = NextBufIdx(Insert->curridx);
- bool update_needed = true;
- XLogRecPtr OldPageRqstPtr;
- XLogwrtRqst WriteRqst;
- XLogRecPtr NewPageEndPtr;
- XLogRecPtr NewPageBeginPtr;
- XLogPageHeader NewPage;
+ /* Align the end position, so that the next record starts aligned */
+ CurrPos = MAXALIGN(CurrPos);
/*
- * Get ending-offset of the buffer page we need to replace (this may be
- * zero if the buffer hasn't been used yet). Fall through if it's already
- * written out.
+ * If this was an xlog-switch, it's not enough to write the switch record;
+ * we also have to consume all the remaining space in the WAL segment.
+ * We have already reserved it for us, but we still need to make sure it's
+ * allocated and zeroed in the WAL buffers so that when the caller (or
+ * someone else) does XLogWrite(), it can really write out all the zeros.
*/
- OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
- if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
+ if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0)
{
- /* nope, got work to do... */
- XLogRecPtr FinishedPageRqstPtr;
+ /* An xlog-switch record doesn't contain any data besides the header */
+ Assert(write_len == SizeOfXLogRecord);
- FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
+ /*
+ * We do this one page at a time, to make sure we don't deadlock
+ * against ourselves if wal_buffers < XLOG_SEG_SIZE.
+ */
+ Assert(EndPos % XLogSegSize == 0);
- /* Before waiting, get info_lck and update LogwrtResult */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
+ /* Use up all the remaining space on the first page */
+ CurrPos += freespace;
- SpinLockAcquire(&xlogctl->info_lck);
- if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
- xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
- LogwrtResult = xlogctl->LogwrtResult;
- SpinLockRelease(&xlogctl->info_lck);
+ while (CurrPos < EndPos)
+ {
+ /* initialize the next page (if not initialized already) */
+ WakeupWaiters(CurrPos);
+ AdvanceXLInsertBuffer(CurrPos, false);
+ CurrPos += XLOG_BLCKSZ;
}
+ }
- update_needed = false; /* Did the shared-request update */
+ if (CurrPos != EndPos)
+ elog(PANIC, "space reserved for WAL record does not match what was written");
+}
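+
+/*
+ * Illustration (not to scale) of a record crossing a page boundary, as
+ * handled by the loop above:
+ *
+ *	page N:   [hdr][...earlier records...][first part of record ==========]
+ *	page N+1: [hdr: XLP_FIRST_IS_CONTRECORD, xlp_rem_len][rest of record]..
+ *
+ * The continuation page gets a short header, unless it is the first page of
+ * a segment, in which case it carries a long header instead.
+ */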
- /*
- * Now that we have an up-to-date LogwrtResult value, see if we still
- * need to write it or if someone else already did.
- */
- if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
- {
- /* Must acquire write lock */
- LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
- LogwrtResult = XLogCtl->LogwrtResult;
- if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
- {
- /* OK, someone wrote it already */
- LWLockRelease(WALWriteLock);
- }
- else
+/*
+ * Allocate a slot for insertion.
+ *
+ * In exclusive mode, all slots are reserved for the current process. That
+ * blocks all concurrent insertions.
+ */
+static void
+WALInsertSlotAcquire(bool exclusive)
+{
+ int i;
+
+ if (exclusive)
+ {
+ for (i = 0; i < num_xloginsert_slots; i++)
+ WALInsertSlotAcquireOne(i);
+ holdingAllSlots = true;
+ }
+ else
+ WALInsertSlotAcquireOne(-1);
+}
+
+/*
+ * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
+ * one if slotno == -1. The index of the slot that was acquired is stored in
+ * MySlotNo.
+ *
+ * This is more or less equivalent to LWLockAcquire().
+ */
+static void
+WALInsertSlotAcquireOne(int slotno)
+{
+ volatile XLogInsertSlot *slot;
+ PGPROC *proc = MyProc;
+ bool retry = false;
+ int extraWaits = 0;
+ static int slotToTry = -1;
+
+ /*
+ * Try to use the slot we used last time. If the system isn't particularly
+ * busy, it's a good bet that it's available, and it's good to have some
+ * affinity to a particular slot so that you don't unnecessarily bounce
+ * cache lines between processes when there is no contention.
+ *
+ * If this is the first time through in this backend, pick a slot
+ * (semi-)randomly. This allows the slots to be used evenly if you have a
+ * lot of very short connections.
+ */
+ if (slotno != -1)
+ MySlotNo = slotno;
+ else
+ {
+ if (slotToTry == -1)
+ slotToTry = MyProc->pgprocno % num_xloginsert_slots;
+ MySlotNo = slotToTry;
+ }
+
+ /*
+ * We can't wait if we haven't got a PGPROC. This should only occur
+ * during bootstrap or shared memory initialization. Put an Assert here
+ * to catch unsafe coding practices.
+ */
+ Assert(MyProc != NULL);
+
+ /*
+ * Lock out cancel/die interrupts until we exit the code section protected
+ * by the slot. This ensures that interrupts will not interfere with
+ * manipulations of data structures in shared memory. There is no cleanup
+ * mechanism to release the slot if the backend dies while holding one,
+ * so make this a critical section.
+ */
+ START_CRIT_SECTION();
+
+ /*
+ * Loop here to try to acquire slot after each time we are signaled by
+ * WALInsertSlotRelease.
+ */
+ for (;;)
+ {
+ bool mustwait;
+
+ slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&slot->mutex);
+
+ /* If retrying, allow WALInsertSlotRelease to release waiters again */
+ if (retry)
+ slot->releaseOK = true;
+
+ /* If I can get the slot, do so quickly. */
+ if (slot->exclusive == 0)
+ {
+ slot->exclusive++;
+ mustwait = false;
+ }
+ else
+ mustwait = true;
+
+ if (!mustwait)
+ break; /* got the lock */
+
+ Assert(slot->owner != MyProc);
+
+ /*
+ * Add myself to wait queue.
+ */
+ proc->lwWaiting = true;
+ proc->lwWaitMode = LW_EXCLUSIVE;
+ proc->lwWaitLink = NULL;
+ if (slot->head == NULL)
+ slot->head = proc;
+ else
+ slot->tail->lwWaitLink = proc;
+ slot->tail = proc;
+
+ /* Can release the mutex now */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Wait until awakened.
+ *
+ * Since we share the process wait semaphore with the regular lock
+ * manager and ProcWaitForSignal, and we may need to acquire a slot
+ * while one of those is pending, it is possible that we get awakened
+ * for a reason other than being signaled by WALInsertSlotRelease. If
+ * so, loop back and wait again. Once we've gotten the slot,
+ * re-increment the sema by the number of additional signals received,
+ * so that the lock manager or signal manager will see the received
+ * signal when it next waits.
+ */
+ for (;;)
+ {
+ /* "false" means cannot accept cancel/die interrupt here. */
+ PGSemaphoreLock(&proc->sem, false);
+ if (!proc->lwWaiting)
+ break;
+ extraWaits++;
+ }
+
+ /* Now loop back and try to acquire lock again. */
+ retry = true;
+ }
+
+ slot->owner = proc;
+
+ /*
+ * Normally, we initialize the xlogInsertingAt value of the slot to 1,
+ * because we don't yet know where in the WAL we're going to insert. It's
+ * not critical what it points to right now - leaving it at too small a
+ * value just means that WaitXLogInsertionsToFinish() might wait on us
+ * unnecessarily, until we update the value (when we finish the insert or
+ * move to next page).
+ *
+ * If we're grabbing all the slots, however, stamp all but the last one
+ * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
+ * slot is the one that we will update as we proceed with the insert, the
+ * rest are held just to keep off other inserters.
+ */
+ if (slotno != -1 && slotno != num_xloginsert_slots - 1)
+ slot->xlogInsertingAt = InvalidXLogRecPtr;
+ else
+ slot->xlogInsertingAt = 1;
+
+ /* We are done updating shared state of the slot itself. */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(&proc->sem);
+
+ /*
+ * If we couldn't get the slot immediately, try another slot next time.
+ * On a system with more insertion slots than concurrent inserters, this
+ * causes all the inserters to eventually migrate to a slot that no-one
+ * else is using. On a system with more inserters than slots, it still
+ * causes the inserters to be distributed quite evenly across the slots.
+ */
+ if (slotno != -1 && retry)
+ slotToTry = (slotToTry + 1) % num_xloginsert_slots;
+}
+
+/*
+ * Wait for the given slot to become free, or for its xlogInsertingAt location
+ * to change to something else than 'waitptr'. In other words, wait for the
+ * inserter using the given slot to finish its insertion, or to at least make
+ * some progress.
+ */
+static void
+WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
+{
+ PGPROC *proc = MyProc;
+ int extraWaits = 0;
+
+ /*
+ * Lock out cancel/die interrupts while we sleep on the slot. There is
+ * no cleanup mechanism to remove us from the wait queue if we got
+ * interrupted.
+ */
+ HOLD_INTERRUPTS();
+
+ /*
+ * Loop here to try to acquire lock after each time we are signaled.
+ */
+ for (;;)
+ {
+ bool mustwait;
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&slot->mutex);
+
+ /* If I can get the lock, do so quickly. */
+ if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
+ mustwait = false;
+ else
+ mustwait = true;
+
+ if (!mustwait)
+ break; /* the lock was free */
+
+ Assert(slot->owner != MyProc);
+
+ /*
+ * Add myself to wait queue.
+ */
+ proc->lwWaiting = true;
+ proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+ proc->lwWaitLink = NULL;
+
+ /* waiters are added to the front of the queue */
+ proc->lwWaitLink = slot->head;
+ if (slot->head == NULL)
+ slot->tail = proc;
+ slot->head = proc;
+
+ /* Can release the mutex now */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Wait until awakened.
+ *
+ * Since we share the process wait semaphore with other things, like
+ * the regular lock manager and ProcWaitForSignal, and we may need to
+ * wait on a slot while one of those is pending, it is possible that
+ * we get awakened for a reason other than a slot update or release.
+ * If so, loop back and wait again. Once our wait is satisfied,
+ * re-increment the sema by the number of additional
+ * signals received, so that the lock manager or signal manager will
+ * see the received signal when it next waits.
+ */
+ for (;;)
+ {
+ /* "false" means cannot accept cancel/die interrupt here. */
+ PGSemaphoreLock(&proc->sem, false);
+ if (!proc->lwWaiting)
+ break;
+ extraWaits++;
+ }
+
+ /* Now loop back and try to acquire lock again. */
+ }
+
+ /* We are done updating shared state of the lock itself. */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Fix the process wait semaphore's count for any absorbed wakeups.
+ */
+ while (extraWaits-- > 0)
+ PGSemaphoreUnlock(&proc->sem);
+
+ /*
+ * Now okay to allow cancel/die interrupts.
+ */
+ RESUME_INTERRUPTS();
+}
+
+/*
+ * Wake up all processes waiting for us with WaitOnSlot(). Sets our
+ * xlogInsertingAt value to EndPos, without releasing the slot.
+ */
+static void
+WakeupWaiters(XLogRecPtr EndPos)
+{
+ volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+ PGPROC *head;
+ PGPROC *proc;
+ PGPROC *next;
+
+ /*
+ * If we have already reported progress up to the same point, do nothing.
+ * No other process can modify xlogInsertingAt, so we can check this before
+ * grabbing the spinlock.
+ */
+ if (slot->xlogInsertingAt == EndPos)
+ return;
+ /* xlogInsertingAt should not go backwards */
+ Assert(slot->xlogInsertingAt < EndPos);
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&slot->mutex);
+
+ /* we should own the slot */
+ Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+ slot->xlogInsertingAt = EndPos;
+
+ /*
+ * See if there are any waiters that need to be woken up.
+ */
+ head = slot->head;
+
+ if (head != NULL)
+ {
+ proc = head;
+
+ /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */
+ next = proc->lwWaitLink;
+ while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
+ {
+ proc = next;
+ next = next->lwWaitLink;
+ }
+
+ /* proc is now the last PGPROC to be released */
+ slot->head = next;
+ proc->lwWaitLink = NULL;
+ }
+
+ /* We are done updating shared state of the lock itself. */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Awaken any waiters I removed from the queue.
+ */
+ while (head != NULL)
+ {
+ proc = head;
+ head = proc->lwWaitLink;
+ proc->lwWaitLink = NULL;
+ proc->lwWaiting = false;
+ PGSemaphoreUnlock(&proc->sem);
+ }
+}
+
+/*
+ * Release our insertion slot (or slots, if we're holding them all).
+ */
+static void
+WALInsertSlotRelease(void)
+{
+ int i;
+
+ if (holdingAllSlots)
+ {
+ for (i = 0; i < num_xloginsert_slots; i++)
+ WALInsertSlotReleaseOne(i);
+ holdingAllSlots = false;
+ }
+ else
+ WALInsertSlotReleaseOne(MySlotNo);
+}
+
+static void
+WALInsertSlotReleaseOne(int slotno)
+{
+ volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
+ PGPROC *head;
+ PGPROC *proc;
+
+ /* Acquire mutex. Time spent holding mutex should be short! */
+ SpinLockAcquire(&slot->mutex);
+
+ /* we must be holding it */
+ Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+ slot->xlogInsertingAt = InvalidXLogRecPtr;
+
+ /* Release my hold on the slot */
+ slot->exclusive = 0;
+ slot->owner = NULL;
+
+ /*
+ * See if I need to awaken any waiters.
+ */
+ head = slot->head;
+ if (head != NULL)
+ {
+ if (slot->releaseOK)
+ {
+ /*
+ * Remove the to-be-awakened PGPROCs from the queue.
+ */
+ bool releaseOK = true;
+
+ proc = head;
+
+ /*
+ * First wake up any backends that want to be woken up without
+ * acquiring the lock. These are always in the front of the queue.
+ */
+ while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
+ proc = proc->lwWaitLink;
+
+ /*
+ * Awaken the first exclusive-waiter, if any.
+ */
+ if (proc->lwWaitLink)
{
- /*
- * Have to write buffers while holding insert lock. This is
- * not good, so only write as much as we absolutely must.
- */
- TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
- WriteRqst.Write = OldPageRqstPtr;
- WriteRqst.Flush = 0;
- XLogWrite(WriteRqst, false, false);
- LWLockRelease(WALWriteLock);
- TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+ Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
+ proc = proc->lwWaitLink;
+ releaseOK = false;
}
+ /* proc is now the last PGPROC to be released */
+ slot->head = proc->lwWaitLink;
+ proc->lwWaitLink = NULL;
+
+ slot->releaseOK = releaseOK;
}
+ else
+ head = NULL;
+ }
+
+ /* We are done updating shared state of the slot itself. */
+ SpinLockRelease(&slot->mutex);
+
+ /*
+ * Awaken any waiters I removed from the queue.
+ */
+ while (head != NULL)
+ {
+ proc = head;
+ head = proc->lwWaitLink;
+ proc->lwWaitLink = NULL;
+ proc->lwWaiting = false;
+ PGSemaphoreUnlock(&proc->sem);
}
/*
- * Now the next buffer slot is free and we can set it up to be the next
- * output page.
+ * Now okay to allow cancel/die interrupts.
*/
- NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx];
+ END_CRIT_SECTION();
+}
+
+
+/*
+ * Wait for any WAL insertions < upto to finish.
+ *
+ * Returns the location of the oldest insertion that is still in-progress.
+ * Any WAL prior to that point has been fully copied into WAL buffers, and
+ * can be flushed out to disk. Because this waits for any insertions older
+ * than 'upto' to finish, the return value is always >= 'upto'.
+ *
+ * Note: When you are about to write out WAL, you must call this function
+ * *before* acquiring WALWriteLock, to avoid deadlocks. This function might
+ * need to wait for an insertion to finish (or at least advance to the next
+ * uninitialized page), and the inserter might need to evict an old WAL buffer
+ * to make room for a new one, which in turn requires WALWriteLock.
+ */
+static XLogRecPtr
+WaitXLogInsertionsToFinish(XLogRecPtr upto)
+{
+ uint64 bytepos;
+ XLogRecPtr reservedUpto;
+ XLogRecPtr finishedUpto;
+ volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ int i;
+
+ if (MyProc == NULL)
+ elog(PANIC, "cannot wait without a PGPROC structure");
- if (new_segment)
+ /* Read the current insert position */
+ SpinLockAcquire(&Insert->insertpos_lck);
+ bytepos = Insert->CurrBytePos;
+ SpinLockRelease(&Insert->insertpos_lck);
+ reservedUpto = XLogBytePosToEndRecPtr(bytepos);
+
+ /*
+ * No-one should request to flush a piece of WAL that hasn't even been
+ * reserved yet. However, it can happen if there is a block with a bogus
+ * LSN on disk, for example. XLogFlush checks for that situation and
+ * complains, but only after the flush. Here we just assume that to mean
+ * that all WAL that has been reserved needs to be finished. In this
+	 * corner case, the return value can be smaller than the 'upto' argument.
+ */
+ if (upto > reservedUpto)
{
- /* force it to a segment start point */
- if (NewPageBeginPtr % XLogSegSize != 0)
- XLByteAdvance(NewPageBeginPtr,
- XLogSegSize - NewPageBeginPtr % XLogSegSize);
+ elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
+ (uint32) (upto >> 32), (uint32) upto,
+ (uint32) (reservedUpto >> 32), (uint32) reservedUpto);
+ upto = reservedUpto;
}
- NewPageEndPtr = NewPageBeginPtr;
- XLByteAdvance(NewPageEndPtr, XLOG_BLCKSZ);
- XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
- NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+ /*
+	 * finishedUpto is our return value, indicating the point up to which
+	 * all WAL insertions have finished. Initialize it to the head
+ * of reserved WAL, and as we iterate through the insertion slots, back it
+ * out for any insertion that's still in progress.
+ */
+ finishedUpto = reservedUpto;
- Insert->curridx = nextidx;
- Insert->currpage = NewPage;
+ /*
+ * Loop through all the slots, sleeping on any in-progress insert older
+ * than 'upto'.
+ */
+ for (i = 0; i < num_xloginsert_slots; i++)
+ {
+ volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+ XLogRecPtr insertingat;
- Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
+ retry:
+ /*
+ * We can check if the slot is in use without grabbing the spinlock.
+ * The spinlock acquisition of insertpos_lck before this loop acts
+ * as a memory barrier. If someone acquires the slot after that, it
+ * can't possibly be inserting to anything < reservedUpto. If it was
+ * acquired before that, an unlocked test will return true.
+ */
+ if (!slot->exclusive)
+ continue;
+
+ SpinLockAcquire(&slot->mutex);
+ /* re-check now that we have the lock */
+ if (!slot->exclusive)
+ {
+ SpinLockRelease(&slot->mutex);
+ continue;
+ }
+ insertingat = slot->xlogInsertingAt;
+ SpinLockRelease(&slot->mutex);
+
+ if (insertingat == InvalidXLogRecPtr)
+ {
+ /*
+ * slot is reserved just to hold off other inserters, there is no
+ * actual insert in progress.
+ */
+ continue;
+ }
+
+ /*
+ * This insertion is still in progress. Do we need to wait for it?
+ *
+ * When an inserter acquires a slot, it doesn't reset 'insertingat', so
+ * it will initially point to the old value of some already-finished
+ * insertion. The inserter will update the value as soon as it finishes
+ * the insertion, moves to the next page, or has to do I/O to flush an
+ * old dirty buffer. That means that when we see a slot with
+ * insertingat value < upto, we don't know if that insertion is still
+ * truly in progress, or if the slot is reused by a new inserter that
+ * hasn't updated the insertingat value yet. We have to assume it's the
+ * latter, and wait.
+ */
+ if (insertingat < upto)
+ {
+ WaitOnSlot(slot, insertingat);
+ goto retry;
+ }
+ else
+ {
+ /*
+ * We don't need to wait for this insertion, but update the
+ * return value.
+ */
+ if (insertingat < finishedUpto)
+ finishedUpto = insertingat;
+ }
+ }
+ return finishedUpto;
+}
+
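+/*
+ * A condensed sketch of the caller protocol described above, as used by the
+ * flush paths further down in this file (illustration only; the real callers
+ * add rechecks and group-commit logic):
+ *
+ *		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+ *		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ *		WriteRqst.Write = insertpos;
+ *		WriteRqst.Flush = insertpos;
+ *		XLogWrite(WriteRqst, false);
+ *		LWLockRelease(WALWriteLock);
+ *
+ * The wait must happen before WALWriteLock is taken, for the deadlock reason
+ * explained in the comment above.
+ */
+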
+/*
+ * Get a pointer to the right location in the WAL buffer containing the
+ * given XLogRecPtr.
+ *
+ * If the page is not initialized yet, it is initialized. That might require
+ * evicting an old dirty buffer from the buffer cache, which means I/O.
+ *
+ * The caller must ensure that the page containing the requested location
+ * isn't evicted yet, and won't be evicted. The way to ensure that is to
+ * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * to evict an old page from the buffer. (This means that once you call
+ * GetXLogBuffer() with a given 'ptr', you must not access anything before
+ * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
+ * later, because older buffers might be recycled already.)
+ */
+static char *
+GetXLogBuffer(XLogRecPtr ptr)
+{
+ int idx;
+ XLogRecPtr endptr;
+ static uint64 cachedPage = 0;
+ static char *cachedPos = NULL;
+ XLogRecPtr expectedEndPtr;
+
+ /*
+	 * Fast path for the common case that we need to access the same page
+	 * as last time.
+ */
+ if (ptr / XLOG_BLCKSZ == cachedPage)
+ {
+ Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+ Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+ return cachedPos + ptr % XLOG_BLCKSZ;
+ }
/*
- * Be sure to re-zero the buffer so that bytes beyond what we've written
- * will look like zeroes and not valid XLOG records...
+ * The XLog buffer cache is organized so that a page is always loaded
+ * to a particular buffer. That way we can easily calculate the buffer
+ * a given page must be loaded into, from the XLogRecPtr alone.
*/
- MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+ idx = XLogRecPtrToBufIdx(ptr);
+
+ /*
+ * See what page is loaded in the buffer at the moment. It could be the
+ * page we're looking for, or something older. It can't be anything newer
+ * - that would imply the page we're looking for has already been written
+ * out to disk and evicted, and the caller is responsible for making sure
+ * that doesn't happen.
+ *
+ * However, we don't hold a lock while we read the value. If someone has
+ * just initialized the page, it's possible that we get a "torn read" of
+ * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
+ * that case we will see a bogus value. That's ok, we'll grab the mapping
+	 * lock (in AdvanceXLInsertBuffer) and retry if we see anything other than
+ * the page we're looking for. But it means that when we do this unlocked
+ * read, we might see a value that appears to be ahead of the page we're
+ * looking for. Don't PANIC on that, until we've verified the value while
+ * holding the lock.
+ */
+ expectedEndPtr = ptr;
+ expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+ endptr = XLogCtl->xlblocks[idx];
+ if (expectedEndPtr != endptr)
+ {
+ /*
+ * Let others know that we're finished inserting the record up
+ * to the page boundary.
+ */
+ WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
+
+ AdvanceXLInsertBuffer(ptr, false);
+ endptr = XLogCtl->xlblocks[idx];
+
+ if (expectedEndPtr != endptr)
+ elog(PANIC, "could not find WAL buffer for %X/%X",
+ (uint32) (ptr >> 32) , (uint32) ptr);
+ }
+ else
+ {
+ /*
+ * Make sure the initialization of the page is visible to us, and
+ * won't arrive later to overwrite the WAL data we write on the page.
+ */
+ pg_memory_barrier();
+ }
/*
- * Fill the new page's header
+ * Found the buffer holding this page. Return a pointer to the right
+ * offset within the page.
*/
- NewPage ->xlp_magic = XLOG_PAGE_MAGIC;
+ cachedPage = ptr / XLOG_BLCKSZ;
+ cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+
+ Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+ Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+
+ return cachedPos + ptr % XLOG_BLCKSZ;
+}
+
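+/*
+ * Worked example of the mapping above, assuming XLOG_BLCKSZ == 8192: for
+ * ptr == 41000 the containing page spans [40960, 49152), so expectedEndPtr
+ * is 49152. If xlblocks[idx] already holds 49152, the page is initialized
+ * and we return a pointer 40 bytes into that buffer page; any other value
+ * means the buffer still holds an older page and must be advanced first.
+ */
+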
+/*
+ * Converts a "usable byte position" to XLogRecPtr. A usable byte position
+ * is the position starting from the beginning of WAL, excluding all WAL
+ * page headers.
+ */
+static XLogRecPtr
+XLogBytePosToRecPtr(uint64 bytepos)
+{
+ uint64 fullsegs;
+ uint64 fullpages;
+ uint64 bytesleft;
+ uint32 seg_offset;
+ XLogRecPtr result;
+
+ fullsegs = bytepos / UsableBytesInSegment;
+ bytesleft = bytepos % UsableBytesInSegment;
+
+ if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+ {
+ /* fits on first page of segment */
+ seg_offset = bytesleft + SizeOfXLogLongPHD;
+ }
+ else
+ {
+ /* account for the first page on segment with long header */
+ seg_offset = XLOG_BLCKSZ;
+ bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+
+ fullpages = bytesleft / UsableBytesInPage;
+ bytesleft = bytesleft % UsableBytesInPage;
+
+ seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+ }
+
+ XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
+
+ return result;
+}
+
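+/*
+ * Worked example, assuming XLOG_BLCKSZ == 8192 and the typical header sizes
+ * SizeOfXLogLongPHD == 40 and SizeOfXLogShortPHD == 24, so that a segment's
+ * first page holds 8152 usable bytes and every later page 8168: byte
+ * position 10000 does not fit on the first page (10000 > 8152), leaving
+ * 1848 usable bytes beyond it; 1848 / 8168 == 0 further full pages, so the
+ * result is segment offset 8192 + 1848 + 24 == 10064.
+ */
+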
+/*
+ * Like XLogBytePosToRecPtr, but if the position is at a page boundary,
+ * returns a pointer to the beginning of the page (ie. before page header),
+ * not to where the first xlog record on that page would go to. This is used
+ * when converting a pointer to the end of a record.
+ */
+static XLogRecPtr
+XLogBytePosToEndRecPtr(uint64 bytepos)
+{
+ uint64 fullsegs;
+ uint64 fullpages;
+ uint64 bytesleft;
+ uint32 seg_offset;
+ XLogRecPtr result;
+
+ fullsegs = bytepos / UsableBytesInSegment;
+ bytesleft = bytepos % UsableBytesInSegment;
+
+ if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+ {
+ /* fits on first page of segment */
+ if (bytesleft == 0)
+ seg_offset = 0;
+ else
+ seg_offset = bytesleft + SizeOfXLogLongPHD;
+ }
+ else
+ {
+ /* account for the first page on segment with long header */
+ seg_offset = XLOG_BLCKSZ;
+ bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+
+ fullpages = bytesleft / UsableBytesInPage;
+ bytesleft = bytesleft % UsableBytesInPage;
+
+ if (bytesleft == 0)
+ seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
+ else
+ seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+ }
+
+ XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
- /* NewPage->xlp_info = 0; */ /* done by memset */
- NewPage ->xlp_tli = ThisTimeLineID;
- NewPage ->xlp_pageaddr = NewPageBeginPtr;
+ return result;
+}
+
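+/*
+ * Example of the boundary case, with the same assumed sizes as above: byte
+ * position 8152 exactly fills a segment's first page. This function maps it
+ * to segment offset 8192, the start of the second page before its header,
+ * whereas XLogBytePosToRecPtr would map it to 8216, where the next record
+ * would actually begin.
+ */
+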
+/*
+ * Convert an XLogRecPtr to a "usable byte position".
+ */
+static uint64
+XLogRecPtrToBytePos(XLogRecPtr ptr)
+{
+ uint64 fullsegs;
+ uint32 fullpages;
+ uint32 offset;
+ uint64 result;
+
+ XLByteToSeg(ptr, fullsegs);
+
+ fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ;
+ offset = ptr % XLOG_BLCKSZ;
+
+ if (fullpages == 0)
+ {
+ result = fullsegs * UsableBytesInSegment;
+ if (offset > 0)
+ {
+ Assert(offset >= SizeOfXLogLongPHD);
+ result += offset - SizeOfXLogLongPHD;
+ }
+ }
+ else
+ {
+ result = fullsegs * UsableBytesInSegment +
+ (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
+ (fullpages - 1) * UsableBytesInPage; /* full pages */
+ if (offset > 0)
+ {
+ Assert(offset >= SizeOfXLogShortPHD);
+ result += offset - SizeOfXLogShortPHD;
+ }
+ }
+
+ return result;
+}
+
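+/*
+ * Continuing the example above: segment offset 10064 has one full page
+ * before it and in-page offset 1872, giving 8152 + (1872 - 24) == 10000,
+ * the usable byte position we started from. A page-boundary pointer, as
+ * produced by XLogBytePosToEndRecPtr, maps back to the same byte position
+ * as the end of the previous page's payload.
+ */
+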
+/*
+ * Determine whether the buffer referenced by an XLogRecData item has to
+ * be backed up, and if so fill a BkpBlock struct for it. In any case
+ * save the buffer's LSN at *lsn.
+ */
+static bool
+XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
+ XLogRecPtr *lsn, BkpBlock *bkpb)
+{
+ Page page;
+
+ page = BufferGetPage(rdata->buffer);
/*
- * If online backup is not in progress, mark the header to indicate that
- * WAL records beginning in this page have removable backup blocks. This
- * allows the WAL archiver to know whether it is safe to compress archived
- * WAL data by transforming full-block records into the non-full-block
- * format. It is sufficient to record this at the page level because we
- * force a page switch (in fact a segment switch) when starting a backup,
- * so the flag will be off before any records can be written during the
- * backup. At the end of a backup, the last page will be marked as all
- * unsafe when perhaps only part is unsafe, but at worst the archiver
- * would miss the opportunity to compress a few records.
+ * We assume page LSN is first data on *every* page that can be passed to
+ * XLogInsert, whether it has the standard page layout or not. We don't
+ * need to take the buffer header lock for PageGetLSN if we hold an
+ * exclusive lock on the page and/or the relation.
*/
- if (!Insert->forcePageWrites)
- NewPage ->xlp_info |= XLP_BKP_REMOVABLE;
+ if (holdsExclusiveLock)
+ *lsn = PageGetLSN(page);
+ else
+ *lsn = BufferGetLSNAtomic(rdata->buffer);
+
+ if (*lsn <= RedoRecPtr)
+ {
+ /*
+ * The page needs to be backed up, so set up *bkpb
+ */
+ BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
+
+ if (rdata->buffer_std)
+ {
+ /* Assume we can omit data between pd_lower and pd_upper */
+ uint16 lower = ((PageHeader) page)->pd_lower;
+ uint16 upper = ((PageHeader) page)->pd_upper;
+
+ if (lower >= SizeOfPageHeaderData &&
+ upper > lower &&
+ upper <= BLCKSZ)
+ {
+ bkpb->hole_offset = lower;
+ bkpb->hole_length = upper - lower;
+ }
+ else
+ {
+ /* No "hole" to compress out */
+ bkpb->hole_offset = 0;
+ bkpb->hole_length = 0;
+ }
+ }
+ else
+ {
+ /* Not a standard page header, don't try to eliminate "hole" */
+ bkpb->hole_offset = 0;
+ bkpb->hole_length = 0;
+ }
+
+ return true; /* buffer requires backup */
+ }
+
+ return false; /* buffer does not need to be backed up */
+}
+
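+/*
+ * Example of the "hole" optimization above, assuming BLCKSZ == 8192: for a
+ * standard page with pd_lower == 100 and pd_upper == 8000 we record
+ * hole_offset == 100 and hole_length == 7900, so the backup block image
+ * carries only 8192 - 7900 == 292 bytes. The hole is zero-filled again at
+ * redo time (see RestoreBackupBlockContents).
+ */
+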
+/*
+ * Initialize XLOG buffers, writing out old buffers if they still contain
+ * unwritten data, up to the page containing 'upto'. Or if 'opportunistic' is
+ * true, initialize as many pages as we can without having to write out
+ * unwritten data. Any new pages are initialized to zeros, with page headers
+ * initialized properly.
+ */
+static void
+AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
+{
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ int nextidx;
+ XLogRecPtr OldPageRqstPtr;
+ XLogwrtRqst WriteRqst;
+ XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr;
+ XLogRecPtr NewPageBeginPtr;
+ XLogPageHeader NewPage;
+ int npages = 0;
+
+ LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
/*
- * If first page of an XLOG segment file, make it a long header.
+ * Now that we have the lock, check if someone initialized the page
+ * already.
*/
- if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
+ while (upto >= XLogCtl->InitializedUpTo || opportunistic)
{
- XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+ nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+
+ /*
+ * Get ending-offset of the buffer page we need to replace (this may
+ * be zero if the buffer hasn't been used yet). Fall through if it's
+ * already written out.
+ */
+ OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
+ if (LogwrtResult.Write < OldPageRqstPtr)
+ {
+ /*
+ * Nope, got work to do. If we just want to pre-initialize as much
+ * as we can without flushing, give up now.
+ */
+ if (opportunistic)
+ break;
+
+ /* Before waiting, get info_lck and update LogwrtResult */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
+ xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
+ LogwrtResult = xlogctl->LogwrtResult;
+ SpinLockRelease(&xlogctl->info_lck);
+ }
+
+ /*
+ * Now that we have an up-to-date LogwrtResult value, see if we
+ * still need to write it or if someone else already did.
+ */
+ if (LogwrtResult.Write < OldPageRqstPtr)
+ {
+ /*
+ * Must acquire write lock. Release WALBufMappingLock first,
+ * to make sure that all insertions that we need to wait for
+ * can finish (up to this same position). Otherwise we risk
+ * deadlock.
+ */
+ LWLockRelease(WALBufMappingLock);
+
+ WaitXLogInsertionsToFinish(OldPageRqstPtr);
+
+ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+
+ LogwrtResult = XLogCtl->LogwrtResult;
+ if (LogwrtResult.Write >= OldPageRqstPtr)
+ {
+ /* OK, someone wrote it already */
+ LWLockRelease(WALWriteLock);
+ }
+ else
+ {
+ /* Have to write it ourselves */
+ TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
+ WriteRqst.Write = OldPageRqstPtr;
+ WriteRqst.Flush = 0;
+ XLogWrite(WriteRqst, false);
+ LWLockRelease(WALWriteLock);
+ TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+ }
+ /* Re-acquire WALBufMappingLock and retry */
+ LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+ continue;
+ }
+ }
+
+ /*
+ * Now the next buffer slot is free and we can set it up to be the next
+ * output page.
+ */
+ NewPageBeginPtr = XLogCtl->InitializedUpTo;
+ NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+
+ Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
+
+ NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+
+ /*
+ * Be sure to re-zero the buffer so that bytes beyond what we've
+ * written will look like zeroes and not valid XLOG records...
+ */
+ MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+
+ /*
+ * Fill the new page's header
+ */
+ NewPage ->xlp_magic = XLOG_PAGE_MAGIC;
+
+ /* NewPage->xlp_info = 0; */ /* done by memset */
+ NewPage ->xlp_tli = ThisTimeLineID;
+ NewPage ->xlp_pageaddr = NewPageBeginPtr;
+ /* NewPage->xlp_rem_len = 0; */ /* done by memset */
+
+ /*
+ * If online backup is not in progress, mark the header to indicate
+		 * that WAL records beginning in this page have removable backup
+ * blocks. This allows the WAL archiver to know whether it is safe to
+ * compress archived WAL data by transforming full-block records into
+ * the non-full-block format. It is sufficient to record this at the
+ * page level because we force a page switch (in fact a segment switch)
+ * when starting a backup, so the flag will be off before any records
+ * can be written during the backup. At the end of a backup, the last
+ * page will be marked as all unsafe when perhaps only part is unsafe,
+ * but at worst the archiver would miss the opportunity to compress a
+ * few records.
+ */
+ if (!Insert->forcePageWrites)
+ NewPage ->xlp_info |= XLP_BKP_REMOVABLE;
+
+ /*
+ * If first page of an XLOG segment file, make it a long header.
+ */
+ if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
+ {
+ XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+
+ NewLongPage->xlp_sysid = ControlFile->system_identifier;
+ NewLongPage->xlp_seg_size = XLogSegSize;
+ NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
+ NewPage ->xlp_info |= XLP_LONG_HEADER;
+ }
+
+ /*
+ * Make sure the initialization of the page becomes visible to others
+ * before the xlblocks update. GetXLogBuffer() reads xlblocks without
+ * holding a lock.
+ */
+ pg_write_barrier();
+
+ *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
- NewLongPage->xlp_sysid = ControlFile->system_identifier;
- NewLongPage->xlp_seg_size = XLogSegSize;
- NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
- NewPage ->xlp_info |= XLP_LONG_HEADER;
+ XLogCtl->InitializedUpTo = NewPageEndPtr;
- Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+ npages++;
}
+ LWLockRelease(WALBufMappingLock);
- return update_needed;
+#ifdef WAL_DEBUG
+ if (npages > 0)
+ {
+ elog(DEBUG1, "initialized %d pages, upto %X/%X",
+ npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
+ }
+#endif
}
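+
+/*
+ * The two calling modes above, side by side (a sketch of the calls made
+ * elsewhere in this file): GetXLogBuffer() uses the blocking form to make
+ * one particular page available,
+ *
+ *		AdvanceXLInsertBuffer(ptr, false);
+ *
+ * while the background-flush path pre-initializes pages opportunistically,
+ * stopping at the first buffer that would have to be flushed out first:
+ *
+ *		AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+ */
+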
/*
* This option allows us to avoid uselessly issuing multiple writes when a
* single one would do.
*
- * If xlog_switch == TRUE, we are intending an xlog segment switch, so
- * perform end-of-segment actions after writing the last page, even if
- * it's not physically the end of its segment. (NB: this will work properly
- * only if caller specifies WriteRqst == page-end and flexible == false,
- * and there is some data to write.)
- *
- * Must be called with WALWriteLock held.
+ * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
+ * must be called before grabbing the lock, to make sure the data is ready to
+ * write.
*/
static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
- XLogCtlWrite *Write = &XLogCtl->Write;
bool ispartialpage;
bool last_iteration;
bool finishing_seg;
/*
* Within the loop, curridx is the cache block index of the page to
- * consider writing. We advance Write->curridx only after successfully
- * writing pages. (Right now, this refinement is useless since we are
- * going to PANIC if any error occurs anyway; but someday it may come in
- * useful.)
+ * consider writing. Begin at the buffer containing the next unwritten
+ * page, or last partially written page.
*/
- curridx = Write->curridx;
+ curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
- while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
+ while (LogwrtResult.Write < WriteRqst.Write)
{
/*
* Make sure we're not ahead of the insert process. This could happen
* if we're passed a bogus WriteRqst.Write that is past the end of the
* last page that's been initialized by AdvanceXLInsertBuffer.
*/
- if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
+ XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+ if (LogwrtResult.Write >= EndPtr)
elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
- (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
- (uint32) (XLogCtl->xlblocks[curridx] >> 32),
- (uint32) XLogCtl->xlblocks[curridx]);
+ (uint32) (LogwrtResult.Write >> 32),
+ (uint32) LogwrtResult.Write,
+ (uint32) (EndPtr >> 32), (uint32) EndPtr);
/* Advance LogwrtResult.Write to end of current buffer page */
- LogwrtResult.Write = XLogCtl->xlblocks[curridx];
- ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
+ LogwrtResult.Write = EndPtr;
+ ispartialpage = WriteRqst.Write < LogwrtResult.Write;
if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
{
* contiguous in memory), or if we are at the end of the logfile
* segment.
*/
- last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write);
+ last_iteration = WriteRqst.Write <= LogwrtResult.Write;
finishing_seg = !ispartialpage &&
(startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
{
char *from;
Size nbytes;
+ Size nleft;
+ int written;
/* Need to seek in the file? */
if (openLogOff != startoffset)
if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
ereport(PANIC,
(errcode_for_file_access(),
- errmsg("could not seek in log file %s to offset %u: %m",
- XLogFileNameP(ThisTimeLineID, openLogSegNo),
- startoffset)));
+ errmsg("could not seek in log file %s to offset %u: %m",
+ XLogFileNameP(ThisTimeLineID, openLogSegNo),
+ startoffset)));
openLogOff = startoffset;
}
/* OK to write the page(s) */
from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
nbytes = npages * (Size) XLOG_BLCKSZ;
- errno = 0;
- if (write(openLogFile, from, nbytes) != nbytes)
+ nleft = nbytes;
+ do
{
- /* if write didn't set errno, assume no disk space */
- if (errno == 0)
- errno = ENOSPC;
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not write to log file %s "
- "at offset %u, length %lu: %m",
- XLogFileNameP(ThisTimeLineID, openLogSegNo),
- openLogOff, (unsigned long) nbytes)));
- }
+ errno = 0;
+ written = write(openLogFile, from, nleft);
+ if (written <= 0)
+ {
+ if (errno == EINTR)
+ continue;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to log file %s "
+ "at offset %u, length %lu: %m",
+ XLogFileNameP(ThisTimeLineID, openLogSegNo),
+ openLogOff, (unsigned long) nbytes)));
+ }
+ nleft -= written;
+ from += written;
+ } while (nleft > 0);
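+
+		/*
+		 * For example, if write() accepts only 4096 bytes of a 16384-byte
+		 * request, 'from' advances by 4096 and we retry with nleft == 12288;
+		 * a signal arriving mid-write (EINTR) simply restarts the write
+		 * without advancing. Any other failure PANICs immediately.
+		 */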
/* Update state for write */
openLogOff += nbytes;
- Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
npages = 0;
/*
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
- * We also do this if this is the last page written for an xlog
- * switch.
- *
* This is also the right place to notify the Archiver that the
* segment is ready to copy to archival storage, and to update the
* timer for archive_timeout, and to signal for a checkpoint if
* too many logfile segments have been used since the last
* checkpoint.
*/
- if (finishing_seg || (xlog_switch && last_iteration))
+ if (finishing_seg)
{
issue_xlog_fsync(openLogFile, openLogSegNo);
if (XLogArchivingActive())
XLogArchiveNotifySeg(openLogSegNo);
- Write->lastSegSwitchTime = (pg_time_t) time(NULL);
+ XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
/*
* Request a checkpoint if we've consumed too much xlog since
}
Assert(npages == 0);
- Assert(curridx == Write->curridx);
/*
* If asked to flush, do so
*/
- if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
- XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
+ if (LogwrtResult.Flush < WriteRqst.Flush &&
+ LogwrtResult.Flush < LogwrtResult.Write)
{
/*
* Could get here without iterating above loop, in which case we might
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->LogwrtResult = LogwrtResult;
- if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
+ if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
- if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
+ if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
SpinLockRelease(&xlogctl->info_lck);
}
SpinLockAcquire(&xlogctl->info_lck);
LogwrtResult = xlogctl->LogwrtResult;
sleeping = xlogctl->WalWriterSleeping;
- if (XLByteLT(xlogctl->asyncXactLSN, asyncXactLSN))
+ if (xlogctl->asyncXactLSN < asyncXactLSN)
xlogctl->asyncXactLSN = asyncXactLSN;
SpinLockRelease(&xlogctl->info_lck);
WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
/* if we have already flushed that far, we're done */
- if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+ if (WriteRqstPtr <= LogwrtResult.Flush)
return;
}
UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
{
/* Quick check using our local copy of the variable */
- if (!updateMinRecoveryPoint || (!force && XLByteLE(lsn, minRecoveryPoint)))
+ if (!updateMinRecoveryPoint || (!force && lsn <= minRecoveryPoint))
return;
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
*/
if (minRecoveryPoint == 0)
updateMinRecoveryPoint = false;
- else if (force || XLByteLT(minRecoveryPoint, lsn))
+ else if (force || minRecoveryPoint < lsn)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
newMinRecoveryPointTLI = xlogctl->replayEndTLI;
SpinLockRelease(&xlogctl->info_lck);
- if (!force && XLByteLT(newMinRecoveryPoint, lsn))
+ if (!force && newMinRecoveryPoint < lsn)
elog(WARNING,
"xlog min recovery request %X/%X is past current point %X/%X",
- (uint32) (lsn >> 32) , (uint32) lsn,
+ (uint32) (lsn >> 32), (uint32) lsn,
(uint32) (newMinRecoveryPoint >> 32),
(uint32) newMinRecoveryPoint);
/* update control file */
- if (XLByteLT(ControlFile->minRecoveryPoint, newMinRecoveryPoint))
+ if (ControlFile->minRecoveryPoint < newMinRecoveryPoint)
{
ControlFile->minRecoveryPoint = newMinRecoveryPoint;
ControlFile->minRecoveryPointTLI = newMinRecoveryPointTLI;
minRecoveryPointTLI = newMinRecoveryPointTLI;
ereport(DEBUG2,
- (errmsg("updated min recovery point to %X/%X on timeline %u",
- (uint32) (minRecoveryPoint >> 32),
- (uint32) minRecoveryPoint,
- newMinRecoveryPointTLI)));
+ (errmsg("updated min recovery point to %X/%X on timeline %u",
+ (uint32) (minRecoveryPoint >> 32),
+ (uint32) minRecoveryPoint,
+ newMinRecoveryPointTLI)));
}
}
LWLockRelease(ControlFileLock);
}
/* Quick exit if already known flushed */
- if (XLByteLE(record, LogwrtResult.Flush))
+ if (record <= LogwrtResult.Flush)
return;
#ifdef WAL_DEBUG
elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
(uint32) (record >> 32), (uint32) record,
(uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
- (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+ (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
#endif
START_CRIT_SECTION();
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr insertpos;
/* read LogwrtResult and update local state */
SpinLockAcquire(&xlogctl->info_lck);
- if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
+ if (WriteRqstPtr < xlogctl->LogwrtRqst.Write)
WriteRqstPtr = xlogctl->LogwrtRqst.Write;
LogwrtResult = xlogctl->LogwrtResult;
SpinLockRelease(&xlogctl->info_lck);
/* done already? */
- if (XLByteLE(record, LogwrtResult.Flush))
+ if (record <= LogwrtResult.Flush)
break;
+ /*
+ * Before actually performing the write, wait for all in-flight
+ * insertions to the pages we're about to write to finish.
+ */
+ insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+
/*
* Try to get the write lock. If we can't get it immediately, wait
* until it's released, and recheck if we still need to do the flush
/* Got the lock; recheck whether request is satisfied */
LogwrtResult = XLogCtl->LogwrtResult;
- if (XLByteLE(record, LogwrtResult.Flush))
+ if (record <= LogwrtResult.Flush)
{
LWLockRelease(WALWriteLock);
break;
/*
* Sleep before flush! By adding a delay here, we may give further
* backends the opportunity to join the backlog of group commit
- * followers; this can significantly improve transaction throughput, at
- * the risk of increasing transaction latency.
+ * followers; this can significantly improve transaction throughput,
+ * at the risk of increasing transaction latency.
*
* We do not sleep if enableFsync is not turned on, nor if there are
* fewer than CommitSiblings other backends with active transactions.
*/
if (CommitDelay > 0 && enableFsync &&
MinimumActiveBackends(CommitSiblings))
- pg_usleep(CommitDelay);
-
- /* try to write/flush later additions to XLOG as well */
- if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
- {
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- uint32 freespace = INSERT_FREESPACE(Insert);
-
- if (freespace == 0) /* buffer is full */
- WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- else
- {
- WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- WriteRqstPtr -= freespace;
- }
- LWLockRelease(WALInsertLock);
- WriteRqst.Write = WriteRqstPtr;
- WriteRqst.Flush = WriteRqstPtr;
- }
- else
{
- WriteRqst.Write = WriteRqstPtr;
- WriteRqst.Flush = record;
+ pg_usleep(CommitDelay);
+
+ /*
+ * Re-check how far we can now flush the WAL. It's generally not
+			 * safe to call WaitXLogInsertionsToFinish while holding
+ * WALWriteLock, because an in-progress insertion might need to
+ * also grab WALWriteLock to make progress. But we know that all
+ * the insertions up to insertpos have already finished, because
+ * that's what the earlier WaitXLogInsertionsToFinish() returned.
+ * We're only calling it again to allow insertpos to be moved
+ * further forward, not to actually wait for anyone.
+ */
+ insertpos = WaitXLogInsertionsToFinish(insertpos);
}
- XLogWrite(WriteRqst, false, false);
+
+ /* try to write/flush later additions to XLOG as well */
+ WriteRqst.Write = insertpos;
+ WriteRqst.Flush = insertpos;
+
+ XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
/* done */
* calls from bufmgr.c are not within critical sections and so we will not
* force a restart for a bad LSN on a data page.
*/
- if (XLByteLT(LogwrtResult.Flush, record))
+ if (LogwrtResult.Flush < record)
elog(ERROR,
"xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
(uint32) (record >> 32), (uint32) record,
- (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+ (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
}
/*
WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
/* if we have already flushed that far, consider async commit records */
- if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+ if (WriteRqstPtr <= LogwrtResult.Flush)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
* holding an open file handle to a logfile that's no longer in use,
* preventing the file from being deleted.
*/
- if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+ if (WriteRqstPtr <= LogwrtResult.Flush)
{
if (openLogFile >= 0)
{
elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
(uint32) (WriteRqstPtr >> 32), (uint32) WriteRqstPtr,
(uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
- (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
+ (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
#endif
START_CRIT_SECTION();
- /* now wait for the write lock */
+ /* now wait for any in-progress insertions to finish and get write lock */
+ WaitXLogInsertionsToFinish(WriteRqstPtr);
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = XLogCtl->LogwrtResult;
- if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+ if (WriteRqstPtr > LogwrtResult.Flush)
{
XLogwrtRqst WriteRqst;
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = WriteRqstPtr;
- XLogWrite(WriteRqst, flexible, false);
+ XLogWrite(WriteRqst, flexible);
wrote_something = true;
}
LWLockRelease(WALWriteLock);
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests();
+ /*
+ * Great, done. To take some work off the critical path, try to initialize
+ * as many of the no-longer-needed WAL buffers for future use as we can.
+ */
+ AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+
return wrote_something;
}
if (RecoveryInProgress())
{
/* Quick exit if already known updated */
- if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
+ if (record <= minRecoveryPoint || !updateMinRecoveryPoint)
return false;
/*
updateMinRecoveryPoint = false;
/* check again */
- if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
+ if (record <= minRecoveryPoint || !updateMinRecoveryPoint)
return false;
else
return true;
}
/* Quick exit if already known flushed */
- if (XLByteLE(record, LogwrtResult.Flush))
+ if (record <= LogwrtResult.Flush)
return false;
/* read LogwrtResult and update local state */
}
/* check again */
- if (XLByteLE(record, LogwrtResult.Flush))
+ if (record <= LogwrtResult.Flush)
return false;
return true;
if (fd < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m", path)));
+ errmsg("could not open file \"%s\": %m", path)));
elog(DEBUG2, "done creating and filling new WAL file");
if (fd < 0)
ereport(PANIC,
(errcode_for_file_access(),
- errmsg("could not open xlog file \"%s\": %m", path)));
+ errmsg("could not open transaction log file \"%s\": %m", path)));
return fd;
}
*/
if (source == XLOG_FROM_ARCHIVE)
{
- char xlogfpath[MAXPGPATH];
- bool reload = false;
- struct stat statbuf;
-
- XLogFilePath(xlogfpath, tli, segno);
- if (stat(xlogfpath, &statbuf) == 0)
- {
- char oldpath[MAXPGPATH];
-#ifdef WIN32
- static unsigned int deletedcounter = 1;
- /*
- * On Windows, if another process (e.g a walsender process) holds
- * the file open in FILE_SHARE_DELETE mode, unlink will succeed,
- * but the file will still show up in directory listing until the
- * last handle is closed, and we cannot rename the new file in its
- * place until that. To avoid that problem, rename the old file to
- * a temporary name first. Use a counter to create a unique
- * filename, because the same file might be restored from the
- * archive multiple times, and a walsender could still be holding
- * onto an old deleted version of it.
- */
- snprintf(oldpath, MAXPGPATH, "%s.deleted%u",
- xlogfpath, deletedcounter++);
- if (rename(xlogfpath, oldpath) != 0)
- {
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not rename file \"%s\" to \"%s\": %m",
- xlogfpath, oldpath)));
- }
-#else
- strncpy(oldpath, xlogfpath, MAXPGPATH);
-#endif
- if (unlink(oldpath) != 0)
- ereport(FATAL,
- (errcode_for_file_access(),
- errmsg("could not remove file \"%s\": %m",
- xlogfpath)));
- reload = true;
- }
-
- if (rename(path, xlogfpath) < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not rename file \"%s\" to \"%s\": %m",
- path, xlogfpath)));
+ KeepFileRestoredFromArchive(path, xlogfname);
/*
* Set path to point at the new file in pg_xlog.
*/
- strncpy(path, xlogfpath, MAXPGPATH);
-
- /*
- * If the existing segment was replaced, since walsenders might have
- * it open, request them to reload a currently-open segment.
- */
- if (reload)
- WalSndRqstFileReload();
-
- /* Signal walsender that new WAL has arrived */
- if (AllowCascadeReplication())
- WalSndWakeup();
+ snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
}
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (source != XLOG_FROM_STREAM)
XLogReceiptTime = GetCurrentTimestamp();
- /* The file header needs to be validated on first access */
- readFileHeaderValidated = false;
-
return fd;
}
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
char path[MAXPGPATH];
ListCell *cell;
int fd;
+ List *tles;
/*
* Loop looking for a suitable timeline ID: we might need to read any of
* to go backwards; this prevents us from picking up the wrong file when a
* parent timeline extends to higher segment numbers than the child we
* want to read.
- */
- foreach(cell, expectedTLEs)
+ *
+ * If we haven't read the timeline history file yet, read it now, so that
+ * we know which TLIs to scan. We don't save the list in expectedTLEs,
+ * however, unless we actually find a valid segment. That way if there is
+ * neither a timeline history file nor a WAL segment in the archive, and
+ * streaming replication is set up, we'll read the timeline history file
+ * streamed from the master when we start streaming, instead of recovering
+ * with a dummy history generated here.
+ */
+ if (expectedTLEs)
+ tles = expectedTLEs;
+ else
+ tles = readTimeLineHistory(recoveryTargetTLI);
+
+ foreach(cell, tles)
{
TimeLineID tli = ((TimeLineHistoryEntry *) lfirst(cell))->tli;
if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
{
- fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
+ fd = XLogFileRead(segno, emode, tli,
+ XLOG_FROM_ARCHIVE, true);
if (fd != -1)
{
elog(DEBUG1, "got WAL segment from archive");
+ if (!expectedTLEs)
+ expectedTLEs = tles;
return fd;
}
}
if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)
{
- fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
+ fd = XLogFileRead(segno, emode, tli,
+ XLOG_FROM_PG_XLOG, true);
if (fd != -1)
+ {
+ if (!expectedTLEs)
+ expectedTLEs = tles;
return fd;
+ }
}
}
}
/*
- * Get the segno of the latest removed or recycled WAL segment.
- * Returns 0/0 if no WAL segments have been removed since startup.
+ * Throws an error if the given log segment has already been removed or
+ * recycled. The caller should only pass a segment that it knows to have
+ * existed while the server has been running, as this function always
+ * succeeds if no WAL segments have been removed since startup.
+ * 'tli' is only used in the error message.
*/
void
-XLogGetLastRemoved(XLogSegNo *segno)
+CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogSegNo lastRemovedSegNo;
SpinLockAcquire(&xlogctl->info_lck);
- *segno = xlogctl->lastRemovedSegNo;
+ lastRemovedSegNo = xlogctl->lastRemovedSegNo;
SpinLockRelease(&xlogctl->info_lck);
+
+ if (segno <= lastRemovedSegNo)
+ {
+ char filename[MAXFNAMELEN];
+
+ XLogFileName(filename, tli, segno);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("requested WAL segment %s has already been removed",
+ filename)));
+ }
}
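+
+/*
+ * Hypothetical caller pattern, for illustration ('startptr' is an assumed
+ * name): read a segment without holding any lock, then verify that it was
+ * not recycled underneath the read:
+ *
+ *		XLogSegNo	segno;
+ *
+ *		XLByteToSeg(startptr, segno);
+ *		... read the segment file ...
+ *		CheckXLogRemoved(segno, ThisTimeLineID);	(ereports ERROR if gone)
+ */
+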
/*
strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
{
- if (RecoveryInProgress() || XLogArchiveCheckDone(xlde->d_name))
+ if (XLogArchiveCheckDone(xlde->d_name))
{
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
bool get_cleanup_lock, bool keep_buffer)
{
- Buffer buffer;
- Page page;
BkpBlock bkpb;
char *blk;
int i;
if (i == block_index)
{
/* Found it, apply the update */
- buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
- RBM_ZERO);
- Assert(BufferIsValid(buffer));
- if (get_cleanup_lock)
- LockBufferForCleanup(buffer);
- else
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
- page = (Page) BufferGetPage(buffer);
-
- if (bkpb.hole_length == 0)
- {
- memcpy((char *) page, blk, BLCKSZ);
- }
- else
- {
- memcpy((char *) page, blk, bkpb.hole_offset);
- /* must zero-fill the hole */
- MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
- memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
- blk + bkpb.hole_offset,
- BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
- }
-
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
-
- if (!keep_buffer)
- UnlockReleaseBuffer(buffer);
-
- return buffer;
+ return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
+ keep_buffer);
}
blk += BLCKSZ - bkpb.hole_length;
}
/*
- * CRC-check an XLOG record. We do not believe the contents of an XLOG
- * record (other than to the minimal extent of computing the amount of
- * data to read in) until we've checked the CRCs.
- *
- * We assume all of the record (that is, xl_tot_len bytes) has been read
- * into memory at *record. Also, ValidXLogRecordHeader() has accepted the
- * record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
- */
-static bool
-RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
-{
- pg_crc32 crc;
- int i;
- uint32 len = record->xl_len;
- BkpBlock bkpb;
- char *blk;
- size_t remaining = record->xl_tot_len;
-
- /* First the rmgr data */
- if (remaining < SizeOfXLogRecord + len)
- {
- /* ValidXLogRecordHeader() should've caught this already... */
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("invalid record length at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
- remaining -= SizeOfXLogRecord + len;
- INIT_CRC32(crc);
- COMP_CRC32(crc, XLogRecGetData(record), len);
-
- /* Add in the backup blocks, if any */
- blk = (char *) XLogRecGetData(record) + len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- uint32 blen;
-
- if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- continue;
-
- if (remaining < sizeof(BkpBlock))
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
- memcpy(&bkpb, blk, sizeof(BkpBlock));
-
- if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("incorrect hole size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
- blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
- if (remaining < blen)
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
- remaining -= blen;
- COMP_CRC32(crc, blk, blen);
- blk += blen;
- }
-
- /* Check that xl_tot_len agrees with our calculation */
- if (remaining != 0)
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("incorrect total length in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
-
- /* Finally include the record header */
- COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
- FIN_CRC32(crc);
-
- if (!EQ_CRC32(record->xl_crc, crc))
- {
- ereport(emode_for_corrupt_record(emode, recptr),
- (errmsg("incorrect resource manager data checksum in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr)));
- return false;
- }
-
- return true;
-}
-
-/*
- * Attempt to read an XLOG record.
- *
- * If RecPtr is not NULL, try to read a record at that position. Otherwise
- * try to read a record just after the last one previously read.
- *
- * If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG)
+ * Workhorse for RestoreBackupBlock usable without an xlog record
*
- * The record is copied into readRecordBuf, so that on successful return,
- * the returned record pointer always points there.
+ * Restores a full-page image from BkpBlock and a data pointer.
*/
-static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
+static Buffer
+RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
+ bool get_cleanup_lock, bool keep_buffer)
{
- XLogRecord *record;
- XLogRecPtr tmpRecPtr = EndRecPtr;
- bool randAccess = false;
- uint32 len,
- total_len;
- uint32 targetRecOff;
- uint32 pageHeaderSize;
- bool gotheader;
-
- if (readBuf == NULL)
- {
- /*
- * First time through, permanently allocate readBuf. We do it this
- * way, rather than just making a static array, for two reasons: (1)
- * no need to waste the storage in most instantiations of the backend;
- * (2) a static char array isn't guaranteed to have any particular
- * alignment, whereas malloc() will provide MAXALIGN'd storage.
- */
- readBuf = (char *) malloc(XLOG_BLCKSZ);
- Assert(readBuf != NULL);
- }
-
- if (RecPtr == NULL)
- {
- RecPtr = &tmpRecPtr;
-
- /*
- * RecPtr is pointing to end+1 of the previous WAL record. If
- * we're at a page boundary, no more records can fit on the current
- * page. We must skip over the page header, but we can't do that
- * until we've read in the page, since the header size is variable.
- */
- }
- else
- {
- /*
- * In this case, the passed-in record pointer should already be
- * pointing to a valid record starting position.
- */
- if (!XRecOffIsValid(*RecPtr))
- ereport(PANIC,
- (errmsg("invalid record offset at %X/%X",
- (uint32) (*RecPtr >> 32), (uint32) *RecPtr)));
-
- /*
- * Since we are going to a random position in WAL, forget any prior
- * state about what timeline we were in, and allow it to be any
- * timeline in expectedTLEs. We also set a flag to allow curFileTLI
- * to go backwards (but we can't reset that variable right here, since
- * we might not change files at all).
- */
- /* see comment in ValidXLogPageHeader */
- lastPageTLI = lastSegmentTLI = 0;
- randAccess = true; /* allow curFileTLI to go backwards too */
- }
-
- /* This is the first try to read this page. */
- lastSourceFailed = false;
-retry:
- /* Read the page containing the record */
- if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
- return NULL;
-
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
- targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
- if (targetRecOff == 0)
- {
- /*
- * At page start, so skip over page header. The Assert checks that
- * we're not scribbling on caller's record pointer; it's OK because we
- * can only get here in the continuing-from-prev-record case, since
- * XRecOffIsValid rejected the zero-page-offset case otherwise.
- */
- Assert(RecPtr == &tmpRecPtr);
- (*RecPtr) += pageHeaderSize;
- targetRecOff = pageHeaderSize;
- }
- else if (targetRecOff < pageHeaderSize)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid record offset at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- goto next_record_is_invalid;
- }
- if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
- targetRecOff == pageHeaderSize)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("contrecord is requested by %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- goto next_record_is_invalid;
- }
-
- /*
- * Read the record length.
- *
- * NB: Even though we use an XLogRecord pointer here, the whole record
- * header might not fit on this page. xl_tot_len is the first field of
- * the struct, so it must be on this page (the records are MAXALIGNed),
- * but we cannot access any other fields until we've verified that we
- * got the whole header.
- */
- record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ);
- total_len = record->xl_tot_len;
-
- /*
- * If the whole record header is on this page, validate it immediately.
- * Otherwise do just a basic sanity check on xl_tot_len, and validate the
- * rest of the header after reading it from the next page. The xl_tot_len
- * check is necessary here to ensure that we enter the "Need to reassemble
- * record" code path below; otherwise we might fail to apply
- * ValidXLogRecordHeader at all.
- */
- if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
- {
- if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
- goto next_record_is_invalid;
- gotheader = true;
- }
- else
- {
- if (total_len < SizeOfXLogRecord)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid record length at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- goto next_record_is_invalid;
- }
- gotheader = false;
- }
-
- /*
- * Allocate or enlarge readRecordBuf as needed. To avoid useless small
- * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
- * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with. (That is
- * enough for all "normal" records, but very large commit or abort records
- * might need more space.)
- */
- if (total_len > readRecordBufSize)
- {
- uint32 newSize = total_len;
-
- newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
- newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
- if (readRecordBuf)
- free(readRecordBuf);
- readRecordBuf = (char *) malloc(newSize);
- if (!readRecordBuf)
- {
- readRecordBufSize = 0;
- /* We treat this as a "bogus data" condition */
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record length %u at %X/%X too long",
- total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- goto next_record_is_invalid;
- }
- readRecordBufSize = newSize;
- }
-
- len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ;
- if (total_len > len)
- {
- /* Need to reassemble record */
- char *contrecord;
- XLogPageHeader pageHeader;
- XLogRecPtr pagelsn;
- char *buffer;
- uint32 gotlen;
-
- /* Initialize pagelsn to the beginning of the page this record is on */
- pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-
- /* Copy the first fragment of the record from the first page. */
- memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len);
- buffer = readRecordBuf + len;
- gotlen = len;
-
- do
- {
- /* Calculate pointer to beginning of next page */
- XLByteAdvance(pagelsn, XLOG_BLCKSZ);
- /* Wait for the next page to become available */
- if (!XLogPageRead(&pagelsn, emode, false, false))
- return NULL;
-
- /* Check that the continuation on next page looks valid */
- pageHeader = (XLogPageHeader) readBuf;
- if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("there is no contrecord flag in log segment %s, offset %u",
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- goto next_record_is_invalid;
- }
- /*
- * Cross-check that xlp_rem_len agrees with how much of the record
- * we expect there to be left.
- */
- if (pageHeader->xlp_rem_len == 0 ||
- total_len != (pageHeader->xlp_rem_len + gotlen))
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid contrecord length %u in log segment %s, offset %u",
- pageHeader->xlp_rem_len,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- goto next_record_is_invalid;
- }
-
- /* Append the continuation from this page to the buffer */
- pageHeaderSize = XLogPageHeaderSize(pageHeader);
- contrecord = (char *) readBuf + pageHeaderSize;
- len = XLOG_BLCKSZ - pageHeaderSize;
- if (pageHeader->xlp_rem_len < len)
- len = pageHeader->xlp_rem_len;
- memcpy(buffer, (char *) contrecord, len);
- buffer += len;
- gotlen += len;
-
- /* If we just reassembled the record header, validate it. */
- if (!gotheader)
- {
- record = (XLogRecord *) readRecordBuf;
- if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
- goto next_record_is_invalid;
- gotheader = true;
- }
- } while (pageHeader->xlp_rem_len > len);
-
- record = (XLogRecord *) readRecordBuf;
- if (!RecordIsValid(record, *RecPtr, emode))
- goto next_record_is_invalid;
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
- XLogSegNoOffsetToRecPtr(
- readSegNo,
- readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
- EndRecPtr);
- ReadRecPtr = *RecPtr;
- }
- else
- {
- /* Record does not cross a page boundary */
- if (!RecordIsValid(record, *RecPtr, emode))
- goto next_record_is_invalid;
- EndRecPtr = *RecPtr + MAXALIGN(total_len);
-
- ReadRecPtr = *RecPtr;
- memcpy(readRecordBuf, record, total_len);
- }
-
- /*
- * Special processing if it's an XLOG SWITCH record
- */
- if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
- {
- /* Pretend it extends to end of segment */
- EndRecPtr += XLogSegSize - 1;
- EndRecPtr -= EndRecPtr % XLogSegSize;
-
- /*
- * Pretend that readBuf contains the last page of the segment. This is
- * just to avoid Assert failure in StartupXLOG if XLOG ends with this
- * segment.
- */
- readOff = XLogSegSize - XLOG_BLCKSZ;
- }
- return record;
-
-next_record_is_invalid:
- lastSourceFailed = true;
-
- if (readFile >= 0)
- {
- close(readFile);
- readFile = -1;
- }
+ Buffer buffer;
+ Page page;
- /* In standby-mode, keep trying */
- if (StandbyMode)
- goto retry;
+ buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
+ RBM_ZERO);
+ Assert(BufferIsValid(buffer));
+ if (get_cleanup_lock)
+ LockBufferForCleanup(buffer);
else
- return NULL;
-}
-
-/*
- * Check whether the xlog header of a page just read in looks valid.
- *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord. It's not intended for use from anywhere else.
- */
-static bool
-ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly)
-{
- XLogRecPtr recaddr;
-
- XLogSegNoOffsetToRecPtr(readSegNo, readOff, recaddr);
-
- if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("invalid magic number %04X in log segment %s, offset %u",
- hdr->xlp_magic,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
- if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("invalid info bits %04X in log segment %s, offset %u",
- hdr->xlp_info,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
- if (hdr->xlp_info & XLP_LONG_HEADER)
- {
- XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- if (longhdr->xlp_sysid != ControlFile->system_identifier)
- {
- char fhdrident_str[32];
- char sysident_str[32];
+ page = (Page) BufferGetPage(buffer);
- /*
- * Format sysids separately to keep platform-dependent format code
- * out of the translatable message string.
- */
- snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
- longhdr->xlp_sysid);
- snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
- ControlFile->system_identifier);
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("WAL file is from different database system"),
- errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.",
- fhdrident_str, sysident_str)));
- return false;
- }
- if (longhdr->xlp_seg_size != XLogSegSize)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("WAL file is from different database system"),
- errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
- return false;
- }
- if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("WAL file is from different database system"),
- errdetail("Incorrect XLOG_BLCKSZ in page header.")));
- return false;
- }
- }
- else if (readOff == 0)
+ if (bkpb.hole_length == 0)
{
- /* hmm, first page of file doesn't have a long header? */
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("invalid info bits %04X in log segment %s, offset %u",
- hdr->xlp_info,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
+ memcpy((char *) page, blk, BLCKSZ);
}
-
- if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
+ else
{
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("unexpected pageaddr %X/%X in log segment %s, offset %u",
- (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
+ memcpy((char *) page, blk, bkpb.hole_offset);
+ /* must zero-fill the hole */
+ MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
+ memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
+ blk + bkpb.hole_offset,
+ BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
}
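+
+	/*
+	 * Worked example (the concrete numbers are assumed, not from this
+	 * patch): with the standard BLCKSZ of 8192, a backup block carrying
+	 * hole_offset = 100 and hole_length = 7000 is reconstructed as
+	 *
+	 *   [0, 100)      page header and line pointers, copied from blk
+	 *   [100, 7100)   the gap between pd_lower and pd_upper, zero-filled
+	 *   [7100, 8192)  tuple data, copied from blk + 100 onwards
+	 *
+	 * so only BLCKSZ - hole_length = 1192 bytes travel in the WAL record.
+	 */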
/*
- * Check page TLI is one of the expected values.
+ * The checksum value on this page is currently invalid. We don't need to
+ * reset it here since it will be set before being written.
*/
- if (!tliInHistory(hdr->xlp_tli, expectedTLEs))
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
- hdr->xlp_tli,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
- /*
- * Since child timelines are always assigned a TLI greater than their
- * immediate parent's TLI, we should never see TLI go backwards across
- * successive pages of a consistent WAL sequence.
- *
- * Of course this check should only be applied when advancing sequentially
- * across pages; therefore ReadRecord resets lastPageTLI and
- * lastSegmentTLI to zero when going to a random page.
- *
- * Sometimes we re-open a segment that's already been partially replayed.
- * In that case we cannot perform the normal TLI check: if there is a
- * timeline switch within the segment, the first page has a smaller TLI
- * than later pages following the timeline switch, and we might've read
- * them already. As a weaker test, we still check that it's not smaller
- * than the TLI we last saw at the beginning of a segment. Pass
- * segmentonly = true when re-validating the first page like that, and the
- * page you're actually interested in comes later.
- */
- if (hdr->xlp_tli < (segmentonly ? lastSegmentTLI : lastPageTLI))
- {
- ereport(emode_for_corrupt_record(emode, recaddr),
- (errmsg("out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
- hdr->xlp_tli,
- segmentonly ? lastSegmentTLI : lastPageTLI,
- XLogFileNameP(curFileTLI, readSegNo),
- readOff)));
- return false;
- }
- lastPageTLI = hdr->xlp_tli;
- if (readOff == 0)
- lastSegmentTLI = hdr->xlp_tli;
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
- return true;
+ if (!keep_buffer)
+ UnlockReleaseBuffer(buffer);
+
+ return buffer;
}
/*
- * Validate an XLOG record header.
+ * Attempt to read an XLOG record.
+ *
+ * If RecPtr is not NULL, try to read a record at that position. Otherwise
+ * try to read a record just after the last one previously read.
*
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord. It's not intended for use from anywhere else.
+ * If no valid record is available, returns NULL, or fails if emode is PANIC.
+ * (emode must be either PANIC or LOG). In standby mode, retries until a valid
+ * record is available.
+ *
+ * The record is copied into readRecordBuf, so that on successful return,
+ * the returned record pointer always points there.
*/
-static bool
-ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
- bool randAccess)
+static XLogRecord *
+ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+ bool fetching_ckpt)
{
- /*
- * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
- * required.
- */
- if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+ XLogRecord *record;
+ XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
+
+ /* Pass through parameters to XLogPageRead */
+ private->fetching_ckpt = fetching_ckpt;
+ private->emode = emode;
+ private->randAccess = (RecPtr != InvalidXLogRecPtr);
+
+ /* This is the first attempt to read this page. */
+ lastSourceFailed = false;
+
+ for (;;)
{
- if (record->xl_len != 0)
+ char *errormsg;
+
+ record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+ ReadRecPtr = xlogreader->ReadRecPtr;
+ EndRecPtr = xlogreader->EndRecPtr;
+ if (record == NULL)
{
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid xlog switch record at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
+ if (readFile >= 0)
+ {
+ close(readFile);
+ readFile = -1;
+ }
+
+ /*
+ * We only end up here without a message when XLogPageRead()
+ * failed - in that case we already logged something. In
+ * StandbyMode that only happens if we have been triggered, so we
+ * shouldn't loop anymore in that case.
+ */
+ if (errormsg)
+ ereport(emode_for_corrupt_record(emode,
+ RecPtr ? RecPtr : EndRecPtr),
+ (errmsg_internal("%s", errormsg) /* already translated */ ));
}
- }
- else if (record->xl_len == 0)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with zero length at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
- }
- if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
- record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
- XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid record length at %X/%X",
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
- }
- if (record->xl_rmid > RM_MAX_ID)
- {
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("invalid resource manager ID %u at %X/%X",
- record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
- }
- if (randAccess)
- {
+
/*
- * We can't exactly verify the prev-link, but surely it should be less
- * than the record's own address.
+ * Check page TLI is one of the expected values.
*/
- if (!XLByteLT(record->xl_prev, *RecPtr))
+ else if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs))
{
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with incorrect prev-link %X/%X at %X/%X",
- (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
+ char fname[MAXFNAMELEN];
+ XLogSegNo segno;
+ int32 offset;
+
+ XLByteToSeg(xlogreader->latestPagePtr, segno);
+ offset = xlogreader->latestPagePtr % XLogSegSize;
+ XLogFileName(fname, xlogreader->readPageTLI, segno);
+ ereport(emode_for_corrupt_record(emode,
+ RecPtr ? RecPtr : EndRecPtr),
+ (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
+ xlogreader->latestPageTLI,
+ fname,
+ offset)));
+ record = NULL;
}
- }
- else
- {
- /*
- * Record's prev-link should exactly match our previous location. This
- * check guards against torn WAL pages where a stale but valid-looking
- * WAL record starts on a sector boundary.
- */
- if (!XLByteEQ(record->xl_prev, ReadRecPtr))
+
+ if (record)
{
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errmsg("record with incorrect prev-link %X/%X at %X/%X",
- (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
- (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
- return false;
+ /* Great, got a record */
+ return record;
}
- }
+ else
+ {
+ /* No valid record available from this source */
+ lastSourceFailed = true;
- return true;
+ /*
+ * If archive recovery was requested, but we were still doing
+ * crash recovery, switch to archive recovery and retry using the
+ * offline archive. We have now replayed all the valid WAL in
+ * pg_xlog, so we are presumably now consistent.
+ *
+ * We require that there's at least some valid WAL present in
+			 * pg_xlog, however (!fetching_ckpt). We could recover using the WAL
+ * from the archive, even if pg_xlog is completely empty, but we'd
+ * have no idea how far we'd have to replay to reach consistency.
+ * So err on the safe side and give up.
+ */
+ if (!InArchiveRecovery && ArchiveRecoveryRequested &&
+ !fetching_ckpt)
+ {
+ ereport(DEBUG1,
+ (errmsg_internal("reached end of WAL in pg_xlog, entering archive recovery")));
+ InArchiveRecovery = true;
+ if (StandbyModeRequested)
+ StandbyMode = true;
+
+ /* initialize minRecoveryPoint to this record */
+ LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+ ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
+ if (ControlFile->minRecoveryPoint < EndRecPtr)
+ {
+ ControlFile->minRecoveryPoint = EndRecPtr;
+ ControlFile->minRecoveryPointTLI = ThisTimeLineID;
+ }
+ /* update local copy */
+ minRecoveryPoint = ControlFile->minRecoveryPoint;
+ minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
+
+ UpdateControlFile();
+ LWLockRelease(ControlFileLock);
+
+ CheckRecoveryConsistency();
+
+ /*
+ * Before we retry, reset lastSourceFailed and currentSource
+ * so that we will check the archive next.
+ */
+ lastSourceFailed = false;
+ currentSource = 0;
+
+ continue;
+ }
+
+ /* In standby mode, loop back to retry. Otherwise, give up. */
+ if (StandbyMode && !CheckForStandbyTrigger())
+ continue;
+ else
+ return NULL;
+ }
+ }
}
/*
bool found;
ListCell *cell;
TimeLineID newtarget;
+ TimeLineID oldtarget = recoveryTargetTLI;
TimeLineHistoryEntry *currentTle = NULL;
- /* use volatile pointer to prevent code rearrangement */
newtarget = findNewestTimeLine(recoveryTargetTLI);
if (newtarget == recoveryTargetTLI)
newExpectedTLEs = readTimeLineHistory(newtarget);
/*
- * If the current timeline is not part of the history of the new
- * timeline, we cannot proceed to it.
+ * If the current timeline is not part of the history of the new timeline,
+ * we cannot proceed to it.
*/
found = false;
- foreach (cell, newExpectedTLEs)
+ foreach(cell, newExpectedTLEs)
{
currentTle = (TimeLineHistoryEntry *) lfirst(cell);
* next timeline was forked off from it *after* the current recovery
* location.
*/
- if (XLByteLT(currentTle->end, EndRecPtr))
+ if (currentTle->end < EndRecPtr)
{
ereport(LOG,
(errmsg("new timeline %u forked off current database system timeline %u before current recovery point %X/%X",
list_free_deep(expectedTLEs);
expectedTLEs = newExpectedTLEs;
+ /*
+ * As in StartupXLOG(), try to ensure we have all the history files
+ * between the old target and new target in pg_xlog.
+ */
+ restoreTimeLineHistoryFiles(oldtarget + 1, newtarget);
+
ereport(LOG,
(errmsg("new target timeline is %u",
recoveryTargetTLI)));
return ControlFile->system_identifier;
}
+/*
+ * Are checksums enabled for data pages?
+ */
+bool
+DataChecksumsEnabled(void)
+{
+ Assert(ControlFile != NULL);
+ return (ControlFile->data_checksum_version > 0);
+}
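+
+/*
+ * Usage sketch (illustrative; the guard shown is a rough sketch, not a
+ * verbatim caller from this patch): code that computes a page checksum
+ * before a write is expected to condition it on this flag, e.g.
+ *
+ *		if (DataChecksumsEnabled())
+ *			((PageHeader) page)->pd_checksum =
+ *				PageCalcChecksum16(page, blkno);
+ *
+ * so clusters initialized without checksums pay no calculation cost.
+ */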
+
+/*
+ * Returns a fake LSN for unlogged relations.
+ *
+ * Each call generates an LSN that is greater than any previous value
+ * returned. The current counter value is saved and restored across clean
+ * shutdowns but, like unlogged relations, does not survive a crash. This can
+ * be used in lieu of real LSN values returned by XLogInsert, if you need an
+ * LSN-like increasing sequence of numbers without writing any WAL.
+ */
+XLogRecPtr
+GetFakeLSNForUnloggedRel(void)
+{
+ XLogRecPtr nextUnloggedLSN;
+
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ /* increment the unloggedLSN counter, need SpinLock */
+ SpinLockAcquire(&xlogctl->ulsn_lck);
+ nextUnloggedLSN = xlogctl->unloggedLSN++;
+ SpinLockRelease(&xlogctl->ulsn_lck);
+
+ return nextUnloggedLSN;
+}
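+
+/*
+ * Usage sketch (illustrative; "rel", "page" and the surrounding logic are
+ * hypothetical, not part of this patch): an access method that relies on
+ * LSN ordering but must not write WAL for an unlogged relation could do
+ *
+ *		if (RelationNeedsWAL(rel))
+ *			lsn = XLogInsert(rmid, info, rdata);
+ *		else
+ *			lsn = GetFakeLSNForUnloggedRel();
+ *		PageSetLSN(page, lsn);
+ */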
+
/*
* Auto-tune the number of XLOG buffers.
*
/* XLogCtl */
size = sizeof(XLogCtlData);
+
+ /* xlog insertion slots, plus alignment */
+ size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
/* xlblocks array */
size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
/* extra alignment padding for XLOG I/O buffers */
- size = add_size(size, ALIGNOF_XLOG_BUFFER);
+ size = add_size(size, XLOG_BLCKSZ);
/* and the buffers themselves */
size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
bool foundCFile,
foundXLog;
char *allocptr;
+ int i;
ControlFile = (ControlFileData *)
ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
Assert(foundCFile && foundXLog);
return;
}
-
memset(XLogCtl, 0, sizeof(XLogCtlData));
/*
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
+ /* Xlog insertion slots. Ensure they're aligned to the full padded size */
+ allocptr += sizeof(XLogInsertSlotPadded) -
+ ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
+ XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
+ allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
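+
+	/*
+	 * Worked example of the alignment above (sizes assumed for
+	 * illustration): if sizeof(XLogInsertSlotPadded) is 64 and allocptr
+	 * ends up at an address with remainder 24 modulo 64, the expression
+	 * adds 64 - 24 = 40 bytes, leaving allocptr exactly on a 64-byte
+	 * boundary. The extra "+ 1" slot reserved in XLOGShmemSize() pays for
+	 * this worst-case padding.
+	 */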
+
/*
- * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
+ * Align the start of the page buffers to a full xlog block size boundary.
+ * This simplifies some calculations in XLOG insertion. It is also required
+ * for O_DIRECT.
*/
- allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
+ allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
XLogCtl->pages = allocptr;
memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
XLogCtl->SharedRecoveryInProgress = true;
XLogCtl->SharedHotStandbyActive = false;
XLogCtl->WalWriterSleeping = false;
- XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+
+ for (i = 0; i < num_xloginsert_slots; i++)
+ {
+ XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+ SpinLockInit(&slot->mutex);
+ slot->xlogInsertingAt = InvalidXLogRecPtr;
+ slot->owner = NULL;
+
+ slot->releaseOK = true;
+ slot->exclusive = 0;
+ slot->head = NULL;
+ slot->tail = NULL;
+ }
+
+ SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
+ SpinLockInit(&XLogCtl->ulsn_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
/*
ThisTimeLineID = 1;
/* page buffer must be aligned suitably for O_DIRECT */
- buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
- page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
+ buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
+ page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
memset(page, 0, XLOG_BLCKSZ);
/*
*/
checkPoint.redo = XLogSegSize + SizeOfXLogLongPHD;
checkPoint.ThisTimeLineID = ThisTimeLineID;
+ checkPoint.PrevTimeLineID = ThisTimeLineID;
checkPoint.fullPageWrites = fullPageWrites;
checkPoint.nextXidEpoch = 0;
checkPoint.nextXid = FirstNormalTransactionId;
checkPoint.nextMultiOffset = 0;
checkPoint.oldestXid = FirstNormalTransactionId;
checkPoint.oldestXidDB = TemplateDbOid;
+ checkPoint.oldestMulti = FirstMultiXactId;
+ checkPoint.oldestMultiDB = TemplateDbOid;
checkPoint.time = (pg_time_t) time(NULL);
checkPoint.oldestActiveXid = InvalidTransactionId;
ShmemVariableCache->oidCount = 0;
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+ SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
/* Set up the XLOG page header */
page->xlp_magic = XLOG_PAGE_MAGIC;
ControlFile->time = checkPoint.time;
ControlFile->checkPoint = checkPoint.redo;
ControlFile->checkPointCopy = checkPoint;
+ ControlFile->unloggedLSN = 1;
/* Set important parameter values for use when replaying WAL */
ControlFile->MaxConnections = MaxConnections;
+ ControlFile->max_worker_processes = max_worker_processes;
ControlFile->max_prepared_xacts = max_prepared_xacts;
ControlFile->max_locks_per_xact = max_locks_per_xact;
ControlFile->wal_level = wal_level;
+ ControlFile->data_checksum_version = bootstrap_data_checksum_version;
/* some additional ControlFile fields are set in WriteControlFile() */
}
else if (strcmp(item->name, "standby_mode") == 0)
{
- if (!parse_bool(item->value, &StandbyMode))
+ if (!parse_bool(item->value, &StandbyModeRequested))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("parameter \"%s\" requires a Boolean value",
/*
* Check for compulsory parameters
*/
- if (StandbyMode)
+ if (StandbyModeRequested)
{
if (PrimaryConnInfo == NULL && recoveryRestoreCommand == NULL)
ereport(WARNING,
}
/* Enable fetching from archive recovery area */
- InArchiveRecovery = true;
+ ArchiveRecoveryRequested = true;
/*
* If user specified recovery_target_timeline, validate it or compute the
RecoveryRequiresIntParameter("max_connections",
MaxConnections,
ControlFile->MaxConnections);
+ RecoveryRequiresIntParameter("max_worker_processes",
+ max_worker_processes,
+ ControlFile->max_worker_processes);
RecoveryRequiresIntParameter("max_prepared_transactions",
max_prepared_xacts,
ControlFile->max_prepared_xacts);
checkPointLoc,
EndOfLog;
XLogSegNo endLogSegNo;
+ TimeLineID PrevTimeLineID;
XLogRecord *record;
- uint32 freespace;
TransactionId oldestActiveXID;
bool backupEndRequired = false;
bool backupFromStandby = false;
DBState dbstate_at_startup;
+ XLogReaderState *xlogreader;
+ XLogPageReadPrivate private;
+ bool fast_promoted = false;
/*
* Read control file and check XLOG status looks valid.
(errmsg("control file contains invalid data")));
if (ControlFile->state == DB_SHUTDOWNED)
- ereport(LOG,
+ {
+ /* This is the expected case, so don't be chatty in standalone mode */
+ ereport(IsPostmasterEnvironment ? LOG : NOTICE,
(errmsg("database system was shut down at %s",
str_time(ControlFile->time))));
+ }
else if (ControlFile->state == DB_SHUTDOWNED_IN_RECOVERY)
ereport(LOG,
(errmsg("database system was shut down in recovery at %s",
RelationCacheInitFileRemove();
/*
- * Initialize on the assumption we want to recover to the same timeline
+ * Initialize on the assumption we want to recover to the latest timeline
* that's active according to pg_control.
*/
- recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
+ if (ControlFile->minRecoveryPointTLI >
+ ControlFile->checkPointCopy.ThisTimeLineID)
+ recoveryTargetTLI = ControlFile->minRecoveryPointTLI;
+ else
+ recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
/*
* Check for recovery control file, and if so set up state for offline
*/
readRecoveryCommandFile();
- /* Now we can determine the list of expected TLIs */
- expectedTLEs = readTimeLineHistory(recoveryTargetTLI);
-
- /*
- * If the location of the checkpoint record is not on the expected
- * timeline in the history of the requested timeline, we cannot proceed:
- * the backup is not part of the history of the requested timeline.
- */
- if (tliOfPointInHistory(ControlFile->checkPoint, expectedTLEs) !=
- ControlFile->checkPointCopy.ThisTimeLineID)
- {
- XLogRecPtr switchpoint;
-
- /*
- * tliSwitchPoint will throw an error if the checkpoint's timeline
- * is not in expectedTLEs at all.
- */
- switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs);
- ereport(FATAL,
- (errmsg("requested timeline %u is not a child of this server's history",
- recoveryTargetTLI),
- errdetail("Latest checkpoint is at %X/%X on timeline %u, but in the history of the requested timeline, the server forked off from that timeline at %X/%X",
- (uint32) (ControlFile->checkPoint >> 32),
- (uint32) ControlFile->checkPoint,
- ControlFile->checkPointCopy.ThisTimeLineID,
- (uint32) (switchpoint >> 32),
- (uint32) switchpoint)));
- }
-
- /*
- * The min recovery point should be part of the requested timeline's
- * history, too.
- */
- if (!XLogRecPtrIsInvalid(ControlFile->minRecoveryPoint) &&
- tliOfPointInHistory(ControlFile->minRecoveryPoint - 1, expectedTLEs) !=
- ControlFile->minRecoveryPointTLI)
- ereport(FATAL,
- (errmsg("requested timeline %u does not contain minimum recovery point %X/%X on timeline %u",
- recoveryTargetTLI,
- (uint32) (ControlFile->minRecoveryPoint >> 32),
- (uint32) ControlFile->minRecoveryPoint,
- ControlFile->minRecoveryPointTLI)));
-
/*
* Save archive_cleanup_command in shared memory so that other processes
* can see it.
archiveCleanupCommand ? archiveCleanupCommand : "",
sizeof(XLogCtl->archiveCleanupCommand));
- if (InArchiveRecovery)
+ if (ArchiveRecoveryRequested)
{
- if (StandbyMode)
+ if (StandbyModeRequested)
ereport(LOG,
(errmsg("entering standby mode")));
else if (recoveryTarget == RECOVERY_TARGET_XID)
* Take ownership of the wakeup latch if we're going to sleep during
* recovery.
*/
- if (StandbyMode)
+ if (StandbyModeRequested)
OwnLatch(&XLogCtl->recoveryWakeupLatch);
+ /* Set up XLOG reader facility */
+ MemSet(&private, 0, sizeof(XLogPageReadPrivate));
+ xlogreader = XLogReaderAllocate(&XLogPageRead, &private);
+ if (!xlogreader)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating an XLog reading processor.")));
+ xlogreader->system_identifier = ControlFile->system_identifier;
+
if (read_backup_label(&checkPointLoc, &backupEndRequired,
&backupFromStandby))
{
+ /*
+ * Archive recovery was requested, and thanks to the backup label
+ * file, we know how far we need to replay to reach consistency. Enter
+ * archive recovery directly.
+ */
+ InArchiveRecovery = true;
+ if (StandbyModeRequested)
+ StandbyMode = true;
+
/*
* When a backup_label file is present, we want to roll forward from
* the checkpoint it identifies, rather than using pg_control.
*/
- record = ReadCheckpointRecord(checkPointLoc, 0);
+ record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
if (record != NULL)
{
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
ereport(DEBUG1,
(errmsg("checkpoint record is at %X/%X",
- (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+ (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
InRecovery = true; /* force recovery even if SHUTDOWNED */
/*
* backup_label around that references a WAL segment that's
* already been archived.
*/
- if (XLByteLT(checkPoint.redo, checkPointLoc))
+ if (checkPoint.redo < checkPointLoc)
{
- if (!ReadRecord(&(checkPoint.redo), LOG, false))
+ if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
ereport(FATAL,
(errmsg("could not find redo location referenced by checkpoint record"),
errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
}
else
{
+ /*
+ * It's possible that archive recovery was requested, but we don't
+ * know how far we need to replay the WAL before we reach consistency.
+ * This can happen for example if a base backup is taken from a
+ * running server using an atomic filesystem snapshot, without calling
+ * pg_start/stop_backup. Or if you just kill a running master server
+ * and put it into archive recovery by creating a recovery.conf file.
+ *
+ * Our strategy in that case is to perform crash recovery first,
+ * replaying all the WAL present in pg_xlog, and only enter archive
+ * recovery after that.
+ *
+ * But usually we already know how far we need to replay the WAL (up
+ * to minRecoveryPoint, up to backupEndPoint, or until we see an
+ * end-of-backup record), and we can enter archive recovery directly.
+ */
+ if (ArchiveRecoveryRequested &&
+ (ControlFile->minRecoveryPoint != InvalidXLogRecPtr ||
+ ControlFile->backupEndRequired ||
+ ControlFile->backupEndPoint != InvalidXLogRecPtr ||
+ ControlFile->state == DB_SHUTDOWNED))
+ {
+ InArchiveRecovery = true;
+ if (StandbyModeRequested)
+ StandbyMode = true;
+ }
+
/*
* Get the last valid checkpoint record. If the latest one according
* to pg_control is broken, try the next-to-last one.
*/
checkPointLoc = ControlFile->checkPoint;
RedoStartLSN = ControlFile->checkPointCopy.redo;
- record = ReadCheckpointRecord(checkPointLoc, 1);
+ record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, true);
if (record != NULL)
{
ereport(DEBUG1,
(errmsg("checkpoint record is at %X/%X",
- (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+ (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
}
else if (StandbyMode)
{
else
{
checkPointLoc = ControlFile->prevCheckPoint;
- record = ReadCheckpointRecord(checkPointLoc, 2);
+ record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2, true);
if (record != NULL)
{
ereport(LOG,
(errmsg("using previous checkpoint record at %X/%X",
- (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
+ (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
InRecovery = true; /* force recovery even if SHUTDOWNED */
}
else
wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
}
+ /*
+ * If the location of the checkpoint record is not on the expected
+ * timeline in the history of the requested timeline, we cannot proceed:
+ * the backup is not part of the history of the requested timeline.
+ */
+ Assert(expectedTLEs); /* was initialized by reading checkpoint
+ * record */
+ if (tliOfPointInHistory(checkPointLoc, expectedTLEs) !=
+ checkPoint.ThisTimeLineID)
+ {
+ XLogRecPtr switchpoint;
+
+ /*
+ * tliSwitchPoint will throw an error if the checkpoint's timeline is
+ * not in expectedTLEs at all.
+ */
+ switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL);
+ ereport(FATAL,
+ (errmsg("requested timeline %u is not a child of this server's history",
+ recoveryTargetTLI),
+ errdetail("Latest checkpoint is at %X/%X on timeline %u, but in the history of the requested timeline, the server forked off from that timeline at %X/%X.",
+ (uint32) (ControlFile->checkPoint >> 32),
+ (uint32) ControlFile->checkPoint,
+ ControlFile->checkPointCopy.ThisTimeLineID,
+ (uint32) (switchpoint >> 32),
+ (uint32) switchpoint)));
+ }
+
+ /*
+ * The min recovery point should be part of the requested timeline's
+ * history, too.
+ */
+ if (!XLogRecPtrIsInvalid(ControlFile->minRecoveryPoint) &&
+ tliOfPointInHistory(ControlFile->minRecoveryPoint - 1, expectedTLEs) !=
+ ControlFile->minRecoveryPointTLI)
+ ereport(FATAL,
+ (errmsg("requested timeline %u does not contain minimum recovery point %X/%X on timeline %u",
+ recoveryTargetTLI,
+ (uint32) (ControlFile->minRecoveryPoint >> 32),
+ (uint32) ControlFile->minRecoveryPoint,
+ ControlFile->minRecoveryPointTLI)));
+
LastRec = RecPtr = checkPointLoc;
ereport(DEBUG1,
(errmsg("redo record is at %X/%X; shutdown %s",
- (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
+ (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo,
wasShutdown ? "TRUE" : "FALSE")));
ereport(DEBUG1,
(errmsg("next transaction ID: %u/%u; next OID: %u",
ereport(DEBUG1,
(errmsg("oldest unfrozen transaction ID: %u, in database %u",
checkPoint.oldestXid, checkPoint.oldestXidDB)));
+ ereport(DEBUG1,
+ (errmsg("oldest MultiXactId: %u, in database %u",
+ checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
if (!TransactionIdIsNormal(checkPoint.nextXid))
ereport(PANIC,
(errmsg("invalid next transaction ID")));
ShmemVariableCache->oidCount = 0;
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+ SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
+ /*
+ * Initialize unlogged LSN. On a clean shutdown, it's restored from the
+ * control file. On recovery, all unlogged relations are blown away, so
+ * the unlogged LSN counter can be reset too.
+ */
+ if (ControlFile->state == DB_SHUTDOWNED)
+ XLogCtl->unloggedLSN = ControlFile->unloggedLSN;
+ else
+ XLogCtl->unloggedLSN = 1;
+
/*
* We must replay WAL entries using the same TimeLineID they were created
* under, so temporarily adopt the TLI indicated by the checkpoint (see
*/
ThisTimeLineID = checkPoint.ThisTimeLineID;
+ /*
+ * Copy any missing timeline history files between 'now' and the recovery
+ * target timeline from archive to pg_xlog. While we don't need those
+ * files ourselves - the history file of the recovery target timeline
+ * covers all the previous timelines in the history too - a cascading
+ * standby server might be interested in them. Or, if you archive the WAL
+ * from this server to a different archive than the master, it'd be good
+ * for all the history files to get archived there after failover, so that
+ * you can use one of the old timelines as a PITR target. Timeline history
+ * files are small, so it's better to copy them unnecessarily than not
+ * copy them and regret later.
+ */
+ restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI);
+
lastFullPageWrites = checkPoint.fullPageWrites;
- RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+ RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
- if (XLByteLT(RecPtr, checkPoint.redo))
+ if (RecPtr < checkPoint.redo)
ereport(PANIC,
(errmsg("invalid redo in checkpoint record")));
* have been a clean shutdown and we did not have a recovery.conf file,
* then assume no recovery needed.
*/
- if (XLByteLT(checkPoint.redo, RecPtr))
+ if (checkPoint.redo < RecPtr)
{
if (wasShutdown)
ereport(PANIC,
}
else if (ControlFile->state != DB_SHUTDOWNED)
InRecovery = true;
- else if (InArchiveRecovery)
+ else if (ArchiveRecoveryRequested)
{
/* force recovery due to presence of recovery.conf */
InRecovery = true;
ereport(LOG,
(errmsg("database system was not properly shut down; "
"automatic recovery in progress")));
+ if (recoveryTargetTLI > ControlFile->checkPointCopy.ThisTimeLineID)
+ ereport(LOG,
+ (errmsg("crash recovery starts in timeline %u "
+ "and has target timeline %u",
+ ControlFile->checkPointCopy.ThisTimeLineID,
+ recoveryTargetTLI)));
ControlFile->state = DB_IN_CRASH_RECOVERY;
}
ControlFile->prevCheckPoint = ControlFile->checkPoint;
if (InArchiveRecovery)
{
/* initialize minRecoveryPoint if not set yet */
- if (XLByteLT(ControlFile->minRecoveryPoint, checkPoint.redo))
+ if (ControlFile->minRecoveryPoint < checkPoint.redo)
{
ControlFile->minRecoveryPoint = checkPoint.redo;
ControlFile->minRecoveryPointTLI = checkPoint.ThisTimeLineID;
* control file and we've established a recovery snapshot from a
* running-xacts WAL record.
*/
- if (InArchiveRecovery && EnableHotStandby)
+ if (ArchiveRecoveryRequested && EnableHotStandby)
{
TransactionId *xids;
int nxids;
oldestActiveXID = checkPoint.oldestActiveXid;
Assert(TransactionIdIsValid(oldestActiveXID));
+ /* Tell procarray about the range of xids it has to deal with */
+ ProcArrayInitRecovery(ShmemVariableCache->nextXid);
+
/*
* Startup commit log and subtrans only. Other SLRUs are not
* maintained during recovery and need not be started yet.
* recoveryLastXTime.
*
* This is slightly confusing if we're starting from an online
- * checkpoint; we've just read and replayed the chekpoint record, but
+ * checkpoint; we've just read and replayed the checkpoint record, but
* we're going to start replay from its redo pointer, which precedes
* the location of the checkpoint record itself. So even though the
* last record we've replayed is indeed ReadRecPtr, we haven't
xlogctl->replayEndRecPtr = ReadRecPtr;
xlogctl->replayEndTLI = ThisTimeLineID;
xlogctl->lastReplayedEndRecPtr = EndRecPtr;
- xlogctl->lastReplayedEndRecPtr = ThisTimeLineID;
+ xlogctl->lastReplayedTLI = ThisTimeLineID;
xlogctl->recoveryLastXTime = 0;
xlogctl->currentChunkStartTime = 0;
xlogctl->recoveryPause = false;
* process in addition to postmaster! Also, fsync requests are
* subsequently to be handled by the checkpointer, not locally.
*/
- if (InArchiveRecovery && IsUnderPostmaster)
+ if (ArchiveRecoveryRequested && IsUnderPostmaster)
{
PublishStartupProcessInformation();
SetForwardFsyncRequests();
* Find the first record that logically follows the checkpoint --- it
* might physically precede it, though.
*/
- if (XLByteLT(checkPoint.redo, RecPtr))
+ if (checkPoint.redo < RecPtr)
{
/* back up to find the record */
- record = ReadRecord(&(checkPoint.redo), PANIC, false);
+ record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
- record = ReadRecord(NULL, LOG, false);
+ record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
}
if (record != NULL)
ereport(LOG,
(errmsg("redo starts at %X/%X",
- (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
+ (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
/*
* main redo apply loop
*/
do
{
- bool switchedTLI = false;
+ bool switchedTLI = false;
+
#ifdef WAL_DEBUG
if (XLOG_DEBUG ||
(rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
initStringInfo(&buf);
appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
- (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
- (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
+ (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
+ (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
xlog_outrec(&buf, record);
appendStringInfo(&buf, " - ");
RmgrTable[record->xl_rmid].rm_desc(&buf,
}
/*
- * Before replaying this record, check if it is a shutdown
- * checkpoint record that causes the current timeline to
- * change. The checkpoint record is already considered to be
- * part of the new timeline, so we update ThisTimeLineID
- * before replaying it. That's important so that replayEndTLI,
- * which is recorded as the minimum recovery point's TLI if
- * recovery stops after this record, is set correctly.
+ * Before replaying this record, check if this record causes
+ * the current timeline to change. The record is already
+ * considered to be part of the new timeline, so we update
+ * ThisTimeLineID before replaying it. That's important so
+ * that replayEndTLI, which is recorded as the minimum
+ * recovery point's TLI if recovery stops after this record,
+ * is set correctly.
*/
- if (record->xl_rmid == RM_XLOG_ID &&
- (record->xl_info & ~XLR_INFO_MASK) == XLOG_CHECKPOINT_SHUTDOWN)
+ if (record->xl_rmid == RM_XLOG_ID)
{
- CheckPoint checkPoint;
- TimeLineID newTLI;
+ TimeLineID newTLI = ThisTimeLineID;
+ TimeLineID prevTLI = ThisTimeLineID;
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
- memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
- newTLI = checkPoint.ThisTimeLineID;
+ if (info == XLOG_CHECKPOINT_SHUTDOWN)
+ {
+ CheckPoint checkPoint;
+
+ memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ newTLI = checkPoint.ThisTimeLineID;
+ prevTLI = checkPoint.PrevTimeLineID;
+ }
+ else if (info == XLOG_END_OF_RECOVERY)
+ {
+ xl_end_of_recovery xlrec;
+
+ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
+ newTLI = xlrec.ThisTimeLineID;
+ prevTLI = xlrec.PrevTimeLineID;
+ }
if (newTLI != ThisTimeLineID)
{
/* Check that it's OK to switch to this TLI */
- checkTimeLineSwitch(EndRecPtr, newTLI);
+ checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
/* Following WAL records should be run with new TLI */
ThisTimeLineID = newTLI;
break;
/* Else, try to fetch the next WAL record */
- record = ReadRecord(NULL, LOG, false);
+ record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
} while (record != NULL);
/*
ereport(LOG,
(errmsg("redo done at %X/%X",
- (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
+ (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
xtime = GetLatestXTime();
if (xtime)
ereport(LOG,
* We don't need the latch anymore. It's not strictly necessary to disown
* it, but let's do it for the sake of tidiness.
*/
- if (StandbyMode)
+ if (StandbyModeRequested)
DisownLatch(&XLogCtl->recoveryWakeupLatch);
/*
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
*/
- record = ReadRecord(&LastRec, PANIC, false);
+ record = ReadRecord(xlogreader, LastRec, PANIC, false);
EndOfLog = EndRecPtr;
XLByteToPrevSeg(EndOfLog, endLogSegNo);
* advanced beyond the WAL we processed.
*/
if (InRecovery &&
- (XLByteLT(EndOfLog, minRecoveryPoint) ||
+ (EndOfLog < minRecoveryPoint ||
!XLogRecPtrIsInvalid(ControlFile->backupStartPoint)))
{
if (reachedStopPoint)
* crashes while an online backup is in progress. We must not treat
* that as an error, or the database will refuse to start up.
*/
- if (InArchiveRecovery || ControlFile->backupEndRequired)
+ if (ArchiveRecoveryRequested || ControlFile->backupEndRequired)
{
if (ControlFile->backupEndRequired)
ereport(FATAL,
*
* In a normal crash recovery, we can just extend the timeline we were in.
*/
- if (InArchiveRecovery)
+ PrevTimeLineID = ThisTimeLineID;
+ if (ArchiveRecoveryRequested)
{
- char reason[200];
+ char reason[200];
+
+ Assert(InArchiveRecovery);
ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
ereport(LOG,
/* Save the selected TimeLineID in shared memory, too */
XLogCtl->ThisTimeLineID = ThisTimeLineID;
+ XLogCtl->PrevTimeLineID = PrevTimeLineID;
/*
* We are now done reading the old WAL. Turn off archive fetching if it
* that we also have a copy of the last block of the old WAL in readBuf;
* we will use that below.)
*/
- if (InArchiveRecovery)
- exitArchiveRecovery(curFileTLI, endLogSegNo);
+ if (ArchiveRecoveryRequested)
+ exitArchiveRecovery(xlogreader->readPageTLI, endLogSegNo);
/*
* Prepare to write WAL starting at EndOfLog position, and init xlog
openLogFile = XLogFileOpen(openLogSegNo);
openLogOff = 0;
Insert = &XLogCtl->Insert;
- Insert->PrevRecord = LastRec;
- XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
+ Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
+ Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
/*
* Tricky point here: readBuf contains the *last* block that the LastRec
* record spans, not the one it starts in. The last block is indeed the
* one we want to use.
*/
- Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
- memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
- Insert->currpos = (char *) Insert->currpage +
- (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
+ if (EndOfLog % XLOG_BLCKSZ != 0)
+ {
+ char *page;
+ int len;
+ int firstIdx;
+ XLogRecPtr pageBeginPtr;
- LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+ pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
+ Assert(readOff == pageBeginPtr % XLogSegSize);
- XLogCtl->LogwrtResult = LogwrtResult;
+ firstIdx = XLogRecPtrToBufIdx(EndOfLog);
- XLogCtl->LogwrtRqst.Write = EndOfLog;
- XLogCtl->LogwrtRqst.Flush = EndOfLog;
+ /* Copy the valid part of the last block, and zero the rest */
+ page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
+ len = EndOfLog % XLOG_BLCKSZ;
+ memcpy(page, xlogreader->readBuf, len);
+ memset(page + len, 0, XLOG_BLCKSZ - len);
- freespace = INSERT_FREESPACE(Insert);
- if (freespace > 0)
- {
- /* Make sure rest of page is zero */
- MemSet(Insert->currpos, 0, freespace);
- XLogCtl->Write.curridx = 0;
+ XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
+ XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
}
else
{
/*
- * Whenever LogwrtResult points to exactly the end of a page,
- * Write.curridx must point to the *next* page (see XLogWrite()).
- *
- * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
- * this is sufficient. The first actual attempt to insert a log
- * record will advance the insert state.
+ * There is no partial block to copy. Just set InitializedUpTo,
+ * and let the first attempt to insert a log record to initialize
+ * the next buffer.
*/
- XLogCtl->Write.curridx = NextBufIdx(0);
+ XLogCtl->InitializedUpTo = EndOfLog;
}
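+
+	/*
+	 * Worked example (values assumed for illustration): with XLOG_BLCKSZ =
+	 * 8192, an EndOfLog of 0/2C012350 gives len = 0x350 bytes of live data
+	 * copied from xlogreader->readBuf; the remaining 0x1CB0 bytes of that
+	 * buffer page are zeroed, and InitializedUpTo advances to the block
+	 * boundary 0/2C014000.
+	 */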
+ LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+
+ XLogCtl->LogwrtResult = LogwrtResult;
+
+ XLogCtl->LogwrtRqst.Write = EndOfLog;
+ XLogCtl->LogwrtRqst.Flush = EndOfLog;
+
/* Pre-scan prepared transactions to find out the range of XIDs present */
oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
* assigning a new TLI, using a shutdown checkpoint allows us to have
* the rule that TLI only changes in shutdown checkpoints, which
* allows some extra error checking in xlog_redo.
+ *
+ * In fast promotion, only create a lightweight end-of-recovery record
+ * instead of a full checkpoint. A checkpoint is requested later,
+ * after we're fully out of recovery mode and already accepting
+ * queries.
*/
if (bgwriterLaunched)
- RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY |
- CHECKPOINT_IMMEDIATE |
- CHECKPOINT_WAIT);
+ {
+ if (fast_promote)
+ {
+ checkPointLoc = ControlFile->prevCheckPoint;
+
+ /*
+ * Confirm the last checkpoint is available for us to recover
+ * from if we fail. Note that we don't check for the secondary
+ * checkpoint since that isn't available in most base backups.
+ */
+ record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, false);
+ if (record != NULL)
+ {
+ fast_promoted = true;
+
+ /*
+ * Insert a special WAL record to mark the end of
+ * recovery, since we aren't doing a checkpoint. That
+					 * means that the checkpointer process is likely to be in
+ * the middle of a time-smoothed restartpoint and could
+ * continue to be for minutes after this. That sounds
+ * strange, but the effect is roughly the same and it
+ * would be stranger to try to come out of the
+ * restartpoint and then checkpoint. We request a
+ * checkpoint later anyway, just for safety.
+ */
+ CreateEndOfRecoveryRecord();
+ }
+ }
+
+ if (!fast_promoted)
+ RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY |
+ CHECKPOINT_IMMEDIATE |
+ CHECKPOINT_WAIT);
+ }
else
CreateCheckPoint(CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_IMMEDIATE);
LWLockRelease(ControlFileLock);
/* start the archive_timeout timer running */
- XLogCtl->Write.lastSegSwitchTime = (pg_time_t) time(NULL);
+ XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
/* also initialize latestCompletedXid, to nextXid - 1 */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
if (standbyState != STANDBY_DISABLED)
ShutdownRecoveryTransactionEnvironment();
- /* Shut down readFile facility, free space */
+ /* Shut down xlogreader */
if (readFile >= 0)
{
close(readFile);
readFile = -1;
}
- if (readBuf)
- {
- free(readBuf);
- readBuf = NULL;
- }
- if (readRecordBuf)
- {
- free(readRecordBuf);
- readRecordBuf = NULL;
- readRecordBufSize = 0;
- }
+ XLogReaderFree(xlogreader);
/*
* If any of the critical GUCs have changed, log them before we allow
}
/*
- * If there were cascading standby servers connected to us, nudge any
- * wal sender processes to notice that we've been promoted.
+ * If there were cascading standby servers connected to us, nudge any wal
+ * sender processes to notice that we've been promoted.
*/
WalSndWakeup();
+
+ /*
+ * If this was a fast promotion, request an (online) checkpoint now. This
+ * isn't required for consistency, but the last restartpoint might be far
+	 * back, and in case of a crash, recovering from it might take longer
+ * than is appropriate now that we're not in standby mode anymore.
+ */
+ if (fast_promoted)
+ RequestCheckpoint(CHECKPOINT_FORCE);
}
/*
* Have we reached the point where our base backup was completed?
*/
if (!XLogRecPtrIsInvalid(ControlFile->backupEndPoint) &&
- XLByteLE(ControlFile->backupEndPoint, EndRecPtr))
+ ControlFile->backupEndPoint <= EndRecPtr)
{
/*
* We have reached the end of base backup, as indicated by pg_control.
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
- if (XLByteLT(ControlFile->minRecoveryPoint, EndRecPtr))
+ if (ControlFile->minRecoveryPoint < EndRecPtr)
ControlFile->minRecoveryPoint = EndRecPtr;
- MemSet(&ControlFile->backupStartPoint, 0, sizeof(XLogRecPtr));
- MemSet(&ControlFile->backupEndPoint, 0, sizeof(XLogRecPtr));
+ ControlFile->backupStartPoint = InvalidXLogRecPtr;
+ ControlFile->backupEndPoint = InvalidXLogRecPtr;
ControlFile->backupEndRequired = false;
UpdateControlFile();
}
/*
- * Have we passed our safe starting point? Note that minRecoveryPoint
- * is known to be incorrectly set if ControlFile->backupEndRequired,
- * until the XLOG_BACKUP_RECORD arrives to advise us of the correct
+ * Have we passed our safe starting point? Note that minRecoveryPoint is
+ * known to be incorrectly set if ControlFile->backupEndRequired, until
+ * the XLOG_BACKUP_RECORD arrives to advise us of the correct
* minRecoveryPoint. All we know prior to that is that we're not
* consistent yet.
*/
if (!reachedConsistency && !ControlFile->backupEndRequired &&
- XLByteLE(minRecoveryPoint, XLogCtl->lastReplayedEndRecPtr) &&
+ minRecoveryPoint <= XLogCtl->lastReplayedEndRecPtr &&
XLogRecPtrIsInvalid(ControlFile->backupStartPoint))
{
/*
* 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
*/
static XLogRecord *
-ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
+ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+ int whichChkpt, bool report)
{
XLogRecord *record;
if (!XRecOffIsValid(RecPtr))
{
+ if (!report)
+ return NULL;
+
switch (whichChkpt)
{
case 1:
return NULL;
}
- record = ReadRecord(&RecPtr, LOG, true);
+ record = ReadRecord(xlogreader, RecPtr, LOG, true);
if (record == NULL)
{
+ if (!report)
+ return NULL;
+
switch (whichChkpt)
{
case 1:
}
/*
- * Once spawned, a backend may update its local RedoRecPtr from
- * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
- * to do so. This is done in XLogInsert() or GetRedoRecPtr().
+ * Return the current Redo pointer from shared memory.
+ *
+ * As a side-effect, the local RedoRecPtr copy is updated.
*/
XLogRecPtr
GetRedoRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr ptr;
+ /*
+	 * The possibly not up-to-date copy in XLogCtl is enough. Even if we
+ * grabbed a WAL insertion slot to read the master copy, someone might
+ * update it just after we've released the lock.
+ */
SpinLockAcquire(&xlogctl->info_lck);
- Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
- RedoRecPtr = xlogctl->Insert.RedoRecPtr;
+ ptr = xlogctl->RedoRecPtr;
SpinLockRelease(&xlogctl->info_lck);
+ if (RedoRecPtr < ptr)
+ RedoRecPtr = ptr;
+
return RedoRecPtr;
}
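+
+/*
+ * Usage sketch (illustrative, not a verbatim caller): the classic consumer
+ * of the redo pointer is the full-page-write decision made while assembling
+ * a WAL record, roughly
+ *
+ *		if (BufferGetLSNAtomic(buffer) <= GetRedoRecPtr())
+ *			... include a full-page image of the buffer ...
+ *
+ * i.e. a page not yet logged since the last checkpoint's redo point must be
+ * written out whole to guard against torn pages.
+ */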
*
* NOTE: The value *actually* returned is the position of the last full
* xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to acquire WALInsertLock which can be quite
- * heavily contended, and an approximation is enough for the current
- * usage of this function.
+ * For that, we don't need to scan through WAL insertion slots, and an
+ * approximation is enough for the current usage of this function.
*/
XLogRecPtr
GetInsertRecPtr(void)
/* Need WALWriteLock, but shared lock is sufficient */
LWLockAcquire(WALWriteLock, LW_SHARED);
- result = XLogCtl->Write.lastSegSwitchTime;
+ result = XLogCtl->lastSegSwitchTime;
LWLockRelease(WALWriteLock);
return result;
void
ShutdownXLOG(int code, Datum arg)
{
- ereport(LOG,
+ /* Don't be chatty in standalone mode */
+ ereport(IsPostmasterEnvironment ? LOG : NOTICE,
(errmsg("shutting down")));
if (RecoveryInProgress())
ShutdownSUBTRANS();
ShutdownMultiXact();
- ereport(LOG,
+ /* Don't be chatty in standalone mode */
+ ereport(IsPostmasterEnvironment ? LOG : NOTICE,
(errmsg("database system is shut down")));
}
void
CreateCheckPoint(int flags)
{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
bool shutdown;
CheckPoint checkPoint;
XLogRecPtr recptr;
XLogRecData rdata;
uint32 freespace;
XLogSegNo _logSegNo;
+ XLogRecPtr curInsert;
VirtualTransactionId *vxids;
- int nvxids;
+ int nvxids;
/*
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
checkPoint.oldestActiveXid = InvalidTransactionId;
/*
- * We must hold WALInsertLock while examining insert state to determine
- * the checkpoint REDO pointer.
+ * We must block concurrent insertions while examining insert state to
+ * determine the checkpoint REDO pointer.
*/
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
+ curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
/*
* If this isn't a shutdown or forced checkpoint, and we have not inserted
if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
CHECKPOINT_FORCE)) == 0)
{
- XLogRecPtr curInsert;
-
- INSERT_RECPTR(curInsert, Insert, Insert->curridx);
- if (curInsert == ControlFile->checkPoint +
+ if (curInsert == ControlFile->checkPoint +
MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
{
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
return;
LocalSetXLogInsertAllowed();
checkPoint.ThisTimeLineID = ThisTimeLineID;
+ if (flags & CHECKPOINT_END_OF_RECOVERY)
+ checkPoint.PrevTimeLineID = XLogCtl->PrevTimeLineID;
+ else
+ checkPoint.PrevTimeLineID = ThisTimeLineID;
+
checkPoint.fullPageWrites = Insert->fullPageWrites;
/*
* the buffer flush work. Those XLOG records are logically after the
* checkpoint, even though physically before it. Got that?
*/
- freespace = INSERT_FREESPACE(Insert);
+ freespace = INSERT_FREESPACE(curInsert);
if (freespace == 0)
{
- (void) AdvanceXLInsertBuffer(false);
- /* OK to ignore update return flag, since we will do flush anyway */
- freespace = INSERT_FREESPACE(Insert);
+ if (curInsert % XLogSegSize == 0)
+ curInsert += SizeOfXLogLongPHD;
+ else
+ curInsert += SizeOfXLogShortPHD;
}
- INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
+ checkPoint.redo = curInsert;
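+
+	/*
+	 * Example of the adjustment above (position assumed for illustration):
+	 * if curInsert sat exactly on a page boundary such as 0/2C014000, the
+	 * next record would actually begin after that page's header, so the
+	 * redo pointer is nudged past SizeOfXLogShortPHD (or the long header
+	 * when the boundary is also a segment boundary) before being used.
+	 */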
/*
* Here we update the shared RedoRecPtr for future XLogInsert calls; this
- * must be done while holding the insert lock AND the info_lck.
+ * must be done while holding the insertion slots.
*
* Note: if we fail to complete the checkpoint, RedoRecPtr will be left
* pointing past where it really needs to point. This is okay; the only
* XLogInserts that happen while we are dumping buffers must assume that
* their buffer changes are not included in the checkpoint.
*/
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
-
- SpinLockAcquire(&xlogctl->info_lck);
- RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
- SpinLockRelease(&xlogctl->info_lck);
- }
+ RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
/*
- * Now we can release WAL insert lock, allowing other xacts to proceed
- * while we are flushing disk buffers.
+ * Now we can release the WAL insertion slots, allowing other xacts to
+ * proceed while we are flushing disk buffers.
*/
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
+
+ /* Update the info_lck-protected copy of RedoRecPtr as well */
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->RedoRecPtr = checkPoint.redo;
+ SpinLockRelease(&xlogctl->info_lck);
/*
* If enabled, log checkpoint start. We postpone this until now so as not
TRACE_POSTGRESQL_CHECKPOINT_START(flags);
/*
- * In some cases there are groups of actions that must all occur on
- * one side or the other of a checkpoint record. Before flushing the
+ * In some cases there are groups of actions that must all occur on one
+ * side or the other of a checkpoint record. Before flushing the
* checkpoint record we must explicitly wait for any backend currently
* performing those groups of actions.
*
* One example is end of transaction, so we must wait for any transactions
- * that are currently in commit critical sections. If an xact inserted
+ * that are currently in commit critical sections. If an xact inserted
* its commit record into XLOG just before the REDO point, then a crash
* restart from the REDO point would not replay that record, which means
* that our flushing had better include the xact's update of pg_clog. So
* we wait till he's out of his commit critical section before proceeding.
* See notes in RecordTransactionCommit().
*
- * Because we've already released WALInsertLock, this test is a bit fuzzy:
- * it is possible that we will wait for xacts we didn't really need to
- * wait for. But the delay should be short and it seems better to make
- * checkpoint take a bit longer than to hold locks longer than necessary.
+ * Because we've already released the insertion slots, this test is a bit
+ * fuzzy: it is possible that we will wait for xacts we didn't really need
+ * to wait for. But the delay should be short and it seems better to make
+ * checkpoint take a bit longer than to hold off insertions longer than
+ * necessary.
* (In fact, the whole reason we have this issue is that xact.c does
* commit record XLOG insertion and clog update as two separate steps
* protected by different locks, but again that seems best on grounds of
vxids = GetVirtualXIDsDelayingChkpt(&nvxids);
if (nvxids > 0)
{
- uint32 nwaits = 0;
-
do
{
pg_usleep(10000L); /* wait for 10 msec */
- nwaits++;
} while (HaveVirtualXIDsDelayingChkpt(vxids, nvxids));
}
pfree(vxids);
MultiXactGetCheckptMulti(shutdown,
&checkPoint.nextMulti,
- &checkPoint.nextMultiOffset);
+ &checkPoint.nextMultiOffset,
+ &checkPoint.oldestMulti,
+ &checkPoint.oldestMultiDB);
/*
* Having constructed the checkpoint record, ensure all shmem disk buffers
* We now have ProcLastRecPtr = start of actual checkpoint record, recptr
* = end of actual checkpoint record.
*/
- if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
+ if (shutdown && checkPoint.redo != ProcLastRecPtr)
ereport(PANIC,
(errmsg("concurrent transaction log activity while database system is shutting down")));
ControlFile->checkPointCopy = checkPoint;
ControlFile->time = (pg_time_t) time(NULL);
/* crash recovery should always recover to the end of WAL */
- MemSet(&ControlFile->minRecoveryPoint, 0, sizeof(XLogRecPtr));
+ ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
ControlFile->minRecoveryPointTLI = 0;
+
+ /*
+	 * Persist the unloggedLSN value. It's reset on crash recovery, so the
+	 * value stored by a non-shutdown checkpoint is never used, but it seems
+	 * useful to store it always, for debugging purposes.
+ */
+ SpinLockAcquire(&XLogCtl->ulsn_lck);
+ ControlFile->unloggedLSN = XLogCtl->unloggedLSN;
+ SpinLockRelease(&XLogCtl->ulsn_lck);
+
UpdateControlFile();
LWLockRelease(ControlFileLock);
LWLockRelease(CheckpointLock);
}
+/*
+ * Mark the end of recovery in WAL, without running a full checkpoint. A
+ * restartpoint is likely to be in progress while we do this, and we are
+ * unwilling to wait for it to complete, so be careful to avoid taking
+ * the CheckpointLock anywhere here.
+ *
+ * CreateRestartPoint() allows for the case where recovery ends before the
+ * restartpoint completes, so concurrent execution is not a concern.
+ */
+void
+CreateEndOfRecoveryRecord(void)
+{
+ xl_end_of_recovery xlrec;
+ XLogRecData rdata;
+ XLogRecPtr recptr;
+
+ /* sanity check */
+ if (!RecoveryInProgress())
+ elog(ERROR, "can only be used to end recovery");
+
+ xlrec.end_time = time(NULL);
+
+ WALInsertSlotAcquire(true);
+ xlrec.ThisTimeLineID = ThisTimeLineID;
+ xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
+ WALInsertSlotRelease();
+
+ LocalSetXLogInsertAllowed();
+
+ START_CRIT_SECTION();
+
+ rdata.data = (char *) &xlrec;
+ rdata.len = sizeof(xl_end_of_recovery);
+ rdata.buffer = InvalidBuffer;
+ rdata.next = NULL;
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_END_OF_RECOVERY, &rdata);
+
+ XLogFlush(recptr);
+
+ /*
+ * Update the control file so that crash recovery can follow the timeline
+ * changes to this point.
+ */
+ LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+ ControlFile->time = (pg_time_t) xlrec.end_time;
+ ControlFile->minRecoveryPoint = recptr;
+ ControlFile->minRecoveryPointTLI = ThisTimeLineID;
+ UpdateControlFile();
+ LWLockRelease(ControlFileLock);
+
+ END_CRIT_SECTION();
+
+ LocalXLogInsertAllowed = -1; /* return to "check" state */
+}
+
/*
* Flush all data in shared memory to disk, and fsync
*
* side-effect.
*/
if (XLogRecPtrIsInvalid(lastCheckPointRecPtr) ||
- XLByteLE(lastCheckPoint.redo, ControlFile->checkPointCopy.redo))
+ lastCheckPoint.redo <= ControlFile->checkPointCopy.redo)
{
ereport(DEBUG2,
(errmsg("skipping restartpoint, already performed at %X/%X",
- (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo)));
+ (uint32) (lastCheckPoint.redo >> 32),
+ (uint32) lastCheckPoint.redo)));
UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
if (flags & CHECKPOINT_IS_SHUTDOWN)
* the number of segments replayed since last restartpoint, and request a
* restartpoint if it exceeds checkpoint_segments.
*
- * You need to hold WALInsertLock and info_lck to update it, although
- * during recovery acquiring WALInsertLock is just pro forma, because
- * there is no other processes updating Insert.RedoRecPtr.
+ * Like in CreateCheckPoint(), hold off insertions to update it, although
+ * during recovery this is just pro forma, because no WAL insertions are
+ * happening.
*/
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
- SpinLockAcquire(&xlogctl->info_lck);
+ WALInsertSlotAcquire(true);
xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+ WALInsertSlotRelease();
+
+ /* Also update the info_lck-protected copy */
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->RedoRecPtr = lastCheckPoint.redo;
SpinLockRelease(&xlogctl->info_lck);
- LWLockRelease(WALInsertLock);
/*
* Prepare to accumulate statistics.
*/
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY &&
- XLByteLT(ControlFile->checkPointCopy.redo, lastCheckPoint.redo))
+ ControlFile->checkPointCopy.redo < lastCheckPoint.redo)
{
ControlFile->prevCheckPoint = ControlFile->checkPoint;
ControlFile->checkPoint = lastCheckPointRecPtr;
{
XLogRecPtr receivePtr;
XLogRecPtr replayPtr;
+ TimeLineID replayTLI;
XLogRecPtr endptr;
/*
- * Get the current end of xlog replayed or received, whichever is later.
+ * Get the current end of xlog replayed or received, whichever is
+ * later.
*/
receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
- replayPtr = GetXLogReplayRecPtr(NULL);
+ replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
KeepLogSeg(endptr, &_logSegNo);
_logSegNo--;
/*
- * Update ThisTimeLineID to the timeline we're currently replaying,
- * so that we install any recycled segments on that timeline.
+ * Try to recycle segments on a useful timeline. If we've been promoted
+ * since the beginning of this restartpoint, use the new timeline
+ * chosen at end of recovery (RecoveryInProgress() sets ThisTimeLineID
+ * in that case). If we're still in recovery, use the timeline we're
+ * currently replaying.
*
* There is no guarantee that the WAL segments will be useful on the
* current timeline; if recovery proceeds to a new timeline right
* not be used, and will go wasted until recycled on the next
* restartpoint. We'll live with that.
*/
- (void) GetXLogReplayRecPtr(&ThisTimeLineID);
+ if (RecoveryInProgress())
+ ThisTimeLineID = replayTLI;
RemoveOldXlogFiles(_logSegNo, endptr);
* segments, since that may supply some of the needed files.)
*/
PreallocXlogFiles(endptr);
+
+ /*
+ * ThisTimeLineID is normally not set when we're still in recovery.
+ * However, recycling/preallocating segments above needed
+ * ThisTimeLineID to determine which timeline to install the segments
+ * on. Reset it now, to restore the normal state of affairs for
+ * debugging purposes.
+ */
+ if (RecoveryInProgress())
+ ThisTimeLineID = 0;
}
/*
xtime = GetLatestXTime();
ereport((log_checkpoints ? LOG : DEBUG2),
(errmsg("recovery restart point at %X/%X",
- (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo),
+ (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo),
xtime ? errdetail("last completed transaction was at log time %s",
timestamptz_to_str(xtime)) : 0));
}
/*
- * Calculate the last segment that we need to retain because of
- * wal_keep_segments, by subtracting wal_keep_segments from
- * the given xlog location, recptr.
+ * Retreat *logSegNo to the last segment that we need to retain because of
+ * wal_keep_segments. This is calculated by subtracting wal_keep_segments
+ * from the given xlog location, recptr.
*/
static void
KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
if (segno <= wal_keep_segments)
segno = 1;
else
- segno = *logSegNo - wal_keep_segments;
+ segno = segno - wal_keep_segments;
/* don't delete WAL segments newer than the calculated segment */
if (segno < *logSegNo)
return RecPtr;
}
+/*
+ * Write a backup block if needed when we are setting a hint. Note that
+ * this may be called for a variety of page types, not just heaps.
+ *
+ * Callable while holding just share lock on the buffer content.
+ *
+ * We can't use the plain backup block mechanism since that relies on the
+ * Buffer being exclusively locked. Because some modifications (setting LSN,
+ * hint bits) are allowed in a share-locked buffer, that could lead to WAL
+ * checksum failures. So instead we copy the page and insert the copied data
+ * as normal record data.
+ *
+ * We only need to do something if the page has not yet been full-page
+ * written in this checkpoint round. The LSN of the inserted WAL record is
+ * returned if we had to write, InvalidXLogRecPtr otherwise.
+ *
+ * Multiple concurrent backends could attempt to write WAL records for the
+ * same block. In that case, multiple copies of the block would be recorded
+ * in separate WAL records by different backends, though that is still OK
+ * from a correctness perspective.
+ */
+XLogRecPtr
+XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
+{
+ XLogRecPtr recptr = InvalidXLogRecPtr;
+ XLogRecPtr lsn;
+ XLogRecData rdata[2];
+ BkpBlock bkpb;
+
+ /*
+ * Ensure no checkpoint can change our view of RedoRecPtr.
+ */
+ Assert(MyPgXact->delayChkpt);
+
+ /*
+ * Update RedoRecPtr so XLogCheckBuffer can make the right decision
+ */
+ GetRedoRecPtr();
+
+ /*
+	 * Set up a phony rdata element for use within XLogCheckBuffer only. We
+ * and reset rdata for any actual WAL record insert.
+ */
+ rdata[0].buffer = buffer;
+ rdata[0].buffer_std = buffer_std;
+
+ /*
+ * Check buffer while not holding an exclusive lock.
+ */
+ if (XLogCheckBuffer(rdata, false, &lsn, &bkpb))
+ {
+ char copied_buffer[BLCKSZ];
+ char *origdata = (char *) BufferGetBlock(buffer);
+
+ /*
+		 * Copy the buffer so we don't have to worry about concurrent hint
+		 * bit or LSN updates. We assume pd_lower/upper cannot be changed
+		 * without an exclusive lock, so the copied contents are not racy.
+		 *
+		 * With buffer_std set to false, XLogCheckBuffer() sets hole_length
+		 * and hole_offset to 0, so the following code is safe for either case.
+ */
+ memcpy(copied_buffer, origdata, bkpb.hole_offset);
+ memcpy(copied_buffer + bkpb.hole_offset,
+ origdata + bkpb.hole_offset + bkpb.hole_length,
+ BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
+
+ /*
+ * Header for backup block.
+ */
+ rdata[0].data = (char *) &bkpb;
+ rdata[0].len = sizeof(BkpBlock);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &(rdata[1]);
+
+ /*
+ * Save copy of the buffer.
+ */
+ rdata[1].data = copied_buffer;
+ rdata[1].len = BLCKSZ - bkpb.hole_length;
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].next = NULL;
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+ }
+
+ return recptr;
+}
+
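+/*
+ * Illustrative caller sketch (hypothetical, simplified): a buffer-manager
+ * routine dirtying a page for a hint-bit change under a mere share lock
+ * might use this along the lines of:
+ *
+ *		MyPgXact->delayChkpt = true;	(stabilize our view of RedoRecPtr)
+ *		lsn = XLogSaveBufferForHint(buffer, buffer_std);
+ *		if (!XLogRecPtrIsInvalid(lsn))
+ *			PageSetLSN(page, lsn);		(advance page LSN past the image)
+ *		MyPgXact->delayChkpt = false;
+ *
+ * (Shown schematically only; a real caller also marks the buffer dirty and
+ * handles buffer header locking.)
+ */
+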
/*
* Check if any of the GUC parameters that are critical for hot standby
* have changed, and update the value in pg_control file if necessary.
{
if (wal_level != ControlFile->wal_level ||
MaxConnections != ControlFile->MaxConnections ||
+ max_worker_processes != ControlFile->max_worker_processes ||
max_prepared_xacts != ControlFile->max_prepared_xacts ||
max_locks_per_xact != ControlFile->max_locks_per_xact)
{
xl_parameter_change xlrec;
xlrec.MaxConnections = MaxConnections;
+ xlrec.max_worker_processes = max_worker_processes;
xlrec.max_prepared_xacts = max_prepared_xacts;
xlrec.max_locks_per_xact = max_locks_per_xact;
xlrec.wal_level = wal_level;
}
ControlFile->MaxConnections = MaxConnections;
+ ControlFile->max_worker_processes = max_worker_processes;
ControlFile->max_prepared_xacts = max_prepared_xacts;
ControlFile->max_locks_per_xact = max_locks_per_xact;
ControlFile->wal_level = wal_level;
*/
if (fullPageWrites)
{
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
Insert->fullPageWrites = true;
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
}
/*
if (!fullPageWrites)
{
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
Insert->fullPageWrites = false;
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
}
END_CRIT_SECTION();
}
* replay. (Currently, timeline can only change at a shutdown checkpoint).
*/
static void
-checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI)
+checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI)
{
+ /* Check that the record agrees on what the current (old) timeline is */
+ if (prevTLI != ThisTimeLineID)
+ ereport(PANIC,
+ (errmsg("unexpected previous timeline ID %u (current timeline ID %u) in checkpoint record",
+ prevTLI, ThisTimeLineID)));
+
/*
- * The new timeline better be in the list of timelines we expect
- * to see, according to the timeline history. It should also not
- * decrease.
+ * The new timeline better be in the list of timelines we expect to see,
+ * according to the timeline history. It should also not decrease.
*/
if (newTLI < ThisTimeLineID || !tliInHistory(newTLI, expectedTLEs))
ereport(PANIC,
- (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
- newTLI, ThisTimeLineID)));
+ (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
+ newTLI, ThisTimeLineID)));
/*
- * If we have not yet reached min recovery point, and we're about
- * to switch to a timeline greater than the timeline of the min
- * recovery point: trouble. After switching to the new timeline,
- * we could not possibly visit the min recovery point on the
- * correct timeline anymore. This can happen if there is a newer
- * timeline in the archive that branched before the timeline the
- * min recovery point is on, and you attempt to do PITR to the
- * new timeline.
+ * If we have not yet reached min recovery point, and we're about to
+ * switch to a timeline greater than the timeline of the min recovery
+ * point: trouble. After switching to the new timeline, we could not
+ * possibly visit the min recovery point on the correct timeline anymore.
+ * This can happen if there is a newer timeline in the archive that
+ * branched before the timeline the min recovery point is on, and you
+ * attempt to do PITR to the new timeline.
*/
if (!XLogRecPtrIsInvalid(minRecoveryPoint) &&
- XLByteLT(lsn, minRecoveryPoint) &&
+ lsn < minRecoveryPoint &&
newTLI > minRecoveryPointTLI)
ereport(PANIC,
(errmsg("unexpected timeline ID %u in checkpoint record, before reaching minimum recovery point %X/%X on timeline %u",
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- /* Backup blocks are not used in xlog records */
+ /* Backup blocks are not used by XLOG rmgr */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
if (info == XLOG_NEXTOID)
MultiXactSetNextMXact(checkPoint.nextMulti,
checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+ SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
/*
* If we see a shutdown checkpoint while waiting for an end-of-backup
* record, the backup was canceled and the end-of-backup record will
* never arrive.
*/
- if (InArchiveRecovery &&
+ if (ArchiveRecoveryRequested &&
!XLogRecPtrIsInvalid(ControlFile->backupStartPoint) &&
XLogRecPtrIsInvalid(ControlFile->backupEndPoint))
ereport(PANIC,
checkPoint.oldestXid))
SetTransactionIdLimit(checkPoint.oldestXid,
checkPoint.oldestXidDB);
+ MultiXactAdvanceOldest(checkPoint.oldestMulti,
+ checkPoint.oldestMultiDB);
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
RecoveryRestartPoint(&checkPoint);
}
+ else if (info == XLOG_END_OF_RECOVERY)
+ {
+ xl_end_of_recovery xlrec;
+
+ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
+
+ /*
+ * For Hot Standby, we could treat this like a Shutdown Checkpoint,
+ * but this case is rarer and harder to test, so the benefit doesn't
+ * outweigh the potential extra cost of maintenance.
+ */
+
+ /*
+ * We should've already switched to the new TLI before replaying this
+ * record.
+ */
+ if (xlrec.ThisTimeLineID != ThisTimeLineID)
+ ereport(PANIC,
+ (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
+ xlrec.ThisTimeLineID, ThisTimeLineID)));
+ }
else if (info == XLOG_NOOP)
{
/* nothing to do here */
{
/* nothing to do here */
}
+ else if (info == XLOG_FPI)
+ {
+ char *data;
+ BkpBlock bkpb;
+
+ /*
+ * Full-page image (FPI) records contain a backup block stored "inline"
+ * in the normal data since the locking when writing hint records isn't
+ * sufficient to use the normal backup block mechanism, which assumes
+ * exclusive lock on the buffer supplied.
+ *
+		 * Since the only change in these backup blocks is hint bits, there
+		 * are no recovery conflicts generated.
+		 *
+		 * This also means there is no corresponding API call for this, so
+		 * an smgr implementation has no need to implement anything; nothing
+		 * is needed in md.c etc.
+ */
+ data = XLogRecGetData(record);
+ memcpy(&bkpb, data, sizeof(BkpBlock));
+ data += sizeof(BkpBlock);
+
+ RestoreBackupBlockContents(lsn, bkpb, data, false, false);
+ }
else if (info == XLOG_BACKUP_END)
{
XLogRecPtr startpoint;
memcpy(&startpoint, XLogRecGetData(record), sizeof(startpoint));
- if (XLByteEQ(ControlFile->backupStartPoint, startpoint))
+ if (ControlFile->backupStartPoint == startpoint)
{
/*
* We have reached the end of base backup, the point where
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
- if (XLByteLT(ControlFile->minRecoveryPoint, lsn))
+ if (ControlFile->minRecoveryPoint < lsn)
{
ControlFile->minRecoveryPoint = lsn;
ControlFile->minRecoveryPointTLI = ThisTimeLineID;
}
- MemSet(&ControlFile->backupStartPoint, 0, sizeof(XLogRecPtr));
+ ControlFile->backupStartPoint = InvalidXLogRecPtr;
ControlFile->backupEndRequired = false;
UpdateControlFile();
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->MaxConnections = xlrec.MaxConnections;
+ ControlFile->max_worker_processes = xlrec.max_worker_processes;
ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts;
ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
ControlFile->wal_level = xlrec.wal_level;
*/
minRecoveryPoint = ControlFile->minRecoveryPoint;
minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
- if (minRecoveryPoint != 0 && XLByteLT(minRecoveryPoint, lsn))
+ if (minRecoveryPoint != 0 && minRecoveryPoint < lsn)
{
ControlFile->minRecoveryPoint = lsn;
ControlFile->minRecoveryPointTLI = ThisTimeLineID;
if (!fpw)
{
SpinLockAcquire(&xlogctl->info_lck);
- if (XLByteLT(xlogctl->lastFpwDisableRecPtr, ReadRecPtr))
+ if (xlogctl->lastFpwDisableRecPtr < ReadRecPtr)
xlogctl->lastFpwDisableRecPtr = ReadRecPtr;
SpinLockRelease(&xlogctl->info_lck);
}
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fsync log segment %s: %m",
- XLogFileNameP(ThisTimeLineID, openLogSegNo))));
+ XLogFileNameP(ThisTimeLineID, openLogSegNo))));
if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
XLogFileClose();
}
if (pg_fsync_writethrough(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
- errmsg("could not fsync write-through log file %s: %m",
- XLogFileNameP(ThisTimeLineID, segno))));
+ errmsg("could not fsync write-through log file %s: %m",
+ XLogFileNameP(ThisTimeLineID, segno))));
break;
#endif
#ifdef HAVE_FDATASYNC
XLogFileNameP(TimeLineID tli, XLogSegNo segno)
{
char *result = palloc(MAXFNAMELEN);
+
XLogFileName(result, tli, segno);
return result;
}
* non-exclusive backups active at the same time, and they don't conflict
* with an exclusive backup either.
*
+ * Returns the minimum WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *starttli_p.
+ *
* Every successfully started non-exclusive backup must be stopped by calling
* do_pg_stop_backup() or do_pg_abort_backup().
*/
XLogRecPtr
-do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
+do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
+ char **labelfile)
{
bool exclusive = (labelfile == NULL);
bool backup_started_in_recovery = false;
XLogRecPtr checkpointloc;
XLogRecPtr startpoint;
+ TimeLineID starttli;
pg_time_t stamp_time;
char strfbuf[128];
char xlogfilename[MAXFNAMELEN];
backup_started_in_recovery = RecoveryInProgress();
- if (!superuser() && !is_authenticated_user_replication_role())
+ if (!superuser() && !has_rolreplication(GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser or replication role to run a backup")));
* Note that forcePageWrites has no effect during an online backup from
* the standby.
*
- * We must hold WALInsertLock to change the value of forcePageWrites, to
- * ensure adequate interlocking against XLogInsert().
+ * We must hold all the insertion slots to change the value of
+ * forcePageWrites, to ensure adequate interlocking against XLogInsert().
*/
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
if (exclusive)
{
if (XLogCtl->Insert.exclusiveBackup)
{
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a backup is already in progress"),
else
XLogCtl->Insert.nonExclusiveBackups++;
XLogCtl->Insert.forcePageWrites = true;
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
/* Ensure we release forcePageWrites if fail below */
PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
LWLockAcquire(ControlFileLock, LW_SHARED);
checkpointloc = ControlFile->checkPoint;
startpoint = ControlFile->checkPointCopy.redo;
+ starttli = ControlFile->checkPointCopy.ThisTimeLineID;
checkpointfpw = ControlFile->checkPointCopy.fullPageWrites;
LWLockRelease(ControlFileLock);
recptr = xlogctl->lastFpwDisableRecPtr;
SpinLockRelease(&xlogctl->info_lck);
- if (!checkpointfpw || XLByteLE(startpoint, recptr))
+ if (!checkpointfpw || startpoint <= recptr)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("WAL generated with full_page_writes=off was replayed "
"since last restartpoint"),
- errhint("This means that the backup being taken on standby "
+ errhint("This means that the backup being taken on the standby "
"is corrupt and should not be used. "
"Enable full_page_writes and run CHECKPOINT on the master, "
"and then try an online backup again.")));
* taking a checkpoint right after another is not that expensive
	 * either because only a few buffers have been dirtied yet.
*/
- LWLockAcquire(WALInsertLock, LW_SHARED);
- if (XLByteLT(XLogCtl->Insert.lastBackupStart, startpoint))
+ WALInsertSlotAcquire(true);
+ if (XLogCtl->Insert.lastBackupStart < startpoint)
{
XLogCtl->Insert.lastBackupStart = startpoint;
gotUniqueStartpoint = true;
}
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
} while (!gotUniqueStartpoint);
XLByteToSeg(startpoint, _logSegNo);
"%Y-%m-%d %H:%M:%S %Z",
pg_localtime(&stamp_time, log_timezone));
appendStringInfo(&labelfbuf, "START WAL LOCATION: %X/%X (file %s)\n",
- (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
+ (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
appendStringInfo(&labelfbuf, "CHECKPOINT LOCATION: %X/%X\n",
- (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
+ (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
appendStringInfo(&labelfbuf, "BACKUP METHOD: %s\n",
exclusive ? "pg_start_backup" : "streamed");
appendStringInfo(&labelfbuf, "BACKUP FROM: %s\n",
/*
* We're done. As a convenience, return the starting WAL location.
*/
+ if (starttli_p)
+ *starttli_p = starttli;
return startpoint;
}
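+/*
+ * Illustrative caller sketch (hypothetical, simplified): a base-backup
+ * driver can capture the start position together with its timeline, e.g.
+ *
+ *		TimeLineID	starttli;
+ *		XLogRecPtr	startptr;
+ *
+ *		startptr = do_pg_start_backup(backupidstr, fast, &starttli, &labelfile);
+ *
+ * so the backup can later be matched against the timeline it was taken on.
+ */
+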
bool exclusive = DatumGetBool(arg);
/* Update backup counters and forcePageWrites on failure */
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
if (exclusive)
{
Assert(XLogCtl->Insert.exclusiveBackup);
{
XLogCtl->Insert.forcePageWrites = false;
}
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
}
/*
* If labelfile is NULL, this stops an exclusive backup. Otherwise this stops
* the non-exclusive backup specified by 'labelfile'.
+ *
+ * Returns the last WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *stoptli_p.
*/
XLogRecPtr
-do_pg_stop_backup(char *labelfile, bool waitforarchive)
+do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
{
bool exclusive = (labelfile == NULL);
bool backup_started_in_recovery = false;
XLogRecPtr startpoint;
XLogRecPtr stoppoint;
+ TimeLineID stoptli;
XLogRecData rdata;
pg_time_t stamp_time;
char strfbuf[128];
backup_started_in_recovery = RecoveryInProgress();
- if (!superuser() && !is_authenticated_user_replication_role())
+ if (!superuser() && !has_rolreplication(GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser or replication role to run a backup"))));
/*
* OK to update backup counters and forcePageWrites
*/
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
if (exclusive)
XLogCtl->Insert.exclusiveBackup = false;
else
{
XLogCtl->Insert.forcePageWrites = false;
}
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
if (exclusive)
{
recptr = xlogctl->lastFpwDisableRecPtr;
SpinLockRelease(&xlogctl->info_lck);
- if (XLByteLE(startpoint, recptr))
+ if (startpoint <= recptr)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("WAL generated with full_page_writes=off was replayed "
"during online backup"),
- errhint("This means that the backup being taken on standby "
- "is corrupt and should not be used. "
+ errhint("This means that the backup being taken on the standby "
+ "is corrupt and should not be used. "
"Enable full_page_writes and run CHECKPOINT on the master, "
- "and then try an online backup again.")));
+ "and then try an online backup again.")));
LWLockAcquire(ControlFileLock, LW_SHARED);
stoppoint = ControlFile->minRecoveryPoint;
+ stoptli = ControlFile->minRecoveryPointTLI;
LWLockRelease(ControlFileLock);
+ if (stoptli_p)
+ *stoptli_p = stoptli;
return stoppoint;
}
rdata.buffer = InvalidBuffer;
rdata.next = NULL;
stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
+ stoptli = ThisTimeLineID;
/*
* Force a switch to a new xlog segment file, so that the backup is valid
errmsg("could not create file \"%s\": %m",
histfilepath)));
fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
- (uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename);
+ (uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename);
fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
(uint32) (stoppoint >> 32), (uint32) stoppoint, stopxlogfilename);
/* transfer remaining lines from label to history file */
/*
* We're done. As a convenience, return the ending WAL location.
*/
+ if (stoptli_p)
+ *stoptli_p = stoptli;
return stoppoint;
}
void
do_pg_abort_backup(void)
{
- LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ WALInsertSlotAcquire(true);
Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
XLogCtl->Insert.nonExclusiveBackups--;
{
XLogCtl->Insert.forcePageWrites = false;
}
- LWLockRelease(WALInsertLock);
+ WALInsertSlotRelease();
}
/*
XLogRecPtr
GetXLogInsertRecPtr(void)
{
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecPtr current_recptr;
+ volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint64 current_bytepos;
- LWLockAcquire(WALInsertLock, LW_SHARED);
- INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
- LWLockRelease(WALInsertLock);
+ SpinLockAcquire(&Insert->insertpos_lck);
+ current_bytepos = Insert->CurrBytePos;
+ SpinLockRelease(&Insert->insertpos_lck);
- return current_recptr;
+ return XLogBytePosToRecPtr(current_bytepos);
}
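+/*
+ * (Illustrative note on the byte-position arithmetic above: CurrBytePos
+ * counts only usable WAL bytes, excluding page headers, so reserving space
+ * is a plain addition under the spinlock. XLogBytePosToRecPtr() maps it
+ * back to a conventional XLogRecPtr by re-inserting the header overhead;
+ * e.g. with a short page header each page holds
+ * XLOG_BLCKSZ - SizeOfXLogShortPHD usable bytes, so usable byte position N
+ * falls on page N / (XLOG_BLCKSZ - SizeOfXLogShortPHD), ignoring the longer
+ * header on each segment's first page.)
+ */
+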
/*
/*
* Read the XLOG page containing RecPtr into readBuf (if not read already).
- * Returns true if the page is read successfully.
+ * Returns the number of bytes read if the page is read successfully, or -1
+ * in case of error. Errors are ereport'ed, but only if they have not
+ * been previously reported.
*
* This is responsible for restoring files from archive as needed, as well
* as for waiting for the requested WAL record to arrive in standby mode.
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
*/
-static bool
-XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
- bool randAccess)
+static int
+XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
{
+ XLogPageReadPrivate *private =
+ (XLogPageReadPrivate *) xlogreader->private_data;
+ int emode = private->emode;
uint32 targetPageOff;
- uint32 targetRecOff;
- XLogSegNo targetSegNo;
+ XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
- XLByteToSeg(*RecPtr, targetSegNo);
- targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
- targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
-
- /* Fast exit if we have read the record in the current buffer already */
- if (!lastSourceFailed && targetSegNo == readSegNo &&
- targetPageOff == readOff && targetRecOff < readLen)
- return true;
+ XLByteToSeg(targetPagePtr, targetSegNo);
+ targetPageOff = targetPagePtr % XLogSegSize;
/*
* See if we need to switch to a new segment because the requested record
* is not in the currently open one.
*/
- if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo))
+ if (readFile >= 0 && !XLByteInSeg(targetPagePtr, readSegNo))
{
/*
* Request a restartpoint if we've replayed too much xlog since the
* last one.
*/
- if (StandbyMode && bgwriterLaunched)
+ if (StandbyModeRequested && bgwriterLaunched)
{
if (XLogCheckpointNeeded(readSegNo))
{
readSource = 0;
}
- XLByteToSeg(*RecPtr, readSegNo);
+ XLByteToSeg(targetPagePtr, readSegNo);
retry:
/* See if we need to retrieve more data */
if (readFile < 0 ||
- (readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
+ (readSource == XLOG_FROM_STREAM &&
+ receivedUpto < targetPagePtr + reqLen))
{
- if (StandbyMode)
+ if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
+ private->randAccess,
+ private->fetching_ckpt,
+ targetRecPtr))
{
- if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
- fetching_ckpt))
- goto triggered;
- }
- else
- {
- /* In archive or crash recovery. */
- if (readFile < 0)
- {
- int source;
-
- /* Reset curFileTLI if random fetch. */
- if (randAccess)
- curFileTLI = 0;
+ if (readFile >= 0)
+ close(readFile);
+ readFile = -1;
+ readLen = 0;
+ readSource = 0;
- if (InArchiveRecovery)
- source = XLOG_FROM_ANY;
- else
- source = XLOG_FROM_PG_XLOG;
-
- readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
- if (readFile < 0)
- return false;
- }
+ return -1;
}
}
*/
if (readSource == XLOG_FROM_STREAM)
{
- if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
- {
+ if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
readLen = XLOG_BLCKSZ;
- }
else
readLen = receivedUpto % XLogSegSize - targetPageOff;
}
else
readLen = XLOG_BLCKSZ;
- if (!readFileHeaderValidated && targetPageOff != 0)
- {
- /*
- * Whenever switching to a new WAL segment, we read the first page of
- * the file and validate its header, even if that's not where the
- * target record is. This is so that we can check the additional
- * identification info that is present in the first page's "long"
- * header.
- */
- readOff = 0;
- if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
- {
- char fname[MAXFNAMELEN];
- XLogFileName(fname, curFileTLI, readSegNo);
- ereport(emode_for_corrupt_record(emode, *RecPtr),
- (errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u: %m",
- fname, readOff)));
- goto next_record_is_invalid;
- }
- if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, true))
- goto next_record_is_invalid;
- }
-
/* Read the requested page */
readOff = targetPageOff;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{
- char fname[MAXFNAMELEN];
+ char fname[MAXFNAMELEN];
+
XLogFileName(fname, curFileTLI, readSegNo);
- ereport(emode_for_corrupt_record(emode, *RecPtr),
+ ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- fname, readOff)));
+ errmsg("could not seek in log segment %s to offset %u: %m",
+ fname, readOff)));
goto next_record_is_invalid;
}
+
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
- char fname[MAXFNAMELEN];
+ char fname[MAXFNAMELEN];
+
XLogFileName(fname, curFileTLI, readSegNo);
- ereport(emode_for_corrupt_record(emode, *RecPtr),
+ ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
(errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u: %m",
- fname, readOff)));
+ errmsg("could not read from log segment %s, offset %u: %m",
+ fname, readOff)));
goto next_record_is_invalid;
}
- if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, false))
- goto next_record_is_invalid;
-
- readFileHeaderValidated = true;
Assert(targetSegNo == readSegNo);
Assert(targetPageOff == readOff);
- Assert(targetRecOff < readLen);
+ Assert(reqLen <= readLen);
- return true;
+ *readTLI = curFileTLI;
+ return readLen;
next_record_is_invalid:
lastSourceFailed = true;
if (StandbyMode)
goto retry;
else
- return false;
-
-triggered:
- if (readFile >= 0)
- close(readFile);
- readFile = -1;
- readLen = 0;
- readSource = 0;
-
- return false;
+ return -1;
}
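+/*
+ * Illustrative wiring sketch (hypothetical, simplified): XLogPageRead is
+ * installed as the xlogreader page-read callback, along the lines of
+ *
+ *		XLogPageReadPrivate private;
+ *		XLogReaderState *xlogreader;
+ *
+ *		private.emode = LOG;
+ *		xlogreader = XLogReaderAllocate(&XLogPageRead, &private);
+ *
+ * after which XLogReadRecord() drives it with (targetPagePtr, reqLen)
+ * requests and consumes the bytes returned in readBuf.
+ */
+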
/*
- * In standby mode, wait for the requested record to become available, either
- * via restore_command succeeding to restore the segment, or via walreceiver
- * having streamed the record (or via someone copying the segment directly to
- * pg_xlog, but that is not documented or recommended).
+ * Open the WAL segment containing WAL position 'RecPtr'.
+ *
+ * The segment can be fetched via restore_command, or via walreceiver having
+ * streamed the record, or it can already be present in pg_xlog. Checking
+ * pg_xlog is mainly for crash recovery, but it will be polled in standby mode
+ * too, in case someone copies a new segment directly to pg_xlog. That is not
+ * documented or recommended, though.
+ *
+ * If 'fetching_ckpt' is true, we're fetching a checkpoint record, and should
+ * prepare to read WAL starting from RedoStartLSN after this.
+ *
+ * 'RecPtr' might not point to the beginning of the record we're interested
+ * in; it might also point to the page or segment header. In that case,
+ * 'tliRecPtr' is the position of the WAL record we're interested in. It is
+ * used to decide which timeline to stream the requested WAL from.
+ *
+ * If the record is not immediately available, the function returns false
+ * if we're not in standby mode. In standby mode, it waits for the record
+ * to become available.
*
* When the requested record becomes available, the function opens the file
* containing it (if not open already), and returns true. When end of standby
*/
static bool
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
- bool fetching_ckpt)
+ bool fetching_ckpt, XLogRecPtr tliRecPtr)
{
static pg_time_t last_fail_time = 0;
- pg_time_t now;
+ pg_time_t now;
/*-------
* Standby mode is implemented by a state machine:
*
- * 1. Read from archive (XLOG_FROM_ARCHIVE)
+ * 1. Read from archive (XLOG_FROM_ARCHIVE)
* 2. Read from pg_xlog (XLOG_FROM_PG_XLOG)
* 3. Check trigger file
* 4. Read from primary server via walreceiver (XLOG_FROM_STREAM)
* part of advancing to the next state.
*-------
*/
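+	/*
+	 * Informal transition sketch (derived from the code below; assumes
+	 * standby mode and is not an exhaustive statement of the rules):
+	 *
+	 *		ARCHIVE --fail--> PG_XLOG --fail--> [trigger check] --> STREAM
+	 *		STREAM  --fail--> (sleep up to 5s) --> ARCHIVE
+	 *		PG_XLOG --success--> ARCHIVE for the next file, when in
+	 *		archive recovery
+	 */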
- if (currentSource == 0)
+ if (!InArchiveRecovery)
+ currentSource = XLOG_FROM_PG_XLOG;
+ else if (currentSource == 0)
currentSource = XLOG_FROM_ARCHIVE;
for (;;)
{
- int oldSource = currentSource;
+ int oldSource = currentSource;
/*
* First check if we failed to read from the current source, and
*/
if (lastSourceFailed)
{
-
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
break;
case XLOG_FROM_PG_XLOG:
+
/*
- * Check to see if the trigger file exists. Note that we do
- * this only after failure, so when you create the trigger
- * file, we still finish replaying as much as we can from
- * archive and pg_xlog before failover.
+ * Check to see if the trigger file exists. Note that we
+ * do this only after failure, so when you create the
+ * trigger file, we still finish replaying as much as we
+ * can from archive and pg_xlog before failover.
*/
- if (CheckForStandbyTrigger())
+ if (StandbyMode && CheckForStandbyTrigger())
{
ShutdownWalRcv();
return false;
}
/*
- * If primary_conninfo is set, launch walreceiver to try to
- * stream the missing WAL.
+ * Not in standby mode, and we've now tried the archive
+ * and pg_xlog.
+ */
+ if (!StandbyMode)
+ return false;
+
+ /*
+ * If primary_conninfo is set, launch walreceiver to try
+ * to stream the missing WAL.
*
* If fetching_ckpt is TRUE, RecPtr points to the initial
* checkpoint location. In that case, we use RedoStartLSN
*/
if (PrimaryConnInfo)
{
- XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
- TimeLineID tli = tliOfPointInHistory(ptr, expectedTLEs);
+ XLogRecPtr ptr;
+ TimeLineID tli;
+
+ if (fetching_ckpt)
+ {
+ ptr = RedoStartLSN;
+ tli = ControlFile->checkPointCopy.ThisTimeLineID;
+ }
+ else
+ {
+ ptr = tliRecPtr;
+ tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
- if (curFileTLI > 0 && tli < curFileTLI)
- elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
- (uint32) (ptr >> 32), (uint32) ptr,
- tli, curFileTLI);
+ if (curFileTLI > 0 && tli < curFileTLI)
+ elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
+ (uint32) (ptr >> 32), (uint32) ptr,
+ tli, curFileTLI);
+ }
curFileTLI = tli;
- RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
+ RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+ receivedUpto = 0;
}
+
/*
- * Move to XLOG_FROM_STREAM state in either case. We'll get
- * immediate failure if we didn't launch walreceiver, and
- * move on to the next state.
+ * Move to XLOG_FROM_STREAM state in either case. We'll
+ * get immediate failure if we didn't launch walreceiver,
+ * and move on to the next state.
*/
currentSource = XLOG_FROM_STREAM;
break;
case XLOG_FROM_STREAM:
+
/*
- * Failure while streaming. Most likely, we got here because
- * streaming replication was terminated, or promotion was
- * triggered. But we also get here if we find an invalid
- * record in the WAL streamed from master, in which case
- * something is seriously wrong. There's little chance that
- * the problem will just go away, but PANIC is not good for
- * availability either, especially in hot standby mode. So,
- * we treat that the same as disconnection, and retry from
- * archive/pg_xlog again. The WAL in the archive should be
- * identical to what was streamed, so it's unlikely that it
- * helps, but one can hope...
+ * Failure while streaming. Most likely, we got here
+ * because streaming replication was terminated, or
+ * promotion was triggered. But we also get here if we
+ * find an invalid record in the WAL streamed from master,
+ * in which case something is seriously wrong. There's
+ * little chance that the problem will just go away, but
+ * PANIC is not good for availability either, especially
+ * in hot standby mode. So, we treat that the same as
+ * disconnection, and retry from archive/pg_xlog again.
+ * The WAL in the archive should be identical to what was
+ * streamed, so it's unlikely that it helps, but one can
+ * hope...
*/
+
/*
* Before we leave XLOG_FROM_STREAM state, make sure that
* walreceiver is not active, so that it won't overwrite
}
/*
- * XLOG_FROM_STREAM is the last state in our state machine,
- * so we've exhausted all the options for obtaining the
- * requested WAL. We're going to loop back and retry from
- * the archive, but if it hasn't been long since last
- * attempt, sleep 5 seconds to avoid busy-waiting.
+ * XLOG_FROM_STREAM is the last state in our state
+ * machine, so we've exhausted all the options for
+ * obtaining the requested WAL. We're going to loop back
+ * and retry from the archive, but if it hasn't been long
+ * since last attempt, sleep 5 seconds to avoid
+ * busy-waiting.
*/
now = (pg_time_t) time(NULL);
if ((now - last_fail_time) < 5)
else if (currentSource == XLOG_FROM_PG_XLOG)
{
/*
- * We just successfully read a file in pg_xlog. We prefer files
- * in the archive over ones in pg_xlog, so try the next file
- * again from the archive first.
+ * We just successfully read a file in pg_xlog. We prefer files in
+ * the archive over ones in pg_xlog, so try the next file again
+ * from the archive first.
*/
- currentSource = XLOG_FROM_ARCHIVE;
+ if (InArchiveRecovery)
+ currentSource = XLOG_FROM_ARCHIVE;
}
if (currentSource != oldSource)
break;
case XLOG_FROM_STREAM:
- {
- bool havedata;
-
- /*
- * Check if WAL receiver is still active.
- */
- if (!WalRcvStreaming())
- {
- lastSourceFailed = true;
- break;
- }
-
- /*
- * Walreceiver is active, so see if new data has arrived.
- *
- * We only advance XLogReceiptTime when we obtain fresh WAL
- * from walreceiver and observe that we had already processed
- * everything before the most recent "chunk" that it flushed to
- * disk. In steady state where we are keeping up with the
- * incoming data, XLogReceiptTime will be updated on each cycle.
- * When we are behind, XLogReceiptTime will not advance, so the
- * grace time allotted to conflicting queries will decrease.
- */
- if (XLByteLT(RecPtr, receivedUpto))
- havedata = true;
- else
{
- XLogRecPtr latestChunkStart;
+ bool havedata;
- receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
- if (XLByteLT(RecPtr, receivedUpto) && receiveTLI == curFileTLI)
+ /*
+ * Check if WAL receiver is still active.
+ */
+ if (!WalRcvStreaming())
{
+ lastSourceFailed = true;
+ break;
+ }
+
+ /*
+ * Walreceiver is active, so see if new data has arrived.
+ *
+ * We only advance XLogReceiptTime when we obtain fresh
+ * WAL from walreceiver and observe that we had already
+ * processed everything before the most recent "chunk"
+ * that it flushed to disk. In steady state where we are
+ * keeping up with the incoming data, XLogReceiptTime will
+ * be updated on each cycle. When we are behind,
+ * XLogReceiptTime will not advance, so the grace time
+ * allotted to conflicting queries will decrease.
+ */
+ if (RecPtr < receivedUpto)
havedata = true;
- if (!XLByteLT(RecPtr, latestChunkStart))
+ else
+ {
+ XLogRecPtr latestChunkStart;
+
+ receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
+ if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
{
- XLogReceiptTime = GetCurrentTimestamp();
- SetCurrentChunkStartTime(XLogReceiptTime);
+ havedata = true;
+ if (latestChunkStart <= RecPtr)
+ {
+ XLogReceiptTime = GetCurrentTimestamp();
+ SetCurrentChunkStartTime(XLogReceiptTime);
+ }
}
+ else
+ havedata = false;
}
- else
- havedata = false;
- }
- if (havedata)
- {
- /*
- * Great, streamed far enough. Open the file if it's not
- * open already. Use XLOG_FROM_STREAM so that source info
- * is set correctly and XLogReceiptTime isn't changed.
- */
- if (readFile < 0)
+ if (havedata)
{
- readFile = XLogFileRead(readSegNo, PANIC,
- recoveryTargetTLI,
- XLOG_FROM_STREAM, false);
- Assert(readFile >= 0);
+ /*
+ * Great, streamed far enough. Open the file if it's
+ * not open already. Also read the timeline history
+ * file if we haven't initialized timeline history
+ * yet; it should be streamed over and present in
+ * pg_xlog by now. Use XLOG_FROM_STREAM so that
+ * source info is set correctly and XLogReceiptTime
+ * isn't changed.
+ */
+ if (readFile < 0)
+ {
+ if (!expectedTLEs)
+ expectedTLEs = readTimeLineHistory(receiveTLI);
+ readFile = XLogFileRead(readSegNo, PANIC,
+ receiveTLI,
+ XLOG_FROM_STREAM, false);
+ Assert(readFile >= 0);
+ }
+ else
+ {
+ /* just make sure source info is correct... */
+ readSource = XLOG_FROM_STREAM;
+ XLogReceiptSource = XLOG_FROM_STREAM;
+ return true;
+ }
+ break;
}
- else
+
+ /*
+ * Data not here yet. Check for trigger, then wait for
+ * walreceiver to wake us up when new WAL arrives.
+ */
+ if (CheckForStandbyTrigger())
{
- /* just make sure source info is correct... */
- readSource = XLOG_FROM_STREAM;
- XLogReceiptSource = XLOG_FROM_STREAM;
- return true;
+ /*
+ * Note that we don't "return false" immediately here.
+ * After being triggered, we still want to replay all
+ * the WAL that was already streamed. It's in pg_xlog
+ * now, so we just treat this as a failure, and the
+ * state machine will move on to replay the streamed
+ * WAL from pg_xlog, and then recheck the trigger and
+ * exit replay.
+ */
+ lastSourceFailed = true;
+ break;
}
- break;
- }
- /*
- * Data not here yet. Check for trigger, then wait for
- * walreceiver to wake us up when new WAL arrives.
- */
- if (CheckForStandbyTrigger())
- {
/*
- * Note that we don't "return false" immediately here.
- * After being triggered, we still want to replay all the
- * WAL that was already streamed. It's in pg_xlog now, so
- * we just treat this as a failure, and the state machine
- * will move on to replay the streamed WAL from pg_xlog,
- * and then recheck the trigger and exit replay.
+ * Wait for more WAL to arrive. Time out after 5 seconds,
+ * like when polling the archive, to react to a trigger
+ * file promptly.
*/
- lastSourceFailed = true;
+ WaitLatch(&XLogCtl->recoveryWakeupLatch,
+ WL_LATCH_SET | WL_TIMEOUT,
+ 5000L);
+ ResetLatch(&XLogCtl->recoveryWakeupLatch);
break;
}
- /*
- * Wait for more WAL to arrive. Time out after 5 seconds, like
- * when polling the archive, to react to a trigger file
- * promptly.
- */
- WaitLatch(&XLogCtl->recoveryWakeupLatch,
- WL_LATCH_SET | WL_TIMEOUT,
- 5000L);
- ResetLatch(&XLogCtl->recoveryWakeupLatch);
- break;
- }
-
default:
elog(ERROR, "unexpected WAL source %d", currentSource);
}
* process.
*/
HandleStartupProcInterrupts();
- }
+ } while (StandbyMode);
- return false; /* not reached */
+ return false;
}
/*
if (readSource == XLOG_FROM_PG_XLOG && emode == LOG)
{
- if (XLByteEQ(RecPtr, lastComplaint))
+ if (RecPtr == lastComplaint)
emode = DEBUG1;
else
lastComplaint = RecPtr;
if (IsPromoteTriggered())
{
- ereport(LOG,
- (errmsg("received promote request")));
+ /*
+ * In 9.1 and 9.2 the postmaster unlinked the promote file inside the
+ * signal handler. It now leaves the file in place and lets the
+ * Startup process do the unlink. This allows Startup to know whether
+ * it should create a full checkpoint before starting up (fallback
+ * mode). Fast promotion takes precedence.
+ */
+ if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
+ {
+ unlink(PROMOTE_SIGNAL_FILE);
+ unlink(FALLBACK_PROMOTE_SIGNAL_FILE);
+ fast_promote = true;
+ }
+ else if (stat(FALLBACK_PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
+ {
+ unlink(FALLBACK_PROMOTE_SIGNAL_FILE);
+ fast_promote = false;
+ }
+
+ ereport(LOG, (errmsg("received promote request")));
+
ResetPromoteTriggered();
triggered = true;
return true;
(errmsg("trigger file found: %s", TriggerFile)));
unlink(TriggerFile);
triggered = true;
+ fast_promote = true;
return true;
}
return false;
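+/*
+ * Illustrative configuration sketch (hypothetical path): the trigger file
+ * tested above comes from recovery.conf, e.g.
+ *
+ *		standby_mode = 'on'
+ *		trigger_file = '/var/run/postgresql/promote.trigger'
+ *
+ * Creating that file on the standby makes CheckForStandbyTrigger() unlink
+ * it and request fast promotion, much like "pg_ctl promote".
+ */
+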
{
struct stat stat_buf;
- if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
- {
- /*
- * Since we are in a signal handler, it's not safe to elog. We
- * silently ignore any error from unlink.
- */
- unlink(PROMOTE_SIGNAL_FILE);
+ if (stat(PROMOTE_SIGNAL_FILE, &stat_buf) == 0 ||
+ stat(FALLBACK_PROMOTE_SIGNAL_FILE, &stat_buf) == 0)
return true;
- }
+
return false;
}