/*-------------------------------------------------------------------------
 *
 * PostgreSQL transaction log manager
 *
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.403 2010/04/23 20:21:31 sriggs Exp $
 *
 *-------------------------------------------------------------------------
 */
26 #include "access/clog.h"
27 #include "access/multixact.h"
28 #include "access/subtrans.h"
29 #include "access/transam.h"
30 #include "access/tuptoaster.h"
31 #include "access/twophase.h"
32 #include "access/xact.h"
33 #include "access/xlog_internal.h"
34 #include "access/xlogutils.h"
35 #include "catalog/catversion.h"
36 #include "catalog/pg_control.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_type.h"
40 #include "libpq/pqsignal.h"
41 #include "miscadmin.h"
43 #include "postmaster/bgwriter.h"
44 #include "replication/walreceiver.h"
45 #include "replication/walsender.h"
46 #include "storage/bufmgr.h"
47 #include "storage/fd.h"
48 #include "storage/ipc.h"
49 #include "storage/pmsignal.h"
50 #include "storage/procarray.h"
51 #include "storage/smgr.h"
52 #include "storage/spin.h"
53 #include "utils/builtins.h"
54 #include "utils/guc.h"
55 #include "utils/ps_status.h"
56 #include "utils/relmapper.h"
/* File path names (all relative to $PGDATA) */
#define BACKUP_LABEL_FILE		"backup_label"
#define BACKUP_LABEL_OLD		"backup_label.old"
#define RECOVERY_COMMAND_FILE	"recovery.conf"
#define RECOVERY_COMMAND_DONE	"recovery.done"
67 /* User-settable parameters */
68 int CheckPointSegments = 3;
69 int wal_keep_segments = 0;
71 int XLogArchiveTimeout = 0;
72 bool XLogArchiveMode = false;
73 char *XLogArchiveCommand = NULL;
74 bool XLogRequestRecoveryConnections = true;
75 int MaxStandbyDelay = 30;
76 bool fullPageWrites = true;
77 bool log_checkpoints = false;
78 int sync_method = DEFAULT_SYNC_METHOD;
81 bool XLOG_DEBUG = false;
/*
 * XLOGfileslop is the maximum number of preallocated future XLOG segments.
 * When we are done with an old XLOG segment file, we will recycle it as a
 * future XLOG segment as long as there aren't already XLOGfileslop future
 * segments; else we'll delete it.  This could be made a separate GUC
 * variable, but at present I think it's sufficient to hardwire it as
 * 2*CheckPointSegments+1.  Under normal conditions, a checkpoint will free
 * no more than 2*CheckPointSegments log segments, and we want to recycle all
 * of them; the +1 allows boundary cases to happen without wasting a
 * delete/create-segment cycle.
 *
 * The macro re-reads CheckPointSegments on every use, so it always reflects
 * the variable's current value.
 */
#define XLOGfileslop	(2*CheckPointSegments + 1)
100 const struct config_enum_entry sync_method_options[] = {
101 {"fsync", SYNC_METHOD_FSYNC, false},
102 #ifdef HAVE_FSYNC_WRITETHROUGH
103 {"fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false},
105 #ifdef HAVE_FDATASYNC
106 {"fdatasync", SYNC_METHOD_FDATASYNC, false},
108 #ifdef OPEN_SYNC_FLAG
109 {"open_sync", SYNC_METHOD_OPEN, false},
111 #ifdef OPEN_DATASYNC_FLAG
112 {"open_datasync", SYNC_METHOD_OPEN_DSYNC, false},
118 * Statistics for current checkpoint are collected in this global struct.
119 * Because only the background writer or a stand-alone backend can perform
120 * checkpoints, this will be unused in normal backends.
122 CheckpointStatsData CheckpointStats;
125 * ThisTimeLineID will be same in all backends --- it identifies current
126 * WAL timeline for the database system.
128 TimeLineID ThisTimeLineID = 0;
131 * Are we doing recovery from XLOG?
133 * This is only ever true in the startup process; it should be read as meaning
134 * "this process is replaying WAL records", rather than "the system is in
135 * recovery mode". It should be examined primarily by functions that need
136 * to act differently when called from a WAL redo function (e.g., to skip WAL
137 * logging). To check whether the system is in recovery regardless of which
138 * process you're running in, use RecoveryInProgress() but only after shared
139 * memory startup and lock initialization.
141 bool InRecovery = false;
143 /* Are we in Hot Standby mode? Only valid in startup process, see xlog.h */
144 HotStandbyState standbyState = STANDBY_DISABLED;
146 static XLogRecPtr LastRec;
/*
 * Local copy of SharedRecoveryInProgress variable. True actually means "not
 * known, need to check the shared state".
 */
static bool LocalRecoveryInProgress = true;

/*
 * Local state for XLogInsertAllowed():
 *		1: unconditionally allowed to insert XLOG
 *		0: unconditionally not allowed to insert XLOG
 *		-1: must check RecoveryInProgress(); disallow until it is false
 * Most processes start with -1 and transition to 1 after seeing that recovery
 * is not in progress.  But we can also force the value for special cases.
 * The coding in XLogInsertAllowed() depends on the first two of these states
 * being numerically the same as bool true and false.
 */
static int	LocalXLogInsertAllowed = -1;
166 /* Are we recovering using offline XLOG archives? */
167 static bool InArchiveRecovery = false;
169 /* Was the last xlog file restored from archive, or local? */
170 static bool restoredFromArchive = false;
172 /* options taken from recovery.conf for archive recovery */
173 static char *recoveryRestoreCommand = NULL;
174 static char *recoveryEndCommand = NULL;
175 static char *restartPointCommand = NULL;
176 static RecoveryTargetType recoveryTarget = RECOVERY_TARGET_UNSET;
177 static bool recoveryTargetInclusive = true;
178 static TransactionId recoveryTargetXid;
179 static TimestampTz recoveryTargetTime;
180 static TimestampTz recoveryLastXTime = 0;
182 /* options taken from recovery.conf for XLOG streaming */
183 static bool StandbyMode = false;
184 static char *PrimaryConnInfo = NULL;
185 char *TriggerFile = NULL;
187 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
188 static TransactionId recoveryStopXid;
189 static TimestampTz recoveryStopTime;
190 static bool recoveryStopAfter;
193 * During normal operation, the only timeline we care about is ThisTimeLineID.
194 * During recovery, however, things are more complicated. To simplify life
195 * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
196 * scan through the WAL history (that is, it is the line that was active when
197 * the currently-scanned WAL record was generated). We also need these
200 * recoveryTargetTLI: the desired timeline that we want to end in.
202 * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
203 * its known parents, newest first (so recoveryTargetTLI is always the
204 * first list member). Only these TLIs are expected to be seen in the WAL
205 * segments we read, and indeed only these TLIs will be considered as
206 * candidate WAL files to open at all.
208 * curFileTLI: the TLI appearing in the name of the current input WAL file.
209 * (This is not necessarily the same as ThisTimeLineID, because we could
210 * be scanning data that was copied from an ancestor timeline when the current
211 * file was created.) During a sequential scan we do not allow this value
214 static TimeLineID recoveryTargetTLI;
215 static List *expectedTLIs;
216 static TimeLineID curFileTLI;
219 * ProcLastRecPtr points to the start of the last XLOG record inserted by the
220 * current backend. It is updated for all inserts. XactLastRecEnd points to
221 * end+1 of the last record, and is reset when we end a top-level transaction,
222 * or start a new one; so it can be used to tell if the current transaction has
223 * created any XLOG records.
225 static XLogRecPtr ProcLastRecPtr = {0, 0};
227 XLogRecPtr XactLastRecEnd = {0, 0};
230 * RedoRecPtr is this backend's local copy of the REDO record pointer
231 * (which is almost but not quite the same as a pointer to the most recent
232 * CHECKPOINT record). We update this from the shared-memory copy,
233 * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
234 * hold the Insert lock). See XLogInsert for details. We are also allowed
235 * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
236 * see GetRedoRecPtr. A freshly spawned backend obtains the value during
239 static XLogRecPtr RedoRecPtr;
242 * RedoStartLSN points to the checkpoint's REDO location which is specified
243 * in a backup label file, backup history file or control file. In standby
244 * mode, XLOG streaming usually starts from the position where an invalid
245 * record was found. But if we fail to read even the initial checkpoint
246 * record, we use the REDO location instead of the checkpoint location as
247 * the start position of XLOG streaming. Otherwise we would have to jump
248 * backwards to the REDO location after reading the checkpoint record,
249 * because the REDO record can precede the checkpoint record.
251 static XLogRecPtr RedoStartLSN = {0, 0};
/*----------
 * Shared-memory data structures for XLOG control
 *
 * LogwrtRqst indicates a byte position that we need to write and/or fsync
 * the log up to (all records before that point must be written or fsynced).
 * LogwrtResult indicates the byte positions we have already written/fsynced.
 * These structs are identical but are declared separately to indicate their
 * slightly different functions.
 *
 * We do a lot of pushups to minimize the amount of access to lockable
 * shared memory values.  There are actually three shared-memory copies of
 * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
 *		XLogCtl->LogwrtResult is protected by info_lck
 *		XLogCtl->Write.LogwrtResult is protected by WALWriteLock
 *		XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
 * One must hold the associated lock to read or write any of these, but
 * of course no lock is needed to read/write the unshared LogwrtResult.
 *
 * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
 * right", since both are updated by a write or flush operation before
 * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
 * is that it can be examined/modified by code that already holds WALWriteLock
 * without needing to grab info_lck as well.
 *
 * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
 * but is updated when convenient.  Again, it exists for the convenience of
 * code that is already holding WALInsertLock but not the other locks.
 *
 * The unshared LogwrtResult may lag behind any or all of these, and again
 * is updated when convenient.
 *
 * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
 * (protected by info_lck), but we don't need to cache any copies of it.
 *
 * Note that this all works because the request and result positions can only
 * advance forward, never back up, and so we can easily determine which of two
 * values is "more up to date".
 *
 * info_lck is only held long enough to read/update the protected variables,
 * so it's a plain spinlock.  The other locks are held longer (potentially
 * over I/O operations), so we use LWLocks for them.  These locks are:
 *
 * WALInsertLock: must be held to insert a record into the WAL buffers.
 *
 * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
 * XLogFlush).
 *
 * ControlFileLock: must be held to read/update control file or create
 * new log file.
 *
 * CheckpointLock: must be held to do a checkpoint or restartpoint (ensures
 * only one checkpointer at a time; currently, with all checkpoints done by
 * the bgwriter, this is just pro forma).
 *
 *----------
 */
310 typedef struct XLogwrtRqst
312 XLogRecPtr Write; /* last byte + 1 to write out */
313 XLogRecPtr Flush; /* last byte + 1 to flush */
316 typedef struct XLogwrtResult
318 XLogRecPtr Write; /* last byte + 1 written out */
319 XLogRecPtr Flush; /* last byte + 1 flushed */
323 * Shared state data for XLogInsert.
325 typedef struct XLogCtlInsert
327 XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
328 XLogRecPtr PrevRecord; /* start of previously-inserted record */
329 int curridx; /* current block index in cache */
330 XLogPageHeader currpage; /* points to header of block in cache */
331 char *currpos; /* current insertion point in cache */
332 XLogRecPtr RedoRecPtr; /* current redo point for insertions */
333 bool forcePageWrites; /* forcing full-page writes for PITR? */
337 * Shared state data for XLogWrite/XLogFlush.
339 typedef struct XLogCtlWrite
341 XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
342 int curridx; /* cache index of next block to write */
343 pg_time_t lastSegSwitchTime; /* time of last xlog segment switch */
347 * Total shared-memory state for XLOG.
349 typedef struct XLogCtlData
351 /* Protected by WALInsertLock: */
352 XLogCtlInsert Insert;
354 /* Protected by info_lck: */
355 XLogwrtRqst LogwrtRqst;
356 XLogwrtResult LogwrtResult;
357 uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
358 TransactionId ckptXid;
359 XLogRecPtr asyncCommitLSN; /* LSN of newest async commit */
360 uint32 lastRemovedLog; /* latest removed/recycled XLOG segment */
361 uint32 lastRemovedSeg;
363 /* Protected by WALWriteLock: */
367 * These values do not change after startup, although the pointed-to pages
368 * and xlblocks values certainly do. Permission to read/write the pages
369 * and xlblocks values depends on WALInsertLock and WALWriteLock.
371 char *pages; /* buffers for unwritten XLOG pages */
372 XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
373 int XLogCacheBlck; /* highest allocated xlog buffer index */
374 TimeLineID ThisTimeLineID;
375 TimeLineID RecoveryTargetTLI;
377 * restartPointCommand is read from recovery.conf but needs to be in
378 * shared memory so that the bgwriter process can access it.
380 char restartPointCommand[MAXPGPATH];
383 * SharedRecoveryInProgress indicates if we're still in crash or archive
384 * recovery. Protected by info_lck.
386 bool SharedRecoveryInProgress;
389 * During recovery, we keep a copy of the latest checkpoint record here.
390 * Used by the background writer when it wants to create a restartpoint.
392 * Protected by info_lck.
394 XLogRecPtr lastCheckPointRecPtr;
395 CheckPoint lastCheckPoint;
397 /* end+1 of the last record replayed (or being replayed) */
398 XLogRecPtr replayEndRecPtr;
399 /* timestamp of last record replayed (or being replayed) */
400 TimestampTz recoveryLastXTime;
401 /* end+1 of the last record replayed */
402 XLogRecPtr recoveryLastRecPtr;
404 slock_t info_lck; /* locks shared variables shown above */
407 static XLogCtlData *XLogCtl = NULL;
410 * We maintain an image of pg_control in shared memory.
412 static ControlFileData *ControlFile = NULL;
/*
 * Macros for managing XLogInsert state.  In most cases, the calling routine
 * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
 * so these are passed as parameters instead of being fetched via XLogCtl.
 */

/*
 * Free space remaining in the current xlog page buffer.
 *
 * Insert is a pointer to the insert state; the result is XLOG_BLCKSZ minus
 * the distance from the start of the current page to the insertion point.
 */
#define INSERT_FREESPACE(Insert)  \
	(XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))

/*
 * Construct XLogRecPtr value for current insertion point.
 *
 * xlblocks[curridx] holds the position just past the end of the page, so
 * subtracting the page's remaining free space yields the insertion point.
 * The macro is a single parenthesized comma expression that assigns both
 * fields of recptr.
 */
#define INSERT_RECPTR(recptr,Insert,curridx)  \
	( \
	  (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
	  (recptr).xrecoff = \
		XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
	)
/*
 * Step backward one slot in the circular set of XLOG buffers: slot 0 wraps
 * around to the highest allocated index, XLogCtl->XLogCacheBlck.
 */
#define PrevBufIdx(idx) \
	(((idx) != 0) ? ((idx) - 1) : XLogCtl->XLogCacheBlck)

/*
 * Step forward one slot in the circular set of XLOG buffers: the highest
 * allocated index wraps around to slot 0.
 */
#define NextBufIdx(idx) \
	(((idx) != XLogCtl->XLogCacheBlck) ? ((idx) + 1) : 0)
439 * Private, possibly out-of-date copy of shared LogwrtResult.
440 * See discussion above.
442 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
445 * openLogFile is -1 or a kernel FD for an open log file segment.
446 * When it's open, openLogOff is the current seek offset in the file.
447 * openLogId/openLogSeg identify the segment. These variables are only
448 * used to write the XLOG, and so will normally refer to the active segment.
450 static int openLogFile = -1;
451 static uint32 openLogId = 0;
452 static uint32 openLogSeg = 0;
453 static uint32 openLogOff = 0;
/*
 * Codes indicating where we got a WAL file from during recovery, or where
 * to attempt to get one.  Each code is a distinct bit, so several can be
 * OR'ed together into one int.
 */
#define XLOG_FROM_ARCHIVE		(1<<0)	/* Restored using restore_command */
#define XLOG_FROM_PG_XLOG		(1<<1)	/* Existing file in pg_xlog */
#define XLOG_FROM_STREAM		(1<<2)	/* Streamed from master */
464 * These variables are used similarly to the ones above, but for reading
465 * the XLOG. Note, however, that readOff generally represents the offset
466 * of the page just read, not the seek position of the FD itself, which
467 * will be just past that page. readLen indicates how much of the current
468 * page has been read into readBuf, and readSource indicates where we got
469 * the currently open file from.
471 static int readFile = -1;
472 static uint32 readId = 0;
473 static uint32 readSeg = 0;
474 static uint32 readOff = 0;
475 static uint32 readLen = 0;
476 static int readSource = 0; /* XLOG_FROM_* code */
479 * Keeps track of which sources we've tried to read the current WAL
480 * record from and failed.
482 static int failedSources = 0;
484 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
485 static char *readBuf = NULL;
487 /* Buffer for current ReadRecord result (expandable) */
488 static char *readRecordBuf = NULL;
489 static uint32 readRecordBufSize = 0;
491 /* State information for XLOG reading */
492 static XLogRecPtr ReadRecPtr; /* start of last record read */
493 static XLogRecPtr EndRecPtr; /* end+1 of last record read */
494 static TimeLineID lastPageTLI = 0;
496 static XLogRecPtr minRecoveryPoint; /* local copy of
497 * ControlFile->minRecoveryPoint */
498 static bool updateMinRecoveryPoint = true;
499 static bool reachedMinRecoveryPoint = false;
501 static bool InRedo = false;
/*
 * Flags set by interrupt handlers for later service in the redo loop.
 */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;

/*
 * Flag set when executing a restore command, to tell SIGTERM signal handler
 * that it's safe to just proc_exit.
 */
static volatile sig_atomic_t in_restore_command = false;
516 static void XLogArchiveNotify(const char *xlog);
517 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
518 static bool XLogArchiveCheckDone(const char *xlog);
519 static bool XLogArchiveIsBusy(const char *xlog);
520 static void XLogArchiveCleanup(const char *xlog);
521 static void readRecoveryCommandFile(void);
522 static void exitArchiveRecovery(TimeLineID endTLI,
523 uint32 endLogId, uint32 endLogSeg);
524 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
525 static void CheckRequiredParameterValues(CheckPoint checkPoint);
526 static void LocalSetXLogInsertAllowed(void);
527 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
529 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
530 XLogRecPtr *lsn, BkpBlock *bkpb);
531 static bool AdvanceXLInsertBuffer(bool new_segment);
532 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
533 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
534 bool find_free, int *max_advance,
536 static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
537 int source, bool notexistOk);
538 static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
540 static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
542 static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
543 static void XLogFileClose(void);
544 static bool RestoreArchivedFile(char *path, const char *xlogfname,
545 const char *recovername, off_t expectedSize);
546 static void ExecuteRecoveryCommand(char *command, char *commandName,
548 static void PreallocXlogFiles(XLogRecPtr endptr);
549 static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
550 static void UpdateLastRemovedPtr(char *filename);
551 static void ValidateXLOGDirectoryStructure(void);
552 static void CleanupBackupHistory(void);
553 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
554 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
555 static void CheckRecoveryConsistency(void);
556 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
557 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
558 static List *readTimeLineHistory(TimeLineID targetTLI);
559 static bool existsTimeLineHistory(TimeLineID probeTLI);
560 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
561 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
563 uint32 endLogId, uint32 endLogSeg);
564 static void WriteControlFile(void);
565 static void ReadControlFile(void);
566 static char *str_time(pg_time_t tnow);
567 static bool CheckForStandbyTrigger(void);
570 static void xlog_outrec(StringInfo buf, XLogRecord *record);
572 static void pg_start_backup_callback(int code, Datum arg);
573 static bool read_backup_label(XLogRecPtr *checkPointLoc);
574 static void rm_redo_error_callback(void *arg);
575 static int get_sync_bit(int method);
/*
 * Insert an XLOG record having the specified RMID and info bytes,
 * with the body of the record being the data chunk(s) described by
 * the rdata chain (see xlog.h for notes about rdata).
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 *
 * NB: this routine feels free to scribble on the XLogRecData structs,
 * though not on the data they reference.  This is OK since the XLogRecData
 * structs are always just temporaries in the calling code.
 */
