1 /*-------------------------------------------------------------------------
4 * PostgreSQL transaction log manager
7 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.88 2002/03/02 21:39:20 momjian Exp $
12 *-------------------------------------------------------------------------
23 #include <sys/types.h>
29 #include "access/clog.h"
30 #include "access/transam.h"
31 #include "access/xact.h"
32 #include "access/xlog.h"
33 #include "access/xlogutils.h"
34 #include "catalog/catversion.h"
35 #include "catalog/pg_control.h"
36 #include "storage/bufpage.h"
37 #include "storage/lwlock.h"
38 #include "storage/pmsignal.h"
39 #include "storage/proc.h"
40 #include "storage/sinval.h"
41 #include "storage/spin.h"
42 #include "utils/builtins.h"
43 #include "utils/relcache.h"
44 #include "utils/selfuncs.h"
45 #include "miscadmin.h"
49 * This chunk of hackery attempts to determine which file sync methods
50 * are available on the current platform, and to choose an appropriate
51 * default method. We assume that fsync() is always available, and that
52 * configure determined whether fdatasync() is.
/*
 * Numeric codes for the WAL sync method, followed by the compile-time
 * choice of default.  Three alternative default triples (method name
 * string, method code, open() flag bit) appear below: "open_datasync"
 * when OPEN_DATASYNC_FLAG exists, "fdatasync" when HAVE_FDATASYNC is
 * defined, and "fsync" as the remaining alternative.
 * NOTE(review): the conditional directives that separate the alternatives
 * (and the two OPEN_SYNC_FLAG definitions) are not all visible in this
 * listing -- confirm the exact #if/#else nesting against the full file.
 */
54 #define SYNC_METHOD_FSYNC 0
55 #define SYNC_METHOD_FDATASYNC 1
56 #define SYNC_METHOD_OPEN 2 /* used for both O_SYNC and
60 #define OPEN_SYNC_FLAG O_SYNC
63 #define OPEN_SYNC_FLAG O_FSYNC
67 #if defined(OPEN_SYNC_FLAG)
68 #if defined(O_DSYNC) && (O_DSYNC != OPEN_SYNC_FLAG)
69 #define OPEN_DATASYNC_FLAG O_DSYNC
73 #if defined(OPEN_DATASYNC_FLAG)
74 #define DEFAULT_SYNC_METHOD_STR "open_datasync"
75 #define DEFAULT_SYNC_METHOD SYNC_METHOD_OPEN
76 #define DEFAULT_SYNC_FLAGBIT OPEN_DATASYNC_FLAG
78 #if defined(HAVE_FDATASYNC)
79 #define DEFAULT_SYNC_METHOD_STR "fdatasync"
80 #define DEFAULT_SYNC_METHOD SYNC_METHOD_FDATASYNC
81 #define DEFAULT_SYNC_FLAGBIT 0
83 #define DEFAULT_SYNC_METHOD_STR "fsync"
84 #define DEFAULT_SYNC_METHOD SYNC_METHOD_FSYNC
85 #define DEFAULT_SYNC_FLAGBIT 0
90 /* User-settable parameters */
/*
 * CheckPointSegments feeds the XLOGfileslop formula and the
 * "time for a checkpoint" test made in XLogWrite when it switches to a
 * new log segment.
 */
91 int CheckPointSegments = 3;
93 int XLOGfiles = 0; /* # of files to preallocate during ckpt */
/* Name of the selected sync method; starts out unset (NULL). */
95 char *XLOG_sync_method = NULL;
/* Compile-time default method name (one of the DEFAULT_SYNC_METHOD_STR values). */
96 const char XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
97 char XLOG_archive_dir[MAXPGPATH]; /* null string means
101 * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
102 * preallocated XLOG segments --- we try to have at least XLOGfiles advance
103 * segments but no more than XLOGfiles+XLOGfileslop segments. This could
104 * be made a separate GUC variable, but at present I think it's sufficient
105 * to hardwire it as 2*CheckPointSegments+1. Under normal conditions, a
106 * checkpoint will free no more than 2*CheckPointSegments log segments, and
107 * we want to recycle all of them; the +1 allows boundary cases to happen
108 * without wasting a delete/create-segment cycle.
/* Allowed surplus of preallocated segments: one more than twice CheckPointSegments. */
#define XLOGfileslop (1 + 2 * CheckPointSegments)
114 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
/* Active sync method code (a SYNC_METHOD_* value); XLogWrite compares it with SYNC_METHOD_OPEN. */
115 static int sync_method = DEFAULT_SYNC_METHOD;
/* open() flag bit that goes with the active method (0 for the fsync/fdatasync defaults). */
116 static int open_sync_bit = DEFAULT_SYNC_FLAGBIT;
/* Yields open_sync_bit only while enableFsync is true, otherwise 0. */
118 #define XLOG_SYNC_BIT (enableFsync ? open_sync_bit : 0)
/* NOTE(review): presumably the minimum number of XLOG page buffers; not referenced in the visible code -- confirm. */
120 #define MinXLOGbuffers 4
124 * ThisStartUpID will be same in all backends --- it identifies current
125 * instance of the database system.
/*
 * Process-level XLOG state.  Cross-references within this file:
 *   ThisStartUpID  - stamped into each new page header (xlp_sui) by
 *                    AdvanceXLInsertBuffer.
 *   MyLastRecPtr   - copied into xl_xact_prev of each record header and
 *                    updated in XLogInsert.
 *   ProcLastRecPtr - set to the start of every record in XLogInsert.
 *   RedoRecPtr     - refreshed from Insert->RedoRecPtr in XLogInsert once
 *                    WALInsertLock is held; also read by XLogWrite's
 *                    checkpoint-trigger test.
 */
127 StartUpID ThisStartUpID = 0;
129 /* Are we doing recovery by reading XLOG? */
130 bool InRecovery = false;
133 * MyLastRecPtr points to the start of the last XLOG record inserted by the
134 * current transaction. If MyLastRecPtr.xrecoff == 0, then we are not in
135 * a transaction or the transaction has not yet made any loggable changes.
137 * Note that XLOG records inserted outside transaction control are not
138 * reflected into MyLastRecPtr.
140 XLogRecPtr MyLastRecPtr = {0, 0};
143 * ProcLastRecPtr points to the start of the last XLOG record inserted by the
144 * current backend. It is updated for all inserts, transaction-controlled
147 static XLogRecPtr ProcLastRecPtr = {0, 0};
150 * RedoRecPtr is this backend's local copy of the REDO record pointer
151 * (which is almost but not quite the same as a pointer to the most recent
152 * CHECKPOINT record). We update this from the shared-memory copy,
153 * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
154 * hold the Insert lock). See XLogInsert for details.
156 static XLogRecPtr RedoRecPtr;
159 * Shared-memory data structures for XLOG control
161 * LogwrtRqst indicates a byte position that we need to write and/or fsync
162 * the log up to (all records before that point must be written or fsynced).
163 * LogwrtResult indicates the byte positions we have already written/fsynced.
164 * These structs are identical but are declared separately to indicate their
165 * slightly different functions.
167 * We do a lot of pushups to minimize the amount of access to lockable
168 * shared memory values. There are actually three shared-memory copies of
169 * LogwrtResult, plus one unshared copy in each backend. Here's how it works:
170 * XLogCtl->LogwrtResult is protected by info_lck
171 * XLogCtl->Write.LogwrtResult is protected by WALWriteLock
172 * XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
173 * One must hold the associated lock to read or write any of these, but
174 * of course no lock is needed to read/write the unshared LogwrtResult.
176 * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
177 * right", since both are updated by a write or flush operation before
178 * it releases WALWriteLock. The point of keeping XLogCtl->Write.LogwrtResult
179 * is that it can be examined/modified by code that already holds WALWriteLock
180 * without needing to grab info_lck as well.
182 * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
183 * but is updated when convenient. Again, it exists for the convenience of
184 * code that is already holding WALInsertLock but not the other locks.
186 * The unshared LogwrtResult may lag behind any or all of these, and again
187 * is updated when convenient.
189 * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
190 * (protected by info_lck), but we don't need to cache any copies of it.
192 * Note that this all works because the request and result positions can only
193 * advance forward, never back up, and so we can easily determine which of two
194 * values is "more up to date".
196 * info_lck is only held long enough to read/update the protected variables,
197 * so it's a plain spinlock. The other locks are held longer (potentially
198 * over I/O operations), so we use LWLocks for them. These locks are:
200 * WALInsertLock: must be held to insert a record into the WAL buffers.
202 * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
205 * ControlFileLock: must be held to read/update control file or create
208 * CheckpointLock: must be held to do a checkpoint (ensures only one
209 * checkpointer at a time; even though the postmaster won't launch
210 * parallel checkpoint processes, we need this because manual checkpoints
211 * could be launched simultaneously).
/*
 * A write/flush request: the log positions up to which data should be
 * written out and flushed.  XLogWrite takes one of these by value.
 */
215 typedef struct XLogwrtRqst
217 XLogRecPtr Write; /* last byte + 1 to write out */
218 XLogRecPtr Flush; /* last byte + 1 to flush */
/*
 * Progress record: the log positions already written out and already
 * flushed.  Same layout as XLogwrtRqst, kept as a separate type.
 */
221 typedef struct XLogwrtResult
223 XLogRecPtr Write; /* last byte + 1 written out */
224 XLogRecPtr Flush; /* last byte + 1 flushed */
228 * Shared state data for XLogInsert.
/*
 * Insertion-side state, embedded in XLogCtlData as the Insert member.
 * XLogInsert and AdvanceXLInsertBuffer reach it via &XLogCtl->Insert.
 */
230 typedef struct XLogCtlInsert
232 XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
233 XLogRecPtr PrevRecord; /* start of previously-inserted record */
234 uint16 curridx; /* current block index in cache */
235 XLogPageHeader currpage; /* points to header of block in cache */
236 char *currpos; /* current insertion point in cache */
237 XLogRecPtr RedoRecPtr; /* current redo point for insertions */
241 * Shared state data for XLogWrite/XLogFlush.
/*
 * Write-side state, reached as XLogCtl->Write by XLogWrite and
 * AdvanceXLInsertBuffer.
 */
243 typedef struct XLogCtlWrite
245 XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
246 uint16 curridx; /* cache index of next block to write */
250 * Total shared-memory state for XLOG.
/*
 * Top-level shared XLOG control structure; the file-level pointer XLogCtl
 * refers to the single instance.  Members are grouped by the lock that
 * protects them.
 */
252 typedef struct XLogCtlData
254 /* Protected by WALInsertLock: */
255 XLogCtlInsert Insert;
256 /* Protected by info_lck: */
257 XLogwrtRqst LogwrtRqst;
258 XLogwrtResult LogwrtResult;
259 /* Protected by WALWriteLock: */
/* NOTE(review): the member declared under this heading (used elsewhere in this file as XLogCtl->Write) is not visible in this listing. */
263 * These values do not change after startup, although the pointed-to
264 * pages and xlblocks values certainly do. Permission to read/write
265 * the pages and xlblocks values depends on WALInsertLock and
268 char *pages; /* buffers for unwritten XLOG pages */
269 XLogRecPtr *xlblocks; /* per buffer: address of the page's first byte + BLCKSZ (i.e. page end) */
270 uint32 XLogCacheByte; /* # bytes in xlog buffers */
271 uint32 XLogCacheBlck; /* highest allocated xlog buffer index */
272 StartUpID ThisStartUpID;
274 /* This value is not protected by *any* lock... */
275 XLogRecPtr RedoRecPtr; /* see SetRedoRecPtr/GetRedoRecPtr */
277 slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */
/* Pointer to the shared XLogCtlData instance; NULL until set up (setup code is not in this listing). */
280 static XLogCtlData *XLogCtl = NULL;
283 * We maintain an image of pg_control in shared memory.
/* XLogWrite updates logId/logSeg/time through this pointer while holding ControlFileLock. */
285 static ControlFileData *ControlFile = NULL;
288 * Macros for managing XLogInsert state. In most cases, the calling routine
289 * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
290 * so these are passed as parameters instead of being fetched via XLogCtl.
293 /* Free space remaining in the current xlog page buffer */
/*
 * Bytes still unused in the current insert page: the page spans BLCKSZ
 * bytes starting at currpage, and currpos is the next write position.
 */
#define INSERT_FREESPACE(Insert) \
	((char *) (Insert)->currpage + BLCKSZ - (Insert)->currpos)
297 /* Construct XLogRecPtr value for current insertion point */
/*
 * Fills (recptr) for the current insertion point in buffer curridx: xlogid
 * is taken from xlblocks[curridx], and the page-end offset
 * xlblocks[curridx].xrecoff is reduced by the free space left on the page.
 * NOTE(review): the line naming the target of the second assignment is not
 * visible in this listing (presumably (recptr).xrecoff) -- confirm.
 */
