]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xlog.c
Cosmetic code cleanup: fix a bunch of places that used "return (expr);"
[postgresql] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.226 2006/01/11 08:43:12 neilc Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <unistd.h>
22 #include <sys/stat.h>
23 #include <sys/time.h>
24
25 #include "access/clog.h"
26 #include "access/multixact.h"
27 #include "access/subtrans.h"
28 #include "access/twophase.h"
29 #include "access/xact.h"
30 #include "access/xlog.h"
31 #include "access/xlog_internal.h"
32 #include "access/xlogutils.h"
33 #include "catalog/catversion.h"
34 #include "catalog/pg_control.h"
35 #include "miscadmin.h"
36 #include "pgstat.h"
37 #include "postmaster/bgwriter.h"
38 #include "storage/bufpage.h"
39 #include "storage/fd.h"
40 #include "storage/lwlock.h"
41 #include "storage/pmsignal.h"
42 #include "storage/proc.h"
43 #include "storage/procarray.h"
44 #include "storage/spin.h"
45 #include "utils/builtins.h"
46 #include "utils/guc.h"
47 #include "utils/nabstime.h"
48 #include "utils/pg_locale.h"
49 #include "utils/relcache.h"
50
51
52 /*
53  *      Because O_DIRECT bypasses the kernel buffers, and because we never
54  *      read those buffers except during crash recovery, it is a win to use
55  *      it in all cases where we sync on each write().  We could allow O_DIRECT
56  *      with fsync(), but because skipping the kernel buffer forces writes out
57  *      quickly, it seems best just to use it for O_SYNC.  It is hard to imagine
58  *      how fsync() could be a win for O_DIRECT compared to O_SYNC and O_DIRECT.
59  *      Also, O_DIRECT is never enough to force data to the drives, it merely
60  *      tries to bypass the kernel cache, so we still need O_SYNC or fsync().
61  */
62 #ifdef O_DIRECT
63 #define PG_O_DIRECT                             O_DIRECT
64 #else
65 #define PG_O_DIRECT                             0
66 #endif
67
68 /*
69  * This chunk of hackery attempts to determine which file sync methods
70  * are available on the current platform, and to choose an appropriate
71  * default method.  We assume that fsync() is always available, and that
72  * configure determined whether fdatasync() is.
73  */
74 #if defined(O_SYNC)
75 #define BARE_OPEN_SYNC_FLAG             O_SYNC
76 #elif defined(O_FSYNC)
77 #define BARE_OPEN_SYNC_FLAG             O_FSYNC
78 #endif
79 #ifdef BARE_OPEN_SYNC_FLAG
80 #define OPEN_SYNC_FLAG                  (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
81 #endif
82
83 #if defined(O_DSYNC)
84 #if defined(OPEN_SYNC_FLAG)
85 /* O_DSYNC is distinct? */
86 #if O_DSYNC != BARE_OPEN_SYNC_FLAG
87 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
88 #endif
89 #else                                                   /* !defined(OPEN_SYNC_FLAG) */
90 /* Win32 only has O_DSYNC */
91 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
92 #endif
93 #endif
94
95 #if defined(OPEN_DATASYNC_FLAG)
96 #define DEFAULT_SYNC_METHOD_STR "open_datasync"
97 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_OPEN
98 #define DEFAULT_SYNC_FLAGBIT    OPEN_DATASYNC_FLAG
99 #elif defined(HAVE_FDATASYNC)
100 #define DEFAULT_SYNC_METHOD_STR "fdatasync"
101 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FDATASYNC
102 #define DEFAULT_SYNC_FLAGBIT    0
103 #elif defined(HAVE_FSYNC_WRITETHROUGH_ONLY)
104 #define DEFAULT_SYNC_METHOD_STR "fsync_writethrough"
105 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC_WRITETHROUGH
106 #define DEFAULT_SYNC_FLAGBIT    0
107 #else
108 #define DEFAULT_SYNC_METHOD_STR "fsync"
109 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC
110 #define DEFAULT_SYNC_FLAGBIT    0
111 #endif
112
113
114 /*
115  * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
116  * but BLCKSZ is assumed to be enough for it.
117  */
118 #ifdef O_DIRECT
119 #define ALIGNOF_XLOG_BUFFER             BLCKSZ
120 #else
121 #define ALIGNOF_XLOG_BUFFER             ALIGNOF_BUFFER
122 #endif
123
124
125 /* File path names (all relative to $PGDATA) */
126 #define BACKUP_LABEL_FILE               "backup_label"
127 #define RECOVERY_COMMAND_FILE   "recovery.conf"
128 #define RECOVERY_COMMAND_DONE   "recovery.done"
129
130
131 /* User-settable parameters */
132 int                     CheckPointSegments = 3;
133 int                     XLOGbuffers = 8;
134 char       *XLogArchiveCommand = NULL;
135 char       *XLOG_sync_method = NULL;
136 const char      XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
137 bool            fullPageWrites = true;
138
139 #ifdef WAL_DEBUG
140 bool            XLOG_DEBUG = false;
141 #endif
142
143 /*
144  * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
145  * preallocated XLOG segments --- we try to have at least XLOGfiles advance
146  * segments but no more than XLOGfileslop segments.  This could
147  * be made a separate GUC variable, but at present I think it's sufficient
148  * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
149  * checkpoint will free no more than 2*CheckPointSegments log segments, and
150  * we want to recycle all of them; the +1 allows boundary cases to happen
151  * without wasting a delete/create-segment cycle.
152  */
153
154 #define XLOGfileslop    (2*CheckPointSegments + 1)
155
156
157 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
158 int                     sync_method = DEFAULT_SYNC_METHOD;
159 static int      open_sync_bit = DEFAULT_SYNC_FLAGBIT;
160
161 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
162
163
164 /*
165  * ThisTimeLineID will be same in all backends --- it identifies current
166  * WAL timeline for the database system.
167  */
168 TimeLineID      ThisTimeLineID = 0;
169
170 /* Are we doing recovery from XLOG? */
171 bool            InRecovery = false;
172
173 /* Are we recovering using offline XLOG archives? */
174 static bool InArchiveRecovery = false;
175
176 /* Was the last xlog file restored from archive, or local? */
177 static bool restoredFromArchive = false;
178
179 /* options taken from recovery.conf */
180 static char *recoveryRestoreCommand = NULL;
181 static bool recoveryTarget = false;
182 static bool recoveryTargetExact = false;
183 static bool recoveryTargetInclusive = true;
184 static TransactionId recoveryTargetXid;
185 static time_t recoveryTargetTime;
186
187 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
188 static TransactionId recoveryStopXid;
189 static time_t recoveryStopTime;
190 static bool recoveryStopAfter;
191
192 /* constraint set by read_backup_label */
193 static XLogRecPtr recoveryMinXlogOffset = {0, 0};
194
195 /*
196  * During normal operation, the only timeline we care about is ThisTimeLineID.
197  * During recovery, however, things are more complicated.  To simplify life
198  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
199  * scan through the WAL history (that is, it is the line that was active when
200  * the currently-scanned WAL record was generated).  We also need these
201  * timeline values:
202  *
203  * recoveryTargetTLI: the desired timeline that we want to end in.
204  *
205  * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
206  * its known parents, newest first (so recoveryTargetTLI is always the
207  * first list member).  Only these TLIs are expected to be seen in the WAL
208  * segments we read, and indeed only these TLIs will be considered as
209  * candidate WAL files to open at all.
210  *
211  * curFileTLI: the TLI appearing in the name of the current input WAL file.
212  * (This is not necessarily the same as ThisTimeLineID, because we could
213  * be scanning data that was copied from an ancestor timeline when the current
214  * file was created.)  During a sequential scan we do not allow this value
215  * to decrease.
216  */
217 static TimeLineID recoveryTargetTLI;
218 static List *expectedTLIs;
219 static TimeLineID curFileTLI;
220
221 /*
222  * MyLastRecPtr points to the start of the last XLOG record inserted by the
223  * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
224  * xact hasn't yet inserted any transaction-controlled XLOG records.
225  *
226  * Note that XLOG records inserted outside transaction control are not
227  * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
228  * to be set true.  The latter can be used to test whether the current xact
229  * made any loggable changes (including out-of-xact changes, such as
230  * sequence updates).
231  *
232  * When we insert/update/delete a tuple in a temporary relation, we do not
233  * make any XLOG record, since we don't care about recovering the state of
234  * the temp rel after a crash.  However, we will still need to remember
235  * whether our transaction committed or aborted in that case.  So, we must
236  * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
237  * interest later.
238  */
239 XLogRecPtr      MyLastRecPtr = {0, 0};
240
241 bool            MyXactMadeXLogEntry = false;
242
243 bool            MyXactMadeTempRelUpdate = false;
244
245 /*
246  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
247  * current backend.  It is updated for all inserts, transaction-controlled
248  * or not.  ProcLastRecEnd is similar but points to end+1 of last record.
249  */
250 static XLogRecPtr ProcLastRecPtr = {0, 0};
251
252 XLogRecPtr      ProcLastRecEnd = {0, 0};
253
254 /*
255  * RedoRecPtr is this backend's local copy of the REDO record pointer
256  * (which is almost but not quite the same as a pointer to the most recent
257  * CHECKPOINT record).  We update this from the shared-memory copy,
258  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
259  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
260  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
261  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
262  * InitXLOGAccess.
263  */
264 static XLogRecPtr RedoRecPtr;
265
266 /*----------
267  * Shared-memory data structures for XLOG control
268  *
269  * LogwrtRqst indicates a byte position that we need to write and/or fsync
270  * the log up to (all records before that point must be written or fsynced).
271  * LogwrtResult indicates the byte positions we have already written/fsynced.
272  * These structs are identical but are declared separately to indicate their
273  * slightly different functions.
274  *
275  * We do a lot of pushups to minimize the amount of access to lockable
276  * shared memory values.  There are actually three shared-memory copies of
277  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
278  *              XLogCtl->LogwrtResult is protected by info_lck
279  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
280  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
281  * One must hold the associated lock to read or write any of these, but
282  * of course no lock is needed to read/write the unshared LogwrtResult.
283  *
284  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
285  * right", since both are updated by a write or flush operation before
286  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
287  * is that it can be examined/modified by code that already holds WALWriteLock
288  * without needing to grab info_lck as well.
289  *
290  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
291  * but is updated when convenient.  Again, it exists for the convenience of
292  * code that is already holding WALInsertLock but not the other locks.
293  *
294  * The unshared LogwrtResult may lag behind any or all of these, and again
295  * is updated when convenient.
296  *
297  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
298  * (protected by info_lck), but we don't need to cache any copies of it.
299  *
300  * Note that this all works because the request and result positions can only
301  * advance forward, never back up, and so we can easily determine which of two
302  * values is "more up to date".
303  *
304  * info_lck is only held long enough to read/update the protected variables,
305  * so it's a plain spinlock.  The other locks are held longer (potentially
306  * over I/O operations), so we use LWLocks for them.  These locks are:
307  *
308  * WALInsertLock: must be held to insert a record into the WAL buffers.
309  *
310  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
311  * XLogFlush).
312  *
313  * ControlFileLock: must be held to read/update control file or create
314  * new log file.
315  *
316  * CheckpointLock: must be held to do a checkpoint (ensures only one
317  * checkpointer at a time; even though the postmaster won't launch
318  * parallel checkpoint processes, we need this because manual checkpoints
319  * could be launched simultaneously).
320  *
321  *----------
322  */
323
/*
 * A write/flush request: the byte positions up to which the log needs to be
 * written out and/or fsynced.  Identical in layout to XLogwrtResult, but
 * declared separately because it plays the "request" role rather than the
 * "already done" role.
 */
324 typedef struct XLogwrtRqst
325 {
326         XLogRecPtr      Write;                  /* last byte + 1 to write out */
327         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
328 } XLogwrtRqst;
329
/*
 * A write/flush result: the byte positions up to which the log has already
 * been written out and fsynced.  Identical in layout to XLogwrtRqst, but
 * declared separately because it records completed work rather than a
 * request.  Several copies of this struct exist (shared and per-backend);
 * positions in it only ever advance.
 */
330 typedef struct XLogwrtResult
331 {
332         XLogRecPtr      Write;                  /* last byte + 1 written out */
333         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
334 } XLogwrtResult;
335
336 /*
337  * Shared state data for XLogInsert.
 *
 * This struct is embedded in XLogCtlData as the "Insert" member, which is
 * protected by WALInsertLock.  The LogwrtResult copy kept here may lag
 * behind the other shared copies; it exists so that code already holding
 * WALInsertLock can consult it without taking another lock.  RedoRecPtr is
 * the shared value from which each backend refreshes its local RedoRecPtr.
338  */
339 typedef struct XLogCtlInsert
340 {
341         XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
342         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
343         int                     curridx;                /* current block index in cache */
344         XLogPageHeader currpage;        /* points to header of block in cache */
345         char       *currpos;            /* current insertion point in cache */
346         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
347 } XLogCtlInsert;
348
349 /*
350  * Shared state data for XLogWrite/XLogFlush.
 *
 * This struct is embedded in XLogCtlData as the "Write" member, which is
 * protected by WALWriteLock.  The LogwrtResult copy kept here is updated by
 * a write or flush operation before WALWriteLock is released, so code that
 * already holds WALWriteLock can read or modify it without also taking
 * info_lck.
351  */
352 typedef struct XLogCtlWrite
353 {
354         XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
355         int                     curridx;                /* cache index of next block to write */
356 } XLogCtlWrite;
357
358 /*
359  * Total shared-memory state for XLOG.
 *
 * Members are grouped by the lock that protects them; the per-group
 * comments below name the lock.  The single instance is reached through
 * the file-level XLogCtl pointer.
360  */
361 typedef struct XLogCtlData
362 {
363         /* Protected by WALInsertLock: */
364         XLogCtlInsert Insert;
365         /* Protected by info_lck: */
366         XLogwrtRqst LogwrtRqst;
367         XLogwrtResult LogwrtResult;
368         /* Protected by WALWriteLock: */
369         XLogCtlWrite Write;
370
371         /*
372          * These values do not change after startup, although the pointed-to pages
373          * and xlblocks values certainly do.  Permission to read/write the pages
374          * and xlblocks values depends on WALInsertLock and WALWriteLock.
375          */
376         char       *pages;                      /* buffers for unwritten XLOG pages */
377         XLogRecPtr *xlblocks;           /* per buffer: XLOG ptr of page's 1st byte + BLCKSZ */
378         Size            XLogCacheByte;  /* # bytes in xlog buffers */
379         int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
380         TimeLineID      ThisTimeLineID; /* timeline ID kept in shared memory -- TODO confirm where it is set/read */
381
382         slock_t         info_lck;               /* locks shared LogwrtRqst/LogwrtResult */
383 } XLogCtlData;
384
385 static XLogCtlData *XLogCtl = NULL;
386
387 /*
388  * We maintain an image of pg_control in shared memory.
389  */
390 static ControlFileData *ControlFile = NULL;
391
392 /*
393  * Macros for managing XLogInsert state.  In most cases, the calling routine
394  * has local copies of &XLogCtl->Insert and/or XLogCtl->Insert.curridx,
395  * so these are passed as parameters instead of being fetched via XLogCtl.
396  */
397
398 /*
 * Free space remaining in the current xlog page buffer: BLCKSZ minus the
 * distance from the start of the current page (currpage) to the current
 * insertion point (currpos).  Insert must be a pointer to an XLogCtlInsert;
 * it is evaluated twice.
 */