594 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
596 XLogCtlInsert *Insert = &XLogCtl->Insert;
598 XLogContRecord *contrecord;
600 XLogRecPtr WriteRqst;
604 Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
605 bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
606 BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
607 XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
608 XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
609 XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
610 XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
617 bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
619 /* cross-check on whether we should be here or not */
620 if (!XLogInsertAllowed())
621 elog(ERROR, "cannot make new WAL entries during recovery");
623 /* info's high bits are reserved for use by me */
624 if (info & XLR_INFO_MASK)
625 elog(PANIC, "invalid xlog info mask %02X", info);
627 TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
630 * In bootstrap mode, we don't actually log anything but XLOG resources;
631 * return a phony record pointer.
633 if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
636 RecPtr.xrecoff = SizeOfXLogLongPHD; /* start of 1st chkpt record */
641 * Here we scan the rdata chain, determine which buffers must be backed
642 * up, and compute the CRC values for the data. Note that the record
643 * header isn't added into the CRC initially since we don't know the final
644 * length or info bits quite yet. Thus, the CRC will represent the CRC of
645 * the whole record in the order "rdata, then backup blocks, then record
648 * We may have to loop back to here if a race condition is detected below.
649 * We could prevent the race by doing all this work while holding the
650 * insert lock, but it seems better to avoid doing CRC calculations while
651 * holding the lock. This means we have to be careful about modifying the
652 * rdata chain until we know we aren't going to loop back again. The only
653 * change we allow ourselves to make earlier is to set rdt->data = NULL in
654 * chain items we have decided we will have to back up the whole buffer
655 * for. This is OK because we will certainly decide the same thing again
656 * for those items if we do it over; doing it here saves an extra pass
657 * over the chain later.
660 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
662 dtbuf[i] = InvalidBuffer;
663 dtbuf_bkp[i] = false;
667 * Decide if we need to do full-page writes in this XLOG record: true if
668 * full_page_writes is on or we have a PITR request for it. Since we
669 * don't yet have the insert lock, forcePageWrites could change under us,
670 * but we'll recheck it once we have the lock.
672 doPageWrites = fullPageWrites || Insert->forcePageWrites;
674 INIT_CRC32(rdata_crc);
678 if (rdt->buffer == InvalidBuffer)
680 /* Simple data, just include it */
682 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
686 /* Find info for buffer */
687 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
689 if (rdt->buffer == dtbuf[i])
691 /* Buffer already referenced by earlier chain item */
697 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
701 if (dtbuf[i] == InvalidBuffer)
703 /* OK, put it in this slot */
704 dtbuf[i] = rdt->buffer;
705 if (XLogCheckBuffer(rdt, doPageWrites,
706 &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
714 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
719 if (i >= XLR_MAX_BKP_BLOCKS)
720 elog(PANIC, "can backup at most %d blocks per xlog record",
723 /* Break out of loop when rdt points to last chain item */
724 if (rdt->next == NULL)
730 * Now add the backup block headers and data into the CRC
732 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
736 BkpBlock *bkpb = &(dtbuf_xlg[i]);
739 COMP_CRC32(rdata_crc,
742 page = (char *) BufferGetBlock(dtbuf[i]);
743 if (bkpb->hole_length == 0)
745 COMP_CRC32(rdata_crc,
751 /* must skip the hole */
752 COMP_CRC32(rdata_crc,
755 COMP_CRC32(rdata_crc,
756 page + (bkpb->hole_offset + bkpb->hole_length),
757 BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
763 * NOTE: We disallow len == 0 because it provides a useful bit of extra
764 * error checking in ReadRecord. This means that all callers of
765 * XLogInsert must supply at least some not-in-a-buffer data. However, we
766 * make an exception for XLOG SWITCH records because we don't want them to
767 * ever cross a segment boundary.
769 if (len == 0 && !isLogSwitch)
770 elog(PANIC, "invalid xlog record length %u", len);
772 START_CRIT_SECTION();
774 /* Now wait to get insert lock */
775 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
778 * Check to see if my RedoRecPtr is out of date. If so, may have to go
779 * back and recompute everything. This can only happen just after a
780 * checkpoint, so it's better to be slow in this case and fast otherwise.
782 * If we aren't doing full-page writes then RedoRecPtr doesn't actually
783 * affect the contents of the XLOG record, so we'll update our local copy
784 * but not force a recomputation.
786 if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
788 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
789 RedoRecPtr = Insert->RedoRecPtr;
793 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
795 if (dtbuf[i] == InvalidBuffer)
797 if (dtbuf_bkp[i] == false &&
798 XLByteLE(dtbuf_lsn[i], RedoRecPtr))
801 * Oops, this buffer now needs to be backed up, but we
802 * didn't think so above. Start over.
804 LWLockRelease(WALInsertLock);
813 * Also check to see if forcePageWrites was just turned on; if we weren't
814 * already doing full-page writes then go back and recompute. (If it was
815 * just turned off, we could recompute the record without full pages, but
816 * we choose not to bother.)
818 if (Insert->forcePageWrites && !doPageWrites)
820 /* Oops, must redo it with full-page data */
821 LWLockRelease(WALInsertLock);
827 * Make additional rdata chain entries for the backup blocks, so that we
828 * don't need to special-case them in the write loop. Note that we have
829 * now irrevocably changed the input rdata chain. At the exit of this
830 * loop, write_len includes the backup block data.
832 * Also set the appropriate info bits to show which buffers were backed
833 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
834 * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
837 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
845 info |= XLR_SET_BKP_BLOCK(i);
847 bkpb = &(dtbuf_xlg[i]);
848 page = (char *) BufferGetBlock(dtbuf[i]);
850 rdt->next = &(dtbuf_rdt1[i]);
853 rdt->data = (char *) bkpb;
854 rdt->len = sizeof(BkpBlock);
855 write_len += sizeof(BkpBlock);
857 rdt->next = &(dtbuf_rdt2[i]);
860 if (bkpb->hole_length == 0)
869 /* must skip the hole */
871 rdt->len = bkpb->hole_offset;
872 write_len += bkpb->hole_offset;
874 rdt->next = &(dtbuf_rdt3[i]);
877 rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
878 rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
879 write_len += rdt->len;
885 * If we backed up any full blocks and online backup is not in progress,
886 * mark the backup blocks as removable. This allows the WAL archiver to
887 * know whether it is safe to compress archived WAL data by transforming
888 * full-block records into the non-full-block format.
890 * Note: we could just set the flag whenever !forcePageWrites, but
891 * defining it like this leaves the info bit free for some potential other
892 * use in records without any backup blocks.
894 if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites)
895 info |= XLR_BKP_REMOVABLE;
898 * If there isn't enough space on the current XLOG page for a record
899 * header, advance to the next page (leaving the unused space as zeroes).
902 freespace = INSERT_FREESPACE(Insert);
903 if (freespace < SizeOfXLogRecord)
905 updrqst = AdvanceXLInsertBuffer(false);
906 freespace = INSERT_FREESPACE(Insert);
909 /* Compute record's XLOG location */
910 curridx = Insert->curridx;
911 INSERT_RECPTR(RecPtr, Insert, curridx);
914 * If the record is an XLOG_SWITCH, and we are exactly at the start of a
915 * segment, we need not insert it (and don't want to because we'd like
916 * consecutive switch requests to be no-ops). Instead, make sure
917 * everything is written and flushed through the end of the prior segment,
918 * and return the prior segment's end address.
921 (RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
923 /* We can release insert lock immediately */
924 LWLockRelease(WALInsertLock);
926 RecPtr.xrecoff -= SizeOfXLogLongPHD;
927 if (RecPtr.xrecoff == 0)
929 /* crossing a logid boundary */
931 RecPtr.xrecoff = XLogFileSize;
934 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
935 LogwrtResult = XLogCtl->Write.LogwrtResult;
936 if (!XLByteLE(RecPtr, LogwrtResult.Flush))
938 XLogwrtRqst FlushRqst;
940 FlushRqst.Write = RecPtr;
941 FlushRqst.Flush = RecPtr;
942 XLogWrite(FlushRqst, false, false);
944 LWLockRelease(WALWriteLock);
951 /* Insert record header */
953 record = (XLogRecord *) Insert->currpos;
954 record->xl_prev = Insert->PrevRecord;
955 record->xl_xid = GetCurrentTransactionIdIfAny();
956 record->xl_tot_len = SizeOfXLogRecord + write_len;
957 record->xl_len = len; /* doesn't include backup blocks */
958 record->xl_info = info;
959 record->xl_rmid = rmid;
961 /* Now we can finish computing the record's CRC */
962 COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
963 SizeOfXLogRecord - sizeof(pg_crc32));
964 FIN_CRC32(rdata_crc);
965 record->xl_crc = rdata_crc;
972 initStringInfo(&buf);
973 appendStringInfo(&buf, "INSERT @ %X/%X: ",
974 RecPtr.xlogid, RecPtr.xrecoff);
975 xlog_outrec(&buf, record);
976 if (rdata->data != NULL)
978 appendStringInfo(&buf, " - ");
979 RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
981 elog(LOG, "%s", buf.data);
986 /* Record begin of record in appropriate places */
987 ProcLastRecPtr = RecPtr;
988 Insert->PrevRecord = RecPtr;
990 Insert->currpos += SizeOfXLogRecord;
991 freespace -= SizeOfXLogRecord;
994 * Append the data, including backup blocks if any
998 while (rdata->data == NULL)
1003 if (rdata->len > freespace)
1005 memcpy(Insert->currpos, rdata->data, freespace);
1006 rdata->data += freespace;
1007 rdata->len -= freespace;
1008 write_len -= freespace;
1012 memcpy(Insert->currpos, rdata->data, rdata->len);
1013 freespace -= rdata->len;
1014 write_len -= rdata->len;
1015 Insert->currpos += rdata->len;
1016 rdata = rdata->next;
1021 /* Use next buffer */
1022 updrqst = AdvanceXLInsertBuffer(false);
1023 curridx = Insert->curridx;
1024 /* Insert cont-record header */
1025 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
1026 contrecord = (XLogContRecord *) Insert->currpos;
1027 contrecord->xl_rem_len = write_len;
1028 Insert->currpos += SizeOfXLogContRecord;
1029 freespace = INSERT_FREESPACE(Insert);
1032 /* Ensure next record will be properly aligned */
1033 Insert->currpos = (char *) Insert->currpage +
1034 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
1035 freespace = INSERT_FREESPACE(Insert);
1038 * The recptr I return is the beginning of the *next* record. This will be
1039 * stored as LSN for changed data pages...
1041 INSERT_RECPTR(RecPtr, Insert, curridx);
1044 * If the record is an XLOG_SWITCH, we must now write and flush all the
1045 * existing data, and then forcibly advance to the start of the next
1046 * segment. It's not good to do this I/O while holding the insert lock,
1047 * but there seems too much risk of confusion if we try to release the
1048 * lock sooner. Fortunately xlog switch needn't be a high-performance
1049 * operation anyway...
1053 XLogCtlWrite *Write = &XLogCtl->Write;
1054 XLogwrtRqst FlushRqst;
1055 XLogRecPtr OldSegEnd;
1057 TRACE_POSTGRESQL_XLOG_SWITCH();
1059 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1062 * Flush through the end of the page containing XLOG_SWITCH, and
1063 * perform end-of-segment actions (eg, notifying archiver).
1065 WriteRqst = XLogCtl->xlblocks[curridx];
1066 FlushRqst.Write = WriteRqst;
1067 FlushRqst.Flush = WriteRqst;
1068 XLogWrite(FlushRqst, false, true);
1070 /* Set up the next buffer as first page of next segment */
1071 /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
1072 (void) AdvanceXLInsertBuffer(true);
1074 /* There should be no unwritten data */
1075 curridx = Insert->curridx;
1076 Assert(curridx == Write->curridx);
1078 /* Compute end address of old segment */
1079 OldSegEnd = XLogCtl->xlblocks[curridx];
1080 OldSegEnd.xrecoff -= XLOG_BLCKSZ;
1081 if (OldSegEnd.xrecoff == 0)
1083 /* crossing a logid boundary */
1084 OldSegEnd.xlogid -= 1;
1085 OldSegEnd.xrecoff = XLogFileSize;
1088 /* Make it look like we've written and synced all of old segment */
1089 LogwrtResult.Write = OldSegEnd;
1090 LogwrtResult.Flush = OldSegEnd;
1093 * Update shared-memory status --- this code should match XLogWrite
1096 /* use volatile pointer to prevent code rearrangement */
1097 volatile XLogCtlData *xlogctl = XLogCtl;
1099 SpinLockAcquire(&xlogctl->info_lck);
1100 xlogctl->LogwrtResult = LogwrtResult;
1101 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1102 xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1103 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1104 xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1105 SpinLockRelease(&xlogctl->info_lck);
1108 Write->LogwrtResult = LogwrtResult;
1110 LWLockRelease(WALWriteLock);
1112 updrqst = false; /* done already */
1116 /* normal case, ie not xlog switch */
1118 /* Need to update shared LogwrtRqst if some block was filled up */
1119 if (freespace < SizeOfXLogRecord)
1121 /* curridx is filled and available for writing out */
1126 /* if updrqst already set, write through end of previous buf */
1127 curridx = PrevBufIdx(curridx);
1129 WriteRqst = XLogCtl->xlblocks[curridx];
1132 LWLockRelease(WALInsertLock);
1136 /* use volatile pointer to prevent code rearrangement */
1137 volatile XLogCtlData *xlogctl = XLogCtl;
1139 SpinLockAcquire(&xlogctl->info_lck);
1140 /* advance global request to include new block(s) */
1141 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
1142 xlogctl->LogwrtRqst.Write = WriteRqst;
1143 /* update local result copy while I have the chance */
1144 LogwrtResult = xlogctl->LogwrtResult;
1145 SpinLockRelease(&xlogctl->info_lck);
1148 XactLastRecEnd = RecPtr;
1156 * Determine whether the buffer referenced by an XLogRecData item has to
1157 * be backed up, and if so fill a BkpBlock struct for it. In any case
1158 * save the buffer's LSN at *lsn.
1161 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1162 XLogRecPtr *lsn, BkpBlock *bkpb)
1166 page = BufferGetPage(rdata->buffer);
1169 * XXX We assume page LSN is first data on *every* page that can be passed
1170 * to XLogInsert, whether it otherwise has the standard page layout or
1173 *lsn = PageGetLSN(page);
1176 XLByteLE(PageGetLSN(page), RedoRecPtr))
1179 * The page needs to be backed up, so set up *bkpb
1181 BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
1183 if (rdata->buffer_std)
1185 /* Assume we can omit data between pd_lower and pd_upper */
1186 uint16 lower = ((PageHeader) page)->pd_lower;
1187 uint16 upper = ((PageHeader) page)->pd_upper;
1189 if (lower >= SizeOfPageHeaderData &&
1193 bkpb->hole_offset = lower;
1194 bkpb->hole_length = upper - lower;
1198 /* No "hole" to compress out */
1199 bkpb->hole_offset = 0;
1200 bkpb->hole_length = 0;
1205 /* Not a standard page header, don't try to eliminate "hole" */
1206 bkpb->hole_offset = 0;
1207 bkpb->hole_length = 0;
1210 return true; /* buffer requires backup */
1213 return false; /* buffer does not need to be backed up */
1219 * Create an archive notification file
1221 * The name of the notification file is the message that will be picked up
1222 * by the archiver, e.g. we write 0000000100000001000000C6.ready
1223 * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1224 * then when complete, rename it to 0000000100000001000000C6.done
1227 XLogArchiveNotify(const char *xlog)
1229 char archiveStatusPath[MAXPGPATH];
1232 /* insert an otherwise empty file called <XLOG>.ready */
1233 StatusFilePath(archiveStatusPath, xlog, ".ready");
1234 fd = AllocateFile(archiveStatusPath, "w");
1238 (errcode_for_file_access(),
1239 errmsg("could not create archive status file \"%s\": %m",
1240 archiveStatusPath)));
1246 (errcode_for_file_access(),
1247 errmsg("could not write archive status file \"%s\": %m",
1248 archiveStatusPath)));
1252 /* Notify archiver that it's got something to do */
1253 if (IsUnderPostmaster)
1254 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1258 * Convenience routine to notify using log/seg representation of filename
1261 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1263 char xlog[MAXFNAMELEN];
1265 XLogFileName(xlog, ThisTimeLineID, log, seg);
1266 XLogArchiveNotify(xlog);
1270 * XLogArchiveCheckDone
1272 * This is called when we are ready to delete or recycle an old XLOG segment
1273 * file or backup history file. If it is okay to delete it then return true.
1274 * If it is not time to delete it, make sure a .ready file exists, and return
1277 * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1278 * then return false; else create <XLOG>.ready and return false.
1280 * The reason we do things this way is so that if the original attempt to
1281 * create <XLOG>.ready fails, we'll retry during subsequent checkpoints.
1284 XLogArchiveCheckDone(const char *xlog)
1286 char archiveStatusPath[MAXPGPATH];
1287 struct stat stat_buf;
1289 /* Always deletable if archiving is off */
1290 if (!XLogArchivingActive())
1293 /* First check for .done --- this means archiver is done with it */
1294 StatusFilePath(archiveStatusPath, xlog, ".done");
1295 if (stat(archiveStatusPath, &stat_buf) == 0)
1298 /* check for .ready --- this means archiver is still busy with it */
1299 StatusFilePath(archiveStatusPath, xlog, ".ready");
1300 if (stat(archiveStatusPath, &stat_buf) == 0)
1303 /* Race condition --- maybe archiver just finished, so recheck */
1304 StatusFilePath(archiveStatusPath, xlog, ".done");
1305 if (stat(archiveStatusPath, &stat_buf) == 0)
1308 /* Retry creation of the .ready file */
1309 XLogArchiveNotify(xlog);
1316 * Check to see if an XLOG segment file is still unarchived.
1317 * This is almost but not quite the inverse of XLogArchiveCheckDone: in
1318 * the first place we aren't chartered to recreate the .ready file, and
1319 * in the second place we should consider that if the file is already gone
1320 * then it's not busy. (This check is needed to handle the race condition
1321 * that a checkpoint already deleted the no-longer-needed file.)
1324 XLogArchiveIsBusy(const char *xlog)
1326 char archiveStatusPath[MAXPGPATH];
1327 struct stat stat_buf;
1329 /* First check for .done --- this means archiver is done with it */
1330 StatusFilePath(archiveStatusPath, xlog, ".done");
1331 if (stat(archiveStatusPath, &stat_buf) == 0)
1334 /* check for .ready --- this means archiver is still busy with it */
1335 StatusFilePath(archiveStatusPath, xlog, ".ready");
1336 if (stat(archiveStatusPath, &stat_buf) == 0)
1339 /* Race condition --- maybe archiver just finished, so recheck */
1340 StatusFilePath(archiveStatusPath, xlog, ".done");
1341 if (stat(archiveStatusPath, &stat_buf) == 0)
1345 * Check to see if the WAL file has been removed by checkpoint, which
1346 * implies it has already been archived, and explains why we can't see a
1347 * status file for it.
1349 snprintf(archiveStatusPath, MAXPGPATH, XLOGDIR "/%s", xlog);
1350 if (stat(archiveStatusPath, &stat_buf) != 0 &&
1358 * XLogArchiveCleanup
1360 * Cleanup archive notification file(s) for a particular xlog segment
1363 XLogArchiveCleanup(const char *xlog)
1365 char archiveStatusPath[MAXPGPATH];
1367 /* Remove the .done file */
1368 StatusFilePath(archiveStatusPath, xlog, ".done");
1369 unlink(archiveStatusPath);
1370 /* should we complain about failure? */
1372 /* Remove the .ready file if present --- normally it shouldn't be */
1373 StatusFilePath(archiveStatusPath, xlog, ".ready");
1374 unlink(archiveStatusPath);
1375 /* should we complain about failure? */
1379 * Advance the Insert state to the next buffer page, writing out the next
1380 * buffer if it still contains unwritten data.
1382 * If new_segment is TRUE then we set up the next buffer page as the first
1383 * page of the next xlog segment file, possibly but not usually the next
1384 * consecutive file page.
1386 * The global LogwrtRqst.Write pointer needs to be advanced to include the
1387 * just-filled page. If we can do this for free (without an extra lock),
1388 * we do so here. Otherwise the caller must do it. We return TRUE if the
1389 * request update still needs to be done, FALSE if we did it internally.
1391 * Must be called with WALInsertLock held.
1394 AdvanceXLInsertBuffer(bool new_segment)
1396 XLogCtlInsert *Insert = &XLogCtl->Insert;
1397 XLogCtlWrite *Write = &XLogCtl->Write;
1398 int nextidx = NextBufIdx(Insert->curridx);
1399 bool update_needed = true;
1400 XLogRecPtr OldPageRqstPtr;
1401 XLogwrtRqst WriteRqst;
1402 XLogRecPtr NewPageEndPtr;
1403 XLogPageHeader NewPage;
1405 /* Use Insert->LogwrtResult copy if it's more fresh */
1406 if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1407 LogwrtResult = Insert->LogwrtResult;
1410 * Get ending-offset of the buffer page we need to replace (this may be
1411 * zero if the buffer hasn't been used yet). Fall through if it's already
1414 OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1415 if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1417 /* nope, got work to do... */
1418 XLogRecPtr FinishedPageRqstPtr;
1420 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1422 /* Before waiting, get info_lck and update LogwrtResult */
1424 /* use volatile pointer to prevent code rearrangement */
1425 volatile XLogCtlData *xlogctl = XLogCtl;
1427 SpinLockAcquire(&xlogctl->info_lck);
1428 if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1429 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1430 LogwrtResult = xlogctl->LogwrtResult;
1431 SpinLockRelease(&xlogctl->info_lck);
1434 update_needed = false; /* Did the shared-request update */
1436 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1438 /* OK, someone wrote it already */
1439 Insert->LogwrtResult = LogwrtResult;
1443 /* Must acquire write lock */
1444 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1445 LogwrtResult = Write->LogwrtResult;
1446 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1448 /* OK, someone wrote it already */
1449 LWLockRelease(WALWriteLock);
1450 Insert->LogwrtResult = LogwrtResult;
1455 * Have to write buffers while holding insert lock. This is
1456 * not good, so only write as much as we absolutely must.
1458 TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
1459 WriteRqst.Write = OldPageRqstPtr;
1460 WriteRqst.Flush.xlogid = 0;
1461 WriteRqst.Flush.xrecoff = 0;
1462 XLogWrite(WriteRqst, false, false);
1463 LWLockRelease(WALWriteLock);
1464 Insert->LogwrtResult = LogwrtResult;
1465 TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
1471 * Now the next buffer slot is free and we can set it up to be the next
1474 NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1478 /* force it to a segment start point */
1479 NewPageEndPtr.xrecoff += XLogSegSize - 1;
1480 NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
1483 if (NewPageEndPtr.xrecoff >= XLogFileSize)
1485 /* crossing a logid boundary */
1486 NewPageEndPtr.xlogid += 1;
1487 NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1490 NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1491 XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1492 NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1494 Insert->curridx = nextidx;
1495 Insert->currpage = NewPage;
1497 Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1500 * Be sure to re-zero the buffer so that bytes beyond what we've written
1501 * will look like zeroes and not valid XLOG records...
1503 MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1506 * Fill the new page's header
1508 NewPage ->xlp_magic = XLOG_PAGE_MAGIC;
1510 /* NewPage->xlp_info = 0; */ /* done by memset */
1511 NewPage ->xlp_tli = ThisTimeLineID;
1512 NewPage ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1513 NewPage ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
1516 * If first page of an XLOG segment file, make it a long header.
1518 if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1520 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1522 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1523 NewLongPage->xlp_seg_size = XLogSegSize;
1524 NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1525 NewPage ->xlp_info |= XLP_LONG_HEADER;
1527 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1530 return update_needed;
1534 * Check whether we've consumed enough xlog space that a checkpoint is needed.
1536 * Caller must have just finished filling the open log file (so that
1537 * openLogId/openLogSeg are valid). We measure the distance from RedoRecPtr
1538 * to the open log file and see if that exceeds CheckPointSegments.
1540 * Note: it is caller's responsibility that RedoRecPtr is up-to-date.
1543 XLogCheckpointNeeded(void)
1546 * A straight computation of segment number could overflow 32 bits. Rather
1547 * than assuming we have working 64-bit arithmetic, we compare the
1548 * highest-order bits separately, and force a checkpoint immediately when
1553 uint32 old_highbits,
1556 old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1557 (RedoRecPtr.xrecoff / XLogSegSize);
1558 old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1559 new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile + openLogSeg;
1560 new_highbits = openLogId / XLogSegSize;
1561 if (new_highbits != old_highbits ||
1562 new_segno >= old_segno + (uint32) (CheckPointSegments - 1))
1568 * Write and/or fsync the log at least as far as WriteRqst indicates.
1570 * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1571 * may stop at any convenient boundary (such as a cache or logfile boundary).
1572 * This option allows us to avoid uselessly issuing multiple writes when a
1573 * single one would do.
1575 * If xlog_switch == TRUE, we are intending an xlog segment switch, so
1576 * perform end-of-segment actions after writing the last page, even if
1577 * it's not physically the end of its segment. (NB: this will work properly
1578 * only if caller specifies WriteRqst == page-end and flexible == false,
1579 * and there is some data to write.)
1581 * Must be called with WALWriteLock held.
1584 XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
1586 XLogCtlWrite *Write = &XLogCtl->Write;
1588 bool last_iteration;
1596 /* We should always be inside a critical section here */
1597 Assert(CritSectionCount > 0);
1600 * Update local LogwrtResult (caller probably did this already, but...)
1602 LogwrtResult = Write->LogwrtResult;
1605 * Since successive pages in the xlog cache are consecutively allocated,
1606 * we can usually gather multiple pages together and issue just one
1607 * write() call. npages is the number of pages we have determined can be
1608 * written together; startidx is the cache block index of the first one,
1609 * and startoffset is the file offset at which it should go. The latter
1610 * two variables are only valid when npages > 0, but we must initialize
1611 * all of them to keep the compiler quiet.
1618 * Within the loop, curridx is the cache block index of the page to
1619 * consider writing. We advance Write->curridx only after successfully
1620 * writing pages. (Right now, this refinement is useless since we are
1621 * going to PANIC if any error occurs anyway; but someday it may come in
1624 curridx = Write->curridx;
1626 while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1629 * Make sure we're not ahead of the insert process. This could happen
1630 * if we're passed a bogus WriteRqst.Write that is past the end of the
1631 * last page that's been initialized by AdvanceXLInsertBuffer.
1633 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1634 elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1635 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1636 XLogCtl->xlblocks[curridx].xlogid,
1637 XLogCtl->xlblocks[curridx].xrecoff);
1639 /* Advance LogwrtResult.Write to end of current buffer page */
1640 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1641 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1643 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1646 * Switch to new logfile segment. We cannot have any pending
1647 * pages here (since we dump what we have at segment end).
1649 Assert(npages == 0);
1650 if (openLogFile >= 0)
1652 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1654 /* create/use new log file */
1655 use_existent = true;
1656 openLogFile = XLogFileInit(openLogId, openLogSeg,
1657 &use_existent, true);
1661 /* Make sure we have the current logfile open */
1662 if (openLogFile < 0)
1664 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1665 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1669 /* Add current page to the set of pending pages-to-dump */
1672 /* first of group */
1674 startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1679 * Dump the set if this will be the last loop iteration, or if we are
1680 * at the last page of the cache area (since the next page won't be
1681 * contiguous in memory), or if we are at the end of the logfile
1684 last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write);
1686 finishing_seg = !ispartialpage &&
1687 (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1689 if (last_iteration ||
1690 curridx == XLogCtl->XLogCacheBlck ||
1696 /* Need to seek in the file? */
1697 if (openLogOff != startoffset)
1699 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1701 (errcode_for_file_access(),
1702 errmsg("could not seek in log file %u, "
1703 "segment %u to offset %u: %m",
1704 openLogId, openLogSeg, startoffset)));
1705 openLogOff = startoffset;
1708 /* OK to write the page(s) */
1709 from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
1710 nbytes = npages * (Size) XLOG_BLCKSZ;
1712 if (write(openLogFile, from, nbytes) != nbytes)
1714 /* if write didn't set errno, assume no disk space */
1718 (errcode_for_file_access(),
1719 errmsg("could not write to log file %u, segment %u "
1720 "at offset %u, length %lu: %m",
1721 openLogId, openLogSeg,
1722 openLogOff, (unsigned long) nbytes)));
1725 /* Update state for write */
1726 openLogOff += nbytes;
1727 Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1731 * If we just wrote the whole last page of a logfile segment,
1732 * fsync the segment immediately. This avoids having to go back
1733 * and re-open prior segments when an fsync request comes along
1734 * later. Doing it here ensures that one and only one backend will
1735 * perform this fsync.
1737 * We also do this if this is the last page written for an xlog
1740 * This is also the right place to notify the Archiver that the
1741 * segment is ready to copy to archival storage, and to update the
1742 * timer for archive_timeout, and to signal for a checkpoint if
1743 * too many logfile segments have been used since the last
1746 if (finishing_seg || (xlog_switch && last_iteration))
1748 issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
1749 LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
1751 if (XLogArchivingActive())
1752 XLogArchiveNotifySeg(openLogId, openLogSeg);
1754 Write->lastSegSwitchTime = (pg_time_t) time(NULL);
1757 * Signal bgwriter to start a checkpoint if we've consumed too
1758 * much xlog since the last one. For speed, we first check
1759 * using the local copy of RedoRecPtr, which might be out of
1760 * date; if it looks like a checkpoint is needed, forcibly
1761 * update RedoRecPtr and recheck.
1763 if (IsUnderPostmaster &&
1764 XLogCheckpointNeeded())
1766 (void) GetRedoRecPtr();
1767 if (XLogCheckpointNeeded())
1768 RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
1775 /* Only asked to write a partial page */
1776 LogwrtResult.Write = WriteRqst.Write;
1779 curridx = NextBufIdx(curridx);
1781 /* If flexible, break out of loop as soon as we wrote something */
1782 if (flexible && npages == 0)
1786 Assert(npages == 0);
1787 Assert(curridx == Write->curridx);
1790 * If asked to flush, do so
1792 if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1793 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1796 * Could get here without iterating above loop, in which case we might
1797 * have no open file or the wrong one. However, we do not need to
1798 * fsync more than one file.
1800 if (sync_method != SYNC_METHOD_OPEN &&
1801 sync_method != SYNC_METHOD_OPEN_DSYNC)
1803 if (openLogFile >= 0 &&
1804 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1806 if (openLogFile < 0)
1808 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1809 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1812 issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
1814 LogwrtResult.Flush = LogwrtResult.Write;
1818 * Update shared-memory status
1820 * We make sure that the shared 'request' values do not fall behind the
1821 * 'result' values. This is not absolutely essential, but it saves some
1822 * code in a couple of places.
1825 /* use volatile pointer to prevent code rearrangement */
1826 volatile XLogCtlData *xlogctl = XLogCtl;
1828 SpinLockAcquire(&xlogctl->info_lck);
1829 xlogctl->LogwrtResult = LogwrtResult;
1830 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1831 xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1832 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1833 xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1834 SpinLockRelease(&xlogctl->info_lck);
1837 Write->LogwrtResult = LogwrtResult;
1841 * Record the LSN for an asynchronous transaction commit.
1842 * (This should not be called for aborts, nor for synchronous commits.)
1845 XLogSetAsyncCommitLSN(XLogRecPtr asyncCommitLSN)
1847 /* use volatile pointer to prevent code rearrangement */
1848 volatile XLogCtlData *xlogctl = XLogCtl;
1850 SpinLockAcquire(&xlogctl->info_lck);
1851 if (XLByteLT(xlogctl->asyncCommitLSN, asyncCommitLSN))
1852 xlogctl->asyncCommitLSN = asyncCommitLSN;
1853 SpinLockRelease(&xlogctl->info_lck);
1857 * Advance minRecoveryPoint in control file.
1859 * If we crash during recovery, we must reach this point again before the
1860 * database is consistent.
1862 * If 'force' is true, 'lsn' argument is ignored. Otherwise, minRecoveryPoint
1863 * is only updated if it's not already greater than or equal to 'lsn'.
1866 UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
1868 /* Quick check using our local copy of the variable */
1869 if (!updateMinRecoveryPoint || (!force && XLByteLE(lsn, minRecoveryPoint)))
1872 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1874 /* update local copy */
1875 minRecoveryPoint = ControlFile->minRecoveryPoint;
1878 * An invalid minRecoveryPoint means that we need to recover all the WAL,
1879 * i.e., we're doing crash recovery. We never modify the control file's
1880 * value in that case, so we can short-circuit future checks here too.
1882 if (minRecoveryPoint.xlogid == 0 && minRecoveryPoint.xrecoff == 0)
1883 updateMinRecoveryPoint = false;
1884 else if (force || XLByteLT(minRecoveryPoint, lsn))
1886 /* use volatile pointer to prevent code rearrangement */
1887 volatile XLogCtlData *xlogctl = XLogCtl;
1888 XLogRecPtr newMinRecoveryPoint;
1891 * To avoid having to update the control file too often, we update it
1892 * all the way to the last record being replayed, even though 'lsn'
1893 * would suffice for correctness. This also allows the 'force' case
1894 * to not need a valid 'lsn' value.
1896 * Another important reason for doing it this way is that the passed
1897 * 'lsn' value could be bogus, i.e., past the end of available WAL, if
1898 * the caller got it from a corrupted heap page. Accepting such a
1899 * value as the min recovery point would prevent us from coming up at
1900 * all. Instead, we just log a warning and continue with recovery.
1901 * (See also the comments about corrupt LSNs in XLogFlush.)
1903 SpinLockAcquire(&xlogctl->info_lck);
1904 newMinRecoveryPoint = xlogctl->replayEndRecPtr;
1905 SpinLockRelease(&xlogctl->info_lck);
1907 if (!force && XLByteLT(newMinRecoveryPoint, lsn))
1909 "xlog min recovery request %X/%X is past current point %X/%X",
1910 lsn.xlogid, lsn.xrecoff,
1911 newMinRecoveryPoint.xlogid, newMinRecoveryPoint.xrecoff);
1913 /* update control file */
1914 if (XLByteLT(ControlFile->minRecoveryPoint, newMinRecoveryPoint))
1916 ControlFile->minRecoveryPoint = newMinRecoveryPoint;
1917 UpdateControlFile();
1918 minRecoveryPoint = newMinRecoveryPoint;
1921 (errmsg("updated min recovery point to %X/%X",
1922 minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
1925 LWLockRelease(ControlFileLock);
1929 * Ensure that all XLOG data through the given position is flushed to disk.
1931 * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1932 * already held, and we try to avoid acquiring it if possible.
1935 XLogFlush(XLogRecPtr record)
1937 XLogRecPtr WriteRqstPtr;
1938 XLogwrtRqst WriteRqst;
1941 * During REDO, we are reading not writing WAL. Therefore, instead of
1942 * trying to flush the WAL, we should update minRecoveryPoint instead. We
1943 * test XLogInsertAllowed(), not InRecovery, because we need the bgwriter
1944 * to act this way too, and because when the bgwriter tries to write the
1945 * end-of-recovery checkpoint, it should indeed flush.
1947 if (!XLogInsertAllowed())
1949 UpdateMinRecoveryPoint(record, false);
1953 /* Quick exit if already known flushed */
1954 if (XLByteLE(record, LogwrtResult.Flush))
1959 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1960 record.xlogid, record.xrecoff,
1961 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1962 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1965 START_CRIT_SECTION();
1968 * Since fsync is usually a horribly expensive operation, we try to
1969 * piggyback as much data as we can on each fsync: if we see any more data
1970 * entered into the xlog buffer, we'll write and fsync that too, so that
1971 * the final value of LogwrtResult.Flush is as large as possible. This
1972 * gives us some chance of avoiding another fsync immediately after.
1975 /* initialize to given target; may increase below */
1976 WriteRqstPtr = record;
1978 /* read LogwrtResult and update local state */
1980 /* use volatile pointer to prevent code rearrangement */
1981 volatile XLogCtlData *xlogctl = XLogCtl;
1983 SpinLockAcquire(&xlogctl->info_lck);
1984 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1985 WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1986 LogwrtResult = xlogctl->LogwrtResult;
1987 SpinLockRelease(&xlogctl->info_lck);
1991 if (!XLByteLE(record, LogwrtResult.Flush))
1993 /* now wait for the write lock */
1994 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1995 LogwrtResult = XLogCtl->Write.LogwrtResult;
1996 if (!XLByteLE(record, LogwrtResult.Flush))
1998 /* try to write/flush later additions to XLOG as well */
1999 if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
2001 XLogCtlInsert *Insert = &XLogCtl->Insert;
2002 uint32 freespace = INSERT_FREESPACE(Insert);
2004 if (freespace < SizeOfXLogRecord) /* buffer is full */
2005 WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
2008 WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
2009 WriteRqstPtr.xrecoff -= freespace;
2011 LWLockRelease(WALInsertLock);
2012 WriteRqst.Write = WriteRqstPtr;
2013 WriteRqst.Flush = WriteRqstPtr;
2017 WriteRqst.Write = WriteRqstPtr;
2018 WriteRqst.Flush = record;
2020 XLogWrite(WriteRqst, false, false);
2022 LWLockRelease(WALWriteLock);
2028 * If we still haven't flushed to the request point then we have a
2029 * problem; most likely, the requested flush point is past end of XLOG.
2030 * This has been seen to occur when a disk page has a corrupted LSN.
2032 * Formerly we treated this as a PANIC condition, but that hurts the
2033 * system's robustness rather than helping it: we do not want to take down
2034 * the whole system due to corruption on one data page. In particular, if
2035 * the bad page is encountered again during recovery then we would be
2036 * unable to restart the database at all! (This scenario actually
2037 * happened in the field several times with 7.1 releases.) As of 8.4, bad
2038 * LSNs encountered during recovery are UpdateMinRecoveryPoint's problem;
2039 * the only time we can reach here during recovery is while flushing the
2040 * end-of-recovery checkpoint record, and we don't expect that to have a
2043 * Note that for calls from xact.c, the ERROR will be promoted to PANIC
2044 * since xact.c calls this routine inside a critical section. However,
2045 * calls from bufmgr.c are not within critical sections and so we will not
2046 * force a restart for a bad LSN on a data page.
2048 if (XLByteLT(LogwrtResult.Flush, record))
2050 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
2051 record.xlogid, record.xrecoff,
2052 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
2056 * Flush xlog, but without specifying exactly where to flush to.
2058 * We normally flush only completed blocks; but if there is nothing to do on
2059 * that basis, we check for unflushed async commits in the current incomplete
2060 * block, and flush through the latest one of those. Thus, if async commits
2061 * are not being used, we will flush complete blocks only. We can guarantee
2062 * that async commits reach disk after at most three cycles; normally only
2063 * one or two. (We allow XLogWrite to write "flexibly", meaning it can stop
2064 * at the end of the buffer ring; this makes a difference only with very high
2065 * load or long wal_writer_delay, but imposes one extra cycle for the worst
2066 * case for async commits.)
2068 * This routine is invoked periodically by the background walwriter process.
/*
 * XLogBackgroundFlush: takes no arguments.
 *
 * Steps visible in this body:
 *   - a check of RecoveryInProgress() before anything else;
 *   - a snapshot of the shared LogwrtResult and LogwrtRqst.Write, taken
 *     under info_lck;
 *   - the request pointer is rounded down to an XLOG_BLCKSZ boundary;
 *   - if that boundary is already flushed, the target becomes asyncCommitLSN
 *     and flexible is cleared;
 *   - XLogWrite() is called under WALWriteLock with both Write and Flush set
 *     to the chosen target.
 */
2071 XLogBackgroundFlush(void)
2073 XLogRecPtr WriteRqstPtr;
2074 bool flexible = true;
2076 /* XLOG doesn't need flushing during recovery */
2077 if (RecoveryInProgress())
2080 /* read LogwrtResult and update local state */
2082 /* use volatile pointer to prevent code rearrangement */
2083 volatile XLogCtlData *xlogctl = XLogCtl;
2085 SpinLockAcquire(&xlogctl->info_lck);
2086 LogwrtResult = xlogctl->LogwrtResult;
2087 WriteRqstPtr = xlogctl->LogwrtRqst.Write;
2088 SpinLockRelease(&xlogctl->info_lck);
2091 /* back off to last completed page boundary */
2092 WriteRqstPtr.xrecoff -= WriteRqstPtr.xrecoff % XLOG_BLCKSZ;
2094 /* if we have already flushed that far, consider async commit records */
2095 if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
2097 /* use volatile pointer to prevent code rearrangement */
2098 volatile XLogCtlData *xlogctl = XLogCtl;
2100 SpinLockAcquire(&xlogctl->info_lck);
2101 WriteRqstPtr = xlogctl->asyncCommitLSN;
2102 SpinLockRelease(&xlogctl->info_lck);
2103 flexible = false; /* ensure it all gets written */
2106 /* Done if already known flushed */
2107 if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
2112 elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
2113 WriteRqstPtr.xlogid, WriteRqstPtr.xrecoff,
2114 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
2115 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
2118 START_CRIT_SECTION();
2120 /* now wait for the write lock */
2121 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
/* refresh our copy now that the lock is held, then recheck the target */
2122 LogwrtResult = XLogCtl->Write.LogwrtResult;
2123 if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
2125 XLogwrtRqst WriteRqst;
/* write and flush are requested up to the same position */
2127 WriteRqst.Write = WriteRqstPtr;
2128 WriteRqst.Flush = WriteRqstPtr;
2129 XLogWrite(WriteRqst, flexible, false);
2131 LWLockRelease(WALWriteLock);
2137 * Test whether XLOG data has been flushed up to (at least) the given position.
2139 * Returns true if a flush is still needed. (It may be that someone else
2140 * is already in process of flushing that far, however.)
/*
 * XLogNeedsFlush: record is the WAL position to test.
 *
 * Two regimes are visible below.  While RecoveryInProgress() is true, record
 * is compared against minRecoveryPoint, which is refreshed from the control
 * file only if ControlFileLock can be taken without waiting.  Otherwise
 * record is compared against LogwrtResult.Flush, first using the local copy
 * and then a copy re-read from shared memory under info_lck.
 */
2143 XLogNeedsFlush(XLogRecPtr record)
2146 * During recovery, we don't flush WAL but update minRecoveryPoint
2147 * instead. So "needs flush" is taken to mean whether minRecoveryPoint
2148 * would need to be updated.
2150 if (RecoveryInProgress())
2152 /* Quick exit if already known updated */
2153 if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
2157 * Update local copy of minRecoveryPoint. But if the lock is busy,
2158 * just return a conservative guess.
2160 if (!LWLockConditionalAcquire(ControlFileLock, LW_SHARED))
2162 minRecoveryPoint = ControlFile->minRecoveryPoint;
2163 LWLockRelease(ControlFileLock);
2166 * An invalid minRecoveryPoint means that we need to recover all the
2167 * WAL, i.e., we're doing crash recovery. We never modify the control
2168 * file's value in that case, so we can short-circuit future checks
2171 if (minRecoveryPoint.xlogid == 0 && minRecoveryPoint.xrecoff == 0)
2172 updateMinRecoveryPoint = false;
2175 if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
2181 /* Quick exit if already known flushed */
2182 if (XLByteLE(record, LogwrtResult.Flush))
2185 /* read LogwrtResult and update local state */
2187 /* use volatile pointer to prevent code rearrangement */
2188 volatile XLogCtlData *xlogctl = XLogCtl;
2190 SpinLockAcquire(&xlogctl->info_lck);
2191 LogwrtResult = xlogctl->LogwrtResult;
2192 SpinLockRelease(&xlogctl->info_lck);
2196 if (XLByteLE(record, LogwrtResult.Flush))
2203 * Create a new XLOG file segment, or open a pre-existing one.
2205 * log, seg: identify segment to be created/opened.
2207 * *use_existent: if TRUE, OK to use a pre-existing file (else, any
2208 * pre-existing file will be deleted). On return, TRUE if a pre-existing file was used.
2211 * use_lock: if TRUE, acquire ControlFileLock while moving file into
2212 * place. This should be TRUE except during bootstrap log creation. The
2213 * caller must *not* hold the lock at call.
2215 * Returns FD of opened file.
2217 * Note: errors here are ERROR not PANIC because we might or might not be
2218 * inside a critical section (eg, during checkpoint there is no reason to
2219 * take down the system on failure). They will promote to PANIC if we are
2220 * in a critical section.
/*
 * XLogFileInit(log, seg, use_existent, use_lock)
 *
 * Phases visible in this body:
 *   1. try to open the existing target file for ThisTimeLineID/log/seg;
 *   2. otherwise create XLOGDIR/xlogtemp.<pid> and zero-fill it by writing
 *      XLOG_BLCKSZ bytes at a time until XLogSegSize is reached, then fsync
 *      and close it;
 *   3. pass the temp file to InstallXLogFileSegment(), starting at log/seg
 *      with max_advance = XLOGfileslop;
 *   4. clear *use_existent and open the target path.
 */
2223 XLogFileInit(uint32 log, uint32 seg,
2224 bool *use_existent, bool use_lock)
2226 char path[MAXPGPATH];
2227 char tmppath[MAXPGPATH];
2229 uint32 installed_log;
2230 uint32 installed_seg;
2235 XLogFilePath(path, ThisTimeLineID, log, seg);
2238 * Try to use existent file (checkpoint maker may have created it already)
2242 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
2246 if (errno != ENOENT)
2248 (errcode_for_file_access(),
2249 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2257 * Initialize an empty (all zeroes) segment. NOTE: it is possible that
2258 * another process is doing the same thing. If so, we will end up
2259 * pre-creating an extra log segment. That seems OK, and better than
2260 * holding the lock throughout this lengthy process.
2262 elog(DEBUG2, "creating and filling new WAL file");
2264 snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
2268 /* do not use get_sync_bit() here --- want to fsync only at end of fill */
2269 fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2273 (errcode_for_file_access(),
2274 errmsg("could not create file \"%s\": %m", tmppath)));
2277 * Zero-fill the file. We have to do this the hard way to ensure that all
2278 * the file space has really been allocated --- on platforms that allow
2279 * "holes" in files, just seeking to the end doesn't allocate intermediate
2280 * space. This way, we know that we have all the space and (after the
2281 * fsync below) that all the indirect blocks are down on disk. Therefore,
2282 * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
2285 * Note: palloc zbuffer, instead of just using a local char array, to
2286 * ensure it is reasonably well-aligned; this may save a few cycles
2287 * transferring data to the kernel.
2289 zbuffer = (char *) palloc0(XLOG_BLCKSZ);
2290 for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
2293 if ((int) write(fd, zbuffer, XLOG_BLCKSZ) != (int) XLOG_BLCKSZ)
2295 int save_errno = errno;
2298 * If we fail to make the file, delete it to release disk space
2301 /* if write didn't set errno, assume problem is no disk space */
2302 errno = save_errno ? save_errno : ENOSPC;
2305 (errcode_for_file_access(),
2306 errmsg("could not write to file \"%s\": %m", tmppath)));
2311 if (pg_fsync(fd) != 0)
2313 (errcode_for_file_access(),
2314 errmsg("could not fsync file \"%s\": %m", tmppath)));
2318 (errcode_for_file_access(),
2319 errmsg("could not close file \"%s\": %m", tmppath)));
2322 * Now move the segment into place with its final name.
2324 * If caller didn't want to use a pre-existing file, get rid of any
2325 * pre-existing file. Otherwise, cope with possibility that someone else
2326 * has created the file while we were filling ours: if so, use ours to
2327 * pre-create a future log segment.
2329 installed_log = log;
2330 installed_seg = seg;
2331 max_advance = XLOGfileslop;
2332 if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
2333 *use_existent, &max_advance,
2337 * No need for any more future segments, or InstallXLogFileSegment()
2338 * failed to rename the file into place. If the rename failed, opening
2339 * the file below will fail.
2344 /* Set flag to tell caller there was no existent file */
2345 *use_existent = false;
2347 /* Now open original target segment (might not be file I just made) */
2348 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
2352 (errcode_for_file_access(),
2353 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2356 elog(DEBUG2, "done creating and filling new WAL file");
2362 * Create a new XLOG file segment by copying a pre-existing one.
2364 * log, seg: identify segment to be created.
2366 * srcTLI, srclog, srcseg: identify segment to be copied (could be from
2367 * a different timeline)
2369 * Currently this is only used during recovery, and so there are no locking
2370 * considerations. But we should be just as tense as XLogFileInit to avoid
2371 * emplacing a bogus file.
/*
 * XLogFileCopy(log, seg, srcTLI, srclog, srcseg)
 *
 * Reads the source segment in chunks of sizeof(buffer), i.e. XLOG_BLCKSZ
 * bytes, and writes each chunk to XLOGDIR/xlogtemp.<pid>.  A short read or a
 * short write is reported as an error.  After fsync and close, the temp file
 * is installed at exactly log/seg (find_free = false, max_advance = NULL).
 */
2374 XLogFileCopy(uint32 log, uint32 seg,
2375 TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
2377 char path[MAXPGPATH];
2378 char tmppath[MAXPGPATH];
2379 char buffer[XLOG_BLCKSZ];
2385 * Open the source file
2387 XLogFilePath(path, srcTLI, srclog, srcseg);
2388 srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2391 (errcode_for_file_access(),
2392 errmsg("could not open file \"%s\": %m", path)));
2395 * Copy into a temp file name.
2397 snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
2401 /* do not use get_sync_bit() here --- want to fsync only at end of fill */
2402 fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2406 (errcode_for_file_access(),
2407 errmsg("could not create file \"%s\": %m", tmppath)));
2410 * Do the data copying.
2412 for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
2415 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2419 (errcode_for_file_access(),
2420 errmsg("could not read file \"%s\": %m", path)));
2423 (errmsg("not enough data in file \"%s\"", path)));
2426 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2428 int save_errno = errno;
2431 * If we fail to make the file, delete it to release disk space
2434 /* if write didn't set errno, assume problem is no disk space */
2435 errno = save_errno ? save_errno : ENOSPC;
2438 (errcode_for_file_access(),
2439 errmsg("could not write to file \"%s\": %m", tmppath)));
2443 if (pg_fsync(fd) != 0)
2445 (errcode_for_file_access(),
2446 errmsg("could not fsync file \"%s\": %m", tmppath)));
2450 (errcode_for_file_access(),
2451 errmsg("could not close file \"%s\": %m", tmppath)));
2456 * Now move the segment into place with its final name.
2458 if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
2459 elog(ERROR, "InstallXLogFileSegment should not have failed");
2463 * Install a new XLOG segment file as a current or future log segment.
2465 * This is used both to install a newly-created segment (which has a temp
2466 * filename while it's being created) and to recycle an old segment.
2468 * *log, *seg: identify segment to install as (or first possible target).
2469 * When find_free is TRUE, these are modified on return to indicate the
2470 * actual installation location or last segment searched.
2472 * tmppath: initial name of file to install. It will be renamed into place.
2474 * find_free: if TRUE, install the new segment at the first empty log/seg
2475 * number at or after the passed numbers. If FALSE, install the new segment
2476 * exactly where specified, deleting any existing segment file there.
2478 * *max_advance: maximum number of log/seg slots to advance past the starting
2479 * point. Fail if no free slot is found in this range. On return, reduced
2480 * by the number of slots skipped over. (Irrelevant, and may be NULL,
2481 * when find_free is FALSE.)
2483 * use_lock: if TRUE, acquire ControlFileLock while moving file into
2484 * place. This should be TRUE except during bootstrap log creation. The
2485 * caller must *not* hold the lock at call.
2487 * Returns TRUE if the file was installed successfully. FALSE indicates that
2488 * max_advance limit was exceeded, or an error occurred while renaming the file into place.
/*
 * InstallXLogFileSegment(log, seg, tmppath, find_free, max_advance, ...)
 *
 * The free-slot search below probes each candidate path with stat() and
 * steps to the next segment with NextLogSeg(), rebuilding path each time.
 * It gives up, releasing ControlFileLock, once *max_advance is <= 0.
 * The file is moved into place with link() under HAVE_WORKING_LINK; a
 * rename() variant is also present.  Both failure paths release
 * ControlFileLock before reporting.
 */
2492 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
2493 bool find_free, int *max_advance,
2496 char path[MAXPGPATH];
2497 struct stat stat_buf;
2499 XLogFilePath(path, ThisTimeLineID, *log, *seg);
2502 * We want to be sure that only one process does this at a time.
2505 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2509 /* Force installation: get rid of any pre-existing segment file */
2514 /* Find a free slot to put it in */
2515 while (stat(path, &stat_buf) == 0)
2517 if (*max_advance <= 0)
2519 /* Failed to find a free slot within specified range */
2521 LWLockRelease(ControlFileLock);
2524 NextLogSeg(*log, *seg);
2526 XLogFilePath(path, ThisTimeLineID, *log, *seg);
2531 * Prefer link() to rename() here just to be really sure that we don't
2532 * overwrite an existing logfile. However, there shouldn't be one, so
2533 * rename() is an acceptable substitute except for the truly paranoid.
2535 #if HAVE_WORKING_LINK
2536 if (link(tmppath, path) < 0)
2539 LWLockRelease(ControlFileLock);
2541 (errcode_for_file_access(),
2542 errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2543 tmppath, path, *log, *seg)));
2548 if (rename(tmppath, path) < 0)
2551 LWLockRelease(ControlFileLock);
2553 (errcode_for_file_access(),
2554 errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2555 tmppath, path, *log, *seg)));
2561 LWLockRelease(ControlFileLock);
2567 * Open a pre-existing logfile segment for writing.
/*
 * XLogFileOpen(log, seg): builds the path for ThisTimeLineID/log/seg and
 * opens it read-write, adding the flag chosen by get_sync_bit(sync_method).
 * A failed open is reported with the path, log file and segment number.
 */