298 #define INSERT_RECPTR(recptr,Insert,curridx) \
300 (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
302 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
306 /* Increment an xlogid/segment pair */
/*
 * Steps a (logId, logSeg) pair forward by one segment.  The visible test
 * detects the last segment of a log file (logSeg >= XLogSegsPerFile-1).
 * NOTE(review): the branches that follow the test are not visible in this
 * listing.
 */
307 #define NextLogSeg(logId, logSeg) \
309 if ((logSeg) >= XLogSegsPerFile-1) \
318 /* Decrement an xlogid/segment pair (assume it's not 0,0) */
/*
 * Steps a (logId, logSeg) pair back by one segment.  The visible
 * assignment sets logSeg to the last segment index (XLogSegsPerFile-1).
 * NOTE(review): the condition guarding that assignment and the other
 * branch are not visible in this listing.
 */
319 #define PrevLogSeg(logId, logSeg) \
326 (logSeg) = XLogSegsPerFile-1; \
331 * Compute ID and segment from an XLogRecPtr.
333 * For XLByteToSeg, do the computation at face value. For XLByteToPrevSeg,
334 * a boundary byte is taken to be in the previous segment. This is suitable
335 * for deciding which segment to write given a pointer to a record end,
/*
 * Assigns logId the xlogid of xlrp and logSeg the index of the segment
 * containing xrecoff (xrecoff / XLogSegSize).  logId and logSeg are
 * assigned to directly, so callers must pass plain lvalues.
 */
338 #define XLByteToSeg(xlrp, logId, logSeg) \
339 ( logId = (xlrp).xlogid, \
340 logSeg = (xlrp).xrecoff / XLogSegSize \
/*
 * Like XLByteToSeg, but computes the segment from xrecoff - 1, so a
 * pointer lying exactly on a segment boundary maps to the preceding
 * segment.  logId and logSeg must be plain lvalues.
 */
342 #define XLByteToPrevSeg(xlrp, logId, logSeg) \
343 ( logId = (xlrp).xlogid, \
344 logSeg = ((xlrp).xrecoff - 1) / XLogSegSize \
348 * Is an XLogRecPtr within a particular XLOG segment?
350 * For XLByteInSeg, do the computation at face value. For XLByteInPrevSeg,
351 * a boundary byte is taken to be in the previous segment.
/* True when xlrp lies in segment (logId, logSeg), taking the offset at face value. */
#define XLByteInSeg(xlrp, logId, logSeg) \
	((logId) == (xlrp).xlogid && \
	 (logSeg) == (xlrp).xrecoff / XLogSegSize)
/* As XLByteInSeg, but a pointer exactly on a segment boundary counts as the previous segment. */
#define XLByteInPrevSeg(xlrp, logId, logSeg) \
	((logId) == (xlrp).xlogid && \
	 (logSeg) == ((xlrp).xrecoff - 1) / XLogSegSize)
/*
 * Formats a log segment path into path (bounded by MAXPGPATH) as a
 * directory string, a '/', and two 8-digit uppercase hex fields.
 * NOTE(review): the snprintf argument line is not visible in this listing
 * (presumably XLogDir, log, seg) -- confirm.
 */
362 #define XLogFileName(path, log, seg) \
363 snprintf(path, MAXPGPATH, "%s/%08X%08X", \
/* Index of the buffer before idx, wrapping from 0 to the highest index. */
#define PrevBufIdx(idx) \
	(((idx) != 0) ? ((idx) - 1) : XLogCtl->XLogCacheBlck)
/* Index of the buffer after idx, wrapping from the highest index to 0. */
#define NextBufIdx(idx) \
	(((idx) != XLogCtl->XLogCacheBlck) ? ((idx) + 1) : 0)
/*
 * An offset is a valid record start unless it falls inside the page header
 * or leaves less than a record header's worth of room on the page.
 */
#define XRecOffIsValid(xrecoff) \
	(!((xrecoff) % BLCKSZ < SizeOfXLogPHD || \
	   (BLCKSZ - (xrecoff) % BLCKSZ) < SizeOfXLogRecord))
377 * _INTL_MAXLOGRECSZ: max space needed for a record including header and
378 * any backup-block data.
/* Record header + largest rmgr payload + every backup block (BkpBlock header plus a full page each). */
#define _INTL_MAXLOGRECSZ \
	(SizeOfXLogRecord + MAXLOGRECSZ + \
	 XLR_MAX_BKP_BLOCKS * (BLCKSZ + sizeof(BkpBlock)))
384 /* File path names */
/*
 * File-local state: path buffers, the private LogwrtResult copy, the
 * descriptor/position of the segment being written, and the reader state.
 * In the visible code, openLogFile/openLogId/openLogSeg/openLogOff are
 * maintained by XLogWrite, and LogwrtResult is refreshed from the shared
 * copies in XLogInsert, AdvanceXLInsertBuffer and XLogWrite.
 */
385 static char XLogDir[MAXPGPATH];
386 static char ControlFilePath[MAXPGPATH];
389 * Private, possibly out-of-date copy of shared LogwrtResult.
390 * See discussion above.
392 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
395 * openLogFile is -1 or a kernel FD for an open log file segment.
396 * When it's open, openLogOff is the current seek offset in the file.
397 * openLogId/openLogSeg identify the segment. These variables are only
398 * used to write the XLOG, and so will normally refer to the active segment.
400 static int openLogFile = -1;
401 static uint32 openLogId = 0;
402 static uint32 openLogSeg = 0;
403 static uint32 openLogOff = 0;
406 * These variables are used similarly to the ones above, but for reading
407 * the XLOG. Note, however, that readOff generally represents the offset
408 * of the page just read, not the seek position of the FD itself, which
409 * will be just past that page.
411 static int readFile = -1;
412 static uint32 readId = 0;
413 static uint32 readSeg = 0;
414 static uint32 readOff = 0;
416 /* Buffer for currently read page (BLCKSZ bytes) */
417 static char *readBuf = NULL;
419 /* State information for XLOG reading */
420 static XLogRecPtr ReadRecPtr;
421 static XLogRecPtr EndRecPtr;
422 static XLogRecord *nextRecord = NULL;
423 static StartUpID lastReadSUI;
/* NOTE(review): InRedo is not referenced in the visible part of the file; presumably set while replaying records -- confirm. */
425 static bool InRedo = false;
/*
 * Forward declarations of this file's static helpers.
 * NOTE(review): the final parameter lines of InstallXLogFileSegment and
 * ReadCheckpointRecord are not visible in this listing.
 */
428 static bool AdvanceXLInsertBuffer(void);
429 static void XLogWrite(XLogwrtRqst WriteRqst);
430 static int XLogFileInit(uint32 log, uint32 seg,
431 bool *use_existent, bool use_lock);
432 static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
433 bool find_free, int max_advance,
435 static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
436 static void PreallocXlogFiles(XLogRecPtr endptr);
437 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
438 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
439 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
440 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
443 static void WriteControlFile(void);
444 static void ReadControlFile(void);
445 static char *str_time(time_t tnow);
446 static void xlog_outrec(char *buf, XLogRecord *record);
447 static void issue_xlog_fsync(void);
451 * Insert an XLOG record having the specified RMID and info bytes,
452 * with the body of the record being the data chunk(s) described by
453 * the rdata list (see xlog.h for notes about rdata).
455 * Returns XLOG pointer to end of record (beginning of next record).
456 * This can be used as LSN for data pages affected by the logged action.
457 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
458 * before the data page can be written out. This implements the basic
459 * WAL rule "write the log before the data".)
461 * NB: this routine feels free to scribble on the XLogRecData structs,
462 * though not on the data they reference. This is OK since the XLogRecData
463 * structs are always just temporaries in the calling code.
/*
 * XLogInsert: append one XLOG record, built from the rdata chain, to the
 * shared WAL buffers.
 *
 * rmid  - resource manager id, stored in the record header (xl_rmid);
 *         no_tran starts out true when rmid == RM_XLOG_ID.
 * info  - info byte; XLR_INFO_MASK bits other than XLOG_NO_TRAN cause a
 *         PANIC, and the masked bits are cleared before further use.
 * rdata - chain of XLogRecData items.  The chain itself is modified:
 *         backup-block entries are linked onto its tail and data/len are
 *         advanced while copying.
 *
 * Visible phases:
 *  1. CRC the rdata chain and decide, per distinct buffer, whether the page
 *     must be backed up (page LSN compared with RedoRecPtr).
 *  2. If enough unwritten data has accumulated and WALWriteLock can be
 *     taken without waiting, call XLogWrite.
 *  3. Take WALInsertLock and re-check the backup decisions against
 *     Insert->RedoRecPtr, releasing the lock to start over if one is stale.
 *  4. Write the record header and data, moving to new buffer pages through
 *     AdvanceXLInsertBuffer and emitting continuation headers as needed.
 *  5. Release WALInsertLock and advance the shared LogwrtRqst.Write under
 *     info_lck (presumably only when updrqst is set -- the guarding line is
 *     not visible).
 *
 * NOTE(review): this listing omits many source lines of the function
 * (braces, several statements, the return type and the return statements);
 * consult the complete file before changing any logic here.
 */
466 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
468 XLogCtlInsert *Insert = &XLogCtl->Insert;
470 XLogContRecord *contrecord;
472 XLogRecPtr WriteRqst;
476 Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
477 bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
478 BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
479 XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
480 XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];
485 XLogwrtRqst LogwrtRqst;
487 bool no_tran = (rmid == RM_XLOG_ID) ? true : false;
/* Among the XLR_INFO_MASK bits only XLOG_NO_TRAN is accepted from callers. */
489 if (info & XLR_INFO_MASK)
491 if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
492 elog(PANIC, "XLogInsert: invalid info mask %02X",
493 (info & XLR_INFO_MASK));
495 info &= ~XLR_INFO_MASK;
499 * In bootstrap mode, we don't actually log anything but XLOG
500 * resources; return a phony record pointer.
502 if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
505 RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */
510 * Here we scan the rdata list, determine which buffers must be backed
511 * up, and compute the CRC values for the data. Note that the record
512 * header isn't added into the CRC yet since we don't know the final
513 * length or info bits quite yet.
515 * We may have to loop back to here if a race condition is detected
516 * below. We could prevent the race by doing all this work while
517 * holding the insert lock, but it seems better to avoid doing CRC
518 * calculations while holding the lock. This means we have to be
519 * careful about modifying the rdata list until we know we aren't
520 * going to loop back again. The only change we allow ourselves to
521 * make earlier is to set rdt->data = NULL in list items we have
522 * decided we will have to back up the whole buffer for. This is OK
523 * because we will certainly decide the same thing again for those
524 * items if we do it over; doing it here saves an extra pass over the
/* Reset the per-call table of buffers referenced by the rdata chain. */
528 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
530 dtbuf[i] = InvalidBuffer;
531 dtbuf_bkp[i] = false;
534 INIT_CRC64(rdata_crc);
538 if (rdt->buffer == InvalidBuffer)
540 /* Simple data, just include it */
542 COMP_CRC64(rdata_crc, rdt->data, rdt->len);
546 /* Find info for buffer */
547 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
549 if (rdt->buffer == dtbuf[i])
551 /* Buffer already referenced by earlier list item */
557 COMP_CRC64(rdata_crc, rdt->data, rdt->len);
561 if (dtbuf[i] == InvalidBuffer)
563 /* OK, put it in this slot */
564 dtbuf[i] = rdt->buffer;
567 * XXX We assume page LSN is first data on page
569 dtbuf_lsn[i] = *((XLogRecPtr *) BufferGetBlock(rdt->buffer));
570 if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
578 BufferGetBlock(dtbuf[i]),
580 dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
581 dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
583 (char *) &(dtbuf_xlg[i]) + sizeof(crc64),
584 sizeof(BkpBlock) - sizeof(crc64));
586 dtbuf_xlg[i].crc = dtcrc;
591 COMP_CRC64(rdata_crc, rdt->data, rdt->len);
596 if (i >= XLR_MAX_BKP_BLOCKS)
597 elog(PANIC, "XLogInsert: can backup %d blocks at most",
600 /* Break out of loop when rdt points to last list item */
601 if (rdt->next == NULL)
607 * NOTE: the test for len == 0 here is somewhat fishy, since in theory
608 * all of the rmgr data might have been suppressed in favor of backup
609 * blocks. Currently, all callers of XLogInsert provide at least some
610 * not-in-a-buffer data and so len == 0 should never happen, but that
611 * may not be true forever. If you need to remove the len == 0 check,
612 * also remove the check for xl_len == 0 in ReadRecord, below.
614 if (len == 0 || len > MAXLOGRECSZ)
615 elog(PANIC, "XLogInsert: invalid record length %u", len);
617 START_CRIT_SECTION();
619 /* update LogwrtResult before doing cache fill check */
621 /* use volatile pointer to prevent code rearrangement */
622 volatile XLogCtlData *xlogctl = XLogCtl;
624 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
625 LogwrtRqst = xlogctl->LogwrtRqst;
626 LogwrtResult = xlogctl->LogwrtResult;
627 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
631 * If cache is half filled then try to acquire write lock and do
632 * XLogWrite. Ignore any fractional blocks in performing this check.
634 LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
635 if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
636 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
637 XLogCtl->XLogCacheByte / 2))
/* Best effort only: the early write is skipped when WALWriteLock cannot be taken immediately. */
639 if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
641 LogwrtResult = XLogCtl->Write.LogwrtResult;
642 if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
643 XLogWrite(LogwrtRqst);
644 LWLockRelease(WALWriteLock);
648 /* Now wait to get insert lock */
649 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
652 * Check to see if my RedoRecPtr is out of date. If so, may have to
653 * go back and recompute everything. This can only happen just after
654 * a checkpoint, so it's better to be slow in this case and fast
657 if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
659 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
660 RedoRecPtr = Insert->RedoRecPtr;
662 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
664 if (dtbuf[i] == InvalidBuffer)
666 if (dtbuf_bkp[i] == false &&
667 XLByteLE(dtbuf_lsn[i], RedoRecPtr))
670 * Oops, this buffer now needs to be backed up, but we
671 * didn't think so above. Start over.
673 LWLockRelease(WALInsertLock);
681 * Make additional rdata list entries for the backup blocks, so that
682 * we don't need to special-case them in the write loop. Note that we
683 * have now irrevocably changed the input rdata list. At the exit of
684 * this loop, write_len includes the backup block data.
686 * Also set the appropriate info bits to show which buffers were backed
687 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th
688 * distinct buffer value (ignoring InvalidBuffer) appearing in the
/* Each backed-up buffer contributes two chained entries: its BkpBlock header, then the whole page. */
692 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
694 if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
697 info |= XLR_SET_BKP_BLOCK(i);
699 rdt->next = &(dtbuf_rdt[2 * i]);
701 dtbuf_rdt[2 * i].data = (char *) &(dtbuf_xlg[i]);
702 dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
703 write_len += sizeof(BkpBlock);
705 rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);
707 dtbuf_rdt[2 * i + 1].data = (char *) BufferGetBlock(dtbuf[i]);
708 dtbuf_rdt[2 * i + 1].len = BLCKSZ;
710 dtbuf_rdt[2 * i + 1].next = NULL;
713 /* Insert record header */
716 freespace = INSERT_FREESPACE(Insert);
717 if (freespace < SizeOfXLogRecord)
719 updrqst = AdvanceXLInsertBuffer();
720 freespace = BLCKSZ - SizeOfXLogPHD;
723 curridx = Insert->curridx;
724 record = (XLogRecord *) Insert->currpos;
726 record->xl_prev = Insert->PrevRecord;
729 record->xl_xact_prev.xlogid = 0;
730 record->xl_xact_prev.xrecoff = 0;
733 record->xl_xact_prev = MyLastRecPtr;
735 record->xl_xid = GetCurrentTransactionId();
736 record->xl_len = len; /* doesn't include backup blocks */
737 record->xl_info = info;
738 record->xl_rmid = rmid;
740 /* Now we can finish computing the main CRC */
741 COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),
742 SizeOfXLogRecord - sizeof(crc64));
743 FIN_CRC64(rdata_crc);
744 record->xl_crc = rdata_crc;
746 /* Compute record's XLOG location */
747 INSERT_RECPTR(RecPtr, Insert, curridx);
749 /* If first XLOG record of transaction, save it in PROC array */
750 if (MyLastRecPtr.xrecoff == 0 && !no_tran)
753 * We do not acquire SInvalLock here because of possible deadlock.
754 * Anyone who wants to inspect other procs' logRec must acquire
755 * WALInsertLock, instead. A better solution would be a per-PROC
756 * spinlock, but no time for that before 7.2 --- tgl 12/19/01.
758 MyProc->logRec = RecPtr;
765 sprintf(buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff);
766 xlog_outrec(buf, record);
767 if (rdata->data != NULL)
770 RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);
772 elog(LOG, "%s", buf);
775 /* Record begin of record in appropriate places */
777 MyLastRecPtr = RecPtr;
778 ProcLastRecPtr = RecPtr;
779 Insert->PrevRecord = RecPtr;
781 Insert->currpos += SizeOfXLogRecord;
782 freespace -= SizeOfXLogRecord;
785 * Append the data, including backup blocks if any
/* Skip list items whose data pointer is NULL. */
789 while (rdata->data == NULL)
/* An item larger than the remaining room is copied piecewise: what fits now, the rest on the next page. */
794 if (rdata->len > freespace)
796 memcpy(Insert->currpos, rdata->data, freespace);
797 rdata->data += freespace;
798 rdata->len -= freespace;
799 write_len -= freespace;
803 memcpy(Insert->currpos, rdata->data, rdata->len);
804 freespace -= rdata->len;
805 write_len -= rdata->len;
806 Insert->currpos += rdata->len;
812 /* Use next buffer */
813 updrqst = AdvanceXLInsertBuffer();
814 curridx = Insert->curridx;
815 /* Insert cont-record header */
816 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
817 contrecord = (XLogContRecord *) Insert->currpos;
818 contrecord->xl_rem_len = write_len;
819 Insert->currpos += SizeOfXLogContRecord;
820 freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
823 /* Ensure next record will be properly aligned */
824 Insert->currpos = (char *) Insert->currpage +
825 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
826 freespace = INSERT_FREESPACE(Insert);
829 * The recptr I return is the beginning of the *next* record. This
830 * will be stored as LSN for changed data pages...
832 INSERT_RECPTR(RecPtr, Insert, curridx);
834 /* Need to update shared LogwrtRqst if some block was filled up */
835 if (freespace < SizeOfXLogRecord)
836 updrqst = true; /* curridx is filled and available for
839 curridx = PrevBufIdx(curridx);
840 WriteRqst = XLogCtl->xlblocks[curridx];
842 LWLockRelease(WALInsertLock);
846 /* use volatile pointer to prevent code rearrangement */
847 volatile XLogCtlData *xlogctl = XLogCtl;
849 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
850 /* advance global request to include new block(s) */
851 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
852 xlogctl->LogwrtRqst.Write = WriteRqst;
853 /* update local result copy while I have the chance */
854 LogwrtResult = xlogctl->LogwrtResult;
855 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
864 * Advance the Insert state to the next buffer page, writing out the next
865 * buffer if it still contains unwritten data.
867 * The global LogwrtRqst.Write pointer needs to be advanced to include the
868 * just-filled page. If we can do this for free (without an extra lock),
869 * we do so here. Otherwise the caller must do it. We return TRUE if the
870 * request update still needs to be done, FALSE if we did it internally.
872 * Must be called with WALInsertLock held.
/*
 * AdvanceXLInsertBuffer: make the next buffer page (NextBufIdx of
 * Insert->curridx) the current insertion page.
 *
 * If that slot still holds data beyond LogwrtResult.Write, the shared
 * LogwrtRqst.Write is first advanced to the end of the current page under
 * info_lck (update_needed becomes false).  If the old page is still
 * unwritten after re-checking, WALWriteLock is taken and XLogWrite is
 * called with a request covering just that page and no flush.  The slot is
 * then zeroed, its page header is filled in, and xlblocks[nextidx] is set
 * to the new page's end address.
 *
 * Returns update_needed: true when the caller must still advance the
 * shared write request, false when that was done here.
 *
 * NOTE(review): brace lines, else branches and the return-type line are
 * not visible in this listing.
 */