399 #define INSERT_FREESPACE(Insert)  \
400         (BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
401
402 /*
 * Construct XLogRecPtr value for current insertion point.
 *
 * recptr is assigned to, so it must be an lvalue of type XLogRecPtr.  Its
 * xlogid is copied from XLogCtl->xlblocks[curridx], and its xrecoff is that
 * entry's xrecoff minus the free space still left in the page
 * (INSERT_FREESPACE(Insert)).  recptr and curridx each appear twice in the
 * expansion, so pass expressions without side effects.
 */
403 #define INSERT_RECPTR(recptr,Insert,curridx)  \
404         ( \
405           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
406           (recptr).xrecoff = \
407                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
408         )
409
/*
 * Index of the xlog buffer preceding idx, wrapping around from 0 to
 * XLogCtl->XLogCacheBlck (the highest allocated buffer index).  idx is
 * evaluated twice.
 */
410 #define PrevBufIdx(idx)         \
411                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
412
/*
 * Index of the xlog buffer following idx, wrapping around from
 * XLogCtl->XLogCacheBlck (the highest allocated buffer index) to 0.  idx is
 * evaluated twice.
 */
413 #define NextBufIdx(idx)         \
414                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
415
416 /*
417  * Private, possibly out-of-date copy of shared LogwrtResult.
418  * See discussion above.
419  */
420 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
421
422 /*
423  * openLogFile is -1 or a kernel FD for an open log file segment.
424  * When it's open, openLogOff is the current seek offset in the file.
425  * openLogId/openLogSeg identify the segment.  These variables are only
426  * used to write the XLOG, and so will normally refer to the active segment.
427  */
428 static int      openLogFile = -1;
429 static uint32 openLogId = 0;
430 static uint32 openLogSeg = 0;
431 static uint32 openLogOff = 0;
432
433 /*
434  * These variables are used similarly to the ones above, but for reading
435  * the XLOG.  Note, however, that readOff generally represents the offset
436  * of the page just read, not the seek position of the FD itself, which
437  * will be just past that page.
438  */
439 static int      readFile = -1;
440 static uint32 readId = 0;
441 static uint32 readSeg = 0;
442 static uint32 readOff = 0;
443
444 /* Buffer for currently read page (BLCKSZ bytes) */
445 static char *readBuf = NULL;
446
447 /* Buffer for current ReadRecord result (expandable) */
448 static char *readRecordBuf = NULL;
449 static uint32 readRecordBufSize = 0;
450
451 /* State information for XLOG reading */
452 static XLogRecPtr ReadRecPtr;   /* start of last record read */
453 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
454 static XLogRecord *nextRecord = NULL;
455 static TimeLineID lastPageTLI = 0;
456
457 static bool InRedo = false;
458
459
460 static void XLogArchiveNotify(const char *xlog);
461 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
462 static bool XLogArchiveIsDone(const char *xlog);
463 static void XLogArchiveCleanup(const char *xlog);
464 static void readRecoveryCommandFile(void);
465 static void exitArchiveRecovery(TimeLineID endTLI,
466                                         uint32 endLogId, uint32 endLogSeg);
467 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
468
469 static bool XLogCheckBuffer(XLogRecData *rdata,
470                                 XLogRecPtr *lsn, BkpBlock *bkpb);
471 static bool AdvanceXLInsertBuffer(void);
472 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
473 static int XLogFileInit(uint32 log, uint32 seg,
474                          bool *use_existent, bool use_lock);
475 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
476                                            bool find_free, int *max_advance,
477                                            bool use_lock);
478 static int      XLogFileOpen(uint32 log, uint32 seg);
479 static int      XLogFileRead(uint32 log, uint32 seg, int emode);
480 static bool RestoreArchivedFile(char *path, const char *xlogfname,
481                                         const char *recovername, off_t expectedSize);
482 static int      PreallocXlogFiles(XLogRecPtr endptr);
483 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
484                                 int *nsegsremoved, int *nsegsrecycled);
485 static void RemoveOldBackupHistory(void);
486 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
487 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
488 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
489 static List *readTimeLineHistory(TimeLineID targetTLI);
490 static bool existsTimeLineHistory(TimeLineID probeTLI);
491 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
492 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
493                                          TimeLineID endTLI,
494                                          uint32 endLogId, uint32 endLogSeg);
495 static void WriteControlFile(void);
496 static void ReadControlFile(void);
497 static char *str_time(time_t tnow);
498 static void issue_xlog_fsync(void);
499
500 #ifdef WAL_DEBUG
501 static void xlog_outrec(char *buf, XLogRecord *record);
502 #endif
503 static bool read_backup_label(XLogRecPtr *checkPointLoc);
504 static void remove_backup_label(void);
505
506
507 /*
508  * Insert an XLOG record having the specified RMID and info bytes,
509  * with the body of the record being the data chunk(s) described by
510  * the rdata chain (see xlog.h for notes about rdata).
511  *
512  * Returns XLOG pointer to end of record (beginning of next record).
513  * This can be used as LSN for data pages affected by the logged action.
514  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
515  * before the data page can be written out.  This implements the basic
516  * WAL rule "write the log before the data".)
517  *
518  * NB: this routine feels free to scribble on the XLogRecData structs,
519  * though not on the data they reference.  This is OK since the XLogRecData
520  * structs are always just temporaries in the calling code.
521  */
522 XLogRecPtr
523 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
524 {
525         XLogCtlInsert *Insert = &XLogCtl->Insert;
526         XLogRecord *record;
527         XLogContRecord *contrecord;
528         XLogRecPtr      RecPtr;
529         XLogRecPtr      WriteRqst;
530         uint32          freespace;
531         int                     curridx;
532         XLogRecData *rdt;
533         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
534         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
535         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
536         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
537         XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
538         XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
539         XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
540         pg_crc32        rdata_crc;
541         uint32          len,
542                                 write_len;
543         unsigned        i;
544         XLogwrtRqst LogwrtRqst;
545         bool            updrqst;
546         bool            no_tran = (rmid == RM_XLOG_ID) ? true : false;
547
548         if (info & XLR_INFO_MASK)
549         {
550                 if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
551                         elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
552                 no_tran = true;
553                 info &= ~XLR_INFO_MASK;
554         }
555
556         /*
557          * In bootstrap mode, we don't actually log anything but XLOG resources;
558          * return a phony record pointer.
559          */
560         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
561         {
562                 RecPtr.xlogid = 0;
563                 RecPtr.xrecoff = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
564                 return RecPtr;
565         }
566
567         /*
568          * Here we scan the rdata chain, determine which buffers must be backed
569          * up, and compute the CRC values for the data.  Note that the record
570          * header isn't added into the CRC initially since we don't know the final
571          * length or info bits quite yet.  Thus, the CRC will represent the CRC of
572          * the whole record in the order "rdata, then backup blocks, then record
573          * header".
574          *
575          * We may have to loop back to here if a race condition is detected below.
576          * We could prevent the race by doing all this work while holding the
577          * insert lock, but it seems better to avoid doing CRC calculations while
578          * holding the lock.  This means we have to be careful about modifying the
579          * rdata chain until we know we aren't going to loop back again.  The only
580          * change we allow ourselves to make earlier is to set rdt->data = NULL in
581          * chain items we have decided we will have to back up the whole buffer
582          * for.  This is OK because we will certainly decide the same thing again
583          * for those items if we do it over; doing it here saves an extra pass
584          * over the chain later.
585          */
586 begin:;
587         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
588         {
589                 dtbuf[i] = InvalidBuffer;
590                 dtbuf_bkp[i] = false;
591         }
592
593         INIT_CRC32(rdata_crc);
594         len = 0;
595         for (rdt = rdata;;)
596         {
597                 if (rdt->buffer == InvalidBuffer)
598                 {
599                         /* Simple data, just include it */
600                         len += rdt->len;
601                         COMP_CRC32(rdata_crc, rdt->data, rdt->len);
602                 }
603                 else
604                 {
605                         /* Find info for buffer */
606                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
607                         {
608                                 if (rdt->buffer == dtbuf[i])
609                                 {
610                                         /* Buffer already referenced by earlier chain item */
611                                         if (dtbuf_bkp[i])
612                                                 rdt->data = NULL;
613                                         else if (rdt->data)
614                                         {
615                                                 len += rdt->len;
616                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
617                                         }
618                                         break;
619                                 }
620                                 if (dtbuf[i] == InvalidBuffer)
621                                 {
622                                         /* OK, put it in this slot */
623                                         dtbuf[i] = rdt->buffer;
624                                         if (XLogCheckBuffer(rdt, &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
625                                         {
626                                                 dtbuf_bkp[i] = true;
627                                                 rdt->data = NULL;
628                                         }
629                                         else if (rdt->data)
630                                         {
631                                                 len += rdt->len;
632                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
633                                         }
634                                         break;
635                                 }
636                         }
637                         if (i >= XLR_MAX_BKP_BLOCKS)
638                                 elog(PANIC, "can backup at most %d blocks per xlog record",
639                                          XLR_MAX_BKP_BLOCKS);
640                 }
641                 /* Break out of loop when rdt points to last chain item */
642                 if (rdt->next == NULL)
643                         break;
644                 rdt = rdt->next;
645         }
646
647         /*
648          * Now add the backup block headers and data into the CRC
649          */
650         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
651         {
652                 if (dtbuf_bkp[i])
653                 {
654                         BkpBlock   *bkpb = &(dtbuf_xlg[i]);
655                         char       *page;
656
657                         COMP_CRC32(rdata_crc,
658                                            (char *) bkpb,
659                                            sizeof(BkpBlock));
660                         page = (char *) BufferGetBlock(dtbuf[i]);
661                         if (bkpb->hole_length == 0)
662                         {
663                                 COMP_CRC32(rdata_crc,
664                                                    page,
665                                                    BLCKSZ);
666                         }
667                         else
668                         {
669                                 /* must skip the hole */
670                                 COMP_CRC32(rdata_crc,
671                                                    page,
672                                                    bkpb->hole_offset);
673                                 COMP_CRC32(rdata_crc,
674                                                    page + (bkpb->hole_offset + bkpb->hole_length),
675                                                    BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
676                         }
677                 }
678         }
679
680         /*
681          * NOTE: the test for len == 0 here is somewhat fishy, since in theory all
682          * of the rmgr data might have been suppressed in favor of backup blocks.
683          * Currently, all callers of XLogInsert provide at least some
684          * not-in-a-buffer data and so len == 0 should never happen, but that may
685          * not be true forever.  If you need to remove the len == 0 check, also
686          * remove the check for xl_len == 0 in ReadRecord, below.
687          */
688         if (len == 0)
689                 elog(PANIC, "invalid xlog record length %u", len);
690
691         START_CRIT_SECTION();
692
693         /* update LogwrtResult before doing cache fill check */
694         {
695                 /* use volatile pointer to prevent code rearrangement */
696                 volatile XLogCtlData *xlogctl = XLogCtl;
697
698                 SpinLockAcquire(&xlogctl->info_lck);
699                 LogwrtRqst = xlogctl->LogwrtRqst;
700                 LogwrtResult = xlogctl->LogwrtResult;
701                 SpinLockRelease(&xlogctl->info_lck);
702         }
703
704         /*
705          * If cache is half filled then try to acquire write lock and do
706          * XLogWrite. Ignore any fractional blocks in performing this check.
707          */
708         LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
709         if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
710                 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
711                  XLogCtl->XLogCacheByte / 2))
712         {
713                 if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
714                 {
715                         /*
716                          * Since the amount of data we write here is completely optional
717                          * anyway, tell XLogWrite it can be "flexible" and stop at a
718                          * convenient boundary.  This allows writes triggered by this
719                          * mechanism to synchronize with the cache boundaries, so that in
720                          * a long transaction we'll basically dump alternating halves of
721                          * the buffer array.
722                          */
723                         LogwrtResult = XLogCtl->Write.LogwrtResult;
724                         if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
725                                 XLogWrite(LogwrtRqst, true);
726                         LWLockRelease(WALWriteLock);
727                 }
728         }
729
730         /* Now wait to get insert lock */
731         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
732
733         /*
734          * Check to see if my RedoRecPtr is out of date.  If so, may have to go
735          * back and recompute everything.  This can only happen just after a
736          * checkpoint, so it's better to be slow in this case and fast otherwise.
737          */
738         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
739         {
740                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
741                 RedoRecPtr = Insert->RedoRecPtr;
742
743                 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
744                 {
745                         if (dtbuf[i] == InvalidBuffer)
746                                 continue;
747                         if (dtbuf_bkp[i] == false &&
748                                 XLByteLE(dtbuf_lsn[i], RedoRecPtr))
749                         {
750                                 /*
751                                  * Oops, this buffer now needs to be backed up, but we didn't
752                                  * think so above.      Start over.
753                                  */
754                                 LWLockRelease(WALInsertLock);
755                                 END_CRIT_SECTION();
756                                 goto begin;
757                         }
758                 }
759         }
760
761         /*
762          * Make additional rdata chain entries for the backup blocks, so that we
763          * don't need to special-case them in the write loop.  Note that we have
764          * now irrevocably changed the input rdata chain.  At the exit of this
765          * loop, write_len includes the backup block data.
766          *
767          * Also set the appropriate info bits to show which buffers were backed
768          * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
769          * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
770          */
771         write_len = len;
772         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
773         {
774                 BkpBlock   *bkpb;
775                 char       *page;
776
777                 if (!dtbuf_bkp[i])
778                         continue;
779
780                 info |= XLR_SET_BKP_BLOCK(i);
781
782                 bkpb = &(dtbuf_xlg[i]);
783                 page = (char *) BufferGetBlock(dtbuf[i]);
784
785                 rdt->next = &(dtbuf_rdt1[i]);
786                 rdt = rdt->next;
787
788                 rdt->data = (char *) bkpb;
789                 rdt->len = sizeof(BkpBlock);
790                 write_len += sizeof(BkpBlock);
791
792                 rdt->next = &(dtbuf_rdt2[i]);
793                 rdt = rdt->next;
794
795                 if (bkpb->hole_length == 0)
796                 {
797                         rdt->data = page;
798                         rdt->len = BLCKSZ;
799                         write_len += BLCKSZ;
800                         rdt->next = NULL;
801                 }
802                 else
803                 {
804                         /* must skip the hole */
805                         rdt->data = page;
806                         rdt->len = bkpb->hole_offset;
807                         write_len += bkpb->hole_offset;
808
809                         rdt->next = &(dtbuf_rdt3[i]);
810                         rdt = rdt->next;
811
812                         rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
813                         rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
814                         write_len += rdt->len;
815                         rdt->next = NULL;
816                 }
817         }
818
819         /*
820          * If there isn't enough space on the current XLOG page for a record
821          * header, advance to the next page (leaving the unused space as zeroes).
822          */
823         updrqst = false;
824         freespace = INSERT_FREESPACE(Insert);
825         if (freespace < SizeOfXLogRecord)
826         {
827                 updrqst = AdvanceXLInsertBuffer();
828                 freespace = INSERT_FREESPACE(Insert);
829         }
830
831         curridx = Insert->curridx;
832         record = (XLogRecord *) Insert->currpos;
833
834         /* Insert record header */
835
836         record->xl_prev = Insert->PrevRecord;
837         record->xl_xid = GetCurrentTransactionIdIfAny();
838         record->xl_tot_len = SizeOfXLogRecord + write_len;
839         record->xl_len = len;           /* doesn't include backup blocks */
840         record->xl_info = info;
841         record->xl_rmid = rmid;
842
843         /* Now we can finish computing the record's CRC */
844         COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
845                            SizeOfXLogRecord - sizeof(pg_crc32));
846         FIN_CRC32(rdata_crc);
847         record->xl_crc = rdata_crc;
848
849         /* Compute record's XLOG location */
850         INSERT_RECPTR(RecPtr, Insert, curridx);
851
852 #ifdef WAL_DEBUG
853         if (XLOG_DEBUG)
854         {
855                 char            buf[8192];
856
857                 sprintf(buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff);
858                 xlog_outrec(buf, record);
859                 if (rdata->data != NULL)
860                 {
861                         strcat(buf, " - ");
862                         RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);
863                 }
864                 elog(LOG, "%s", buf);
865         }
866 #endif
867
868         /* Record begin of record in appropriate places */
869         if (!no_tran)
870                 MyLastRecPtr = RecPtr;
871         ProcLastRecPtr = RecPtr;
872         Insert->PrevRecord = RecPtr;
873         MyXactMadeXLogEntry = true;
874
875         Insert->currpos += SizeOfXLogRecord;
876         freespace -= SizeOfXLogRecord;
877
878         /*
879          * Append the data, including backup blocks if any
880          */
881         while (write_len)
882         {
883                 while (rdata->data == NULL)
884                         rdata = rdata->next;
885
886                 if (freespace > 0)
887                 {
888                         if (rdata->len > freespace)
889                         {
890                                 memcpy(Insert->currpos, rdata->data, freespace);
891                                 rdata->data += freespace;
892                                 rdata->len -= freespace;
893                                 write_len -= freespace;
894                         }
895                         else
896                         {
897                                 memcpy(Insert->currpos, rdata->data, rdata->len);
898                                 freespace -= rdata->len;
899                                 write_len -= rdata->len;
900                                 Insert->currpos += rdata->len;
901                                 rdata = rdata->next;
902                                 continue;
903                         }
904                 }
905
906                 /* Use next buffer */
907                 updrqst = AdvanceXLInsertBuffer();
908                 curridx = Insert->curridx;
909                 /* Insert cont-record header */
910                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
911                 contrecord = (XLogContRecord *) Insert->currpos;
912                 contrecord->xl_rem_len = write_len;
913                 Insert->currpos += SizeOfXLogContRecord;
914                 freespace = INSERT_FREESPACE(Insert);
915         }
916
917         /* Ensure next record will be properly aligned */
918         Insert->currpos = (char *) Insert->currpage +
919                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
920         freespace = INSERT_FREESPACE(Insert);
921
922         /*
923          * The recptr I return is the beginning of the *next* record. This will be
924          * stored as LSN for changed data pages...
925          */
926         INSERT_RECPTR(RecPtr, Insert, curridx);
927
928         /* Need to update shared LogwrtRqst if some block was filled up */
929         if (freespace < SizeOfXLogRecord)
930                 updrqst = true;                 /* curridx is filled and available for writing
931                                                                  * out */
932         else
933                 curridx = PrevBufIdx(curridx);
934         WriteRqst = XLogCtl->xlblocks[curridx];
935
936         LWLockRelease(WALInsertLock);
937
938         if (updrqst)
939         {
940                 /* use volatile pointer to prevent code rearrangement */
941                 volatile XLogCtlData *xlogctl = XLogCtl;
942
943                 SpinLockAcquire(&xlogctl->info_lck);
944                 /* advance global request to include new block(s) */
945                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
946                         xlogctl->LogwrtRqst.Write = WriteRqst;
947                 /* update local result copy while I have the chance */
948                 LogwrtResult = xlogctl->LogwrtResult;
949                 SpinLockRelease(&xlogctl->info_lck);
950         }
951
952         ProcLastRecEnd = RecPtr;
953
954         END_CRIT_SECTION();
955
956         return RecPtr;
957 }
958
959 /*
960  * Determine whether the buffer referenced by an XLogRecData item has to
961  * be backed up, and if so fill a BkpBlock struct for it.  In any case
962  * save the buffer's LSN at *lsn.
963  */
964 static bool
965 XLogCheckBuffer(XLogRecData *rdata,
966                                 XLogRecPtr *lsn, BkpBlock *bkpb)
967 {
968         PageHeader      page;
969
970         page = (PageHeader) BufferGetBlock(rdata->buffer);
971
972         /*
973          * XXX We assume page LSN is first data on *every* page that can be passed
974          * to XLogInsert, whether it otherwise has the standard page layout or
975          * not.
976          */
977         *lsn = page->pd_lsn;
978
979         if (fullPageWrites &&
980                 XLByteLE(page->pd_lsn, RedoRecPtr))
981         {
982                 /*
983                  * The page needs to be backed up, so set up *bkpb
984                  */
985                 bkpb->node = BufferGetFileNode(rdata->buffer);
986                 bkpb->block = BufferGetBlockNumber(rdata->buffer);
987
988                 if (rdata->buffer_std)
989                 {
990                         /* Assume we can omit data between pd_lower and pd_upper */
991                         uint16          lower = page->pd_lower;
992                         uint16          upper = page->pd_upper;
993
994                         if (lower >= SizeOfPageHeaderData &&
995                                 upper > lower &&
996                                 upper <= BLCKSZ)
997                         {
998                                 bkpb->hole_offset = lower;
999                                 bkpb->hole_length = upper - lower;
1000                         }
1001                         else
1002                         {
1003                                 /* No "hole" to compress out */
1004                                 bkpb->hole_offset = 0;
1005                                 bkpb->hole_length = 0;
1006                         }
1007                 }
1008                 else
1009                 {
1010                         /* Not a standard page header, don't try to eliminate "hole" */
1011                         bkpb->hole_offset = 0;
1012                         bkpb->hole_length = 0;
1013                 }
1014
1015                 return true;                    /* buffer requires backup */
1016         }
1017
1018         return false;                           /* buffer does not need to be backed up */
1019 }
1020
1021 /*
1022  * XLogArchiveNotify
1023  *
1024  * Create an archive notification file
1025  *
1026  * The name of the notification file is the message that will be picked up
1027  * by the archiver, e.g. we write 0000000100000001000000C6.ready
1028  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1029  * then when complete, rename it to 0000000100000001000000C6.done
1030  */
1031 static void
1032 XLogArchiveNotify(const char *xlog)
1033 {
1034         char            archiveStatusPath[MAXPGPATH];
1035         FILE       *fd;
1036
1037         /* insert an otherwise empty file called <XLOG>.ready */
1038         StatusFilePath(archiveStatusPath, xlog, ".ready");
1039         fd = AllocateFile(archiveStatusPath, "w");
1040         if (fd == NULL)
1041         {
1042                 ereport(LOG,
1043                                 (errcode_for_file_access(),
1044                                  errmsg("could not create archive status file \"%s\": %m",
1045                                                 archiveStatusPath)));
1046                 return;
1047         }
1048         if (FreeFile(fd))
1049         {
1050                 ereport(LOG,
1051                                 (errcode_for_file_access(),
1052                                  errmsg("could not write archive status file \"%s\": %m",
1053                                                 archiveStatusPath)));
1054                 return;
1055         }
1056
1057         /* Notify archiver that it's got something to do */
1058         if (IsUnderPostmaster)
1059                 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1060 }
1061
1062 /*
1063  * Convenience routine to notify using log/seg representation of filename
1064  */
1065 static void
1066 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1067 {
1068         char            xlog[MAXFNAMELEN];
1069
1070         XLogFileName(xlog, ThisTimeLineID, log, seg);
1071         XLogArchiveNotify(xlog);
1072 }
1073
1074 /*
1075  * XLogArchiveIsDone
1076  *
1077  * Checks for a ".done" archive notification file.      This is called when we
1078  * are ready to delete or recycle an old XLOG segment file.  If it is okay
1079  * to delete it then return true.
1080  *
1081  * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1082  * then return false; else create <XLOG>.ready and return false.  The
1083  * last case covers the possibility that the original attempt to create
1084  * <XLOG>.ready failed.
1085  */
1086 static bool
1087 XLogArchiveIsDone(const char *xlog)
1088 {
1089         char            archiveStatusPath[MAXPGPATH];
1090         struct stat stat_buf;
1091
1092         /* First check for .done --- this is the expected case */
1093         StatusFilePath(archiveStatusPath, xlog, ".done");
1094         if (stat(archiveStatusPath, &stat_buf) == 0)
1095                 return true;
1096
1097         /* check for .ready --- this means archiver is still busy with it */
1098         StatusFilePath(archiveStatusPath, xlog, ".ready");
1099         if (stat(archiveStatusPath, &stat_buf) == 0)
1100                 return false;
1101
1102         /* Race condition --- maybe archiver just finished, so recheck */
1103         StatusFilePath(archiveStatusPath, xlog, ".done");
1104         if (stat(archiveStatusPath, &stat_buf) == 0)
1105                 return true;
1106
1107         /* Retry creation of the .ready file */
1108         XLogArchiveNotify(xlog);
1109         return false;
1110 }
1111
1112 /*
1113  * XLogArchiveCleanup
1114  *
1115  * Cleanup archive notification file(s) for a particular xlog segment
1116  */
1117 static void
1118 XLogArchiveCleanup(const char *xlog)
1119 {
1120         char            archiveStatusPath[MAXPGPATH];
1121
1122         /* Remove the .done file */
1123         StatusFilePath(archiveStatusPath, xlog, ".done");
1124         unlink(archiveStatusPath);
1125         /* should we complain about failure? */
1126
1127         /* Remove the .ready file if present --- normally it shouldn't be */
1128         StatusFilePath(archiveStatusPath, xlog, ".ready");
1129         unlink(archiveStatusPath);
1130         /* should we complain about failure? */
1131 }
1132
1133 /*
1134  * Advance the Insert state to the next buffer page, writing out the next
1135  * buffer if it still contains unwritten data.
1136  *
1137  * The global LogwrtRqst.Write pointer needs to be advanced to include the
1138  * just-filled page.  If we can do this for free (without an extra lock),
1139  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
1140  * request update still needs to be done, FALSE if we did it internally.
1141  *
1142  * Must be called with WALInsertLock held.
1143  */
1144 static bool
1145 AdvanceXLInsertBuffer(void)
1146 {
1147         XLogCtlInsert *Insert = &XLogCtl->Insert;
1148         XLogCtlWrite *Write = &XLogCtl->Write;
1149         int                     nextidx = NextBufIdx(Insert->curridx);
1150         bool            update_needed = true;
1151         XLogRecPtr      OldPageRqstPtr;
1152         XLogwrtRqst WriteRqst;
1153         XLogRecPtr      NewPageEndPtr;
1154         XLogPageHeader NewPage;
1155
1156         /* Use Insert->LogwrtResult copy if it's more fresh */
1157         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1158                 LogwrtResult = Insert->LogwrtResult;
1159
1160         /*
1161          * Get ending-offset of the buffer page we need to replace (this may be
1162          * zero if the buffer hasn't been used yet).  Fall through if it's already
1163          * written out.
1164          */
1165         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1166         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1167         {
1168                 /* nope, got work to do... */
1169                 XLogRecPtr      FinishedPageRqstPtr;
1170
1171                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1172
1173                 /* Before waiting, get info_lck and update LogwrtResult */
1174                 {
1175                         /* use volatile pointer to prevent code rearrangement */
1176                         volatile XLogCtlData *xlogctl = XLogCtl;
1177
1178                         SpinLockAcquire(&xlogctl->info_lck);
1179                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1180                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1181                         LogwrtResult = xlogctl->LogwrtResult;
1182                         SpinLockRelease(&xlogctl->info_lck);
1183                 }
1184
1185                 update_needed = false;  /* Did the shared-request update */
1186
1187                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1188                 {
1189                         /* OK, someone wrote it already */
1190                         Insert->LogwrtResult = LogwrtResult;
1191                 }
1192                 else
1193                 {
1194                         /* Must acquire write lock */
1195                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1196                         LogwrtResult = Write->LogwrtResult;
1197                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1198                         {
1199                                 /* OK, someone wrote it already */
1200                                 LWLockRelease(WALWriteLock);
1201                                 Insert->LogwrtResult = LogwrtResult;
1202                         }
1203                         else
1204                         {
1205                                 /*
1206                                  * Have to write buffers while holding insert lock. This is
1207                                  * not good, so only write as much as we absolutely must.
1208                                  */
1209                                 WriteRqst.Write = OldPageRqstPtr;
1210                                 WriteRqst.Flush.xlogid = 0;
1211                                 WriteRqst.Flush.xrecoff = 0;
1212                                 XLogWrite(WriteRqst, false);
1213                                 LWLockRelease(WALWriteLock);
1214                                 Insert->LogwrtResult = LogwrtResult;
1215                         }
1216                 }
1217         }
1218
1219         /*
1220          * Now the next buffer slot is free and we can set it up to be the next
1221          * output page.
1222          */
1223         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1224         if (NewPageEndPtr.xrecoff >= XLogFileSize)
1225         {
1226                 /* crossing a logid boundary */
1227                 NewPageEndPtr.xlogid += 1;
1228                 NewPageEndPtr.xrecoff = BLCKSZ;
1229         }
1230         else
1231                 NewPageEndPtr.xrecoff += BLCKSZ;
1232         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1233         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) BLCKSZ);
1234
1235         Insert->curridx = nextidx;
1236         Insert->currpage = NewPage;
1237
1238         Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1239
1240         /*
1241          * Be sure to re-zero the buffer so that bytes beyond what we've written
1242          * will look like zeroes and not valid XLOG records...
1243          */
1244         MemSet((char *) NewPage, 0, BLCKSZ);
1245
1246         /*
1247          * Fill the new page's header
1248          */
1249         NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
1250
1251         /* NewPage->xlp_info = 0; */    /* done by memset */
1252         NewPage   ->xlp_tli = ThisTimeLineID;
1253         NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1254         NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
1255
1256         /*
1257          * If first page of an XLOG segment file, make it a long header.
1258          */
1259         if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1260         {
1261                 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1262
1263                 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1264                 NewLongPage->xlp_seg_size = XLogSegSize;
1265                 NewPage   ->xlp_info |= XLP_LONG_HEADER;
1266
1267                 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1268         }
1269
1270         return update_needed;
1271 }
1272
1273 /*
1274  * Write and/or fsync the log at least as far as WriteRqst indicates.
1275  *
1276  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1277  * may stop at any convenient boundary (such as a cache or logfile boundary).
1278  * This option allows us to avoid uselessly issuing multiple writes when a
1279  * single one would do.
1280  *
1281  * Must be called with WALWriteLock held.
1282  */
1283 static void
1284 XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
1285 {
1286         XLogCtlWrite *Write = &XLogCtl->Write;
1287         bool            ispartialpage;
1288         bool            finishing_seg;
1289         bool            use_existent;
1290         int                     curridx;
1291         int                     npages;
1292         int                     startidx;
1293         uint32          startoffset;
1294
1295         /* We should always be inside a critical section here */
1296         Assert(CritSectionCount > 0);
1297
1298         /*
1299          * Update local LogwrtResult (caller probably did this already, but...)
1300          */
1301         LogwrtResult = Write->LogwrtResult;
1302
1303         /*
1304          * Since successive pages in the xlog cache are consecutively allocated,
1305          * we can usually gather multiple pages together and issue just one
1306          * write() call.  npages is the number of pages we have determined can be
1307          * written together; startidx is the cache block index of the first one,
1308          * and startoffset is the file offset at which it should go. The latter
1309          * two variables are only valid when npages > 0, but we must initialize
1310          * all of them to keep the compiler quiet.
1311          */
1312         npages = 0;
1313         startidx = 0;
1314         startoffset = 0;
1315
1316         /*
1317          * Within the loop, curridx is the cache block index of the page to
1318          * consider writing.  We advance Write->curridx only after successfully
1319          * writing pages.  (Right now, this refinement is useless since we are
1320          * going to PANIC if any error occurs anyway; but someday it may come in
1321          * useful.)
1322          */
1323         curridx = Write->curridx;
1324
1325         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1326         {
1327                 /*
1328                  * Make sure we're not ahead of the insert process.  This could happen
1329                  * if we're passed a bogus WriteRqst.Write that is past the end of the
1330                  * last page that's been initialized by AdvanceXLInsertBuffer.
1331                  */
1332                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1333                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1334                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1335                                  XLogCtl->xlblocks[curridx].xlogid,
1336                                  XLogCtl->xlblocks[curridx].xrecoff);
1337
1338                 /* Advance LogwrtResult.Write to end of current buffer page */
1339                 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1340                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1341
1342                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1343                 {
1344                         /*
1345                          * Switch to new logfile segment.  We cannot have any pending
1346                          * pages here (since we dump what we have at segment end).
1347                          */
1348                         Assert(npages == 0);
1349                         if (openLogFile >= 0)
1350                         {
1351                                 if (close(openLogFile))
1352                                         ereport(PANIC,
1353                                                         (errcode_for_file_access(),
1354                                                 errmsg("could not close log file %u, segment %u: %m",
1355                                                            openLogId, openLogSeg)));
1356                                 openLogFile = -1;
1357                         }
1358                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1359
1360                         /* create/use new log file */
1361                         use_existent = true;
1362                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1363                                                                            &use_existent, true);
1364                         openLogOff = 0;
1365
1366                         /* update pg_control, unless someone else already did */
1367                         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1368                         if (ControlFile->logId < openLogId ||
1369                                 (ControlFile->logId == openLogId &&
1370                                  ControlFile->logSeg < openLogSeg + 1))
1371                         {
1372                                 ControlFile->logId = openLogId;
1373                                 ControlFile->logSeg = openLogSeg + 1;
1374                                 ControlFile->time = time(NULL);
1375                                 UpdateControlFile();
1376
1377                                 /*
1378                                  * Signal bgwriter to start a checkpoint if it's been too long
1379                                  * since the last one.  (We look at local copy of RedoRecPtr
1380                                  * which might be a little out of date, but should be close
1381                                  * enough for this purpose.)
1382                                  *
1383                                  * A straight computation of segment number could overflow 32
1384                                  * bits.  Rather than assuming we have working 64-bit
1385                                  * arithmetic, we compare the highest-order bits separately,
1386                                  * and force a checkpoint immediately when they change.
1387                                  */
1388                                 if (IsUnderPostmaster)
1389                                 {
1390                                         uint32          old_segno,
1391                                                                 new_segno;
1392                                         uint32          old_highbits,
1393                                                                 new_highbits;
1394
1395                                         old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1396                                                 (RedoRecPtr.xrecoff / XLogSegSize);
1397                                         old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1398                                         new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
1399                                                 openLogSeg;
1400                                         new_highbits = openLogId / XLogSegSize;
1401                                         if (new_highbits != old_highbits ||
1402                                                 new_segno >= old_segno + (uint32) CheckPointSegments)
1403                                         {
1404 #ifdef WAL_DEBUG
1405                                                 if (XLOG_DEBUG)
1406                                                         elog(LOG, "time for a checkpoint, signaling bgwriter");
1407 #endif
1408                                                 RequestCheckpoint(false, true);
1409                                         }
1410                                 }
1411                         }
1412                         LWLockRelease(ControlFileLock);
1413                 }
1414
1415                 /* Make sure we have the current logfile open */
1416                 if (openLogFile < 0)
1417                 {
1418                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1419                         openLogFile = XLogFileOpen(openLogId, openLogSeg);
1420                         openLogOff = 0;
1421                 }
1422
1423                 /* Add current page to the set of pending pages-to-dump */
1424                 if (npages == 0)
1425                 {
1426                         /* first of group */
1427                         startidx = curridx;
1428                         startoffset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
1429                 }
1430                 npages++;
1431
1432                 /*
1433                  * Dump the set if this will be the last loop iteration, or if we are
1434                  * at the last page of the cache area (since the next page won't be
1435                  * contiguous in memory), or if we are at the end of the logfile
1436                  * segment.
1437                  */
1438                 finishing_seg = !ispartialpage &&
1439                         (startoffset + npages * BLCKSZ) >= XLogSegSize;
1440
1441                 if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
1442                         curridx == XLogCtl->XLogCacheBlck ||
1443                         finishing_seg)
1444                 {
1445                         char       *from;
1446                         Size            nbytes;
1447
1448                         /* Need to seek in the file? */
1449                         if (openLogOff != startoffset)
1450                         {
1451                                 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1452                                         ereport(PANIC,
1453                                                         (errcode_for_file_access(),
1454                                                          errmsg("could not seek in log file %u, "
1455                                                                         "segment %u to offset %u: %m",
1456                                                                         openLogId, openLogSeg, startoffset)));
1457                                 openLogOff = startoffset;
1458                         }
1459
1460                         /* OK to write the page(s) */
1461                         from = XLogCtl->pages + startidx * (Size) BLCKSZ;
1462                         nbytes = npages * (Size) BLCKSZ;
1463                         errno = 0;
1464                         if (write(openLogFile, from, nbytes) != nbytes)
1465                         {
1466                                 /* if write didn't set errno, assume no disk space */
1467                                 if (errno == 0)
1468                                         errno = ENOSPC;
1469                                 ereport(PANIC,
1470                                                 (errcode_for_file_access(),
1471                                                  errmsg("could not write to log file %u, segment %u "
1472                                                                 "at offset %u, length %lu: %m",
1473                                                                 openLogId, openLogSeg,
1474                                                                 openLogOff, (unsigned long) nbytes)));
1475                         }
1476
1477                         /* Update state for write */
1478                         openLogOff += nbytes;
1479                         Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1480                         npages = 0;
1481
1482                         /*
1483                          * If we just wrote the whole last page of a logfile segment,
1484                          * fsync the segment immediately.  This avoids having to go back
1485                          * and re-open prior segments when an fsync request comes along
1486                          * later. Doing it here ensures that one and only one backend will
1487                          * perform this fsync.
1488                          *
1489                          * This is also the right place to notify the Archiver that the
1490                          * segment is ready to copy to archival storage.
1491                          */
1492                         if (finishing_seg)
1493                         {
1494                                 issue_xlog_fsync();
1495                                 LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
1496
1497                                 if (XLogArchivingActive())
1498                                         XLogArchiveNotifySeg(openLogId, openLogSeg);
1499                         }
1500                 }
1501
1502                 if (ispartialpage)
1503                 {
1504                         /* Only asked to write a partial page */
1505                         LogwrtResult.Write = WriteRqst.Write;
1506                         break;
1507                 }
1508                 curridx = NextBufIdx(curridx);
1509
1510                 /* If flexible, break out of loop as soon as we wrote something */
1511                 if (flexible && npages == 0)
1512                         break;
1513         }
1514
1515         Assert(npages == 0);
1516         Assert(curridx == Write->curridx);
1517
1518         /*
1519          * If asked to flush, do so
1520          */
1521         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1522                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1523         {
1524                 /*
1525                  * Could get here without iterating above loop, in which case we might
1526                  * have no open file or the wrong one.  However, we do not need to
1527                  * fsync more than one file.
1528                  */
1529                 if (sync_method != SYNC_METHOD_OPEN)
1530                 {
1531                         if (openLogFile >= 0 &&
1532                                 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1533                         {
1534                                 if (close(openLogFile))
1535                                         ereport(PANIC,
1536                                                         (errcode_for_file_access(),
1537                                                 errmsg("could not close log file %u, segment %u: %m",
1538                                                            openLogId, openLogSeg)));
1539                                 openLogFile = -1;
1540                         }
1541                         if (openLogFile < 0)
1542                         {
1543                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1544                                 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1545                                 openLogOff = 0;
1546                         }
1547                         issue_xlog_fsync();
1548                 }
1549                 LogwrtResult.Flush = LogwrtResult.Write;
1550         }
1551
1552         /*
1553          * Update shared-memory status
1554          *
1555          * We make sure that the shared 'request' values do not fall behind the
1556          * 'result' values.  This is not absolutely essential, but it saves some
1557          * code in a couple of places.
1558          */
1559         {
1560                 /* use volatile pointer to prevent code rearrangement */
1561                 volatile XLogCtlData *xlogctl = XLogCtl;
1562
1563                 SpinLockAcquire(&xlogctl->info_lck);
1564                 xlogctl->LogwrtResult = LogwrtResult;
1565                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1566                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1567                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1568                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1569                 SpinLockRelease(&xlogctl->info_lck);
1570         }
1571
1572         Write->LogwrtResult = LogwrtResult;
1573 }
1574
1575 /*
1576  * Ensure that all XLOG data through the given position is flushed to disk.
1577  *
1578  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1579  * already held, and we try to avoid acquiring it if possible.
1580  */
1581 void
1582 XLogFlush(XLogRecPtr record)
1583 {
1584         XLogRecPtr      WriteRqstPtr;
1585         XLogwrtRqst WriteRqst;
1586
1587         /* Disabled during REDO */
1588         if (InRedo)
1589                 return;
1590
1591         /* Quick exit if already known flushed */
1592         if (XLByteLE(record, LogwrtResult.Flush))
1593                 return;
1594
1595 #ifdef WAL_DEBUG
1596         if (XLOG_DEBUG)
1597                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1598                          record.xlogid, record.xrecoff,
1599                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1600                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1601 #endif
1602
1603         START_CRIT_SECTION();
1604
1605         /*
1606          * Since fsync is usually a horribly expensive operation, we try to
1607          * piggyback as much data as we can on each fsync: if we see any more data
1608          * entered into the xlog buffer, we'll write and fsync that too, so that
1609          * the final value of LogwrtResult.Flush is as large as possible. This
1610          * gives us some chance of avoiding another fsync immediately after.
1611          */
1612
1613         /* initialize to given target; may increase below */
1614         WriteRqstPtr = record;
1615
1616         /* read LogwrtResult and update local state */
1617         {
1618                 /* use volatile pointer to prevent code rearrangement */
1619                 volatile XLogCtlData *xlogctl = XLogCtl;
1620
1621                 SpinLockAcquire(&xlogctl->info_lck);
1622                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1623                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1624                 LogwrtResult = xlogctl->LogwrtResult;
1625                 SpinLockRelease(&xlogctl->info_lck);
1626         }
1627
1628         /* done already? */
1629         if (!XLByteLE(record, LogwrtResult.Flush))
1630         {
1631                 /* now wait for the write lock */
1632                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1633                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1634                 if (!XLByteLE(record, LogwrtResult.Flush))
1635                 {
1636                         /* try to write/flush later additions to XLOG as well */
1637                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1638                         {
1639                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1640                                 uint32          freespace = INSERT_FREESPACE(Insert);
1641
1642                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1643                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1644                                 else
1645                                 {
1646                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1647                                         WriteRqstPtr.xrecoff -= freespace;
1648                                 }
1649                                 LWLockRelease(WALInsertLock);
1650                                 WriteRqst.Write = WriteRqstPtr;
1651                                 WriteRqst.Flush = WriteRqstPtr;
1652                         }
1653                         else
1654                         {
1655                                 WriteRqst.Write = WriteRqstPtr;
1656                                 WriteRqst.Flush = record;
1657                         }
1658                         XLogWrite(WriteRqst, false);
1659                 }
1660                 LWLockRelease(WALWriteLock);
1661         }
1662
1663         END_CRIT_SECTION();
1664
1665         /*
1666          * If we still haven't flushed to the request point then we have a
1667          * problem; most likely, the requested flush point is past end of XLOG.
1668          * This has been seen to occur when a disk page has a corrupted LSN.
1669          *
1670          * Formerly we treated this as a PANIC condition, but that hurts the
1671          * system's robustness rather than helping it: we do not want to take down
1672          * the whole system due to corruption on one data page.  In particular, if
1673          * the bad page is encountered again during recovery then we would be
1674          * unable to restart the database at all!  (This scenario has actually
1675          * happened in the field several times with 7.1 releases. Note that we
1676          * cannot get here while InRedo is true, but if the bad page is brought in
1677          * and marked dirty during recovery then CreateCheckPoint will try to
1678          * flush it at the end of recovery.)
1679          *
1680          * The current approach is to ERROR under normal conditions, but only
1681          * WARNING during recovery, so that the system can be brought up even if
1682          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
1683          * be promoted to PANIC since xact.c calls this routine inside a critical
1684          * section.  However, calls from bufmgr.c are not within critical sections
1685          * and so we will not force a restart for a bad LSN on a data page.
1686          */
1687         if (XLByteLT(LogwrtResult.Flush, record))
1688                 elog(InRecovery ? WARNING : ERROR,
1689                 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1690                          record.xlogid, record.xrecoff,
1691                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1692 }
1693
1694 /*
1695  * Create a new XLOG file segment, or open a pre-existing one.
1696  *
1697  * log, seg: identify segment to be created/opened.
1698  *
1699  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1700  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1701  * file was used.
1702  *
1703  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1704  * place.  This should be TRUE except during bootstrap log creation.  The
1705  * caller must *not* hold the lock at call.
1706  *
1707  * Returns FD of opened file.
1708  *
1709  * Note: errors here are ERROR not PANIC because we might or might not be
1710  * inside a critical section (eg, during checkpoint there is no reason to
1711  * take down the system on failure).  They will promote to PANIC if we are
1712  * in a critical section.
1713  */
1714 static int
1715 XLogFileInit(uint32 log, uint32 seg,
1716                          bool *use_existent, bool use_lock)
1717 {
1718         char            path[MAXPGPATH];
1719         char            tmppath[MAXPGPATH];
1720         char            zbuffer[BLCKSZ];
1721         uint32          installed_log;
1722         uint32          installed_seg;
1723         int                     max_advance;
1724         int                     fd;
1725         int                     nbytes;
1726
1727         XLogFilePath(path, ThisTimeLineID, log, seg);
1728
1729         /*
1730          * Try to use existent file (checkpoint maker may have created it already)
1731          */
1732         if (*use_existent)
1733         {
1734                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1735                                                    S_IRUSR | S_IWUSR);
1736                 if (fd < 0)
1737                 {
1738                         if (errno != ENOENT)
1739                                 ereport(ERROR,
1740                                                 (errcode_for_file_access(),
1741                                                  errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1742                                                                 path, log, seg)));
1743                 }
1744                 else
1745                         return fd;
1746         }
1747
1748         /*
1749          * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
1750          * another process is doing the same thing.  If so, we will end up
1751          * pre-creating an extra log segment.  That seems OK, and better than
1752          * holding the lock throughout this lengthy process.
1753          */
1754         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1755
1756         unlink(tmppath);
1757
1758         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1759         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1760                                            S_IRUSR | S_IWUSR);
1761         if (fd < 0)
1762                 ereport(ERROR,
1763                                 (errcode_for_file_access(),
1764                                  errmsg("could not create file \"%s\": %m", tmppath)));
1765
1766         /*
1767          * Zero-fill the file.  We have to do this the hard way to ensure that all
1768          * the file space has really been allocated --- on platforms that allow
1769          * "holes" in files, just seeking to the end doesn't allocate intermediate
1770          * space.  This way, we know that we have all the space and (after the
1771          * fsync below) that all the indirect blocks are down on disk.  Therefore,
1772          * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
1773          * log file.
1774          */
1775         MemSet(zbuffer, 0, sizeof(zbuffer));
1776         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
1777         {
1778                 errno = 0;
1779                 if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
1780                 {
1781                         int                     save_errno = errno;
1782
1783                         /*
1784                          * If we fail to make the file, delete it to release disk space
1785                          */
1786                         unlink(tmppath);
1787                         /* if write didn't set errno, assume problem is no disk space */
1788                         errno = save_errno ? save_errno : ENOSPC;
1789
1790                         ereport(ERROR,
1791                                         (errcode_for_file_access(),
1792                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1793                 }
1794         }
1795
1796         if (pg_fsync(fd) != 0)
1797                 ereport(ERROR,
1798                                 (errcode_for_file_access(),
1799                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1800
1801         if (close(fd))
1802                 ereport(ERROR,
1803                                 (errcode_for_file_access(),
1804                                  errmsg("could not close file \"%s\": %m", tmppath)));
1805
1806         /*
1807          * Now move the segment into place with its final name.
1808          *
1809          * If caller didn't want to use a pre-existing file, get rid of any
1810          * pre-existing file.  Otherwise, cope with possibility that someone else
1811          * has created the file while we were filling ours: if so, use ours to
1812          * pre-create a future log segment.
1813          */
1814         installed_log = log;
1815         installed_seg = seg;
1816         max_advance = XLOGfileslop;
1817         if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
1818                                                                 *use_existent, &max_advance,
1819                                                                 use_lock))
1820         {
1821                 /* No need for any more future segments... */
1822                 unlink(tmppath);
1823         }
1824
1825         /* Set flag to tell caller there was no existent file */
1826         *use_existent = false;
1827
1828         /* Now open original target segment (might not be file I just made) */
1829         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1830                                            S_IRUSR | S_IWUSR);
1831         if (fd < 0)
1832                 ereport(ERROR,
1833                                 (errcode_for_file_access(),
1834                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1835                                   path, log, seg)));
1836
1837         return fd;
1838 }
1839
1840 /*
1841  * Create a new XLOG file segment by copying a pre-existing one.
1842  *
1843  * log, seg: identify segment to be created.
1844  *
1845  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
1846  *              a different timeline)
1847  *
1848  * Currently this is only used during recovery, and so there are no locking
1849  * considerations.      But we should be just as tense as XLogFileInit to avoid
1850  * emplacing a bogus file.
1851  */
1852 static void
1853 XLogFileCopy(uint32 log, uint32 seg,
1854                          TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
1855 {
1856         char            path[MAXPGPATH];
1857         char            tmppath[MAXPGPATH];
1858         char            buffer[BLCKSZ];
1859         int                     srcfd;
1860         int                     fd;
1861         int                     nbytes;
1862
1863         /*
1864          * Open the source file
1865          */
1866         XLogFilePath(path, srcTLI, srclog, srcseg);
1867         srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1868         if (srcfd < 0)
1869                 ereport(ERROR,
1870                                 (errcode_for_file_access(),
1871                                  errmsg("could not open file \"%s\": %m", path)));
1872
1873         /*
1874          * Copy into a temp file name.
1875          */
1876         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1877
1878         unlink(tmppath);
1879
1880         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1881         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1882                                            S_IRUSR | S_IWUSR);
1883         if (fd < 0)
1884                 ereport(ERROR,
1885                                 (errcode_for_file_access(),
1886                                  errmsg("could not create file \"%s\": %m", tmppath)));
1887
1888         /*
1889          * Do the data copying.
1890          */
1891         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
1892         {
1893                 errno = 0;
1894                 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
1895                 {
1896                         if (errno != 0)
1897                                 ereport(ERROR,
1898                                                 (errcode_for_file_access(),
1899                                                  errmsg("could not read file \"%s\": %m", path)));
1900                         else
1901                                 ereport(ERROR,
1902                                                 (errmsg("not enough data in file \"%s\"", path)));
1903                 }
1904                 errno = 0;
1905                 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
1906                 {
1907                         int                     save_errno = errno;
1908
1909                         /*
1910                          * If we fail to make the file, delete it to release disk space
1911                          */
1912                         unlink(tmppath);
1913                         /* if write didn't set errno, assume problem is no disk space */
1914                         errno = save_errno ? save_errno : ENOSPC;
1915
1916                         ereport(ERROR,
1917                                         (errcode_for_file_access(),
1918                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1919                 }
1920         }
1921
1922         if (pg_fsync(fd) != 0)
1923                 ereport(ERROR,
1924                                 (errcode_for_file_access(),
1925                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1926
1927         if (close(fd))
1928                 ereport(ERROR,
1929                                 (errcode_for_file_access(),
1930                                  errmsg("could not close file \"%s\": %m", tmppath)));
1931
1932         close(srcfd);
1933
1934         /*
1935          * Now move the segment into place with its final name.
1936          */
1937         if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
1938                 elog(ERROR, "InstallXLogFileSegment should not have failed");
1939 }
1940
1941 /*
1942  * Install a new XLOG segment file as a current or future log segment.
1943  *
1944  * This is used both to install a newly-created segment (which has a temp
1945  * filename while it's being created) and to recycle an old segment.
1946  *
1947  * *log, *seg: identify segment to install as (or first possible target).
1948  * When find_free is TRUE, these are modified on return to indicate the
1949  * actual installation location or last segment searched.
1950  *
1951  * tmppath: initial name of file to install.  It will be renamed into place.
1952  *
1953  * find_free: if TRUE, install the new segment at the first empty log/seg
1954  * number at or after the passed numbers.  If FALSE, install the new segment
1955  * exactly where specified, deleting any existing segment file there.
1956  *
1957  * *max_advance: maximum number of log/seg slots to advance past the starting
1958  * point.  Fail if no free slot is found in this range.  On return, reduced
1959  * by the number of slots skipped over.  (Irrelevant, and may be NULL,
1960  * when find_free is FALSE.)
1961  *
1962  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1963  * place.  This should be TRUE except during bootstrap log creation.  The
1964  * caller must *not* hold the lock at call.
1965  *
1966  * Returns TRUE if file installed, FALSE if not installed because of
1967  * exceeding max_advance limit.  (Any other kind of failure causes ereport().)
1968  */
1969 static bool
1970 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
1971                                            bool find_free, int *max_advance,
1972                                            bool use_lock)
1973 {
1974         char            path[MAXPGPATH];
1975         struct stat stat_buf;
1976
1977         XLogFilePath(path, ThisTimeLineID, *log, *seg);
1978
1979         /*
1980          * We want to be sure that only one process does this at a time.
1981          */
1982         if (use_lock)
1983                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1984
1985         if (!find_free)
1986         {
1987                 /* Force installation: get rid of any pre-existing segment file */
1988                 unlink(path);
1989         }
1990         else
1991         {
1992                 /* Find a free slot to put it in */
1993                 while (stat(path, &stat_buf) == 0)
1994                 {
1995                         if (*max_advance <= 0)
1996                         {
1997                                 /* Failed to find a free slot within specified range */
1998                                 if (use_lock)
1999                                         LWLockRelease(ControlFileLock);
2000                                 return false;
2001                         }
2002                         NextLogSeg(*log, *seg);
2003                         (*max_advance)--;
2004                         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2005                 }
2006         }
2007
2008         /*
2009          * Prefer link() to rename() here just to be really sure that we don't
2010          * overwrite an existing logfile.  However, there shouldn't be one, so
2011          * rename() is an acceptable substitute except for the truly paranoid.
2012          */
2013 #if HAVE_WORKING_LINK
2014         if (link(tmppath, path) < 0)
2015                 ereport(ERROR,
2016                                 (errcode_for_file_access(),
2017                                  errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2018                                                 tmppath, path, *log, *seg)));
2019         unlink(tmppath);
2020 #else
2021         if (rename(tmppath, path) < 0)
2022                 ereport(ERROR,
2023                                 (errcode_for_file_access(),
2024                                  errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2025                                                 tmppath, path, *log, *seg)));
2026 #endif
2027
2028         if (use_lock)
2029                 LWLockRelease(ControlFileLock);
2030
2031         return true;
2032 }
2033
2034 /*
2035  * Open a pre-existing logfile segment for writing.
2036  */
2037 static int
2038 XLogFileOpen(uint32 log, uint32 seg)
2039 {
2040         char            path[MAXPGPATH];
2041         int                     fd;
2042
2043         XLogFilePath(path, ThisTimeLineID, log, seg);
2044
2045         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
2046                                            S_IRUSR | S_IWUSR);
2047         if (fd < 0)
2048                 ereport(PANIC,
2049                                 (errcode_for_file_access(),
2050                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2051                                   path, log, seg)));
2052
2053         return fd;
2054 }
2055
2056 /*
2057  * Open a logfile segment for reading (during recovery).
2058  */
2059 static int
2060 XLogFileRead(uint32 log, uint32 seg, int emode)
2061 {
2062         char            path[MAXPGPATH];
2063         char            xlogfname[MAXFNAMELEN];
2064         ListCell   *cell;
2065         int                     fd;
2066
2067         /*
2068          * Loop looking for a suitable timeline ID: we might need to read any of
2069          * the timelines listed in expectedTLIs.
2070          *
2071          * We expect curFileTLI on entry to be the TLI of the preceding file in
2072          * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
2073          * to go backwards; this prevents us from picking up the wrong file when a
2074          * parent timeline extends to higher segment numbers than the child we
2075          * want to read.
2076          */
2077         foreach(cell, expectedTLIs)
2078         {
2079                 TimeLineID      tli = (TimeLineID) lfirst_int(cell);
2080
2081                 if (tli < curFileTLI)
2082                         break;                          /* don't bother looking at too-old TLIs */
2083
2084                 if (InArchiveRecovery)
2085                 {
2086                         XLogFileName(xlogfname, tli, log, seg);
2087                         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2088                                                                                                           "RECOVERYXLOG",
2089                                                                                                           XLogSegSize);
2090                 }
2091                 else
2092                         XLogFilePath(path, tli, log, seg);
2093
2094                 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2095                 if (fd >= 0)
2096                 {
2097                         /* Success! */
2098                         curFileTLI = tli;
2099                         return fd;
2100                 }
2101                 if (errno != ENOENT)    /* unexpected failure? */
2102                         ereport(PANIC,
2103                                         (errcode_for_file_access(),
2104                         errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2105                                    path, log, seg)));
2106         }
2107
2108         /* Couldn't find it.  For simplicity, complain about front timeline */
2109         XLogFilePath(path, recoveryTargetTLI, log, seg);
2110         errno = ENOENT;
2111         ereport(emode,
2112                         (errcode_for_file_access(),
2113                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2114                                   path, log, seg)));
2115         return -1;
2116 }
2117
2118 /*
2119  * Attempt to retrieve the specified file from off-line archival storage.
2120  * If successful, fill "path" with its complete path (note that this will be
2121  * a temp file name that doesn't follow the normal naming convention), and
2122  * return TRUE.
2123  *
2124  * If not successful, fill "path" with the name of the normal on-line file
2125  * (which may or may not actually exist, but we'll try to use it), and return
2126  * FALSE.
2127  *
2128  * For fixed-size files, the caller may pass the expected size as an
2129  * additional crosscheck on successful recovery.  If the file size is not
2130  * known, set expectedSize = 0.
2131  */
2132 static bool
2133 RestoreArchivedFile(char *path, const char *xlogfname,
2134                                         const char *recovername, off_t expectedSize)
2135 {
2136         char            xlogpath[MAXPGPATH];
2137         char            xlogRestoreCmd[MAXPGPATH];
2138         char       *dp;
2139         char       *endp;
2140         const char *sp;
2141         int                     rc;
2142         struct stat stat_buf;
2143
2144         /*
2145          * When doing archive recovery, we always prefer an archived log file even
2146          * if a file of the same name exists in XLOGDIR.  The reason is that the
2147          * file in XLOGDIR could be an old, un-filled or partly-filled version
2148          * that was copied and restored as part of backing up $PGDATA.
2149          *
2150          * We could try to optimize this slightly by checking the local copy
2151          * lastchange timestamp against the archived copy, but we have no API to
2152          * do this, nor can we guarantee that the lastchange timestamp was
2153          * preserved correctly when we copied to archive. Our aim is robustness,
2154          * so we elect not to do this.
2155          *
2156          * If we cannot obtain the log file from the archive, however, we will try
2157          * to use the XLOGDIR file if it exists.  This is so that we can make use
2158          * of log segments that weren't yet transferred to the archive.
2159          *
2160          * Notice that we don't actually overwrite any files when we copy back
2161          * from archive because the recoveryRestoreCommand may inadvertently
2162          * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2163          * fallback to the segments remaining in current XLOGDIR later. The
2164          * copy-from-archive filename is always the same, ensuring that we don't
2165          * run out of disk space on long recoveries.
2166          */
2167         snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2168
2169         /*
2170          * Make sure there is no existing file named recovername.
2171          */
2172         if (stat(xlogpath, &stat_buf) != 0)
2173         {
2174                 if (errno != ENOENT)
2175                         ereport(FATAL,
2176                                         (errcode_for_file_access(),
2177                                          errmsg("could not stat file \"%s\": %m",
2178                                                         xlogpath)));
2179         }
2180         else
2181         {
2182                 if (unlink(xlogpath) != 0)
2183                         ereport(FATAL,
2184                                         (errcode_for_file_access(),
2185                                          errmsg("could not remove file \"%s\": %m",
2186                                                         xlogpath)));
2187         }
2188
2189         /*
2190          * construct the command to be executed
2191          */
2192         dp = xlogRestoreCmd;
2193         endp = xlogRestoreCmd + MAXPGPATH - 1;
2194         *endp = '\0';
2195
2196         for (sp = recoveryRestoreCommand; *sp; sp++)
2197         {
2198                 if (*sp == '%')
2199                 {
2200                         switch (sp[1])
2201                         {
2202                                 case 'p':
2203                                         /* %p: full path of target file */
2204                                         sp++;
2205                                         StrNCpy(dp, xlogpath, endp - dp);
2206                                         make_native_path(dp);
2207                                         dp += strlen(dp);
2208                                         break;
2209                                 case 'f':
2210                                         /* %f: filename of desired file */
2211                                         sp++;
2212                                         StrNCpy(dp, xlogfname, endp - dp);
2213                                         dp += strlen(dp);
2214                                         break;
2215                                 case '%':
2216                                         /* convert %% to a single % */
2217                                         sp++;
2218                                         if (dp < endp)
2219                                                 *dp++ = *sp;
2220                                         break;
2221                                 default:
2222                                         /* otherwise treat the % as not special */
2223                                         if (dp < endp)
2224                                                 *dp++ = *sp;
2225                                         break;
2226                         }
2227                 }
2228                 else
2229                 {
2230                         if (dp < endp)
2231                                 *dp++ = *sp;
2232                 }
2233         }
2234         *dp = '\0';
2235
2236         ereport(DEBUG3,
2237                         (errmsg_internal("executing restore command \"%s\"",
2238                                                          xlogRestoreCmd)));
2239
2240         /*
2241          * Copy xlog from archival storage to XLOGDIR
2242          */
2243         rc = system(xlogRestoreCmd);
2244         if (rc == 0)
2245         {
2246                 /*
2247                  * command apparently succeeded, but let's make sure the file is
2248                  * really there now and has the correct size.
2249                  *
2250                  * XXX I made wrong-size a fatal error to ensure the DBA would notice
2251                  * it, but is that too strong?  We could try to plow ahead with a
2252                  * local copy of the file ... but the problem is that there probably
2253                  * isn't one, and we'd incorrectly conclude we've reached the end of
2254                  * WAL and we're done recovering ...
2255                  */
2256                 if (stat(xlogpath, &stat_buf) == 0)
2257                 {
2258                         if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2259                                 ereport(FATAL,
2260                                                 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2261                                                                 xlogfname,
2262                                                                 (unsigned long) stat_buf.st_size,
2263                                                                 (unsigned long) expectedSize)));
2264                         else
2265                         {
2266                                 ereport(LOG,
2267                                                 (errmsg("restored log file \"%s\" from archive",
2268                                                                 xlogfname)));
2269                                 strcpy(path, xlogpath);
2270                                 return true;
2271                         }
2272                 }
2273                 else
2274                 {
2275                         /* stat failed */
2276                         if (errno != ENOENT)
2277                                 ereport(FATAL,
2278                                                 (errcode_for_file_access(),
2279                                                  errmsg("could not stat file \"%s\": %m",
2280                                                                 xlogpath)));
2281                 }
2282         }
2283
2284         /*
2285          * remember, we rollforward UNTIL the restore fails so failure here is
2286          * just part of the process... that makes it difficult to determine
2287          * whether the restore failed because there isn't an archive to restore,
2288          * or because the administrator has specified the restore program
2289          * incorrectly.  We have to assume the former.
2290          */
2291         ereport(DEBUG2,
2292                 (errmsg("could not restore file \"%s\" from archive: return code %d",
2293                                 xlogfname, rc)));
2294
2295         /*
2296          * if an archived file is not available, there might still be a version of
2297          * this file in XLOGDIR, so return that as the filename to open.
2298          *
2299          * In many recovery scenarios we expect this to fail also, but if so that
2300          * just means we've reached the end of WAL.
2301          */
2302         snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2303         return false;
2304 }
2305
2306 /*
2307  * Preallocate log files beyond the specified log endpoint, according to
2308  * the XLOGfile user parameter.
2309  */
2310 static int
2311 PreallocXlogFiles(XLogRecPtr endptr)
2312 {
2313         int                     nsegsadded = 0;
2314         uint32          _logId;
2315         uint32          _logSeg;
2316         int                     lf;
2317         bool            use_existent;
2318
2319         XLByteToPrevSeg(endptr, _logId, _logSeg);
2320         if ((endptr.xrecoff - 1) % XLogSegSize >=
2321                 (uint32) (0.75 * XLogSegSize))
2322         {
2323                 NextLogSeg(_logId, _logSeg);
2324                 use_existent = true;
2325                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
2326                 close(lf);
2327                 if (!use_existent)
2328                         nsegsadded++;
2329         }
2330         return nsegsadded;
2331 }
2332
2333 /*
2334  * Remove or move offline all log files older or equal to passed log/seg#
2335  *
2336  * endptr is current (or recent) end of xlog; this is used to determine
2337  * whether we want to recycle rather than delete no-longer-wanted log files.
2338  */
2339 static void
2340 MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
2341                                 int *nsegsremoved, int *nsegsrecycled)
2342 {
2343         uint32          endlogId;
2344         uint32          endlogSeg;
2345         int                     max_advance;
2346         DIR                *xldir;
2347         struct dirent *xlde;
2348         char            lastoff[MAXFNAMELEN];
2349         char            path[MAXPGPATH];
2350
2351         *nsegsremoved = 0;
2352         *nsegsrecycled = 0;
2353
2354         /*
2355          * Initialize info about where to try to recycle to.  We allow recycling
2356          * segments up to XLOGfileslop segments beyond the current XLOG location.
2357          */
2358         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2359         max_advance = XLOGfileslop;
2360
2361         xldir = AllocateDir(XLOGDIR);
2362         if (xldir == NULL)
2363                 ereport(ERROR,
2364                                 (errcode_for_file_access(),
2365                                  errmsg("could not open transaction log directory \"%s\": %m",
2366                                                 XLOGDIR)));
2367
2368         XLogFileName(lastoff, ThisTimeLineID, log, seg);
2369
2370         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2371         {
2372                 /*
2373                  * We ignore the timeline part of the XLOG segment identifiers in
2374                  * deciding whether a segment is still needed.  This ensures that we
2375                  * won't prematurely remove a segment from a parent timeline. We could
2376                  * probably be a little more proactive about removing segments of
2377                  * non-parent timelines, but that would be a whole lot more
2378                  * complicated.
2379                  *
2380                  * We use the alphanumeric sorting property of the filenames to decide
2381                  * which ones are earlier than the lastoff segment.
2382                  */
2383                 if (strlen(xlde->d_name) == 24 &&
2384                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2385                         strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
2386                 {
2387                         bool            recycle;
2388
2389                         if (XLogArchivingActive())
2390                                 recycle = XLogArchiveIsDone(xlde->d_name);
2391                         else
2392                                 recycle = true;
2393
2394                         if (recycle)
2395                         {
2396                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2397
2398                                 /*
2399                                  * Before deleting the file, see if it can be recycled as a
2400                                  * future log segment.
2401                                  */
2402                                 if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
2403                                                                                    true, &max_advance,
2404                                                                                    true))
2405                                 {
2406                                         ereport(DEBUG2,
2407                                                         (errmsg("recycled transaction log file \"%s\"",
2408                                                                         xlde->d_name)));
2409                                         (*nsegsrecycled)++;
2410                                         /* Needn't recheck that slot on future iterations */
2411                                         if (max_advance > 0)
2412                                         {
2413                                                 NextLogSeg(endlogId, endlogSeg);
2414                                                 max_advance--;
2415                                         }
2416                                 }
2417                                 else
2418                                 {
2419                                         /* No need for any more future segments... */
2420                                         ereport(DEBUG2,
2421                                                         (errmsg("removing transaction log file \"%s\"",
2422                                                                         xlde->d_name)));
2423                                         unlink(path);
2424                                         (*nsegsremoved)++;
2425                                 }
2426
2427                                 XLogArchiveCleanup(xlde->d_name);
2428                         }
2429                 }
2430         }
2431
2432         FreeDir(xldir);
2433 }
2434
2435 /*
2436  * Remove previous backup history files
2437  */
2438 static void
2439 RemoveOldBackupHistory(void)
2440 {
2441         DIR                *xldir;
2442         struct dirent *xlde;
2443         char            path[MAXPGPATH];
2444
2445         xldir = AllocateDir(XLOGDIR);
2446         if (xldir == NULL)
2447                 ereport(ERROR,
2448                                 (errcode_for_file_access(),
2449                                  errmsg("could not open transaction log directory \"%s\": %m",
2450                                                 XLOGDIR)));
2451
2452         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2453         {
2454                 if (strlen(xlde->d_name) > 24 &&
2455                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2456                         strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
2457                                    ".backup") == 0)
2458                 {
2459                         /* Remove any *.backup files that have been archived. */
2460                         if (!XLogArchivingActive() || XLogArchiveIsDone(xlde->d_name))
2461                         {
2462                                 ereport(DEBUG2,
2463                                 (errmsg("removing transaction log backup history file \"%s\"",
2464                                                 xlde->d_name)));
2465                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2466                                 unlink(path);
2467                                 XLogArchiveCleanup(xlde->d_name);
2468                         }
2469                 }
2470         }
2471
2472         FreeDir(xldir);
2473 }
2474
2475 /*
2476  * Restore the backup blocks present in an XLOG record, if any.
2477  *
2478  * We assume all of the record has been read into memory at *record.
2479  *
2480  * Note: when a backup block is available in XLOG, we restore it
2481  * unconditionally, even if the page in the database appears newer.
2482  * This is to protect ourselves against database pages that were partially
2483  * or incorrectly written during a crash.  We assume that the XLOG data
2484  * must be good because it has passed a CRC check, while the database
2485  * page might not be.  This will force us to replay all subsequent
2486  * modifications of the page that appear in XLOG, rather than possibly
2487  * ignoring them as already applied, but that's not a huge drawback.
2488  */
2489 static void
2490 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
2491 {
2492         Relation        reln;
2493         Buffer          buffer;
2494         Page            page;
2495         BkpBlock        bkpb;
2496         char       *blk;
2497         int                     i;
2498
2499         blk = (char *) XLogRecGetData(record) + record->xl_len;
2500         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2501         {
2502                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2503                         continue;
2504
2505                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2506                 blk += sizeof(BkpBlock);
2507
2508                 reln = XLogOpenRelation(bkpb.node);
2509
2510                 if (reln)
2511                 {
2512                         buffer = XLogReadBuffer(true, reln, bkpb.block);
2513                         if (BufferIsValid(buffer))
2514                         {
2515                                 page = (Page) BufferGetPage(buffer);
2516
2517                                 if (bkpb.hole_length == 0)
2518                                 {
2519                                         memcpy((char *) page, blk, BLCKSZ);
2520                                 }
2521                                 else
2522                                 {
2523                                         /* must zero-fill the hole */
2524                                         MemSet((char *) page, 0, BLCKSZ);
2525                                         memcpy((char *) page, blk, bkpb.hole_offset);
2526                                         memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
2527                                                    blk + bkpb.hole_offset,
2528                                                    BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2529                                 }
2530
2531                                 PageSetLSN(page, lsn);
2532                                 PageSetTLI(page, ThisTimeLineID);
2533                                 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
2534                                 WriteBuffer(buffer);
2535                         }
2536                 }
2537
2538                 blk += BLCKSZ - bkpb.hole_length;
2539         }
2540 }
2541
2542 /*
2543  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
2544  * record (other than to the minimal extent of computing the amount of
2545  * data to read in) until we've checked the CRCs.
2546  *
2547  * We assume all of the record has been read into memory at *record.
2548  */
2549 static bool
2550 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
2551 {
2552         pg_crc32        crc;
2553         int                     i;
2554         uint32          len = record->xl_len;
2555         BkpBlock        bkpb;
2556         char       *blk;
2557
2558         /* First the rmgr data */
2559         INIT_CRC32(crc);
2560         COMP_CRC32(crc, XLogRecGetData(record), len);
2561
2562         /* Add in the backup blocks, if any */
2563         blk = (char *) XLogRecGetData(record) + len;
2564         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2565         {
2566                 uint32          blen;
2567
2568                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2569                         continue;
2570
2571                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2572                 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2573                 {
2574                         ereport(emode,
2575                                         (errmsg("incorrect hole size in record at %X/%X",
2576                                                         recptr.xlogid, recptr.xrecoff)));
2577                         return false;
2578                 }
2579                 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
2580                 COMP_CRC32(crc, blk, blen);
2581                 blk += blen;
2582         }
2583
2584         /* Check that xl_tot_len agrees with our calculation */
2585         if (blk != (char *) record + record->xl_tot_len)
2586         {
2587                 ereport(emode,
2588                                 (errmsg("incorrect total length in record at %X/%X",
2589                                                 recptr.xlogid, recptr.xrecoff)));
2590                 return false;
2591         }
2592
2593         /* Finally include the record header */
2594         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
2595                            SizeOfXLogRecord - sizeof(pg_crc32));
2596         FIN_CRC32(crc);
2597
2598         if (!EQ_CRC32(record->xl_crc, crc))
2599         {
2600                 ereport(emode,
2601                 (errmsg("incorrect resource manager data checksum in record at %X/%X",
2602                                 recptr.xlogid, recptr.xrecoff)));
2603                 return false;
2604         }
2605
2606         return true;
2607 }
2608
2609 /*
2610  * Attempt to read an XLOG record.
2611  *
2612  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
2613  * try to read a record just after the last one previously read.
2614  *
2615  * If no valid record is available, returns NULL, or fails if emode is PANIC.
2616  * (emode must be either PANIC or LOG.)
2617  *
2618  * The record is copied into readRecordBuf, so that on successful return,
2619  * the returned record pointer always points there.
2620  */
2621 static XLogRecord *
2622 ReadRecord(XLogRecPtr *RecPtr, int emode)
2623 {
2624         XLogRecord *record;
2625         char       *buffer;
2626         XLogRecPtr      tmpRecPtr = EndRecPtr;
2627         bool            randAccess = false;
2628         uint32          len,
2629                                 total_len;
2630         uint32          targetPageOff;
2631         uint32          targetRecOff;
2632         uint32          pageHeaderSize;
2633
2634         if (readBuf == NULL)
2635         {
2636                 /*
2637                  * First time through, permanently allocate readBuf.  We do it this
2638                  * way, rather than just making a static array, for two reasons: (1)
2639                  * no need to waste the storage in most instantiations of the backend;
2640                  * (2) a static char array isn't guaranteed to have any particular
2641                  * alignment, whereas malloc() will provide MAXALIGN'd storage.
2642                  */
2643                 readBuf = (char *) malloc(BLCKSZ);
2644                 Assert(readBuf != NULL);
2645         }
2646
2647         if (RecPtr == NULL)
2648         {
2649                 RecPtr = &tmpRecPtr;
2650                 /* fast case if next record is on same page */
2651                 if (nextRecord != NULL)
2652                 {
2653                         record = nextRecord;
2654                         goto got_record;
2655                 }
2656                 /* align old recptr to next page */
2657                 if (tmpRecPtr.xrecoff % BLCKSZ != 0)
2658                         tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
2659                 if (tmpRecPtr.xrecoff >= XLogFileSize)
2660                 {
2661                         (tmpRecPtr.xlogid)++;
2662                         tmpRecPtr.xrecoff = 0;
2663                 }
2664                 /* We will account for page header size below */
2665         }
2666         else
2667         {
2668                 if (!XRecOffIsValid(RecPtr->xrecoff))
2669                         ereport(PANIC,
2670                                         (errmsg("invalid record offset at %X/%X",
2671                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2672
2673                 /*
2674                  * Since we are going to a random position in WAL, forget any prior
2675                  * state about what timeline we were in, and allow it to be any
2676                  * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
2677                  * to go backwards (but we can't reset that variable right here, since
2678                  * we might not change files at all).
2679                  */
2680                 lastPageTLI = 0;                /* see comment in ValidXLOGHeader */
2681                 randAccess = true;              /* allow curFileTLI to go backwards too */
2682         }
2683
2684         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
2685         {
2686                 close(readFile);
2687                 readFile = -1;
2688         }
2689         XLByteToSeg(*RecPtr, readId, readSeg);
2690         if (readFile < 0)
2691         {
2692                 /* Now it's okay to reset curFileTLI if random fetch */
2693                 if (randAccess)
2694                         curFileTLI = 0;
2695
2696                 readFile = XLogFileRead(readId, readSeg, emode);
2697                 if (readFile < 0)
2698                         goto next_record_is_invalid;
2699                 readOff = (uint32) (-1);        /* force read to occur below */
2700         }
2701
2702         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / BLCKSZ) * BLCKSZ;
2703         if (readOff != targetPageOff)
2704         {
2705                 readOff = targetPageOff;
2706                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
2707                 {
2708                         ereport(emode,
2709                                         (errcode_for_file_access(),
2710                                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
2711                                                         readId, readSeg, readOff)));
2712                         goto next_record_is_invalid;
2713                 }
2714                 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
2715                 {
2716                         ereport(emode,
2717                                         (errcode_for_file_access(),
2718                                          errmsg("could not read from log file %u, segment %u at offset %u: %m",
2719                                                         readId, readSeg, readOff)));
2720                         goto next_record_is_invalid;
2721                 }
2722                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2723                         goto next_record_is_invalid;
2724         }
2725         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2726         targetRecOff = RecPtr->xrecoff % BLCKSZ;
2727         if (targetRecOff == 0)
2728         {
2729                 /*
2730                  * Can only get here in the continuing-from-prev-page case, because
2731                  * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
2732                  * to skip over the new page's header.
2733                  */
2734                 tmpRecPtr.xrecoff += pageHeaderSize;
2735                 targetRecOff = pageHeaderSize;
2736         }
2737         else if (targetRecOff < pageHeaderSize)
2738         {
2739                 ereport(emode,
2740                                 (errmsg("invalid record offset at %X/%X",
2741                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2742                 goto next_record_is_invalid;
2743         }
2744         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
2745                 targetRecOff == pageHeaderSize)
2746         {
2747                 ereport(emode,
2748                                 (errmsg("contrecord is requested by %X/%X",
2749                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2750                 goto next_record_is_invalid;
2751         }
2752         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
2753
2754 got_record:;
2755
2756         /*
2757          * Currently, xl_len == 0 must be bad data, but that might not be true
2758          * forever.  See note in XLogInsert.
2759          */
2760         if (record->xl_len == 0)
2761         {
2762                 ereport(emode,
2763                                 (errmsg("record with zero length at %X/%X",
2764                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2765                 goto next_record_is_invalid;
2766         }
2767         if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
2768                 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
2769                 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
2770         {
2771                 ereport(emode,
2772                                 (errmsg("invalid record length at %X/%X",
2773                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2774                 goto next_record_is_invalid;
2775         }
2776         if (record->xl_rmid > RM_MAX_ID)
2777         {
2778                 ereport(emode,
2779                                 (errmsg("invalid resource manager ID %u at %X/%X",
2780                                                 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
2781                 goto next_record_is_invalid;
2782         }
2783         if (randAccess)
2784         {
2785                 /*
2786                  * We can't exactly verify the prev-link, but surely it should be less
2787                  * than the record's own address.
2788                  */
2789                 if (!XLByteLT(record->xl_prev, *RecPtr))
2790                 {
2791                         ereport(emode,
2792                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
2793                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
2794                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2795                         goto next_record_is_invalid;
2796                 }
2797         }
2798         else
2799         {
2800                 /*
2801                  * Record's prev-link should exactly match our previous location. This
2802                  * check guards against torn WAL pages where a stale but valid-looking
2803                  * WAL record starts on a sector boundary.
2804                  */
2805                 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
2806                 {
2807                         ereport(emode,
2808                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
2809                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
2810                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2811                         goto next_record_is_invalid;
2812                 }
2813         }
2814
2815         /*
2816          * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
2817          * increases, round its size to a multiple of BLCKSZ, and make sure it's
2818          * at least 4*BLCKSZ to start with.  (That is enough for all "normal"
2819          * records, but very large commit or abort records might need more space.)
2820          */
2821         total_len = record->xl_tot_len;
2822         if (total_len > readRecordBufSize)
2823         {
2824                 uint32          newSize = total_len;
2825
2826                 newSize += BLCKSZ - (newSize % BLCKSZ);
2827                 newSize = Max(newSize, 4 * BLCKSZ);
2828                 if (readRecordBuf)
2829                         free(readRecordBuf);
2830                 readRecordBuf = (char *) malloc(newSize);
2831                 if (!readRecordBuf)
2832                 {
2833                         readRecordBufSize = 0;
2834                         /* We treat this as a "bogus data" condition */
2835                         ereport(emode,
2836                                         (errmsg("record length %u at %X/%X too long",
2837                                                         total_len, RecPtr->xlogid, RecPtr->xrecoff)));
2838                         goto next_record_is_invalid;
2839                 }
2840                 readRecordBufSize = newSize;
2841         }
2842
2843         buffer = readRecordBuf;
2844         nextRecord = NULL;
2845         len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
2846         if (total_len > len)
2847         {
2848                 /* Need to reassemble record */
2849                 XLogContRecord *contrecord;
2850                 uint32          gotlen = len;
2851
2852                 memcpy(buffer, record, len);
2853                 record = (XLogRecord *) buffer;
2854                 buffer += len;
2855                 for (;;)
2856                 {
2857                         readOff += BLCKSZ;
2858                         if (readOff >= XLogSegSize)
2859                         {
2860                                 close(readFile);
2861                                 readFile = -1;
2862                                 NextLogSeg(readId, readSeg);
2863                                 readFile = XLogFileRead(readId, readSeg, emode);
2864                                 if (readFile < 0)
2865                                         goto next_record_is_invalid;
2866                                 readOff = 0;
2867                         }
2868                         if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
2869                         {
2870                                 ereport(emode,
2871                                                 (errcode_for_file_access(),
2872                                                  errmsg("could not read from log file %u, segment %u, offset %u: %m",
2873                                                                 readId, readSeg, readOff)));
2874                                 goto next_record_is_invalid;
2875                         }
2876                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2877                                 goto next_record_is_invalid;
2878                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
2879                         {
2880                                 ereport(emode,
2881                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
2882                                                                 readId, readSeg, readOff)));
2883                                 goto next_record_is_invalid;
2884                         }
2885                         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2886                         contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
2887                         if (contrecord->xl_rem_len == 0 ||
2888                                 total_len != (contrecord->xl_rem_len + gotlen))
2889                         {
2890                                 ereport(emode,
2891                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
2892                                                                 contrecord->xl_rem_len,
2893                                                                 readId, readSeg, readOff)));
2894                                 goto next_record_is_invalid;
2895                         }
2896                         len = BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
2897                         if (contrecord->xl_rem_len > len)
2898                         {
2899                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
2900                                 gotlen += len;
2901                                 buffer += len;
2902                                 continue;
2903                         }
2904                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
2905                                    contrecord->xl_rem_len);
2906                         break;
2907                 }
2908                 if (!RecordIsValid(record, *RecPtr, emode))
2909                         goto next_record_is_invalid;
2910                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2911                 if (BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
2912                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
2913                 {
2914                         nextRecord = (XLogRecord *) ((char *) contrecord +
2915                                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
2916                 }
2917                 EndRecPtr.xlogid = readId;
2918                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
2919                         pageHeaderSize +
2920                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
2921                 ReadRecPtr = *RecPtr;
2922                 return record;
2923         }
2924
2925         /* Record does not cross a page boundary */
2926         if (!RecordIsValid(record, *RecPtr, emode))
2927                 goto next_record_is_invalid;
2928         if (BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % BLCKSZ +
2929                 MAXALIGN(total_len))
2930                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
2931         EndRecPtr.xlogid = RecPtr->xlogid;
2932         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
2933         ReadRecPtr = *RecPtr;
2934         memcpy(buffer, record, total_len);
2935         return (XLogRecord *) buffer;
2936
2937 next_record_is_invalid:;
2938         close(readFile);
2939         readFile = -1;
2940         nextRecord = NULL;
2941         return NULL;
2942 }
2943
2944 /*
2945  * Check whether the xlog header of a page just read in looks valid.
2946  *
2947  * This is just a convenience subroutine to avoid duplicated code in
2948  * ReadRecord.  It's not intended for use from anywhere else.
2949  */
2950 static bool
2951 ValidXLOGHeader(XLogPageHeader hdr, int emode)
2952 {
2953         XLogRecPtr      recaddr;
2954
2955         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
2956         {
2957                 ereport(emode,
2958                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
2959                                                 hdr->xlp_magic, readId, readSeg, readOff)));
2960                 return false;
2961         }
2962         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
2963         {
2964                 ereport(emode,
2965                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
2966                                                 hdr->xlp_info, readId, readSeg, readOff)));
2967                 return false;
2968         }
2969         if (hdr->xlp_info & XLP_LONG_HEADER)
2970         {
2971                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
2972
2973                 if (longhdr->xlp_sysid != ControlFile->system_identifier)
2974                 {
2975                         char            fhdrident_str[32];
2976                         char            sysident_str[32];
2977
2978                         /*
2979                          * Format sysids separately to keep platform-dependent format code
2980                          * out of the translatable message string.
2981                          */
2982                         snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
2983                                          longhdr->xlp_sysid);
2984                         snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
2985                                          ControlFile->system_identifier);
2986                         ereport(emode,
2987                                         (errmsg("WAL file is from different system"),
2988                                          errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
2989                                                            fhdrident_str, sysident_str)));
2990                         return false;
2991                 }
2992                 if (longhdr->xlp_seg_size != XLogSegSize)
2993                 {
2994                         ereport(emode,
2995                                         (errmsg("WAL file is from different system"),
2996                                          errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
2997                         return false;
2998                 }
2999         }
3000         recaddr.xlogid = readId;
3001         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3002         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3003         {
3004                 ereport(emode,
3005                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3006                                                 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3007                                                 readId, readSeg, readOff)));
3008                 return false;
3009         }
3010
3011         /*
3012          * Check page TLI is one of the expected values.
3013          */
3014         if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3015         {
3016                 ereport(emode,
3017                                 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3018                                                 hdr->xlp_tli,
3019                                                 readId, readSeg, readOff)));
3020                 return false;
3021         }
3022
3023         /*
3024          * Since child timelines are always assigned a TLI greater than their
3025          * immediate parent's TLI, we should never see TLI go backwards across
3026          * successive pages of a consistent WAL sequence.
3027          *
3028          * Of course this check should only be applied when advancing sequentially
3029          * across pages; therefore ReadRecord resets lastPageTLI to zero when
3030          * going to a random page.
3031          */
3032         if (hdr->xlp_tli < lastPageTLI)
3033         {
3034                 ereport(emode,
3035                                 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
3036                                                 hdr->xlp_tli, lastPageTLI,
3037                                                 readId, readSeg, readOff)));
3038                 return false;
3039         }
3040         lastPageTLI = hdr->xlp_tli;
3041         return true;
3042 }
3043
3044 /*
3045  * Try to read a timeline's history file.
3046  *
3047  * If successful, return the list of component TLIs (the given TLI followed by
3048  * its ancestor TLIs).  If we can't find the history file, assume that the
3049  * timeline has no parents, and return a list of just the specified timeline
3050  * ID.
3051  */
3052 static List *
3053 readTimeLineHistory(TimeLineID targetTLI)
3054 {
3055         List       *result;
3056         char            path[MAXPGPATH];
3057         char            histfname[MAXFNAMELEN];
3058         char            fline[MAXPGPATH];
3059         FILE       *fd;
3060
3061         if (InArchiveRecovery)
3062         {
3063                 TLHistoryFileName(histfname, targetTLI);
3064                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3065         }
3066         else
3067                 TLHistoryFilePath(path, targetTLI);
3068
3069         fd = AllocateFile(path, "r");
3070         if (fd == NULL)
3071         {
3072                 if (errno != ENOENT)
3073                         ereport(FATAL,
3074                                         (errcode_for_file_access(),
3075                                          errmsg("could not open file \"%s\": %m", path)));
3076                 /* Not there, so assume no parents */
3077                 return list_make1_int((int) targetTLI);
3078         }
3079
3080         result = NIL;
3081
3082         /*
3083          * Parse the file...
3084          */
3085         while (fgets(fline, MAXPGPATH, fd) != NULL)
3086         {
3087                 /* skip leading whitespace and check for # comment */
3088                 char       *ptr;
3089                 char       *endptr;
3090                 TimeLineID      tli;
3091
3092                 for (ptr = fline; *ptr; ptr++)
3093                 {
3094                         if (!isspace((unsigned char) *ptr))
3095                                 break;
3096                 }
3097                 if (*ptr == '\0' || *ptr == '#')
3098                         continue;
3099
3100                 /* expect a numeric timeline ID as first field of line */
3101                 tli = (TimeLineID) strtoul(ptr, &endptr, 0);
3102                 if (endptr == ptr)
3103                         ereport(FATAL,
3104                                         (errmsg("syntax error in history file: %s", fline),
3105                                          errhint("Expected a numeric timeline ID.")));
3106
3107                 if (result &&
3108                         tli <= (TimeLineID) linitial_int(result))
3109                         ereport(FATAL,
3110                                         (errmsg("invalid data in history file: %s", fline),
3111                                    errhint("Timeline IDs must be in increasing sequence.")));
3112
3113                 /* Build list with newest item first */
3114                 result = lcons_int((int) tli, result);
3115
3116                 /* we ignore the remainder of each line */
3117         }
3118
3119         FreeFile(fd);
3120
3121         if (result &&
3122                 targetTLI <= (TimeLineID) linitial_int(result))
3123                 ereport(FATAL,
3124                                 (errmsg("invalid data in history file \"%s\"", path),
3125                         errhint("Timeline IDs must be less than child timeline's ID.")));
3126
3127         result = lcons_int((int) targetTLI, result);
3128
3129         ereport(DEBUG3,
3130                         (errmsg_internal("history of timeline %u is %s",
3131                                                          targetTLI, nodeToString(result))));
3132
3133         return result;
3134 }
3135
3136 /*
3137  * Probe whether a timeline history file exists for the given timeline ID
3138  */
3139 static bool
3140 existsTimeLineHistory(TimeLineID probeTLI)
3141 {
3142         char            path[MAXPGPATH];
3143         char            histfname[MAXFNAMELEN];
3144         FILE       *fd;
3145
3146         if (InArchiveRecovery)
3147         {
3148                 TLHistoryFileName(histfname, probeTLI);
3149                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3150         }
3151         else
3152                 TLHistoryFilePath(path, probeTLI);
3153
3154         fd = AllocateFile(path, "r");
3155         if (fd != NULL)
3156         {
3157                 FreeFile(fd);
3158                 return true;
3159         }
3160         else
3161         {
3162                 if (errno != ENOENT)
3163                         ereport(FATAL,
3164                                         (errcode_for_file_access(),
3165                                          errmsg("could not open file \"%s\": %m", path)));
3166                 return false;
3167         }
3168 }
3169
3170 /*
3171  * Find the newest existing timeline, assuming that startTLI exists.
3172  *
3173  * Note: while this is somewhat heuristic, it does positively guarantee
3174  * that (result + 1) is not a known timeline, and therefore it should
3175  * be safe to assign that ID to a new timeline.
3176  */
3177 static TimeLineID
3178 findNewestTimeLine(TimeLineID startTLI)
3179 {
3180         TimeLineID      newestTLI;
3181         TimeLineID      probeTLI;
3182
3183         /*
3184          * The algorithm is just to probe for the existence of timeline history
3185          * files.  XXX is it useful to allow gaps in the sequence?
3186          */
3187         newestTLI = startTLI;
3188
3189         for (probeTLI = startTLI + 1;; probeTLI++)
3190         {
3191                 if (existsTimeLineHistory(probeTLI))
3192                 {
3193                         newestTLI = probeTLI;           /* probeTLI exists */
3194                 }
3195                 else
3196                 {
3197                         /* doesn't exist, assume we're done */
3198                         break;
3199                 }
3200         }
3201
3202         return newestTLI;
3203 }
3204
3205 /*
3206  * Create a new timeline history file.
3207  *
3208  *      newTLI: ID of the new timeline
3209  *      parentTLI: ID of its immediate parent
3210  *      endTLI et al: ID of the last used WAL file, for annotation purposes
3211  *
3212  * Currently this is only used during recovery, and so there are no locking
3213  * considerations.      But we should be just as tense as XLogFileInit to avoid
3214  * emplacing a bogus file.
3215  */
3216 static void
3217 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
3218                                          TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
3219 {
3220         char            path[MAXPGPATH];
3221         char            tmppath[MAXPGPATH];
3222         char            histfname[MAXFNAMELEN];
3223         char            xlogfname[MAXFNAMELEN];
3224         char            buffer[BLCKSZ];
3225         int                     srcfd;
3226         int                     fd;
3227         int                     nbytes;
3228
3229         Assert(newTLI > parentTLI); /* else bad selection of newTLI */
3230
3231         /*
3232          * Write into a temp file name.
3233          */
3234         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3235
3236         unlink(tmppath);
3237
3238         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
3239         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
3240                                            S_IRUSR | S_IWUSR);
3241         if (fd < 0)
3242                 ereport(ERROR,
3243                                 (errcode_for_file_access(),
3244                                  errmsg("could not create file \"%s\": %m", tmppath)));
3245
3246         /*
3247          * If a history file exists for the parent, copy it verbatim
3248          */
3249         if (InArchiveRecovery)
3250         {
3251                 TLHistoryFileName(histfname, parentTLI);
3252                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3253         }
3254         else
3255                 TLHistoryFilePath(path, parentTLI);
3256
3257         srcfd = BasicOpenFile(path, O_RDONLY, 0);
3258         if (srcfd < 0)
3259         {
3260                 if (errno != ENOENT)
3261                         ereport(ERROR,
3262                                         (errcode_for_file_access(),
3263                                          errmsg("could not open file \"%s\": %m", path)));
3264                 /* Not there, so assume parent has no parents */
3265         }
3266         else
3267         {
3268                 for (;;)
3269                 {
3270                         errno = 0;
3271                         nbytes = (int) read(srcfd, buffer, sizeof(buffer));
3272                         if (nbytes < 0 || errno != 0)
3273                                 ereport(ERROR,
3274                                                 (errcode_for_file_access(),
3275                                                  errmsg("could not read file \"%s\": %m", path)));
3276                         if (nbytes == 0)
3277                                 break;
3278                         errno = 0;
3279                         if ((int) write(fd, buffer, nbytes) != nbytes)
3280                         {
3281                                 int                     save_errno = errno;
3282
3283                                 /*
3284                                  * If we fail to make the file, delete it to release disk
3285                                  * space
3286                                  */
3287                                 unlink(tmppath);
3288
3289                                 /*
3290                                  * if write didn't set errno, assume problem is no disk space
3291                                  */
3292                                 errno = save_errno ? save_errno : ENOSPC;
3293
3294                                 ereport(ERROR,
3295                                                 (errcode_for_file_access(),
3296                                          errmsg("could not write to file \"%s\": %m", tmppath)));
3297                         }
3298                 }
3299                 close(srcfd);
3300         }
3301
3302         /*
3303          * Append one line with the details of this timeline split.
3304          *
3305          * If we did have a parent file, insert an extra newline just in case the
3306          * parent file failed to end with one.
3307          */
3308         XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
3309
3310         snprintf(buffer, sizeof(buffer),
3311                          "%s%u\t%s\t%s transaction %u at %s\n",
3312                          (srcfd < 0) ? "" : "\n",
3313                          parentTLI,
3314                          xlogfname,
3315                          recoveryStopAfter ? "after" : "before",
3316                          recoveryStopXid,
3317                          str_time(recoveryStopTime));
3318
3319         nbytes = strlen(buffer);
3320         errno = 0;
3321         if ((int) write(fd, buffer, nbytes) != nbytes)
3322         {
3323                 int                     save_errno = errno;
3324
3325                 /*
3326                  * If we fail to make the file, delete it to release disk space
3327                  */
3328                 unlink(tmppath);
3329                 /* if write didn't set errno, assume problem is no disk space */
3330                 errno = save_errno ? save_errno : ENOSPC;
3331
3332                 ereport(ERROR,
3333                                 (errcode_for_file_access(),
3334                                  errmsg("could not write to file \"%s\": %m", tmppath)));
3335         }
3336
3337         if (pg_fsync(fd) != 0)
3338                 ereport(ERROR,
3339                                 (errcode_for_file_access(),
3340                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
3341
3342         if (close(fd))
3343                 ereport(ERROR,
3344                                 (errcode_for_file_access(),
3345                                  errmsg("could not close file \"%s\": %m", tmppath)));
3346
3347
3348         /*
3349          * Now move the completed history file into place with its final name.
3350          */
3351         TLHistoryFilePath(path, newTLI);
3352
3353         /*
3354          * Prefer link() to rename() here just to be really sure that we don't
3355          * overwrite an existing logfile.  However, there shouldn't be one, so
3356          * rename() is an acceptable substitute except for the truly paranoid.
3357          */
3358 #if HAVE_WORKING_LINK
3359         if (link(tmppath, path) < 0)
3360                 ereport(ERROR,
3361                                 (errcode_for_file_access(),
3362                                  errmsg("could not link file \"%s\" to \"%s\": %m",
3363                                                 tmppath, path)));
3364         unlink(tmppath);
3365 #else
3366         if (rename(tmppath, path) < 0)
3367                 ereport(ERROR,
3368                                 (errcode_for_file_access(),
3369                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
3370                                                 tmppath, path)));
3371 #endif
3372
3373         /* The history file can be archived immediately. */
3374         TLHistoryFileName(histfname, newTLI);
3375         XLogArchiveNotify(histfname);
3376 }
3377
3378 /*
3379  * I/O routines for pg_control
3380  *
3381  * *ControlFile is a buffer in shared memory that holds an image of the
3382  * contents of pg_control.      WriteControlFile() initializes pg_control
3383  * given a preloaded buffer, ReadControlFile() loads the buffer from
3384  * the pg_control file (during postmaster or standalone-backend startup),
3385  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
3386  *
3387  * For simplicity, WriteControlFile() initializes the fields of pg_control
3388  * that are related to checking backend/database compatibility, and
3389  * ReadControlFile() verifies they are correct.  We could split out the
3390  * I/O and compatibility-check functions, but there seems no need currently.
3391  */
3392 static void
3393 WriteControlFile(void)
3394 {
3395         int                     fd;
3396         char            buffer[BLCKSZ]; /* need not be aligned */
3397         char       *localeptr;
3398
3399         /*
3400          * Initialize version and compatibility-check fields
3401          */
3402         ControlFile->pg_control_version = PG_CONTROL_VERSION;
3403         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3404
3405         ControlFile->maxAlign = MAXIMUM_ALIGNOF;
3406         ControlFile->floatFormat = FLOATFORMAT_VALUE;
3407
3408         ControlFile->blcksz = BLCKSZ;
3409         ControlFile->relseg_size = RELSEG_SIZE;
3410         ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3411
3412         ControlFile->nameDataLen = NAMEDATALEN;
3413         ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3414
3415 #ifdef HAVE_INT64_TIMESTAMP
3416         ControlFile->enableIntTimes = TRUE;
3417 #else
3418         ControlFile->enableIntTimes = FALSE;
3419 #endif
3420
3421         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3422         localeptr = setlocale(LC_COLLATE, NULL);
3423         if (!localeptr)
3424                 ereport(PANIC,
3425                                 (errmsg("invalid LC_COLLATE setting")));
3426         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
3427         localeptr = setlocale(LC_CTYPE, NULL);
3428         if (!localeptr)
3429                 ereport(PANIC,
3430                                 (errmsg("invalid LC_CTYPE setting")));
3431         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3432
3433         /* Contents are protected with a CRC */
3434         INIT_CRC32(ControlFile->crc);
3435         COMP_CRC32(ControlFile->crc,
3436                            (char *) ControlFile,
3437                            offsetof(ControlFileData, crc));
3438         FIN_CRC32(ControlFile->crc);
3439
3440         /*
3441          * We write out BLCKSZ bytes into pg_control, zero-padding the excess over
3442          * sizeof(ControlFileData).  This reduces the odds of premature-EOF errors
3443          * when reading pg_control.  We'll still fail when we check the contents
3444          * of the file, but hopefully with a more specific error than "couldn't
3445          * read pg_control".
3446          */
3447         if (sizeof(ControlFileData) > BLCKSZ)
3448                 ereport(PANIC,
3449                                 (errmsg("sizeof(ControlFileData) is larger than BLCKSZ; fix either one")));
3450
3451         memset(buffer, 0, BLCKSZ);
3452         memcpy(buffer, ControlFile, sizeof(ControlFileData));
3453
3454         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3455                                            O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3456                                            S_IRUSR | S_IWUSR);
3457         if (fd < 0)
3458                 ereport(PANIC,
3459                                 (errcode_for_file_access(),
3460                                  errmsg("could not create control file \"%s\": %m",
3461                                                 XLOG_CONTROL_FILE)));
3462
3463         errno = 0;
3464         if (write(fd, buffer, BLCKSZ) != BLCKSZ)
3465         {
3466                 /* if write didn't set errno, assume problem is no disk space */
3467                 if (errno == 0)
3468                         errno = ENOSPC;
3469                 ereport(PANIC,
3470                                 (errcode_for_file_access(),
3471                                  errmsg("could not write to control file: %m")));
3472         }
3473
3474         if (pg_fsync(fd) != 0)
3475                 ereport(PANIC,
3476                                 (errcode_for_file_access(),
3477                                  errmsg("could not fsync control file: %m")));
3478
3479         if (close(fd))
3480                 ereport(PANIC,
3481                                 (errcode_for_file_access(),
3482                                  errmsg("could not close control file: %m")));
3483 }
3484
3485 static void
3486 ReadControlFile(void)
3487 {
3488         pg_crc32        crc;
3489         int                     fd;
3490
3491         /*
3492          * Read data...
3493          */
3494         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3495                                            O_RDWR | PG_BINARY,
3496                                            S_IRUSR | S_IWUSR);
3497         if (fd < 0)
3498                 ereport(PANIC,
3499                                 (errcode_for_file_access(),
3500                                  errmsg("could not open control file \"%s\": %m",
3501                                                 XLOG_CONTROL_FILE)));
3502
3503         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3504                 ereport(PANIC,
3505                                 (errcode_for_file_access(),
3506                                  errmsg("could not read from control file: %m")));
3507
3508         close(fd);
3509
3510         /*
3511          * Check for expected pg_control format version.  If this is wrong, the
3512          * CRC check will likely fail because we'll be checking the wrong number
3513          * of bytes.  Complaining about wrong version will probably be more
3514          * enlightening than complaining about wrong CRC.
3515          */
3516         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3517                 ereport(FATAL,
3518                                 (errmsg("database files are incompatible with server"),
3519                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3520                                   " but the server was compiled with PG_CONTROL_VERSION %d.",
3521                                                 ControlFile->pg_control_version, PG_CONTROL_VERSION),
3522                                  errhint("It looks like you need to initdb.")));
3523         /* Now check the CRC. */
3524         INIT_CRC32(crc);
3525         COMP_CRC32(crc,
3526                            (char *) ControlFile,
3527                            offsetof(ControlFileData, crc));
3528         FIN_CRC32(crc);
3529
3530         if (!EQ_CRC32(crc, ControlFile->crc))
3531                 ereport(FATAL,
3532                                 (errmsg("incorrect checksum in control file")));
3533
3534         /*
3535          * Do compatibility checking immediately.  We do this here for 2 reasons:
3536          *
3537          * (1) if the database isn't compatible with the backend executable, we
3538          * want to abort before we can possibly do any damage;
3539          *
3540          * (2) this code is executed in the postmaster, so the setlocale() will
3541          * propagate to forked backends, which aren't going to read this file for
3542          * themselves.  (These locale settings are considered critical
3543          * compatibility items because they can affect sort order of indexes.)
3544          */
3545         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3546                 ereport(FATAL,
3547                                 (errmsg("database files are incompatible with server"),
3548                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3549                                   " but the server was compiled with CATALOG_VERSION_NO %d.",
3550                                                 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3551                                  errhint("It looks like you need to initdb.")));
3552         if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
3553                 ereport(FATAL,
3554                                 (errmsg("database files are incompatible with server"),
3555                    errdetail("The database cluster was initialized with MAXALIGN %d,"
3556                                          " but the server was compiled with MAXALIGN %d.",
3557                                          ControlFile->maxAlign, MAXIMUM_ALIGNOF),
3558                                  errhint("It looks like you need to initdb.")));
3559         if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
3560                 ereport(FATAL,
3561                                 (errmsg("database files are incompatible with server"),
3562                                  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3563                                  errhint("It looks like you need to initdb.")));
3564         if (ControlFile->blcksz != BLCKSZ)
3565                 ereport(FATAL,
3566                                 (errmsg("database files are incompatible with server"),
3567                          errdetail("The database cluster was initialized with BLCKSZ %d,"
3568                                            " but the server was compiled with BLCKSZ %d.",
3569                                            ControlFile->blcksz, BLCKSZ),
3570                                  errhint("It looks like you need to recompile or initdb.")));
3571         if (ControlFile->relseg_size != RELSEG_SIZE)
3572                 ereport(FATAL,
3573                                 (errmsg("database files are incompatible with server"),
3574                 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
3575                                   " but the server was compiled with RELSEG_SIZE %d.",
3576                                   ControlFile->relseg_size, RELSEG_SIZE),
3577                                  errhint("It looks like you need to recompile or initdb.")));
3578         if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
3579                 ereport(FATAL,
3580                                 (errmsg("database files are incompatible with server"),
3581                                  errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
3582                                            " but the server was compiled with XLOG_SEG_SIZE %d.",
3583                                                    ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
3584                                  errhint("It looks like you need to recompile or initdb.")));
3585         if (ControlFile->nameDataLen != NAMEDATALEN)
3586                 ereport(FATAL,
3587                                 (errmsg("database files are incompatible with server"),
3588                 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
3589                                   " but the server was compiled with NAMEDATALEN %d.",
3590                                   ControlFile->nameDataLen, NAMEDATALEN),
3591                                  errhint("It looks like you need to recompile or initdb.")));
3592         if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
3593                 ereport(FATAL,
3594                                 (errmsg("database files are incompatible with server"),
3595                                  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
3596                                           " but the server was compiled with INDEX_MAX_KEYS %d.",
3597                                                    ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
3598                                  errhint("It looks like you need to recompile or initdb.")));
3599
3600 #ifdef HAVE_INT64_TIMESTAMP
3601         if (ControlFile->enableIntTimes != TRUE)
3602                 ereport(FATAL,
3603                                 (errmsg("database files are incompatible with server"),
3604                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
3605                                   " but the server was compiled with HAVE_INT64_TIMESTAMP."),
3606                                  errhint("It looks like you need to recompile or initdb.")));
3607 #else
3608         if (ControlFile->enableIntTimes != FALSE)
3609                 ereport(FATAL,
3610                                 (errmsg("database files are incompatible with server"),
3611                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
3612                            " but the server was compiled without HAVE_INT64_TIMESTAMP."),
3613                                  errhint("It looks like you need to recompile or initdb.")));
3614 #endif
3615
3616         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
3617                 ereport(FATAL,
3618                                 (errmsg("database files are incompatible with server"),
3619                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
3620                                   " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
3621                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
3622                                  errhint("It looks like you need to recompile or initdb.")));
3623         if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
3624                 ereport(FATAL,
3625                         (errmsg("database files are incompatible with operating system"),
3626                          errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
3627                                            " which is not recognized by setlocale().",
3628                                            ControlFile->lc_collate),
3629                          errhint("It looks like you need to initdb or install locale support.")));
3630         if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
3631                 ereport(FATAL,
3632                         (errmsg("database files are incompatible with operating system"),
3633                 errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
3634                                   " which is not recognized by setlocale().",
3635                                   ControlFile->lc_ctype),
3636                          errhint("It looks like you need to initdb or install locale support.")));
3637
3638         /* Make the fixed locale settings visible as GUC variables, too */
3639         SetConfigOption("lc_collate", ControlFile->lc_collate,
3640                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3641         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
3642                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3643 }
3644
3645 void
3646 UpdateControlFile(void)
3647 {
3648         int                     fd;
3649
3650         INIT_CRC32(ControlFile->crc);
3651         COMP_CRC32(ControlFile->crc,
3652                            (char *) ControlFile,
3653                            offsetof(ControlFileData, crc));
3654         FIN_CRC32(ControlFile->crc);
3655
3656         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3657                                            O_RDWR | PG_BINARY,
3658                                            S_IRUSR | S_IWUSR);
3659         if (fd < 0)
3660                 ereport(PANIC,
3661                                 (errcode_for_file_access(),
3662                                  errmsg("could not open control file \"%s\": %m",
3663                                                 XLOG_CONTROL_FILE)));
3664
3665         errno = 0;
3666         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3667         {
3668                 /* if write didn't set errno, assume problem is no disk space */
3669                 if (errno == 0)
3670                         errno = ENOSPC;
3671                 ereport(PANIC,
3672                                 (errcode_for_file_access(),
3673                                  errmsg("could not write to control file: %m")));
3674         }
3675
3676         if (pg_fsync(fd) != 0)
3677                 ereport(PANIC,
3678                                 (errcode_for_file_access(),
3679                                  errmsg("could not fsync control file: %m")));
3680
3681         if (close(fd))
3682                 ereport(PANIC,
3683                                 (errcode_for_file_access(),
3684                                  errmsg("could not close control file: %m")));
3685 }
3686
3687 /*
3688  * Initialization of shared memory for XLOG
3689  */
3690 Size
3691 XLOGShmemSize(void)
3692 {
3693         Size            size;
3694
3695         /* XLogCtl */
3696         size = sizeof(XLogCtlData);
3697         /* xlblocks array */
3698         size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
3699         /* extra alignment padding for XLOG I/O buffers */
3700         size = add_size(size, ALIGNOF_XLOG_BUFFER);
3701         /* and the buffers themselves */
3702         size = add_size(size, mul_size(BLCKSZ, XLOGbuffers));
3703
3704         /*
3705          * Note: we don't count ControlFileData, it comes out of the "slop factor"
3706          * added by CreateSharedMemoryAndSemaphores.  This lets us use this
3707          * routine again below to compute the actual allocation size.
3708          */
3709
3710         return size;
3711 }
3712
3713 void
3714 XLOGShmemInit(void)
3715 {
3716         bool            foundCFile,
3717                                 foundXLog;
3718         char       *allocptr;
3719
3720         ControlFile = (ControlFileData *)
3721                 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
3722         XLogCtl = (XLogCtlData *)
3723                 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
3724
3725         if (foundCFile || foundXLog)
3726         {
3727                 /* both should be present or neither */
3728                 Assert(foundCFile && foundXLog);
3729                 return;
3730         }
3731
3732         memset(XLogCtl, 0, sizeof(XLogCtlData));
3733
3734         /*
3735          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
3736          * multiple of the alignment for same, so no extra alignment padding is
3737          * needed here.
3738          */
3739         allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
3740         XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
3741         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
3742         allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
3743
3744         /*
3745          * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
3746          */
3747         allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
3748         XLogCtl->pages = allocptr;
3749         memset(XLogCtl->pages, 0, (Size) BLCKSZ * XLOGbuffers);
3750
3751         /*
3752          * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
3753          * in additional info.)
3754          */
3755         XLogCtl->XLogCacheByte = (Size) BLCKSZ *XLOGbuffers;
3756
3757         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
3758         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
3759         SpinLockInit(&XLogCtl->info_lck);
3760
3761         /*
3762          * If we are not in bootstrap mode, pg_control should already exist. Read
3763          * and validate it immediately (see comments in ReadControlFile() for the
3764          * reasons why).
3765          */
3766         if (!IsBootstrapProcessingMode())
3767                 ReadControlFile();
3768 }
3769
3770 /*
3771  * This func must be called ONCE on system install.  It creates pg_control
3772  * and the initial XLOG segment.
3773  */
3774 void
3775 BootStrapXLOG(void)
3776 {
3777         CheckPoint      checkPoint;
3778         char       *buffer;
3779         XLogPageHeader page;
3780         XLogLongPageHeader longpage;
3781         XLogRecord *record;
3782         bool            use_existent;
3783         uint64          sysidentifier;
3784         struct timeval tv;
3785         pg_crc32        crc;
3786
3787         /*
3788          * Select a hopefully-unique system identifier code for this installation.
3789          * We use the result of gettimeofday(), including the fractional seconds
3790          * field, as being about as unique as we can easily get.  (Think not to
3791          * use random(), since it hasn't been seeded and there's no portable way
3792          * to seed it other than the system clock value...)  The upper half of the
3793          * uint64 value is just the tv_sec part, while the lower half is the XOR
3794          * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
3795          * unnecessarily if "uint64" is really only 32 bits wide.  A person
3796          * knowing this encoding can determine the initialization time of the
3797          * installation, which could perhaps be useful sometimes.
3798          */
3799         gettimeofday(&tv, NULL);
3800         sysidentifier = ((uint64) tv.tv_sec) << 32;
3801         sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
3802
3803         /* First timeline ID is always 1 */
3804         ThisTimeLineID = 1;
3805
3806         /* page buffer must be aligned suitably for O_DIRECT */
3807         buffer = (char *) palloc(BLCKSZ + ALIGNOF_XLOG_BUFFER);
3808         page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
3809         memset(page, 0, BLCKSZ);
3810
3811         /* Set up information for the initial checkpoint record */
3812         checkPoint.redo.xlogid = 0;
3813         checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
3814         checkPoint.undo = checkPoint.redo;
3815         checkPoint.ThisTimeLineID = ThisTimeLineID;
3816         checkPoint.nextXid = FirstNormalTransactionId;
3817         checkPoint.nextOid = FirstBootstrapObjectId;
3818         checkPoint.nextMulti = FirstMultiXactId;
3819         checkPoint.nextMultiOffset = 0;
3820         checkPoint.time = time(NULL);
3821
3822         ShmemVariableCache->nextXid = checkPoint.nextXid;
3823         ShmemVariableCache->nextOid = checkPoint.nextOid;
3824         ShmemVariableCache->oidCount = 0;
3825         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
3826
3827         /* Set up the XLOG page header */
3828         page->xlp_magic = XLOG_PAGE_MAGIC;
3829         page->xlp_info = XLP_LONG_HEADER;
3830         page->xlp_tli = ThisTimeLineID;
3831         page->xlp_pageaddr.xlogid = 0;
3832         page->xlp_pageaddr.xrecoff = 0;
3833         longpage = (XLogLongPageHeader) page;
3834         longpage->xlp_sysid = sysidentifier;
3835         longpage->xlp_seg_size = XLogSegSize;
3836
3837         /* Insert the initial checkpoint record */
3838         record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
3839         record->xl_prev.xlogid = 0;
3840         record->xl_prev.xrecoff = 0;
3841         record->xl_xid = InvalidTransactionId;
3842         record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
3843         record->xl_len = sizeof(checkPoint);
3844         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
3845         record->xl_rmid = RM_XLOG_ID;
3846         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
3847
3848         INIT_CRC32(crc);
3849         COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
3850         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
3851                            SizeOfXLogRecord - sizeof(pg_crc32));
3852         FIN_CRC32(crc);
3853         record->xl_crc = crc;
3854
3855         /* Create first XLOG segment file */
3856         use_existent = false;
3857         openLogFile = XLogFileInit(0, 0, &use_existent, false);
3858
3859         /* Write the first page with the initial record */
3860         errno = 0;
3861         if (write(openLogFile, page, BLCKSZ) != BLCKSZ)
3862         {
3863                 /* if write didn't set errno, assume problem is no disk space */
3864                 if (errno == 0)
3865                         errno = ENOSPC;
3866                 ereport(PANIC,
3867                                 (errcode_for_file_access(),
3868                           errmsg("could not write bootstrap transaction log file: %m")));
3869         }
3870
3871         if (pg_fsync(openLogFile) != 0)
3872                 ereport(PANIC,
3873                                 (errcode_for_file_access(),
3874                           errmsg("could not fsync bootstrap transaction log file: %m")));
3875
3876         if (close(openLogFile))
3877                 ereport(PANIC,
3878                                 (errcode_for_file_access(),
3879                           errmsg("could not close bootstrap transaction log file: %m")));
3880
3881         openLogFile = -1;
3882
3883         /* Now create pg_control */
3884
3885         memset(ControlFile, 0, sizeof(ControlFileData));
3886         /* Initialize pg_control status fields */
3887         ControlFile->system_identifier = sysidentifier;
3888         ControlFile->state = DB_SHUTDOWNED;
3889         ControlFile->time = checkPoint.time;
3890         ControlFile->logId = 0;
3891         ControlFile->logSeg = 1;
3892         ControlFile->checkPoint = checkPoint.redo;
3893         ControlFile->checkPointCopy = checkPoint;
3894         /* some additional ControlFile fields are set in WriteControlFile() */
3895
3896         WriteControlFile();
3897
3898         /* Bootstrap the commit log, too */
3899         BootStrapCLOG();
3900         BootStrapSUBTRANS();
3901         BootStrapMultiXact();
3902
3903         pfree(buffer);
3904 }
3905
/*
 * str_time
 *
 * Format "tnow" in local time as "YYYY-MM-DD HH:MM:SS TZ".
 *
 * The result is kept in a static buffer, so it is overwritten by the next
 * call and must not be freed.  If the time cannot be broken down or
 * formatted, an empty string is returned.
 */