2570 XLogFileOpen(uint32 log, uint32 seg)
2572 char path[MAXPGPATH];
2575 XLogFilePath(path, ThisTimeLineID, log, seg);
2577 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
2581 (errcode_for_file_access(),
2582 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2589 * Open a logfile segment for reading (during recovery).
2591 * If source is XLOG_FROM_ARCHIVE, the segment is retrieved from archive; if
2592 * it is XLOG_FROM_PG_XLOG, it's read from pg_xlog.
/*
 * XLogFileRead(log, seg, emode, tli, source, notfoundOk)
 *
 * source selects where the path comes from: XLOG_FROM_ARCHIVE goes through
 * RestoreArchivedFile(), XLOG_FROM_PG_XLOG uses XLogFilePath(), and any
 * other value raises an ERROR.  The file is then opened read-only.
 * The ps display is set to a waiting message before an archive restore and
 * to a recovering message next to the point where readSource is recorded.
 * An open failure counts as unexpected unless errno is ENOENT and notfoundOk
 * is true.
 */
2595 XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
2596 int source, bool notfoundOk)
2598 char xlogfname[MAXFNAMELEN];
2599 char activitymsg[MAXFNAMELEN + 16];
2600 char path[MAXPGPATH];
2603 XLogFileName(xlogfname, tli, log, seg);
2607 case XLOG_FROM_ARCHIVE:
2608 /* Report recovery progress in PS display */
2609 snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
2611 set_ps_display(activitymsg, false);
2613 restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2616 if (!restoredFromArchive)
2620 case XLOG_FROM_PG_XLOG:
2621 XLogFilePath(path, tli, log, seg);
2622 restoredFromArchive = false;
2626 elog(ERROR, "invalid XLogFileRead source %d", source);
2629 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2635 /* Report recovery progress in PS display */
2636 snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
2638 set_ps_display(activitymsg, false);
2640 readSource = source;
2643 if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
2645 (errcode_for_file_access(),
2646 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2652 * Open a logfile segment for reading (during recovery).
2654 * This version searches for the segment with any TLI listed in expectedTLIs.
2655 * "sources" is a bitmask of XLOG_FROM_* flags: for each candidate TLI the
2656 * archive is tried first and then pg_xlog, each only if its flag is set.
/*
 * XLogFileReadAnyTLI(log, seg, emode, sources)
 *
 * Walks expectedTLIs and stops at the first TLI below curFileTLI.  For each
 * TLI, XLogFileRead() is tried with XLOG_FROM_ARCHIVE and then with
 * XLOG_FROM_PG_XLOG, each only if its bit is set in sources; both calls pass
 * notfoundOk = true.  If nothing is found, the complaint names the segment
 * path built for recoveryTargetTLI.
 */
2659 XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources)
2661 char path[MAXPGPATH];
2666 * Loop looking for a suitable timeline ID: we might need to read any of
2667 * the timelines listed in expectedTLIs.
2669 * We expect curFileTLI on entry to be the TLI of the preceding file in
2670 * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
2671 * to go backwards; this prevents us from picking up the wrong file when a
2672 * parent timeline extends to higher segment numbers than the child we
2675 foreach(cell, expectedTLIs)
2677 TimeLineID tli = (TimeLineID) lfirst_int(cell);
2679 if (tli < curFileTLI)
2680 break; /* don't bother looking at too-old TLIs */
2682 if (sources & XLOG_FROM_ARCHIVE)
2684 fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_ARCHIVE, true);
2687 elog(DEBUG1, "got WAL segment from archive");
2692 if (sources & XLOG_FROM_PG_XLOG)
2694 fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_PG_XLOG, true);
2700 /* Couldn't find it. For simplicity, complain about front timeline */
2701 XLogFilePath(path, recoveryTargetTLI, log, seg);
2704 (errcode_for_file_access(),
2705 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2711 * Close the current logfile segment for writing.
/*
 * Body of the close routine: asserts that openLogFile is a valid
 * descriptor, issues a POSIX_FADV_DONTNEED hint when posix_fadvise is
 * available and XLogIsNeeded() is false, then closes the descriptor and
 * reports a close failure by openLogId and openLogSeg.
 */
2716 Assert(openLogFile >= 0);
2719 * WAL segment files will not be re-read in normal operation, so we advise
2720 * the OS to release any cached pages. But do not do so if WAL archiving
2721 * or streaming is active, because archiver and walsender process could
2722 * use the cache to read the WAL segment.
2724 #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2725 if (!XLogIsNeeded())
2726 (void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
2729 if (close(openLogFile))
2731 (errcode_for_file_access(),
2732 errmsg("could not close log file %u, segment %u: %m",
2733 openLogId, openLogSeg)));
2738 * Attempt to retrieve the specified file from off-line archival storage.
2739 * If successful, fill "path" with its complete path (note that this will be
2740 * a temp file name that doesn't follow the normal naming convention), and return TRUE.
2743 * If not successful, fill "path" with the name of the normal on-line file
2744 * (which may or may not actually exist, but we'll try to use it), and return FALSE.
2747 * For fixed-size files, the caller may pass the expected size as an
2748 * additional crosscheck on successful recovery. If the file size is not
2749 * known, set expectedSize = 0.
/*
 * RestoreArchivedFile(path, xlogfname, recovername, expectedSize)
 *
 * Outline of the steps visible in this body:
 *   - a check for recoveryRestoreCommand being NULL;
 *   - any stale XLOGDIR/<recovername> file is removed;
 *   - lastRestartPointFname is computed, in one branch from the control
 *     file redo pointer and in another from a 0/0/0 file name;
 *   - the escapes %p, %f, %r and %% in recoveryRestoreCommand are expanded
 *     into a buffer of MAXPGPATH bytes;
 *   - the command is run with system(), bracketed by in_restore_command;
 *   - on apparent success the restored file is checked with stat(), and its
 *     size is compared with expectedSize when that is > 0;
 *   - otherwise the exit status is classified and path is set to the
 *     on-line name XLOGDIR/<xlogfname>.
 */
2752 RestoreArchivedFile(char *path, const char *xlogfname,
2753 const char *recovername, off_t expectedSize)
2755 char xlogpath[MAXPGPATH];
2756 char xlogRestoreCmd[MAXPGPATH];
2757 char lastRestartPointFname[MAXPGPATH];
2763 struct stat stat_buf;
2767 /* In standby mode, restore_command might not be supplied */
2768 if (recoveryRestoreCommand == NULL)
2772 * When doing archive recovery, we always prefer an archived log file even
2773 * if a file of the same name exists in XLOGDIR. The reason is that the
2774 * file in XLOGDIR could be an old, un-filled or partly-filled version
2775 * that was copied and restored as part of backing up $PGDATA.
2777 * We could try to optimize this slightly by checking the local copy
2778 * lastchange timestamp against the archived copy, but we have no API to
2779 * do this, nor can we guarantee that the lastchange timestamp was
2780 * preserved correctly when we copied to archive. Our aim is robustness,
2781 * so we elect not to do this.
2783 * If we cannot obtain the log file from the archive, however, we will try
2784 * to use the XLOGDIR file if it exists. This is so that we can make use
2785 * of log segments that weren't yet transferred to the archive.
2787 * Notice that we don't actually overwrite any files when we copy back
2788 * from archive because the recoveryRestoreCommand may inadvertently
2789 * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2790 * fallback to the segments remaining in current XLOGDIR later. The
2791 * copy-from-archive filename is always the same, ensuring that we don't
2792 * run out of disk space on long recoveries.
2794 snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2797 * Make sure there is no existing file named recovername.
2799 if (stat(xlogpath, &stat_buf) != 0)
2801 if (errno != ENOENT)
2803 (errcode_for_file_access(),
2804 errmsg("could not stat file \"%s\": %m",
2809 if (unlink(xlogpath) != 0)
2811 (errcode_for_file_access(),
2812 errmsg("could not remove file \"%s\": %m",
2817 * Calculate the archive file cutoff point for use during log shipping
2818 * replication. All files earlier than this point can be deleted from the
2819 * archive, though there is no requirement to do so.
2821 * We initialise this with the filename of an InvalidXLogRecPtr, which
2822 * will prevent the deletion of any WAL files from the archive because of
2823 * the alphabetic sorting property of WAL filenames.
2825 * Once we have successfully located the redo pointer of the checkpoint
2826 * from which we start recovery we never request a file prior to the redo
2827 * pointer of the last restartpoint. When redo begins we know that we have
2828 * successfully located it, so there is no need for additional status
2829 * flags to signify the point when we can begin deleting WAL files from
2834 XLByteToSeg(ControlFile->checkPointCopy.redo,
2835 restartLog, restartSeg);
2836 XLogFileName(lastRestartPointFname,
2837 ControlFile->checkPointCopy.ThisTimeLineID,
2838 restartLog, restartSeg);
2839 /* we shouldn't need anything earlier than last restart point */
2840 Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
2843 XLogFileName(lastRestartPointFname, 0, 0, 0);
2846 * construct the command to be executed
2848 dp = xlogRestoreCmd;
2849 endp = xlogRestoreCmd + MAXPGPATH - 1;
2852 for (sp = recoveryRestoreCommand; *sp; sp++)
2859 /* %p: relative path of target file */
2861 StrNCpy(dp, xlogpath, endp - dp);
2862 make_native_path(dp);
2866 /* %f: filename of desired file */
2868 StrNCpy(dp, xlogfname, endp - dp);
2872 /* %r: filename of last restartpoint */
2874 StrNCpy(dp, lastRestartPointFname, endp - dp);
2878 /* convert %% to a single % */
2884 /* otherwise treat the % as not special */
2899 (errmsg_internal("executing restore command \"%s\"",
2903 * Set in_restore_command to tell the signal handler that we should exit
2904 * right away on SIGTERM. We know that we're at a safe point to do that.
2905 * Check if we had already received the signal, so that we don't miss a
2906 * shutdown request received just before this.
2908 in_restore_command = true;
2909 if (shutdown_requested)
2913 * Copy xlog from archival storage to XLOGDIR
2915 rc = system(xlogRestoreCmd);
2917 in_restore_command = false;
2922 * command apparently succeeded, but let's make sure the file is
2923 * really there now and has the correct size.
2925 if (stat(xlogpath, &stat_buf) == 0)
2927 if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2932 * If we find a partial file in standby mode, we assume it's
2933 * because it's just being copied to the archive, and keep
2936 * Otherwise treat a wrong-sized file as FATAL to ensure the
2937 * DBA would notice it, but is that too strong? We could try
2938 * to plow ahead with a local copy of the file ... but the
2939 * problem is that there probably isn't one, and we'd
2940 * incorrectly conclude we've reached the end of WAL and we're
2941 * done recovering ...
2943 if (StandbyMode && stat_buf.st_size < expectedSize)
2948 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2950 (unsigned long) stat_buf.st_size,
2951 (unsigned long) expectedSize)));
2957 (errmsg("restored log file \"%s\" from archive",
2959 strcpy(path, xlogpath);
2966 if (errno != ENOENT)
2968 (errcode_for_file_access(),
2969 errmsg("could not stat file \"%s\": %m",
2975 * Remember, we rollforward UNTIL the restore fails so failure here is
2976 * just part of the process... that makes it difficult to determine
2977 * whether the restore failed because there isn't an archive to restore,
2978 * or because the administrator has specified the restore program
2979 * incorrectly. We have to assume the former.
2981 * However, if the failure was due to any sort of signal, it's best to
2982 * punt and abort recovery. (If we "return false" here, upper levels will
2983 * assume that recovery is complete and start up the database!) It's
2984 * essential to abort on child SIGINT and SIGQUIT, because per spec
2985 * system() ignores SIGINT and SIGQUIT while waiting; if we see one of
2986 * those it's a good bet we should have gotten it too.
2988 * On SIGTERM, assume we have received a fast shutdown request, and exit
2989 * cleanly. It's pure chance whether we receive the SIGTERM first, or the
2990 * child process. If we receive it first, the signal handler will call
2991 * proc_exit, otherwise we do it here. If we or the child process received
2992 * SIGTERM for any other reason than a fast shutdown request, postmaster
2993 * will perform an immediate shutdown when it sees us exiting
2996 * Per the Single Unix Spec, shells report exit status > 128 when a called
2997 * command died on a signal. Also, 126 and 127 are used to report
2998 * problems such as an unfindable command; treat those as fatal errors
3001 if (WIFSIGNALED(rc) && WTERMSIG(rc) == SIGTERM)
3004 signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
3006 ereport(signaled ? FATAL : DEBUG2,
3007 (errmsg("could not restore file \"%s\" from archive: return code %d",
3013 * if an archived file is not available, there might still be a version of
3014 * this file in XLOGDIR, so return that as the filename to open.
3016 * In many recovery scenarios we expect this to fail also, but if so that
3017 * just means we've reached the end of WAL.
3019 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
3024 * Attempt to execute an external shell command during recovery.
3026 * 'command' is the shell command to be executed, 'commandName' is a
3027 * human-readable name describing the command emitted in the logs. If
3028 * 'failOnSignal' is true and the command is killed by a signal, a FATAL
3029 * error is thrown. Otherwise a WARNING is emitted.
3031 * This is currently used for recovery_end_command and restartpoint_command.
/*
 * ExecuteRecoveryCommand(command, commandName, failOnSignal)
 *
 * Only %r and %% are expanded here, into a buffer of MAXPGPATH bytes.  %r
 * becomes the file name of the segment holding the control file redo
 * pointer, which is read under ControlFileLock in shared mode.
 * The expanded string is what system() runs, whereas the message logged
 * before execution prints the unexpanded command.
 * The failure report is FATAL only when failOnSignal is true and the
 * command died on a signal or exited with a status above 125; in every
 * other case it is a WARNING.
 */
3034 ExecuteRecoveryCommand(char *command, char *commandName, bool failOnSignal)
3036 char xlogRecoveryCmd[MAXPGPATH];
3037 char lastRestartPointFname[MAXPGPATH];
3046 Assert(command && commandName);
3049 * Calculate the archive file cutoff point for use during log shipping
3050 * replication. All files earlier than this point can be deleted from the
3051 * archive, though there is no requirement to do so.
3053 LWLockAcquire(ControlFileLock, LW_SHARED);
3054 XLByteToSeg(ControlFile->checkPointCopy.redo,
3055 restartLog, restartSeg);
3056 XLogFileName(lastRestartPointFname,
3057 ControlFile->checkPointCopy.ThisTimeLineID,
3058 restartLog, restartSeg);
3059 LWLockRelease(ControlFileLock);
3062 * construct the command to be executed
3064 dp = xlogRecoveryCmd;
3065 endp = xlogRecoveryCmd + MAXPGPATH - 1;
3068 for (sp = command; *sp; sp++)
3075 /* %r: filename of last restartpoint */
3077 StrNCpy(dp, lastRestartPointFname, endp - dp);
3081 /* convert %% to a single % */
3087 /* otherwise treat the % as not special */
3102 (errmsg_internal("executing %s \"%s\"", commandName, command)));
3105 * execute the constructed command
3107 rc = system(xlogRecoveryCmd);
3111 * If the failure was due to any sort of signal, it's best to punt and
3112 * abort recovery. See also detailed comments on signals in
3113 * RestoreArchivedFile().
3115 signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
3118 * translator: First %s represents a recovery.conf parameter name like
3119 * "recovery_end_command", and the 2nd is the value of that parameter.
3121 ereport((signaled && failOnSignal) ? FATAL : WARNING,
3122 (errmsg("%s \"%s\": return code %d", commandName,
3128 * Preallocate log files beyond the specified log endpoint.
3130 * XXX this is currently extremely conservative, since it forces only one
3131 * future log segment to exist, and even that only if we are 75% done with
3132 * the current one. This is only appropriate for very low-WAL-volume systems.
3133 * High-volume systems will be OK once they've built up a sufficient set of
3134 * recycled log segments, but the startup transient is likely to include
3135 * a lot of segment creations by foreground processes, which is not so good.
/*
 * PreallocXlogFiles(endptr)
 *
 * Starts from the segment that contains the byte just before endptr.  When
 * the offset of that byte within the segment is at least 75% of
 * XLogSegSize, asks XLogFileInit() for the following segment, with
 * use_existent = true and use_lock = true.
 */
3138 PreallocXlogFiles(XLogRecPtr endptr)
3145 XLByteToPrevSeg(endptr, _logId, _logSeg);
3146 if ((endptr.xrecoff - 1) % XLogSegSize >=
3147 (uint32) (0.75 * XLogSegSize))
3149 NextLogSeg(_logId, _logSeg);
3150 use_existent = true;
3151 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
3154 CheckpointStats.ckpt_segs_added++;
3159 * Get the log/seg of the latest removed or recycled WAL segment.
3160 * Returns 0 if no WAL segments have been removed since startup.
/*
 * XLogGetLastRemoved(log, seg): copies lastRemovedLog and lastRemovedSeg
 * from shared memory into *log and *seg while holding info_lck, so the two
 * values are read as a consistent pair.
 */
3163 XLogGetLastRemoved(uint32 *log, uint32 *seg)
3165 /* use volatile pointer to prevent code rearrangement */
3166 volatile XLogCtlData *xlogctl = XLogCtl;
3168 SpinLockAcquire(&xlogctl->info_lck);
3169 *log = xlogctl->lastRemovedLog;
3170 *seg = xlogctl->lastRemovedSeg;
3171 SpinLockRelease(&xlogctl->info_lck);
3175 * Update the last removed log/seg pointer in shared memory, to reflect
3176 * that the given XLOG file has been removed.
/*
 * UpdateLastRemovedPtr(filename): parses filename with XLogFromFileName()
 * and, under info_lck, advances lastRemovedLog/lastRemovedSeg only if the
 * parsed log/seg pair is strictly later than the stored pair.  The parsed
 * timeline takes no part in the comparison.
 */
3179 UpdateLastRemovedPtr(char *filename)
3181 /* use volatile pointer to prevent code rearrangement */
3182 volatile XLogCtlData *xlogctl = XLogCtl;
3187 XLogFromFileName(filename, &tli, &log, &seg);
3189 SpinLockAcquire(&xlogctl->info_lck);
3190 if (log > xlogctl->lastRemovedLog ||
3191 (log == xlogctl->lastRemovedLog && seg > xlogctl->lastRemovedSeg))
3193 xlogctl->lastRemovedLog = log;
3194 xlogctl->lastRemovedSeg = seg;
3196 SpinLockRelease(&xlogctl->info_lck);
3200 * Recycle or remove all log files older or equal to passed log/seg#
3202 * endptr is current (or recent) end of xlog; this is used to determine
3203 * whether we want to recycle rather than delete no-longer-wanted log files.
/*
 * RemoveOldXlogFiles(log, seg, endptr)
 *
 * Scans XLOGDIR for names that are exactly 24 characters drawn from
 * 0-9 and A-F, and whose last 16 characters (the part after the timeline)
 * sort at or before those of the cutoff name built from log/seg.
 * A matching file is handled only when WalRcvInProgress() or
 * XLogArchiveCheckDone() returns true.  UpdateLastRemovedPtr() is called
 * first; the file is then either recycled through InstallXLogFileSegment()
 * (regular files only, searching forward from the segment before endptr
 * within a budget of XLOGfileslop slots) or removed.  XLogArchiveCleanup()
 * is also called with the file name.
 */
3206 RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
3212 struct dirent *xlde;
3213 char lastoff[MAXFNAMELEN];
3214 char path[MAXPGPATH];
3217 char newpath[MAXPGPATH];
3219 struct stat statbuf;
3221 elog(DEBUG2, "removing WAL segments older than %X/%X", log, seg);
3224 * Initialize info about where to try to recycle to. We allow recycling
3225 * segments up to XLOGfileslop segments beyond the current XLOG location.
3227 XLByteToPrevSeg(endptr, endlogId, endlogSeg);
3228 max_advance = XLOGfileslop;
3230 xldir = AllocateDir(XLOGDIR);
3233 (errcode_for_file_access(),
3234 errmsg("could not open transaction log directory \"%s\": %m",
3237 XLogFileName(lastoff, ThisTimeLineID, log, seg);
3239 while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
3242 * We ignore the timeline part of the XLOG segment identifiers in
3243 * deciding whether a segment is still needed. This ensures that we
3244 * won't prematurely remove a segment from a parent timeline. We could
3245 * probably be a little more proactive about removing segments of
3246 * non-parent timelines, but that would be a whole lot more
3249 * We use the alphanumeric sorting property of the filenames to decide
3250 * which ones are earlier than the lastoff segment.
3252 if (strlen(xlde->d_name) == 24 &&
3253 strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
3254 strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
3257 * Normally we don't delete old XLOG files during recovery to
3258 * avoid accidentally deleting a file that looks stale due to a
3259 * bug or hardware issue, but in fact contains important data.
3260 * During streaming recovery, however, we will eventually fill the
3261 * disk if we never clean up, so we have to. That's not an issue
3262 * with file-based archive recovery because in that case we
3263 * restore one XLOG file at a time, on-demand, and with a
3264 * different filename that can't be confused with regular XLOG
3267 if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
3269 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
3271 /* Update the last removed location in shared memory first */
3272 UpdateLastRemovedPtr(xlde->d_name);
3275 * Before deleting the file, see if it can be recycled as a
3276 * future log segment. Only recycle normal files, pg_standby
3277 * for example can create symbolic links pointing to a
3278 * separate archive directory.
3280 if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
3281 InstallXLogFileSegment(&endlogId, &endlogSeg, path,
3282 true, &max_advance, true))
3285 (errmsg("recycled transaction log file \"%s\"",
3287 CheckpointStats.ckpt_segs_recycled++;
3288 /* Needn't recheck that slot on future iterations */
3289 if (max_advance > 0)
3291 NextLogSeg(endlogId, endlogSeg);
3297 /* No need for any more future segments... */
3301 (errmsg("removing transaction log file \"%s\"",
3307 * On Windows, if another process (e.g another backend)
3308 * holds the file open in FILE_SHARE_DELETE mode, unlink
3309 * will succeed, but the file will still show up in
3310 * directory listing until the last handle is closed. To
3311 * avoid confusing the lingering deleted file for a live
3312 * WAL file that needs to be archived, rename it before
3315 * If another process holds the file open without
3316 * FILE_SHARE_DELETE flag, rename will fail. We'll try
3317 * again at the next checkpoint.
3319 snprintf(newpath, MAXPGPATH, "%s.deleted", path);
3320 if (rename(path, newpath) != 0)
3323 (errcode_for_file_access(),
3324 errmsg("could not rename old transaction log file \"%s\": %m",
3328 rc = unlink(newpath);
3335 (errcode_for_file_access(),
3336 errmsg("could not remove old transaction log file \"%s\": %m",
3340 CheckpointStats.ckpt_segs_removed++;
3343 XLogArchiveCleanup(xlde->d_name);
3352 * Verify whether pg_xlog and pg_xlog/archive_status exist.
3353 * If the latter does not exist, recreate it.
3355 * It is not the goal of this function to verify the contents of these
3356 * directories, but to help in cases where someone has performed a cluster
3357 * copy for PITR purposes but omitted pg_xlog from the copy.
3359 * We could also recreate pg_xlog if it doesn't exist, but a deliberate
3360 * policy decision was made not to. It is fairly common for pg_xlog to be
3361 * a symlink, and if that was the DBA's intent then automatically making a
3362 * plain directory would result in degraded performance with no notice.
3365 ValidateXLOGDirectoryStructure(void)
3367 char path[MAXPGPATH];
3368 struct stat stat_buf;
3370 /* Check for pg_xlog; if it doesn't exist, error out */
3371 if (stat(XLOGDIR, &stat_buf) != 0 ||
3372 !S_ISDIR(stat_buf.st_mode))
3374 (errmsg("required WAL directory \"%s\" does not exist",
3377 /* Check for archive_status */
3378 snprintf(path, MAXPGPATH, XLOGDIR "/archive_status");
3379 if (stat(path, &stat_buf) == 0)
3381 /* Check for weird cases where it exists but isn't a directory */
3382 if (!S_ISDIR(stat_buf.st_mode))
3384 (errmsg("required WAL directory \"%s\" does not exist",
3390 (errmsg("creating missing WAL directory \"%s\"", path)));
3391 if (mkdir(path, 0700) < 0)
3393 (errmsg("could not create missing directory \"%s\": %m",
3399 * Remove previous backup history files. This also retries creation of
3400 * .ready files for any backup history files for which XLogArchiveNotify
3404 CleanupBackupHistory(void)
3407 struct dirent *xlde;
3408 char path[MAXPGPATH];
3410 xldir = AllocateDir(XLOGDIR);
3413 (errcode_for_file_access(),
3414 errmsg("could not open transaction log directory \"%s\": %m",
3417 while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
3419 if (strlen(xlde->d_name) > 24 &&
3420 strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
3421 strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
3424 if (XLogArchiveCheckDone(xlde->d_name))
3427 (errmsg("removing transaction log backup history file \"%s\"",
3429 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
3431 XLogArchiveCleanup(xlde->d_name);
3440 * Restore the backup blocks present in an XLOG record, if any.
3442 * We assume all of the record has been read into memory at *record.
3444 * Note: when a backup block is available in XLOG, we restore it
3445 * unconditionally, even if the page in the database appears newer.
3446 * This is to protect ourselves against database pages that were partially
3447 * or incorrectly written during a crash. We assume that the XLOG data
3448 * must be good because it has passed a CRC check, while the database
3449 * page might not be. This will force us to replay all subsequent
3450 * modifications of the page that appear in XLOG, rather than possibly
3451 * ignoring them as already applied, but that's not a huge drawback.
3453 * If 'cleanup' is true, a cleanup lock is used when restoring blocks.
3454 * Otherwise, a normal exclusive lock is used. During crash recovery, that's
3455 * just pro forma because there can't be any regular backends in the system,
3456 * but in hot standby mode the distinction is important. The 'cleanup'
3457 * argument applies to all backup blocks in the WAL record, that suffices for
3461 RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
3469 if (!(record->xl_info & XLR_BKP_BLOCK_MASK))
3472 blk = (char *) XLogRecGetData(record) + record->xl_len;
3473 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
3475 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
3478 memcpy(&bkpb, blk, sizeof(BkpBlock));
3479 blk += sizeof(BkpBlock);
3481 buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
3483 Assert(BufferIsValid(buffer));
3485 LockBufferForCleanup(buffer);
3487 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3489 page = (Page) BufferGetPage(buffer);
3491 if (bkpb.hole_length == 0)
3493 memcpy((char *) page, blk, BLCKSZ);
3497 /* must zero-fill the hole */
3498 MemSet((char *) page, 0, BLCKSZ);
3499 memcpy((char *) page, blk, bkpb.hole_offset);
3500 memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
3501 blk + bkpb.hole_offset,
3502 BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
3505 PageSetLSN(page, lsn);
3506 PageSetTLI(page, ThisTimeLineID);
3507 MarkBufferDirty(buffer);
3508 UnlockReleaseBuffer(buffer);
3510 blk += BLCKSZ - bkpb.hole_length;
3515 * CRC-check an XLOG record. We do not believe the contents of an XLOG
3516 * record (other than to the minimal extent of computing the amount of
3517 * data to read in) until we've checked the CRCs.
3519 * We assume all of the record has been read into memory at *record.
3522 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
3526 uint32 len = record->xl_len;
3530 /* First the rmgr data */
3532 COMP_CRC32(crc, XLogRecGetData(record), len);
3534 /* Add in the backup blocks, if any */
3535 blk = (char *) XLogRecGetData(record) + len;
3536 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
3540 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
3543 memcpy(&bkpb, blk, sizeof(BkpBlock));
3544 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
3546 ereport(emode_for_corrupt_record(emode, recptr),
3547 (errmsg("incorrect hole size in record at %X/%X",
3548 recptr.xlogid, recptr.xrecoff)));
3551 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
3552 COMP_CRC32(crc, blk, blen);
3556 /* Check that xl_tot_len agrees with our calculation */
3557 if (blk != (char *) record + record->xl_tot_len)
3559 ereport(emode_for_corrupt_record(emode, recptr),
3560 (errmsg("incorrect total length in record at %X/%X",
3561 recptr.xlogid, recptr.xrecoff)));
3565 /* Finally include the record header */
3566 COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
3567 SizeOfXLogRecord - sizeof(pg_crc32));
3570 if (!EQ_CRC32(record->xl_crc, crc))
3572 ereport(emode_for_corrupt_record(emode, recptr),
3573 (errmsg("incorrect resource manager data checksum in record at %X/%X",
3574 recptr.xlogid, recptr.xrecoff)));
/*
 * ReadRecord -- fetch one XLOG record into readRecordBuf.
 *
 * RecPtr:        position to read from; when NULL, reading continues just
 *                after the previously read record (tracked in EndRecPtr).
 * emode:         error level for corruption reports, passed through
 *                emode_for_corrupt_record().
 * fetching_ckpt: forwarded unchanged to XLogPageRead().
 *
 * Steps visible in this block:
 *  - readBuf (one XLOG_BLCKSZ page) is malloc'd on first use.
 *    NOTE(review): the allocation result is only checked with Assert().
 *  - XLogPageRead() loads the page holding the record; the in-page offset
 *    is checked against the page header size and against the
 *    XLP_FIRST_IS_CONTRECORD flag.
 *  - Record header sanity checks: xl_len may be zero only for XLOG_SWITCH,
 *    xl_tot_len must lie between SizeOfXLogRecord + xl_len and that plus
 *    XLR_MAX_BKP_BLOCKS backup blocks, xl_rmid must not exceed RM_MAX_ID,
 *    and xl_prev is checked against the record address / ReadRecPtr.
 *  - readRecordBuf is reallocated when xl_tot_len exceeds it, rounded to a
 *    multiple of XLOG_BLCKSZ and at least 4 * Max(BLCKSZ, XLOG_BLCKSZ).
 *  - A record longer than the rest of its page is reassembled from
 *    following pages; each must carry XLP_FIRST_IS_CONTRECORD and an
 *    xl_rem_len consistent with the bytes gathered so far.
 *  - RecordIsValid() CRC-checks the record; on success EndRecPtr and
 *    ReadRecPtr are updated.
 *  - For an XLOG_SWITCH record, EndRecPtr is rounded up to a segment
 *    boundary and readOff is set to the last page of the segment.
 *  - All validation failures jump to next_record_is_invalid, which ORs
 *    readSource into failedSources.
 */
3582 * Attempt to read an XLOG record.
3584 * If RecPtr is not NULL, try to read a record at that position. Otherwise
3585 * try to read a record just after the last one previously read.
3587 * If no valid record is available, returns NULL, or fails if emode is PANIC.
3588 * (emode must be either PANIC, LOG)
3590 * The record is copied into readRecordBuf, so that on successful return,
3591 * the returned record pointer always points there.
3594 ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
3598 XLogRecPtr tmpRecPtr = EndRecPtr;
3599 bool randAccess = false;
3602 uint32 targetRecOff;
3603 uint32 pageHeaderSize;
3605 if (readBuf == NULL)
3608 * First time through, permanently allocate readBuf. We do it this
3609 * way, rather than just making a static array, for two reasons: (1)
3610 * no need to waste the storage in most instantiations of the backend;
3611 * (2) a static char array isn't guaranteed to have any particular
3612 * alignment, whereas malloc() will provide MAXALIGN'd storage.
3614 readBuf = (char *) malloc(XLOG_BLCKSZ);
3615 Assert(readBuf != NULL);
3620 RecPtr = &tmpRecPtr;
3623 * Align recptr to next page if no more records can fit on the current
3626 if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
3628 NextLogPage(tmpRecPtr);
3629 /* We will account for page header size below */
3632 if (tmpRecPtr.xrecoff >= XLogFileSize)
3634 (tmpRecPtr.xlogid)++;
3635 tmpRecPtr.xrecoff = 0;
3640 if (!XRecOffIsValid(RecPtr->xrecoff))
3642 (errmsg("invalid record offset at %X/%X",
3643 RecPtr->xlogid, RecPtr->xrecoff)));
3646 * Since we are going to a random position in WAL, forget any prior
3647 * state about what timeline we were in, and allow it to be any
3648 * timeline in expectedTLIs. We also set a flag to allow curFileTLI
3649 * to go backwards (but we can't reset that variable right here, since
3650 * we might not change files at all).
3652 lastPageTLI = 0; /* see comment in ValidXLOGHeader */
3653 randAccess = true; /* allow curFileTLI to go backwards too */
3656 /* This is the first try to read this page. */
3659 /* Read the page containing the record */
3660 if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
3663 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3664 targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
3665 if (targetRecOff == 0)
3668 * Can only get here in the continuing-from-prev-page case, because
3669 * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
3670 * to skip over the new page's header.
3672 tmpRecPtr.xrecoff += pageHeaderSize;
3673 targetRecOff = pageHeaderSize;
3675 else if (targetRecOff < pageHeaderSize)
3677 ereport(emode_for_corrupt_record(emode, *RecPtr),
3678 (errmsg("invalid record offset at %X/%X",
3679 RecPtr->xlogid, RecPtr->xrecoff)));
3680 goto next_record_is_invalid;
3682 if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
3683 targetRecOff == pageHeaderSize)
3685 ereport(emode_for_corrupt_record(emode, *RecPtr),
3686 (errmsg("contrecord is requested by %X/%X",
3687 RecPtr->xlogid, RecPtr->xrecoff)));
3688 goto next_record_is_invalid;
3690 record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
3693 * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
3696 if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3698 if (record->xl_len != 0)
3700 ereport(emode_for_corrupt_record(emode, *RecPtr),
3701 (errmsg("invalid xlog switch record at %X/%X",
3702 RecPtr->xlogid, RecPtr->xrecoff)));
3703 goto next_record_is_invalid;
3706 else if (record->xl_len == 0)
3708 ereport(emode_for_corrupt_record(emode, *RecPtr),
3709 (errmsg("record with zero length at %X/%X",
3710 RecPtr->xlogid, RecPtr->xrecoff)));
3711 goto next_record_is_invalid;
3713 if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
3714 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
3715 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
3717 ereport(emode_for_corrupt_record(emode, *RecPtr),
3718 (errmsg("invalid record length at %X/%X",
3719 RecPtr->xlogid, RecPtr->xrecoff)));
3720 goto next_record_is_invalid;
3722 if (record->xl_rmid > RM_MAX_ID)
3724 ereport(emode_for_corrupt_record(emode, *RecPtr),
3725 (errmsg("invalid resource manager ID %u at %X/%X",
3726 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
3727 goto next_record_is_invalid;
3732 * We can't exactly verify the prev-link, but surely it should be less
3733 * than the record's own address.
3735 if (!XLByteLT(record->xl_prev, *RecPtr))
3737 ereport(emode_for_corrupt_record(emode, *RecPtr),
3738 (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3739 record->xl_prev.xlogid, record->xl_prev.xrecoff,
3740 RecPtr->xlogid, RecPtr->xrecoff)));
3741 goto next_record_is_invalid;
3747 * Record's prev-link should exactly match our previous location. This
3748 * check guards against torn WAL pages where a stale but valid-looking
3749 * WAL record starts on a sector boundary.
3751 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
3753 ereport(emode_for_corrupt_record(emode, *RecPtr),
3754 (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3755 record->xl_prev.xlogid, record->xl_prev.xrecoff,
3756 RecPtr->xlogid, RecPtr->xrecoff)));
3757 goto next_record_is_invalid;
3762 * Allocate or enlarge readRecordBuf as needed. To avoid useless small
3763 * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
3764 * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with. (That is
3765 * enough for all "normal" records, but very large commit or abort records
3766 * might need more space.)
3768 total_len = record->xl_tot_len;
3769 if (total_len > readRecordBufSize)
3771 uint32 newSize = total_len;
3773 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
3774 newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
3776 free(readRecordBuf);
3777 readRecordBuf = (char *) malloc(newSize);
3780 readRecordBufSize = 0;
3781 /* We treat this as a "bogus data" condition */
3782 ereport(emode_for_corrupt_record(emode, *RecPtr),
3783 (errmsg("record length %u at %X/%X too long",
3784 total_len, RecPtr->xlogid, RecPtr->xrecoff)));
3785 goto next_record_is_invalid;
3787 readRecordBufSize = newSize;
3790 buffer = readRecordBuf;
3791 len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
3792 if (total_len > len)
3794 /* Need to reassemble record */
3795 XLogContRecord *contrecord;
3797 uint32 gotlen = len;
3799 /* Initialize pagelsn to the beginning of the page this record is on */
3801 pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
3803 memcpy(buffer, record, len);
3804 record = (XLogRecord *) buffer;
3808 /* Calculate pointer to beginning of next page */
3809 pagelsn.xrecoff += XLOG_BLCKSZ;
3810 if (pagelsn.xrecoff >= XLogFileSize)
3813 pagelsn.xrecoff = 0;
3815 /* Wait for the next page to become available */
3816 if (!XLogPageRead(&pagelsn, emode, false, false))
3819 /* Check that the continuation record looks valid */
3820 if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
3822 ereport(emode_for_corrupt_record(emode, *RecPtr),
3823 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
3824 readId, readSeg, readOff)));
3825 goto next_record_is_invalid;
3827 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3828 contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
3829 if (contrecord->xl_rem_len == 0 ||
3830 total_len != (contrecord->xl_rem_len + gotlen))
3832 ereport(emode_for_corrupt_record(emode, *RecPtr),
3833 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
3834 contrecord->xl_rem_len,
3835 readId, readSeg, readOff)));
3836 goto next_record_is_invalid;
3838 len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
3839 if (contrecord->xl_rem_len > len)
3841 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
3846 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
3847 contrecord->xl_rem_len);
3850 if (!RecordIsValid(record, *RecPtr, emode))
3851 goto next_record_is_invalid;
3852 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3853 EndRecPtr.xlogid = readId;
3854 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
3856 MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
3858 ReadRecPtr = *RecPtr;
3859 /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
3863 /* Record does not cross a page boundary */
3864 if (!RecordIsValid(record, *RecPtr, emode))
3865 goto next_record_is_invalid;
3866 EndRecPtr.xlogid = RecPtr->xlogid;
3867 EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
3869 ReadRecPtr = *RecPtr;
3870 memcpy(buffer, record, total_len);
3873 * Special processing if it's an XLOG SWITCH record
3875 if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3877 /* Pretend it extends to end of segment */
3878 EndRecPtr.xrecoff += XLogSegSize - 1;
3879 EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
3882 * Pretend that readBuf contains the last page of the segment. This is
3883 * just to avoid Assert failure in StartupXLOG if XLOG ends with this
3886 readOff = XLogSegSize - XLOG_BLCKSZ;
3888 return (XLogRecord *) buffer;
3890 next_record_is_invalid:
3891 failedSources |= readSource;
3899 /* In standby-mode, keep trying */
3907 * Check whether the xlog header of a page just read in looks valid.
3909 * This is just a convenience subroutine to avoid duplicated code in
3910 * ReadRecord. It's not intended for use from anywhere else.
3913 ValidXLOGHeader(XLogPageHeader hdr, int emode)
3917 recaddr.xlogid = readId;
3918 recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3920 if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
3922 ereport(emode_for_corrupt_record(emode, recaddr),
3923 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
3924 hdr->xlp_magic, readId, readSeg, readOff)));
3927 if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
3929 ereport(emode_for_corrupt_record(emode, recaddr),
3930 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3931 hdr->xlp_info, readId, readSeg, readOff)));
3934 if (hdr->xlp_info & XLP_LONG_HEADER)
3936 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3938 if (longhdr->xlp_sysid != ControlFile->system_identifier)
3940 char fhdrident_str[32];
3941 char sysident_str[32];
3944 * Format sysids separately to keep platform-dependent format code
3945 * out of the translatable message string.
3947 snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
3948 longhdr->xlp_sysid);
3949 snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
3950 ControlFile->system_identifier);
3951 ereport(emode_for_corrupt_record(emode, recaddr),
3952 (errmsg("WAL file is from different database system"),
3953 errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.",
3954 fhdrident_str, sysident_str)));
3957 if (longhdr->xlp_seg_size != XLogSegSize)
3959 ereport(emode_for_corrupt_record(emode, recaddr),
3960 (errmsg("WAL file is from different database system"),
3961 errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3964 if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
3966 ereport(emode_for_corrupt_record(emode, recaddr),
3967 (errmsg("WAL file is from different database system"),
3968 errdetail("Incorrect XLOG_BLCKSZ in page header.")));
3972 else if (readOff == 0)
3974 /* hmm, first page of file doesn't have a long header? */
3975 ereport(emode_for_corrupt_record(emode, recaddr),
3976 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3977 hdr->xlp_info, readId, readSeg, readOff)));
3981 if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3983 ereport(emode_for_corrupt_record(emode, recaddr),
3984 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3985 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3986 readId, readSeg, readOff)));
3991 * Check page TLI is one of the expected values.
3993 if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3995 ereport(emode_for_corrupt_record(emode, recaddr),
3996 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3998 readId, readSeg, readOff)));
4003 * Since child timelines are always assigned a TLI greater than their
4004 * immediate parent's TLI, we should never see TLI go backwards across
4005 * successive pages of a consistent WAL sequence.
4007 * Of course this check should only be applied when advancing sequentially
4008 * across pages; therefore ReadRecord resets lastPageTLI to zero when
4009 * going to a random page.
4011 if (hdr->xlp_tli < lastPageTLI)
4013 ereport(emode_for_corrupt_record(emode, recaddr),
4014 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
4015 hdr->xlp_tli, lastPageTLI,
4016 readId, readSeg, readOff)));
4019 lastPageTLI = hdr->xlp_tli;
4024 * Try to read a timeline's history file.
4026 * If successful, return the list of component TLIs (the given TLI followed by
4027 * its ancestor TLIs). If we can't find the history file, assume that the
4028 * timeline has no parents, and return a list of just the specified timeline
4032 readTimeLineHistory(TimeLineID targetTLI)
4035 char path[MAXPGPATH];
4036 char histfname[MAXFNAMELEN];
4037 char fline[MAXPGPATH];
4040 /* Timeline 1 does not have a history file, so no need to check */
4042 return list_make1_int((int) targetTLI);
4044 if (InArchiveRecovery)
4046 TLHistoryFileName(histfname, targetTLI);
4047 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
4050 TLHistoryFilePath(path, targetTLI);
4052 fd = AllocateFile(path, "r");
4055 if (errno != ENOENT)
4057 (errcode_for_file_access(),
4058 errmsg("could not open file \"%s\": %m", path)));
4059 /* Not there, so assume no parents */
4060 return list_make1_int((int) targetTLI);
4068 while (fgets(fline, sizeof(fline), fd) != NULL)
4070 /* skip leading whitespace and check for # comment */
4075 for (ptr = fline; *ptr; ptr++)
4077 if (!isspace((unsigned char) *ptr))
4080 if (*ptr == '\0' || *ptr == '#')
4083 /* expect a numeric timeline ID as first field of line */
4084 tli = (TimeLineID) strtoul(ptr, &endptr, 0);
4087 (errmsg("syntax error in history file: %s", fline),
4088 errhint("Expected a numeric timeline ID.")));
4091 tli <= (TimeLineID) linitial_int(result))
4093 (errmsg("invalid data in history file: %s", fline),
4094 errhint("Timeline IDs must be in increasing sequence.")));
4096 /* Build list with newest item first */
4097 result = lcons_int((int) tli, result);
4099 /* we ignore the remainder of each line */
4105 targetTLI <= (TimeLineID) linitial_int(result))
4107 (errmsg("invalid data in history file \"%s\"", path),
4108 errhint("Timeline IDs must be less than child timeline's ID.")));
4110 result = lcons_int((int) targetTLI, result);
4113 (errmsg_internal("history of timeline %u is %s",
4114 targetTLI, nodeToString(result))));
4120 * Probe whether a timeline history file exists for the given timeline ID
4123 existsTimeLineHistory(TimeLineID probeTLI)
4125 char path[MAXPGPATH];
4126 char histfname[MAXFNAMELEN];
4129 /* Timeline 1 does not have a history file, so no need to check */
4133 if (InArchiveRecovery)
4135 TLHistoryFileName(histfname, probeTLI);
4136 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
4139 TLHistoryFilePath(path, probeTLI);
4141 fd = AllocateFile(path, "r");
4149 if (errno != ENOENT)
4151 (errcode_for_file_access(),
4152 errmsg("could not open file \"%s\": %m", path)));
4158 * Find the newest existing timeline, assuming that startTLI exists.
4160 * Note: while this is somewhat heuristic, it does positively guarantee
4161 * that (result + 1) is not a known timeline, and therefore it should
4162 * be safe to assign that ID to a new timeline.
4165 findNewestTimeLine(TimeLineID startTLI)
4167 TimeLineID newestTLI;
4168 TimeLineID probeTLI;
4171 * The algorithm is just to probe for the existence of timeline history
4172 * files. XXX is it useful to allow gaps in the sequence?
4174 newestTLI = startTLI;
4176 for (probeTLI = startTLI + 1;; probeTLI++)
4178 if (existsTimeLineHistory(probeTLI))
4180 newestTLI = probeTLI; /* probeTLI exists */
4184 /* doesn't exist, assume we're done */
4193 * Create a new timeline history file.
4195 * newTLI: ID of the new timeline
4196 * parentTLI: ID of its immediate parent
4197 * endTLI et al: ID of the last used WAL file, for annotation purposes
4199 * Currently this is only used during recovery, and so there are no locking
4200 * considerations. But we should be just as tense as XLogFileInit to avoid
4201 * emplacing a bogus file.
4204 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
4205 TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4207 char path[MAXPGPATH];
4208 char tmppath[MAXPGPATH];
4209 char histfname[MAXFNAMELEN];
4210 char xlogfname[MAXFNAMELEN];
4211 char buffer[BLCKSZ];
4216 Assert(newTLI > parentTLI); /* else bad selection of newTLI */
4219 * Write into a temp file name.
4221 snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
4225 /* do not use get_sync_bit() here --- want to fsync only at end of fill */
4226 fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
4230 (errcode_for_file_access(),
4231 errmsg("could not create file \"%s\": %m", tmppath)));
4234 * If a history file exists for the parent, copy it verbatim
4236 if (InArchiveRecovery)
4238 TLHistoryFileName(histfname, parentTLI);
4239 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
4242 TLHistoryFilePath(path, parentTLI);
4244 srcfd = BasicOpenFile(path, O_RDONLY, 0);
4247 if (errno != ENOENT)
4249 (errcode_for_file_access(),
4250 errmsg("could not open file \"%s\": %m", path)));
4251 /* Not there, so assume parent has no parents */
4258 nbytes = (int) read(srcfd, buffer, sizeof(buffer));
4259 if (nbytes < 0 || errno != 0)
4261 (errcode_for_file_access(),
4262 errmsg("could not read file \"%s\": %m", path)));
4266 if ((int) write(fd, buffer, nbytes) != nbytes)
4268 int save_errno = errno;
4271 * If we fail to make the file, delete it to release disk
4277 * if write didn't set errno, assume problem is no disk space
4279 errno = save_errno ? save_errno : ENOSPC;
4282 (errcode_for_file_access(),
4283 errmsg("could not write to file \"%s\": %m", tmppath)));
4290 * Append one line with the details of this timeline split.
4292 * If we did have a parent file, insert an extra newline just in case the
4293 * parent file failed to end with one.
4295 XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
4298 * Write comment to history file to explain why and where timeline changed.
4299 * Comment varies according to the recovery target used.
4301 if (recoveryTarget == RECOVERY_TARGET_XID)
4302 snprintf(buffer, sizeof(buffer),
4303 "%s%u\t%s\t%s transaction %u\n",
4304 (srcfd < 0) ? "" : "\n",
4307 recoveryStopAfter ? "after" : "before",
4309 if (recoveryTarget == RECOVERY_TARGET_TIME)
4310 snprintf(buffer, sizeof(buffer),
4311 "%s%u\t%s\t%s %s\n",
4312 (srcfd < 0) ? "" : "\n",
4315 recoveryStopAfter ? "after" : "before",
4316 timestamptz_to_str(recoveryStopTime));
4318 snprintf(buffer, sizeof(buffer),
4319 "%s%u\t%s\tno recovery target specified\n",
4320 (srcfd < 0) ? "" : "\n",
4324 nbytes = strlen(buffer);
4326 if ((int) write(fd, buffer, nbytes) != nbytes)
4328 int save_errno = errno;
4331 * If we fail to make the file, delete it to release disk space
4334 /* if write didn't set errno, assume problem is no disk space */
4335 errno = save_errno ? save_errno : ENOSPC;
4338 (errcode_for_file_access(),
4339 errmsg("could not write to file \"%s\": %m", tmppath)));
4342 if (pg_fsync(fd) != 0)
4344 (errcode_for_file_access(),
4345 errmsg("could not fsync file \"%s\": %m", tmppath)));
4349 (errcode_for_file_access(),
4350 errmsg("could not close file \"%s\": %m", tmppath)));
4354 * Now move the completed history file into place with its final name.
4356 TLHistoryFilePath(path, newTLI);
4359 * Prefer link() to rename() here just to be really sure that we don't
4360 * overwrite an existing logfile. However, there shouldn't be one, so
4361 * rename() is an acceptable substitute except for the truly paranoid.
4363 #if HAVE_WORKING_LINK
4364 if (link(tmppath, path) < 0)
4366 (errcode_for_file_access(),
4367 errmsg("could not link file \"%s\" to \"%s\": %m",
4371 if (rename(tmppath, path) < 0)
4373 (errcode_for_file_access(),
4374 errmsg("could not rename file \"%s\" to \"%s\": %m",
4378 /* The history file can be archived immediately. */
4379 TLHistoryFileName(histfname, newTLI);
4380 XLogArchiveNotify(histfname);
4384 * I/O routines for pg_control
4386 * *ControlFile is a buffer in shared memory that holds an image of the
4387 * contents of pg_control. WriteControlFile() initializes pg_control
4388 * given a preloaded buffer, ReadControlFile() loads the buffer from
4389 * the pg_control file (during postmaster or standalone-backend startup),
4390 * and UpdateControlFile() rewrites pg_control after we modify xlog state.
4392 * For simplicity, WriteControlFile() initializes the fields of pg_control
4393 * that are related to checking backend/database compatibility, and
4394 * ReadControlFile() verifies they are correct. We could split out the
4395 * I/O and compatibility-check functions, but there seems no need currently.
4398 WriteControlFile(void)
4401 char buffer[PG_CONTROL_SIZE]; /* need not be aligned */
4404 * Initialize version and compatibility-check fields
4406 ControlFile->pg_control_version = PG_CONTROL_VERSION;
4407 ControlFile->catalog_version_no = CATALOG_VERSION_NO;
4409 ControlFile->maxAlign = MAXIMUM_ALIGNOF;
4410 ControlFile->floatFormat = FLOATFORMAT_VALUE;
4412 ControlFile->blcksz = BLCKSZ;
4413 ControlFile->relseg_size = RELSEG_SIZE;
4414 ControlFile->xlog_blcksz = XLOG_BLCKSZ;
4415 ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
4417 ControlFile->nameDataLen = NAMEDATALEN;
4418 ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
4420 ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
4422 #ifdef HAVE_INT64_TIMESTAMP
4423 ControlFile->enableIntTimes = true;
4425 ControlFile->enableIntTimes = false;
4427 ControlFile->float4ByVal = FLOAT4PASSBYVAL;
4428 ControlFile->float8ByVal = FLOAT8PASSBYVAL;
4430 /* Contents are protected with a CRC */
4431 INIT_CRC32(ControlFile->crc);
4432 COMP_CRC32(ControlFile->crc,
4433 (char *) ControlFile,
4434 offsetof(ControlFileData, crc));
4435 FIN_CRC32(ControlFile->crc);
4438 * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
4439 * excess over sizeof(ControlFileData). This reduces the odds of
4440 * premature-EOF errors when reading pg_control. We'll still fail when we
4441 * check the contents of the file, but hopefully with a more specific
4442 * error than "couldn't read pg_control".
4444 if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
4445 elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
4447 memset(buffer, 0, PG_CONTROL_SIZE);
4448 memcpy(buffer, ControlFile, sizeof(ControlFileData));
4450 fd = BasicOpenFile(XLOG_CONTROL_FILE,
4451 O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
4455 (errcode_for_file_access(),
4456 errmsg("could not create control file \"%s\": %m",
4457 XLOG_CONTROL_FILE)));
4460 if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
4462 /* if write didn't set errno, assume problem is no disk space */
4466 (errcode_for_file_access(),
4467 errmsg("could not write to control file: %m")));
4470 if (pg_fsync(fd) != 0)
4472 (errcode_for_file_access(),
4473 errmsg("could not fsync control file: %m")));
4477 (errcode_for_file_access(),
4478 errmsg("could not close control file: %m")));
/*
 * ReadControlFile: read XLOG_CONTROL_FILE into *ControlFile and verify that
 * the cluster it describes is compatible with this server build.
 *
 * The checks run in this order: pg_control format version, CRC over the
 * bytes that precede the crc field, then every compile-time setting that is
 * recorded in the file (catalog version, alignment, block sizes, etc.).
 */
4482 ReadControlFile(void)
4490 fd = BasicOpenFile(XLOG_CONTROL_FILE,
4495 (errcode_for_file_access(),
4496 errmsg("could not open control file \"%s\": %m",
4497 XLOG_CONTROL_FILE)));
/* Anything other than a full sizeof(ControlFileData) read is a failure. */
4499 if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
4501 (errcode_for_file_access(),
4502 errmsg("could not read from control file: %m")));
4507 * Check for expected pg_control format version. If this is wrong, the
4508 * CRC check will likely fail because we'll be checking the wrong number
4509 * of bytes. Complaining about wrong version will probably be more
4510 * enlightening than complaining about wrong CRC.
/*
 * First case: the stored version differs and is a nonzero multiple of 65536
 * (only the upper 16 bits are set).  That is reported with an extra hint
 * about mismatched byte ordering, and the values are also shown in hex.
 */
4513 if (ControlFile->pg_control_version != PG_CONTROL_VERSION && ControlFile->pg_control_version % 65536 == 0 && ControlFile->pg_control_version / 65536 != 0)
4515 (errmsg("database files are incompatible with server"),
4516 errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d (0x%08x),"
4517 " but the server was compiled with PG_CONTROL_VERSION %d (0x%08x).",
4518 ControlFile->pg_control_version, ControlFile->pg_control_version,
4519 PG_CONTROL_VERSION, PG_CONTROL_VERSION),
4520 errhint("This could be a problem of mismatched byte ordering. It looks like you need to initdb.")));
/* Second case: any other version mismatch. */
4522 if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
4524 (errmsg("database files are incompatible with server"),
4525 errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
4526 " but the server was compiled with PG_CONTROL_VERSION %d.",
4527 ControlFile->pg_control_version, PG_CONTROL_VERSION),
4528 errhint("It looks like you need to initdb.")));
4530 /* Now check the CRC. */
4533 (char *) ControlFile,
4534 offsetof(ControlFileData, crc));
4537 if (!EQ_CRC32(crc, ControlFile->crc))
4539 (errmsg("incorrect checksum in control file")));
4542 * Do compatibility checking immediately. If the database isn't
4543 * compatible with the backend executable, we want to abort before we can
4544 * possibly do any damage.
/*
 * Each test below compares one value stored in pg_control with the constant
 * this executable was compiled with.  The catalog version, MAXALIGN and
 * float format checks hint at initdb only; the remaining ones hint at
 * "recompile or initdb".
 */