875 AdvanceXLInsertBuffer(void)
877 XLogCtlInsert *Insert = &XLogCtl->Insert;
878 XLogCtlWrite *Write = &XLogCtl->Write;
879 uint16 nextidx = NextBufIdx(Insert->curridx);
880 bool update_needed = true;
881 XLogRecPtr OldPageRqstPtr;
882 XLogwrtRqst WriteRqst;
883 XLogRecPtr NewPageEndPtr;
884 XLogPageHeader NewPage;
886 /* Use Insert->LogwrtResult copy if it's more fresh */
887 if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
888 LogwrtResult = Insert->LogwrtResult;
891 * Get ending-offset of the buffer page we need to replace (this may
892 * be zero if the buffer hasn't been used yet). Fall through if it's
893 * already written out.
895 OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
896 if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
898 /* nope, got work to do... */
899 XLogRecPtr FinishedPageRqstPtr;
901 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
903 /* Before waiting, get info_lck and update LogwrtResult */
905 /* use volatile pointer to prevent code rearrangement */
906 volatile XLogCtlData *xlogctl = XLogCtl;
908 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
909 if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
910 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
911 LogwrtResult = xlogctl->LogwrtResult;
912 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
915 update_needed = false; /* Did the shared-request update */
917 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
919 /* OK, someone wrote it already */
920 Insert->LogwrtResult = LogwrtResult;
924 /* Must acquire write lock */
925 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
926 LogwrtResult = Write->LogwrtResult;
927 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
929 /* OK, someone wrote it already */
930 LWLockRelease(WALWriteLock);
931 Insert->LogwrtResult = LogwrtResult;
936 * Have to write buffers while holding insert lock. This
937 * is not good, so only write as much as we absolutely
/* Request a write up to the end of the old page only; Flush = {0,0} asks for no flush. */
940 WriteRqst.Write = OldPageRqstPtr;
941 WriteRqst.Flush.xlogid = 0;
942 WriteRqst.Flush.xrecoff = 0;
943 XLogWrite(WriteRqst);
944 LWLockRelease(WALWriteLock);
945 Insert->LogwrtResult = LogwrtResult;
951 * Now the next buffer slot is free and we can set it up to be the
/* The new page's end address is the just-filled page's end address advanced by BLCKSZ, rolling over xlogid at XLogFileSize. */
954 NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
955 if (NewPageEndPtr.xrecoff >= XLogFileSize)
957 /* crossing a logid boundary */
958 NewPageEndPtr.xlogid += 1;
959 NewPageEndPtr.xrecoff = BLCKSZ;
962 NewPageEndPtr.xrecoff += BLCKSZ;
963 XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
964 NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
965 Insert->curridx = nextidx;
966 Insert->currpage = NewPage;
967 Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;
970 * Be sure to re-zero the buffer so that bytes beyond what we've
971 * written will look like zeroes and not valid XLOG records...
973 MemSet((char *) NewPage, 0, BLCKSZ);
975 /* And fill the new page's header */
976 NewPage->xlp_magic = XLOG_PAGE_MAGIC;
977 /* NewPage->xlp_info = 0; */ /* done by memset */
978 NewPage->xlp_sui = ThisStartUpID;
979 NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
980 NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
982 return update_needed;
986 * Write and/or fsync the log at least as far as WriteRqst indicates.
988 * Must be called with WALWriteLock held.
991 XLogWrite(XLogwrtRqst WriteRqst)
993 XLogCtlWrite *Write = &XLogCtl->Write;
999 * Update local LogwrtResult (caller probably did this already,
1002 LogwrtResult = Write->LogwrtResult;
1004 while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1007 * Make sure we're not ahead of the insert process. This could
1008 * happen if we're passed a bogus WriteRqst.Write that is past the
1009 * end of the last page that's been initialized by
1010 * AdvanceXLInsertBuffer.
1012 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx]))
1013 elog(PANIC, "XLogWrite: write request %X/%X is past end of log %X/%X",
1014 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1015 XLogCtl->xlblocks[Write->curridx].xlogid,
1016 XLogCtl->xlblocks[Write->curridx].xrecoff);
1018 /* Advance LogwrtResult.Write to end of current buffer page */
1019 LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
1020 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1022 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1025 * Switch to new logfile segment.
1027 if (openLogFile >= 0)
1029 if (close(openLogFile) != 0)
1030 elog(PANIC, "close of log file %u, segment %u failed: %m",
1031 openLogId, openLogSeg);
1034 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1036 /* create/use new log file */
1037 use_existent = true;
1038 openLogFile = XLogFileInit(openLogId, openLogSeg,
1039 &use_existent, true);
1042 if (!use_existent) /* there was no precreated file */
1043 elog(LOG, "XLogWrite: new log file created - "
1044 "consider increasing 'wal_files' in postgresql.conf.");
1046 /* update pg_control, unless someone else already did */
1047 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1048 if (ControlFile->logId < openLogId ||
1049 (ControlFile->logId == openLogId &&
1050 ControlFile->logSeg < openLogSeg + 1))
1052 ControlFile->logId = openLogId;
1053 ControlFile->logSeg = openLogSeg + 1;
1054 ControlFile->time = time(NULL);
1055 UpdateControlFile();
1058 * Signal postmaster to start a checkpoint if it's been
1059 * too long since the last one. (We look at local copy of
1060 * RedoRecPtr which might be a little out of date, but
1061 * should be close enough for this purpose.)
1063 if (IsUnderPostmaster &&
1064 (openLogId != RedoRecPtr.xlogid ||
1065 openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
1066 (uint32) CheckPointSegments))
1069 elog(LOG, "XLogWrite: time for a checkpoint, signaling postmaster");
1070 SendPostmasterSignal(PMSIGNAL_DO_CHECKPOINT);
1073 LWLockRelease(ControlFileLock);
1076 if (openLogFile < 0)
1078 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1079 openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
1083 /* Need to seek in the file? */
1084 if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
1086 openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
1087 if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
1088 elog(PANIC, "lseek of log file %u, segment %u, offset %u failed: %m",
1089 openLogId, openLogSeg, openLogOff);
1092 /* OK to write the page */
1093 from = XLogCtl->pages + Write->curridx * BLCKSZ;
1095 if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
1097 /* if write didn't set errno, assume problem is no disk space */
1100 elog(PANIC, "write of log file %u, segment %u, offset %u failed: %m",
1101 openLogId, openLogSeg, openLogOff);
1103 openLogOff += BLCKSZ;
1106 * If we just wrote the whole last page of a logfile segment,
1107 * fsync the segment immediately. This avoids having to go back
1108 * and re-open prior segments when an fsync request comes along
1109 * later. Doing it here ensures that one and only one backend will
1110 * perform this fsync.
1112 if (openLogOff >= XLogSegSize && !ispartialpage)
1115 LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */
1120 /* Only asked to write a partial page */
1121 LogwrtResult.Write = WriteRqst.Write;
1124 Write->curridx = NextBufIdx(Write->curridx);
1128 * If asked to flush, do so
1130 if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1131 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1134 * Could get here without iterating above loop, in which case we
1135 * might have no open file or the wrong one. However, we do not
1136 * need to fsync more than one file.
1138 if (sync_method != SYNC_METHOD_OPEN)
1140 if (openLogFile >= 0 &&
1141 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1143 if (close(openLogFile) != 0)
1144 elog(PANIC, "close of log file %u, segment %u failed: %m",
1145 openLogId, openLogSeg);
1148 if (openLogFile < 0)
1150 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1151 openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
1156 LogwrtResult.Flush = LogwrtResult.Write;
1160 * Update shared-memory status
1162 * We make sure that the shared 'request' values do not fall behind the
1163 * 'result' values. This is not absolutely essential, but it saves
1164 * some code in a couple of places.
1167 /* use volatile pointer to prevent code rearrangement */
1168 volatile XLogCtlData *xlogctl = XLogCtl;
1170 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
1171 xlogctl->LogwrtResult = LogwrtResult;
1172 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1173 xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1174 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1175 xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1176 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
1179 Write->LogwrtResult = LogwrtResult;
1183 * Ensure that all XLOG data through the given position is flushed to disk.
1185 * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1186 * already held, and we try to avoid acquiring it if possible.
/*
 * XLogFlush: make sure XLOG data up to 'record' has been flushed.
 *
 * record: log position the caller needs to have flushed.
 *
 * Exits early when the local LogwrtResult.Flush already covers 'record'.
 * Otherwise the local LogwrtResult is refreshed from shared memory under
 * info_lck, and the write request is raised to the shared LogwrtRqst.Write
 * if that is further along.  If WALInsertLock can be taken without waiting,
 * the write request is set to the current insert position instead.
 * XLogWrite() is then called under WALWriteLock with Flush = record,
 * unless a re-check under that lock shows the flush already happened.
 *
 * If the flush pointer still lags 'record' at the end, the problem is
 * reported at NOTICE level when InRecovery is set and at ERROR level
 * otherwise.
 */
1189 XLogFlush(XLogRecPtr record)
1191 XLogRecPtr WriteRqstPtr;
1192 XLogwrtRqst WriteRqst;
1196 elog(LOG, "XLogFlush%s%s: request %X/%X; write %X/%X; flush %X/%X\n",
1197 (IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
1198 (InRedo) ? "(redo)" : "",
1199 record.xlogid, record.xrecoff,
1200 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1201 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1205 /* Disabled during REDO */
1209 /* Quick exit if already known flushed */
1210 if (XLByteLE(record, LogwrtResult.Flush))
1213 START_CRIT_SECTION();
1216 * Since fsync is usually a horribly expensive operation, we try to
1217 * piggyback as much data as we can on each fsync: if we see any more
1218 * data entered into the xlog buffer, we'll write and fsync that too,
1219 * so that the final value of LogwrtResult.Flush is as large as
1220 * possible. This gives us some chance of avoiding another fsync
1221 * immediately after.
1224 /* initialize to given target; may increase below */
1225 WriteRqstPtr = record;
1227 /* read LogwrtResult and update local state */
1229 /* use volatile pointer to prevent code rearrangement */
1230 volatile XLogCtlData *xlogctl = XLogCtl;
1232 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
1233 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1234 WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1235 LogwrtResult = xlogctl->LogwrtResult;
1236 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
/* re-test against the freshly copied shared result before doing any work */
1240 if (!XLByteLE(record, LogwrtResult.Flush))
1242 /* if something was added to log cache then try to flush this too */
1243 if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1245 XLogCtlInsert *Insert = &XLogCtl->Insert;
1246 uint32 freespace = INSERT_FREESPACE(Insert);
1248 if (freespace < SizeOfXLogRecord) /* buffer is full */
1249 WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1252 WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1253 WriteRqstPtr.xrecoff -= freespace;
1255 LWLockRelease(WALInsertLock);
1257 /* now wait for the write lock */
1258 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
/* pick up the write-side copy of the result and re-test once more */
1259 LogwrtResult = XLogCtl->Write.LogwrtResult;
1260 if (!XLByteLE(record, LogwrtResult.Flush))
1262 WriteRqst.Write = WriteRqstPtr;
1263 WriteRqst.Flush = record;
1264 XLogWrite(WriteRqst);
1266 LWLockRelease(WALWriteLock);
1272 * If we still haven't flushed to the request point then we have a
1273 * problem; most likely, the requested flush point is past end of XLOG.
1274 * This has been seen to occur when a disk page has a corrupted LSN.
1276 * Formerly we treated this as a PANIC condition, but that hurts the
1277 * system's robustness rather than helping it: we do not want to take
1278 * down the whole system due to corruption on one data page. In
1279 * particular, if the bad page is encountered again during recovery then
1280 * we would be unable to restart the database at all! (This scenario
1281 * has actually happened in the field several times with 7.1 releases.
1282 * Note that we cannot get here while InRedo is true, but if the bad
1283 * page is brought in and marked dirty during recovery then
1284 * CreateCheckpoint will try to flush it at the end of recovery.)
1286 * The current approach is to ERROR under normal conditions, but only
1287 * NOTICE during recovery, so that the system can be brought up even if
1288 * there's a corrupt LSN. Note that for calls from xact.c, the ERROR
1289 * will be promoted to PANIC since xact.c calls this routine inside a
1290 * critical section. However, calls from bufmgr.c are not within
1291 * critical sections and so we will not force a restart for a bad LSN
1294 if (XLByteLT(LogwrtResult.Flush, record))
1295 elog(InRecovery ? NOTICE : ERROR,
1296 "XLogFlush: request %X/%X is not satisfied --- flushed only to %X/%X",
1297 record.xlogid, record.xrecoff,
1298 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1302 * Create a new XLOG file segment, or open a pre-existing one.
1304 * log, seg: identify segment to be created/opened.
1306 * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1307 * pre-existing file will be deleted). On return, TRUE if a pre-existing
1310 * use_lock: if TRUE, acquire ControlFileLock while moving file into
1311 * place. This should be TRUE except during bootstrap log creation. The
1312 * caller must *not* hold the lock at call.
1314 * Returns FD of opened file.
/*
 * XLogFileInit: open segment (log, seg), creating and zero-filling it
 * if needed.
 *
 * An open of the final path is attempted with XLOG_SYNC_BIT; a failure
 * with any errno other than ENOENT is a PANIC.  When a new file has to be
 * made, it is built under a per-process temporary name
 * ("<XLogDir>/xlogtemp.<pid>"), opened with O_CREAT | O_EXCL and without
 * XLOG_SYNC_BIT, filled with zeroes in BLCKSZ-sized writes up to
 * XLogSegSize, and synced with pg_fsync().  The file is then handed to
 * InstallXLogFileSegment(), *use_existent is set to false, and the final
 * path is opened.  Every failure visible in this routine is reported at
 * PANIC level.
 */
1317 XLogFileInit(uint32 log, uint32 seg,
1318 bool *use_existent, bool use_lock)
1320 char path[MAXPGPATH];
1321 char tmppath[MAXPGPATH];
1322 char zbuffer[BLCKSZ];
1326 XLogFileName(path, log, seg);
1329 * Try to use existent file (checkpoint maker may have created it
1334 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1338 if (errno != ENOENT)
1339 elog(PANIC, "open of %s (log file %u, segment %u) failed: %m",
1347 * Initialize an empty (all zeroes) segment. NOTE: it is possible
1348 * that another process is doing the same thing. If so, we will end
1349 * up pre-creating an extra log segment. That seems OK, and better
1350 * than holding the lock throughout this lengthy process.
/* temp name embeds our pid, so concurrent creators use distinct files */
1352 snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d",
1353 XLogDir, (int) getpid());
1357 /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1358 fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1361 elog(PANIC, "creation of file %s failed: %m", tmppath);
1364 * Zero-fill the file. We have to do this the hard way to ensure that
1365 * all the file space has really been allocated --- on platforms that
1366 * allow "holes" in files, just seeking to the end doesn't allocate
1367 * intermediate space. This way, we know that we have all the space
1368 * and (after the fsync below) that all the indirect blocks are down
1369 * on disk. Therefore, fdatasync(2) or O_DSYNC will be sufficient to
1370 * sync future writes to the log file.
1372 MemSet(zbuffer, 0, sizeof(zbuffer));
1373 for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
1376 if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
/* capture errno now, before cleanup calls can overwrite it */
1378 int save_errno = errno;
1381 * If we fail to make the file, delete it to release disk
1385 /* if write didn't set errno, assume problem is no disk space */
1386 errno = save_errno ? save_errno : ENOSPC;
1388 elog(PANIC, "ZeroFill failed to write %s: %m", tmppath);
1392 if (pg_fsync(fd) != 0)
1393 elog(PANIC, "fsync of file %s failed: %m", tmppath);
1398 * Now move the segment into place with its final name.
1400 * If caller didn't want to use a pre-existing file, get rid of any
1401 * pre-existing file. Otherwise, cope with possibility that someone
1402 * else has created the file while we were filling ours: if so, use
1403 * ours to pre-create a future log segment.
1405 if (!InstallXLogFileSegment(log, seg, tmppath,
1406 *use_existent, XLOGfiles + XLOGfileslop,
1409 /* No need for any more future segments... */
1413 /* Set flag to tell caller there was no existent file */
1414 *use_existent = false;
1416 /* Now open original target segment (might not be file I just made) */
1417 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1420 elog(PANIC, "open of %s (log file %u, segment %u) failed: %m",
1427 * Install a new XLOG segment file as a current or future log segment.
1429 * This is used both to install a newly-created segment (which has a temp
1430 * filename while it's being created) and to recycle an old segment.
1432 * log, seg: identify segment to install as (or first possible target).
1434 * tmppath: initial name of file to install. It will be renamed into place.
1436 * find_free: if TRUE, install the new segment at the first empty log/seg
1437 * number at or after the passed numbers. If FALSE, install the new segment
1438 * exactly where specified, deleting any existing segment file there.
1440 * max_advance: maximum number of log/seg slots to advance past the starting
1441 * point. Fail if no free slot is found in this range. (Irrelevant if
1442 * find_free is FALSE.)
1444 * use_lock: if TRUE, acquire ControlFileLock while moving file into
1445 * place. This should be TRUE except during bootstrap log creation. The
1446 * caller must *not* hold the lock at call.
1448 * Returns TRUE if file installed, FALSE if not installed because of
1449 * exceeding max_advance limit. (Any other kind of failure causes elog().)
/*
 * InstallXLogFileSegment: give the file at 'tmppath' its final segment name.
 *
 * The target name is built from (log, seg).  In the slot-search path the
 * routine probes successive segment names by opening each with
 * BasicOpenFile(); every successful open means the slot is taken, so
 * max_advance is decremented and the next log/seg is tried.  When
 * max_advance runs out, ControlFileLock is released on that path.
 *
 * The file is moved into place with link() or rename(); a failure of
 * either is reported at PANIC level.
 */
1452 InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
1453 bool find_free, int max_advance,
1456 char path[MAXPGPATH];
1459 XLogFileName(path, log, seg);
1462 * We want to be sure that only one process does this at a time.
1465 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1469 /* Force installation: get rid of any pre-existing segment file */
1474 /* Find a free slot to put it in */
1475 while ((fd = BasicOpenFile(path, O_RDWR | PG_BINARY,
1476 S_IRUSR | S_IWUSR)) >= 0)
1479 if (--max_advance < 0)
1481 /* Failed to find a free slot within specified range */
1483 LWLockRelease(ControlFileLock);
/* slot occupied: step to the next log/seg and rebuild the candidate name */
1486 NextLogSeg(log, seg);
1487 XLogFileName(path, log, seg);
1492 * Prefer link() to rename() here just to be really sure that we don't
1493 * overwrite an existing logfile. However, there shouldn't be one, so
1494 * rename() is an acceptable substitute except for the truly paranoid.
1497 if (link(tmppath, path) < 0)
1498 elog(PANIC, "link from %s to %s (initialization of log file %u, segment %u) failed: %m",
1499 tmppath, path, log, seg);
1502 if (rename(tmppath, path) < 0)
1503 elog(PANIC, "rename from %s to %s (initialization of log file %u, segment %u) failed: %m",
1504 tmppath, path, log, seg);
1508 LWLockRelease(ControlFileLock);
1514 * Open a pre-existing logfile segment.
/*
 * XLogFileOpen: open the existing segment file for (log, seg).
 *
 * The file is opened read/write with PG_BINARY and XLOG_SYNC_BIT.
 *
 * econt: when true and the open fails with ENOENT, the failure is only
 * logged at LOG level; any other open failure is reported at PANIC level.
 */
1517 XLogFileOpen(uint32 log, uint32 seg, bool econt)
1519 char path[MAXPGPATH];
1522 XLogFileName(path, log, seg);
1524 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
/* a missing file is tolerated (logged only) when the caller set econt */
1528 if (econt && errno == ENOENT)
1530 elog(LOG, "open of %s (log file %u, segment %u) failed: %m",
1534 elog(PANIC, "open of %s (log file %u, segment %u) failed: %m",
1542 * Preallocate log files beyond the specified log endpoint, according to
1543 * the XLOGfiles user parameter.
/*
 * PreallocXlogFiles: create future log segments ahead of 'endptr'.
 *
 * Starting from the segment located by XLByteToPrevSeg(endptr), the loop
 * advances XLOGfiles times and calls XLogFileInit() for each following
 * segment with use_existent = true, so an already present file is accepted.
 *
 * The alternative branch creates just the one next segment, and only when
 * the offset of (endptr - 1) within its segment has reached 75% of
 * XLogSegSize.
 */
1546 PreallocXlogFiles(XLogRecPtr endptr)
1554 XLByteToPrevSeg(endptr, _logId, _logSeg);
1557 for (i = 1; i <= XLOGfiles; i++)
1559 NextLogSeg(_logId, _logSeg);
1560 use_existent = true;
1561 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
1565 else if ((endptr.xrecoff - 1) % XLogSegSize >=
1566 (uint32) (0.75 * XLogSegSize))
1568 NextLogSeg(_logId, _logSeg);
1569 use_existent = true;
1570 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
1576 * Remove or move offline all log files older than or equal to the passed log/seg#
1578 * endptr is current (or recent) end of xlog; this is used to determine
1579 * whether we want to recycle rather than delete no-longer-wanted log files.
/*
 * MoveOfflineLogs: dispose of segment files at or before (log, seg).
 *
 * Scans XLogDir and considers only entries whose name is exactly 16
 * characters drawn from "0123456789ABCDEF" and which compare <= the
 * "%08X%08X" rendering of (log, seg).  Since names have equal length and
 * a fixed alphabet, strcmp() ordering matches numeric ordering.
 *
 * For each such file: if XLOG_archive_dir is non-empty, a message is
 * logged and a NOTICE says archiving is not implemented.  The visible code
 * also offers the file to InstallXLogFileSegment() for reuse as a future
 * segment, searching up to XLOGfiles + XLOGfileslop slots beyond the
 * segment located from 'endptr', and logs either "recycled" or "removing".
 *
 * Failure to open or read the directory is reported at PANIC level.
 */
1582 MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
1587 struct dirent *xlde;
1589 char path[MAXPGPATH];
1591 XLByteToPrevSeg(endptr, endlogId, endlogSeg);
1593 xldir = opendir(XLogDir);
1595 elog(PANIC, "could not open transaction log directory (%s): %m",
/* upper bound for names to process, in the same format as segment names */
1598 sprintf(lastoff, "%08X%08X", log, seg);
1601 while ((xlde = readdir(xldir)) != NULL)
1603 if (strlen(xlde->d_name) == 16 &&
1604 strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
1605 strcmp(xlde->d_name, lastoff) <= 0)
1607 snprintf(path, MAXPGPATH, "%s/%s", XLogDir, xlde->d_name);
1608 if (XLOG_archive_dir[0])
1610 elog(LOG, "archiving transaction log file %s",
1612 elog(NOTICE, "archiving log files is not implemented!");
1617 * Before deleting the file, see if it can be recycled as
1618 * a future log segment. We allow recycling segments up
1619 * to XLOGfiles + XLOGfileslop segments beyond the current
1622 if (InstallXLogFileSegment(endlogId, endlogSeg, path,
1623 true, XLOGfiles + XLOGfileslop,
1626 elog(LOG, "recycled transaction log file %s",
1631 /* No need for any more future segments... */
1632 elog(LOG, "removing transaction log file %s",
1641 elog(PANIC, "could not read transaction log directory (%s): %m",
1647 * Restore the backup blocks present in an XLOG record, if any.
1649 * We assume all of the record has been read into memory at *record.
/*
 * RestoreBkpBlocks: write a record's backup blocks back into their pages.
 *
 * record: XLOG record, fully in memory; backup blocks start right after
 *		the xl_len bytes of rmgr data.
 * lsn: value stamped on each restored page with PageSetLSN().
 *
 * For each of the XLR_MAX_BKP_BLOCKS slots whose bit is set in xl_info,
 * the BkpBlock header is copied out with memcpy(), the relation is opened
 * with XLogOpenRelation(), and the target buffer is fetched with
 * XLogReadBuffer().  If the buffer is valid, BLCKSZ bytes of page image
 * are copied in, the page's LSN and SUI are set, and the buffer is
 * released with UnlockAndWriteBuffer().
 */
1652 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
1661 blk = (char *) XLogRecGetData(record) + record->xl_len;
1662 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1664 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
/* copy the header into a local struct rather than reading it in place */
1667 memcpy((char *) &bkpb, blk, sizeof(BkpBlock));
1668 blk += sizeof(BkpBlock);
1670 reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node);
1674 buffer = XLogReadBuffer(true, reln, bkpb.block);
1675 if (BufferIsValid(buffer))
1677 page = (Page) BufferGetPage(buffer);
1678 memcpy((char *) page, blk, BLCKSZ);
1679 PageSetLSN(page, lsn);
1680 PageSetSUI(page, ThisStartUpID);
1681 UnlockAndWriteBuffer(buffer);
1690 * CRC-check an XLOG record. We do not believe the contents of an XLOG
1691 * record (other than to the minimal extent of computing the amount of
1692 * data to read in) until we've checked the CRCs.
1694 * We assume all of the record has been read into memory at *record.
/*
 * RecordIsValid: verify the CRCs stored in an XLOG record.
 *
 * record: record to check, fully in memory.
 * recptr: record's log position; used only in the messages.
 * emode: elog level used when reporting a mismatch.
 *
 * The record CRC is computed over the xl_len bytes of rmgr data and then
 * over the record header, skipping its first sizeof(crc64) bytes.  The
 * result is compared with record->xl_crc.
 *
 * Each backup block flagged in xl_info is then checked the same way: the
 * CRC runs over the BLCKSZ page image and then over the BkpBlock header
 * minus its first sizeof(crc64) bytes.  The stored value is fetched with
 * memcpy() before the comparison.  The cursor advances by
 * sizeof(BkpBlock) + BLCKSZ per block.
 */
1697 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
1702 uint32 len = record->xl_len;
1705 /* Check CRC of rmgr data and record header */
1707 COMP_CRC64(crc, XLogRecGetData(record), len);
1708 COMP_CRC64(crc, (char *) record + sizeof(crc64),
1709 SizeOfXLogRecord - sizeof(crc64));
1712 if (!EQ_CRC64(record->xl_crc, crc))
1714 elog(emode, "ReadRecord: bad resource manager data checksum in record at %X/%X",
1715 recptr.xlogid, recptr.xrecoff);
1719 /* Check CRCs of backup blocks, if any */
1720 blk = (char *) XLogRecGetData(record) + len;
1721 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1723 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1727 COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ);
1728 COMP_CRC64(crc, blk + sizeof(crc64),
1729 sizeof(BkpBlock) - sizeof(crc64));
1731 memcpy((char *) &cbuf, blk, sizeof(crc64)); /* don't assume
1734 if (!EQ_CRC64(cbuf, crc))
1736 elog(emode, "ReadRecord: bad checksum of backup block %d in record at %X/%X",
1737 i + 1, recptr.xlogid, recptr.xrecoff);
1740 blk += sizeof(BkpBlock) + BLCKSZ;
1747 * Attempt to read an XLOG record.
1749 * If RecPtr is not NULL, try to read a record at that position. Otherwise
1750 * try to read a record just after the last one previously read.
1752 * If no valid record is available, returns NULL, or fails if emode is PANIC.
1753 * (emode must be either PANIC or LOG.)
1755 * buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long. It is needed
1756 * to reassemble a record that crosses block boundaries. Note that on
1757 * successful return, the returned record pointer always points at buffer.
/*
 * ReadRecord: fetch and validate one XLOG record.
 *
 * RecPtr: position to read, or a pointer substituted from a local copy of
 *		EndRecPtr for sequential reading.
 * emode: elog level for "bad record" reports; also decides (emode == LOG)
 *		whether XLogFileOpen() tolerates a missing segment file.
 * buffer: caller workspace that receives the record.
 *
 * Outline of the visible logic:
 *	- a BLCKSZ page buffer (readBuf) is malloc'd on first use;
 *	- the segment holding *RecPtr is opened if it is not the current one,
 *	  and the page is (re)read when readOff differs from the page offset;
 *	- the page header is checked by ValidXLOGHeader();
 *	- the record is rejected if xl_len is zero, if header + data + backup
 *	  blocks exceed _INTL_MAXLOGRECSZ, or if xl_rmid exceeds RM_MAX_ID;
 *	- a record longer than the space left on its page is reassembled into
 *	  'buffer' from following pages, each of which must carry
 *	  XLP_FIRST_IS_CONTRECORD and a consistent xl_rem_len;
 *	- CRCs are verified with RecordIsValid(), and then EndRecPtr,
 *	  ReadRecPtr and (when it fits on the page) nextRecord are updated.
 *
 * Failures jump to the next_record_is_invalid label.
 *
 * NOTE(review): the malloc() result is checked only with Assert(); if
 * assertions are compiled out, an allocation failure would go undetected
 * here -- TODO confirm and consider an explicit check.
 */
1760 ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer)
1763 XLogRecPtr tmpRecPtr = EndRecPtr;
1766 uint32 targetPageOff;
1768 bool nextmode = false;
1770 if (readBuf == NULL)
1773 * First time through, permanently allocate readBuf. We do it
1774 * this way, rather than just making a static array, for two
1775 * reasons: (1) no need to waste the storage in most
1776 * instantiations of the backend; (2) a static char array isn't
1777 * guaranteed to have any particular alignment, whereas malloc()
1778 * will provide MAXALIGN'd storage.
1780 readBuf = (char *) malloc(BLCKSZ);
1781 Assert(readBuf != NULL);
1786 RecPtr = &tmpRecPtr;
1788 /* fast case if next record is on same page */
1789 if (nextRecord != NULL)
1791 record = nextRecord;
1794 /* align old recptr to next page */
1795 if (tmpRecPtr.xrecoff % BLCKSZ != 0)
1796 tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
1797 if (tmpRecPtr.xrecoff >= XLogFileSize)
1799 (tmpRecPtr.xlogid)++;
1800 tmpRecPtr.xrecoff = 0;
1802 tmpRecPtr.xrecoff += SizeOfXLogPHD;
1804 else if (!XRecOffIsValid(RecPtr->xrecoff))
1805 elog(PANIC, "ReadRecord: invalid record offset at %X/%X",
1806 RecPtr->xlogid, RecPtr->xrecoff);
1808 if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
1813 XLByteToSeg(*RecPtr, readId, readSeg);
1816 readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1818 goto next_record_is_invalid;
1819 readOff = (uint32) (-1); /* force read to occur below */
/* offset, within the segment, of the page that contains the record start */
1822 targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / BLCKSZ) * BLCKSZ;
1823 if (readOff != targetPageOff)
1825 readOff = targetPageOff;
1826 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
1828 elog(emode, "ReadRecord: lseek of log file %u, segment %u, offset %u failed: %m",
1829 readId, readSeg, readOff);
1830 goto next_record_is_invalid;
1832 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
1834 elog(emode, "ReadRecord: read of log file %u, segment %u, offset %u failed: %m",
1835 readId, readSeg, readOff);
1836 goto next_record_is_invalid;
1838 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, nextmode))
1839 goto next_record_is_invalid;
/* a position right after the page header cannot be a record start if the page begins with a continuation */
1841 if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
1842 RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
1844 elog(emode, "ReadRecord: contrecord is requested by %X/%X",
1845 RecPtr->xlogid, RecPtr->xrecoff);
1846 goto next_record_is_invalid;
1848 record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
1853 * Currently, xl_len == 0 must be bad data, but that might not be true
1854 * forever. See note in XLogInsert.
1856 if (record->xl_len == 0)
1858 elog(emode, "ReadRecord: record with zero length at %X/%X",
1859 RecPtr->xlogid, RecPtr->xrecoff);
1860 goto next_record_is_invalid;
1864 * Compute total length of record including any appended backup
1867 total_len = SizeOfXLogRecord + record->xl_len;
1868 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1870 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1872 total_len += sizeof(BkpBlock) + BLCKSZ;
1876 * Make sure it will fit in buffer (currently, it is mechanically
1877 * impossible for this test to fail, but it seems like a good idea
1880 if (total_len > _INTL_MAXLOGRECSZ)
1882 elog(emode, "ReadRecord: record length %u at %X/%X too long",
1883 total_len, RecPtr->xlogid, RecPtr->xrecoff);
1884 goto next_record_is_invalid;
1886 if (record->xl_rmid > RM_MAX_ID)
1888 elog(emode, "ReadRecord: invalid resource manager id %u at %X/%X",
1889 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
1890 goto next_record_is_invalid;
/* bytes available from the record start to the end of this page */
1893 len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
1894 if (total_len > len)
1896 /* Need to reassemble record */
1897 XLogContRecord *contrecord;
1898 uint32 gotlen = len;
1900 memcpy(buffer, record, len);
1901 record = (XLogRecord *) buffer;
1906 if (readOff >= XLogSegSize)
1910 NextLogSeg(readId, readSeg);
1911 readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1913 goto next_record_is_invalid;
1916 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
1918 elog(emode, "ReadRecord: read of log file %u, segment %u, offset %u failed: %m",
1919 readId, readSeg, readOff);
1920 goto next_record_is_invalid;
1922 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, true))
1923 goto next_record_is_invalid;
1924 if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
1926 elog(emode, "ReadRecord: there is no ContRecord flag in log file %u, segment %u, offset %u",
1927 readId, readSeg, readOff);
1928 goto next_record_is_invalid;
1930 contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD);
/* remaining length must be nonzero and must account exactly for what is still missing */
1931 if (contrecord->xl_rem_len == 0 ||
1932 total_len != (contrecord->xl_rem_len + gotlen))
1934 elog(emode, "ReadRecord: invalid ContRecord length %u in log file %u, segment %u, offset %u",
1935 contrecord->xl_rem_len, readId, readSeg, readOff);
1936 goto next_record_is_invalid;
1938 len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
1939 if (contrecord->xl_rem_len > len)
1941 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
1946 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
1947 contrecord->xl_rem_len);
1950 if (!RecordIsValid(record, *RecPtr, emode))
1951 goto next_record_is_invalid;
1952 if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD +
1953 SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len))
1955 nextRecord = (XLogRecord *) ((char *) contrecord +
1956 SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len));
1958 EndRecPtr.xlogid = readId;
1959 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
1960 SizeOfXLogPHD + SizeOfXLogContRecord +
1961 MAXALIGN(contrecord->xl_rem_len);
1962 ReadRecPtr = *RecPtr;
1966 /* Record does not cross a page boundary */
1967 if (!RecordIsValid(record, *RecPtr, emode))
1968 goto next_record_is_invalid;
1969 if (BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % BLCKSZ +
1970 MAXALIGN(total_len))
1971 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
1972 EndRecPtr.xlogid = RecPtr->xlogid;
1973 EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
1974 ReadRecPtr = *RecPtr;
1975 memcpy(buffer, record, total_len);
1976 return (XLogRecord *) buffer;
1978 next_record_is_invalid:;
1986 * Check whether the xlog header of a page just read in looks valid.
1988 * This is just a convenience subroutine to avoid duplicated code in
1989 * ReadRecord. It's not intended for use from anywhere else.
/*
 * ValidXLOGHeader: sanity-check the header of a page just read.
 *
 * hdr: header of the page in the read buffer.
 * emode: elog level used for each complaint.
 * checkSUI: flag supplied by ReadRecord relating to the startup-id
 *		sequence check described in the comment further down.
 *
 * Checks, in order: the magic number equals XLOG_PAGE_MAGIC; no info bits
 * outside XLP_ALL_FLAGS are set; xlp_pageaddr equals the address derived
 * from readId, readSeg and readOff; and xlp_sui lies in the range
 * [lastReadSUI, lastReadSUI + 512].  lastReadSUI is then updated from the
 * header.
 */
1992 ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
1996 if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
1998 elog(emode, "ReadRecord: invalid magic number %04X in log file %u, segment %u, offset %u",
1999 hdr->xlp_magic, readId, readSeg, readOff);
2002 if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
2004 elog(emode, "ReadRecord: invalid info bits %04X in log file %u, segment %u, offset %u",
2005 hdr->xlp_info, readId, readSeg, readOff);
/* rebuild the address this page should claim from the current read position */
2008 recaddr.xlogid = readId;
2009 recaddr.xrecoff = readSeg * XLogSegSize + readOff;
2010 if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
2012 elog(emode, "ReadRecord: unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
2013 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
2014 readId, readSeg, readOff);
2019 * We disbelieve a SUI less than the previous page's SUI, or more than
2020 * a few counts greater. In theory as many as 512 shutdown checkpoint
2021 * records could appear on a 32K-sized xlog page, so that's the most
2022 * differential there could legitimately be.
2024 * Note this check can only be applied when we are reading the next page
2025 * in sequence, so ReadRecord passes a flag indicating whether to
2030 if (hdr->xlp_sui < lastReadSUI ||
2031 hdr->xlp_sui > lastReadSUI + 512)
2033 /* translator: SUI = startup id */
2034 elog(emode, "ReadRecord: out-of-sequence SUI %u (after %u) in log file %u, segment %u, offset %u",
2035 hdr->xlp_sui, lastReadSUI, readId, readSeg, readOff);
2039 lastReadSUI = hdr->xlp_sui;
2044 * I/O routines for pg_control
2046 * *ControlFile is a buffer in shared memory that holds an image of the
2047 * contents of pg_control. WriteControlFile() initializes pg_control
2048 * given a preloaded buffer, ReadControlFile() loads the buffer from
2049 * the pg_control file (during postmaster or standalone-backend startup),
2050 * and UpdateControlFile() rewrites pg_control after we modify xlog state.
2052 * For simplicity, WriteControlFile() initializes the fields of pg_control
2053 * that are related to checking backend/database compatibility, and
2054 * ReadControlFile() verifies they are correct. We could split out the
2055 * I/O and compatibility-check functions, but there seems no need currently.
2061 /* Init XLOG file paths */
2062 snprintf(XLogDir, MAXPGPATH, "%s/pg_xlog", DataDir);
2063 snprintf(ControlFilePath, MAXPGPATH, "%s/global/pg_control", DataDir);
/*
 * WriteControlFile: create pg_control from the in-memory *ControlFile.
 *
 * Fills in the compatibility fields (pg_control_version,
 * catalog_version_no, blcksz, relseg_size) and the lc_collate / lc_ctype
 * names: taken from setlocale() in the locale-enabled branch, "C" in the
 * other.  It then computes the CRC over the struct, skipping its first
 * sizeof(crc64) bytes.
 *
 * The struct is copied into a zeroed BLCKSZ buffer, and the whole buffer
 * is written to a file created with O_CREAT | O_EXCL and then synced with
 * pg_fsync().  Every failure visible in this routine is reported at PANIC
 * level.
 */
2067 WriteControlFile(void)
2070 char buffer[BLCKSZ]; /* need not be aligned */
2077 * Initialize version and compatibility-check fields
2079 ControlFile->pg_control_version = PG_CONTROL_VERSION;
2080 ControlFile->catalog_version_no = CATALOG_VERSION_NO;
2081 ControlFile->blcksz = BLCKSZ;
2082 ControlFile->relseg_size = RELSEG_SIZE;
/* setlocale(category, NULL) queries the current setting without changing it */
2084 localeptr = setlocale(LC_COLLATE, NULL);
2086 elog(PANIC, "invalid LC_COLLATE setting");
2087 StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
2088 localeptr = setlocale(LC_CTYPE, NULL);
2090 elog(PANIC, "invalid LC_CTYPE setting");
2091 StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
2094 * Issue warning notice if initdb'ing in a locale that will not permit
2095 * LIKE index optimization. This is not a clean place to do it, but I
2096 * don't see a better place either...
2098 if (!locale_is_like_safe())
2099 elog(NOTICE, "Initializing database with %s collation order."
2100 "\n\tThis locale setting will prevent use of index optimization for"
2101 "\n\tLIKE and regexp searches. If you are concerned about speed of"
2102 "\n\tsuch queries, you may wish to set LC_COLLATE to \"C\" and"
2103 "\n\tre-initdb. For more information see the Administrator's Guide.",
2104 ControlFile->lc_collate);
2105 #else /* not USE_LOCALE */
2106 strcpy(ControlFile->lc_collate, "C");
2107 strcpy(ControlFile->lc_ctype, "C");
2108 #endif /* not USE_LOCALE */
2110 /* Contents are protected with a CRC */
2111 INIT_CRC64(ControlFile->crc);
2112 COMP_CRC64(ControlFile->crc,
2113 (char *) ControlFile + sizeof(crc64),
2114 sizeof(ControlFileData) - sizeof(crc64));
2115 FIN_CRC64(ControlFile->crc);
2118 * We write out BLCKSZ bytes into pg_control, zero-padding the excess
2119 * over sizeof(ControlFileData). This reduces the odds of
2120 * premature-EOF errors when reading pg_control. We'll still fail
2121 * when we check the contents of the file, but hopefully with a more
2122 * specific error than "couldn't read pg_control".
2124 if (sizeof(ControlFileData) > BLCKSZ)
2125 elog(PANIC, "sizeof(ControlFileData) is larger than BLCKSZ; fix either one");
2127 memset(buffer, 0, BLCKSZ);
2128 memcpy(buffer, ControlFile, sizeof(ControlFileData));
/* O_EXCL: the open fails if the control file already exists */
2130 fd = BasicOpenFile(ControlFilePath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2133 elog(PANIC, "WriteControlFile: could not create control file (%s): %m",
2137 if (write(fd, buffer, BLCKSZ) != BLCKSZ)
2139 /* if write didn't set errno, assume problem is no disk space */
2142 elog(PANIC, "WriteControlFile: write to control file failed: %m");
2145 if (pg_fsync(fd) != 0)
2146 elog(PANIC, "WriteControlFile: fsync of control file failed: %m");
/*
 * ReadControlFile: load pg_control into *ControlFile and validate it.
 *
 * Reads exactly sizeof(ControlFileData) bytes straight into the
 * ControlFile buffer, then checks in this order: pg_control_version, the
 * CRC (computed over the struct minus its first sizeof(crc64) bytes),
 * catalog_version_no, blcksz, relseg_size, and finally the locale names.
 *
 * In the locale-enabled branch the stored LC_COLLATE and LC_CTYPE values
 * are applied with setlocale(), and a NULL result is treated as a
 * mismatch.  In the other branch both names must be "C".
 *
 * Open, read and checksum failures are reported at PANIC level.
 */
2152 ReadControlFile(void)
2160 fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
2162 elog(PANIC, "could not open control file (%s): %m", ControlFilePath);
2164 if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
2165 elog(PANIC, "read from control file failed: %m");
2170 * Check for expected pg_control format version. If this is wrong,
2171 * the CRC check will likely fail because we'll be checking the wrong
2172 * number of bytes. Complaining about wrong version will probably be
2173 * more enlightening than complaining about wrong CRC.
2175 if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
2177 "The database cluster was initialized with PG_CONTROL_VERSION %d,\n"
2178 "\tbut the server was compiled with PG_CONTROL_VERSION %d.\n"
2179 "\tIt looks like you need to initdb.",
2180 ControlFile->pg_control_version, PG_CONTROL_VERSION);
2182 /* Now check the CRC. */
2185 (char *) ControlFile + sizeof(crc64),
2186 sizeof(ControlFileData) - sizeof(crc64));
2189 if (!EQ_CRC64(crc, ControlFile->crc))
2190 elog(PANIC, "invalid checksum in control file");
2193 * Do compatibility checking immediately. We do this here for 2
2196 * (1) if the database isn't compatible with the backend executable, we
2197 * want to abort before we can possibly do any damage;
2199 * (2) this code is executed in the postmaster, so the setlocale() will
2200 * propagate to forked backends, which aren't going to read this file
2201 * for themselves. (These locale settings are considered critical
2202 * compatibility items because they can affect sort order of indexes.)
2204 if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
2206 "The database cluster was initialized with CATALOG_VERSION_NO %d,\n"
2207 "\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n"
2208 "\tIt looks like you need to initdb.",
2209 ControlFile->catalog_version_no, CATALOG_VERSION_NO);
2210 if (ControlFile->blcksz != BLCKSZ)
2212 "The database cluster was initialized with BLCKSZ %d,\n"
2213 "\tbut the backend was compiled with BLCKSZ %d.\n"
2214 "\tIt looks like you need to initdb.",
2215 ControlFile->blcksz, BLCKSZ);
2216 if (ControlFile->relseg_size != RELSEG_SIZE)
2218 "The database cluster was initialized with RELSEG_SIZE %d,\n"
2219 "\tbut the backend was compiled with RELSEG_SIZE %d.\n"
2220 "\tIt looks like you need to initdb.",
2221 ControlFile->relseg_size, RELSEG_SIZE);
/* these setlocale() calls both validate the stored names and apply them */
2223 if (setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
2225 "The database cluster was initialized with LC_COLLATE '%s',\n"
2226 "\twhich is not recognized by setlocale().\n"
2227 "\tIt looks like you need to initdb.",
2228 ControlFile->lc_collate);
2229 if (setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
2231 "The database cluster was initialized with LC_CTYPE '%s',\n"
2232 "\twhich is not recognized by setlocale().\n"
2233 "\tIt looks like you need to initdb.",
2234 ControlFile->lc_ctype);
2235 #else /* not USE_LOCALE */
2236 if (strcmp(ControlFile->lc_collate, "C") != 0 ||
2237 strcmp(ControlFile->lc_ctype, "C") != 0)
2239 "The database cluster was initialized with LC_COLLATE '%s' and\n"
2240 "\tLC_CTYPE '%s', but the server was compiled without locale support.\n"
2241 "\tIt looks like you need to initdb or recompile.",
2242 ControlFile->lc_collate, ControlFile->lc_ctype);
2243 #endif /* not USE_LOCALE */
/*
 * UpdateControlFile: rewrite pg_control from the in-memory *ControlFile.
 *
 * Recomputes the CRC over the struct (skipping its first sizeof(crc64)
 * bytes), opens the existing control file, writes sizeof(ControlFileData)
 * bytes and syncs with pg_fsync().  Open, write and fsync failures are
 * reported at PANIC level.
 */
2247 UpdateControlFile(void)
/* the stored CRC must be refreshed before the struct is written out */
2251 INIT_CRC64(ControlFile->crc);
2252 COMP_CRC64(ControlFile->crc,
2253 (char *) ControlFile + sizeof(crc64),
2254 sizeof(ControlFileData) - sizeof(crc64));
2255 FIN_CRC64(ControlFile->crc);
2257 fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
2259 elog(PANIC, "could not open control file (%s): %m", ControlFilePath);
2262 if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
2264 /* if write didn't set errno, assume problem is no disk space */
2267 elog(PANIC, "write to control file failed: %m");
2270 if (pg_fsync(fd) != 0)
2271 elog(PANIC, "fsync of control file failed: %m");
/*
 * Shared-memory size computation for XLOG.
 *
 * NOTE(review): the signature line is not visible in this excerpt;
 * this is presumably XLOGShmemSize(), which a later comment in this file
 * names as the routine whose result the initialisation code must match.
 *
 * Side effect: XLOGbuffers is raised to MinXLOGbuffers if it is smaller.
 *
 * The returned size is the sum of three parts:
 *   - MAXALIGN of (XLogCtlData plus one XLogRecPtr per buffer),
 *   - BLCKSZ bytes for each of the XLOGbuffers buffers,
 *   - MAXALIGN of ControlFileData.
 */
2277 * Initialization of shared memory for XLOG
2283 if (XLOGbuffers < MinXLOGbuffers)
2284 XLOGbuffers = MinXLOGbuffers;
2286 return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
2287 + BLCKSZ * XLOGbuffers +
2288 MAXALIGN(sizeof(ControlFileData));
/*
 * Shared-memory initialisation for XLOG.
 *
 * NOTE(review): the signature line is not visible in this excerpt;
 * presumably this is XLOGShmemInit(), the name a later comment in this
 * file uses for the routine that zeroes the page buffers.
 *
 * Visible steps:
 *   1. Clamp XLOGbuffers to at least MinXLOGbuffers (same rule as the
 *      size computation).
 *   2. Obtain two named shared structs via ShmemInitStruct: "XLOG Ctl"
 *      and "Control File".
 *   3. Zero the XLogCtlData header, then carve the rest of the "XLOG Ctl"
 *      area into:
 *        - xlblocks: XLOGbuffers XLogRecPtr entries placed directly after
 *          the XLogCtlData struct (no padding added), zeroed;
 *        - pages: BLCKSZ * XLOGbuffers bytes starting at the MAXALIGNed
 *          offset past the xlblocks array, zeroed.
 *   4. Fill in XLogCacheByte, XLogCacheBlck, Insert.currpage (first page)
 *      and initialise the info_lck spinlock.
 *   5. When not in bootstrap mode, take the action guarded by the final
 *      test.  The guarded statement is not visible; per the comment above
 *      it, it reads and validates pg_control.
 *
 * NOTE(review): the last argument of the first ShmemInitStruct call, the
 * assignment target for XLogCtl->pages, and any use of "found" are not
 * visible here.
 */
2296 /* this must agree with space requested by XLOGShmemSize() */
2297 if (XLOGbuffers < MinXLOGbuffers)
2298 XLOGbuffers = MinXLOGbuffers;
2300 XLogCtl = (XLogCtlData *)
2301 ShmemInitStruct("XLOG Ctl",
2302 MAXALIGN(sizeof(XLogCtlData) +
2303 sizeof(XLogRecPtr) * XLOGbuffers)
2304 + BLCKSZ * XLOGbuffers,
2307 ControlFile = (ControlFileData *)
2308 ShmemInitStruct("Control File", sizeof(ControlFileData), &found);
2311 memset(XLogCtl, 0, sizeof(XLogCtlData));
2314 * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be
2315 * a multiple of the alignment for same, so no extra alignment padding
2318 XLogCtl->xlblocks = (XLogRecPtr *)
2319 (((char *) XLogCtl) + sizeof(XLogCtlData));
2320 memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
2323 * Here, on the other hand, we must MAXALIGN to ensure the page
2324 * buffers have worst-case alignment.
2327 ((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
2328 sizeof(XLogRecPtr) * XLOGbuffers);
2329 memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
2332 * Do basic initialization of XLogCtl shared data. (StartupXLOG will
2333 * fill in additional info.)
2335 XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
2336 XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
2337 XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
2338 SpinLockInit(&XLogCtl->info_lck);
2341 * If we are not in bootstrap mode, pg_control should already exist.
2342 * Read and validate it immediately (see comments in ReadControlFile()
2343 * for the reasons why).
2345 if (!IsBootstrapProcessingMode())
/*
 * BootStrapXLOG (name taken from the error messages below; the signature
 * line is not visible in this excerpt).
 *
 * Visible behaviour:
 *   - Allocates a BLCKSZ scratch page with malloc and zeroes it.
 *   - Builds an initial CheckPoint whose redo pointer is log id 0, offset
 *     SizeOfXLogPHD, with undo equal to redo, ThisStartUpID 0, nextXid
 *     FirstNormalTransactionId and nextOid BootstrapObjectIdData; the
 *     same nextXid/nextOid are installed into ShmemVariableCache.
 *   - Fills in the page header and places one XLOG record of type
 *     XLOG_CHECKPOINT_SHUTDOWN (rmgr RM_XLOG_ID) right after the header,
 *     carrying the CheckPoint as its data, and stores a CRC in xl_crc.
 *   - Obtains a log file through XLogFileInit(0, 0, ...), writes the page
 *     and fsyncs it; a short write or fsync failure raises PANIC.
 *   - Zeroes *ControlFile and sets its state to DB_SHUTDOWNED, logId 0,
 *     logSeg 1, and both checkpoint fields from the new checkpoint.
 *
 * NOTE(review): the malloc result is used without a visible NULL check.
 * NOTE(review): lines not visible here include the CRC init/finish
 * steps, the errno handling around write, the closing of the log file,
 * the call that writes pg_control, and the commit-log bootstrap call
 * announced by the final comment.
 */
2350 * This func must be called ONCE on system install. It creates pg_control
2351 * and the initial XLOG segment.
2356 CheckPoint checkPoint;
2358 XLogPageHeader page;
2363 /* Use malloc() to ensure buffer is MAXALIGNED */
2364 buffer = (char *) malloc(BLCKSZ);
2365 page = (XLogPageHeader) buffer;
/* The first checkpoint record starts right after the first page header. */
2367 checkPoint.redo.xlogid = 0;
2368 checkPoint.redo.xrecoff = SizeOfXLogPHD;
2369 checkPoint.undo = checkPoint.redo;
2370 checkPoint.ThisStartUpID = 0;
2371 checkPoint.nextXid = FirstNormalTransactionId;
2372 checkPoint.nextOid = BootstrapObjectIdData;
2373 checkPoint.time = time(NULL);
2375 ShmemVariableCache->nextXid = checkPoint.nextXid;
2376 ShmemVariableCache->nextOid = checkPoint.nextOid;
2377 ShmemVariableCache->oidCount = 0;
2379 memset(buffer, 0, BLCKSZ);
2380 page->xlp_magic = XLOG_PAGE_MAGIC;
2382 page->xlp_sui = checkPoint.ThisStartUpID;
2383 page->xlp_pageaddr.xlogid = 0;
2384 page->xlp_pageaddr.xrecoff = 0;
2385 record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
2386 record->xl_prev.xlogid = 0;
2387 record->xl_prev.xrecoff = 0;
2388 record->xl_xact_prev = record->xl_prev;
2389 record->xl_xid = InvalidTransactionId;
2390 record->xl_len = sizeof(checkPoint);
2391 record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
2392 record->xl_rmid = RM_XLOG_ID;
2393 memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
/*
 * CRC input order: the checkpoint payload first, then the record header
 * starting sizeof(crc64) bytes into the record.
 */
2396 COMP_CRC64(crc, &checkPoint, sizeof(checkPoint));
2397 COMP_CRC64(crc, (char *) record + sizeof(crc64),
2398 SizeOfXLogRecord - sizeof(crc64));
2400 record->xl_crc = crc;
2402 use_existent = false;
2403 openLogFile = XLogFileInit(0, 0, &use_existent, false);
2406 if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
2408 /* if write didn't set errno, assume problem is no disk space */
2411 elog(PANIC, "BootStrapXLOG failed to write log file: %m");
2414 if (pg_fsync(openLogFile) != 0)
2415 elog(PANIC, "BootStrapXLOG failed to fsync log file: %m");
2420 memset(ControlFile, 0, sizeof(ControlFileData));
2421 /* Initialize pg_control status fields */
2422 ControlFile->state = DB_SHUTDOWNED;
2423 ControlFile->time = checkPoint.time;
2424 ControlFile->logId = 0;
2425 ControlFile->logSeg = 1;
2426 ControlFile->checkPoint = checkPoint.redo;
2427 ControlFile->checkPointCopy = checkPoint;
2428 /* some additional ControlFile fields are set in WriteControlFile() */
2432 /* Bootstrap the commit log, too */
/*
 * str_time: format a time_t as "YYYY-MM-DD HH:MM:SS ZONE" using strftime
 * into a function-static 32-byte buffer.
 *
 * Because the buffer is static, each call overwrites the text produced by
 * the previous one.
 *
 * NOTE(review): the final strftime argument (the broken-down time derived
 * from tnow) and the return statement are not visible in this excerpt;
 * presumably the function returns buf.  Confirm against the full file.
 */
2437 str_time(time_t tnow)
2439 static char buf[32];
2441 strftime(buf, sizeof(buf),
2442 "%Y-%m-%d %H:%M:%S %Z",
/*
 * XLOG startup and crash recovery.
 *
 * NOTE(review): the signature line is not visible in this excerpt;
 * presumably this is StartupXLOG(), the name used by a comment in the
 * shared-memory initialisation code.  Many structural lines (braces,
 * some declarations, some if/else/do openers, preprocessor guards) are
 * also absent, so the outline below lists only the visible steps.
 *
 * Outline:
 *   1. Allocate a record buffer of _INTL_MAXLOGRECSZ bytes with malloc
 *      (no NULL check is visible).
 *   2. Sanity-check the control data (logSeg non-zero, state within
 *      DB_SHUTDOWNED..DB_IN_PRODUCTION, valid checkpoint offset); PANIC
 *      otherwise.  Log a message describing the previous state.
 *   3. Read the primary checkpoint record; fall back to the previous one,
 *      which also forces recovery.  PANIC if neither is usable.
 *   4. Copy the CheckPoint payload, log its contents, require a normal
 *      nextXid, and install nextXid/nextOid into ShmemVariableCache.
 *      Set ThisStartUpID and the three RedoRecPtr copies.
 *   5. Validate the redo/undo pointers against the checkpoint location.
 *   6. When recovery is needed: mark the control file DB_IN_RECOVERY,
 *      replay records in a loop (advance nextXid past each record xid,
 *      restore backup blocks when flagged, call the resource manager
 *      rm_redo), and remember the last record read.
 *   7. Re-read the last record, open its segment, and prime the first
 *      insert buffer either as a fresh page or as a copy of readBuf with
 *      the unused tail zeroed.
 *   8. Initialise all LogwrtResult/LogwrtRqst copies to EndOfLog.
 *   9. Walk backwards from the last record calling rm_undo for records
 *      whose transaction did not commit (whether this section is compiled
 *      in cannot be told from here).
 *  10. Write a shutdown-style checkpoint via CreateCheckPoint(true),
 *      preallocate log files, mark the control file DB_IN_PRODUCTION and
 *      publish ThisStartUpID in XLogCtl.
 */
2449 * This must be called ONCE during postmaster or standalone-backend startup
2454 XLogCtlInsert *Insert;
2455 CheckPoint checkPoint;
2464 /* Use malloc() to ensure record buffer is MAXALIGNED */
2465 buffer = (char *) malloc(_INTL_MAXLOGRECSZ);
2470 * Read control file and check XLOG status looks valid.
2472 * Note: in most control paths, *ControlFile is already valid and we need
2473 * not do ReadControlFile() here, but might as well do it to be sure.
/* Refuse to continue if the basic control-file fields look implausible. */
2477 if (ControlFile->logSeg == 0 ||
2478 ControlFile->state < DB_SHUTDOWNED ||
2479 ControlFile->state > DB_IN_PRODUCTION ||
2480 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
2481 elog(PANIC, "control file context is broken");
2483 if (ControlFile->state == DB_SHUTDOWNED)
2484 elog(LOG, "database system was shut down at %s",
2485 str_time(ControlFile->time));
2486 else if (ControlFile->state == DB_SHUTDOWNING)
2487 elog(LOG, "database system shutdown was interrupted at %s",
2488 str_time(ControlFile->time));
2489 else if (ControlFile->state == DB_IN_RECOVERY)
2490 elog(LOG, "database system was interrupted being in recovery at %s\n"
2491 "\tThis probably means that some data blocks are corrupted\n"
2492 "\tand you will have to use the last backup for recovery.",
2493 str_time(ControlFile->time));
2494 else if (ControlFile->state == DB_IN_PRODUCTION)
2495 elog(LOG, "database system was interrupted at %s",
2496 str_time(ControlFile->time));
2499 * Get the last valid checkpoint record. If the latest one according
2500 * to pg_control is broken, try the next-to-last one.
2502 record = ReadCheckpointRecord(ControlFile->checkPoint, 1, buffer);
2505 checkPointLoc = ControlFile->checkPoint;
2506 elog(LOG, "checkpoint record is at %X/%X",
2507 checkPointLoc.xlogid, checkPointLoc.xrecoff);
2511 record = ReadCheckpointRecord(ControlFile->prevCheckPoint, 2, buffer);
2514 checkPointLoc = ControlFile->prevCheckPoint;
2515 elog(LOG, "using previous checkpoint record at %X/%X",
2516 checkPointLoc.xlogid, checkPointLoc.xrecoff);
2517 InRecovery = true; /* force recovery even if SHUTDOWNED */
2520 elog(PANIC, "unable to locate a valid checkpoint record");
2522 LastRec = RecPtr = checkPointLoc;
2523 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
2524 wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
2526 elog(LOG, "redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
2527 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
2528 checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
2529 wasShutdown ? "TRUE" : "FALSE");
2530 elog(LOG, "next transaction id: %u; next oid: %u",
2531 checkPoint.nextXid, checkPoint.nextOid);
2532 if (!TransactionIdIsNormal(checkPoint.nextXid))
2533 elog(PANIC, "invalid next transaction id");
2535 ShmemVariableCache->nextXid = checkPoint.nextXid;
2536 ShmemVariableCache->nextOid = checkPoint.nextOid;
2537 ShmemVariableCache->oidCount = 0;
2539 ThisStartUpID = checkPoint.ThisStartUpID;
2540 RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
2541 XLogCtl->RedoRecPtr = checkPoint.redo;
/* The redo pointer may not lie after the checkpoint record itself. */
2543 if (XLByteLT(RecPtr, checkPoint.redo))
2544 elog(PANIC, "invalid redo in checkpoint record");
2545 if (checkPoint.undo.xrecoff == 0)
2546 checkPoint.undo = RecPtr;
2548 if (XLByteLT(checkPoint.undo, RecPtr) ||
2549 XLByteLT(checkPoint.redo, RecPtr))
2552 elog(PANIC, "invalid redo/undo record in shutdown checkpoint");
2555 else if (ControlFile->state != DB_SHUTDOWNED)
2561 elog(LOG, "database system was not properly shut down; "
2562 "automatic recovery in progress");
2563 ControlFile->state = DB_IN_RECOVERY;
2564 ControlFile->time = time(NULL);
2565 UpdateControlFile();
2567 XLogInitRelationCache();
2569 /* Is REDO required ? */
2570 if (XLByteLT(checkPoint.redo, RecPtr))
2571 record = ReadRecord(&(checkPoint.redo), PANIC, buffer);
2574 /* read past CheckPoint record */
2575 record = ReadRecord(NULL, LOG, buffer);
2581 elog(LOG, "redo starts at %X/%X",
2582 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2585 /* nextXid must be beyond record's xid */
2586 if (TransactionIdFollowsOrEquals(record->xl_xid,
2587 ShmemVariableCache->nextXid))
2589 ShmemVariableCache->nextXid = record->xl_xid;
2590 TransactionIdAdvance(ShmemVariableCache->nextXid);
2596 sprintf(buf, "REDO @ %X/%X; LSN %X/%X: ",
2597 ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
2598 EndRecPtr.xlogid, EndRecPtr.xrecoff);
2599 xlog_outrec(buf, record);
2601 RmgrTable[record->xl_rmid].rm_desc(buf,
2602 record->xl_info, XLogRecGetData(record));
2603 elog(LOG, "%s", buf);
2606 if (record->xl_info & XLR_BKP_BLOCK_MASK)
2607 RestoreBkpBlocks(record, EndRecPtr);
2609 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
2610 record = ReadRecord(NULL, LOG, buffer);
2611 } while (record != NULL);
2612 elog(LOG, "redo done at %X/%X",
2613 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2614 LastRec = ReadRecPtr;
2618 elog(LOG, "redo is not required");
2622 * Init xlog buffer cache using the block containing the last valid
2623 * record from the previous incarnation.
2625 record = ReadRecord(&LastRec, PANIC, buffer);
2626 EndOfLog = EndRecPtr;
2627 XLByteToPrevSeg(EndOfLog, openLogId, openLogSeg);
2628 openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
2630 ControlFile->logId = openLogId;
2631 ControlFile->logSeg = openLogSeg + 1;
2632 Insert = &XLogCtl->Insert;
2633 Insert->PrevRecord = LastRec;
2636 * If the next record will go to the new page then initialize for that
/*
 * When less than one record header fits in the rest of the page, round
 * EndOfLog up to the next page boundary.
 */
2639 if ((BLCKSZ - EndOfLog.xrecoff % BLCKSZ) < SizeOfXLogRecord)
2640 EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
2641 if (EndOfLog.xrecoff % BLCKSZ == 0)
2643 XLogRecPtr NewPageEndPtr;
2645 NewPageEndPtr = EndOfLog;
2646 if (NewPageEndPtr.xrecoff >= XLogFileSize)
2648 /* crossing a logid boundary */
2649 NewPageEndPtr.xlogid += 1;
2650 NewPageEndPtr.xrecoff = BLCKSZ;
2653 NewPageEndPtr.xrecoff += BLCKSZ;
2654 XLogCtl->xlblocks[0] = NewPageEndPtr;
2655 Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
2657 Insert->currpage->xlp_sui = ThisStartUpID;
2659 Insert->currpage->xlp_sui = ThisStartUpID + 1;
2660 Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
2661 Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
2662 /* rest of buffer was zeroed in XLOGShmemInit */
2663 Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
2667 XLogCtl->xlblocks[0].xlogid = openLogId;
2668 XLogCtl->xlblocks[0].xrecoff =
2669 ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
2672 * Tricky point here: readBuf contains the *last* block that the
2673 * LastRec record spans, not the one it starts in. The last block
2674 * is indeed the one we want to use.
2676 Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
2677 memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
2678 Insert->currpos = (char *) Insert->currpage +
2679 (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
2680 /* Make sure rest of page is zero */
2681 memset(Insert->currpos, 0, INSERT_FREESPACE(Insert));
/* Everything up to EndOfLog is treated as already written and flushed. */
2684 LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
2686 XLogCtl->Write.LogwrtResult = LogwrtResult;
2687 Insert->LogwrtResult = LogwrtResult;
2688 XLogCtl->LogwrtResult = LogwrtResult;
2690 XLogCtl->LogwrtRqst.Write = EndOfLog;
2691 XLogCtl->LogwrtRqst.Flush = EndOfLog;
2697 RecPtr = ReadRecPtr;
2698 if (XLByteLT(checkPoint.undo, RecPtr))
2700 elog(LOG, "undo starts at %X/%X",
2701 RecPtr.xlogid, RecPtr.xrecoff);
2704 record = ReadRecord(&RecPtr, PANIC, buffer);
2705 if (TransactionIdIsValid(record->xl_xid) &&
2706 !TransactionIdDidCommit(record->xl_xid))
2707 RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
2708 RecPtr = record->xl_prev;
2709 } while (XLByteLE(checkPoint.undo, RecPtr));
2710 elog(LOG, "undo done at %X/%X",
2711 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2714 elog(LOG, "undo is not required");
2721 * In case we had to use the secondary checkpoint, make sure that
2722 * it will still be shown as the secondary checkpoint after this
2723 * CreateCheckPoint operation; we don't want the broken primary
2724 * checkpoint to become prevCheckPoint...
2726 ControlFile->checkPoint = checkPointLoc;
2727 CreateCheckPoint(true);
2728 XLogCloseRelationCache();
2732 * Preallocate additional log files, if wanted.
2734 PreallocXlogFiles(EndOfLog);
2738 ControlFile->state = DB_IN_PRODUCTION;
2739 ControlFile->time = time(NULL);
2740 UpdateControlFile();
2743 XLogCtl->ThisStartUpID = ThisStartUpID;
2745 /* Start up the commit log, too */
2748 elog(LOG, "database system is ready");
2751 /* Shut down readFile facility, free space */
/*
 * ReadCheckpointRecord: fetch the record at RecPtr and check that it
 * looks like a checkpoint record.
 *
 * Visible checks, each of which logs a LOG-level message naming the
 * primary (whichChkpt == 1) or secondary checkpoint when it fails:
 *   - RecPtr.xrecoff must satisfy XRecOffIsValid;
 *   - ReadRecord must succeed (its failure is reported as an invalid
 *     checkpoint record);
 *   - xl_rmid must be RM_XLOG_ID;
 *   - xl_info must be XLOG_CHECKPOINT_SHUTDOWN or XLOG_CHECKPOINT_ONLINE;
 *   - xl_len must equal sizeof(CheckPoint).
 *
 * whichChkpt only selects the wording of the messages.
 *
 * NOTE(review): the remaining parameters, the return statements and the
 * test guarding the second message are not visible in this excerpt.
 * Callers in this file compare-and-use the result as a record pointer, so
 * presumably NULL is returned on failure and the record on success.
 */
2767 * Subroutine to try to fetch and validate a prior checkpoint record.
2768 * whichChkpt = 1 for "primary", 2 for "secondary", merely informative
2771 ReadCheckpointRecord(XLogRecPtr RecPtr,
2777 if (!XRecOffIsValid(RecPtr.xrecoff))
2779 elog(LOG, (whichChkpt == 1 ?
2780 "invalid primary checkpoint link in control file" :
2781 "invalid secondary checkpoint link in control file"));
2785 record = ReadRecord(&RecPtr, LOG, buffer);
2789 elog(LOG, (whichChkpt == 1 ?
2790 "invalid primary checkpoint record" :
2791 "invalid secondary checkpoint record"));
2794 if (record->xl_rmid != RM_XLOG_ID)
2796 elog(LOG, (whichChkpt == 1 ?
2797 "invalid resource manager id in primary checkpoint record" :
2798 "invalid resource manager id in secondary checkpoint record"));
2801 if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
2802 record->xl_info != XLOG_CHECKPOINT_ONLINE)
2804 elog(LOG, (whichChkpt == 1 ?
2805 "invalid xl_info in primary checkpoint record" :
2806 "invalid xl_info in secondary checkpoint record"));
2809 if (record->xl_len != sizeof(CheckPoint))
2811 elog(LOG, (whichChkpt == 1 ?
2812 "invalid length of primary checkpoint record" :
2813 "invalid length of secondary checkpoint record"));
/*
 * SetThisStartUpID: copy ThisStartUpID and RedoRecPtr from the shared
 * XLogCtl struct into this process's own variables of the same names.
 * Takes no arguments; no locking is visible around the two reads.
 */
2820 * Postmaster uses this to initialize ThisStartUpID & RedoRecPtr from
2821 * XLogCtlData located in shmem after successful startup.
2824 SetThisStartUpID(void)
2826 ThisStartUpID = XLogCtl->ThisStartUpID;
2827 RedoRecPtr = XLogCtl->RedoRecPtr;
/*
 * Publish the local RedoRecPtr into shared memory (XLogCtl->RedoRecPtr).
 *
 * NOTE(review): the signature line is not visible in this excerpt;
 * presumably this is SetRedoRecPtr(), the name used in the description
 * below.  The single visible statement is a plain, unlocked struct
 * assignment, which matches the warning in that description.
 */
2831 * CheckPoint process called by postmaster saves copy of new RedoRecPtr
2832 * in shmem (using SetRedoRecPtr). When checkpointer completes, postmaster
2833 * calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that
2834 * subsequently-spawned backends will start out with a reasonably up-to-date
2835 * local RedoRecPtr. Since these operations are not protected by any lock
2836 * and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these
2837 * routines at other times!
2839 * Note: once spawned, a backend must update its local RedoRecPtr from
2840 * XLogCtl->Insert.RedoRecPtr while holding the insert lock. This is
2841 * done in XLogInsert().
2846 XLogCtl->RedoRecPtr = RedoRecPtr;
/*
 * Refresh the local RedoRecPtr from shared memory (XLogCtl->RedoRecPtr).
 * NOTE(review): signature not visible here; presumably GetRedoRecPtr().
 * The copy is a plain, unlocked struct assignment.
 */
2852 RedoRecPtr = XLogCtl->RedoRecPtr;
/*
 * XLOG shutdown sequence.
 *
 * NOTE(review): the signature line is not visible in this excerpt;
 * presumably this is ShutdownXLOG().
 *
 * Visible steps: log "shutting down"; clear MyLastRecPtr.xrecoff so that
 * CreateCheckPoint does not reject the call as being inside a
 * transaction; call CreateDummyCaches(); write a shutdown checkpoint with
 * CreateCheckPoint(true); log "database system is shut down".  Other
 * statements between these may exist but are not visible here.
 */
2856 * This must be called ONCE during postmaster or standalone-backend shutdown
2861 elog(LOG, "shutting down");
2863 /* suppress in-transaction check in CreateCheckPoint */
2864 MyLastRecPtr.xrecoff = 0;
2867 CreateDummyCaches();
2868 CreateCheckPoint(true);
2872 elog(LOG, "database system is shut down");
/*
 * CreateCheckPoint: write a checkpoint record and update pg_control.
 *
 * Parameter:
 *   shutdown - true selects record type XLOG_CHECKPOINT_SHUTDOWN and
 *              enables the extra consistency PANICs; false selects
 *              XLOG_CHECKPOINT_ONLINE.
 *
 * Visible sequence:
 *   1. Raise ERROR when MyLastRecPtr.xrecoff is non-zero (the caller is
 *      inside a transaction block).
 *   2. Acquire CheckpointLock exclusively by retrying
 *      LWLockConditionalAcquire, checking for interrupts between tries,
 *      then enter a critical section.
 *   3. Mark the control file DB_SHUTDOWNING and write it out (presumably
 *      only for a shutdown checkpoint; the guarding test is not visible).
 *   4. Zero a CheckPoint struct and fill in ThisStartUpID and time.
 *   5. Under WALInsertLock: possibly skip the checkpoint when nothing was
 *      inserted since the previous one (both locks are released on that
 *      path), otherwise compute the redo pointer as the next insert
 *      position, advancing to a new page first when fewer than
 *      SizeOfXLogRecord bytes are free, and store it in the local and
 *      shared insert RedoRecPtr.
 *   6. Undo pointer handling via GetUndoRecPtr (described below as
 *      temporarily compiled out) and a PANIC when a shutdown checkpoint
 *      sees a non-zero undo offset.
 *   7. Release WALInsertLock; read nextXid under XidGenLock and nextOid
 *      under OidGenLock (oidCount is added to nextOid by one visible
 *      statement whose guard is not shown).
 *   8. Flush buffers (calls not visible), then insert the record with
 *      XLogInsert.
 *   9. For shutdown, PANIC unless the record landed exactly at the redo
 *      pointer.
 *  10. Choose the truncation segment from the previous checkpoint copy,
 *      update the control file under ControlFileLock, recycle old
 *      segments with MoveOfflineLogs, preallocate new ones, and release
 *      CheckpointLock.
 *
 * NOTE(review): the end of the critical section, the return on the skip
 * path, the flush calls and several guards are not visible in this
 * excerpt; confirm against the full file.
 */
2876 * Perform a checkpoint --- either during shutdown, or on-the-fly
2879 CreateCheckPoint(bool shutdown)
2881 CheckPoint checkPoint;
2883 XLogCtlInsert *Insert = &XLogCtl->Insert;
2889 if (MyLastRecPtr.xrecoff != 0)
2890 elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block");
2893 * The CheckpointLock can be held for quite a while, which is not good
2894 * because we won't respond to a cancel/die request while waiting for
2895 * an LWLock. (But the alternative of using a regular lock won't work
2896 * for background checkpoint processes, which are not regular
2897 * backends.) So, rather than use a plain LWLockAcquire, use this
2898 * kluge to allow an interrupt to be accepted while we are waiting:
2900 while (!LWLockConditionalAcquire(CheckpointLock, LW_EXCLUSIVE))
2902 CHECK_FOR_INTERRUPTS();
2906 START_CRIT_SECTION();
2910 ControlFile->state = DB_SHUTDOWNING;
2911 ControlFile->time = time(NULL);
2912 UpdateControlFile();
2915 memset(&checkPoint, 0, sizeof(checkPoint));
2916 checkPoint.ThisStartUpID = ThisStartUpID;
2917 checkPoint.time = time(NULL);
2919 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
2922 * If this isn't a shutdown, and we have not inserted any XLOG records
2923 * since the start of the last checkpoint, skip the checkpoint. The
2924 * idea here is to avoid inserting duplicate checkpoints when the
2925 * system is idle. That wastes log space, and more importantly it
2926 * exposes us to possible loss of both current and previous checkpoint
2927 * records if the machine crashes just as we're writing the update.
2928 * (Perhaps it'd make even more sense to checkpoint only when the
2929 * previous checkpoint record is in a different xlog page?)
2931 * We have to make two tests to determine that nothing has happened since
2932 * the start of the last checkpoint: current insertion point must
2933 * match the end of the last checkpoint record, and its redo pointer
2934 * must point to itself.
2938 XLogRecPtr curInsert;
2940 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
2941 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
2942 curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
2943 MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
2944 ControlFile->checkPoint.xlogid ==
2945 ControlFile->checkPointCopy.redo.xlogid &&
2946 ControlFile->checkPoint.xrecoff ==
2947 ControlFile->checkPointCopy.redo.xrecoff)
2949 LWLockRelease(WALInsertLock);
2950 LWLockRelease(CheckpointLock);
2957 * Compute new REDO record ptr = location of next XLOG record.
2959 * NB: this is NOT necessarily where the checkpoint record itself will
2960 * be, since other backends may insert more XLOG records while we're
2961 * off doing the buffer flush work. Those XLOG records are logically
2962 * after the checkpoint, even though physically before it. Got that?
2964 freespace = INSERT_FREESPACE(Insert);
2965 if (freespace < SizeOfXLogRecord)
2967 (void) AdvanceXLInsertBuffer();
2968 /* OK to ignore update return flag, since we will do flush anyway */
2969 freespace = BLCKSZ - SizeOfXLogPHD;
2971 INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
2974 * Here we update the shared RedoRecPtr for future XLogInsert calls;
2975 * this must be done while holding the insert lock.
2977 RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
2980 * Get UNDO record ptr - this is oldest of PROC->logRec values. We do
2981 * this while holding insert lock to ensure that we won't miss any
2982 * about-to-commit transactions (UNDO must include all xacts that have
2983 * commits after REDO point).
2985 * XXX temporarily ifdef'd out to avoid three-way deadlock condition:
2986 * GetUndoRecPtr needs to grab SInvalLock to ensure that it is looking
2987 * at a stable set of proc records, but grabbing SInvalLock while holding
2988 * WALInsertLock is no good. GetNewTransactionId may cause a WAL record
2989 * to be written while holding XidGenLock, and GetSnapshotData needs to
2990 * get XidGenLock while holding SInvalLock, so there's a risk of deadlock.
2991 * Need to find a better solution. See pgsql-hackers discussion of
2995 checkPoint.undo = GetUndoRecPtr();
2997 if (shutdown && checkPoint.undo.xrecoff != 0)
2998 elog(PANIC, "active transaction while database system is shutting down");
3002 * Now we can release insert lock, allowing other xacts to proceed
3003 * even while we are flushing disk buffers.
3005 LWLockRelease(WALInsertLock);
3007 LWLockAcquire(XidGenLock, LW_SHARED);
3008 checkPoint.nextXid = ShmemVariableCache->nextXid;
3009 LWLockRelease(XidGenLock);
3011 LWLockAcquire(OidGenLock, LW_SHARED);
3012 checkPoint.nextOid = ShmemVariableCache->nextOid;
3014 checkPoint.nextOid += ShmemVariableCache->oidCount;
3015 LWLockRelease(OidGenLock);
3018 * Having constructed the checkpoint record, ensure all shmem disk
3019 * buffers are flushed to disk.
3023 /* And commit-log buffers, too */
3027 * Now insert the checkpoint record into XLOG.
3029 rdata.buffer = InvalidBuffer;
3030 rdata.data = (char *) (&checkPoint);
3031 rdata.len = sizeof(checkPoint);
3034 recptr = XLogInsert(RM_XLOG_ID,
3035 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
3036 XLOG_CHECKPOINT_ONLINE,
3042 * We now have ProcLastRecPtr = start of actual checkpoint record,
3043 * recptr = end of actual checkpoint record.
3045 if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
3046 elog(PANIC, "concurrent transaction log activity while database system is shutting down");
3049 * Select point at which we can truncate the log, which we base on the
3050 * prior checkpoint's earliest info.
3052 * With UNDO support: oldest item is redo or undo, whichever is older;
3053 * but watch out for case that undo = 0.
3055 * Without UNDO support: just use the redo pointer. This allows xlog
3056 * space to be freed much faster when there are long-running
3060 if (ControlFile->checkPointCopy.undo.xrecoff != 0 &&
3061 XLByteLT(ControlFile->checkPointCopy.undo,
3062 ControlFile->checkPointCopy.redo))
3063 XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg);
3066 XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
3069 * Update the control file.
3071 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
3073 ControlFile->state = DB_SHUTDOWNED;
3074 ControlFile->prevCheckPoint = ControlFile->checkPoint;
3075 ControlFile->checkPoint = ProcLastRecPtr;
3076 ControlFile->checkPointCopy = checkPoint;
3077 ControlFile->time = time(NULL);
3078 UpdateControlFile();
3079 LWLockRelease(ControlFileLock);
3082 * Delete offline log files (those no longer needed even for previous
3085 if (_logId || _logSeg)
3087 PrevLogSeg(_logId, _logSeg);
3088 MoveOfflineLogs(_logId, _logSeg, recptr);
3092 * Make more log segments if needed. (Do this after deleting offline
3093 * log segments, to avoid having peak disk space usage higher than
3097 PreallocXlogFiles(recptr);
3099 LWLockRelease(CheckpointLock);
/*
 * XLogPutNextOid: insert an XLOG record of type XLOG_NEXTOID (resource
 * manager RM_XLOG_ID) whose payload is the sizeof(Oid) bytes of nextOid.
 * The record is not tied to a buffer (rdata.buffer is InvalidBuffer) and
 * the position returned by XLogInsert is deliberately discarded.
 *
 * NOTE(review): the declaration of rdata and any other rdata field
 * assignments are not visible in this excerpt.
 */
3105 * Write a NEXTOID log record
3108 XLogPutNextOid(Oid nextOid)
3112 rdata.buffer = InvalidBuffer;
3113 rdata.data = (char *) (&nextOid);
3114 rdata.len = sizeof(Oid);
3116 (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
3120 * XLOG resource manager's routines
/*
 * xlog_redo: replay one record belonging to the XLOG resource manager.
 *
 * The record type is xl_info with the XLR_INFO_MASK bits cleared.
 *   XLOG_NEXTOID             - raise ShmemVariableCache->nextOid to the
 *                              logged value when it is currently lower,
 *                              and reset oidCount.
 *   XLOG_CHECKPOINT_SHUTDOWN - overwrite nextXid and nextOid with the
 *                              checkpoint values and reset oidCount.
 *   XLOG_CHECKPOINT_ONLINE   - only move nextXid / nextOid forward to the
 *                              checkpoint values, never backward;
 *                              oidCount is reset when nextOid is raised.
 * No other record type is handled in the visible code.
 *
 * The payload is copied out with memcpy rather than read in place.  The
 * lsn parameter is not referenced in the visible lines.
 */
3123 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
3125 uint8 info = record->xl_info & ~XLR_INFO_MASK;
3127 if (info == XLOG_NEXTOID)
3131 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
3132 if (ShmemVariableCache->nextOid < nextOid)
3134 ShmemVariableCache->nextOid = nextOid;
3135 ShmemVariableCache->oidCount = 0;
3138 else if (info == XLOG_CHECKPOINT_SHUTDOWN)
3140 CheckPoint checkPoint;
3142 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
3143 /* In a SHUTDOWN checkpoint, believe the counters exactly */
3144 ShmemVariableCache->nextXid = checkPoint.nextXid;
3145 ShmemVariableCache->nextOid = checkPoint.nextOid;
3146 ShmemVariableCache->oidCount = 0;
3148 else if (info == XLOG_CHECKPOINT_ONLINE)
3150 CheckPoint checkPoint;
3152 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
3153 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
3154 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
3155 checkPoint.nextXid))
3156 ShmemVariableCache->nextXid = checkPoint.nextXid;
3157 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
3159 ShmemVariableCache->nextOid = checkPoint.nextOid;
3160 ShmemVariableCache->oidCount = 0;
/*
 * xlog_undo: undo callback for the XLOG resource manager, with the same
 * parameter list as xlog_redo.
 * NOTE(review): no statements of this function are visible in this
 * excerpt; presumably it does nothing.  Confirm against the full file.
 */
3166 xlog_undo(XLogRecPtr lsn, XLogRecord *record)
/*
 * xlog_desc: append a human-readable description of an XLOG resource
 * manager record to the string already in buf.
 *
 * Parameters:
 *   buf     - NUL-terminated string to append to.
 *   xl_info - record info byte; the type is xl_info with the
 *             XLR_INFO_MASK bits cleared.
 *   rec     - pointer to the record payload.
 *
 * Checkpoint records (shutdown or online) print redo, undo, startup id,
 * next xid, next oid and the word "shutdown" or "online".  NEXTOID
 * records print the oid, copied out of rec with memcpy.  The final
 * strcat appends "UNKNOWN" (presumably the fall-through branch for any
 * other type; its else line is not visible).
 *
 * NOTE(review): text is appended with sprintf/strcat and no buffer size
 * is passed in, so the caller must supply a buffer large enough for the
 * result.
 */
3171 xlog_desc(char *buf, uint8 xl_info, char *rec)
3173 uint8 info = xl_info & ~XLR_INFO_MASK;
3175 if (info == XLOG_CHECKPOINT_SHUTDOWN ||
3176 info == XLOG_CHECKPOINT_ONLINE)
3178 CheckPoint *checkpoint = (CheckPoint *) rec;
3180 sprintf(buf + strlen(buf), "checkpoint: redo %X/%X; undo %X/%X; "
3181 "sui %u; xid %u; oid %u; %s",
3182 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
3183 checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
3184 checkpoint->ThisStartUpID, checkpoint->nextXid,
3185 checkpoint->nextOid,
3186 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
3188 else if (info == XLOG_NEXTOID)
3192 memcpy(&nextOid, rec, sizeof(Oid));
3193 sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
3196 strcat(buf, "UNKNOWN");
/*
 * xlog_outrec: append a one-line summary of an XLOG record header to the
 * string already in buf.
 *
 * Output, in order: the previous-record and previous-record-of-same-xact
 * pointers and the xid; then "; bkpb N" (the loop scans the
 * XLR_MAX_BKP_BLOCKS backup-block flag bits in xl_info; the statements
 * that update bkpb and any guard on printing it are not visible here);
 * then ": " followed by the resource manager name looked up in RmgrTable.
 *
 * NOTE(review): text is appended with sprintf and no buffer size is
 * passed in, so the caller must supply a buffer large enough.
 */
3200 xlog_outrec(char *buf, XLogRecord *record)
3205 sprintf(buf + strlen(buf), "prev %X/%X; xprev %X/%X; xid %u",
3206 record->xl_prev.xlogid, record->xl_prev.xrecoff,
3207 record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
3210 for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++)
3212 if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
3218 sprintf(buf + strlen(buf), "; bkpb %d", bkpb);
3220 sprintf(buf + strlen(buf), ": %s",
3221 RmgrTable[record->xl_rmid].rm_name);
3226 * GUC support routines
/*
 * check_xlog_sync_method: test whether method names a sync method
 * supported by this build.
 *
 * The comparison is case-insensitive.  "fsync" is always tested;
 * "fdatasync", "open_sync" and "open_datasync" are tested only when
 * HAVE_FDATASYNC, OPEN_SYNC_FLAG and OPEN_DATASYNC_FLAG respectively are
 * defined at compile time.
 *
 * NOTE(review): the return type, the return statements and the #endif
 * lines are not visible in this excerpt; presumably a match yields true
 * and falling through yields false.
 */
3230 check_xlog_sync_method(const char *method)
3232 if (strcasecmp(method, "fsync") == 0)
3234 #ifdef HAVE_FDATASYNC
3235 if (strcasecmp(method, "fdatasync") == 0)
3238 #ifdef OPEN_SYNC_FLAG
3239 if (strcasecmp(method, "open_sync") == 0)
3242 #ifdef OPEN_DATASYNC_FLAG
3243 if (strcasecmp(method, "open_datasync") == 0)
/*
 * assign_xlog_sync_method: switch the WAL sync method to the one named
 * by method (matched case-insensitively).
 *
 * Mapping visible here:
 *   "fsync"         -> SYNC_METHOD_FSYNC
 *   "fdatasync"     -> SYNC_METHOD_FDATASYNC      (if HAVE_FDATASYNC)
 *   "open_sync"     -> SYNC_METHOD_OPEN with OPEN_SYNC_FLAG
 *   "open_datasync" -> SYNC_METHOD_OPEN with OPEN_DATASYNC_FLAG
 * The last two exist only when the corresponding flag macro is defined.
 * Any other name raises ERROR.
 *
 * When either the method or the open flag bit actually changes and a log
 * file is open (openLogFile >= 0), the file is fsynced first, and it is
 * also closed when the open flag bit differs.  fsync or close failure
 * raises PANIC.  Finally sync_method and open_sync_bit are updated.
 *
 * NOTE(review): the declaration of new_sync_bit, its assignments in the
 * first two branches, the #endif lines and any reset of openLogFile
 * after close are not visible in this excerpt.
 */
3250 assign_xlog_sync_method(const char *method)
3252 int new_sync_method;
3255 if (strcasecmp(method, "fsync") == 0)
3257 new_sync_method = SYNC_METHOD_FSYNC;
3260 #ifdef HAVE_FDATASYNC
3261 else if (strcasecmp(method, "fdatasync") == 0)
3263 new_sync_method = SYNC_METHOD_FDATASYNC;
3267 #ifdef OPEN_SYNC_FLAG
3268 else if (strcasecmp(method, "open_sync") == 0)
3270 new_sync_method = SYNC_METHOD_OPEN;
3271 new_sync_bit = OPEN_SYNC_FLAG;
3274 #ifdef OPEN_DATASYNC_FLAG
3275 else if (strcasecmp(method, "open_datasync") == 0)
3277 new_sync_method = SYNC_METHOD_OPEN;
3278 new_sync_bit = OPEN_DATASYNC_FLAG;
3283 /* Can't get here unless guc.c screwed up */
3284 elog(ERROR, "bogus wal_sync_method %s", method);
3285 new_sync_method = 0; /* keep compiler quiet */
3289 if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
3292 * To ensure that no blocks escape unsynced, force an fsync on the
3293 * currently open log segment (if any). Also, if the open flag is
3294 * changing, close the log file so it will be reopened (with new
3295 * flag bit) at next use.
3297 if (openLogFile >= 0)
3299 if (pg_fsync(openLogFile) != 0)
3300 elog(PANIC, "fsync of log file %u, segment %u failed: %m",
3301 openLogId, openLogSeg);
3302 if (open_sync_bit != new_sync_bit)
3304 if (close(openLogFile) != 0)
3305 elog(PANIC, "close of log file %u, segment %u failed: %m",
3306 openLogId, openLogSeg);
3310 sync_method = new_sync_method;
3311 open_sync_bit = new_sync_bit;
/*
 * issue_xlog_fsync: flush the currently open log file (openLogFile)
 * according to sync_method.
 *
 *   SYNC_METHOD_FSYNC     - pg_fsync; PANIC on failure.
 *   SYNC_METHOD_FDATASYNC - pg_fdatasync; PANIC on failure (this case is
 *                           compiled only when HAVE_FDATASYNC is defined).
 *   SYNC_METHOD_OPEN      - no call is made; per the comment the write
 *                           itself was already synchronous.
 * The final PANIC reports an unrecognised sync_method value (presumably
 * the default case of the switch; its label is not visible).
 *
 * NOTE(review): break statements, the #endif and the end of the function
 * are not visible in this excerpt.
 */
3317 * Issue appropriate kind of fsync (if any) on the current XLOG output file
3320 issue_xlog_fsync(void)
3322 switch (sync_method)
3324 case SYNC_METHOD_FSYNC:
3325 if (pg_fsync(openLogFile) != 0)
3326 elog(PANIC, "fsync of log file %u, segment %u failed: %m",
3327 openLogId, openLogSeg);
3329 #ifdef HAVE_FDATASYNC
3330 case SYNC_METHOD_FDATASYNC:
3331 if (pg_fdatasync(openLogFile) != 0)
3332 elog(PANIC, "fdatasync of log file %u, segment %u failed: %m",
3333 openLogId, openLogSeg);
3336 case SYNC_METHOD_OPEN:
3337 /* write synced it already */
3340 elog(PANIC, "bogus wal_sync_method %d", sync_method);