static char *
str_time(time_t tnow)
{
	static char buf[128];
	struct tm  *tm = localtime(&tnow);

	/*
	 * localtime() returns NULL for a time_t it cannot represent, and
	 * strftime() returns 0 (leaving the buffer contents indeterminate) if
	 * the result does not fit.  Never hand either problem on to the caller.
	 */
	if (tm == NULL ||
		strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S %Z", tm) == 0)
		buf[0] = '\0';

	return buf;
}
3917
3918 /*
3919  * See if there is a recovery command file (recovery.conf), and if so
3920  * read in parameters for archive recovery.
3921  *
3922  * XXX longer term intention is to expand this to
3923  * cater for additional parameters and controls
3924  * possibly use a flex lexer similar to the GUC one
3925  */
3926 static void
3927 readRecoveryCommandFile(void)
3928 {
3929         FILE       *fd;
3930         char            cmdline[MAXPGPATH];
3931         TimeLineID      rtli = 0;
3932         bool            rtliGiven = false;
3933         bool            syntaxError = false;
3934
3935         fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
3936         if (fd == NULL)
3937         {
3938                 if (errno == ENOENT)
3939                         return;                         /* not there, so no archive recovery */
3940                 ereport(FATAL,
3941                                 (errcode_for_file_access(),
3942                                  errmsg("could not open recovery command file \"%s\": %m",
3943                                                 RECOVERY_COMMAND_FILE)));
3944         }
3945
3946         ereport(LOG,
3947                         (errmsg("starting archive recovery")));
3948
3949         /*
3950          * Parse the file...
3951          */
3952         while (fgets(cmdline, MAXPGPATH, fd) != NULL)
3953         {
3954                 /* skip leading whitespace and check for # comment */
3955                 char       *ptr;
3956                 char       *tok1;
3957                 char       *tok2;
3958
3959                 for (ptr = cmdline; *ptr; ptr++)
3960                 {
3961                         if (!isspace((unsigned char) *ptr))
3962                                 break;
3963                 }
3964                 if (*ptr == '\0' || *ptr == '#')
3965                         continue;
3966
3967                 /* identify the quoted parameter value */
3968                 tok1 = strtok(ptr, "'");
3969                 if (!tok1)
3970                 {
3971                         syntaxError = true;
3972                         break;
3973                 }
3974                 tok2 = strtok(NULL, "'");
3975                 if (!tok2)
3976                 {
3977                         syntaxError = true;
3978                         break;
3979                 }
3980                 /* reparse to get just the parameter name */
3981                 tok1 = strtok(ptr, " \t=");
3982                 if (!tok1)
3983                 {
3984                         syntaxError = true;
3985                         break;
3986                 }
3987
3988                 if (strcmp(tok1, "restore_command") == 0)
3989                 {
3990                         recoveryRestoreCommand = pstrdup(tok2);
3991                         ereport(LOG,
3992                                         (errmsg("restore_command = \"%s\"",
3993                                                         recoveryRestoreCommand)));
3994                 }
3995                 else if (strcmp(tok1, "recovery_target_timeline") == 0)
3996                 {
3997                         rtliGiven = true;
3998                         if (strcmp(tok2, "latest") == 0)
3999                                 rtli = 0;
4000                         else
4001                         {
4002                                 errno = 0;
4003                                 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
4004                                 if (errno == EINVAL || errno == ERANGE)
4005                                         ereport(FATAL,
4006                                                         (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
4007                                                                         tok2)));
4008                         }
4009                         if (rtli)
4010                                 ereport(LOG,
4011                                                 (errmsg("recovery_target_timeline = %u", rtli)));
4012                         else
4013                                 ereport(LOG,
4014                                                 (errmsg("recovery_target_timeline = latest")));
4015                 }
4016                 else if (strcmp(tok1, "recovery_target_xid") == 0)
4017                 {
4018                         errno = 0;
4019                         recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
4020                         if (errno == EINVAL || errno == ERANGE)
4021                                 ereport(FATAL,
4022                                  (errmsg("recovery_target_xid is not a valid number: \"%s\"",
4023                                                  tok2)));
4024                         ereport(LOG,
4025                                         (errmsg("recovery_target_xid = %u",
4026                                                         recoveryTargetXid)));
4027                         recoveryTarget = true;
4028                         recoveryTargetExact = true;
4029                 }
4030                 else if (strcmp(tok1, "recovery_target_time") == 0)
4031                 {
4032                         /*
4033                          * if recovery_target_xid specified, then this overrides
4034                          * recovery_target_time
4035                          */
4036                         if (recoveryTargetExact)
4037                                 continue;
4038                         recoveryTarget = true;
4039                         recoveryTargetExact = false;
4040
4041                         /*
4042                          * Convert the time string given by the user to the time_t format.
4043                          * We use type abstime's input converter because we know abstime
4044                          * has the same representation as time_t.
4045                          */
4046                         recoveryTargetTime = (time_t)
4047                                 DatumGetAbsoluteTime(DirectFunctionCall1(abstimein,
4048                                                                                                          CStringGetDatum(tok2)));
4049                         ereport(LOG,
4050                                         (errmsg("recovery_target_time = %s",
4051                                                         DatumGetCString(DirectFunctionCall1(abstimeout,
4052                                 AbsoluteTimeGetDatum((AbsoluteTime) recoveryTargetTime))))));
4053                 }
4054                 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
4055                 {
4056                         /*
4057                          * does nothing if a recovery_target is not also set
4058                          */
4059                         if (strcmp(tok2, "true") == 0)
4060                                 recoveryTargetInclusive = true;
4061                         else
4062                         {
4063                                 recoveryTargetInclusive = false;
4064                                 tok2 = "false";
4065                         }
4066                         ereport(LOG,
4067                                         (errmsg("recovery_target_inclusive = %s", tok2)));
4068                 }
4069                 else
4070                         ereport(FATAL,
4071                                         (errmsg("unrecognized recovery parameter \"%s\"",
4072                                                         tok1)));
4073         }
4074
4075         FreeFile(fd);
4076
4077         if (syntaxError)
4078                 ereport(FATAL,
4079                                 (errmsg("syntax error in recovery command file: %s",
4080                                                 cmdline),
4081                           errhint("Lines should have the format parameter = 'value'.")));
4082
4083         /* Check that required parameters were supplied */
4084         if (recoveryRestoreCommand == NULL)
4085                 ereport(FATAL,
4086                                 (errmsg("recovery command file \"%s\" did not specify restore_command",
4087                                                 RECOVERY_COMMAND_FILE)));
4088
4089         /* Enable fetching from archive recovery area */
4090         InArchiveRecovery = true;
4091
4092         /*
4093          * If user specified recovery_target_timeline, validate it or compute the
4094          * "latest" value.      We can't do this until after we've gotten the restore
4095          * command and set InArchiveRecovery, because we need to fetch timeline
4096          * history files from the archive.
4097          */
4098         if (rtliGiven)
4099         {
4100                 if (rtli)
4101                 {
4102                         /* Timeline 1 does not have a history file, all else should */
4103                         if (rtli != 1 && !existsTimeLineHistory(rtli))
4104                                 ereport(FATAL,
4105                                                 (errmsg("recovery_target_timeline %u does not exist",
4106                                                                 rtli)));
4107                         recoveryTargetTLI = rtli;
4108                 }
4109                 else
4110                 {
4111                         /* We start the "latest" search from pg_control's timeline */
4112                         recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
4113                 }
4114         }
4115 }
4116
4117 /*
4118  * Exit archive-recovery state
4119  */
4120 static void
4121 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4122 {
4123         char            recoveryPath[MAXPGPATH];
4124         char            xlogpath[MAXPGPATH];
4125
4126         /*
4127          * We are no longer in archive recovery state.
4128          */
4129         InArchiveRecovery = false;
4130
4131         /*
4132          * We should have the ending log segment currently open.  Verify, and then
4133          * close it (to avoid problems on Windows with trying to rename or delete
4134          * an open file).
4135          */
4136         Assert(readFile >= 0);
4137         Assert(readId == endLogId);
4138         Assert(readSeg == endLogSeg);
4139
4140         close(readFile);
4141         readFile = -1;
4142
4143         /*
4144          * If the segment was fetched from archival storage, we want to replace
4145          * the existing xlog segment (if any) with the archival version.  This is
4146          * because whatever is in XLOGDIR is very possibly older than what we have
4147          * from the archives, since it could have come from restoring a PGDATA
4148          * backup.      In any case, the archival version certainly is more
4149          * descriptive of what our current database state is, because that is what
4150          * we replayed from.
4151          *
4152          * Note that if we are establishing a new timeline, ThisTimeLineID is
4153          * already set to the new value, and so we will create a new file instead
4154          * of overwriting any existing file.
4155          */
4156         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4157         XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4158
4159         if (restoredFromArchive)
4160         {
4161                 ereport(DEBUG3,
4162                                 (errmsg_internal("moving last restored xlog to \"%s\"",
4163                                                                  xlogpath)));
4164                 unlink(xlogpath);               /* might or might not exist */
4165                 if (rename(recoveryPath, xlogpath) != 0)
4166                         ereport(FATAL,
4167                                         (errcode_for_file_access(),
4168                                          errmsg("could not rename file \"%s\" to \"%s\": %m",
4169                                                         recoveryPath, xlogpath)));
4170                 /* XXX might we need to fix permissions on the file? */
4171         }
4172         else
4173         {
4174                 /*
4175                  * If the latest segment is not archival, but there's still a
4176                  * RECOVERYXLOG laying about, get rid of it.
4177                  */
4178                 unlink(recoveryPath);   /* ignore any error */
4179
4180                 /*
4181                  * If we are establishing a new timeline, we have to copy data from
4182                  * the last WAL segment of the old timeline to create a starting WAL
4183                  * segment for the new timeline.
4184                  */
4185                 if (endTLI != ThisTimeLineID)
4186                         XLogFileCopy(endLogId, endLogSeg,
4187                                                  endTLI, endLogId, endLogSeg);
4188         }
4189
4190         /*
4191          * Let's just make real sure there are not .ready or .done flags posted
4192          * for the new segment.
4193          */
4194         XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4195         XLogArchiveCleanup(xlogpath);
4196
4197         /* Get rid of any remaining recovered timeline-history file, too */
4198         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
4199         unlink(recoveryPath);           /* ignore any error */
4200
4201         /*
4202          * Rename the config file out of the way, so that we don't accidentally
4203          * re-enter archive recovery mode in a subsequent crash.
4204          */
4205         unlink(RECOVERY_COMMAND_DONE);
4206         if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4207                 ereport(FATAL,
4208                                 (errcode_for_file_access(),
4209                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4210                                                 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4211
4212         ereport(LOG,
4213                         (errmsg("archive recovery complete")));
4214 }
4215
4216 /*
4217  * For point-in-time recovery, this function decides whether we want to
4218  * stop applying the XLOG at or after the current record.
4219  *
4220  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
4221  * *includeThis is set TRUE if we should apply this record before stopping.
4222  * Also, some information is saved in recoveryStopXid et al for use in
4223  * annotating the new timeline's history file.
4224  */
4225 static bool
4226 recoveryStopsHere(XLogRecord *record, bool *includeThis)
4227 {
4228         bool            stopsHere;
4229         uint8           record_info;
4230         time_t          recordXtime;
4231
4232         /* Do we have a PITR target at all? */
4233         if (!recoveryTarget)
4234                 return false;
4235
4236         /* We only consider stopping at COMMIT or ABORT records */
4237         if (record->xl_rmid != RM_XACT_ID)
4238                 return false;
4239         record_info = record->xl_info & ~XLR_INFO_MASK;
4240         if (record_info == XLOG_XACT_COMMIT)
4241         {
4242                 xl_xact_commit *recordXactCommitData;
4243
4244                 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
4245                 recordXtime = recordXactCommitData->xtime;
4246         }
4247         else if (record_info == XLOG_XACT_ABORT)
4248         {
4249                 xl_xact_abort *recordXactAbortData;
4250
4251                 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
4252                 recordXtime = recordXactAbortData->xtime;
4253         }
4254         else
4255                 return false;
4256
4257         if (recoveryTargetExact)
4258         {
4259                 /*
4260                  * there can be only one transaction end record with this exact
4261                  * transactionid
4262                  *
4263                  * when testing for an xid, we MUST test for equality only, since
4264                  * transactions are numbered in the order they start, not the order
4265                  * they complete. A higher numbered xid will complete before you about
4266                  * 50% of the time...
4267                  */
4268                 stopsHere = (record->xl_xid == recoveryTargetXid);
4269                 if (stopsHere)
4270                         *includeThis = recoveryTargetInclusive;
4271         }
4272         else
4273         {
4274                 /*
4275                  * there can be many transactions that share the same commit time, so
4276                  * we stop after the last one, if we are inclusive, or stop at the
4277                  * first one if we are exclusive
4278                  */
4279                 if (recoveryTargetInclusive)
4280                         stopsHere = (recordXtime > recoveryTargetTime);
4281                 else
4282                         stopsHere = (recordXtime >= recoveryTargetTime);
4283                 if (stopsHere)
4284                         *includeThis = false;
4285         }
4286
4287         if (stopsHere)
4288         {
4289                 recoveryStopXid = record->xl_xid;
4290                 recoveryStopTime = recordXtime;
4291                 recoveryStopAfter = *includeThis;
4292
4293                 if (record_info == XLOG_XACT_COMMIT)
4294                 {
4295                         if (recoveryStopAfter)
4296                                 ereport(LOG,
4297                                                 (errmsg("recovery stopping after commit of transaction %u, time %s",
4298                                                           recoveryStopXid, str_time(recoveryStopTime))));
4299                         else
4300                                 ereport(LOG,
4301                                                 (errmsg("recovery stopping before commit of transaction %u, time %s",
4302                                                           recoveryStopXid, str_time(recoveryStopTime))));
4303                 }
4304                 else
4305                 {
4306                         if (recoveryStopAfter)
4307                                 ereport(LOG,
4308                                                 (errmsg("recovery stopping after abort of transaction %u, time %s",
4309                                                           recoveryStopXid, str_time(recoveryStopTime))));
4310                         else
4311                                 ereport(LOG,
4312                                                 (errmsg("recovery stopping before abort of transaction %u, time %s",
4313                                                           recoveryStopXid, str_time(recoveryStopTime))));
4314                 }
4315         }
4316
4317         return stopsHere;
4318 }
4319
4320 /*
4321  * This must be called ONCE during postmaster or standalone-backend startup
      *
      * Takes the XLOG subsystem from its on-disk state to "in production".
      * In order, the routine:
      *  - reads pg_control and refuses to continue if it looks invalid;
      *  - reads the recovery command file (if there is one) and the timeline
      *    history for recoveryTargetTLI;
      *  - locates the starting checkpoint record: from backup_label when that
      *    file is present, otherwise from pg_control (primary first, then the
      *    previous checkpoint);
      *  - decides whether WAL replay is needed and, if so, applies records
      *    until no further record can be read or recoveryStopsHere() says to
      *    stop;
      *  - selects a new timeline ID when replay was halted by a recovery
      *    target, and leaves archive recovery if it was active;
      *  - initializes the shared insert/write state so that new WAL is
      *    appended at the end of the valid log;
      *  - after a recovery, runs the resource managers' cleanup hooks, resets
      *    pgstat data, writes a checkpoint and removes backup_label;
      *  - marks pg_control DB_IN_PRODUCTION, starts CLOG, SUBTRANS and
      *    MultiXact, reloads prepared transactions, and finally releases the
      *    read-side file descriptor and buffers.
      *
      * CritSectionCount is kept incremented from entry until just after the
      * "database system is ready" message has been logged.
4322  */
4323 void
4324 StartupXLOG(void)
4325 {
4326         XLogCtlInsert *Insert;
4327         CheckPoint      checkPoint;
4328         bool            wasShutdown;
4329         bool            needNewTimeLine = false;
4330         XLogRecPtr      RecPtr,
4331                                 LastRec,
4332                                 checkPointLoc,
4333                                 EndOfLog;
4334         uint32          endLogId;
4335         uint32          endLogSeg;
4336         XLogRecord *record;
4337         uint32          freespace;
4338         TransactionId oldestActiveXID;
4339
             /* Balanced by the CritSectionCount-- near the end of this function */
4340         CritSectionCount++;
4341
4342         /*
4343          * Read control file and check XLOG status looks valid.
4344          *
4345          * Note: in most control paths, *ControlFile is already valid and we need
4346          * not do ReadControlFile() here, but might as well do it to be sure.
4347          */
4348         ReadControlFile();
4349
4350         if (ControlFile->logSeg == 0 ||
4351                 ControlFile->state < DB_SHUTDOWNED ||
4352                 ControlFile->state > DB_IN_PRODUCTION ||
4353                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4354                 ereport(FATAL,
4355                                 (errmsg("control file contains invalid data")));
4356
             /* Report what state the previous incarnation left the system in */
4357         if (ControlFile->state == DB_SHUTDOWNED)
4358                 ereport(LOG,
4359                                 (errmsg("database system was shut down at %s",
4360                                                 str_time(ControlFile->time))));
4361         else if (ControlFile->state == DB_SHUTDOWNING)
4362                 ereport(LOG,
4363                                 (errmsg("database system shutdown was interrupted at %s",
4364                                                 str_time(ControlFile->time))));
4365         else if (ControlFile->state == DB_IN_RECOVERY)
4366                 ereport(LOG,
4367                    (errmsg("database system was interrupted while in recovery at %s",
4368                                    str_time(ControlFile->time)),
4369                         errhint("This probably means that some data is corrupted and"
4370                                         " you will have to use the last backup for recovery.")));
4371         else if (ControlFile->state == DB_IN_PRODUCTION)
4372                 ereport(LOG,
4373                                 (errmsg("database system was interrupted at %s",
4374                                                 str_time(ControlFile->time))));
4375
4376         /* This is just to allow attaching to startup process with a debugger */
4377 #ifdef XLOG_REPLAY_DELAY
4378         if (ControlFile->state != DB_SHUTDOWNED)
4379                 pg_usleep(60000000L);
4380 #endif
4381
4382         /*
4383          * Initialize on the assumption we want to recover to the same timeline
4384          * that's active according to pg_control.
4385          */
4386         recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
4387
4388         /*
4389          * Check for recovery control file, and if so set up state for offline
4390          * recovery
4391          */
4392         readRecoveryCommandFile();
4393
4394         /* Now we can determine the list of expected TLIs */
4395         expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
4396
4397         /*
4398          * If pg_control's timeline is not in expectedTLIs, then we cannot
4399          * proceed: the backup is not part of the history of the requested
4400          * timeline.
4401          */
4402         if (!list_member_int(expectedTLIs,
4403                                                  (int) ControlFile->checkPointCopy.ThisTimeLineID))
4404                 ereport(FATAL,
4405                                 (errmsg("requested timeline %u is not a child of database system timeline %u",
4406                                                 recoveryTargetTLI,
4407                                                 ControlFile->checkPointCopy.ThisTimeLineID)));
4408
4409         if (read_backup_label(&checkPointLoc))
4410         {
4411                 /*
4412                  * When a backup_label file is present, we want to roll forward from
4413                  * the checkpoint it identifies, rather than using pg_control.
4414                  */
4415                 record = ReadCheckpointRecord(checkPointLoc, 0);
4416                 if (record != NULL)
4417                 {
4418                         ereport(LOG,
4419                                         (errmsg("checkpoint record is at %X/%X",
4420                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4421                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
4422                 }
4423                 else
4424                 {
4425                         ereport(PANIC,
4426                                         (errmsg("could not locate required checkpoint record"),
4427                                          errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4428                 }
4429         }
4430         else
4431         {
4432                 /*
4433                  * Get the last valid checkpoint record.  If the latest one according
4434                  * to pg_control is broken, try the next-to-last one.
4435                  */
4436                 checkPointLoc = ControlFile->checkPoint;
4437                 record = ReadCheckpointRecord(checkPointLoc, 1);
4438                 if (record != NULL)
4439                 {
4440                         ereport(LOG,
4441                                         (errmsg("checkpoint record is at %X/%X",
4442                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4443                 }
4444                 else
4445                 {
4446                         checkPointLoc = ControlFile->prevCheckPoint;
4447                         record = ReadCheckpointRecord(checkPointLoc, 2);
4448                         if (record != NULL)
4449                         {
4450                                 ereport(LOG,
4451                                                 (errmsg("using previous checkpoint record at %X/%X",
4452                                                           checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4453                                 InRecovery = true;              /* force recovery even if SHUTDOWNED */
4454                         }
4455                         else
4456                                 ereport(PANIC,
4457                                          (errmsg("could not locate a valid checkpoint record")));
4458                 }
4459         }
4460
             /*
              * Take a private copy of the checkpoint payload and note whether the
              * starting checkpoint is a shutdown checkpoint.
              */
4461         LastRec = RecPtr = checkPointLoc;
4462         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
4463         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4464
4465         ereport(LOG,
4466          (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
4467                          checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
4468                          checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
4469                          wasShutdown ? "TRUE" : "FALSE")));
4470         ereport(LOG,
4471                         (errmsg("next transaction ID: %u; next OID: %u",
4472                                         checkPoint.nextXid, checkPoint.nextOid)));
4473         ereport(LOG,
4474                         (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
4475                                         checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4476         if (!TransactionIdIsNormal(checkPoint.nextXid))
4477                 ereport(PANIC,
4478                                 (errmsg("invalid next transaction ID")));
4479
             /* Seed the shared XID/OID/MultiXact counters from the checkpoint */
4480         ShmemVariableCache->nextXid = checkPoint.nextXid;
4481         ShmemVariableCache->nextOid = checkPoint.nextOid;
4482         ShmemVariableCache->oidCount = 0;
4483         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4484
4485         /*
4486          * We must replay WAL entries using the same TimeLineID they were created
4487          * under, so temporarily adopt the TLI indicated by the checkpoint (see
4488          * also xlog_redo()).
4489          */
4490         ThisTimeLineID = checkPoint.ThisTimeLineID;
4491
4492         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
4493
             /* The redo pointer may not lie beyond the checkpoint record itself */
4494         if (XLByteLT(RecPtr, checkPoint.redo))
4495                 ereport(PANIC,
4496                                 (errmsg("invalid redo in checkpoint record")));
             /* A zero undo offset is replaced by the checkpoint's own location */
4497         if (checkPoint.undo.xrecoff == 0)
4498                 checkPoint.undo = RecPtr;
4499
4500         /*
4501          * Check whether we need to force recovery from WAL.  If it appears to
4502          * have been a clean shutdown and we did not have a recovery.conf file,
4503          * then assume no recovery needed.
4504          */
4505         if (XLByteLT(checkPoint.undo, RecPtr) ||
4506                 XLByteLT(checkPoint.redo, RecPtr))
4507         {
4508                 if (wasShutdown)
4509                         ereport(PANIC,
4510                                 (errmsg("invalid redo/undo record in shutdown checkpoint")));
4511                 InRecovery = true;
4512         }
4513         else if (ControlFile->state != DB_SHUTDOWNED)
4514                 InRecovery = true;
4515         else if (InArchiveRecovery)
4516         {
4517                 /* force recovery due to presence of recovery.conf */
4518                 InRecovery = true;
4519         }
4520
4521         /* REDO */
4522         if (InRecovery)
4523         {
4524                 int                     rmid;
4525
4526                 if (InArchiveRecovery)
4527                         ereport(LOG,
4528                                         (errmsg("automatic recovery in progress")));
4529                 else
4530                         ereport(LOG,
4531                                         (errmsg("database system was not properly shut down; "
4532                                                         "automatic recovery in progress")));
                     /* Record in pg_control that a recovery is under way */
4533                 ControlFile->state = DB_IN_RECOVERY;
4534                 ControlFile->time = time(NULL);
4535                 UpdateControlFile();
4536
4537                 /* Start up the recovery environment */
4538                 XLogInitRelationCache();
4539
                     /* Give every resource manager that has a startup hook a chance to run it */
4540                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4541                 {
4542                         if (RmgrTable[rmid].rm_startup != NULL)
4543                                 RmgrTable[rmid].rm_startup();
4544                 }
4545
4546                 /*
4547                  * Find the first record that logically follows the checkpoint --- it
4548                  * might physically precede it, though.
4549                  */
4550                 if (XLByteLT(checkPoint.redo, RecPtr))
4551                 {
4552                         /* back up to find the record */
4553                         record = ReadRecord(&(checkPoint.redo), PANIC);
4554                 }
4555                 else
4556                 {
4557                         /* just have to read next record after CheckPoint */
4558                         record = ReadRecord(NULL, LOG);
4559                 }
4560
4561                 if (record != NULL)
4562                 {
4563                         bool            recoveryContinue = true;
4564                         bool            recoveryApply = true;
4565
4566                         InRedo = true;
4567                         ereport(LOG,
4568                                         (errmsg("redo starts at %X/%X",
4569                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4570
4571                         /*
4572                          * main redo apply loop
4573                          */
4574                         do
4575                         {
4576 #ifdef WAL_DEBUG
4577                                 if (XLOG_DEBUG)
4578                                 {
4579                                         char            buf[8192];
4580
4581                                         sprintf(buf, "REDO @ %X/%X; LSN %X/%X: ",
4582                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
4583                                                         EndRecPtr.xlogid, EndRecPtr.xrecoff);
4584                                         xlog_outrec(buf, record);
4585                                         strcat(buf, " - ");
4586                                         RmgrTable[record->xl_rmid].rm_desc(buf,
4587                                                                         record->xl_info, XLogRecGetData(record));
4588                                         elog(LOG, "%s", buf);
4589                                 }
4590 #endif
4591
4592                                 /*
4593                                  * Have we reached our recovery target?
                                      *
                                      * If so, this is the last iteration.  When the target says
                                      * the record is not to be applied, leave the loop at once;
                                      * otherwise apply it below and let the loop condition end
                                      * the replay.
4594                                  */
4595                                 if (recoveryStopsHere(record, &recoveryApply))
4596                                 {
4597                                         needNewTimeLine = true;         /* see below */
4598                                         recoveryContinue = false;
4599                                         if (!recoveryApply)
4600                                                 break;
4601                                 }
4602
4603                                 /* nextXid must be beyond record's xid */
4604                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
4605                                                                                                  ShmemVariableCache->nextXid))
4606                                 {
4607                                         ShmemVariableCache->nextXid = record->xl_xid;
4608                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
4609                                 }
4610
                                     /* Restore any backup blocks first, then let the owning rmgr replay the record */
4611                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
4612                                         RestoreBkpBlocks(record, EndRecPtr);
4613
4614                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
4615
                                     /* The record just applied is now the last applied one */
4616                                 LastRec = ReadRecPtr;
4617
4618                                 record = ReadRecord(NULL, LOG);
4619                         } while (record != NULL && recoveryContinue);
4620
4621                         /*
4622                          * end of main redo apply loop
4623                          */
4624
4625                         ereport(LOG,
4626                                         (errmsg("redo done at %X/%X",
4627                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4628                         InRedo = false;
4629                 }
4630                 else
4631                 {
4632                         /* there are no WAL records following the checkpoint */
4633                         ereport(LOG,
4634                                         (errmsg("redo is not required")));
4635                 }
4636         }
4637
4638         /*
4639          * Re-fetch the last valid or last applied record, so we can identify the
4640          * exact endpoint of what we consider the valid portion of WAL.
              *
              * NOTE(review): endLogId/endLogSeg presumably name the segment holding
              * the last byte before EndOfLog --- confirm against the definition of
              * XLByteToPrevSeg.
4641          */
4642         record = ReadRecord(&LastRec, PANIC);
4643         EndOfLog = EndRecPtr;
4644         XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
4645
4646         /*
4647          * Complain if we did not roll forward far enough to render the backup
4648          * dump consistent.
4649          */
4650         if (XLByteLT(EndOfLog, recoveryMinXlogOffset))
4651         {
4652                 if (needNewTimeLine)    /* stopped because of stop request */
4653                         ereport(FATAL,
4654                                         (errmsg("requested recovery stop point is before end time of backup dump")));
4655                 else
4656                         /* ran off end of WAL */
4657                         ereport(FATAL,
4658                                         (errmsg("WAL ends before end time of backup dump")));
4659         }
4660
4661         /*
4662          * Consider whether we need to assign a new timeline ID.
4663          *
4664          * If we stopped short of the end of WAL during recovery, then we are
4665          * generating a new timeline and must assign it a unique new ID.
4666          * Otherwise, we can just extend the timeline we were in when we ran out
4667          * of WAL.
4668          */
4669         if (needNewTimeLine)
4670         {
4671                 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
4672                 ereport(LOG,
4673                                 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
4674                 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
4675                                                          curFileTLI, endLogId, endLogSeg);
4676         }
4677
4678         /* Save the selected TimeLineID in shared memory, too */
4679         XLogCtl->ThisTimeLineID = ThisTimeLineID;
4680
4681         /*
4682          * We are now done reading the old WAL.  Turn off archive fetching if it
4683          * was active, and make a writable copy of the last WAL segment. (Note
4684          * that we also have a copy of the last block of the old WAL in readBuf;
4685          * we will use that below.)
4686          */
4687         if (InArchiveRecovery)
4688                 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
4689
4690         /*
4691          * Prepare to write WAL starting at EndOfLog position, and init xlog
4692          * buffer cache using the block containing the last record from the
4693          * previous incarnation.
4694          */
4695         openLogId = endLogId;
4696         openLogSeg = endLogSeg;
4697         openLogFile = XLogFileOpen(openLogId, openLogSeg);
4698         openLogOff = 0;
4699         ControlFile->logId = openLogId;
4700         ControlFile->logSeg = openLogSeg + 1;
4701         Insert = &XLogCtl->Insert;
4702         Insert->PrevRecord = LastRec;
             /* xlblocks[0] is set to EndOfLog rounded up to a BLCKSZ boundary */
4703         XLogCtl->xlblocks[0].xlogid = openLogId;
4704         XLogCtl->xlblocks[0].xrecoff =
4705                 ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
4706
4707         /*
4708          * Tricky point here: readBuf contains the *last* block that the LastRec
4709          * record spans, not the one it starts in.      The last block is indeed the
4710          * one we want to use.
4711          */
4712         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
4713         memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
             /* Insert position = offset of EndOfLog within that page */
4714         Insert->currpos = (char *) Insert->currpage +
4715                 (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
4716
             /* Everything up to EndOfLog counts as already written and flushed */
4717         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
4718
4719         XLogCtl->Write.LogwrtResult = LogwrtResult;
4720         Insert->LogwrtResult = LogwrtResult;
4721         XLogCtl->LogwrtResult = LogwrtResult;
4722
4723         XLogCtl->LogwrtRqst.Write = EndOfLog;
4724         XLogCtl->LogwrtRqst.Flush = EndOfLog;
4725
4726         freespace = INSERT_FREESPACE(Insert);
4727         if (freespace > 0)
4728         {
4729                 /* Make sure rest of page is zero */
4730                 MemSet(Insert->currpos, 0, freespace);
4731                 XLogCtl->Write.curridx = 0;
4732         }
4733         else
4734         {
4735                 /*
4736                  * Whenever Write.LogwrtResult points to exactly the end of a page,
4737                  * Write.curridx must point to the *next* page (see XLogWrite()).
4738                  *
4739                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
4740                  * this is sufficient.  The first actual attempt to insert a log
4741                  * record will advance the insert state.
4742                  */
4743                 XLogCtl->Write.curridx = NextBufIdx(0);
4744         }
4745
4746         /* Pre-scan prepared transactions to find out the range of XIDs present */
4747         oldestActiveXID = PrescanPreparedTransactions();
4748
4749         if (InRecovery)
4750         {
4751                 int                     rmid;
4752
4753                 /*
4754                  * Allow resource managers to do any required cleanup.
4755                  */
4756                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4757                 {
4758                         if (RmgrTable[rmid].rm_cleanup != NULL)
4759                                 RmgrTable[rmid].rm_cleanup();
4760                 }
4761
4762                 /*
4763                  * Reset pgstat data, because it may be invalid after recovery.
4764                  */
4765                 pgstat_reset_all();
4766
4767                 /*
4768                  * Perform a new checkpoint to update our recovery activity to disk.
4769                  *
4770                  * Note that we write a shutdown checkpoint rather than an on-line
4771                  * one. This is not particularly critical, but since we may be
4772                  * assigning a new TLI, using a shutdown checkpoint allows us to have
4773                  * the rule that TLI only changes in shutdown checkpoints, which
4774                  * allows some extra error checking in xlog_redo.
4775                  *
4776                  * In case we had to use the secondary checkpoint, make sure that it
4777                  * will still be shown as the secondary checkpoint after this
4778                  * CreateCheckPoint operation; we don't want the broken primary
4779                  * checkpoint to become prevCheckPoint...
4780                  */
4781                 if (XLByteEQ(checkPointLoc, ControlFile->prevCheckPoint))
4782                         ControlFile->checkPoint = checkPointLoc;
4783
4784                 CreateCheckPoint(true, true);
4785
4786                 /*
4787                  * Close down recovery environment
4788                  */
4789                 XLogCloseRelationCache();
4790
4791                 /*
4792                  * Now that we've checkpointed the recovery, it's safe to flush old
4793                  * backup_label, if present.
4794                  */
4795                 remove_backup_label();
4796         }
4797
4798         /*
4799          * Preallocate additional log files, if wanted.
4800          */
4801         (void) PreallocXlogFiles(EndOfLog);
4802
4803         /*
4804          * Okay, we're officially UP.
4805          */
4806         InRecovery = false;
4807
4808         ControlFile->state = DB_IN_PRODUCTION;
4809         ControlFile->time = time(NULL);
4810         UpdateControlFile();
4811
4812         /* Start up the commit log and related stuff, too */
4813         StartupCLOG();
4814         StartupSUBTRANS(oldestActiveXID);
4815         StartupMultiXact();
4816
4817         /* Reload shared-memory state for prepared transactions */
4818         RecoverPreparedTransactions();
4819
4820         ereport(LOG,
4821                         (errmsg("database system is ready")));
4822         CritSectionCount--;
4823
4824         /* Shut down readFile facility, free space */
4825         if (readFile >= 0)
4826         {
4827                 close(readFile);
4828                 readFile = -1;
4829         }
4830         if (readBuf)
4831         {
4832                 free(readBuf);
4833                 readBuf = NULL;
4834         }
4835         if (readRecordBuf)
4836         {
4837                 free(readRecordBuf);
4838                 readRecordBuf = NULL;
4839                 readRecordBufSize = 0;
4840         }
4841 }
4842
4843 /*
4844  * Subroutine to try to fetch and validate a prior checkpoint record.
4845  *
4846  * whichChkpt identifies the checkpoint (merely for reporting purposes).
4847  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
4848  */
4849 static XLogRecord *
4850 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
4851 {
4852         XLogRecord *record;
4853
4854         if (!XRecOffIsValid(RecPtr.xrecoff))
4855         {
4856                 switch (whichChkpt)
4857                 {
4858                         case 1:
4859                                 ereport(LOG,
4860                                 (errmsg("invalid primary checkpoint link in control file")));
4861                                 break;
4862                         case 2:
4863                                 ereport(LOG,
4864                                                 (errmsg("invalid secondary checkpoint link in control file")));
4865                                 break;
4866                         default:
4867                                 ereport(LOG,
4868                                    (errmsg("invalid checkpoint link in backup_label file")));
4869                                 break;
4870                 }
4871                 return NULL;
4872         }
4873
4874         record = ReadRecord(&RecPtr, LOG);
4875
4876         if (record == NULL)
4877         {
4878                 switch (whichChkpt)
4879                 {
4880                         case 1:
4881                                 ereport(LOG,
4882                                                 (errmsg("invalid primary checkpoint record")));
4883                                 break;
4884                         case 2:
4885                                 ereport(LOG,
4886                                                 (errmsg("invalid secondary checkpoint record")));
4887                                 break;
4888                         default:
4889                                 ereport(LOG,
4890                                                 (errmsg("invalid checkpoint record")));
4891                                 break;
4892                 }
4893                 return NULL;
4894         }
4895         if (record->xl_rmid != RM_XLOG_ID)
4896         {
4897                 switch (whichChkpt)
4898                 {
4899                         case 1:
4900                                 ereport(LOG,
4901                                                 (errmsg("invalid resource manager ID in primary checkpoint record")));
4902                                 break;
4903                         case 2:
4904                                 ereport(LOG,
4905                                                 (errmsg("invalid resource manager ID in secondary checkpoint record")));
4906                                 break;
4907                         default:
4908                                 ereport(LOG,
4909                                 (errmsg("invalid resource manager ID in checkpoint record")));
4910                                 break;
4911                 }
4912                 return NULL;
4913         }
4914         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
4915                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
4916         {
4917                 switch (whichChkpt)
4918                 {
4919                         case 1:
4920                                 ereport(LOG,
4921                                    (errmsg("invalid xl_info in primary checkpoint record")));
4922                                 break;
4923                         case 2:
4924                                 ereport(LOG,
4925                                  (errmsg("invalid xl_info in secondary checkpoint record")));
4926                                 break;
4927                         default:
4928                                 ereport(LOG,
4929                                                 (errmsg("invalid xl_info in checkpoint record")));
4930                                 break;
4931                 }
4932                 return NULL;
4933         }
4934         if (record->xl_len != sizeof(CheckPoint) ||
4935                 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
4936         {
4937                 switch (whichChkpt)
4938                 {
4939                         case 1:
4940                                 ereport(LOG,
4941                                         (errmsg("invalid length of primary checkpoint record")));
4942                                 break;
4943                         case 2:
4944                                 ereport(LOG,
4945                                   (errmsg("invalid length of secondary checkpoint record")));
4946                                 break;
4947                         default:
4948                                 ereport(LOG,
4949                                                 (errmsg("invalid length of checkpoint record")));
4950                                 break;
4951                 }
4952                 return NULL;
4953         }
4954         return record;
4955 }
4956
4957 /*
4958  * This must be called during startup of a backend process, except that
4959  * it need not be called in a standalone backend (which does StartupXLOG
4960  * instead).  We need to initialize the local copies of ThisTimeLineID and
4961  * RedoRecPtr.
4962  *
4963  * Note: before Postgres 8.0, we went to some effort to keep the postmaster
4964  * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
4965  * unnecessary however, since the postmaster itself never touches XLOG anyway.
4966  */
4967 void
4968 InitXLOGAccess(void)
4969 {
4970         /* ThisTimeLineID doesn't change so we need no lock to copy it */
4971         ThisTimeLineID = XLogCtl->ThisTimeLineID;
4972         /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
4973         (void) GetRedoRecPtr();
4974 }
4975
4976 /*
4977  * Once spawned, a backend may update its local RedoRecPtr from
4978  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
4979  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
4980  */
4981 XLogRecPtr
4982 GetRedoRecPtr(void)
4983 {
4984         /* use volatile pointer to prevent code rearrangement */
4985         volatile XLogCtlData *xlogctl = XLogCtl;
4986
4987         SpinLockAcquire(&xlogctl->info_lck);
4988         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
4989         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
4990         SpinLockRelease(&xlogctl->info_lck);
4991
4992         return RedoRecPtr;
4993 }
4994
4995 /*
4996  * GetRecentNextXid - get the nextXid value saved by the most recent checkpoint
4997  *
4998  * This is currently used only by the autovacuum daemon.  To check for
4999  * impending XID wraparound, autovac needs an approximate idea of the current
5000  * XID counter, and it needs it before choosing which DB to attach to, hence
5001  * before it sets up a PGPROC, hence before it can take any LWLocks.  But it
5002  * has attached to shared memory, and so we can let it reach into the shared
5003  * ControlFile structure and pull out the last checkpoint nextXID.
5004  *
5005  * Since we don't take any sort of lock, we have to assume that reading a
5006  * TransactionId is atomic ... but that assumption is made elsewhere, too,
5007  * and in any case the worst possible consequence of a bogus result is that
5008  * autovac issues an unnecessary database-wide VACUUM.
5009  *
5010  * Note: we could also choose to read ShmemVariableCache->nextXid in an
5011  * unlocked fashion, thus getting a more up-to-date result; but since that
5012  * changes far more frequently than the controlfile checkpoint copy, it would
5013  * pose a far higher risk of bogus result if we did have a nonatomic-read
5014  * problem.
5015  *
5016  * A (theoretically) completely safe answer is to read the actual pg_control
5017  * file into local process memory, but that certainly seems like overkill.
5018  */
5019 TransactionId
5020 GetRecentNextXid(void)
5021 {
5022         return ControlFile->checkPointCopy.nextXid;
5023 }
5024
5025 /*
5026  * This must be called ONCE during postmaster or standalone-backend shutdown
5027  */
5028 void
5029 ShutdownXLOG(int code, Datum arg)
5030 {
5031         ereport(LOG,
5032                         (errmsg("shutting down")));
5033
5034         CritSectionCount++;
5035         CreateCheckPoint(true, true);
5036         ShutdownCLOG();
5037         ShutdownSUBTRANS();
5038         ShutdownMultiXact();
5039         CritSectionCount--;
5040
5041         ereport(LOG,
5042                         (errmsg("database system is shut down")));
5043 }
5044
5045 /*
5046  * Perform a checkpoint --- either during shutdown, or on-the-fly
5047  *
5048  * If force is true, we force a checkpoint regardless of whether any XLOG
5049  * activity has occurred since the last one.
5050  */
5051 void
5052 CreateCheckPoint(bool shutdown, bool force)
5053 {
5054         CheckPoint      checkPoint;
5055         XLogRecPtr      recptr;
5056         XLogCtlInsert *Insert = &XLogCtl->Insert;
5057         XLogRecData rdata;
5058         uint32          freespace;
5059         uint32          _logId;
5060         uint32          _logSeg;
5061         int                     nsegsadded = 0;
5062         int                     nsegsremoved = 0;
5063         int                     nsegsrecycled = 0;
5064
5065         /*
5066          * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
5067          * (This is just pro forma, since in the present system structure there is
5068          * only one process that is allowed to issue checkpoints at any given
5069          * time.)
5070          */
5071         LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
5072
5073         /*
5074          * Use a critical section to force system panic if we have trouble.
5075          */
5076         START_CRIT_SECTION();
5077
5078         if (shutdown)
5079         {
5080                 ControlFile->state = DB_SHUTDOWNING;
5081                 ControlFile->time = time(NULL);
5082                 UpdateControlFile();
5083         }
5084
5085         MemSet(&checkPoint, 0, sizeof(checkPoint));
5086         checkPoint.ThisTimeLineID = ThisTimeLineID;
5087         checkPoint.time = time(NULL);
5088
5089         /*
5090          * We must hold CheckpointStartLock while determining the checkpoint REDO
5091          * pointer.  This ensures that any concurrent transaction commits will be
5092          * either not yet logged, or logged and recorded in pg_clog. See notes in
5093          * RecordTransactionCommit().
5094          */
5095         LWLockAcquire(CheckpointStartLock, LW_EXCLUSIVE);
5096
5097         /* And we need WALInsertLock too */
5098         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5099
5100         /*
5101          * If this isn't a shutdown or forced checkpoint, and we have not inserted
5102          * any XLOG records since the start of the last checkpoint, skip the
5103          * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
5104          * when the system is idle. That wastes log space, and more importantly it
5105          * exposes us to possible loss of both current and previous checkpoint
5106          * records if the machine crashes just as we're writing the update.
5107          * (Perhaps it'd make even more sense to checkpoint only when the previous
5108          * checkpoint record is in a different xlog page?)
5109          *
5110          * We have to make two tests to determine that nothing has happened since
5111          * the start of the last checkpoint: current insertion point must match
5112          * the end of the last checkpoint record, and its redo pointer must point
5113          * to itself.
5114          */
5115         if (!shutdown && !force)
5116         {
5117                 XLogRecPtr      curInsert;
5118
5119                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
5120                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
5121                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
5122                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
5123                         ControlFile->checkPoint.xlogid ==
5124                         ControlFile->checkPointCopy.redo.xlogid &&
5125                         ControlFile->checkPoint.xrecoff ==
5126                         ControlFile->checkPointCopy.redo.xrecoff)
5127                 {
5128                         LWLockRelease(WALInsertLock);
5129                         LWLockRelease(CheckpointStartLock);
5130                         LWLockRelease(CheckpointLock);
5131                         END_CRIT_SECTION();
5132                         return;
5133                 }
5134         }
5135
5136         /*
5137          * Compute new REDO record ptr = location of next XLOG record.
5138          *
5139          * NB: this is NOT necessarily where the checkpoint record itself will be,
5140          * since other backends may insert more XLOG records while we're off doing
5141          * the buffer flush work.  Those XLOG records are logically after the
5142          * checkpoint, even though physically before it.  Got that?
5143          */
5144         freespace = INSERT_FREESPACE(Insert);
5145         if (freespace < SizeOfXLogRecord)
5146         {
5147                 (void) AdvanceXLInsertBuffer();
5148                 /* OK to ignore update return flag, since we will do flush anyway */
5149                 freespace = INSERT_FREESPACE(Insert);
5150         }
5151         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
5152
5153         /*
5154          * Here we update the shared RedoRecPtr for future XLogInsert calls; this
5155          * must be done while holding the insert lock AND the info_lck.
5156          *
5157          * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
5158          * pointing past where it really needs to point.  This is okay; the only
5159          * consequence is that XLogInsert might back up whole buffers that it
5160          * didn't really need to.  We can't postpone advancing RedoRecPtr because
5161          * XLogInserts that happen while we are dumping buffers must assume that
5162          * their buffer changes are not included in the checkpoint.
5163          */
5164         {
5165                 /* use volatile pointer to prevent code rearrangement */
5166                 volatile XLogCtlData *xlogctl = XLogCtl;
5167
5168                 SpinLockAcquire(&xlogctl->info_lck);
5169                 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
5170                 SpinLockRelease(&xlogctl->info_lck);
5171         }
5172
5173         /*
5174          * Now we can release insert lock and checkpoint start lock, allowing
5175          * other xacts to proceed even while we are flushing disk buffers.
5176          */
5177         LWLockRelease(WALInsertLock);
5178
5179         LWLockRelease(CheckpointStartLock);
5180
5181         /*
5182          * Get the other info we need for the checkpoint record.
5183          */
5184         LWLockAcquire(XidGenLock, LW_SHARED);
5185         checkPoint.nextXid = ShmemVariableCache->nextXid;
5186         LWLockRelease(XidGenLock);
5187
5188         LWLockAcquire(OidGenLock, LW_SHARED);
5189         checkPoint.nextOid = ShmemVariableCache->nextOid;
5190         if (!shutdown)
5191                 checkPoint.nextOid += ShmemVariableCache->oidCount;
5192         LWLockRelease(OidGenLock);
5193
5194         MultiXactGetCheckptMulti(shutdown,
5195                                                          &checkPoint.nextMulti,
5196                                                          &checkPoint.nextMultiOffset);
5197
5198         /*
5199          * Having constructed the checkpoint record, ensure all shmem disk buffers
5200          * and commit-log buffers are flushed to disk.
5201          *
5202          * This I/O could fail for various reasons.  If so, we will fail to
5203          * complete the checkpoint, but there is no reason to force a system
5204          * panic. Accordingly, exit critical section while doing it.  (If we are
5205          * doing a shutdown checkpoint, we probably *should* panic --- but that
5206          * will happen anyway because we'll still be inside the critical section
5207          * established by ShutdownXLOG.)
5208          */
5209         END_CRIT_SECTION();
5210
5211         if (!shutdown)
5212                 ereport(DEBUG2,
5213                                 (errmsg("checkpoint starting")));
5214
5215         CheckPointCLOG();
5216         CheckPointSUBTRANS();
5217         CheckPointMultiXact();
5218         FlushBufferPool();
5219         /* We deliberately delay 2PC checkpointing as long as possible */
5220         CheckPointTwoPhase(checkPoint.redo);
5221
5222         START_CRIT_SECTION();
5223
5224         /*
5225          * Now insert the checkpoint record into XLOG.
5226          */
5227         rdata.data = (char *) (&checkPoint);
5228         rdata.len = sizeof(checkPoint);
5229         rdata.buffer = InvalidBuffer;
5230         rdata.next = NULL;
5231
5232         recptr = XLogInsert(RM_XLOG_ID,
5233                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
5234                                                 XLOG_CHECKPOINT_ONLINE,
5235                                                 &rdata);
5236
5237         XLogFlush(recptr);
5238
5239         /*
5240          * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
5241          * = end of actual checkpoint record.
5242          */
5243         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
5244                 ereport(PANIC,
5245                                 (errmsg("concurrent transaction log activity while database system is shutting down")));
5246
5247         /*
5248          * Select point at which we can truncate the log, which we base on the
5249          * prior checkpoint's earliest info.
5250          */
5251         XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
5252
5253         /*
5254          * Update the control file.
5255          */
5256         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5257         if (shutdown)
5258                 ControlFile->state = DB_SHUTDOWNED;
5259         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5260         ControlFile->checkPoint = ProcLastRecPtr;
5261         ControlFile->checkPointCopy = checkPoint;
5262         ControlFile->time = time(NULL);
5263         UpdateControlFile();
5264         LWLockRelease(ControlFileLock);
5265
5266         /*
5267          * We are now done with critical updates; no need for system panic if we
5268          * have trouble while fooling with offline log segments.
5269          */
5270         END_CRIT_SECTION();
5271
5272         /*
5273          * Delete offline log files (those no longer needed even for previous
5274          * checkpoint).
5275          */
5276         if (_logId || _logSeg)
5277         {
5278                 PrevLogSeg(_logId, _logSeg);
5279                 MoveOfflineLogs(_logId, _logSeg, recptr,
5280                                                 &nsegsremoved, &nsegsrecycled);
5281         }
5282
5283         /*
5284          * Make more log segments if needed.  (Do this after deleting offline log
5285          * segments, to avoid having peak disk space usage higher than necessary.)
5286          */
5287         if (!shutdown)
5288                 nsegsadded = PreallocXlogFiles(recptr);
5289
5290         /*
5291          * Truncate pg_subtrans if possible.  We can throw away all data before
5292          * the oldest XMIN of any running transaction.  No future transaction will
5293          * attempt to reference any pg_subtrans entry older than that (see Asserts
5294          * in subtrans.c).      During recovery, though, we mustn't do this because
5295          * StartupSUBTRANS hasn't been called yet.
5296          */
5297         if (!InRecovery)
5298                 TruncateSUBTRANS(GetOldestXmin(true));
5299
5300         if (!shutdown)
5301                 ereport(DEBUG2,
5302                                 (errmsg("checkpoint complete; %d transaction log file(s) added, %d removed, %d recycled",
5303                                                 nsegsadded, nsegsremoved, nsegsrecycled)));
5304
5305         LWLockRelease(CheckpointLock);
5306 }
5307
5308 /*
5309  * Write a NEXTOID log record
5310  */
5311 void
5312 XLogPutNextOid(Oid nextOid)
5313 {
5314         XLogRecData rdata;
5315
5316         rdata.data = (char *) (&nextOid);
5317         rdata.len = sizeof(Oid);
5318         rdata.buffer = InvalidBuffer;
5319         rdata.next = NULL;
5320         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
5321
5322         /*
5323          * We need not flush the NEXTOID record immediately, because any of the
5324          * just-allocated OIDs could only reach disk as part of a tuple insert or
5325          * update that would have its own XLOG record that must follow the NEXTOID
5326          * record.      Therefore, the standard buffer LSN interlock applied to those
5327          * records will ensure no such OID reaches disk before the NEXTOID record
5328          * does.
5329          */
5330 }
5331
5332 /*
5333  * XLOG resource manager's routines
5334  */
5335 void
5336 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
5337 {
5338         uint8           info = record->xl_info & ~XLR_INFO_MASK;
5339
5340         if (info == XLOG_NEXTOID)
5341         {
5342                 Oid                     nextOid;
5343
5344                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
5345                 if (ShmemVariableCache->nextOid < nextOid)
5346                 {
5347                         ShmemVariableCache->nextOid = nextOid;
5348                         ShmemVariableCache->oidCount = 0;
5349                 }
5350         }
5351         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
5352         {
5353                 CheckPoint      checkPoint;
5354
5355                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5356                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
5357                 ShmemVariableCache->nextXid = checkPoint.nextXid;
5358                 ShmemVariableCache->nextOid = checkPoint.nextOid;
5359                 ShmemVariableCache->oidCount = 0;
5360                 MultiXactSetNextMXact(checkPoint.nextMulti,
5361                                                           checkPoint.nextMultiOffset);
5362
5363                 /*
5364                  * TLI may change in a shutdown checkpoint, but it shouldn't decrease
5365                  */
5366                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5367                 {
5368                         if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
5369                                 !list_member_int(expectedTLIs,
5370                                                                  (int) checkPoint.ThisTimeLineID))
5371                                 ereport(PANIC,
5372                                                 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
5373                                                                 checkPoint.ThisTimeLineID, ThisTimeLineID)));
5374                         /* Following WAL records should be run with new TLI */
5375                         ThisTimeLineID = checkPoint.ThisTimeLineID;
5376                 }
5377         }
5378         else if (info == XLOG_CHECKPOINT_ONLINE)
5379         {
5380                 CheckPoint      checkPoint;
5381
5382                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5383                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
5384                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
5385                                                                   checkPoint.nextXid))
5386                         ShmemVariableCache->nextXid = checkPoint.nextXid;
5387                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
5388                 {
5389                         ShmemVariableCache->nextOid = checkPoint.nextOid;
5390                         ShmemVariableCache->oidCount = 0;
5391                 }
5392                 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
5393                                                                   checkPoint.nextMultiOffset);
5394                 /* TLI should not change in an on-line checkpoint */
5395                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5396                         ereport(PANIC,
5397                                         (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
5398                                                         checkPoint.ThisTimeLineID, ThisTimeLineID)));
5399         }
5400 }
5401
5402 void
5403 xlog_desc(char *buf, uint8 xl_info, char *rec)
5404 {
5405         uint8           info = xl_info & ~XLR_INFO_MASK;
5406
5407         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
5408                 info == XLOG_CHECKPOINT_ONLINE)
5409         {
5410                 CheckPoint *checkpoint = (CheckPoint *) rec;
5411
5412                 sprintf(buf + strlen(buf), "checkpoint: redo %X/%X; undo %X/%X; "
5413                                 "tli %u; xid %u; oid %u; multi %u; offset %u; %s",
5414                                 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
5415                                 checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
5416                                 checkpoint->ThisTimeLineID, checkpoint->nextXid,
5417                                 checkpoint->nextOid,
5418                                 checkpoint->nextMulti,
5419                                 checkpoint->nextMultiOffset,
5420                                 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
5421         }
5422         else if (info == XLOG_NEXTOID)
5423         {
5424                 Oid                     nextOid;
5425
5426                 memcpy(&nextOid, rec, sizeof(Oid));
5427                 sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
5428         }
5429         else
5430                 strcat(buf, "UNKNOWN");
5431 }
5432
#ifdef WAL_DEBUG

/*
 * xlog_outrec
 *		Append a one-line summary of an XLOG record header to buf.
 *
 * The summary gives the previous-record pointer, the xid, the number of
 * backup-block flag bits set in xl_info (omitted when zero), and the name of
 * the owning resource manager.  buf must already hold a NUL-terminated
 * string; output is written at its end with no length limit applied.
 */
static void
xlog_outrec(char *buf, XLogRecord *record)
{
	int			nbkp = 0;
	int			blk;

	sprintf(buf + strlen(buf), "prev %X/%X; xid %u",
			record->xl_prev.xlogid, record->xl_prev.xrecoff,
			record->xl_xid);

	/* count the backup-block flag bits that are set */
	for (blk = 0; blk < XLR_MAX_BKP_BLOCKS; blk++)
	{
		if (record->xl_info & (XLR_SET_BKP_BLOCK(blk)))
			nbkp++;
	}

	if (nbkp != 0)
		sprintf(buf + strlen(buf), "; bkpb %d", nbkp);

	sprintf(buf + strlen(buf), ": %s",
			RmgrTable[record->xl_rmid].rm_name);
}
#endif   /* WAL_DEBUG */
5459
5460
5461 /*
5462  * GUC support
5463  */
5464 const char *
5465 assign_xlog_sync_method(const char *method, bool doit, GucSource source)
5466 {
5467         int                     new_sync_method;
5468         int                     new_sync_bit;
5469
5470         if (pg_strcasecmp(method, "fsync") == 0)
5471         {
5472                 new_sync_method = SYNC_METHOD_FSYNC;
5473                 new_sync_bit = 0;
5474         }
5475 #ifdef HAVE_FSYNC_WRITETHROUGH
5476         else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
5477         {
5478                 new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
5479                 new_sync_bit = 0;
5480         }
5481 #endif
5482 #ifdef HAVE_FDATASYNC
5483         else if (pg_strcasecmp(method, "fdatasync") == 0)
5484         {
5485                 new_sync_method = SYNC_METHOD_FDATASYNC;
5486                 new_sync_bit = 0;
5487         }
5488 #endif
5489 #ifdef OPEN_SYNC_FLAG
5490         else if (pg_strcasecmp(method, "open_sync") == 0)
5491         {
5492                 new_sync_method = SYNC_METHOD_OPEN;
5493                 new_sync_bit = OPEN_SYNC_FLAG;
5494         }
5495 #endif
5496 #ifdef OPEN_DATASYNC_FLAG
5497         else if (pg_strcasecmp(method, "open_datasync") == 0)
5498         {
5499                 new_sync_method = SYNC_METHOD_OPEN;
5500                 new_sync_bit = OPEN_DATASYNC_FLAG;
5501         }
5502 #endif
5503         else
5504                 return NULL;
5505
5506         if (!doit)
5507                 return method;
5508
5509         if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
5510         {
5511                 /*
5512                  * To ensure that no blocks escape unsynced, force an fsync on the
5513                  * currently open log segment (if any).  Also, if the open flag is
5514                  * changing, close the log file so it will be reopened (with new flag
5515                  * bit) at next use.
5516                  */
5517                 if (openLogFile >= 0)
5518                 {
5519                         if (pg_fsync(openLogFile) != 0)
5520                                 ereport(PANIC,
5521                                                 (errcode_for_file_access(),
5522                                                  errmsg("could not fsync log file %u, segment %u: %m",
5523                                                                 openLogId, openLogSeg)));
5524                         if (open_sync_bit != new_sync_bit)
5525                         {
5526                                 if (close(openLogFile))
5527                                         ereport(PANIC,
5528                                                         (errcode_for_file_access(),
5529                                                 errmsg("could not close log file %u, segment %u: %m",
5530                                                            openLogId, openLogSeg)));
5531                                 openLogFile = -1;
5532                         }
5533                 }
5534                 sync_method = new_sync_method;
5535                 open_sync_bit = new_sync_bit;
5536         }
5537
5538         return method;
5539 }
5540
5541
5542 /*
5543  * Issue appropriate kind of fsync (if any) on the current XLOG output file
5544  */
5545 static void
5546 issue_xlog_fsync(void)
5547 {
5548         switch (sync_method)
5549         {
5550                 case SYNC_METHOD_FSYNC:
5551                         if (pg_fsync_no_writethrough(openLogFile) != 0)
5552                                 ereport(PANIC,
5553                                                 (errcode_for_file_access(),
5554                                                  errmsg("could not fsync log file %u, segment %u: %m",
5555                                                                 openLogId, openLogSeg)));
5556                         break;
5557 #ifdef HAVE_FSYNC_WRITETHROUGH
5558                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
5559                         if (pg_fsync_writethrough(openLogFile) != 0)
5560                                 ereport(PANIC,
5561                                                 (errcode_for_file_access(),
5562                                                  errmsg("could not fsync write-through log file %u, segment %u: %m",
5563                                                                 openLogId, openLogSeg)));
5564                         break;
5565 #endif
5566 #ifdef HAVE_FDATASYNC
5567                 case SYNC_METHOD_FDATASYNC:
5568                         if (pg_fdatasync(openLogFile) != 0)
5569                                 ereport(PANIC,
5570                                                 (errcode_for_file_access(),
5571                                         errmsg("could not fdatasync log file %u, segment %u: %m",
5572                                                    openLogId, openLogSeg)));
5573                         break;
5574 #endif
5575                 case SYNC_METHOD_OPEN:
5576                         /* write synced it already */
5577                         break;
5578                 default:
5579                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
5580                         break;
5581         }
5582 }
5583
5584
5585 /*
5586  * pg_start_backup: set up for taking an on-line backup dump
5587  *
5588  * Essentially what this does is to create a backup label file in $PGDATA,
5589  * where it will be archived as part of the backup dump.  The label file
5590  * contains the user-supplied label string (typically this would be used
5591  * to tell where the backup dump will be stored) and the starting time and
5592  * starting WAL offset for the dump.
5593  */
5594 Datum
5595 pg_start_backup(PG_FUNCTION_ARGS)
5596 {
5597         text       *backupid = PG_GETARG_TEXT_P(0);
5598         text       *result;
5599         char       *backupidstr;
5600         XLogRecPtr      checkpointloc;
5601         XLogRecPtr      startpoint;
5602         time_t          stamp_time;
5603         char            strfbuf[128];
5604         char            xlogfilename[MAXFNAMELEN];
5605         uint32          _logId;
5606         uint32          _logSeg;
5607         struct stat stat_buf;
5608         FILE       *fp;
5609
5610         if (!superuser())
5611                 ereport(ERROR,
5612                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5613                                  (errmsg("must be superuser to run a backup"))));
5614
5615         if (!XLogArchivingActive())
5616                 ereport(ERROR,
5617                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5618                                  (errmsg("WAL archiving is not active"),
5619                                   (errhint("archive_command must be defined before "
5620                                                    "online backups can be made safely.")))));
5621
5622         backupidstr = DatumGetCString(DirectFunctionCall1(textout,
5623                                                                                                  PointerGetDatum(backupid)));
5624
5625         /*
5626          * Force a CHECKPOINT.  This is not strictly necessary, but it seems like
5627          * a good idea to minimize the amount of past WAL needed to use the
5628          * backup.      Also, this guarantees that two successive backup runs will
5629          * have different checkpoint positions and hence different history file
5630          * names, even if nothing happened in between.
5631          */
5632         RequestCheckpoint(true, false);
5633
5634         /*
5635          * Now we need to fetch the checkpoint record location, and also its REDO
5636          * pointer.  The oldest point in WAL that would be needed to restore
5637          * starting from the checkpoint is precisely the REDO pointer.
5638          */
5639         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5640         checkpointloc = ControlFile->checkPoint;
5641         startpoint = ControlFile->checkPointCopy.redo;
5642         LWLockRelease(ControlFileLock);
5643
5644         XLByteToSeg(startpoint, _logId, _logSeg);
5645         XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
5646
5647         /*
5648          * We deliberately use strftime/localtime not the src/timezone functions,
5649          * so that backup labels will consistently be recorded in the same
5650          * timezone regardless of TimeZone setting.  This matches elog.c's
5651          * practice.
5652          */
5653         stamp_time = time(NULL);
5654         strftime(strfbuf, sizeof(strfbuf),
5655                          "%Y-%m-%d %H:%M:%S %Z",
5656                          localtime(&stamp_time));
5657
5658         /*
5659          * Check for existing backup label --- implies a backup is already running
5660          */
5661         if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
5662         {
5663                 if (errno != ENOENT)
5664                         ereport(ERROR,
5665                                         (errcode_for_file_access(),
5666                                          errmsg("could not stat file \"%s\": %m",
5667                                                         BACKUP_LABEL_FILE)));
5668         }
5669         else
5670                 ereport(ERROR,
5671                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5672                                  errmsg("a backup is already in progress"),
5673                                  errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
5674                                                  BACKUP_LABEL_FILE)));
5675
5676         /*
5677          * Okay, write the file
5678          */
5679         fp = AllocateFile(BACKUP_LABEL_FILE, "w");
5680         if (!fp)
5681                 ereport(ERROR,
5682                                 (errcode_for_file_access(),
5683                                  errmsg("could not create file \"%s\": %m",
5684                                                 BACKUP_LABEL_FILE)));
5685         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
5686                         startpoint.xlogid, startpoint.xrecoff, xlogfilename);
5687         fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
5688                         checkpointloc.xlogid, checkpointloc.xrecoff);
5689         fprintf(fp, "START TIME: %s\n", strfbuf);
5690         fprintf(fp, "LABEL: %s\n", backupidstr);
5691         if (fflush(fp) || ferror(fp) || FreeFile(fp))
5692                 ereport(ERROR,
5693                                 (errcode_for_file_access(),
5694                                  errmsg("could not write file \"%s\": %m",
5695                                                 BACKUP_LABEL_FILE)));
5696
5697         /*
5698          * We're done.  As a convenience, return the starting WAL offset.
5699          */
5700         snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
5701                          startpoint.xlogid, startpoint.xrecoff);
5702         result = DatumGetTextP(DirectFunctionCall1(textin,
5703                                                                                          CStringGetDatum(xlogfilename)));
5704         PG_RETURN_TEXT_P(result);
5705 }
5706
5707 /*
5708  * pg_stop_backup: finish taking an on-line backup dump
5709  *
5710  * We remove the backup label file created by pg_start_backup, and instead
5711  * create a backup history file in pg_xlog (whence it will immediately be
5712  * archived).  The backup history file contains the same info found in
5713  * the label file, plus the backup-end time and WAL offset.
5714  */
5715 Datum
5716 pg_stop_backup(PG_FUNCTION_ARGS)
5717 {
5718         text       *result;
5719         XLogCtlInsert *Insert = &XLogCtl->Insert;
5720         XLogRecPtr      startpoint;
5721         XLogRecPtr      stoppoint;
5722         time_t          stamp_time;
5723         char            strfbuf[128];
5724         char            histfilepath[MAXPGPATH];
5725         char            startxlogfilename[MAXFNAMELEN];
5726         char            stopxlogfilename[MAXFNAMELEN];
5727         uint32          _logId;
5728         uint32          _logSeg;
5729         FILE       *lfp;
5730         FILE       *fp;
5731         char            ch;
5732         int                     ich;
5733
5734         if (!superuser())
5735                 ereport(ERROR,
5736                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5737                                  (errmsg("must be superuser to run a backup"))));
5738
5739         /*
5740          * Get the current end-of-WAL position; it will be unsafe to use this dump
5741          * to restore to a point in advance of this time.
5742          */
5743         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5744         INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
5745         LWLockRelease(WALInsertLock);
5746
5747         XLByteToSeg(stoppoint, _logId, _logSeg);
5748         XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
5749
5750         /*
5751          * We deliberately use strftime/localtime not the src/timezone functions,
5752          * so that backup labels will consistently be recorded in the same
5753          * timezone regardless of TimeZone setting.  This matches elog.c's
5754          * practice.
5755          */
5756         stamp_time = time(NULL);
5757         strftime(strfbuf, sizeof(strfbuf),
5758                          "%Y-%m-%d %H:%M:%S %Z",
5759                          localtime(&stamp_time));
5760
5761         /*
5762          * Open the existing label file
5763          */
5764         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
5765         if (!lfp)
5766         {
5767                 if (errno != ENOENT)
5768                         ereport(ERROR,
5769                                         (errcode_for_file_access(),
5770                                          errmsg("could not read file \"%s\": %m",
5771                                                         BACKUP_LABEL_FILE)));
5772                 ereport(ERROR,
5773                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5774                                  errmsg("a backup is not in progress")));
5775         }
5776
5777         /*
5778          * Read and parse the START WAL LOCATION line (this code is pretty crude,
5779          * but we are not expecting any variability in the file format).
5780          */
5781         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
5782                            &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
5783                            &ch) != 4 || ch != '\n')
5784                 ereport(ERROR,
5785                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5786                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
5787
5788         /*
5789          * Write the backup history file
5790          */
5791         XLByteToSeg(startpoint, _logId, _logSeg);
5792         BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
5793                                                   startpoint.xrecoff % XLogSegSize);
5794         fp = AllocateFile(histfilepath, "w");
5795         if (!fp)
5796                 ereport(ERROR,
5797                                 (errcode_for_file_access(),
5798                                  errmsg("could not create file \"%s\": %m",
5799                                                 histfilepath)));
5800         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
5801                         startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
5802         fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
5803                         stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
5804         /* transfer remaining lines from label to history file */
5805         while ((ich = fgetc(lfp)) != EOF)
5806                 fputc(ich, fp);
5807         fprintf(fp, "STOP TIME: %s\n", strfbuf);
5808         if (fflush(fp) || ferror(fp) || FreeFile(fp))
5809                 ereport(ERROR,
5810                                 (errcode_for_file_access(),
5811                                  errmsg("could not write file \"%s\": %m",
5812                                                 histfilepath)));
5813
5814         /*
5815          * Close and remove the backup label file
5816          */
5817         if (ferror(lfp) || FreeFile(lfp))
5818                 ereport(ERROR,
5819                                 (errcode_for_file_access(),
5820                                  errmsg("could not read file \"%s\": %m",
5821                                                 BACKUP_LABEL_FILE)));
5822         if (unlink(BACKUP_LABEL_FILE) != 0)
5823                 ereport(ERROR,
5824                                 (errcode_for_file_access(),
5825                                  errmsg("could not remove file \"%s\": %m",
5826                                                 BACKUP_LABEL_FILE)));
5827
5828         RemoveOldBackupHistory();
5829
5830         /*
5831          * Notify archiver that history file may be archived immediately
5832          */
5833         if (XLogArchivingActive())
5834         {
5835                 BackupHistoryFileName(histfilepath, ThisTimeLineID, _logId, _logSeg,
5836                                                           startpoint.xrecoff % XLogSegSize);
5837                 XLogArchiveNotify(histfilepath);
5838         }
5839
5840         /*
5841          * We're done.  As a convenience, return the ending WAL offset.
5842          */
5843         snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
5844                          stoppoint.xlogid, stoppoint.xrecoff);
5845         result = DatumGetTextP(DirectFunctionCall1(textin,
5846                                                                                  CStringGetDatum(stopxlogfilename)));
5847         PG_RETURN_TEXT_P(result);
5848 }
5849
5850 /*
5851  * read_backup_label: check to see if a backup_label file is present
5852  *
5853  * If we see a backup_label during recovery, we assume that we are recovering
5854  * from a backup dump file, and we therefore roll forward from the checkpoint
5855  * identified by the label file, NOT what pg_control says.      This avoids the
5856  * problem that pg_control might have been archived one or more checkpoints
5857  * later than the start of the dump, and so if we rely on it as the start
5858  * point, we will fail to restore a consistent database state.
5859  *
5860  * We also attempt to retrieve the corresponding backup history file.
5861  * If successful, set recoveryMinXlogOffset to constrain valid PITR stopping
5862  * points.
5863  *
5864  * Returns TRUE if a backup_label was found (and fills the checkpoint
5865  * location into *checkPointLoc); returns FALSE if not.
5866  */
5867 static bool
5868 read_backup_label(XLogRecPtr *checkPointLoc)
5869 {
5870         XLogRecPtr      startpoint;
5871         XLogRecPtr      stoppoint;
5872         char            histfilename[MAXFNAMELEN];
5873         char            histfilepath[MAXPGPATH];
5874         char            startxlogfilename[MAXFNAMELEN];
5875         char            stopxlogfilename[MAXFNAMELEN];
5876         TimeLineID      tli;
5877         uint32          _logId;
5878         uint32          _logSeg;
5879         FILE       *lfp;
5880         FILE       *fp;
5881         char            ch;
5882
5883         /*
5884          * See if label file is present
5885          */
5886         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
5887         if (!lfp)
5888         {
5889                 if (errno != ENOENT)
5890                         ereport(FATAL,
5891                                         (errcode_for_file_access(),
5892                                          errmsg("could not read file \"%s\": %m",
5893                                                         BACKUP_LABEL_FILE)));
5894                 return false;                   /* it's not there, all is fine */
5895         }
5896
5897         /*
5898          * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
5899          * is pretty crude, but we are not expecting any variability in the file
5900          * format).
5901          */
5902         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
5903                            &startpoint.xlogid, &startpoint.xrecoff, &tli,
5904                            startxlogfilename, &ch) != 5 || ch != '\n')
5905                 ereport(FATAL,
5906                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5907                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
5908         if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
5909                            &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
5910                            &ch) != 3 || ch != '\n')
5911                 ereport(FATAL,
5912                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5913                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
5914         if (ferror(lfp) || FreeFile(lfp))
5915                 ereport(FATAL,
5916                                 (errcode_for_file_access(),
5917                                  errmsg("could not read file \"%s\": %m",
5918                                                 BACKUP_LABEL_FILE)));
5919
5920         /*
5921          * Try to retrieve the backup history file (no error if we can't)
5922          */
5923         XLByteToSeg(startpoint, _logId, _logSeg);
5924         BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
5925                                                   startpoint.xrecoff % XLogSegSize);
5926
5927         if (InArchiveRecovery)
5928                 RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
5929         else
5930                 BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
5931                                                           startpoint.xrecoff % XLogSegSize);
5932
5933         fp = AllocateFile(histfilepath, "r");
5934         if (fp)
5935         {
5936                 /*
5937                  * Parse history file to identify stop point.
5938                  */
5939                 if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
5940                                    &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
5941                                    &ch) != 4 || ch != '\n')
5942                         ereport(FATAL,
5943                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5944                                          errmsg("invalid data in file \"%s\"", histfilename)));
5945                 if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
5946                                    &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
5947                                    &ch) != 4 || ch != '\n')
5948                         ereport(FATAL,
5949                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5950                                          errmsg("invalid data in file \"%s\"", histfilename)));
5951                 recoveryMinXlogOffset = stoppoint;
5952                 if (ferror(fp) || FreeFile(fp))
5953                         ereport(FATAL,
5954                                         (errcode_for_file_access(),
5955                                          errmsg("could not read file \"%s\": %m",
5956                                                         histfilepath)));
5957         }
5958
5959         return true;
5960 }
5961
5962 /*
5963  * remove_backup_label: remove any extant backup_label after successful
5964  * recovery.  Once we have completed the end-of-recovery checkpoint there
5965  * is no reason to have to replay from the start point indicated by the
5966  * label (and indeed we'll probably have removed/recycled the needed WAL
5967  * segments), so remove the label to prevent trouble in later crash recoveries.
5968  */
5969 static void
5970 remove_backup_label(void)
5971 {
5972         if (unlink(BACKUP_LABEL_FILE) != 0)
5973                 if (errno != ENOENT)
5974                         ereport(FATAL,
5975                                         (errcode_for_file_access(),
5976                                          errmsg("could not remove file \"%s\": %m",
5977                                                         BACKUP_LABEL_FILE)));
5978 }