4546 if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
4548 (errmsg("database files are incompatible with server"),
4549 errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
4550 " but the server was compiled with CATALOG_VERSION_NO %d.",
4551 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
4552 errhint("It looks like you need to initdb.")));
4553 if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
4555 (errmsg("database files are incompatible with server"),
4556 errdetail("The database cluster was initialized with MAXALIGN %d,"
4557 " but the server was compiled with MAXALIGN %d.",
4558 ControlFile->maxAlign, MAXIMUM_ALIGNOF),
4559 errhint("It looks like you need to initdb.")));
4560 if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
4562 (errmsg("database files are incompatible with server"),
4563 errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
4564 errhint("It looks like you need to initdb.")));
4565 if (ControlFile->blcksz != BLCKSZ)
4567 (errmsg("database files are incompatible with server"),
4568 errdetail("The database cluster was initialized with BLCKSZ %d,"
4569 " but the server was compiled with BLCKSZ %d.",
4570 ControlFile->blcksz, BLCKSZ),
4571 errhint("It looks like you need to recompile or initdb.")));
4572 if (ControlFile->relseg_size != RELSEG_SIZE)
4574 (errmsg("database files are incompatible with server"),
4575 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
4576 " but the server was compiled with RELSEG_SIZE %d.",
4577 ControlFile->relseg_size, RELSEG_SIZE),
4578 errhint("It looks like you need to recompile or initdb.")));
4579 if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
4581 (errmsg("database files are incompatible with server"),
4582 errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
4583 " but the server was compiled with XLOG_BLCKSZ %d.",
4584 ControlFile->xlog_blcksz, XLOG_BLCKSZ),
4585 errhint("It looks like you need to recompile or initdb.")));
4586 if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
4588 (errmsg("database files are incompatible with server"),
4589 errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
4590 " but the server was compiled with XLOG_SEG_SIZE %d.",
4591 ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
4592 errhint("It looks like you need to recompile or initdb.")));
4593 if (ControlFile->nameDataLen != NAMEDATALEN)
4595 (errmsg("database files are incompatible with server"),
4596 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
4597 " but the server was compiled with NAMEDATALEN %d.",
4598 ControlFile->nameDataLen, NAMEDATALEN),
4599 errhint("It looks like you need to recompile or initdb.")));
4600 if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
4602 (errmsg("database files are incompatible with server"),
4603 errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
4604 " but the server was compiled with INDEX_MAX_KEYS %d.",
4605 ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
4606 errhint("It looks like you need to recompile or initdb.")));
4607 if (ControlFile->toast_max_chunk_size != TOAST_MAX_CHUNK_SIZE)
4609 (errmsg("database files are incompatible with server"),
4610 errdetail("The database cluster was initialized with TOAST_MAX_CHUNK_SIZE %d,"
4611 " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
4612 ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
4613 errhint("It looks like you need to recompile or initdb.")));
/*
 * Build options that are either defined or not.  For each one, the flag
 * stored in pg_control is tested against true when the macro is defined;
 * the opposite test (against false) carries the "compiled without" message.
 */
4615 #ifdef HAVE_INT64_TIMESTAMP
4616 if (ControlFile->enableIntTimes != true)
4618 (errmsg("database files are incompatible with server"),
4619 errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
4620 " but the server was compiled with HAVE_INT64_TIMESTAMP."),
4621 errhint("It looks like you need to recompile or initdb.")));
4623 if (ControlFile->enableIntTimes != false)
4625 (errmsg("database files are incompatible with server"),
4626 errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
4627 " but the server was compiled without HAVE_INT64_TIMESTAMP."),
4628 errhint("It looks like you need to recompile or initdb.")));
4631 #ifdef USE_FLOAT4_BYVAL
4632 if (ControlFile->float4ByVal != true)
4634 (errmsg("database files are incompatible with server"),
4635 errdetail("The database cluster was initialized without USE_FLOAT4_BYVAL"
4636 " but the server was compiled with USE_FLOAT4_BYVAL."),
4637 errhint("It looks like you need to recompile or initdb.")));
4639 if (ControlFile->float4ByVal != false)
4641 (errmsg("database files are incompatible with server"),
4642 errdetail("The database cluster was initialized with USE_FLOAT4_BYVAL"
4643 " but the server was compiled without USE_FLOAT4_BYVAL."),
4644 errhint("It looks like you need to recompile or initdb.")));
4647 #ifdef USE_FLOAT8_BYVAL
4648 if (ControlFile->float8ByVal != true)
4650 (errmsg("database files are incompatible with server"),
4651 errdetail("The database cluster was initialized without USE_FLOAT8_BYVAL"
4652 " but the server was compiled with USE_FLOAT8_BYVAL."),
4653 errhint("It looks like you need to recompile or initdb.")));
4655 if (ControlFile->float8ByVal != false)
4657 (errmsg("database files are incompatible with server"),
4658 errdetail("The database cluster was initialized with USE_FLOAT8_BYVAL"
4659 " but the server was compiled without USE_FLOAT8_BYVAL."),
4660 errhint("It looks like you need to recompile or initdb.")));
/*
 * UpdateControlFile: recompute the CRC stored in *ControlFile and write the
 * whole structure back to XLOG_CONTROL_FILE, then fsync and close the file.
 * Each step has its own error report (open, write, fsync, close).
 */
4665 UpdateControlFile(void)
/* The CRC covers every byte of the struct that precedes the crc field. */
4669 INIT_CRC32(ControlFile->crc);
4670 COMP_CRC32(ControlFile->crc,
4671 (char *) ControlFile,
4672 offsetof(ControlFileData, crc));
4673 FIN_CRC32(ControlFile->crc);
4675 fd = BasicOpenFile(XLOG_CONTROL_FILE,
4680 (errcode_for_file_access(),
4681 errmsg("could not open control file \"%s\": %m",
4682 XLOG_CONTROL_FILE)));
/* A short write is treated the same as a failed write. */
4685 if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
4687 /* if write didn't set errno, assume problem is no disk space */
4691 (errcode_for_file_access(),
4692 errmsg("could not write to control file: %m")));
4695 if (pg_fsync(fd) != 0)
4697 (errcode_for_file_access(),
4698 errmsg("could not fsync control file: %m")));
4702 (errcode_for_file_access(),
4703 errmsg("could not close control file: %m")));
4707 * Returns the unique system identifier from control file.
/*
 * Reads ControlFile->system_identifier directly; the caller must only use
 * this once ControlFile has been set up (checked by the Assert below).
 */
4710 GetSystemIdentifier(void)
4712 Assert(ControlFile != NULL);
4713 return ControlFile->system_identifier;
4717 * Initialization of shared memory for XLOG
/*
 * Size computation for the XLOG shared-memory area: the XLogCtlData struct,
 * plus one XLogRecPtr (xlblocks entry) and one XLOG_BLCKSZ-byte page for
 * each of the XLOGbuffers buffers, plus ALIGNOF_XLOG_BUFFER bytes of slack
 * so the page array can be aligned.
 *
 * NOTE(review): add_size()/mul_size() are presumably overflow-checking
 * arithmetic helpers -- confirm against their definitions.
 */
4725 size = sizeof(XLogCtlData);
4726 /* xlblocks array */
4727 size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
4728 /* extra alignment padding for XLOG I/O buffers */
4729 size = add_size(size, ALIGNOF_XLOG_BUFFER);
4730 /* and the buffers themselves */
4731 size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
4734 * Note: we don't count ControlFileData, it comes out of the "slop factor"
4735 * added by CreateSharedMemoryAndSemaphores. This lets us use this
4736 * routine again below to compute the actual allocation size.
/*
 * Attach to (or create) the two shared structures: "Control File", sized
 * sizeof(ControlFileData), and "XLOG Ctl", sized by XLOGShmemSize().  The
 * found flags report whether each one already existed.
 */
4749 ControlFile = (ControlFileData *)
4750 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
4751 XLogCtl = (XLogCtlData *)
4752 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
4754 if (foundCFile || foundXLog)
4756 /* both should be present or neither */
4757 Assert(foundCFile && foundXLog);
/* Fresh area: zero the fixed-size header before laying out the arrays. */
4761 memset(XLogCtl, 0, sizeof(XLogCtlData));
4764 * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
4765 * multiple of the alignment for same, so no extra alignment padding is
/*
 * Layout after the XLogCtlData header: first the xlblocks array
 * (XLOGbuffers entries), then the page buffers, which start at the next
 * ALIGNOF_XLOG_BUFFER boundary.  allocptr walks through the region.
 */
4768 allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
4769 XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
4770 memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
4771 allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
4774 * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
4776 allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
4777 XLogCtl->pages = allocptr;
4778 memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
4781 * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
4782 * in additional info.)
/*
 * XLogCacheBlck holds the highest buffer index; the shared
 * recovery-in-progress flag starts out true; the insert position starts at
 * the first page buffer.
 */
4784 XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
4785 XLogCtl->SharedRecoveryInProgress = true;
4786 XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
4787 SpinLockInit(&XLogCtl->info_lck);
4790 * If we are not in bootstrap mode, pg_control should already exist. Read
4791 * and validate it immediately (see comments in ReadControlFile() for the
4794 if (!IsBootstrapProcessingMode())
4799 * This func must be called ONCE on system install. It creates pg_control
4800 * and the initial XLOG segment.
/*
 * Outline of the steps below: pick a system identifier, build one XLOG page
 * containing a long page header and a shutdown-checkpoint record, write that
 * page to the first segment file, fill in *ControlFile, and finally
 * bootstrap the SUBTRANS and MultiXact subsystems.
 */
4805 CheckPoint checkPoint;
4807 XLogPageHeader page;
4808 XLogLongPageHeader longpage;
4811 uint64 sysidentifier;
4816 * Select a hopefully-unique system identifier code for this installation.
4817 * We use the result of gettimeofday(), including the fractional seconds
4818 * field, as being about as unique as we can easily get. (Think not to
4819 * use random(), since it hasn't been seeded and there's no portable way
4820 * to seed it other than the system clock value...) The upper half of the
4821 * uint64 value is just the tv_sec part, while the lower half is the XOR
4822 * of tv_sec and tv_usec. This is to ensure that we don't lose uniqueness
4823 * unnecessarily if "uint64" is really only 32 bits wide. A person
4824 * knowing this encoding can determine the initialization time of the
4825 * installation, which could perhaps be useful sometimes.
/*
 * NOTE(review): the description above says the lower half is the XOR of
 * tv_sec and tv_usec, but the statement below combines them with bitwise OR.
 * With OR, any bit already set in tv_sec hides the matching tv_usec bit, so
 * fewer distinct values are possible than with XOR.  Confirm which one is
 * intended before changing either the code or the description.
 */
4827 gettimeofday(&tv, NULL);
4828 sysidentifier = ((uint64) tv.tv_sec) << 32;
4829 sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
4831 /* First timeline ID is always 1 */
4834 /* page buffer must be aligned suitably for O_DIRECT */
/* Over-allocate by ALIGNOF_XLOG_BUFFER so an aligned page fits inside. */
4835 buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
4836 page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
4837 memset(page, 0, XLOG_BLCKSZ);
4839 /* Set up information for the initial checkpoint record */
/* redo points just past the long page header, where the record is placed. */
4840 checkPoint.redo.xlogid = 0;
4841 checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
4842 checkPoint.ThisTimeLineID = ThisTimeLineID;
4843 checkPoint.nextXidEpoch = 0;
4844 checkPoint.nextXid = FirstNormalTransactionId;
4845 checkPoint.nextOid = FirstBootstrapObjectId;
4846 checkPoint.nextMulti = FirstMultiXactId;
4847 checkPoint.nextMultiOffset = 0;
4848 checkPoint.oldestXid = FirstNormalTransactionId;
4849 checkPoint.oldestXidDB = TemplateDbOid;
4850 checkPoint.time = (pg_time_t) time(NULL);
4851 checkPoint.oldestActiveXid = InvalidTransactionId;
/*
 * Seed the shared XID/OID counters and the MultiXact and XID-limit state
 * from the checkpoint values chosen above.
 */
4853 ShmemVariableCache->nextXid = checkPoint.nextXid;
4854 ShmemVariableCache->nextOid = checkPoint.nextOid;
4855 ShmemVariableCache->oidCount = 0;
4856 MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4857 SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
4859 /* Set up the XLOG page header */
4860 page->xlp_magic = XLOG_PAGE_MAGIC;
4861 page->xlp_info = XLP_LONG_HEADER;
4862 page->xlp_tli = ThisTimeLineID;
4863 page->xlp_pageaddr.xlogid = 0;
4864 page->xlp_pageaddr.xrecoff = 0;
4865 longpage = (XLogLongPageHeader) page;
4866 longpage->xlp_sysid = sysidentifier;
4867 longpage->xlp_seg_size = XLogSegSize;
4868 longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
4870 /* Insert the initial checkpoint record */
4871 record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
4872 record->xl_prev.xlogid = 0;
4873 record->xl_prev.xrecoff = 0;
4874 record->xl_xid = InvalidTransactionId;
4875 record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
4876 record->xl_len = sizeof(checkPoint);
4877 record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
4878 record->xl_rmid = RM_XLOG_ID;
4879 memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
/*
 * The CRC is fed the checkpoint payload first, then the record header
 * starting sizeof(pg_crc32) bytes in (i.e. skipping that many leading bytes).
 */
4882 COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
4883 COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
4884 SizeOfXLogRecord - sizeof(pg_crc32));
4886 record->xl_crc = crc;
4888 /* Create first XLOG segment file */
4889 use_existent = false;
4890 openLogFile = XLogFileInit(0, 0, &use_existent, false);
4892 /* Write the first page with the initial record */
4894 if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
4896 /* if write didn't set errno, assume problem is no disk space */
4900 (errcode_for_file_access(),
4901 errmsg("could not write bootstrap transaction log file: %m")));
4904 if (pg_fsync(openLogFile) != 0)
4906 (errcode_for_file_access(),
4907 errmsg("could not fsync bootstrap transaction log file: %m")));
4909 if (close(openLogFile))
4911 (errcode_for_file_access(),
4912 errmsg("could not close bootstrap transaction log file: %m")));
4916 /* Now create pg_control */
4918 memset(ControlFile, 0, sizeof(ControlFileData));
4919 /* Initialize pg_control status fields */
4920 ControlFile->system_identifier = sysidentifier;
4921 ControlFile->state = DB_SHUTDOWNED;
4922 ControlFile->time = checkPoint.time;
4923 ControlFile->checkPoint = checkPoint.redo;
4924 ControlFile->checkPointCopy = checkPoint;
4925 /* some additional ControlFile fields are set in WriteControlFile() */
4929 /* Bootstrap the commit log, too */
4931 BootStrapSUBTRANS();
4932 BootStrapMultiXact();
/*
 * str_time: format tnow as "%Y-%m-%d %H:%M:%S %Z" in log_timezone.
 *
 * The text is written into a function-static 128-byte buffer, so every call
 * overwrites the result of the previous one.
 * NOTE(review): presumably returns that buffer -- callers pass the result
 * to a %s format -- but the return statement is not visible here.
 */
4938 str_time(pg_time_t tnow)
4940 static char buf[128];
4942 pg_strftime(buf, sizeof(buf),
4943 "%Y-%m-%d %H:%M:%S %Z",
4944 pg_localtime(&tnow, log_timezone));
4950 * Parse one line from recovery.conf. 'cmdline' is the raw line from the
4951 * file. If the line is parsed successfully, returns true, false indicates
4952 * syntax error. On success, *key_p and *value_p are set to the parameter
4953 * name and value on the line, respectively. If the line is an empty line,
4954 * consisting entirely of whitespace and comments, function returns true
4955 * and *key_p and *value_p are set to NULL.
4957 * The pointers returned in *key_p and *value_p point to an internal buffer
4958 * that is valid only until the next call of parseRecoveryCommandFileLine().
/*
 * Scanner for one "name 'value'" line.  cmdline is only read; the name and
 * value are copied into the static buffer declared below.
 */
4961 parseRecoveryCommandFileLine(char *cmdline, char **key_p, char **value_p)
/* Static pointer: the buffer allocated on first use is reused by later calls. */
4967 static char *buf = NULL;
4969 *key_p = *value_p = NULL;
4972 * Allocate the buffer on first use. It's used to hold both the
4973 * parameter name and value.
4976 buf = malloc(MAXPGPATH + 1);
4979 /* Skip any whitespace at the beginning of line */
4980 for (ptr = cmdline; *ptr; ptr++)
4982 if (!isspace((unsigned char) *ptr))
4985 /* Ignore empty lines */
/* A line whose first non-blank character is '#' counts as empty too. */
4986 if (*ptr == '\0' || *ptr == '#')
4989 /* Read the parameter name */
/* The name ends at whitespace, '=', a single quote, or end of string. */
4991 while (*ptr && !isspace((unsigned char) *ptr) &&
4992 *ptr != '=' && *ptr != '\'')
4993 *(bufp++) = *(ptr++);
4996 /* Skip to the beginning quote of the parameter value */
/* strchr() passes over everything between the name and the first quote. */
4997 ptr = strchr(ptr, '\'');
5002 /* Read the parameter value to *bufp. Collapse any '' escapes as we go. */
5013 /* end of parameter */
5018 else if (*ptr == '\0')
5019 return false; /* unterminated quoted string */
5027 /* Check that there's no garbage after the value */
5032 if (!isspace((unsigned char) *ptr))
5044 * See if there is a recovery command file (recovery.conf), and if so
5045 * read in parameters for archive recovery and XLOG streaming.
5047 * XXX longer term intention is to expand this to
5048 * cater for additional parameters and controls
5049 * possibly use a flex lexer similar to the GUC one
/*
 * Summary of what the code below does: if RECOVERY_COMMAND_FILE does not
 * exist (ENOENT) the function returns without changing anything.  Otherwise
 * each line is parsed with parseRecoveryCommandFileLine(); string parameters
 * are copied with pstrdup(), numeric and Boolean ones are converted, and
 * each recognized setting is echoed through errmsg().  After the loop the
 * compulsory parameters are checked, InArchiveRecovery is set, and the
 * requested target timeline (if any) is resolved.
 */
5052 readRecoveryCommandFile(void)
5055 char cmdline[MAXPGPATH];
5056 TimeLineID rtli = 0;
5057 bool rtliGiven = false;
5058 bool syntaxError = false;
5060 fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
5063 if (errno == ENOENT)
5064 return; /* not there, so no archive recovery */
5066 (errcode_for_file_access(),
5067 errmsg("could not open recovery command file \"%s\": %m",
5068 RECOVERY_COMMAND_FILE)));
/*
 * fgets() stores at most sizeof(cmdline) - 1 characters per call, so a
 * physical line longer than that reaches the parser in several pieces.
 */
5074 while (fgets(cmdline, sizeof(cmdline), fd) != NULL)
5079 if (!parseRecoveryCommandFileLine(cmdline, &tok1, &tok2))
5087 if (strcmp(tok1, "restore_command") == 0)
5089 recoveryRestoreCommand = pstrdup(tok2);
5091 (errmsg("restore_command = '%s'",
5092 recoveryRestoreCommand)));
5094 else if (strcmp(tok1, "recovery_end_command") == 0)
5096 recoveryEndCommand = pstrdup(tok2);
5098 (errmsg("recovery_end_command = '%s'",
5099 recoveryEndCommand)));
5101 else if (strcmp(tok1, "restartpoint_command") == 0)
5103 restartPointCommand = pstrdup(tok2);
5105 (errmsg("restartpoint_command = '%s'",
5106 restartPointCommand)));
5108 else if (strcmp(tok1, "recovery_target_timeline") == 0)
5111 if (strcmp(tok2, "latest") == 0)
/*
 * NOTE(review): base 0 makes strtoul() accept decimal, octal (leading 0) and
 * hex (leading 0x).  Because the end pointer is NULL, trailing non-numeric
 * characters go unnoticed.  The errno test is only meaningful if errno was
 * cleared just before the call -- confirm that it is.
 */
5116 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
5117 if (errno == EINVAL || errno == ERANGE)
5119 (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
5124 (errmsg("recovery_target_timeline = %u", rtli)));
5127 (errmsg("recovery_target_timeline = latest")));
5129 else if (strcmp(tok1, "recovery_target_xid") == 0)
/* NOTE(review): same strtoul()/errno caveats as for the timeline above. */
5132 recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
5133 if (errno == EINVAL || errno == ERANGE)
5135 (errmsg("recovery_target_xid is not a valid number: \"%s\"",
5138 (errmsg("recovery_target_xid = %u",
5139 recoveryTargetXid)));
5140 recoveryTarget = RECOVERY_TARGET_XID;
5142 else if (strcmp(tok1, "recovery_target_time") == 0)
5145 * if recovery_target_xid specified, then this overrides
5146 * recovery_target_time
5148 if (recoveryTarget == RECOVERY_TARGET_XID)
5150 recoveryTarget = RECOVERY_TARGET_TIME;
5153 * Convert the time string given by the user to TimestampTz form.
/* Uses the timestamptz input function with InvalidOid and typmod -1. */
5155 recoveryTargetTime =
5156 DatumGetTimestampTz(DirectFunctionCall3(timestamptz_in,
5157 CStringGetDatum(tok2),
5158 ObjectIdGetDatum(InvalidOid),
5159 Int32GetDatum(-1)));
5161 (errmsg("recovery_target_time = '%s'",
5162 timestamptz_to_str(recoveryTargetTime))));
5164 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
5167 * does nothing if a recovery_target is not also set
5169 if (!parse_bool(tok2, &recoveryTargetInclusive))
5171 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
5172 errmsg("parameter \"recovery_target_inclusive\" requires a Boolean value")));
5174 (errmsg("recovery_target_inclusive = %s", tok2)));
5176 else if (strcmp(tok1, "standby_mode") == 0)
5178 if (!parse_bool(tok2, &StandbyMode))
5180 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
5181 errmsg("parameter \"standby_mode\" requires a Boolean value")));
5183 (errmsg("standby_mode = '%s'", tok2)));
5185 else if (strcmp(tok1, "primary_conninfo") == 0)
5187 PrimaryConnInfo = pstrdup(tok2);
5189 (errmsg("primary_conninfo = '%s'",
5192 else if (strcmp(tok1, "trigger_file") == 0)
5194 TriggerFile = pstrdup(tok2);
5196 (errmsg("trigger_file = '%s'",
5201 (errmsg("unrecognized recovery parameter \"%s\"",
5209 (errmsg("syntax error in recovery command file: %s",
5211 errhint("Lines should have the format parameter = 'value'.")));
5214 * Check for compulsory parameters
/*
 * Two separate situations are reported below: neither primary_conninfo nor
 * restore_command given, and restore_command missing when standby mode is
 * not enabled (see the message texts).
 */
5218 if (PrimaryConnInfo == NULL && recoveryRestoreCommand == NULL)
5220 (errmsg("recovery command file \"%s\" specified neither primary_conninfo nor restore_command",
5221 RECOVERY_COMMAND_FILE),
5222 errhint("The database server will regularly poll the pg_xlog subdirectory to check for files placed there.")));
5226 if (recoveryRestoreCommand == NULL)
5228 (errmsg("recovery command file \"%s\" must specify restore_command when standby mode is not enabled",
5229 RECOVERY_COMMAND_FILE)));
5232 /* Enable fetching from archive recovery area */
5233 InArchiveRecovery = true;
5236 * If user specified recovery_target_timeline, validate it or compute the
5237 * "latest" value. We can't do this until after we've gotten the restore
5238 * command and set InArchiveRecovery, because we need to fetch timeline
5239 * history files from the archive.
5245 /* Timeline 1 does not have a history file, all else should */
5246 if (rtli != 1 && !existsTimeLineHistory(rtli))
5248 (errmsg("recovery target timeline %u does not exist",
5250 recoveryTargetTLI = rtli;
5254 /* We start the "latest" search from pg_control's timeline */
5255 recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
5261 * Exit archive-recovery state
/*
 * endTLI, endLogId and endLogSeg identify the last WAL segment that was
 * read.  endTLI is compared with ThisTimeLineID below to decide whether
 * that segment must be copied to start a new timeline.
 *
 * In order, the function: clears InArchiveRecovery, updates the minimum
 * recovery point, puts the final segment in place (or removes a leftover
 * RECOVERYXLOG), clears archive status flags for the new segment, removes
 * RECOVERYHISTORY, and renames the recovery command file to its "done" name.
 */
5264 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
5266 char recoveryPath[MAXPGPATH];
5267 char xlogpath[MAXPGPATH];
5268 XLogRecPtr InvalidXLogRecPtr = {0, 0};
5271 * We are no longer in archive recovery state.
5273 InArchiveRecovery = false;
5276 * Update min recovery point one last time.
5278 UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
5281 * If the ending log segment is still open, close it (to avoid problems on
5282 * Windows with trying to rename or delete an open file).
5291 * If the segment was fetched from archival storage, we want to replace
5292 * the existing xlog segment (if any) with the archival version. This is
5293 * because whatever is in XLOGDIR is very possibly older than what we have
5294 * from the archives, since it could have come from restoring a PGDATA
5295 * backup. In any case, the archival version certainly is more
5296 * descriptive of what our current database state is, because that is what
5299 * Note that if we are establishing a new timeline, ThisTimeLineID is
5300 * already set to the new value, and so we will create a new file instead
5301 * of overwriting any existing file. (This is, in fact, always the case
/* recoveryPath is the temporary restore name; xlogpath the final name. */
5304 snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
5305 XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
5307 if (restoredFromArchive)
5310 (errmsg_internal("moving last restored xlog to \"%s\"",
5312 unlink(xlogpath); /* might or might not exist */
5313 if (rename(recoveryPath, xlogpath) != 0)
5315 (errcode_for_file_access(),
5316 errmsg("could not rename file \"%s\" to \"%s\": %m",
5317 recoveryPath, xlogpath)));
5318 /* XXX might we need to fix permissions on the file? */
5323 * If the latest segment is not archival, but there's still a
5324 * RECOVERYXLOG laying about, get rid of it.
5326 unlink(recoveryPath); /* ignore any error */
5329 * If we are establishing a new timeline, we have to copy data from
5330 * the last WAL segment of the old timeline to create a starting WAL
5331 * segment for the new timeline.
5333 * Notify the archiver that the last WAL segment of the old timeline
5334 * is ready to copy to archival storage. Otherwise, it is not archived
5337 if (endTLI != ThisTimeLineID)
5339 XLogFileCopy(endLogId, endLogSeg,
5340 endTLI, endLogId, endLogSeg);
/* xlogpath is reused here to hold the old timeline's segment file name. */
5342 if (XLogArchivingActive())
5344 XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
5345 XLogArchiveNotify(xlogpath);
5351 * Let's just make real sure there are not .ready or .done flags posted
5352 * for the new segment.
5354 XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
5355 XLogArchiveCleanup(xlogpath);
5357 /* Get rid of any remaining recovered timeline-history file, too */
5358 snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
5359 unlink(recoveryPath); /* ignore any error */
5362 * Rename the config file out of the way, so that we don't accidentally
5363 * re-enter archive recovery mode in a subsequent crash.
/* Any old "done" file is removed first; that unlink's result is ignored. */
5365 unlink(RECOVERY_COMMAND_DONE);
5366 if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
5368 (errcode_for_file_access(),
5369 errmsg("could not rename file \"%s\" to \"%s\": %m",
5370 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
5373 (errmsg("archive recovery complete")));
5377 * For point-in-time recovery, this function decides whether we want to
5378 * stop applying the XLOG at or after the current record.
5380 * Returns TRUE if we are stopping, FALSE otherwise. On TRUE return,
5381 * *includeThis is set TRUE if we should apply this record before stopping.
5383 * We also track the timestamp of the latest applied COMMIT/ABORT record
5384 * in recoveryLastXTime, for logging purposes.
5385 * Also, some information is saved in recoveryStopXid et al for use in
5386 * annotating the new timeline's history file.
5389 recoveryStopsHere(XLogRecord *record, bool *includeThis)
5393 TimestampTz recordXtime;
5395 /* We only consider stopping at COMMIT or ABORT records */
5396 if (record->xl_rmid == RM_XACT_ID)
/* Strip the XLR_INFO_MASK bits before comparing with record-type codes. */
5398 record_info = record->xl_info & ~XLR_INFO_MASK;
5399 if (record_info == XLOG_XACT_COMMIT)
5401 xl_xact_commit *recordXactCommitData;
5403 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
5404 recordXtime = recordXactCommitData->xact_time;
5406 else if (record_info == XLOG_XACT_ABORT)
5408 xl_xact_abort *recordXactAbortData;
5410 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
5411 recordXtime = recordXactAbortData->xact_time;
5416 else if (record->xl_rmid == RM_XLOG_ID)
5418 record_info = record->xl_info & ~XLR_INFO_MASK;
5419 if (record_info == XLOG_CHECKPOINT_SHUTDOWN ||
5420 record_info == XLOG_CHECKPOINT_ONLINE)
5422 CheckPoint checkPoint;
/* Copy the payload into a local struct before reading its time field. */
5424 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5425 recoveryLastXTime = checkPoint.time;
5429 * We don't want to stop recovery on a checkpoint record, but we do
5430 * want to update recoveryLastXTime. So return is unconditional.
5437 /* Do we have a PITR target at all? */
5438 if (recoveryTarget == RECOVERY_TARGET_UNSET)
5440 recoveryLastXTime = recordXtime;
5444 if (recoveryTarget == RECOVERY_TARGET_XID)
5447 * there can be only one transaction end record with this exact
5450 * when testing for an xid, we MUST test for equality only, since
5451 * transactions are numbered in the order they start, not the order
5452 * they complete. A higher numbered xid will complete before you about
5453 * 50% of the time...
5455 stopsHere = (record->xl_xid == recoveryTargetXid);
5457 *includeThis = recoveryTargetInclusive;
5462 * there can be many transactions that share the same commit time, so
5463 * we stop after the last one, if we are inclusive, or stop at the
5464 * first one if we are exclusive
/*
 * Inclusive: stop at the first record strictly later than the target time.
 * Exclusive: stop at the first record at or after it.  Either way the
 * stopping record itself is not applied (*includeThis is set to false).
 */
5466 if (recoveryTargetInclusive)
5467 stopsHere = (recordXtime > recoveryTargetTime);
5469 stopsHere = (recordXtime >= recoveryTargetTime);
5471 *includeThis = false;
/* Remember which record stopped recovery and whether it is applied. */
5476 recoveryStopXid = record->xl_xid;
5477 recoveryStopTime = recordXtime;
5478 recoveryStopAfter = *includeThis;
5480 if (record_info == XLOG_XACT_COMMIT)
5482 if (recoveryStopAfter)
5484 (errmsg("recovery stopping after commit of transaction %u, time %s",
5486 timestamptz_to_str(recoveryStopTime))));
5489 (errmsg("recovery stopping before commit of transaction %u, time %s",
5491 timestamptz_to_str(recoveryStopTime))));
5495 if (recoveryStopAfter)
5497 (errmsg("recovery stopping after abort of transaction %u, time %s",
5499 timestamptz_to_str(recoveryStopTime))));
5502 (errmsg("recovery stopping before abort of transaction %u, time %s",
5504 timestamptz_to_str(recoveryStopTime))));
/* The last-applied timestamp advances only if this record gets applied. */
5507 if (recoveryStopAfter)
5508 recoveryLastXTime = recordXtime;
5511 recoveryLastXTime = recordXtime;
5517 * Returns bool with current recovery mode, a global state.
/*
 * Function-manager style entry point (PG_FUNCTION_ARGS / PG_RETURN_BOOL)
 * that takes no arguments and simply reports RecoveryInProgress().
 */
5520 pg_is_in_recovery(PG_FUNCTION_ARGS)
5522 PG_RETURN_BOOL(RecoveryInProgress());
5526 * Returns timestamp of last recovered commit/abort record.
/*
 * The value is copied from the XLogCtl field into the plain
 * recoveryLastXTime variable while holding info_lck, and that copy is what
 * gets returned.  Note that the call therefore also overwrites this
 * variable as a side effect.
 */
5529 GetLatestXLogTime(void)
5531 /* use volatile pointer to prevent code rearrangement */
5532 volatile XLogCtlData *xlogctl = XLogCtl;
5534 SpinLockAcquire(&xlogctl->info_lck);
5535 recoveryLastXTime = xlogctl->recoveryLastXTime;
5536 SpinLockRelease(&xlogctl->info_lck);
5538 return recoveryLastXTime;
/*
 * RecoveryRequiresIntParameter: complain when the local setting (currValue)
 * is strictly lower than the value recorded by the WAL source
 * (checkpointValue); equal values pass.
 *
 * The arguments are expanded without parentheses and checkpointValue is
 * used more than once, so pass simple, side-effect-free expressions.  The
 * message prints both values with %u.
 */
5542 * Note that text field supplied is a parameter name and does not require translation
5544 #define RecoveryRequiresIntParameter(param_name, currValue, checkpointValue) \
5546 if (currValue < checkpointValue) \
5548 (errmsg("recovery connections cannot continue because " \
5549 "%s = %u is a lower setting than on WAL source server (value was %u)", \
5552 checkpointValue))); \
5556 * Check to see if required parameters are set high enough on this server
5557 * for various aspects of recovery operation.
/*
 * checkPoint is passed by value.  Three local settings (MaxConnections,
 * max_prepared_xacts, max_locks_per_xact) are compared with the values
 * stored in the checkpoint, and the checkpoint's XLogStandbyInfoMode flag
 * must be set.
 */
5560 CheckRequiredParameterValues(CheckPoint checkPoint)
5562 /* We ignore autovacuum_max_workers when we make this test. */
5563 RecoveryRequiresIntParameter("max_connections",
5564 MaxConnections, checkPoint.MaxConnections);
5566 RecoveryRequiresIntParameter("max_prepared_xacts",
5567 max_prepared_xacts, checkPoint.max_prepared_xacts);
5568 RecoveryRequiresIntParameter("max_locks_per_xact",
5569 max_locks_per_xact, checkPoint.max_locks_per_xact);
/* A clear flag means the WAL source had recovery_connections disabled. */
5571 if (!checkPoint.XLogStandbyInfoMode)
5573 (errmsg("recovery connections cannot start because the recovery_connections "
5574 "parameter is disabled on the WAL source server")));
5578 * This must be called ONCE during postmaster or standalone-backend startup
5583 XLogCtlInsert *Insert;
5584 CheckPoint checkPoint;
5586 bool reachedStopPoint = false;
5587 bool haveBackupLabel = false;
5595 TransactionId oldestActiveXID;
5596 bool bgwriterLaunched = false;
5599 * Read control file and check XLOG status looks valid.
5601 * Note: in most control paths, *ControlFile is already valid and we need
5602 * not do ReadControlFile() here, but might as well do it to be sure.
5606 if (ControlFile->state < DB_SHUTDOWNED ||
5607 ControlFile->state > DB_IN_PRODUCTION ||
5608 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
5610 (errmsg("control file contains invalid data")));
5612 if (ControlFile->state == DB_SHUTDOWNED)
5614 (errmsg("database system was shut down at %s",
5615 str_time(ControlFile->time))));
5616 else if (ControlFile->state == DB_SHUTDOWNING)
5618 (errmsg("database system shutdown was interrupted; last known up at %s",
5619 str_time(ControlFile->time))));
5620 else if (ControlFile->state == DB_IN_CRASH_RECOVERY)
5622 (errmsg("database system was interrupted while in recovery at %s",
5623 str_time(ControlFile->time)),
5624 errhint("This probably means that some data is corrupted and"
5625 " you will have to use the last backup for recovery.")));
5626 else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
5628 (errmsg("database system was interrupted while in recovery at log time %s",
5629 str_time(ControlFile->checkPointCopy.time)),
5630 errhint("If this has occurred more than once some data might be corrupted"
5631 " and you might need to choose an earlier recovery target.")));
5632 else if (ControlFile->state == DB_IN_PRODUCTION)
5634 (errmsg("database system was interrupted; last known up at %s",
5635 str_time(ControlFile->time))));
5637 /* This is just to allow attaching to startup process with a debugger */
5638 #ifdef XLOG_REPLAY_DELAY
5639 if (ControlFile->state != DB_SHUTDOWNED)
5640 pg_usleep(60000000L);
5644 * Verify that pg_xlog and pg_xlog/archive_status exist. In cases where
5645 * someone has performed a copy for PITR, these directories may have been
5646 * excluded and need to be re-created.
5648 ValidateXLOGDirectoryStructure();
5651 * Clear out any old relcache cache files. This is *necessary* if we do
5652 * any WAL replay, since that would probably result in the cache files
5653 * being out of sync with database reality. In theory we could leave them
5654 * in place if the database had been cleanly shut down, but it seems
5655 * safest to just remove them always and let them be rebuilt during the
5656 * first backend startup.
5658 RelationCacheInitFileRemove();
5661 * Initialize on the assumption we want to recover to the same timeline
5662 * that's active according to pg_control.
5664 recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
5667 * Check for recovery control file, and if so set up state for offline
5670 readRecoveryCommandFile();
5672 /* Now we can determine the list of expected TLIs */
5673 expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
5676 * If pg_control's timeline is not in expectedTLIs, then we cannot
5677 * proceed: the backup is not part of the history of the requested
5680 if (!list_member_int(expectedTLIs,
5681 (int) ControlFile->checkPointCopy.ThisTimeLineID))
5683 (errmsg("requested timeline %u is not a child of database system timeline %u",
5685 ControlFile->checkPointCopy.ThisTimeLineID)));
5688 * Save the selected recovery target timeline ID and restartpoint_command
5689 * in shared memory so that other processes can see them
5691 XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
5692 strncpy(XLogCtl->restartPointCommand,
5693 restartPointCommand ? restartPointCommand : "",
5694 sizeof(XLogCtl->restartPointCommand));
5696 if (InArchiveRecovery)
5700 (errmsg("entering standby mode")));
5701 else if (recoveryTarget == RECOVERY_TARGET_XID)
5703 (errmsg("starting point-in-time recovery to XID %u",
5704 recoveryTargetXid)));
5705 else if (recoveryTarget == RECOVERY_TARGET_TIME)
5707 (errmsg("starting point-in-time recovery to %s",
5708 timestamptz_to_str(recoveryTargetTime))));
5711 (errmsg("starting archive recovery")));
5714 if (read_backup_label(&checkPointLoc))
5717 * When a backup_label file is present, we want to roll forward from
5718 * the checkpoint it identifies, rather than using pg_control.
5720 record = ReadCheckpointRecord(checkPointLoc, 0);
5724 (errmsg("checkpoint record is at %X/%X",
5725 checkPointLoc.xlogid, checkPointLoc.xrecoff)));
5726 InRecovery = true; /* force recovery even if SHUTDOWNED */
5731 (errmsg("could not locate required checkpoint record"),
5732 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
5734 /* set flag to delete it later */
5735 haveBackupLabel = true;
5740 * Get the last valid checkpoint record. If the latest one according
5741 * to pg_control is broken, try the next-to-last one.
5743 checkPointLoc = ControlFile->checkPoint;
5744 RedoStartLSN = ControlFile->checkPointCopy.redo;
5745 record = ReadCheckpointRecord(checkPointLoc, 1);
5749 (errmsg("checkpoint record is at %X/%X",
5750 checkPointLoc.xlogid, checkPointLoc.xrecoff)));
5752 else if (StandbyMode)
5755 * The last valid checkpoint record required for a streaming
5756 * recovery exists in neither standby nor the primary.
5759 (errmsg("could not locate a valid checkpoint record")));
5763 checkPointLoc = ControlFile->prevCheckPoint;
5764 record = ReadCheckpointRecord(checkPointLoc, 2);
5768 (errmsg("using previous checkpoint record at %X/%X",
5769 checkPointLoc.xlogid, checkPointLoc.xrecoff)));
5770 InRecovery = true; /* force recovery even if SHUTDOWNED */
5774 (errmsg("could not locate a valid checkpoint record")));
5778 LastRec = RecPtr = checkPointLoc;
5779 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5780 wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
5783 (errmsg("redo record is at %X/%X; shutdown %s",
5784 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
5785 wasShutdown ? "TRUE" : "FALSE")));
5787 (errmsg("next transaction ID: %u/%u; next OID: %u",
5788 checkPoint.nextXidEpoch, checkPoint.nextXid,
5789 checkPoint.nextOid)));
5791 (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
5792 checkPoint.nextMulti, checkPoint.nextMultiOffset)));
5794 (errmsg("oldest unfrozen transaction ID: %u, in database %u",
5795 checkPoint.oldestXid, checkPoint.oldestXidDB)));
5796 if (!TransactionIdIsNormal(checkPoint.nextXid))
5798 (errmsg("invalid next transaction ID")));
5800 ShmemVariableCache->nextXid = checkPoint.nextXid;
5801 ShmemVariableCache->nextOid = checkPoint.nextOid;
5802 ShmemVariableCache->oidCount = 0;
5803 MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
5804 SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
5807 * We must replay WAL entries using the same TimeLineID they were created
5808 * under, so temporarily adopt the TLI indicated by the checkpoint (see
5809 * also xlog_redo()).
5811 ThisTimeLineID = checkPoint.ThisTimeLineID;
5813 RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
5815 if (XLByteLT(RecPtr, checkPoint.redo))
5817 (errmsg("invalid redo in checkpoint record")));
5820 * Check whether we need to force recovery from WAL. If it appears to
5821 * have been a clean shutdown and we did not have a recovery.conf file,
5822 * then assume no recovery needed.
5824 if (XLByteLT(checkPoint.redo, RecPtr))
5828 (errmsg("invalid redo record in shutdown checkpoint")));
5831 else if (ControlFile->state != DB_SHUTDOWNED)
5833 else if (InArchiveRecovery)
5835 /* force recovery due to presence of recovery.conf */
5843 /* use volatile pointer to prevent code rearrangement */
5844 volatile XLogCtlData *xlogctl = XLogCtl;
5847 * Update pg_control to show that we are recovering and to show the
5848 * selected checkpoint as the place we are starting from. We also mark
5849 * pg_control with any minimum recovery stop point obtained from a
5850 * backup history file.
5852 if (InArchiveRecovery)
5853 ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
5857 (errmsg("database system was not properly shut down; "
5858 "automatic recovery in progress")));
5859 ControlFile->state = DB_IN_CRASH_RECOVERY;
5861 ControlFile->prevCheckPoint = ControlFile->checkPoint;
5862 ControlFile->checkPoint = checkPointLoc;
5863 ControlFile->checkPointCopy = checkPoint;
5864 if (InArchiveRecovery)
5866 /* initialize minRecoveryPoint if not set yet */
5867 if (XLByteLT(ControlFile->minRecoveryPoint, checkPoint.redo))
5868 ControlFile->minRecoveryPoint = checkPoint.redo;
5872 * set backupStartPoint if we're starting archive recovery from a
5875 if (haveBackupLabel)
5876 ControlFile->backupStartPoint = checkPoint.redo;
5877 ControlFile->time = (pg_time_t) time(NULL);
5878 /* No need to hold ControlFileLock yet, we aren't up far enough */
5879 UpdateControlFile();
5881 /* initialize our local copy of minRecoveryPoint */
5882 minRecoveryPoint = ControlFile->minRecoveryPoint;
5885 * Reset pgstat data, because it may be invalid after recovery.
5890 * If there was a backup label file, it's done its job and the info
5891 * has now been propagated into pg_control. We must get rid of the
5892 * label file so that if we crash during recovery, we'll pick up at
5893 * the latest recovery restartpoint instead of going all the way back
5894 * to the backup start point. It seems prudent though to just rename
5895 * the file out of the way rather than delete it completely.
5897 if (haveBackupLabel)
5899 unlink(BACKUP_LABEL_OLD);
5900 if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) != 0)
5902 (errcode_for_file_access(),
5903 errmsg("could not rename file \"%s\" to \"%s\": %m",
5904 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
5908 * Initialize recovery connections, if enabled. We won't let backends
5909 * in yet, not until we've reached the min recovery point specified in
5910 * control file and we've established a recovery snapshot from a
5911 * running-xacts WAL record.
5913 if (InArchiveRecovery && XLogRequestRecoveryConnections)
5915 TransactionId *xids;
5918 CheckRequiredParameterValues(checkPoint);
5921 (errmsg("initializing recovery connections")));
5923 InitRecoveryTransactionEnvironment();
5926 oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
5928 oldestActiveXID = checkPoint.oldestActiveXid;
5929 Assert(TransactionIdIsValid(oldestActiveXID));
5931 /* Startup commit log and related stuff */
5933 StartupSUBTRANS(oldestActiveXID);
5936 ProcArrayInitRecoveryInfo(oldestActiveXID);
5939 * If we're beginning at a shutdown checkpoint, we know that
5940 * nothing was running on the master at this point. So fake-up
5941 * an empty running-xacts record and use that here and now.
5942 * Recover additional standby state for prepared transactions.
5946 RunningTransactionsData running;
5949 * Construct a RunningTransactions snapshot representing a shut
5950 * down server, with only prepared transactions still alive.
5951 * We're never overflowed at this point because all subxids
5952 * are listed with their parent prepared transactions.
5954 running.xcnt = nxids;
5955 running.subxid_overflow = false;
5956 running.nextXid = checkPoint.nextXid;
5957 running.oldestRunningXid = oldestActiveXID;
5958 running.xids = xids;
5960 ProcArrayApplyRecoveryInfo(&running);
5962 StandbyRecoverPreparedTransactions(false);
5966 /* Initialize resource managers */
5967 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5969 if (RmgrTable[rmid].rm_startup != NULL)
5970 RmgrTable[rmid].rm_startup();
5974 * Initialize shared replayEndRecPtr and recoveryLastRecPtr.
5976 * This is slightly confusing if we're starting from an online
5977 * checkpoint; we've just read and replayed the checkpoint record,
5978 * but we're going to start replay from its redo pointer, which
5979 * precedes the location of the checkpoint record itself. So even
5980 * though the last record we've replayed is indeed ReadRecPtr, we
5981 * haven't replayed all the preceding records yet. That's OK for
5982 * the current use of these variables.
5984 SpinLockAcquire(&xlogctl->info_lck);
5985 xlogctl->replayEndRecPtr = ReadRecPtr;
5986 xlogctl->recoveryLastRecPtr = ReadRecPtr;
5987 SpinLockRelease(&xlogctl->info_lck);
5990 * Let postmaster know we've started redo now, so that it can
5991 * launch bgwriter to perform restartpoints. We don't bother
5992 * during crash recovery as restartpoints can only be performed
5993 * during archive recovery. And we'd like to keep crash recovery
5994 * simple, to avoid introducing bugs that could affect you when
5995 * recovering after crash.
5997 * After this point, we can no longer assume that we're the only
5998 * process in addition to postmaster! Also, fsync requests are
5999 * subsequently to be handled by the bgwriter, not locally.
6001 if (InArchiveRecovery && IsUnderPostmaster)
6003 SetForwardFsyncRequests();
6004 SendPostmasterSignal(PMSIGNAL_RECOVERY_STARTED);
6005 bgwriterLaunched = true;
6009 * Allow read-only connections immediately if we're consistent already.
6011 CheckRecoveryConsistency();
6014 * Find the first record that logically follows the checkpoint --- it
6015 * might physically precede it, though.
6017 if (XLByteLT(checkPoint.redo, RecPtr))
6019 /* back up to find the record */
6020 record = ReadRecord(&(checkPoint.redo), PANIC, false);
6024 /* just have to read next record after CheckPoint */
6025 record = ReadRecord(NULL, LOG, false);
6030 bool recoveryContinue = true;
6031 bool recoveryApply = true;
6032 ErrorContextCallback errcontext;
6037 (errmsg("redo starts at %X/%X",
6038 ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
6041 * main redo apply loop
6047 (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
6048 (rmid != RM_XACT_ID && trace_recovery_messages <= DEBUG3))
6052 initStringInfo(&buf);
6053 appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
6054 ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
6055 EndRecPtr.xlogid, EndRecPtr.xrecoff);
6056 xlog_outrec(&buf, record);
6057 appendStringInfo(&buf, " - ");
6058 RmgrTable[record->xl_rmid].rm_desc(&buf,
6060 XLogRecGetData(record));
6061 elog(LOG, "%s", buf.data);
6066 /* Handle interrupt signals of startup process */
6067 HandleStartupProcInterrupts();
6069 /* Allow read-only connections if we're consistent now */
6070 CheckRecoveryConsistency();
6073 * Have we reached our recovery target?
6075 if (recoveryStopsHere(record, &recoveryApply))
6077 reachedStopPoint = true; /* see below */
6078 recoveryContinue = false;
6083 /* Setup error traceback support for ereport() */
6084 errcontext.callback = rm_redo_error_callback;
6085 errcontext.arg = (void *) record;
6086 errcontext.previous = error_context_stack;
6087 error_context_stack = &errcontext;
6089 /* nextXid must be beyond record's xid */
6090 if (TransactionIdFollowsOrEquals(record->xl_xid,
6091 ShmemVariableCache->nextXid))
6093 ShmemVariableCache->nextXid = record->xl_xid;
6094 TransactionIdAdvance(ShmemVariableCache->nextXid);
6098 * Update shared replayEndRecPtr before replaying this record,
6099 * so that XLogFlush will update minRecoveryPoint correctly.
6101 SpinLockAcquire(&xlogctl->info_lck);
6102 xlogctl->replayEndRecPtr = EndRecPtr;
6103 xlogctl->recoveryLastXTime = recoveryLastXTime;
6104 SpinLockRelease(&xlogctl->info_lck);
6106 /* In Hot Standby mode, keep track of XIDs we've seen */
6107 if (InHotStandby && TransactionIdIsValid(record->xl_xid))
6108 RecordKnownAssignedTransactionIds(record->xl_xid);
6110 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
6112 /* Pop the error context stack */
6113 error_context_stack = errcontext.previous;
6116 * Update shared recoveryLastRecPtr after this record has been
6119 SpinLockAcquire(&xlogctl->info_lck);
6120 xlogctl->recoveryLastRecPtr = EndRecPtr;
6121 SpinLockRelease(&xlogctl->info_lck);
6123 LastRec = ReadRecPtr;
6125 record = ReadRecord(NULL, LOG, false);
6126 } while (record != NULL && recoveryContinue);
6129 * end of main redo apply loop
6133 (errmsg("redo done at %X/%X",
6134 ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
6135 if (recoveryLastXTime)
6137 (errmsg("last completed transaction was at log time %s",
6138 timestamptz_to_str(recoveryLastXTime))));
6143 /* there are no WAL records following the checkpoint */
6145 (errmsg("redo is not required")));
6150 * If we launched a WAL receiver, it should be gone by now. It will trump
6151 * over the startup checkpoint and subsequent records if it's still alive,
6152 * so be extra sure that it's gone.
6154 if (WalRcvInProgress())
6155 elog(PANIC, "wal receiver still active");
6158 * We are now done reading the xlog from stream. Turn off streaming
6159 * recovery to force fetching the files (which would be required at end of
6160 * recovery, e.g., timeline history file) from archive or pg_xlog.
6162 StandbyMode = false;
6165 * Re-fetch the last valid or last applied record, so we can identify the
6166 * exact endpoint of what we consider the valid portion of WAL.
6168 record = ReadRecord(&LastRec, PANIC, false);
6169 EndOfLog = EndRecPtr;
6170 XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
6173 * Complain if we did not roll forward far enough to render the backup
6174 * dump consistent. Note: it is indeed okay to look at the local variable
6175 * minRecoveryPoint here, even though ControlFile->minRecoveryPoint might
6176 * be further ahead --- ControlFile->minRecoveryPoint cannot have been
6177 * advanced beyond the WAL we processed.
6179 if (InArchiveRecovery &&
6180 (XLByteLT(EndOfLog, minRecoveryPoint) ||
6181 !XLogRecPtrIsInvalid(ControlFile->backupStartPoint)))
6183 if (reachedStopPoint) /* stopped because of stop request */
6185 (errmsg("requested recovery stop point is before consistent recovery point")));
6186 else /* ran off end of WAL */
6188 (errmsg("WAL ends before consistent recovery point")));
6192 * Consider whether we need to assign a new timeline ID.
6194 * If we are doing an archive recovery, we always assign a new ID. This
6195 * handles a couple of issues. If we stopped short of the end of WAL
6196 * during recovery, then we are clearly generating a new timeline and must
6197 * assign it a unique new ID. Even if we ran to the end, modifying the
6198 * current last segment is problematic because it may result in trying to
6199 * overwrite an already-archived copy of that segment, and we encourage
6200 * DBAs to make their archive_commands reject that. We can dodge the
6201 * problem by making the new active segment have a new timeline ID.
6203 * In a normal crash recovery, we can just extend the timeline we were in.
6205 if (InArchiveRecovery)
6207 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
6209 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
6210 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
6211 curFileTLI, endLogId, endLogSeg);
6214 /* Save the selected TimeLineID in shared memory, too */
6215 XLogCtl->ThisTimeLineID = ThisTimeLineID;
6218 * We are now done reading the old WAL. Turn off archive fetching if it
6219 * was active, and make a writable copy of the last WAL segment. (Note
6220 * that we also have a copy of the last block of the old WAL in readBuf;
6221 * we will use that below.)
6223 if (InArchiveRecovery)
6224 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
6227 * Prepare to write WAL starting at EndOfLog position, and init xlog
6228 * buffer cache using the block containing the last record from the
6229 * previous incarnation.
6231 openLogId = endLogId;
6232 openLogSeg = endLogSeg;
6233 openLogFile = XLogFileOpen(openLogId, openLogSeg);
6235 Insert = &XLogCtl->Insert;
6236 Insert->PrevRecord = LastRec;
6237 XLogCtl->xlblocks[0].xlogid = openLogId;
6238 XLogCtl->xlblocks[0].xrecoff =
6239 ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
6242 * Tricky point here: readBuf contains the *last* block that the LastRec
6243 * record spans, not the one it starts in. The last block is indeed the
6244 * one we want to use.
6246 Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
6247 memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
6248 Insert->currpos = (char *) Insert->currpage +
6249 (EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
6251 LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
6253 XLogCtl->Write.LogwrtResult = LogwrtResult;
6254 Insert->LogwrtResult = LogwrtResult;
6255 XLogCtl->LogwrtResult = LogwrtResult;
6257 XLogCtl->LogwrtRqst.Write = EndOfLog;
6258 XLogCtl->LogwrtRqst.Flush = EndOfLog;
6260 freespace = INSERT_FREESPACE(Insert);
6263 /* Make sure rest of page is zero */
6264 MemSet(Insert->currpos, 0, freespace);
6265 XLogCtl->Write.curridx = 0;
6270 * Whenever Write.LogwrtResult points to exactly the end of a page,
6271 * Write.curridx must point to the *next* page (see XLogWrite()).
6273 * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
6274 * this is sufficient. The first actual attempt to insert a log
6275 * record will advance the insert state.
6277 XLogCtl->Write.curridx = NextBufIdx(0);
6280 /* Pre-scan prepared transactions to find out the range of XIDs present */
6281 oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
6288 * Resource managers might need to write WAL records, eg, to record
6289 * index cleanup actions. So temporarily enable XLogInsertAllowed in
6290 * this process only.
6292 LocalSetXLogInsertAllowed();
6295 * Allow resource managers to do any required cleanup.
6297 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
6299 if (RmgrTable[rmid].rm_cleanup != NULL)
6300 RmgrTable[rmid].rm_cleanup();
6303 /* Disallow XLogInsert again */
6304 LocalXLogInsertAllowed = -1;
6307 * Check to see if the XLOG sequence contained any unresolved
6308 * references to uninitialized pages.
6310 XLogCheckInvalidPages();
6313 * Perform a checkpoint to update all our recovery activity to disk.
6315 * Note that we write a shutdown checkpoint rather than an on-line
6316 * one. This is not particularly critical, but since we may be
6317 * assigning a new TLI, using a shutdown checkpoint allows us to have
6318 * the rule that TLI only changes in shutdown checkpoints, which
6319 * allows some extra error checking in xlog_redo.
6321 if (bgwriterLaunched)
6322 RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY |
6323 CHECKPOINT_IMMEDIATE |
6326 CreateCheckPoint(CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_IMMEDIATE);
6329 * And finally, execute the recovery_end_command, if any.
6331 if (recoveryEndCommand)
6332 ExecuteRecoveryCommand(recoveryEndCommand,
6333 "recovery_end_command",
6338 * Preallocate additional log files, if wanted.
6340 PreallocXlogFiles(EndOfLog);
6343 * Okay, we're officially UP.
6347 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
6348 ControlFile->state = DB_IN_PRODUCTION;
6349 ControlFile->time = (pg_time_t) time(NULL);
6350 UpdateControlFile();
6351 LWLockRelease(ControlFileLock);
6353 /* start the archive_timeout timer running */
6354 XLogCtl->Write.lastSegSwitchTime = (pg_time_t) time(NULL);
6356 /* initialize shared-memory copy of latest checkpoint XID/epoch */
6357 XLogCtl->ckptXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
6358 XLogCtl->ckptXid = ControlFile->checkPointCopy.nextXid;
6360 /* also initialize latestCompletedXid, to nextXid - 1 */
6361 ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
6362 TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
6365 * Start up the commit log and related stuff, too. In hot standby mode we
6366 * did this already before WAL replay.
6368 if (standbyState == STANDBY_DISABLED)
6371 StartupSUBTRANS(oldestActiveXID);
6375 /* Reload shared-memory state for prepared transactions */
6376 RecoverPreparedTransactions();
6379 * Shutdown the recovery environment. This must occur after
6380 * RecoverPreparedTransactions(), see notes for lock_twophase_recover()
6382 if (standbyState != STANDBY_DISABLED)
6383 ShutdownRecoveryTransactionEnvironment();
6385 /* Shut down readFile facility, free space */
6398 free(readRecordBuf);
6399 readRecordBuf = NULL;
6400 readRecordBufSize = 0;
6404 * All done. Allow backends to write WAL. (Although the bool flag is
6405 * probably atomic in itself, we use the info_lck here to ensure that
6406 * there are no race conditions concerning visibility of other recent
6407 * updates to shared memory.)
6410 /* use volatile pointer to prevent code rearrangement */
6411 volatile XLogCtlData *xlogctl = XLogCtl;
6413 SpinLockAcquire(&xlogctl->info_lck);
6414 xlogctl->SharedRecoveryInProgress = false;
6415 SpinLockRelease(&xlogctl->info_lck);
/*
 * CheckRecoveryConsistency -- reviewer summary of the visible logic
 *
 * Step 1: while reachedMinRecoveryPoint is still false, set it once
 * minRecoveryPoint <= EndRecPtr (XLByteLE) and
 * ControlFile->backupStartPoint is invalid, and report EndRecPtr in the
 * consistent-recovery-state message.
 *
 * Step 2: when standbyState is STANDBY_SNAPSHOT_READY and
 * reachedMinRecoveryPoint is set, set backendsAllowed and send
 * PMSIGNAL_RECOVERY_CONSISTENT to the postmaster.
 * NOTE(review): further terms of this second condition may sit on lines
 * that are not visible here -- confirm against the complete file.
 *
 * Takes no arguments; no return statement is visible.
 */
6420 * Checks if recovery has reached a consistent state. When consistency is
6421 * reached and we have a valid starting standby snapshot, tell postmaster
6422 * that it can start accepting read-only connections.
6425 CheckRecoveryConsistency(void)
6427 static bool backendsAllowed = false;
6430 * Have we passed our safe starting point?
6432 if (!reachedMinRecoveryPoint &&
6433 XLByteLE(minRecoveryPoint, EndRecPtr) &&
6434 XLogRecPtrIsInvalid(ControlFile->backupStartPoint))
6436 reachedMinRecoveryPoint = true;
6438 (errmsg("consistent recovery state reached at %X/%X",
6439 EndRecPtr.xlogid, EndRecPtr.xrecoff)));
6443 * Have we got a valid starting snapshot that will allow
6444 * queries to be run? If so, we can tell postmaster that the
6445 * database is consistent now, enabling connections.
6447 if (standbyState == STANDBY_SNAPSHOT_READY &&
6449 reachedMinRecoveryPoint &&
6452 backendsAllowed = true;
6453 SendPostmasterSignal(PMSIGNAL_RECOVERY_CONSISTENT);
/*
 * RecoveryInProgress -- reviewer summary of the visible logic
 *
 * Returns LocalRecoveryInProgress, the process-local copy of the shared
 * flag.  The local copy is refreshed from XLogCtl->SharedRecoveryInProgress
 * while holding info_lck.
 *
 * NOTE(review): the return type line, the braces and the statements guarded
 * by the two if-tests are not visible here.  The surrounding comments
 * indicate that the shared flag is only consulted until it has been seen
 * false once, and that InitXLOGAccess() is invoked at that moment --
 * confirm against the complete file.
 */
6458 * Is the system still in recovery?
6460 * Unlike testing InRecovery, this works in any process that's connected to
6463 * As a side-effect, we initialize the local TimeLineID and RedoRecPtr
6464 * variables the first time we see that recovery is finished.
6467 RecoveryInProgress(void)
6470 * We check shared state each time only until we leave recovery mode. We
6471 * can't re-enter recovery, so there's no need to keep checking after the
6472 * shared variable has once been seen false.
6474 if (!LocalRecoveryInProgress)
6478 /* use volatile pointer to prevent code rearrangement */
6479 volatile XLogCtlData *xlogctl = XLogCtl;
6481 /* spinlock is essential on machines with weak memory ordering! */
6482 SpinLockAcquire(&xlogctl->info_lck);
6483 LocalRecoveryInProgress = xlogctl->SharedRecoveryInProgress;
6484 SpinLockRelease(&xlogctl->info_lck);
6487 * Initialize TimeLineID and RedoRecPtr when we discover that recovery
6488 * is finished. InitPostgres() relies upon this behaviour to ensure
6489 * that InitXLOGAccess() is called at backend startup. (If you change
6490 * this, see also LocalSetXLogInsertAllowed.)
6492 if (!LocalRecoveryInProgress)
6495 return LocalRecoveryInProgress;
/*
 * XLogInsertAllowed -- reviewer summary of the visible logic
 *
 * LocalXLogInsertAllowed is a process-local tri-state:
 *   >= 0  forced answer, returned directly after a cast to bool;
 *   <  0  no forced answer, so RecoveryInProgress() is consulted.
 * When the recovery test does not end the function early, the variable is
 * set to 1 so that later calls take the fast path.
 *
 * NOTE(review): the return statements after the recovery test are not
 * visible here; presumably false while in recovery and true afterwards --
 * confirm against the complete file.
 */
6500 * Is this process allowed to insert new WAL records?
6502 * Ordinarily this is essentially equivalent to !RecoveryInProgress().
6503 * But we also have provisions for forcing the result "true" or "false"
6504 * within specific processes regardless of the global state.
6507 XLogInsertAllowed(void)
6510 * If value is "unconditionally true" or "unconditionally false", just
6511 * return it. This provides the normal fast path once recovery is known
6514 if (LocalXLogInsertAllowed >= 0)
6515 return (bool) LocalXLogInsertAllowed;
6518 * Else, must check to see if we're still in recovery.
6520 if (RecoveryInProgress())
6524 * On exit from recovery, reset to "unconditionally true", since there is
6525 * no need to keep checking.
6527 LocalXLogInsertAllowed = 1;
/*
 * LocalSetXLogInsertAllowed -- reviewer summary of the visible logic
 *
 * Forces LocalXLogInsertAllowed to 1 for this process.  The Assert requires
 * the variable to be -1 on entry, so any other starting value trips the
 * assertion instead of being silently overwritten.
 *
 * NOTE(review): the statement announced by the final comment of this body
 * is not visible here; presumably it performs the same initialization that
 * RecoveryInProgress() triggers -- confirm against the complete file.
 */
6532 * Make XLogInsertAllowed() return true in the current process only.
6534 * Note: it is allowed to switch LocalXLogInsertAllowed back to -1 later,
6535 * and even call LocalSetXLogInsertAllowed() again after that.
6538 LocalSetXLogInsertAllowed(void)
6540 Assert(LocalXLogInsertAllowed == -1);
6541 LocalXLogInsertAllowed = 1;
6543 /* Initialize as RecoveryInProgress() would do when switching state */
/*
 * ReadCheckpointRecord -- reviewer summary of the visible logic
 *
 * RecPtr:     location of the checkpoint record to fetch.
 * whichChkpt: 1, 2 or 0; used only to pick one of three message variants
 *             (primary / secondary / plain or backup_label).
 *
 * Checks, in order:
 *   - RecPtr.xrecoff passes XRecOffIsValid();
 *   - the record obtained from ReadRecord(&RecPtr, LOG, true) is usable
 *     (the test itself is not visible here);
 *   - xl_rmid is RM_XLOG_ID;
 *   - xl_info is XLOG_CHECKPOINT_SHUTDOWN or XLOG_CHECKPOINT_ONLINE;
 *   - xl_len equals sizeof(CheckPoint) and xl_tot_len equals
 *     SizeOfXLogRecord + sizeof(CheckPoint).
 *
 * NOTE(review): the return statements and the severity level of the reports
 * are not visible here.  Callers assign the result to a record pointer, so
 * a failed check presumably yields NULL -- confirm against the complete
 * file.
 */
6548 * Subroutine to try to fetch and validate a prior checkpoint record.
6550 * whichChkpt identifies the checkpoint (merely for reporting purposes).
6551 * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
6554 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
6558 if (!XRecOffIsValid(RecPtr.xrecoff))
6564 (errmsg("invalid primary checkpoint link in control file")));
6568 (errmsg("invalid secondary checkpoint link in control file")));
6572 (errmsg("invalid checkpoint link in backup_label file")));
6578 record = ReadRecord(&RecPtr, LOG, true);
6586 (errmsg("invalid primary checkpoint record")));
6590 (errmsg("invalid secondary checkpoint record")));
6594 (errmsg("invalid checkpoint record")));
6599 if (record->xl_rmid != RM_XLOG_ID)
6605 (errmsg("invalid resource manager ID in primary checkpoint record")));
6609 (errmsg("invalid resource manager ID in secondary checkpoint record")));
6613 (errmsg("invalid resource manager ID in checkpoint record")));
6618 if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
6619 record->xl_info != XLOG_CHECKPOINT_ONLINE)
6625 (errmsg("invalid xl_info in primary checkpoint record")));
6629 (errmsg("invalid xl_info in secondary checkpoint record")));
6633 (errmsg("invalid xl_info in checkpoint record")));
6638 if (record->xl_len != sizeof(CheckPoint) ||
6639 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
6645 (errmsg("invalid length of primary checkpoint record")));
6649 (errmsg("invalid length of secondary checkpoint record")));
6653 (errmsg("invalid length of checkpoint record")));
/*
 * InitXLOGAccess -- reviewer summary of the visible logic
 *
 * Copies XLogCtl->ThisTimeLineID into the process-local ThisTimeLineID
 * without taking a lock, asserts that the value is nonzero unless
 * IsBootstrapProcessingMode() holds, then calls GetRedoRecPtr() for its
 * side effect only (the result is cast to void).
 *
 * Takes no arguments; no return statement is visible.
 */
6662 * This must be called during startup of a backend process, except that
6663 * it need not be called in a standalone backend (which does StartupXLOG
6664 * instead). We need to initialize the local copies of ThisTimeLineID and
6667 * Note: before Postgres 8.0, we went to some effort to keep the postmaster
6668 * process's copies of ThisTimeLineID and RedoRecPtr valid too. This was
6669 * unnecessary however, since the postmaster itself never touches XLOG anyway.
6672 InitXLOGAccess(void)
6674 /* ThisTimeLineID doesn't change so we need no lock to copy it */
6675 ThisTimeLineID = XLogCtl->ThisTimeLineID;
6676 Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
6678 /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
6679 (void) GetRedoRecPtr();
/*
 * NOTE(review): the function header for this body is not visible here; the
 * neighbouring comments suggest it is GetRedoRecPtr() -- confirm against
 * the complete file.
 *
 * Visible logic: while holding info_lck, assert that the process-local
 * RedoRecPtr is not ahead of XLogCtl->Insert.RedoRecPtr (XLByteLE), then
 * overwrite the local copy with the shared value.  Any return statement is
 * not visible.
 */
6683 * Once spawned, a backend may update its local RedoRecPtr from
6684 * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
6685 * to do so. This is done in XLogInsert() or GetRedoRecPtr().
6690 /* use volatile pointer to prevent code rearrangement */
6691 volatile XLogCtlData *xlogctl = XLogCtl;
6693 SpinLockAcquire(&xlogctl->info_lck);
6694 Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
6695 RedoRecPtr = xlogctl->Insert.RedoRecPtr;
6696 SpinLockRelease(&xlogctl->info_lck);
/*
 * GetInsertRecPtr -- reviewer summary of the visible logic
 *
 * Copies XLogCtl->LogwrtRqst.Write into recptr while holding info_lck.
 * Takes no arguments.
 *
 * NOTE(review): the declaration of recptr and the return statement are not
 * visible here; presumably recptr is the value returned -- confirm against
 * the complete file.
 */
6702 * GetInsertRecPtr -- Returns the current insert position.
6704 * NOTE: The value *actually* returned is the position of the last full
6705 * xlog page. It lags behind the real insert position by at most 1 page.
6706 * For that, we don't need to acquire WALInsertLock which can be quite
6707 * heavily contended, and an approximation is enough for the current
6708 * usage of this function.
6711 GetInsertRecPtr(void)
6713 /* use volatile pointer to prevent code rearrangement */
6714 volatile XLogCtlData *xlogctl = XLogCtl;
6717 SpinLockAcquire(&xlogctl->info_lck);
6718 recptr = xlogctl->LogwrtRqst.Write;
6719 SpinLockRelease(&xlogctl->info_lck);
/*
 * GetWriteRecPtr -- reviewer summary of the visible logic
 *
 * Copies XLogCtl->LogwrtResult.Write into recptr while holding info_lck.
 * Takes no arguments.
 *
 * NOTE(review): the declaration of recptr and the return statement are not
 * visible here; presumably recptr is the value returned -- confirm against
 * the complete file.
 */
6725 * GetWriteRecPtr -- Returns the current write position.
6728 GetWriteRecPtr(void)
6730 /* use volatile pointer to prevent code rearrangement */
6731 volatile XLogCtlData *xlogctl = XLogCtl;
6734 SpinLockAcquire(&xlogctl->info_lck);
6735 recptr = xlogctl->LogwrtResult.Write;
6736 SpinLockRelease(&xlogctl->info_lck);
/*
 * GetLastSegSwitchTime -- reviewer summary of the visible logic
 *
 * Copies XLogCtl->Write.lastSegSwitchTime into result while holding
 * WALWriteLock in LW_SHARED mode.  Takes no arguments.
 *
 * NOTE(review): the declaration of result and the return statement are not
 * visible here; presumably result is the value returned -- confirm against
 * the complete file.
 */
6742 * Get the time of the last xlog segment switch
6745 GetLastSegSwitchTime(void)
6749 /* Need WALWriteLock, but shared lock is sufficient */
6750 LWLockAcquire(WALWriteLock, LW_SHARED);
6751 result = XLogCtl->Write.lastSegSwitchTime;
6752 LWLockRelease(WALWriteLock);
/*
 * GetNextXidAndEpoch -- reviewer summary of the visible logic
 *
 * xid:   output pointer (TransactionId).  The store through it is not
 *        visible here; presumably it receives nextXid -- confirm.
 * epoch: output pointer (uint32); receives ckptXidEpoch.
 *
 * Order of operations: ckptXidEpoch and ckptXid are copied from XLogCtl
 * under info_lck first, and only afterwards is nextXid fetched with
 * ReadNewTransactionId().  The nextXid < ckptXid test detects a numeric
 * wrap since the checkpoint values were recorded.
 *
 * NOTE(review): the statement guarded by that test is not visible here;
 * presumably it advances ckptXidEpoch by one -- confirm against the
 * complete file.
 */
6758 * GetNextXidAndEpoch - get the current nextXid value and associated epoch
6760 * This is exported for use by code that would like to have 64-bit XIDs.
6761 * We don't really support such things, but all XIDs within the system
6762 * can be presumed "close to" the result, and thus the epoch associated
6763 * with them can be determined.
6766 GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
6768 uint32 ckptXidEpoch;
6769 TransactionId ckptXid;
6770 TransactionId nextXid;
6772 /* Must read checkpoint info first, else have race condition */
6774 /* use volatile pointer to prevent code rearrangement */
6775 volatile XLogCtlData *xlogctl = XLogCtl;
6777 SpinLockAcquire(&xlogctl->info_lck);
6778 ckptXidEpoch = xlogctl->ckptXidEpoch;
6779 ckptXid = xlogctl->ckptXid;
6780 SpinLockRelease(&xlogctl->info_lck);
6783 /* Now fetch current nextXid */
6784 nextXid = ReadNewTransactionId();
6787 * nextXid is certainly logically later than ckptXid. So if it's
6788 * numerically less, it must have wrapped into the next epoch.
6790 if (nextXid < ckptXid)
6794 *epoch = ckptXidEpoch;
/*
 * GetRecoveryTargetTLI -- reviewer summary of the visible logic
 *
 * Returns XLogCtl->RecoveryTargetTLI, read directly without taking any
 * lock.  Takes no arguments.
 */
6798 * GetRecoveryTargetTLI - get the recovery target timeline ID
6801 GetRecoveryTargetTLI(void)
6803 /* RecoveryTargetTLI doesn't change so we need no lock to copy it */
6804 return XLogCtl->RecoveryTargetTLI;
/*
 * ShutdownXLOG -- reviewer summary of the visible logic
 *
 * code, arg: not referenced by any visible line of the body.
 *
 * Sequence:
 *   1. report the shutting-down message;
 *   2. if RecoveryInProgress(), call CreateRestartPoint() with
 *      CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE;
 *   3. request an xlog switch when XLogArchivingActive() and
 *      XLogArchiveCommandSet() both hold, then call CreateCheckPoint()
 *      with the same two flags;
 *   4. call ShutdownMultiXact();
 *   5. report the database-system-is-shut-down message.
 *
 * NOTE(review): the else keyword and the braces that would separate step 2
 * from step 3 are not visible here, and neither are the report severities
 * or any calls between steps 3 and 4 -- confirm against the complete file.
 */
6808 * This must be called ONCE during postmaster or standalone-backend shutdown
6811 ShutdownXLOG(int code, Datum arg)
6814 (errmsg("shutting down")));
6816 if (RecoveryInProgress())
6817 CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
6821 * If archiving is enabled, rotate the last XLOG file so that all the
6822 * remaining records are archived (postmaster wakes up the archiver
6823 * process one more time at the end of shutdown). The checkpoint
6824 * record will go to the next XLOG file and won't be archived (yet).
6826 if (XLogArchivingActive() && XLogArchiveCommandSet())
6827 RequestXLogSwitch();
6829 CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
6833 ShutdownMultiXact();
6836 (errmsg("database system is shut down")));
/*
 * LogCheckpointStart -- reviewer summary of the visible logic
 *
 * flags:        checkpoint flag bits.  Each of the seven tested bits
 *               contributes its word to the message, or an empty string
 *               when the bit is clear.
 * restartpoint: presumably selects the restartpoint wording over the
 *               checkpoint wording; the test itself is not visible here --
 *               confirm against the complete file.
 *
 * Both format strings carry exactly seven %s conversions, matching the
 * seven flag expressions passed after the format.
 */
6840 * Log start of a checkpoint.
6843 LogCheckpointStart(int flags, bool restartpoint)
6848 * XXX: This is hopelessly untranslatable. We could call gettext_noop for
6849 * the main message, but what about all the flags?
6852 msg = "restartpoint starting:%s%s%s%s%s%s%s";
6854 msg = "checkpoint starting:%s%s%s%s%s%s%s";
6857 (flags & CHECKPOINT_IS_SHUTDOWN) ? " shutdown" : "",
6858 (flags & CHECKPOINT_END_OF_RECOVERY) ? " end-of-recovery" : "",
6859 (flags & CHECKPOINT_IMMEDIATE) ? " immediate" : "",
6860 (flags & CHECKPOINT_FORCE) ? " force" : "",
6861 (flags & CHECKPOINT_WAIT) ? " wait" : "",
6862 (flags & CHECKPOINT_CAUSE_XLOG) ? " xlog" : "",
6863 (flags & CHECKPOINT_CAUSE_TIME) ? " time" : "");
/*
 * LogCheckpointEnd -- reviewer summary of the visible logic
 *
 * Stamps CheckpointStats.ckpt_end_t with GetCurrentTimestamp(), then derives
 * three intervals with TimestampDifference():
 *   total: ckpt_start_t to ckpt_end_t
 *   write: ckpt_write_t to ckpt_sync_t
 *   sync:  ckpt_sync_t  to ckpt_sync_end_t
 * Each interval is printed as whole seconds plus milliseconds (the
 * microsecond part divided by 1000, formatted with %03d).  The number of
 * buffers written is also shown as a percentage of NBuffers.  The
 * checkpoint message additionally reports segments added, removed and
 * recycled.
 *
 * restartpoint: presumably selects between the two messages; the test
 * itself is not visible here -- confirm against the complete file.
 */
6867 * Log end of a checkpoint.
6870 LogCheckpointEnd(bool restartpoint)
6879 CheckpointStats.ckpt_end_t = GetCurrentTimestamp();
6881 TimestampDifference(CheckpointStats.ckpt_start_t,
6882 CheckpointStats.ckpt_end_t,
6883 &total_secs, &total_usecs);
6885 TimestampDifference(CheckpointStats.ckpt_write_t,
6886 CheckpointStats.ckpt_sync_t,
6887 &write_secs, &write_usecs);
6889 TimestampDifference(CheckpointStats.ckpt_sync_t,
6890 CheckpointStats.ckpt_sync_end_t,
6891 &sync_secs, &sync_usecs);
6894 elog(LOG, "restartpoint complete: wrote %d buffers (%.1f%%); "
6895 "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
6896 CheckpointStats.ckpt_bufs_written,
6897 (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
6898 write_secs, write_usecs / 1000,
6899 sync_secs, sync_usecs / 1000,
6900 total_secs, total_usecs / 1000);
6902 elog(LOG, "checkpoint complete: wrote %d buffers (%.1f%%); "
6903 "%d transaction log file(s) added, %d removed, %d recycled; "
6904 "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
6905 CheckpointStats.ckpt_bufs_written,
6906 (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
6907 CheckpointStats.ckpt_segs_added,
6908 CheckpointStats.ckpt_segs_removed,
6909 CheckpointStats.ckpt_segs_recycled,
6910 write_secs, write_usecs / 1000,
6911 sync_secs, sync_usecs / 1000,
6912 total_secs, total_usecs / 1000);
6916 * Perform a checkpoint --- either during shutdown, or on-the-fly
6918 * flags is a bitwise OR of the following:
6919 * CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
6920 * CHECKPOINT_END_OF_RECOVERY: checkpoint is for end of WAL recovery.
6921 * CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
6922 * ignoring checkpoint_completion_target parameter.
6923 * CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occurred
6924 * since the last one (implied by CHECKPOINT_IS_SHUTDOWN or
6925 * CHECKPOINT_END_OF_RECOVERY).
6927 * Note: flags contains other bits, of interest here only for logging purposes.
6928 * In particular note that this routine is synchronous and does not pay
6929 * attention to CHECKPOINT_WAIT.
6932 CreateCheckPoint(int flags)
6935 CheckPoint checkPoint;
6937 XLogCtlInsert *Insert = &XLogCtl->Insert;
6942 TransactionId *inCommitXids;
6946 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
6947 * issued at a different time.
6949 if (flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY))
6955 if (RecoveryInProgress() && (flags & CHECKPOINT_END_OF_RECOVERY) == 0)
6956 elog(ERROR, "can't create a checkpoint during recovery");
6959 * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
6960 * (This is just pro forma, since in the present system structure there is
6961 * only one process that is allowed to issue checkpoints at any given
6964 LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
6967 * Prepare to accumulate statistics.
6969 * Note: because it is possible for log_checkpoints to change while a
6970 * checkpoint proceeds, we always accumulate stats, even if
6971 * log_checkpoints is currently off.
6973 MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
6974 CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
6977 * Use a critical section to force system panic if we have trouble.
6979 START_CRIT_SECTION();
6983 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
6984 ControlFile->state = DB_SHUTDOWNING;
6985 ControlFile->time = (pg_time_t) time(NULL);
6986 UpdateControlFile();
6987 LWLockRelease(ControlFileLock);
6991 * Let smgr prepare for checkpoint; this has to happen before we determine
6992 * the REDO pointer. Note that smgr must not do anything that'd have to
6993 * be undone if we decide no checkpoint is needed.
6997 /* Begin filling in the checkpoint WAL record */
6998 MemSet(&checkPoint, 0, sizeof(checkPoint));
6999 checkPoint.time = (pg_time_t) time(NULL);
7001 /* Set important parameter values for use when replaying WAL */
7002 checkPoint.MaxConnections = MaxConnections;
7003 checkPoint.max_prepared_xacts = max_prepared_xacts;
7004 checkPoint.max_locks_per_xact = max_locks_per_xact;
7005 checkPoint.XLogStandbyInfoMode = XLogStandbyInfoActive();
7008 * We must hold WALInsertLock while examining insert state to determine
7009 * the checkpoint REDO pointer.
7011 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
7014 * If this isn't a shutdown or forced checkpoint, and we have not inserted
7015 * any XLOG records since the start of the last checkpoint, skip the
7016 * checkpoint. The idea here is to avoid inserting duplicate checkpoints
7017 * when the system is idle. That wastes log space, and more importantly it
7018 * exposes us to possible loss of both current and previous checkpoint
7019 * records if the machine crashes just as we're writing the update.
7020 * (Perhaps it'd make even more sense to checkpoint only when the previous
7021 * checkpoint record is in a different xlog page?)
7023 * We have to make two tests to determine that nothing has happened since
7024 * the start of the last checkpoint: current insertion point must match
7025 * the end of the last checkpoint record, and its redo pointer must point
7028 if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
7029 CHECKPOINT_FORCE)) == 0)
7031 XLogRecPtr curInsert;
7033 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
7034 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
7035 curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
7036 MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
7037 ControlFile->checkPoint.xlogid ==
7038 ControlFile->checkPointCopy.redo.xlogid &&
7039 ControlFile->checkPoint.xrecoff ==
7040 ControlFile->checkPointCopy.redo.xrecoff)
7042 LWLockRelease(WALInsertLock);
7043 LWLockRelease(CheckpointLock);
7050 * An end-of-recovery checkpoint is created before anyone is allowed to
7051 * write WAL. To allow us to write the checkpoint record, temporarily
7052 * enable XLogInsertAllowed. (This also ensures ThisTimeLineID is
7053 * initialized, which we need here and in AdvanceXLInsertBuffer.)
7055 if (flags & CHECKPOINT_END_OF_RECOVERY)
7056 LocalSetXLogInsertAllowed();
7058 checkPoint.ThisTimeLineID = ThisTimeLineID;
7061 * Compute new REDO record ptr = location of next XLOG record.
7063 * NB: this is NOT necessarily where the checkpoint record itself will be,
7064 * since other backends may insert more XLOG records while we're off doing
7065 * the buffer flush work. Those XLOG records are logically after the
7066 * checkpoint, even though physically before it. Got that?
7068 freespace = INSERT_FREESPACE(Insert);
7069 if (freespace < SizeOfXLogRecord)
7071 (void) AdvanceXLInsertBuffer(false);
7072 /* OK to ignore update return flag, since we will do flush anyway */
7073 freespace = INSERT_FREESPACE(Insert);
7075 INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
7078 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
7079 * must be done while holding the insert lock AND the info_lck.
7081 * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
7082 * pointing past where it really needs to point. This is okay; the only
7083 * consequence is that XLogInsert might back up whole buffers that it
7084 * didn't really need to. We can't postpone advancing RedoRecPtr because
7085 * XLogInserts that happen while we are dumping buffers must assume that
7086 * their buffer changes are not included in the checkpoint.
7089 /* use volatile pointer to prevent code rearrangement */
7090 volatile XLogCtlData *xlogctl = XLogCtl;
7092 SpinLockAcquire(&xlogctl->info_lck);
7093 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
7094 SpinLockRelease(&xlogctl->info_lck);
7098 * Now we can release WAL insert lock, allowing other xacts to proceed
7099 * while we are flushing disk buffers.
7101 LWLockRelease(WALInsertLock);
7104 * If enabled, log checkpoint start. We postpone this until now so as not
7105 * to log anything if we decided to skip the checkpoint.
7107 if (log_checkpoints)
7108 LogCheckpointStart(flags, false);
7110 TRACE_POSTGRESQL_CHECKPOINT_START(flags);
7113 * Before flushing data, we must wait for any transactions that are
7114 * currently in their commit critical sections. If an xact inserted its
7115 * commit record into XLOG just before the REDO point, then a crash
7116 * restart from the REDO point would not replay that record, which means
7117 * that our flushing had better include the xact's update of pg_clog. So
7118 * we wait till he's out of his commit critical section before proceeding.
7119 * See notes in RecordTransactionCommit().
7121 * Because we've already released WALInsertLock, this test is a bit fuzzy:
7122 * it is possible that we will wait for xacts we didn't really need to
7123 * wait for. But the delay should be short and it seems better to make
7124 * checkpoint take a bit longer than to hold locks longer than necessary.
7125 * (In fact, the whole reason we have this issue is that xact.c does
7126 * commit record XLOG insertion and clog update as two separate steps
7127 * protected by different locks, but again that seems best on grounds of
7128 * minimizing lock contention.)
7130 * A transaction that has not yet set inCommit when we look cannot be at
7131 * risk, since he's not inserted his commit record yet; and one that's
7132 * already cleared it is not at risk either, since he's done fixing clog
7133 * and we will correctly flush the update below. So we cannot miss any
7134 * xacts we need to wait for.
7136 nInCommit = GetTransactionsInCommit(&inCommitXids);
7141 pg_usleep(10000L); /* wait for 10 msec */
7142 } while (HaveTransactionsInCommit(inCommitXids, nInCommit));
7144 pfree(inCommitXids);
7147 * Get the other info we need for the checkpoint record.
7149 LWLockAcquire(XidGenLock, LW_SHARED);
7150 checkPoint.nextXid = ShmemVariableCache->nextXid;
7151 checkPoint.oldestXid = ShmemVariableCache->oldestXid;
7152 checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
7153 LWLockRelease(XidGenLock);
7155 /* Increase XID epoch if we've wrapped around since last checkpoint */
7156 checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
7157 if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
7158 checkPoint.nextXidEpoch++;
7160 LWLockAcquire(OidGenLock, LW_SHARED);
7161 checkPoint.nextOid = ShmemVariableCache->nextOid;
7163 checkPoint.nextOid += ShmemVariableCache->oidCount;
7164 LWLockRelease(OidGenLock);
7166 MultiXactGetCheckptMulti(shutdown,
7167 &checkPoint.nextMulti,
7168 &checkPoint.nextMultiOffset);
7171 * Having constructed the checkpoint record, ensure all shmem disk buffers
7172 * and commit-log buffers are flushed to disk.
7174 * This I/O could fail for various reasons. If so, we will fail to
7175 * complete the checkpoint, but there is no reason to force a system
7176 * panic. Accordingly, exit critical section while doing it.
7180 CheckPointGuts(checkPoint.redo, flags);
7183 * Take a snapshot of running transactions and write this to WAL. This
7184 * allows us to reconstruct the state of running transactions during
7185 * archive recovery, if required. Skip this if that info is disabled.
7187 * If we are shutting down, or Startup process is completing crash
7188 * recovery we don't need to write running xact data.
7190 * Update checkPoint.nextXid since we have a later value
7192 if (!shutdown && XLogStandbyInfoActive())
7193 LogStandbySnapshot(&checkPoint.oldestActiveXid, &checkPoint.nextXid);
7195 checkPoint.oldestActiveXid = InvalidTransactionId;
7197 START_CRIT_SECTION();
7200 * Now insert the checkpoint record into XLOG.
7202 rdata.data = (char *) (&checkPoint);
7203 rdata.len = sizeof(checkPoint);
7204 rdata.buffer = InvalidBuffer;
7207 recptr = XLogInsert(RM_XLOG_ID,
7208 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
7209 XLOG_CHECKPOINT_ONLINE,
7215 * We mustn't write any new WAL after a shutdown checkpoint, or it will be
7216 * overwritten at next startup. No-one should even try, this just allows
7217 * sanity-checking. In the case of an end-of-recovery checkpoint, we want
7218 * to just temporarily disable writing until the system has exited
7223 if (flags & CHECKPOINT_END_OF_RECOVERY)
7224 LocalXLogInsertAllowed = -1; /* return to "check" state */
7226 LocalXLogInsertAllowed = 0; /* never again write WAL */
7230 * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
7231 * = end of actual checkpoint record.
7233 if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
7235 (errmsg("concurrent transaction log activity while database system is shutting down")));
7238 * Select point at which we can truncate the log, which we base on the
7239 * prior checkpoint's earliest info.
7241 XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
7244 * Update the control file.
7246 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
7248 ControlFile->state = DB_SHUTDOWNED;
7249 ControlFile->prevCheckPoint = ControlFile->checkPoint;
7250 ControlFile->checkPoint = ProcLastRecPtr;
7251 ControlFile->checkPointCopy = checkPoint;
7252 ControlFile->time = (pg_time_t) time(NULL);
7253 /* crash recovery should always recover to the end of WAL */
7254 MemSet(&ControlFile->minRecoveryPoint, 0, sizeof(XLogRecPtr));
7255 UpdateControlFile();
7256 LWLockRelease(ControlFileLock);
7258 /* Update shared-memory copy of checkpoint XID/epoch */
7260 /* use volatile pointer to prevent code rearrangement */
7261 volatile XLogCtlData *xlogctl = XLogCtl;
7263 SpinLockAcquire(&xlogctl->info_lck);
7264 xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
7265 xlogctl->ckptXid = checkPoint.nextXid;
7266 SpinLockRelease(&xlogctl->info_lck);
7270 * We are now done with critical updates; no need for system panic if we
7271 * have trouble while fooling with old log segments.
7276 * Let smgr do post-checkpoint cleanup (eg, deleting old files).
7281 * Delete old log files (those no longer needed even for previous
7282 * checkpoint or the standbys in XLOG streaming).
7284 if (_logId || _logSeg)
7287 * Calculate the last segment that we need to retain because of
7288 * wal_keep_segments, by subtracting wal_keep_segments from the
7289 * new checkpoint location.
7291 if (wal_keep_segments > 0)
7298 XLByteToSeg(recptr, log, seg);
7300 d_seg = wal_keep_segments % XLogSegsPerFile;
7301 d_log = wal_keep_segments / XLogSegsPerFile;
7305 seg = seg - d_seg + XLogSegsPerFile;
7309 /* avoid underflow, don't go below (0,1) */
7310 if (log < d_log || (log == d_log && seg == 0))
7318 /* don't delete WAL segments newer than the calculated segment */
7319 if (log < _logId || (log == _logId && seg < _logSeg))
7326 PrevLogSeg(_logId, _logSeg);
7327 RemoveOldXlogFiles(_logId, _logSeg, recptr);
7331 * Make more log segments if needed. (Do this after recycling old log
7332 * segments, since that may supply some of the needed files.)
7335 PreallocXlogFiles(recptr);
7338 * Truncate pg_subtrans if possible. We can throw away all data before
7339 * the oldest XMIN of any running transaction. No future transaction will
7340 * attempt to reference any pg_subtrans entry older than that (see Asserts
7341 * in subtrans.c). During recovery, though, we mustn't do this because
7342 * StartupSUBTRANS hasn't been called yet.
7344 if (!RecoveryInProgress())
7345 TruncateSUBTRANS(GetOldestXmin(true, false));
7347 /* All real work is done, but log before releasing lock. */
7348 if (log_checkpoints)
7349 LogCheckpointEnd(false);
7351 TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written,
7353 CheckpointStats.ckpt_segs_added,
7354 CheckpointStats.ckpt_segs_removed,
7355 CheckpointStats.ckpt_segs_recycled);
7357 LWLockRelease(CheckpointLock);
7361 * Flush all data in shared memory to disk, and fsync
7363 * This is the common code shared between regular checkpoints and
7364 * recovery restartpoints.
7367 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
7370 CheckPointSUBTRANS();
7371 CheckPointMultiXact();
7372 CheckPointRelationMap();
7373 CheckPointBuffers(flags); /* performs all required fsyncs */
7374 /* We deliberately delay 2PC checkpointing as long as possible */
7375 CheckPointTwoPhase(checkPointRedo);
7379 * Save a checkpoint for recovery restart if appropriate
7381 * This function is called each time a checkpoint record is read from XLOG.
7382 * It must determine whether the checkpoint represents a safe restartpoint or
7383 * not. If so, the checkpoint record is stashed in shared memory so that
7384 * CreateRestartPoint can consult it. (Note that the latter function is
7385 * executed by the bgwriter, while this one will be executed by the startup
7389 RecoveryRestartPoint(const CheckPoint *checkPoint)
7393 /* use volatile pointer to prevent code rearrangement */
7394 volatile XLogCtlData *xlogctl = XLogCtl;
7397 * Is it safe to checkpoint? We must ask each of the resource managers
7398 * whether they have any partial state information that might prevent a
7399 * correct restart from this point. If so, we skip this opportunity, but
7400 * return at the next checkpoint record for another try.
7402 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
7404 if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
7405 if (!(RmgrTable[rmid].rm_safe_restartpoint()))
7407 elog(trace_recovery(DEBUG2), "RM %d not safe to record restart point at %X/%X",
7409 checkPoint->redo.xlogid,
7410 checkPoint->redo.xrecoff);
7416 * Copy the checkpoint record to shared memory, so that bgwriter can use
7417 * it the next time it wants to perform a restartpoint.
7419 SpinLockAcquire(&xlogctl->info_lck);
7420 XLogCtl->lastCheckPointRecPtr = ReadRecPtr;
7421 memcpy(&XLogCtl->lastCheckPoint, checkPoint, sizeof(CheckPoint));
7422 SpinLockRelease(&xlogctl->info_lck);
7426 * Establish a restartpoint if possible.
7428 * This is similar to CreateCheckPoint, but is used during WAL recovery
7429 * to establish a point from which recovery can roll forward without
7430 * replaying the entire recovery log.
7432 * Returns true if a new restartpoint was established. We can only establish
7433 * a restartpoint if we have replayed a safe checkpoint record since last
7437 CreateRestartPoint(int flags)
7439 XLogRecPtr lastCheckPointRecPtr;
7440 CheckPoint lastCheckPoint;
7444 /* use volatile pointer to prevent code rearrangement */
7445 volatile XLogCtlData *xlogctl = XLogCtl;
7448 * Acquire CheckpointLock to ensure only one restartpoint or checkpoint
7449 * happens at a time.
7451 LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
7453 /* Get a local copy of the last safe checkpoint record. */
7454 SpinLockAcquire(&xlogctl->info_lck);
7455 lastCheckPointRecPtr = xlogctl->lastCheckPointRecPtr;
7456 memcpy(&lastCheckPoint, &XLogCtl->lastCheckPoint, sizeof(CheckPoint));
7457 SpinLockRelease(&xlogctl->info_lck);
7460 * Check that we're still in recovery mode. It's ok if we exit recovery
7461 * mode after this check, the restart point is valid anyway.
7463 if (!RecoveryInProgress())
7466 (errmsg("skipping restartpoint, recovery has already ended")));
7467 LWLockRelease(CheckpointLock);
7472 * If the last checkpoint record we've replayed is already our last
7473 * restartpoint, we can't perform a new restart point. We still update
7474 * minRecoveryPoint in that case, so that if this is a shutdown restart
7475 * point, we won't start up earlier than before. That's not strictly
7476 * necessary, but when we get hot standby capability, it would be rather
7477 * weird if the database opened up for read-only connections at a
7478 * point-in-time before the last shutdown. Such time travel is still
7479 * possible in case of immediate shutdown, though.
7481 * We don't explicitly advance minRecoveryPoint when we do create a
7482 * restartpoint. It's assumed that flushing the buffers will do that as a
7485 if (XLogRecPtrIsInvalid(lastCheckPointRecPtr) ||
7486 XLByteLE(lastCheckPoint.redo, ControlFile->checkPointCopy.redo))
7488 XLogRecPtr InvalidXLogRecPtr = {0, 0};
7491 (errmsg("skipping restartpoint, already performed at %X/%X",
7492 lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff)));
7494 UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
7495 LWLockRelease(CheckpointLock);
7499 if (log_checkpoints)
7502 * Prepare to accumulate statistics.
7504 MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
7505 CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
7507 LogCheckpointStart(flags, true);
7510 CheckPointGuts(lastCheckPoint.redo, flags);
7513 * Select point at which we can truncate the xlog, which we base on the
7514 * prior checkpoint's earliest info.
7516 XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
7519 * Update pg_control, using current time. Check that it still shows
7520 * IN_ARCHIVE_RECOVERY state and an older checkpoint, else do nothing;
7521 * this is a quick hack to make sure nothing really bad happens if somehow
7522 * we get here after the end-of-recovery checkpoint.
7524 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
7525 if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY &&
7526 XLByteLT(ControlFile->checkPointCopy.redo, lastCheckPoint.redo))
7528 ControlFile->prevCheckPoint = ControlFile->checkPoint;
7529 ControlFile->checkPoint = lastCheckPointRecPtr;
7530 ControlFile->checkPointCopy = lastCheckPoint;
7531 ControlFile->time = (pg_time_t) time(NULL);
7532 UpdateControlFile();
7534 LWLockRelease(ControlFileLock);
7537 * Delete old log files (those no longer needed even for previous
7538 * checkpoint/restartpoint) to prevent the disk holding the xlog from
7539 * growing full. We don't need to do this during normal recovery, but during
7540 * streaming recovery we have to or the disk will eventually fill up from
7541 * old log files streamed from master.
7543 if (WalRcvInProgress() && (_logId || _logSeg))
7547 /* Get the current (or recent) end of xlog */
7548 endptr = GetWalRcvWriteRecPtr();
7550 PrevLogSeg(_logId, _logSeg);
7551 RemoveOldXlogFiles(_logId, _logSeg, endptr);
7554 * Make more log segments if needed. (Do this after recycling old log
7555 * segments, since that may supply some of the needed files.)
7557 PreallocXlogFiles(endptr);
7561 * Currently, there is no need to truncate pg_subtrans during recovery. If
7562 * we did do that, we would need to have called StartupSUBTRANS() already
7563 * and then TruncateSUBTRANS() would go here.
7566 /* All real work is done, but log before releasing lock. */
7567 if (log_checkpoints)
7568 LogCheckpointEnd(true);
7570 ereport((log_checkpoints ? LOG : DEBUG2),
7571 (errmsg("recovery restart point at %X/%X with latest known log time %s",
7572 lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff,
7573 timestamptz_to_str(GetLatestXLogTime()))));
7575 LWLockRelease(CheckpointLock);
7578 * Finally, execute restartpoint_command, if any.
7580 if (XLogCtl->restartPointCommand[0])
7581 ExecuteRecoveryCommand(XLogCtl->restartPointCommand,
7582 "restartpoint_command",
7589 * Write a NEXTOID log record
7592 XLogPutNextOid(Oid nextOid)
7596 rdata.data = (char *) (&nextOid);
7597 rdata.len = sizeof(Oid);
7598 rdata.buffer = InvalidBuffer;
7600 (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
7603 * We need not flush the NEXTOID record immediately, because any of the
7604 * just-allocated OIDs could only reach disk as part of a tuple insert or
7605 * update that would have its own XLOG record that must follow the NEXTOID
7606 * record. Therefore, the standard buffer LSN interlock applied to those
7607 * records will ensure no such OID reaches disk before the NEXTOID record
7610 * Note, however, that the above statement only covers state "within" the
7611 * database. When we use a generated OID as a file or directory name, we
7612 * are in a sense violating the basic WAL rule, because that filesystem
7613 * change may reach disk before the NEXTOID WAL record does. The impact
7614 * of this is that if a database crash occurs immediately afterward, we
7615 * might after restart re-generate the same OID and find that it conflicts
7616 * with the leftover file or directory. But since for safety's sake we
7617 * always loop until finding a nonconflicting filename, this poses no real
7618 * problem in practice. See pgsql-hackers discussion 27-Sep-2006.
7623 * Write an XLOG SWITCH record.
7625 * Here we just blindly issue an XLogInsert request for the record.
7626 * All the magic happens inside XLogInsert.
7628 * The return value is either the end+1 address of the switch record,
7629 * or the end+1 address of the prior segment if we did not need to
7630 * write a switch record because we are already at segment start.
7633 RequestXLogSwitch(void)
7638 /* XLOG SWITCH, alone among xlog record types, has no data */
7639 rdata.buffer = InvalidBuffer;
7644 RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
7650 * Write an XLOG UNLOGGED record, indicating that some operation was
7651 * performed on data that we fsync()'d directly to disk, skipping
7654 * Such operations screw up archive recovery, so we complain if we see
7655 * these records during archive recovery. That shouldn't happen in a
7656 * correctly configured server, but you can induce it by temporarily
7657 * disabling archiving and restarting, so it's good to at least get a
7658 * warning of silent data loss in such cases. These records serve no
7659 * other purpose and are simply ignored during crash recovery.
7662 XLogReportUnloggedStatement(char *reason)
7666 rdata.buffer = InvalidBuffer;
7667 rdata.data = reason;
7668 rdata.len = strlen(reason) + 1;
7671 XLogInsert(RM_XLOG_ID, XLOG_UNLOGGED, &rdata);
7675 * XLOG resource manager's routines
7677 * Definitions of info values are in include/catalog/pg_control.h, though
7678 * not all record types are related to control file updates.
7681 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
7683 uint8 info = record->xl_info & ~XLR_INFO_MASK;
7685 /* Backup blocks are not used in xlog records */
7686 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
7688 if (info == XLOG_NEXTOID)
7692 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
7693 if (ShmemVariableCache->nextOid < nextOid)
7695 ShmemVariableCache->nextOid = nextOid;
7696 ShmemVariableCache->oidCount = 0;
7699 else if (info == XLOG_CHECKPOINT_SHUTDOWN)
7701 CheckPoint checkPoint;
7703 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
7704 /* In a SHUTDOWN checkpoint, believe the counters exactly */
7705 ShmemVariableCache->nextXid = checkPoint.nextXid;
7706 ShmemVariableCache->nextOid = checkPoint.nextOid;
7707 ShmemVariableCache->oidCount = 0;
7708 MultiXactSetNextMXact(checkPoint.nextMulti,
7709 checkPoint.nextMultiOffset);
7710 SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
7712 /* Check to see if any changes to max_connections give problems */
7713 if (standbyState != STANDBY_DISABLED)
7714 CheckRequiredParameterValues(checkPoint);
7717 * If we see a shutdown checkpoint, we know that nothing was
7718 * running on the master at this point. So fake-up an empty
7719 * running-xacts record and use that here and now. Recover
7720 * additional standby state for prepared transactions.
7722 if (standbyState >= STANDBY_INITIALIZED)
7724 TransactionId *xids;
7726 TransactionId oldestActiveXID;
7727 RunningTransactionsData running;
7729 oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
7732 * Construct a RunningTransactions snapshot representing a shut
7733 * down server, with only prepared transactions still alive.
7734 * We're never overflowed at this point because all subxids
7735 * are listed with their parent prepared transactions.
7737 running.xcnt = nxids;
7738 running.subxid_overflow = false;
7739 running.nextXid = checkPoint.nextXid;
7740 running.oldestRunningXid = oldestActiveXID;
7741 running.xids = xids;
7743 ProcArrayApplyRecoveryInfo(&running);
7745 StandbyRecoverPreparedTransactions(true);
7748 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
7749 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
7750 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
7753 * TLI may change in a shutdown checkpoint, but it shouldn't decrease
7755 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
7757 if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
7758 !list_member_int(expectedTLIs,
7759 (int) checkPoint.ThisTimeLineID))
7761 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
7762 checkPoint.ThisTimeLineID, ThisTimeLineID)));
7763 /* Following WAL records should be run with new TLI */
7764 ThisTimeLineID = checkPoint.ThisTimeLineID;
7767 RecoveryRestartPoint(&checkPoint);
7769 else if (info == XLOG_CHECKPOINT_ONLINE)
7771 CheckPoint checkPoint;
7773 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
7774 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
7775 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
7776 checkPoint.nextXid))
7777 ShmemVariableCache->nextXid = checkPoint.nextXid;
7778 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
7780 ShmemVariableCache->nextOid = checkPoint.nextOid;
7781 ShmemVariableCache->oidCount = 0;
7783 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
7784 checkPoint.nextMultiOffset);
7785 if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
7786 checkPoint.oldestXid))
7787 SetTransactionIdLimit(checkPoint.oldestXid,
7788 checkPoint.oldestXidDB);
7790 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
7791 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
7792 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
7794 /* TLI should not change in an on-line checkpoint */
7795 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
7797 (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
7798 checkPoint.ThisTimeLineID, ThisTimeLineID)));
7800 RecoveryRestartPoint(&checkPoint);
7802 else if (info == XLOG_NOOP)
7804 /* nothing to do here */
7806 else if (info == XLOG_SWITCH)
7808 /* nothing to do here */
7810 else if (info == XLOG_BACKUP_END)
7812 XLogRecPtr startpoint;
7814 memcpy(&startpoint, XLogRecGetData(record), sizeof(startpoint));
7816 if (XLByteEQ(ControlFile->backupStartPoint, startpoint))
7819 * We have reached the end of base backup, the point where
7820 * pg_stop_backup() was done. The data on disk is now consistent.
7821 * Reset backupStartPoint, and update minRecoveryPoint to make
7822 * sure we don't allow starting up at an earlier point even if
7823 * recovery is stopped and restarted soon after this.
7825 elog(DEBUG1, "end of backup reached");
7827 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
7829 if (XLByteLT(ControlFile->minRecoveryPoint, lsn))
7830 ControlFile->minRecoveryPoint = lsn;
7831 MemSet(&ControlFile->backupStartPoint, 0, sizeof(XLogRecPtr));
7832 UpdateControlFile();
7834 LWLockRelease(ControlFileLock);
7837 else if (info == XLOG_UNLOGGED)
7839 if (InArchiveRecovery)
7842 * Note: We don't print the reason string from the record, because
7843 * that gets added as a line using xlog_desc()
7846 (errmsg("unlogged operation performed, data may be missing"),
7847 errhint("This can happen if you temporarily disable archive_mode without taking a new base backup.")));
7853 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
7855 uint8 info = xl_info & ~XLR_INFO_MASK;
7857 if (info == XLOG_CHECKPOINT_SHUTDOWN ||
7858 info == XLOG_CHECKPOINT_ONLINE)
7860 CheckPoint *checkpoint = (CheckPoint *) rec;
7862 appendStringInfo(buf, "checkpoint: redo %X/%X; "
7863 "tli %u; xid %u/%u; oid %u; multi %u; offset %u; "
7864 "oldest xid %u in DB %u; oldest running xid %u; %s",
7865 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
7866 checkpoint->ThisTimeLineID,
7867 checkpoint->nextXidEpoch, checkpoint->nextXid,
7868 checkpoint->nextOid,
7869 checkpoint->nextMulti,
7870 checkpoint->nextMultiOffset,
7871 checkpoint->oldestXid,
7872 checkpoint->oldestXidDB,
7873 checkpoint->oldestActiveXid,
7874 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
7876 else if (info == XLOG_NOOP)
7878 appendStringInfo(buf, "xlog no-op");
7880 else if (info == XLOG_NEXTOID)
7884 memcpy(&nextOid, rec, sizeof(Oid));
7885 appendStringInfo(buf, "nextOid: %u", nextOid);
7887 else if (info == XLOG_SWITCH)
7889 appendStringInfo(buf, "xlog switch");
7891 else if (info == XLOG_BACKUP_END)
7893 XLogRecPtr startpoint;
7895 memcpy(&startpoint, rec, sizeof(XLogRecPtr));
7896 appendStringInfo(buf, "backup end: %X/%X",
7897 startpoint.xlogid, startpoint.xrecoff);
7899 else if (info == XLOG_UNLOGGED)
7903 appendStringInfo(buf, "unlogged operation: %s", reason);
7906 appendStringInfo(buf, "UNKNOWN");
/*
 * xlog_outrec: append a one-line summary of an XLogRecord header to 'buf'.
 *
 * The format strings show the fields emitted: the previous-record pointer
 * and xid, the length, one "bkpbN" marker (1-based) for each backup block
 * whose XLR_SET_BKP_BLOCK bit is set in xl_info, and finally the name of the
 * resource manager looked up in RmgrTable by xl_rmid.
 *
 * NOTE(review): the trailing arguments of the first two appendStringInfo
 * calls and the declaration of 'i' are not visible in this excerpt.
 */
7912 xlog_outrec(StringInfo buf, XLogRecord *record)
7916 appendStringInfo(buf, "prev %X/%X; xid %u",
7917 record->xl_prev.xlogid, record->xl_prev.xrecoff,
7920 appendStringInfo(buf, "; len %u",
7923 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
7925 if (record->xl_info & XLR_SET_BKP_BLOCK(i))
7926 appendStringInfo(buf, "; bkpb%d", i + 1);
7929 appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
7931 #endif /* WAL_DEBUG */
7935 * Return the (possible) sync flag used for opening a file, depending on the
7936 * value of the GUC wal_sync_method.
/*
 * get_sync_bit: translate a wal_sync_method value into open() flag bits.
 *
 * method - one of the SYNC_METHOD_* values
 *
 * o_direct_flag starts at 0 and becomes PG_O_DIRECT only when
 * XLogIsNeeded() is false and this process is not the walreceiver.
 *
 * Visible results:
 *   SYNC_METHOD_OPEN        -> OPEN_SYNC_FLAG | o_direct_flag
 *                              (only when OPEN_SYNC_FLAG is defined)
 *   SYNC_METHOD_OPEN_DSYNC  -> OPEN_DATASYNC_FLAG | o_direct_flag
 *                              (only when OPEN_DATASYNC_FLAG is defined)
 *   anything unrecognized   -> elog(ERROR); the trailing return 0 only
 *                              silences a compiler warning
 *
 * NOTE(review): the fsync, fsync_writethrough and fdatasync methods share
 * one case arm whose return statement is not visible in this excerpt
 * (presumably 0, i.e. no sync flag at open time -- confirm).  The early
 * exit for disabled fsync is likewise only visible as a comment here.
 */
7939 get_sync_bit(int method)
7941 int o_direct_flag = 0;
7943 /* If fsync is disabled, never open in sync mode */
7948 * Optimize writes by bypassing kernel cache with O_DIRECT when using
7949 * O_SYNC, O_DSYNC or O_FSYNC. But only if archiving and streaming are
7950 * disabled, otherwise the archive command or walsender process will read
7951 * the WAL soon after writing it, which is guaranteed to cause a physical
7952 * read if we bypassed the kernel cache. We also skip the
7953 * posix_fadvise(POSIX_FADV_DONTNEED) call in XLogFileClose() for the same
7956 * Never use O_DIRECT in walreceiver process for similar reasons; the WAL
7957 * written by walreceiver is normally read by the startup process soon
7958 * after its written. Also, walreceiver performs unaligned writes, which
7959 * don't work with O_DIRECT, so it is required for correctness too.
7961 if (!XLogIsNeeded() && !am_walreceiver)
7962 o_direct_flag = PG_O_DIRECT;
7967 * enum values for all sync options are defined even if they are
7968 * not supported on the current platform. But if not, they are
7969 * not included in the enum option array, and therefore will never
7972 case SYNC_METHOD_FSYNC:
7973 case SYNC_METHOD_FSYNC_WRITETHROUGH:
7974 case SYNC_METHOD_FDATASYNC:
7976 #ifdef OPEN_SYNC_FLAG
7977 case SYNC_METHOD_OPEN:
7978 return OPEN_SYNC_FLAG | o_direct_flag;
7980 #ifdef OPEN_DATASYNC_FLAG
7981 case SYNC_METHOD_OPEN_DSYNC:
7982 return OPEN_DATASYNC_FLAG | o_direct_flag;
7985 /* can't happen (unless we are out of sync with option array) */
7986 elog(ERROR, "unrecognized wal_sync_method: %d", method);
7987 return 0; /* silence warning */
/*
 * assign_xlog_sync_method: react to a change of the WAL sync method setting.
 *
 * new_sync_method - the value about to be installed
 * doit, source    - not referenced in the lines visible in this excerpt
 *
 * When the new value differs from the current sync_method and a log segment
 * is open (openLogFile >= 0), the segment is flushed with pg_fsync(); a
 * failure is reported with the open log id and segment number.  The open()
 * flag bits of the old and new methods are then compared via get_sync_bit().
 *
 * NOTE(review): the error level of the fsync failure report, the action
 * taken when the flag bits differ (the comment says the file is closed so
 * it gets reopened), and the return value are on lines not visible here.
 */
7995 assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source)
8000 if (sync_method != new_sync_method)
8003 * To ensure that no blocks escape unsynced, force an fsync on the
8004 * currently open log segment (if any). Also, if the open flag is
8005 * changing, close the log file so it will be reopened (with new flag
8008 if (openLogFile >= 0)
8010 if (pg_fsync(openLogFile) != 0)
8012 (errcode_for_file_access(),
8013 errmsg("could not fsync log file %u, segment %u: %m",
8014 openLogId, openLogSeg)));
8015 if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
8025 * Issue appropriate kind of fsync (if any) for an XLOG output file.
8027 * 'fd' is a file descriptor for the XLOG file to be fsync'd.
8028 * 'log' and 'seg' are for error reporting purposes.
/*
 * issue_xlog_fsync: flush an XLOG file using the call that matches the
 * current sync_method.
 *
 * fd       - descriptor of the file to flush
 * log, seg - identify the file; the header comment says they are for error
 *            reporting
 *
 * Dispatch visible below:
 *   SYNC_METHOD_FSYNC              -> pg_fsync_no_writethrough()
 *   SYNC_METHOD_FSYNC_WRITETHROUGH -> pg_fsync_writethrough()
 *                                     (only with HAVE_FSYNC_WRITETHROUGH)
 *   SYNC_METHOD_FDATASYNC          -> pg_fdatasync() (only with HAVE_FDATASYNC)
 *   SYNC_METHOD_OPEN / OPEN_DSYNC  -> no call; the write already synced
 *   anything else                  -> elog(PANIC)
 *
 * NOTE(review): the error level used when one of the sync calls fails is on
 * lines not visible in this excerpt.
 */
8031 issue_xlog_fsync(int fd, uint32 log, uint32 seg)
8033 switch (sync_method)
8035 case SYNC_METHOD_FSYNC:
8036 if (pg_fsync_no_writethrough(fd) != 0)
8038 (errcode_for_file_access(),
8039 errmsg("could not fsync log file %u, segment %u: %m",
8042 #ifdef HAVE_FSYNC_WRITETHROUGH
8043 case SYNC_METHOD_FSYNC_WRITETHROUGH:
8044 if (pg_fsync_writethrough(fd) != 0)
8046 (errcode_for_file_access(),
8047 errmsg("could not fsync write-through log file %u, segment %u: %m",
8051 #ifdef HAVE_FDATASYNC
8052 case SYNC_METHOD_FDATASYNC:
8053 if (pg_fdatasync(fd) != 0)
8055 (errcode_for_file_access(),
8056 errmsg("could not fdatasync log file %u, segment %u: %m",
8060 case SYNC_METHOD_OPEN:
8061 case SYNC_METHOD_OPEN_DSYNC:
8062 /* write synced it already */
8065 elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
8072 * pg_start_backup: set up for taking an on-line backup dump
8074 * Essentially what this does is to create a backup label file in $PGDATA,
8075 * where it will be archived as part of the backup dump. The label file
8076 * contains the user-supplied label string (typically this would be used
8077 * to tell where the backup dump will be stored) and the starting time and
8078 * starting WAL location for the dump.
/*
 * pg_start_backup(label text, fast bool) -> text
 *
 * Arguments (fetched with the fmgr macros below):
 *   0: backupid - label supplied by the caller; written to the label file
 *   1: fast     - when true, CHECKPOINT_IMMEDIATE is added to the checkpoint
 *                 request
 *
 * Sequence of operations visible in this block:
 *   1. Precondition reports: superuser required, recovery must not be in
 *      progress, archiving must be active, archive command must be set.
 *   2. Under WALInsertLock (exclusive): report "a backup is already in
 *      progress" if forcePageWrites is already set, else set it to true.
 *   3. RequestXLogSwitch().
 *   4. Inside PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback): request a
 *      forced, waited-for checkpoint; read the checkpoint location and its
 *      redo pointer from ControlFile under ControlFileLock (shared); build
 *      the WAL file name for the redo pointer; format the current time in
 *      log_timezone; stat() the label file (handling of the result is only
 *      partly visible); write START WAL LOCATION, CHECKPOINT LOCATION,
 *      START TIME and LABEL lines to the label file.
 *   5. Return the redo pointer formatted as %X/%X.  Note that xlogfilename
 *      is reused as the scratch buffer for that text.
 *
 * NOTE(review): RequestXLogSwitch() is called after forcePageWrites has
 * been set but before PG_ENSURE_ERROR_CLEANUP registers the callback that
 * clears it.  If RequestXLogSwitch() can raise an error, forcePageWrites
 * would stay set -- confirm whether the call should move inside the
 * protected section.
 *
 * NOTE(review): several short lines (the ereport calls with their levels,
 * some local declarations, braces) are not visible in this excerpt.
 */
8081 pg_start_backup(PG_FUNCTION_ARGS)
8083 text *backupid = PG_GETARG_TEXT_P(0);
8084 bool fast = PG_GETARG_BOOL(1);
8086 XLogRecPtr checkpointloc;
8087 XLogRecPtr startpoint;
8088 pg_time_t stamp_time;
8090 char xlogfilename[MAXFNAMELEN];
8093 struct stat stat_buf;
8098 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
8099 errmsg("must be superuser to run a backup")));
8101 if (RecoveryInProgress())
8103 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8104 errmsg("recovery is in progress"),
8105 errhint("WAL control functions cannot be executed during recovery.")));
8107 if (!XLogArchivingActive())
8109 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8110 errmsg("WAL archiving is not active"),
8111 errhint("archive_mode must be enabled at server start.")));
8113 if (!XLogArchiveCommandSet())
8115 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8116 errmsg("WAL archiving is not active"),
8117 errhint("archive_command must be defined before "
8118 "online backups can be made safely.")));
8120 backupidstr = text_to_cstring(backupid);
8123 * Mark backup active in shared memory. We must do full-page WAL writes
8124 * during an on-line backup even if not doing so at other times, because
8125 * it's quite possible for the backup dump to obtain a "torn" (partially
8126 * written) copy of a database page if it reads the page concurrently with
8127 * our write to the same page. This can be fixed as long as the first
8128 * write to the page in the WAL sequence is a full-page write. Hence, we
8129 * turn on forcePageWrites and then force a CHECKPOINT, to ensure there
8130 * are no dirty pages in shared memory that might get dumped while the
8131 * backup is in progress without having a corresponding WAL record. (Once
8132 * the backup is complete, we need not force full-page writes anymore,
8133 * since we expect that any pages not modified during the backup interval
8134 * must have been correctly captured by the backup.)
8136 * We must hold WALInsertLock to change the value of forcePageWrites, to
8137 * ensure adequate interlocking against XLogInsert().
8139 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
8140 if (XLogCtl->Insert.forcePageWrites)
8142 LWLockRelease(WALInsertLock);
8144 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8145 errmsg("a backup is already in progress"),
8146 errhint("Run pg_stop_backup() and try again.")));
8148 XLogCtl->Insert.forcePageWrites = true;
8149 LWLockRelease(WALInsertLock);
8152 * Force an XLOG file switch before the checkpoint, to ensure that the WAL
8153 * segment the checkpoint is written to doesn't contain pages with old
8154 * timeline IDs. That would otherwise happen if you called
8155 * pg_start_backup() right after restoring from a PITR archive: the first
8156 * WAL segment containing the startup checkpoint has pages in the
8157 * beginning with the old timeline ID. That can cause trouble at recovery:
8158 * we won't have a history file covering the old timeline if pg_xlog
8159 * directory was not included in the base backup and the WAL archive was
8160 * cleared too before starting the backup.
8162 RequestXLogSwitch();
8164 /* Ensure we release forcePageWrites if fail below */
8165 PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
8168 * Force a CHECKPOINT. Aside from being necessary to prevent torn
8169 * page problems, this guarantees that two successive backup runs will
8170 * have different checkpoint positions and hence different history
8171 * file names, even if nothing happened in between.
8173 * We use CHECKPOINT_IMMEDIATE only if requested by user (via passing
8174 * fast = true). Otherwise this can take awhile.
8176 RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT |
8177 (fast ? CHECKPOINT_IMMEDIATE : 0));
8180 * Now we need to fetch the checkpoint record location, and also its
8181 * REDO pointer. The oldest point in WAL that would be needed to
8182 * restore starting from the checkpoint is precisely the REDO pointer.
8184 LWLockAcquire(ControlFileLock, LW_SHARED);
8185 checkpointloc = ControlFile->checkPoint;
8186 startpoint = ControlFile->checkPointCopy.redo;
8187 LWLockRelease(ControlFileLock);
8189 XLByteToSeg(startpoint, _logId, _logSeg);
8190 XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
8192 /* Use the log timezone here, not the session timezone */
8193 stamp_time = (pg_time_t) time(NULL);
8194 pg_strftime(strfbuf, sizeof(strfbuf),
8195 "%Y-%m-%d %H:%M:%S %Z",
8196 pg_localtime(&stamp_time, log_timezone));
8199 * Check for existing backup label --- implies a backup is already
8200 * running. (XXX given that we checked forcePageWrites above, maybe
8201 * it would be OK to just unlink any such label file?)
8203 if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
8205 if (errno != ENOENT)
8207 (errcode_for_file_access(),
8208 errmsg("could not stat file \"%s\": %m",
8209 BACKUP_LABEL_FILE)));
8213 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8214 errmsg("a backup is already in progress"),
8215 errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
8216 BACKUP_LABEL_FILE)));
8219 * Okay, write the file
8221 fp = AllocateFile(BACKUP_LABEL_FILE, "w");
8224 (errcode_for_file_access(),
8225 errmsg("could not create file \"%s\": %m",
8226 BACKUP_LABEL_FILE)));
8227 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
8228 startpoint.xlogid, startpoint.xrecoff, xlogfilename);
8229 fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
8230 checkpointloc.xlogid, checkpointloc.xrecoff);
8231 fprintf(fp, "START TIME: %s\n", strfbuf);
8232 fprintf(fp, "LABEL: %s\n", backupidstr);
8233 if (fflush(fp) || ferror(fp) || FreeFile(fp))
8235 (errcode_for_file_access(),
8236 errmsg("could not write file \"%s\": %m",
8237 BACKUP_LABEL_FILE)));
8239 PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
8242 * We're done. As a convenience, return the starting WAL location.
8244 snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
8245 startpoint.xlogid, startpoint.xrecoff);
8246 PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
8249 /* Error cleanup callback for pg_start_backup */
/*
 * pg_start_backup_callback: undo the forcePageWrites setting made by
 * pg_start_backup().
 *
 * pg_start_backup() registers this with PG_ENSURE_ERROR_CLEANUP, passing
 * (Datum) 0 as 'arg'.  Neither 'code' nor 'arg' is referenced in the
 * visible body; it takes WALInsertLock exclusively, sets
 * XLogCtl->Insert.forcePageWrites to false and releases the lock.
 */
8251 pg_start_backup_callback(int code, Datum arg)
8253 /* Turn off forcePageWrites on failure */
8254 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
8255 XLogCtl->Insert.forcePageWrites = false;
8256 LWLockRelease(WALInsertLock);
8260 * pg_stop_backup: finish taking an on-line backup dump
8262 * We write an end-of-backup WAL record, and remove the backup label file
8263 * created by pg_start_backup, creating a backup history file in pg_xlog
8264 * instead (whence it will immediately be archived). The backup history file
8265 * contains the same info found in the label file, plus the backup-end time
8266 * and WAL location. Before 9.0, the backup-end time was read from the backup
8267 * history file at the beginning of archive recovery, but we now use the WAL
8268 * record for that and the file is for informational and debug purposes only.
8270 * Note: different from CancelBackup which just cancels online backup mode.
/*
 * pg_stop_backup() -> text
 *
 * Sequence of operations visible in this block:
 *   1. Precondition reports: superuser required, recovery must not be in
 *      progress, archiving must be active.
 *   2. Clear forcePageWrites under WALInsertLock (exclusive).  This happens
 *      before the label file is opened or validated.
 *   3. Open the backup label file; a missing file is reported as "a backup
 *      is not in progress", other open failures as a read error.
 *   4. Parse the START WAL LOCATION line into 'startpoint' and
 *      'startxlogfilename'; the line must end in a newline.
 *   5. Insert an XLOG_BACKUP_END record whose payload is 'startpoint', then
 *      RequestXLogSwitch().
 *   6. Name the stop segment with XLByteToPrevSeg(stoppoint) and format the
 *      current time in log_timezone.
 *   7. Write the backup history file: START and STOP WAL LOCATION lines,
 *      the remaining label-file content (copied in the fgetc loop whose
 *      body is not visible here), and a STOP TIME line.
 *   8. Close and unlink the label file, then CleanupBackupHistory().
 *   9. Poll XLogArchiveIsBusy() for the last WAL file and the history file,
 *      sleeping one second per iteration and checking for interrupts.  A
 *      one-time message is issued once 'waits' exceeds 5; a further message
 *      is issued each time 'waits' reaches seconds_before_warning, which
 *      starts at 60 and doubles each time.
 *  10. Return the stop location formatted as %X/%X.  Note that
 *      stopxlogfilename is reused as the scratch buffer for that text.
 *
 * NOTE(review): the ereport levels, several local declarations (for example
 * 'waits', 'ch', 'ich', 'fp', 'lfp', 'rdata', 'strfbuf') and the remaining
 * rdata setup are on lines not visible in this excerpt.
 */
8273 pg_stop_backup(PG_FUNCTION_ARGS)
8275 XLogRecPtr startpoint;
8276 XLogRecPtr stoppoint;
8278 pg_time_t stamp_time;
8280 char histfilepath[MAXPGPATH];
8281 char startxlogfilename[MAXFNAMELEN];
8282 char stopxlogfilename[MAXFNAMELEN];
8283 char lastxlogfilename[MAXFNAMELEN];
8284 char histfilename[MAXFNAMELEN];
8291 int seconds_before_warning;
8293 bool reported_waiting = false;
8297 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
8298 (errmsg("must be superuser to run a backup"))));
8300 if (RecoveryInProgress())
8302 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8303 errmsg("recovery is in progress"),
8304 errhint("WAL control functions cannot be executed during recovery.")));
8306 if (!XLogArchivingActive())
8308 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8309 errmsg("WAL archiving is not active"),
8310 errhint("archive_mode must be enabled at server start.")));
8313 * OK to clear forcePageWrites
8315 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
8316 XLogCtl->Insert.forcePageWrites = false;
8317 LWLockRelease(WALInsertLock);
8320 * Open the existing label file
8322 lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
8325 if (errno != ENOENT)
8327 (errcode_for_file_access(),
8328 errmsg("could not read file \"%s\": %m",
8329 BACKUP_LABEL_FILE)));
8331 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8332 errmsg("a backup is not in progress")));
8336 * Read and parse the START WAL LOCATION line (this code is pretty crude,
8337 * but we are not expecting any variability in the file format).
8339 if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
8340 &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
8341 &ch) != 4 || ch != '\n')
8343 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8344 errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
8347 * Write the backup-end xlog record
8349 rdata.data = (char *) (&startpoint);
8350 rdata.len = sizeof(startpoint);
8351 rdata.buffer = InvalidBuffer;
8353 stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
8356 * Force a switch to a new xlog segment file, so that the backup is valid
8357 * as soon as archiver moves out the current segment file.
8359 RequestXLogSwitch();
8361 XLByteToPrevSeg(stoppoint, _logId, _logSeg);
8362 XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
8364 /* Use the log timezone here, not the session timezone */
8365 stamp_time = (pg_time_t) time(NULL);
8366 pg_strftime(strfbuf, sizeof(strfbuf),
8367 "%Y-%m-%d %H:%M:%S %Z",
8368 pg_localtime(&stamp_time, log_timezone));
8371 * Write the backup history file
8373 XLByteToSeg(startpoint, _logId, _logSeg);
8374 BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
8375 startpoint.xrecoff % XLogSegSize);
8376 fp = AllocateFile(histfilepath, "w");
8379 (errcode_for_file_access(),
8380 errmsg("could not create file \"%s\": %m",
8382 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
8383 startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
8384 fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
8385 stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
8386 /* transfer remaining lines from label to history file */
8387 while ((ich = fgetc(lfp)) != EOF)
8389 fprintf(fp, "STOP TIME: %s\n", strfbuf);
8390 if (fflush(fp) || ferror(fp) || FreeFile(fp))
8392 (errcode_for_file_access(),
8393 errmsg("could not write file \"%s\": %m",
8397 * Close and remove the backup label file
8399 if (ferror(lfp) || FreeFile(lfp))
8401 (errcode_for_file_access(),
8402 errmsg("could not read file \"%s\": %m",
8403 BACKUP_LABEL_FILE)));
8404 if (unlink(BACKUP_LABEL_FILE) != 0)
8406 (errcode_for_file_access(),
8407 errmsg("could not remove file \"%s\": %m",
8408 BACKUP_LABEL_FILE)));
8411 * Clean out any no-longer-needed history files. As a side effect, this
8412 * will post a .ready file for the newly created history file, notifying
8413 * the archiver that history file may be archived immediately.
8415 CleanupBackupHistory();
8418 * Wait until both the last WAL file filled during backup and the history
8419 * file have been archived. We assume that the alphabetic sorting
8420 * property of the WAL files ensures any earlier WAL files are safely
8423 * We wait forever, since archive_command is supposed to work and we
8424 * assume the admin wanted his backup to work completely. If you don't
8425 * wish to wait, you can set statement_timeout. Also, some notices are
8426 * issued to clue in anyone who might be doing this interactively.
8428 XLByteToPrevSeg(stoppoint, _logId, _logSeg);
8429 XLogFileName(lastxlogfilename, ThisTimeLineID, _logId, _logSeg);
8431 XLByteToSeg(startpoint, _logId, _logSeg);
8432 BackupHistoryFileName(histfilename, ThisTimeLineID, _logId, _logSeg,
8433 startpoint.xrecoff % XLogSegSize);
8435 seconds_before_warning = 60;
8438 while (XLogArchiveIsBusy(lastxlogfilename) ||
8439 XLogArchiveIsBusy(histfilename))
8441 CHECK_FOR_INTERRUPTS();
8443 if (!reported_waiting && waits > 5)
8446 (errmsg("pg_stop_backup cleanup done, waiting for required WAL segments to be archived")));
8447 reported_waiting = true;
8450 pg_usleep(1000000L);
8452 if (++waits >= seconds_before_warning)
8454 seconds_before_warning *= 2; /* This wraps in >10 years... */
8456 (errmsg("pg_stop_backup still waiting for all required WAL segments to be archived (%d seconds elapsed)",
8458 errhint("Check that your archive_command is executing properly. "
8459 "pg_stop_backup can be cancelled safely, "
8460 "but the database backup will not be usable without all the WAL segments.")));
8465 (errmsg("pg_stop_backup complete, all required WAL segments have been archived")));
8468 * We're done. As a convenience, return the ending WAL location.
8470 snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
8471 stoppoint.xlogid, stoppoint.xrecoff);
8472 PG_RETURN_TEXT_P(cstring_to_text(stopxlogfilename));
8476 * pg_switch_xlog: switch to next xlog file
/*
 * pg_switch_xlog() -> text
 *
 * Reports an error when the caller is not a superuser (per the message
 * text; the test itself is not visible in this excerpt) or when recovery is
 * in progress.  Otherwise calls RequestXLogSwitch() and returns the
 * XLogRecPtr it yields, formatted as %X/%X.
 */
8479 pg_switch_xlog(PG_FUNCTION_ARGS)
8481 XLogRecPtr switchpoint;
8482 char location[MAXFNAMELEN];
8486 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
8487 (errmsg("must be superuser to switch transaction log files"))));
8489 if (RecoveryInProgress())
8491 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8492 errmsg("recovery is in progress"),
8493 errhint("WAL control functions cannot be executed during recovery.")));
8495 switchpoint = RequestXLogSwitch();
8498 * As a convenience, return the WAL location of the switch record
8500 snprintf(location, sizeof(location), "%X/%X",
8501 switchpoint.xlogid, switchpoint.xrecoff);
8502 PG_RETURN_TEXT_P(cstring_to_text(location));
8506 * Report the current WAL write location (same format as pg_start_backup etc)
8508 * This is useful for determining how much of WAL is visible to an external
8509 * archiving process. Note that the data before this point is written out
8510 * to the kernel, but is not necessarily synced to disk.
/*
 * pg_current_xlog_location() -> text
 *
 * Reports an error when recovery is in progress.  Otherwise refreshes the
 * local LogwrtResult from the shared XLogCtl copy while holding the
 * info_lck spinlock, and returns LogwrtResult.Write formatted as %X/%X.
 */
8513 pg_current_xlog_location(PG_FUNCTION_ARGS)
8515 char location[MAXFNAMELEN];
8517 if (RecoveryInProgress())
8519 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8520 errmsg("recovery is in progress"),
8521 errhint("WAL control functions cannot be executed during recovery.")));
8523 /* Make sure we have an up-to-date local LogwrtResult */
8525 /* use volatile pointer to prevent code rearrangement */
8526 volatile XLogCtlData *xlogctl = XLogCtl;
8528 SpinLockAcquire(&xlogctl->info_lck);
8529 LogwrtResult = xlogctl->LogwrtResult;
8530 SpinLockRelease(&xlogctl->info_lck);
8533 snprintf(location, sizeof(location), "%X/%X",
8534 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff);
8535 PG_RETURN_TEXT_P(cstring_to_text(location));
8539 * Report the current WAL insert location (same format as pg_start_backup etc)
8541 * This function is mostly for debugging purposes.
/*
 * pg_current_xlog_insert_location() -> text
 *
 * Reports an error when recovery is in progress.  Otherwise computes the
 * current insert position with INSERT_RECPTR() on XLogCtl->Insert while
 * holding WALInsertLock in shared mode, and returns it formatted as %X/%X.
 */
8544 pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
8546 XLogCtlInsert *Insert = &XLogCtl->Insert;
8547 XLogRecPtr current_recptr;
8548 char location[MAXFNAMELEN];
8550 if (RecoveryInProgress())
8552 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8553 errmsg("recovery is in progress"),
8554 errhint("WAL control functions cannot be executed during recovery.")));
8557 * Get the current end-of-WAL position ... shared lock is sufficient
8559 LWLockAcquire(WALInsertLock, LW_SHARED);
8560 INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
8561 LWLockRelease(WALInsertLock);
8563 snprintf(location, sizeof(location), "%X/%X",
8564 current_recptr.xlogid, current_recptr.xrecoff);
8565 PG_RETURN_TEXT_P(cstring_to_text(location));
8569 * Report the last WAL receive location (same format as pg_start_backup etc)
8571 * This is useful for determining how much of WAL is guaranteed to be received
8572 * and synced to disk by walreceiver.
/*
 * pg_last_xlog_receive_location() -> text
 *
 * Returns the pointer obtained from GetWalRcvWriteRecPtr(), formatted as
 * %X/%X.  No recovery-state check and no special-casing of any pointer
 * value appears in the visible lines.
 *
 * NOTE(review): the declaration of 'recptr' is not visible in this excerpt.
 */
8575 pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
8578 char location[MAXFNAMELEN];
8580 recptr = GetWalRcvWriteRecPtr();
8582 snprintf(location, sizeof(location), "%X/%X",
8583 recptr.xlogid, recptr.xrecoff);
8584 PG_RETURN_TEXT_P(cstring_to_text(location));
8588 * Report the last WAL replay location (same format as pg_start_backup etc)
8590 * This is useful for determining how much of WAL is visible to read-only
8591 * connections during recovery.
/*
 * pg_last_xlog_replay_location() -> text
 *
 * Copies XLogCtl->recoveryLastRecPtr while holding the info_lck spinlock
 * and returns it formatted as %X/%X.
 *
 * NOTE(review): the declaration of 'recptr' is not visible in this excerpt.
 */
8594 pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
8596 /* use volatile pointer to prevent code rearrangement */
8597 volatile XLogCtlData *xlogctl = XLogCtl;
8599 char location[MAXFNAMELEN];
8601 SpinLockAcquire(&xlogctl->info_lck);
8602 recptr = xlogctl->recoveryLastRecPtr;
8603 SpinLockRelease(&xlogctl->info_lck);
8605 snprintf(location, sizeof(location), "%X/%X",
8606 recptr.xlogid, recptr.xrecoff);
8607 PG_RETURN_TEXT_P(cstring_to_text(location));
8611 * Compute an xlog file name and decimal byte offset given a WAL location,
8612 * such as is returned by pg_stop_backup() or pg_switch_xlog().
8614 * Note that a location exactly at a segment boundary is taken to be in
8615 * the previous segment. This is usually the right thing, since the
8616 * expected usage is to determine which xlog file(s) are ready to archive.
/*
 * pg_xlogfile_name_offset(location text) -> record (file_name, file_offset)
 *
 * Argument 0 is a WAL location in the %X/%X text form.
 *
 * Reports an error when recovery is in progress or when sscanf() cannot
 * extract two hexadecimal fields from the input.
 *
 * The result row has two columns, built from a template tuple descriptor:
 *   file_name   - WAL file name for the segment chosen by XLByteToPrevSeg()
 *                 on the current ThisTimeLineID
 *   file_offset - locationpoint.xrecoff - xlogseg * XLogSegSize
 *
 * NOTE(review): the column type arguments of TupleDescInitEntry and the
 * declarations of 'values', 'isnull', 'xlogid', 'xlogseg', 'xrecoff',
 * 'result' and 'locationstr' are on lines not visible in this excerpt.
 */
8619 pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
8621 text *location = PG_GETARG_TEXT_P(0);
8623 unsigned int uxlogid;
8624 unsigned int uxrecoff;
8628 XLogRecPtr locationpoint;
8629 char xlogfilename[MAXFNAMELEN];
8632 TupleDesc resultTupleDesc;
8633 HeapTuple resultHeapTuple;
8636 if (RecoveryInProgress())
8638 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8639 errmsg("recovery is in progress"),
8640 errhint("pg_xlogfile_name_offset() cannot be executed during recovery.")));
8643 * Read input and parse
8645 locationstr = text_to_cstring(location);
8647 if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
8649 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
8650 errmsg("could not parse transaction log location \"%s\"",
8653 locationpoint.xlogid = uxlogid;
8654 locationpoint.xrecoff = uxrecoff;
8657 * Construct a tuple descriptor for the result row. This must match this
8658 * function's pg_proc entry!
8660 resultTupleDesc = CreateTemplateTupleDesc(2, false);
8661 TupleDescInitEntry(resultTupleDesc, (AttrNumber) 1, "file_name",
8663 TupleDescInitEntry(resultTupleDesc, (AttrNumber) 2, "file_offset",
8666 resultTupleDesc = BlessTupleDesc(resultTupleDesc);
8671 XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
8672 XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
8674 values[0] = CStringGetTextDatum(xlogfilename);
8680 xrecoff = locationpoint.xrecoff - xlogseg * XLogSegSize;
8682 values[1] = UInt32GetDatum(xrecoff);
8686 * Tuple jam: Having first prepared your Datums, then squash together
8688 resultHeapTuple = heap_form_tuple(resultTupleDesc, values, isnull);
8690 result = HeapTupleGetDatum(resultHeapTuple);
8692 PG_RETURN_DATUM(result);
8696 * Compute an xlog file name given a WAL location,
8697 * such as is returned by pg_stop_backup() or pg_switch_xlog().
/*
 * pg_xlogfile_name(location text) -> text
 *
 * Argument 0 is a WAL location in the %X/%X text form.
 *
 * Reports an error when recovery is in progress or when sscanf() cannot
 * extract two hexadecimal fields from the input.  Otherwise returns the WAL
 * file name for the segment chosen by XLByteToPrevSeg(), built with the
 * current ThisTimeLineID.
 *
 * NOTE(review): the declarations of 'locationstr', 'xlogid' and 'xlogseg'
 * are on lines not visible in this excerpt.
 */
8700 pg_xlogfile_name(PG_FUNCTION_ARGS)
8702 text *location = PG_GETARG_TEXT_P(0);
8704 unsigned int uxlogid;
8705 unsigned int uxrecoff;
8708 XLogRecPtr locationpoint;
8709 char xlogfilename[MAXFNAMELEN];
8711 if (RecoveryInProgress())
8713 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8714 errmsg("recovery is in progress"),
8715 errhint("pg_xlogfile_name() cannot be executed during recovery.")));
8717 locationstr = text_to_cstring(location);
8719 if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
8721 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
8722 errmsg("could not parse transaction log location \"%s\"",
8725 locationpoint.xlogid = uxlogid;
8726 locationpoint.xrecoff = uxrecoff;
8728 XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
8729 XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
8731 PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
8735 * read_backup_label: check to see if a backup_label file is present
8737 * If we see a backup_label during recovery, we assume that we are recovering
8738 * from a backup dump file, and we therefore roll forward from the checkpoint
8739 * identified by the label file, NOT what pg_control says. This avoids the
8740 * problem that pg_control might have been archived one or more checkpoints
8741 * later than the start of the dump, and so if we rely on it as the start
8742 * point, we will fail to restore a consistent database state.
8744 * Returns TRUE if a backup_label was found (and fills the checkpoint
8745 * location and its REDO location into *checkPointLoc and RedoStartLSN,
8746 * respectively); returns FALSE if not.
/*
 * read_backup_label: look for the backup label file and extract the
 * recovery start information from it.
 *
 * checkPointLoc - output; receives the value parsed from the
 *                 CHECKPOINT LOCATION line
 *
 * Side effect: the START WAL LOCATION line is parsed directly into the
 * RedoStartLSN variable, which is not a parameter of this function.
 *
 * Returns false when the label file does not exist (errno == ENOENT); any
 * other open failure is reported as a read error.  Both parsed lines must
 * match their format exactly and end in a newline, otherwise "invalid data"
 * is reported.  The START line format splits the file name into an
 * 8-hex-digit value stored in 'tli' and a remainder of up to 16 characters.
 *
 * NOTE(review): the ereport levels, the declarations of 'lfp', 'tli' and
 * 'ch', and the final return statement are on lines not visible in this
 * excerpt.
 */
8749 read_backup_label(XLogRecPtr *checkPointLoc)
8751 char startxlogfilename[MAXFNAMELEN];
8757 * See if label file is present
8759 lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
8762 if (errno != ENOENT)
8764 (errcode_for_file_access(),
8765 errmsg("could not read file \"%s\": %m",
8766 BACKUP_LABEL_FILE)));
8767 return false; /* it's not there, all is fine */
8771 * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
8772 * is pretty crude, but we are not expecting any variability in the file
8775 if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
8776 &RedoStartLSN.xlogid, &RedoStartLSN.xrecoff, &tli,
8777 startxlogfilename, &ch) != 5 || ch != '\n')
8779 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8780 errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
8781 if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
8782 &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
8783 &ch) != 3 || ch != '\n')
8785 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8786 errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
8787 if (ferror(lfp) || FreeFile(lfp))
8789 (errcode_for_file_access(),
8790 errmsg("could not read file \"%s\": %m",
8791 BACKUP_LABEL_FILE)));
8797 * Error context callback for errors occurring during rm_redo().
/*
 * rm_redo_error_callback: add a description of the record being replayed
 * to the error context.
 *
 * arg - the XLogRecord being processed, passed as an untyped pointer
 *
 * The rm_desc routine of the resource manager named by xl_rmid renders the
 * record into a freshly initialized StringInfo, and the text is attached
 * with errcontext() behind the prefix "xlog redo".
 *
 * NOTE(review): the middle argument of the rm_desc call, the test that
 * skips empty descriptions, and any release of the buffer are on lines not
 * visible in this excerpt.
 */
8800 rm_redo_error_callback(void *arg)
8802 XLogRecord *record = (XLogRecord *) arg;
8805 initStringInfo(&buf);
8806 RmgrTable[record->xl_rmid].rm_desc(&buf,
8808 XLogRecGetData(record));
8810 /* don't bother emitting empty description */
8812 errcontext("xlog redo %s", buf.data);
8818 * BackupInProgress: check if online backup mode is active
8820 * This is done by checking for existence of the "backup_label" file.
/*
 * BackupInProgress: true exactly when stat() on BACKUP_LABEL_FILE succeeds.
 *
 * Any stat() failure, whatever the errno, is treated as "no backup in
 * progress".
 */
8823 BackupInProgress(void)
8825 struct stat stat_buf;
8827 return (stat(BACKUP_LABEL_FILE, &stat_buf) == 0);
8831 * CancelBackup: rename the "backup_label" file to cancel backup mode
8833 * If the "backup_label" file exists, it will be renamed to "backup_label.old".
8834 * Note that this will render an online backup in progress useless.
8835 * To correctly finish an online backup, pg_stop_backup must be called.
/*
 * Body of the backup-cancel routine described by the preceding header
 * comment (CancelBackup).
 *
 * Steps visible below:
 *   - stat() the label file; per the comment, nothing more is done when it
 *     is absent (the return statement is not visible in this excerpt)
 *   - unlink() any leftover BACKUP_LABEL_OLD; the result is not checked
 *   - rename() the label file to BACKUP_LABEL_OLD and report either
 *     "online backup mode cancelled" or "online backup mode was not
 *     cancelled" (the latter with the rename errno)
 *
 * NOTE(review): the function signature and the levels of the two reports
 * are on lines not visible in this excerpt.
 */
8840 struct stat stat_buf;
8842 /* if the file is not there, return */
8843 if (stat(BACKUP_LABEL_FILE, &stat_buf) < 0)
8846 /* remove leftover file from previously cancelled backup if it exists */
8847 unlink(BACKUP_LABEL_OLD);
8849 if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) == 0)
8852 (errmsg("online backup mode cancelled"),
8853 errdetail("\"%s\" was renamed to \"%s\".",
8854 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
8859 (errcode_for_file_access(),
8860 errmsg("online backup mode was not cancelled"),
8861 errdetail("Could not rename \"%s\" to \"%s\": %m.",
8862 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
8866 /* ------------------------------------------------------
8867 * Startup Process main entry point and signal handlers
8868 * ------------------------------------------------------
8872 * startupproc_quickdie() occurs when signalled SIGQUIT by the postmaster.
8874 * Some backend has bought the farm,
8875 * so we need to stop what we're doing and exit.
/*
 * startupproc_quickdie: signal handler that StartupProcessMain() installs
 * for SIGQUIT.
 *
 * The only statement visible in this excerpt blocks further signals with
 * PG_SETMASK(&BlockSig).  The comments below describe resetting the exit
 * callbacks and leaving with exit(2); those statements are on lines not
 * visible here -- confirm against the full source.
 */
8878 startupproc_quickdie(SIGNAL_ARGS)
8880 PG_SETMASK(&BlockSig);
8883 * We DO NOT want to run proc_exit() callbacks -- we're here because
8884 * shared memory may be corrupted, so we don't want to try to clean up our
8885 * transaction. Just nail the windows shut and get out of town. Now that
8886 * there's an atexit callback to prevent third-party code from breaking
8887 * things by calling exit() directly, we have to reset the callbacks
8888 * explicitly to make this work as intended.
8893 * Note we do exit(2) not exit(0). This is to force the postmaster into a
8894 * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
8895 * backend. This is necessary precisely because we don't clean up our
8896 * shared memory state. (The "dead man switch" mechanism in pmsignal.c
8897 * should ensure the postmaster sees this as a crash, too, but no harm in
8898 * being doubly sure.)
8904 /* SIGHUP: set flag to re-read config file at next convenient time */
/*
 * Signal handler that StartupProcessMain() installs for SIGHUP.
 * NOTE(review): the body is not visible in this excerpt; per the preceding
 * comment it only sets a flag, presumably the one that
 * HandleStartupProcInterrupts() tests before calling ProcessConfigFile().
 */
8906 StartupProcSigHupHandler(SIGNAL_ARGS)
8911 /* SIGTERM: set flag to abort redo and exit */
/*
 * Signal handler that StartupProcessMain() installs for SIGTERM.
 *
 * Sets shutdown_requested, which HandleStartupProcInterrupts() tests later.
 * NOTE(review): the statement executed when in_restore_command is true, and
 * whether the flag assignment is the else arm of that test, are on lines
 * not visible in this excerpt.
 */
8913 StartupProcShutdownHandler(SIGNAL_ARGS)
8915 if (in_restore_command)
8918 shutdown_requested = true;
8921 /* Handle SIGHUP and SIGTERM signals of startup process */
/*
 * HandleStartupProcInterrupts: act on conditions recorded by the startup
 * process signal handlers.
 *
 * Checks visible below, in order:
 *   - a pending config reload, serviced with ProcessConfigFile(PGC_SIGHUP)
 *     (the flag test and reset are not visible in this excerpt)
 *   - shutdown_requested
 *   - postmaster death, tested only when IsUnderPostmaster is set
 *
 * NOTE(review): the actions taken for the last two conditions are on lines
 * not visible in this excerpt.
 */
8923 HandleStartupProcInterrupts(void)
8926 * Check if we were requested to re-read config file.
8931 ProcessConfigFile(PGC_SIGHUP);
8935 * Check if we were requested to exit without finishing recovery.
8937 if (shutdown_requested)
8941 * Emergency bailout if postmaster has died. This is to avoid the
8942 * necessity for manual cleanup of all postmaster children.
8944 if (IsUnderPostmaster && !PostmasterIsAlive(true))
8948 /* Main entry point for startup process */
/*
 * StartupProcessMain: entry point of the startup process.
 *
 * Signal dispositions installed below:
 *   SIGHUP  -> StartupProcSigHupHandler
 *   SIGINT  -> ignored
 *   SIGTERM -> StartupProcShutdownHandler
 *   SIGQUIT -> startupproc_quickdie
 *   SIGALRM -> handle_standby_sig_alarm when XLogRequestRecoveryConnections
 *              is set; a SIG_IGN registration also appears (presumably the
 *              else arm -- confirm)
 *   SIGPIPE, SIGUSR1, SIGUSR2 -> ignored
 *   SIGCHLD, SIGTTIN, SIGTTOU, SIGCONT, SIGWINCH -> default action
 * Signals are then unblocked with PG_SETMASK(&UnBlockSig).
 *
 * NOTE(review): the setsid() call whose failure is reported below, the
 * call that performs the actual startup work, and the final exit call are
 * on lines not visible in this excerpt.
 */
8950 StartupProcessMain(void)
8953 * If possible, make this process a group leader, so that the postmaster
8954 * can signal any child processes too.
8958 elog(FATAL, "setsid() failed: %m");
8962 * Properly accept or ignore signals the postmaster might send us
8964 pqsignal(SIGHUP, StartupProcSigHupHandler); /* reload config file */
8965 pqsignal(SIGINT, SIG_IGN); /* ignore query cancel */
8966 pqsignal(SIGTERM, StartupProcShutdownHandler); /* request shutdown */
8967 pqsignal(SIGQUIT, startupproc_quickdie); /* hard crash time */
8968 if (XLogRequestRecoveryConnections)
8969 pqsignal(SIGALRM, handle_standby_sig_alarm); /* ignored unless
8972 pqsignal(SIGALRM, SIG_IGN);
8973 pqsignal(SIGPIPE, SIG_IGN);
8974 pqsignal(SIGUSR1, SIG_IGN);
8975 pqsignal(SIGUSR2, SIG_IGN);
8978 * Reset some signals that are accepted by postmaster but not here
8980 pqsignal(SIGCHLD, SIG_DFL);
8981 pqsignal(SIGTTIN, SIG_DFL);
8982 pqsignal(SIGTTOU, SIG_DFL);
8983 pqsignal(SIGCONT, SIG_DFL);
8984 pqsignal(SIGWINCH, SIG_DFL);
8987 * Unblock signals (they were blocked when the postmaster forked us)
8989 PG_SETMASK(&UnBlockSig);
8994 * Exit normally. Exit code 0 tells postmaster that we completed recovery
9001 * Read the XLOG page containing RecPtr into readBuf (if not read already).
9002 * Returns true if the page is read successfully.
9004 * This is responsible for restoring files from archive as needed, as well
9005 * as for waiting for the requested WAL record to arrive in standby mode.
9007 * 'emode' specifies the log level used for reporting "file not found" or
9008 * "end of WAL" situations in archive recovery, or in standby mode when a
9009 * trigger file is found. If set to WARNING or below, XLogPageRead() returns
9010 * false in those situations; on higher log levels the ereport() won't
9013 * In standby mode, if after a successful return of XLogPageRead() the
9014 * caller finds the record it's interested in to be broken, it should
9015 * ereport the error with the level determined by
9016 * emode_for_corrupt_record(), and then set "failedSources |= readSource"
9017 * and call XLogPageRead() again with the same arguments. This lets
9018 * XLogPageRead() try fetching the record from another source, or to
9022 XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
/* Static: kept across calls; refreshed from GetWalRcvWriteRecPtr() while waiting on walreceiver below */
9025 static XLogRecPtr receivedUpto = {0, 0};
9026 bool switched_segment = false;
9027 uint32 targetPageOff;
9028 uint32 targetRecOff;
/* Static: time of the previous "all sources exhausted" event, used below to space retries at least 5 seconds apart */
9031 static pg_time_t last_fail_time = 0;
9033 XLByteToSeg(*RecPtr, targetId, targetSeg);
/* Byte offset, within the segment, of the start of the page that contains RecPtr (rounded down to a page boundary) */
9034 targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
/* Byte offset of RecPtr within that page */
9035 targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
9037 /* Fast exit if we have read the record in the current buffer already */
9038 if (failedSources == 0 && targetId == readId && targetSeg == readSeg &&
9039 targetPageOff == readOff && targetRecOff < readLen)
9043 * See if we need to switch to a new segment because the requested record
9044 * is not in the currently open one.
9046 if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
9053 XLByteToSeg(*RecPtr, readId, readSeg);
9056 /* See if we need to retrieve more data */
9058 (readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
9063 * In standby mode, wait for the requested record to become
9064 * available, either via restore_command succeeding to restore the
9065 * segment, or via walreceiver having streamed the record.
9069 if (WalRcvInProgress())
9072 * While walreceiver is active, wait for new WAL to arrive
9075 receivedUpto = GetWalRcvWriteRecPtr();
9076 if (XLByteLT(*RecPtr, receivedUpto))
9079 * Great, streamed far enough. Open the file if it's
9085 XLogFileRead(readId, readSeg, PANIC,
9087 XLOG_FROM_PG_XLOG, false);
9088 switched_segment = true;
/* Record that this data arrived via streaming; the readLen computation further down keys off this value */
9089 readSource = XLOG_FROM_STREAM;
9094 if (CheckForStandbyTrigger())
9098 * When streaming is active, we want to react quickly when
9099 * the next WAL record arrives, so sleep only a bit.
9101 pg_usleep(100000L); /* 100ms */
9109 * Until walreceiver manages to reconnect, poll the
9117 /* Reset curFileTLI if random fetch. */
9122 * Try to restore the file from archive, or read an
9123 * existing file from pg_xlog.
9125 sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
9126 if (!(sources & ~failedSources))
9129 * We've exhausted all options for retrieving the
9135 * ... but sleep first if it hasn't been long since
9138 now = (pg_time_t) time(NULL);
9139 if ((now - last_fail_time) < 5)
/* Sleep off the remainder of the 5-second interval; the argument is in microseconds (compare the 100ms call above) */
9141 pg_usleep(1000000L * (5 - (now - last_fail_time)));
9142 now = (pg_time_t) time(NULL);
9144 last_fail_time = now;
9147 * If primary_conninfo is set, launch walreceiver to
9148 * try to stream the missing WAL, before retrying
9149 * to restore from archive/pg_xlog.
9151 * If fetching_ckpt is TRUE, RecPtr points to the
9152 * initial checkpoint location. In that case, we use
9153 * RedoStartLSN as the streaming start position instead
9154 * of RecPtr, so that when we later jump backwards to
9155 * start redo at RedoStartLSN, we will have the logs
9158 if (PrimaryConnInfo)
9160 RequestXLogStreaming(
9161 fetching_ckpt ? RedoStartLSN : *RecPtr,
9166 /* Don't try to read from a source that just failed */
9167 sources &= ~failedSources;
9168 readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2,
9170 switched_segment = true;
9175 * Nope, not found in archive and/or pg_xlog.
9177 failedSources |= sources;
9180 * Check to see if the trigger file exists. Note that
9181 * we do this only after failure, so when you create
9182 * the trigger file, we still finish replaying as much
9183 * as we can from archive and pg_xlog before failover.
9185 if (CheckForStandbyTrigger())
9190 * This possibly-long loop needs to handle interrupts of
9193 HandleStartupProcInterrupts();
9198 /* In archive or crash recovery. */
9203 /* Reset curFileTLI if random fetch. */
/* pg_xlog is always a candidate source here; the archive is added only when InArchiveRecovery is set */
9207 sources = XLOG_FROM_PG_XLOG;
9208 if (InArchiveRecovery)
9209 sources |= XLOG_FROM_ARCHIVE;
9211 readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
9213 switched_segment = true;
9221 * At this point, we have the right segment open and if we're streaming
9222 * we know the requested record is in it.
9224 Assert(readFile != -1);
9227 * If the current segment is being streamed from master, calculate how
9228 * much of the current page we have received already. We know the
9229 * requested record has been received, but this is for the benefit of
9230 * future calls, to allow quick exit at the top of this function.
9232 if (readSource == XLOG_FROM_STREAM)
/*
 * The test below asks whether RecPtr lies on a different page than
 * receivedUpto.  readLen is then set either to a full page or to the
 * distance from the start of the target page up to receivedUpto.
 * NOTE(review): the else keywords separating the three readLen
 * assignments are not visible in this excerpt -- confirm which
 * assignment belongs to which branch.
 */
9234 if (RecPtr->xlogid != receivedUpto.xlogid ||
9235 (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
9237 readLen = XLOG_BLCKSZ;
9240 readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff;
9243 readLen = XLOG_BLCKSZ;
9245 if (switched_segment && targetPageOff != 0)
9248 * Whenever switching to a new WAL segment, we read the first page of
9249 * the file and validate its header, even if that's not where the
9250 * target record is. This is so that we can check the additional
9251 * identification info that is present in the first page's "long"
9255 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
9257 ereport(emode_for_corrupt_record(emode, *RecPtr),
9258 (errcode_for_file_access(),
9259 errmsg("could not read from log file %u, segment %u, offset %u: %m",
9260 readId, readSeg, readOff)));
9261 goto next_record_is_invalid;
9263 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
9264 goto next_record_is_invalid;
9267 /* Read the requested page */
9268 readOff = targetPageOff;
9269 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
9271 ereport(emode_for_corrupt_record(emode, *RecPtr),
9272 (errcode_for_file_access(),
9273 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
9274 readId, readSeg, readOff)));
9275 goto next_record_is_invalid;
9277 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
9279 ereport(emode_for_corrupt_record(emode, *RecPtr),
9280 (errcode_for_file_access(),
9281 errmsg("could not read from log file %u, segment %u, offset %u: %m",
9282 readId, readSeg, readOff)));
9283 goto next_record_is_invalid;
9285 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
9286 goto next_record_is_invalid;
/* Sanity checks: the read position and length now describe the page and record offset computed from RecPtr at entry */
9288 Assert(targetId == readId);
9289 Assert(targetSeg == readSeg);
9290 Assert(targetPageOff == readOff);
9291 Assert(targetRecOff < readLen);
9295 next_record_is_invalid:
/* Flag the source just used as failed; the standby retry path masks failed sources out before reading again */
9296 failedSources |= readSource;
9304 /* In standby-mode, keep trying */
9321 * Determine what log level should be used to report a corrupt WAL record
9322 * in the current WAL page, previously read by XLogPageRead().
9324 * 'emode' is the error mode that would be used to report a file-not-found
9325 * or legitimate end-of-WAL situation. It is upgraded to WARNING or PANIC
9326 * if a corrupt record is not expected at this point.
9328 * NOTE: This function remembers the RecPtr value it was last called with,
9329 * to suppress repeated messages about the same record. Only call this when
9330 * you are about to ereport(), or you might cause a later message to be
9331 * erroneously suppressed.
9334 emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
/* Static: remembers a RecPtr across calls so a repeated complaint can be detected by the XLByteEQ test below */
9336 static XLogRecPtr lastComplaint = {0, 0};
9339 * We don't expect any invalid records in archive or in records streamed
9340 * from master. Files in the archive should be complete, and we should
9341 * never hit the end of WAL because we stop and wait for more WAL to
9342 * arrive before replaying it.
9344 * In standby mode, throw a WARNING and keep retrying. If we're lucky
9345 * it's a transient error and will go away by itself, and in any case
9346 * it's better to keep the standby open for any possible read-only
9347 * queries. We throw WARNING in PITR as well, which causes the recovery
9348 * to end. That's questionable, you probably would want to abort the
9349 * recovery if the archive is corrupt and investigate the situation.
9350 * But that's the behavior we've always had, and it does make sense
9351 * for tools like pg_standby that implement a standby mode externally.
9353 if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE)
/* Only an emode below WARNING is affected in this branch. NOTE(review): the guarded statement is not visible in this excerpt -- confirm */
9355 if (emode < WARNING)
9359 * If we retry reading a record in pg_xlog, only complain on the first
9360 * time to keep the noise down.
9362 else if (emode == LOG)
/* Is this the same record position that was remembered on an earlier call? */
9364 if (XLByteEQ(RecPtr, lastComplaint))
9367 lastComplaint = RecPtr;
9373 * Check to see if the trigger file exists. If it does, request postmaster
9374 * to shut down walreceiver, wait for it to exit, remove the trigger
9375 * file, and return true.
9378 CheckForStandbyTrigger(void)
9380 struct stat stat_buf;
9382 if (TriggerFile == NULL)
9385 if (stat(TriggerFile, &stat_buf) == 0)
9388 (errmsg("trigger file found: %s", TriggerFile)));
9390 unlink(TriggerFile);