]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xlog.c
Commit to match discussed elog() changes. Only update is that LOG is
[postgresql] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.88 2002/03/02 21:39:20 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <fcntl.h>
18 #include <signal.h>
19 #include <unistd.h>
20 #include <errno.h>
21 #include <sys/stat.h>
22 #include <sys/time.h>
23 #include <sys/types.h>
24 #include <dirent.h>
25 #ifdef USE_LOCALE
26 #include <locale.h>
27 #endif
28
29 #include "access/clog.h"
30 #include "access/transam.h"
31 #include "access/xact.h"
32 #include "access/xlog.h"
33 #include "access/xlogutils.h"
34 #include "catalog/catversion.h"
35 #include "catalog/pg_control.h"
36 #include "storage/bufpage.h"
37 #include "storage/lwlock.h"
38 #include "storage/pmsignal.h"
39 #include "storage/proc.h"
40 #include "storage/sinval.h"
41 #include "storage/spin.h"
42 #include "utils/builtins.h"
43 #include "utils/relcache.h"
44 #include "utils/selfuncs.h"
45 #include "miscadmin.h"
46
47
48 /*
49  * This chunk of hackery attempts to determine which file sync methods
50  * are available on the current platform, and to choose an appropriate
51  * default method.  We assume that fsync() is always available, and that
52  * configure determined whether fdatasync() is (via HAVE_FDATASYNC).
53  * Default preference order: open_datasync, then fdatasync, then fsync. */
54 #define SYNC_METHOD_FSYNC               0
55 #define SYNC_METHOD_FDATASYNC   1
56 #define SYNC_METHOD_OPEN                2               /* used for both O_SYNC and
57                                                                                  * O_DSYNC */
58
59 #if defined(O_SYNC)
60 #define OPEN_SYNC_FLAG     O_SYNC
61 #else
62 #if defined(O_FSYNC)
63 #define OPEN_SYNC_FLAG    O_FSYNC          /* fallback spelling when O_SYNC is absent */
64 #endif
65 #endif
66
67 #if defined(OPEN_SYNC_FLAG)
68 #if defined(O_DSYNC) && (O_DSYNC != OPEN_SYNC_FLAG)
69 #define OPEN_DATASYNC_FLAG        O_DSYNC  /* only when distinct from OPEN_SYNC_FLAG */
70 #endif
71 #endif
72
73 #if defined(OPEN_DATASYNC_FLAG)
74 #define DEFAULT_SYNC_METHOD_STR    "open_datasync"
75 #define DEFAULT_SYNC_METHOD                SYNC_METHOD_OPEN
76 #define DEFAULT_SYNC_FLAGBIT       OPEN_DATASYNC_FLAG
77 #else                                      /* no OPEN_DATASYNC_FLAG */
78 #if defined(HAVE_FDATASYNC)
79 #define DEFAULT_SYNC_METHOD_STR   "fdatasync"
80 #define DEFAULT_SYNC_METHOD               SYNC_METHOD_FDATASYNC
81 #define DEFAULT_SYNC_FLAGBIT      0
82 #else                                      /* no HAVE_FDATASYNC either */
83 #define DEFAULT_SYNC_METHOD_STR   "fsync"
84 #define DEFAULT_SYNC_METHOD               SYNC_METHOD_FSYNC
85 #define DEFAULT_SYNC_FLAGBIT      0
86 #endif                                     /* HAVE_FDATASYNC */
87 #endif                                     /* OPEN_DATASYNC_FLAG */
88
89
90 /* User-settable parameters */
91 int                     CheckPointSegments = 3;  /* also determines XLOGfileslop */
92 int                     XLOGbuffers = 8;
93 int                     XLOGfiles = 0;          /* # of files to preallocate during ckpt */
94 int                     XLOG_DEBUG = 0;          /* nonzero: XLogInsert logs each record it inserts */
95 char       *XLOG_sync_method = NULL;     /* sync_method and open_sync_bit are derived from this */
96 const char      XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;   /* name of compile-time default */
97 char            XLOG_archive_dir[MAXPGPATH];            /* null string means
98                                                                                                  * delete 'em */
99
100 /*
101  * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
102  * preallocated XLOG segments --- we try to have at least XLOGfiles advance
103  * segments but no more than XLOGfiles+XLOGfileslop segments.  This could
104  * be made a separate GUC variable, but at present I think it's sufficient
105  * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
106  * checkpoint will free no more than 2*CheckPointSegments log segments, and
107  * we want to recycle all of them; the +1 allows boundary cases to happen
108  * without wasting a delete/create-segment cycle.  Being a macro, it is
109  * re-evaluated at each use and so tracks the current CheckPointSegments. */
110
111 #define XLOGfileslop    (2*CheckPointSegments + 1)
112
113
114 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
115 static int      sync_method = DEFAULT_SYNC_METHOD;       /* one of the SYNC_METHOD_* codes */
116 static int      open_sync_bit = DEFAULT_SYNC_FLAGBIT;    /* 0, or a sync flag bit such as OPEN_DATASYNC_FLAG */
117
118 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)    /* yields 0 whenever enableFsync is off */
119
120 #define MinXLOGbuffers  4        /* NOTE(review): presumably the floor for XLOGbuffers; not used in this excerpt */
121
122
123 /*
124  * ThisStartUpID will be same in all backends --- it identifies current
125  * instance of the database system.
126  */
127 StartUpID       ThisStartUpID = 0;
128
129 /* Are we doing recovery by reading XLOG? */
130 bool            InRecovery = false;
131
132 /*
133  * MyLastRecPtr points to the start of the last XLOG record inserted by the
134  * current transaction.  If MyLastRecPtr.xrecoff == 0, then we are not in
135  * a transaction or the transaction has not yet made any loggable changes.
136  *
137  * Note that XLOG records inserted outside transaction control are not
138  * reflected into MyLastRecPtr.
139  */
140 XLogRecPtr      MyLastRecPtr = {0, 0};
141
142 /*
143  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
144  * current backend.  It is updated for all inserts, transaction-controlled
145  * or not.
146  */
147 static XLogRecPtr ProcLastRecPtr = {0, 0};
148
149 /*
150  * RedoRecPtr is this backend's local copy of the REDO record pointer
151  * (which is almost but not quite the same as a pointer to the most recent
152  * CHECKPOINT record).  We update this from the shared-memory copy,
153  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
154  * hold the Insert lock).  See XLogInsert for details.
155  */
156 static XLogRecPtr RedoRecPtr;
157
158 /*----------
159  * Shared-memory data structures for XLOG control
160  *
161  * LogwrtRqst indicates a byte position that we need to write and/or fsync
162  * the log up to (all records before that point must be written or fsynced).
163  * LogwrtResult indicates the byte positions we have already written/fsynced.
164  * These structs are identical but are declared separately to indicate their
165  * slightly different functions.
166  *
167  * We do a lot of pushups to minimize the amount of access to lockable
168  * shared memory values.  There are actually three shared-memory copies of
169  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
170  *              XLogCtl->LogwrtResult is protected by info_lck
171  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
172  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
173  * One must hold the associated lock to read or write any of these, but
174  * of course no lock is needed to read/write the unshared LogwrtResult.
175  *
176  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
177  * right", since both are updated by a write or flush operation before
178  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
179  * is that it can be examined/modified by code that already holds WALWriteLock
180  * without needing to grab info_lck as well.
181  *
182  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
183  * but is updated when convenient.      Again, it exists for the convenience of
184  * code that is already holding WALInsertLock but not the other locks.
185  *
186  * The unshared LogwrtResult may lag behind any or all of these, and again
187  * is updated when convenient.
188  *
189  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
190  * (protected by info_lck), but we don't need to cache any copies of it.
191  *
192  * Note that this all works because the request and result positions can only
193  * advance forward, never back up, and so we can easily determine which of two
194  * values is "more up to date".
195  *
196  * info_lck is only held long enough to read/update the protected variables,
197  * so it's a plain spinlock.  The other locks are held longer (potentially
198  * over I/O operations), so we use LWLocks for them.  These locks are:
199  *
200  * WALInsertLock: must be held to insert a record into the WAL buffers.
201  *
202  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
203  * XLogFlush).
204  *
205  * ControlFileLock: must be held to read/update control file or create
206  * new log file.
207  *
208  * CheckpointLock: must be held to do a checkpoint (ensures only one
209  * checkpointer at a time; even though the postmaster won't launch
210  * parallel checkpoint processes, we need this because manual checkpoints
211  * could be launched simultaneously).
212  *
213  *----------
214  */
215 typedef struct XLogwrtRqst
216 {
217         XLogRecPtr      Write;                  /* last byte + 1 to write out */
218         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
219 } XLogwrtRqst;
220
221 typedef struct XLogwrtResult
222 {
223         XLogRecPtr      Write;                  /* last byte + 1 written out */
224         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
225 } XLogwrtResult;
226
227 /*
228  * Shared state data for XLogInsert.  XLogCtlData embeds one of these as its
229  * "Insert" member, which is marked there as protected by WALInsertLock. */
230 typedef struct XLogCtlInsert
231 {
232         XLogwrtResult LogwrtResult; /* a recent (possibly lagging) value of LogwrtResult */
233         XLogRecPtr      PrevRecord;             /* start of previously-inserted record; XLogInsert stores it as xl_prev */
234         uint16          curridx;                /* current block index in cache (subscripts xlblocks[]) */
235         XLogPageHeader currpage;        /* points to header of block in cache; INSERT_FREESPACE measures from here */
236         char       *currpos;            /* current insertion point in cache, within the current block */
237         XLogRecPtr      RedoRecPtr;             /* current redo point; XLogInsert refreshes its local RedoRecPtr from this */
238 } XLogCtlInsert;
239
240 /*
241  * Shared state data for XLogWrite/XLogFlush.  XLogCtlData embeds one of these
242  * as its "Write" member, which is marked there as protected by WALWriteLock. */
243 typedef struct XLogCtlWrite
244 {
245         XLogwrtResult LogwrtResult; /* current ("always right") value of LogwrtResult */
246         uint16          curridx;                /* cache index of next block to write */
247 } XLogCtlWrite;
248
249 /*
250  * Total shared-memory state for XLOG, reached through the XLogCtl pointer.
251  */
252 typedef struct XLogCtlData
253 {
254         /* Protected by WALInsertLock: */
255         XLogCtlInsert Insert;
256         /* Protected by info_lck: */
257         XLogwrtRqst LogwrtRqst;         /* how far we have been asked to write/flush */
258         XLogwrtResult LogwrtResult;     /* how far we have already written/flushed */
259         /* Protected by WALWriteLock: */
260         XLogCtlWrite Write;
261
262         /*
263          * These values do not change after startup, although the pointed-to
264          * pages and xlblocks values certainly do.  Permission to read/write
265          * the pages and xlblocks values depends on WALInsertLock and
266          * WALWriteLock.
267          */
268         char       *pages;                      /* buffers for unwritten XLOG pages */
269         XLogRecPtr *xlblocks;           /* per buffer: XLOG position of its 1st byte + BLCKSZ (i.e. the page's end) */
270         uint32          XLogCacheByte;  /* # bytes in xlog buffers */
271         uint32          XLogCacheBlck;  /* highest allocated xlog buffer index; NextBufIdx/PrevBufIdx wrap at it */
272         StartUpID       ThisStartUpID;  /* NOTE(review): presumably the shared copy of the global ThisStartUpID -- confirm */
273
274         /* This value is not protected by *any* lock... */
275         XLogRecPtr      RedoRecPtr;             /* see SetRedoRecPtr/GetRedoRecPtr */
276
277         slock_t         info_lck;               /* locks shared LogwrtRqst/LogwrtResult */
278 } XLogCtlData;
279
280 static XLogCtlData *XLogCtl = NULL;
281
282 /*
283  * We maintain an image of pg_control in shared memory.
284  */
285 static ControlFileData *ControlFile = NULL;
286
287 /*
288  * Macros for managing XLogInsert state.  In most cases, the calling routine
289  * already has a pointer to XLogCtl->Insert and/or a copy of XLogCtl->Insert.curridx,
290  * so these are passed as parameters instead of being fetched via XLogCtl.
291  */
292
293 /* Bytes left in the current xlog page buffer: BLCKSZ less currpos's offset from currpage */
294 #define INSERT_FREESPACE(Insert)  \
295         (BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
296
297 /* Set recptr to the current insertion point: xlblocks[curridx] (the page's end) less the free space */
298 #define INSERT_RECPTR(recptr,Insert,curridx)  \
299         ( \
300           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
301           (recptr).xrecoff = \
302                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
303         )
304
305
306 /* Increment an xlogid/segment pair in place; after segment XLogSegsPerFile-1 it rolls over to (logId+1, 0) */
307 #define NextLogSeg(logId, logSeg)       \
308         do { \
309                 if ((logSeg) >= XLogSegsPerFile-1) \
310                 { \
311                         (logId)++; \
312                         (logSeg) = 0; \
313                 } \
314                 else \
315                         (logSeg)++; \
316         } while (0)
317
318 /* Decrement an xlogid/segment pair in place (assume it's not 0,0); segment 0 steps back to (logId-1, XLogSegsPerFile-1) */
319 #define PrevLogSeg(logId, logSeg)       \
320         do { \
321                 if (logSeg) \
322                         (logSeg)--; \
323                 else \
324                 { \
325                         (logId)--; \
326                         (logSeg) = XLogSegsPerFile-1; \
327                 } \
328         } while (0)
329
330 /*
331  * Compute ID and segment from an XLogRecPtr; logId and logSeg are assigned to.
332  *
333  * For XLByteToSeg, do the computation at face value.  For XLByteToPrevSeg,
334  * a boundary byte is taken to be in the previous segment.  This is suitable
335  * for deciding which segment to write given a pointer to a record end,
336  * for example.  NOTE(review): xrecoff == 0 looks unsafe for the Prev form
337  * (the "- 1" would wrap if xrecoff is unsigned) --- confirm callers avoid it. */
338 #define XLByteToSeg(xlrp, logId, logSeg)        \
339         ( logId = (xlrp).xlogid, \
340           logSeg = (xlrp).xrecoff / XLogSegSize \
341         )
342 #define XLByteToPrevSeg(xlrp, logId, logSeg)    \
343         ( logId = (xlrp).xlogid, \
344           logSeg = ((xlrp).xrecoff - 1) / XLogSegSize \
345         )
346
347 /*
348  * Is an XLogRecPtr within a particular XLOG segment?
349  *
350  * For XLByteInSeg, do the computation at face value.  For XLByteInPrevSeg,
351  * a boundary byte is taken to be in the previous segment.  Both are pure
352  * boolean tests: unlike XLByteToSeg/XLByteToPrevSeg they assign to nothing. */
353 #define XLByteInSeg(xlrp, logId, logSeg)        \
354         ((xlrp).xlogid == (logId) && \
355          (xlrp).xrecoff / XLogSegSize == (logSeg))
356
357 #define XLByteInPrevSeg(xlrp, logId, logSeg)    \
358         ((xlrp).xlogid == (logId) && \
359          ((xlrp).xrecoff - 1) / XLogSegSize == (logSeg))
360
361 /* Format the path of segment (log, seg), i.e. XLogDir + "/" + 8 hex digits each, into path (size MAXPGPATH) */
362 #define XLogFileName(path, log, seg)    \
363                         snprintf(path, MAXPGPATH, "%s/%08X%08X",        \
364                                          XLogDir, log, seg)
365 /* Buffer index preceding idx; index 0 wraps around to XLogCtl->XLogCacheBlck */
366 #define PrevBufIdx(idx)         \
367                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
368 /* Buffer index following idx; XLogCtl->XLogCacheBlck wraps around to 0 */
369 #define NextBufIdx(idx)         \
370                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
371 /* True if xrecoff lies past its page's header and leaves room for a record header in that page */
372 #define XRecOffIsValid(xrecoff) \
373                 ((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \
374                 (BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord)
375
376 /*
377  * _INTL_MAXLOGRECSZ: max space needed for a record including header and
378  * any backup-block data.
379  */
380 #define _INTL_MAXLOGRECSZ       (SizeOfXLogRecord + MAXLOGRECSZ + \
381                                                          XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
382
383
384 /* File path names */
385 static char XLogDir[MAXPGPATH];
386 static char ControlFilePath[MAXPGPATH];
387
388 /*
389  * Private, possibly out-of-date copy of shared LogwrtResult.
390  * See discussion above.
391  */
392 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
393
394 /*
395  * openLogFile is -1 or a kernel FD for an open log file segment.
396  * When it's open, openLogOff is the current seek offset in the file.
397  * openLogId/openLogSeg identify the segment.  These variables are only
398  * used to write the XLOG, and so will normally refer to the active segment.
399  */
400 static int      openLogFile = -1;
401 static uint32 openLogId = 0;
402 static uint32 openLogSeg = 0;
403 static uint32 openLogOff = 0;
404
405 /*
406  * These variables are used similarly to the ones above, but for reading
407  * the XLOG.  Note, however, that readOff generally represents the offset
408  * of the page just read, not the seek position of the FD itself, which
409  * will be just past that page.
410  */
411 static int      readFile = -1;
412 static uint32 readId = 0;
413 static uint32 readSeg = 0;
414 static uint32 readOff = 0;
415
416 /* Buffer for currently read page (BLCKSZ bytes) */
417 static char *readBuf = NULL;
418
419 /* State information for XLOG reading */
420 static XLogRecPtr ReadRecPtr;
421 static XLogRecPtr EndRecPtr;
422 static XLogRecord *nextRecord = NULL;
423 static StartUpID lastReadSUI;
424
425 static bool InRedo = false;
426
427
428 static bool AdvanceXLInsertBuffer(void);
429 static void XLogWrite(XLogwrtRqst WriteRqst);
430 static int XLogFileInit(uint32 log, uint32 seg,
431                          bool *use_existent, bool use_lock);
432 static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
433                                            bool find_free, int max_advance,
434                                            bool use_lock);
435 static int      XLogFileOpen(uint32 log, uint32 seg, bool econt);
436 static void PreallocXlogFiles(XLogRecPtr endptr);
437 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
438 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
439 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
440 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
441                                          int whichChkpt,
442                                          char *buffer);
443 static void WriteControlFile(void);
444 static void ReadControlFile(void);
445 static char *str_time(time_t tnow);
446 static void xlog_outrec(char *buf, XLogRecord *record);
447 static void issue_xlog_fsync(void);
448
449
450 /*
451  * Insert an XLOG record having the specified RMID and info bytes,
452  * with the body of the record being the data chunk(s) described by
453  * the rdata list (see xlog.h for notes about rdata).
454  *
455  * Returns XLOG pointer to end of record (beginning of next record).
456  * This can be used as LSN for data pages affected by the logged action.
457  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
458  * before the data page can be written out.  This implements the basic
459  * WAL rule "write the log before the data".)
460  *
461  * NB: this routine feels free to scribble on the XLogRecData structs,
462  * though not on the data they reference.  This is OK since the XLogRecData
463  * structs are always just temporaries in the calling code.
464  */
465 XLogRecPtr
466 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
467 {
468         XLogCtlInsert *Insert = &XLogCtl->Insert;
469         XLogRecord *record;
470         XLogContRecord *contrecord;
471         XLogRecPtr      RecPtr;
472         XLogRecPtr      WriteRqst;
473         uint32          freespace;
474         uint16          curridx;
475         XLogRecData *rdt;
476         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
477         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
478         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
479         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
480         XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];
481         crc64           rdata_crc;
482         uint32          len,
483                                 write_len;
484         unsigned        i;
485         XLogwrtRqst LogwrtRqst;
486         bool            updrqst;
487         bool            no_tran = (rmid == RM_XLOG_ID) ? true : false;
488
489         if (info & XLR_INFO_MASK)
490         {
491                 if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
492                         elog(PANIC, "XLogInsert: invalid info mask %02X",
493                                  (info & XLR_INFO_MASK));
494                 no_tran = true;
495                 info &= ~XLR_INFO_MASK;
496         }
497
498         /*
499          * In bootstrap mode, we don't actually log anything but XLOG
500          * resources; return a phony record pointer.
501          */
502         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
503         {
504                 RecPtr.xlogid = 0;
505                 RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */
506                 return (RecPtr);
507         }
508
509         /*
510          * Here we scan the rdata list, determine which buffers must be backed
511          * up, and compute the CRC values for the data.  Note that the record
512          * header isn't added into the CRC yet since we don't know the final
513          * length or info bits quite yet.
514          *
515          * We may have to loop back to here if a race condition is detected
516          * below. We could prevent the race by doing all this work while
517          * holding the insert lock, but it seems better to avoid doing CRC
518          * calculations while holding the lock.  This means we have to be
519          * careful about modifying the rdata list until we know we aren't
520          * going to loop back again.  The only change we allow ourselves to
521          * make earlier is to set rdt->data = NULL in list items we have
522          * decided we will have to back up the whole buffer for.  This is OK
523          * because we will certainly decide the same thing again for those
524          * items if we do it over; doing it here saves an extra pass over the
525          * list later.
526          */
527 begin:;
528         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
529         {
530                 dtbuf[i] = InvalidBuffer;
531                 dtbuf_bkp[i] = false;
532         }
533
534         INIT_CRC64(rdata_crc);
535         len = 0;
536         for (rdt = rdata;;)
537         {
538                 if (rdt->buffer == InvalidBuffer)
539                 {
540                         /* Simple data, just include it */
541                         len += rdt->len;
542                         COMP_CRC64(rdata_crc, rdt->data, rdt->len);
543                 }
544                 else
545                 {
546                         /* Find info for buffer */
547                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
548                         {
549                                 if (rdt->buffer == dtbuf[i])
550                                 {
551                                         /* Buffer already referenced by earlier list item */
552                                         if (dtbuf_bkp[i])
553                                                 rdt->data = NULL;
554                                         else if (rdt->data)
555                                         {
556                                                 len += rdt->len;
557                                                 COMP_CRC64(rdata_crc, rdt->data, rdt->len);
558                                         }
559                                         break;
560                                 }
561                                 if (dtbuf[i] == InvalidBuffer)
562                                 {
563                                         /* OK, put it in this slot */
564                                         dtbuf[i] = rdt->buffer;
565
566                                         /*
567                                          * XXX We assume page LSN is first data on page
568                                          */
569                                         dtbuf_lsn[i] = *((XLogRecPtr *) BufferGetBlock(rdt->buffer));
570                                         if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
571                                         {
572                                                 crc64           dtcrc;
573
574                                                 dtbuf_bkp[i] = true;
575                                                 rdt->data = NULL;
576                                                 INIT_CRC64(dtcrc);
577                                                 COMP_CRC64(dtcrc,
578                                                                    BufferGetBlock(dtbuf[i]),
579                                                                    BLCKSZ);
580                                                 dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
581                                                 dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
582                                                 COMP_CRC64(dtcrc,
583                                                                 (char *) &(dtbuf_xlg[i]) + sizeof(crc64),
584                                                                    sizeof(BkpBlock) - sizeof(crc64));
585                                                 FIN_CRC64(dtcrc);
586                                                 dtbuf_xlg[i].crc = dtcrc;
587                                         }
588                                         else if (rdt->data)
589                                         {
590                                                 len += rdt->len;
591                                                 COMP_CRC64(rdata_crc, rdt->data, rdt->len);
592                                         }
593                                         break;
594                                 }
595                         }
596                         if (i >= XLR_MAX_BKP_BLOCKS)
597                                 elog(PANIC, "XLogInsert: can backup %d blocks at most",
598                                          XLR_MAX_BKP_BLOCKS);
599                 }
600                 /* Break out of loop when rdt points to last list item */
601                 if (rdt->next == NULL)
602                         break;
603                 rdt = rdt->next;
604         }
605
606         /*
607          * NOTE: the test for len == 0 here is somewhat fishy, since in theory
608          * all of the rmgr data might have been suppressed in favor of backup
609          * blocks.      Currently, all callers of XLogInsert provide at least some
610          * not-in-a-buffer data and so len == 0 should never happen, but that
611          * may not be true forever.  If you need to remove the len == 0 check,
612          * also remove the check for xl_len == 0 in ReadRecord, below.
613          */
614         if (len == 0 || len > MAXLOGRECSZ)
615                 elog(PANIC, "XLogInsert: invalid record length %u", len);
616
617         START_CRIT_SECTION();
618
619         /* update LogwrtResult before doing cache fill check */
620         {
621                 /* use volatile pointer to prevent code rearrangement */
622                 volatile XLogCtlData *xlogctl = XLogCtl;
623
624                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
625                 LogwrtRqst = xlogctl->LogwrtRqst;
626                 LogwrtResult = xlogctl->LogwrtResult;
627                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
628         }
629
630         /*
631          * If cache is half filled then try to acquire write lock and do
632          * XLogWrite. Ignore any fractional blocks in performing this check.
633          */
634         LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
635         if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
636                 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
637                  XLogCtl->XLogCacheByte / 2))
638         {
639                 if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
640                 {
641                         LogwrtResult = XLogCtl->Write.LogwrtResult;
642                         if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
643                                 XLogWrite(LogwrtRqst);
644                         LWLockRelease(WALWriteLock);
645                 }
646         }
647
648         /* Now wait to get insert lock */
649         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
650
651         /*
652          * Check to see if my RedoRecPtr is out of date.  If so, may have to
653          * go back and recompute everything.  This can only happen just after
654          * a checkpoint, so it's better to be slow in this case and fast
655          * otherwise.
656          */
657         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
658         {
659                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
660                 RedoRecPtr = Insert->RedoRecPtr;
661
662                 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
663                 {
664                         if (dtbuf[i] == InvalidBuffer)
665                                 continue;
666                         if (dtbuf_bkp[i] == false &&
667                                 XLByteLE(dtbuf_lsn[i], RedoRecPtr))
668                         {
669                                 /*
670                                  * Oops, this buffer now needs to be backed up, but we
671                                  * didn't think so above.  Start over.
672                                  */
673                                 LWLockRelease(WALInsertLock);
674                                 END_CRIT_SECTION();
675                                 goto begin;
676                         }
677                 }
678         }
679
680         /*
681          * Make additional rdata list entries for the backup blocks, so that
682          * we don't need to special-case them in the write loop.  Note that we
683          * have now irrevocably changed the input rdata list.  At the exit of
684          * this loop, write_len includes the backup block data.
685          *
686          * Also set the appropriate info bits to show which buffers were backed
687          * up.  The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th
688          * distinct buffer value (ignoring InvalidBuffer) appearing in the
689          * rdata list.
690          */
691         write_len = len;
692         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
693         {
694                 if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
695                         continue;
696
697                 info |= XLR_SET_BKP_BLOCK(i);
698
699                 rdt->next = &(dtbuf_rdt[2 * i]);
700
701                 dtbuf_rdt[2 * i].data = (char *) &(dtbuf_xlg[i]);
702                 dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
703                 write_len += sizeof(BkpBlock);
704
705                 rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);
706
707                 dtbuf_rdt[2 * i + 1].data = (char *) BufferGetBlock(dtbuf[i]);
708                 dtbuf_rdt[2 * i + 1].len = BLCKSZ;
709                 write_len += BLCKSZ;
710                 dtbuf_rdt[2 * i + 1].next = NULL;
711         }
712
713         /* Insert record header */
714
715         updrqst = false;
716         freespace = INSERT_FREESPACE(Insert);
717         if (freespace < SizeOfXLogRecord)
718         {
719                 updrqst = AdvanceXLInsertBuffer();
720                 freespace = BLCKSZ - SizeOfXLogPHD;
721         }
722
723         curridx = Insert->curridx;
724         record = (XLogRecord *) Insert->currpos;
725
726         record->xl_prev = Insert->PrevRecord;
727         if (no_tran)
728         {
729                 record->xl_xact_prev.xlogid = 0;
730                 record->xl_xact_prev.xrecoff = 0;
731         }
732         else
733                 record->xl_xact_prev = MyLastRecPtr;
734
735         record->xl_xid = GetCurrentTransactionId();
736         record->xl_len = len;           /* doesn't include backup blocks */
737         record->xl_info = info;
738         record->xl_rmid = rmid;
739
740         /* Now we can finish computing the main CRC */
741         COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),
742                            SizeOfXLogRecord - sizeof(crc64));
743         FIN_CRC64(rdata_crc);
744         record->xl_crc = rdata_crc;
745
746         /* Compute record's XLOG location */
747         INSERT_RECPTR(RecPtr, Insert, curridx);
748
749         /* If first XLOG record of transaction, save it in PROC array */
750         if (MyLastRecPtr.xrecoff == 0 && !no_tran)
751         {
752                 /*
753                  * We do not acquire SInvalLock here because of possible deadlock.
754                  * Anyone who wants to inspect other procs' logRec must acquire
755                  * WALInsertLock, instead.  A better solution would be a per-PROC
756                  * spinlock, but no time for that before 7.2 --- tgl 12/19/01.
757                  */
758                 MyProc->logRec = RecPtr;
759         }
760
761         if (XLOG_DEBUG)
762         {
763                 char            buf[8192];
764
765                 sprintf(buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff);
766                 xlog_outrec(buf, record);
767                 if (rdata->data != NULL)
768                 {
769                         strcat(buf, " - ");
770                         RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);
771                 }
772                 elog(LOG, "%s", buf);
773         }
774
775         /* Record begin of record in appropriate places */
776         if (!no_tran)
777                 MyLastRecPtr = RecPtr;
778         ProcLastRecPtr = RecPtr;
779         Insert->PrevRecord = RecPtr;
780
781         Insert->currpos += SizeOfXLogRecord;
782         freespace -= SizeOfXLogRecord;
783
784         /*
785          * Append the data, including backup blocks if any
786          */
787         while (write_len)
788         {
789                 while (rdata->data == NULL)
790                         rdata = rdata->next;
791
792                 if (freespace > 0)
793                 {
794                         if (rdata->len > freespace)
795                         {
796                                 memcpy(Insert->currpos, rdata->data, freespace);
797                                 rdata->data += freespace;
798                                 rdata->len -= freespace;
799                                 write_len -= freespace;
800                         }
801                         else
802                         {
803                                 memcpy(Insert->currpos, rdata->data, rdata->len);
804                                 freespace -= rdata->len;
805                                 write_len -= rdata->len;
806                                 Insert->currpos += rdata->len;
807                                 rdata = rdata->next;
808                                 continue;
809                         }
810                 }
811
812                 /* Use next buffer */
813                 updrqst = AdvanceXLInsertBuffer();
814                 curridx = Insert->curridx;
815                 /* Insert cont-record header */
816                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
817                 contrecord = (XLogContRecord *) Insert->currpos;
818                 contrecord->xl_rem_len = write_len;
819                 Insert->currpos += SizeOfXLogContRecord;
820                 freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
821         }
822
823         /* Ensure next record will be properly aligned */
824         Insert->currpos = (char *) Insert->currpage +
825                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
826         freespace = INSERT_FREESPACE(Insert);
827
828         /*
829          * The recptr I return is the beginning of the *next* record. This
830          * will be stored as LSN for changed data pages...
831          */
832         INSERT_RECPTR(RecPtr, Insert, curridx);
833
834         /* Need to update shared LogwrtRqst if some block was filled up */
835         if (freespace < SizeOfXLogRecord)
836                 updrqst = true;                 /* curridx is filled and available for
837                                                                  * writing out */
838         else
839                 curridx = PrevBufIdx(curridx);
840         WriteRqst = XLogCtl->xlblocks[curridx];
841
842         LWLockRelease(WALInsertLock);
843
844         if (updrqst)
845         {
846                 /* use volatile pointer to prevent code rearrangement */
847                 volatile XLogCtlData *xlogctl = XLogCtl;
848
849                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
850                 /* advance global request to include new block(s) */
851                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
852                         xlogctl->LogwrtRqst.Write = WriteRqst;
853                 /* update local result copy while I have the chance */
854                 LogwrtResult = xlogctl->LogwrtResult;
855                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
856         }
857
858         END_CRIT_SECTION();
859
860         return (RecPtr);
861 }
862
863 /*
864  * Advance the Insert state to the next buffer page, writing out the next
865  * buffer if it still contains unwritten data.  That write is done through
866  * XLogWrite under WALWriteLock, and only as far as the page being reused.
867  * The global LogwrtRqst.Write pointer needs to be advanced to include the
868  * just-filled page.  If we can do this for free (without an extra lock),
869  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
870  * request update still needs to be done, FALSE if we did it internally.
871  * On return Insert->curridx, currpage and currpos refer to the new page,
872  * zeroed and with its header set.  Must be called with WALInsertLock held.
873  */
874 static bool
875 AdvanceXLInsertBuffer(void)
876 {
877         XLogCtlInsert *Insert = &XLogCtl->Insert;
878         XLogCtlWrite *Write = &XLogCtl->Write;
879         uint16          nextidx = NextBufIdx(Insert->curridx);  /* slot we advance into */
880         bool            update_needed = true;   /* cleared if LogwrtRqst.Write is updated here */
881         XLogRecPtr      OldPageRqstPtr; /* end pointer currently recorded for slot nextidx */
882         XLogwrtRqst WriteRqst;
883         XLogRecPtr      NewPageEndPtr;  /* end pointer of the page being set up */
884         XLogPageHeader NewPage;
885
886         /* Use Insert->LogwrtResult copy if it's more fresh */
887         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
888                 LogwrtResult = Insert->LogwrtResult;
889
890         /*
891          * Get ending-offset of the buffer page we need to replace (this may
892          * be zero if the buffer hasn't been used yet).  Fall through if it's
893          * already written out.
894          */
895         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
896         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
897         {
898                 /* not known to be written yet: refresh our view, then write it if still needed */
899                 XLogRecPtr      FinishedPageRqstPtr;
900
901                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];       /* end of page just filled */
902
903                 /* Before waiting, get info_lck and update LogwrtResult */
904                 {
905                         /* use volatile pointer to prevent code rearrangement */
906                         volatile XLogCtlData *xlogctl = XLogCtl;
907
908                         SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
909                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
910                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
911                         LogwrtResult = xlogctl->LogwrtResult;
912                         SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
913                 }
914
915                 update_needed = false;  /* Did the shared-request update */
916
917                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
918                 {
919                         /* OK, someone wrote it already */
920                         Insert->LogwrtResult = LogwrtResult;
921                 }
922                 else
923                 {
924                         /* Must acquire write lock */
925                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
926                         LogwrtResult = Write->LogwrtResult;     /* recheck against the writer's own copy */
927                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
928                         {
929                                 /* OK, someone wrote it already */
930                                 LWLockRelease(WALWriteLock);
931                                 Insert->LogwrtResult = LogwrtResult;
932                         }
933                         else
934                         {
935                                 /*
936                                  * Have to write buffers while holding insert lock. This
937                                  * is not good, so only write as much as we absolutely
938                                  * must.
939                                  */
940                                 WriteRqst.Write = OldPageRqstPtr;
941                                 WriteRqst.Flush.xlogid = 0;     /* zero Flush request: write only */
942                                 WriteRqst.Flush.xrecoff = 0;
943                                 XLogWrite(WriteRqst);
944                                 LWLockRelease(WALWriteLock);
945                                 Insert->LogwrtResult = LogwrtResult;    /* XLogWrite updated LogwrtResult */
946                         }
947                 }
948         }
949
950         /*
951          * Now the next buffer slot is free and we can set it up to be the
952          * next output page.
953          */
954         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];     /* start from end of current page */
955         if (NewPageEndPtr.xrecoff >= XLogFileSize)
956         {
957                 /* crossing a logid boundary */
958                 NewPageEndPtr.xlogid += 1;
959                 NewPageEndPtr.xrecoff = BLCKSZ;
960         }
961         else
962                 NewPageEndPtr.xrecoff += BLCKSZ;
963         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
964         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
965         Insert->curridx = nextidx;
966         Insert->currpage = NewPage;
967         Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;   /* first insert position follows page header */
968
969         /*
970          * Be sure to re-zero the buffer so that bytes beyond what we've
971          * written will look like zeroes and not valid XLOG records...
972          */
973         MemSet((char *) NewPage, 0, BLCKSZ);
974
975         /* And fill the new page's header */
976         NewPage->xlp_magic = XLOG_PAGE_MAGIC;
977         /* NewPage->xlp_info = 0; */    /* done by memset */
978         NewPage->xlp_sui = ThisStartUpID;
979         NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
980         NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ; /* page start = page end - BLCKSZ */
981
982         return update_needed;
983 }
984
985 /*
986  * Write and/or fsync the log at least as far as WriteRqst indicates.
987  * Progress is tracked in LogwrtResult, which is copied to shared memory and to Write->LogwrtResult at the end.
988  * Must be called with WALWriteLock held.  close/lseek/write failures are reported with elog(PANIC).
989  */
990 static void
991 XLogWrite(XLogwrtRqst WriteRqst)
992 {
993         XLogCtlWrite *Write = &XLogCtl->Write;
994         char       *from;       /* start of the buffer page being written */
995         bool            ispartialpage;  /* request ends before end of current page */
996         bool            use_existent;   /* in/out flag passed to XLogFileInit */
997
998         /*
999          * Update local LogwrtResult (caller probably did this already,
1000          * but...)
1001          */
1002         LogwrtResult = Write->LogwrtResult;
1003
1004         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1005         {
1006                 /*
1007                  * Make sure we're not ahead of the insert process.  This could
1008                  * happen if we're passed a bogus WriteRqst.Write that is past the
1009                  * end of the last page that's been initialized by
1010                  * AdvanceXLInsertBuffer.
1011                  */
1012                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx]))
1013                         elog(PANIC, "XLogWrite: write request %X/%X is past end of log %X/%X",
1014                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1015                                  XLogCtl->xlblocks[Write->curridx].xlogid,
1016                                  XLogCtl->xlblocks[Write->curridx].xrecoff);
1017
1018                 /* Advance LogwrtResult.Write to end of current buffer page */
1019                 LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
1020                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);  /* request stops short of page end */
1021
1022                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1023                 {
1024                         /*
1025                          * Switch to new logfile segment.
1026                          */
1027                         if (openLogFile >= 0)
1028                         {
1029                                 if (close(openLogFile) != 0)
1030                                         elog(PANIC, "close of log file %u, segment %u failed: %m",
1031                                                  openLogId, openLogSeg);
1032                                 openLogFile = -1;
1033                         }
1034                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1035
1036                         /* create/use new log file */
1037                         use_existent = true;
1038                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1039                                                                            &use_existent, true);
1040                         openLogOff = 0;
1041
1042                         if (!use_existent)      /* there was no precreated file */
1043                                 elog(LOG, "XLogWrite: new log file created - "
1044                                          "consider increasing 'wal_files' in postgresql.conf.");
1045
1046                         /* update pg_control, unless someone else already did */
1047                         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1048                         if (ControlFile->logId < openLogId ||
1049                                 (ControlFile->logId == openLogId &&
1050                                  ControlFile->logSeg < openLogSeg + 1))
1051                         {
1052                                 ControlFile->logId = openLogId;
1053                                 ControlFile->logSeg = openLogSeg + 1;
1054                                 ControlFile->time = time(NULL);
1055                                 UpdateControlFile();
1056
1057                                 /*
1058                                  * Signal postmaster to start a checkpoint if it's been
1059                                  * too long since the last one.  (We look at local copy of
1060                                  * RedoRecPtr which might be a little out of date, but
1061                                  * should be close enough for this purpose.)
1062                                  */
1063                                 if (IsUnderPostmaster &&
1064                                         (openLogId != RedoRecPtr.xlogid ||
1065                                          openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
1066                                          (uint32) CheckPointSegments))
1067                                 {
1068                                         if (XLOG_DEBUG)
1069                                                 elog(LOG, "XLogWrite: time for a checkpoint, signaling postmaster");
1070                                         SendPostmasterSignal(PMSIGNAL_DO_CHECKPOINT);
1071                                 }
1072                         }
1073                         LWLockRelease(ControlFileLock);
1074                 }
1075
1076                 if (openLogFile < 0)    /* no segment file open: open the one holding this page */
1077                 {
1078                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1079                         openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
1080                         openLogOff = 0;
1081                 }
1082
1083                 /* Need to seek in the file?  (openLogOff tracks our position in the open segment) */
1084                 if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
1085                 {
1086                         openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
1087                         if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
1088                                 elog(PANIC, "lseek of log file %u, segment %u, offset %u failed: %m",
1089                                          openLogId, openLogSeg, openLogOff);
1090                 }
1091
1092                 /* OK to write the page */
1093                 from = XLogCtl->pages + Write->curridx * BLCKSZ;
1094                 errno = 0;      /* lets us detect a short write that leaves errno unset */
1095                 if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
1096                 {
1097                         /* if write didn't set errno, assume problem is no disk space */
1098                         if (errno == 0)
1099                                 errno = ENOSPC;
1100                         elog(PANIC, "write of log file %u, segment %u, offset %u failed: %m",
1101                                  openLogId, openLogSeg, openLogOff);
1102                 }
1103                 openLogOff += BLCKSZ;   /* file position moved past the page just written */
1104
1105                 /*
1106                  * If we just wrote the whole last page of a logfile segment,
1107                  * fsync the segment immediately.  This avoids having to go back
1108                  * and re-open prior segments when an fsync request comes along
1109                  * later. Doing it here ensures that one and only one backend will
1110                  * perform this fsync.
1111                  */
1112                 if (openLogOff >= XLogSegSize && !ispartialpage)
1113                 {
1114                         issue_xlog_fsync();
1115                         LogwrtResult.Flush = LogwrtResult.Write;        /* end of current page */
1116                 }
1117
1118                 if (ispartialpage)
1119                 {
1120                         /* Only asked to write a partial page */
1121                         LogwrtResult.Write = WriteRqst.Write;
1122                         break;
1123                 }
1124                 Write->curridx = NextBufIdx(Write->curridx);    /* whole page written: on to next buffer */
1125         }
1126
1127         /*
1128          * If asked to flush, do so
1129          */
1130         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1131                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1132         {
1133                 /*
1134                  * Could get here without iterating above loop, in which case we
1135                  * might have no open file or the wrong one.  However, we do not
1136                  * need to fsync more than one file.
1137                  */
1138                 if (sync_method != SYNC_METHOD_OPEN)    /* open-sync methods get no explicit fsync here */
1139                 {
1140                         if (openLogFile >= 0 &&
1141                          !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1142                         {
1143                                 if (close(openLogFile) != 0)
1144                                         elog(PANIC, "close of log file %u, segment %u failed: %m",
1145                                                  openLogId, openLogSeg);
1146                                 openLogFile = -1;
1147                         }
1148                         if (openLogFile < 0)
1149                         {
1150                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1151                                 openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
1152                                 openLogOff = 0;
1153                         }
1154                         issue_xlog_fsync();
1155                 }
1156                 LogwrtResult.Flush = LogwrtResult.Write;        /* everything written counts as flushed */
1157         }
1158
1159         /*
1160          * Update shared-memory status
1161          *
1162          * We make sure that the shared 'request' values do not fall behind the
1163          * 'result' values.  This is not absolutely essential, but it saves
1164          * some code in a couple of places.
1165          */
1166         {
1167                 /* use volatile pointer to prevent code rearrangement */
1168                 volatile XLogCtlData *xlogctl = XLogCtl;
1169
1170                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
1171                 xlogctl->LogwrtResult = LogwrtResult;
1172                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1173                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1174                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1175                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1176                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
1177         }
1178
1179         Write->LogwrtResult = LogwrtResult;     /* copy kept under WALWriteLock, read by later callers */
1180 }
1181
1182 /*
1183  * Ensure that all XLOG data through the given position is flushed to disk.
1184  * Does nothing during REDO or if 'record' is already known to be flushed.
1185  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1186  * already held, and we try to avoid acquiring it if possible.
1187  */
1188 void
1189 XLogFlush(XLogRecPtr record)
1190 {
1191         XLogRecPtr      WriteRqstPtr;   /* how far we will ask XLogWrite to write */
1192         XLogwrtRqst WriteRqst;
1193
1194         if (XLOG_DEBUG)
1195         {
1196                 /* elog() terminates the log line itself, so no trailing newline here */
1197                 elog(LOG, "XLogFlush%s%s: request %X/%X; write %X/%X; flush %X/%X",
1198                          (IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
1199                          (InRedo) ? "(redo)" : "",
1200                          record.xlogid, record.xrecoff,
1201                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1202                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1203         }
1204
1205         /* Disabled during REDO */
1206         if (InRedo)
1207                 return;
1208
1209         /* Quick exit if already known flushed */
1210         if (XLByteLE(record, LogwrtResult.Flush))
1211                 return;
1212
1213         START_CRIT_SECTION();
1214
1215         /*
1216          * Since fsync is usually a horribly expensive operation, we try to
1217          * piggyback as much data as we can on each fsync: if we see any more
1218          * data entered into the xlog buffer, we'll write and fsync that too,
1219          * so that the final value of LogwrtResult.Flush is as large as
1220          * possible. This gives us some chance of avoiding another fsync
1221          * immediately after.
1222          */
1223
1224         /* initialize to given target; may increase below */
1225         WriteRqstPtr = record;
1226
1227         /* read LogwrtResult and update local state */
1228         {
1229                 /* use volatile pointer to prevent code rearrangement */
1230                 volatile XLogCtlData *xlogctl = XLogCtl;
1231
1232                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
1233                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1234                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1235                 LogwrtResult = xlogctl->LogwrtResult;
1236                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
1237         }
1238
1239         /* done already? */
1240         if (!XLByteLE(record, LogwrtResult.Flush))
1241         {
1242                 /* if something was added to log cache then try to flush this too */
1243                 if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1244                 {
1245                         XLogCtlInsert *Insert = &XLogCtl->Insert;
1246                         uint32          freespace = INSERT_FREESPACE(Insert);
1247
1248                         if (freespace < SizeOfXLogRecord)       /* buffer is full */
1249                                 WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1250                         else
1251                         {
1252                                 WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1253                                 WriteRqstPtr.xrecoff -= freespace;      /* page end minus unused space */
1254                         }
1255                         LWLockRelease(WALInsertLock);
1256                 }
1257                 /* now wait for the write lock */
1258                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1259                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1260                 if (!XLByteLE(record, LogwrtResult.Flush))      /* recheck: someone may have flushed meanwhile */
1261                 {
1262                         WriteRqst.Write = WriteRqstPtr;
1263                         WriteRqst.Flush = record;
1264                         XLogWrite(WriteRqst);
1265                 }
1266                 LWLockRelease(WALWriteLock);
1267         }
1268
1269         END_CRIT_SECTION();
1270
1271         /*
1272          * If we still haven't flushed to the request point then we have a
1273          * problem; most likely, the requested flush point is past end of XLOG.
1274          * This has been seen to occur when a disk page has a corrupted LSN.
1275          *
1276          * Formerly we treated this as a PANIC condition, but that hurts the
1277          * system's robustness rather than helping it: we do not want to take
1278          * down the whole system due to corruption on one data page.  In
1279          * particular, if the bad page is encountered again during recovery then
1280          * we would be unable to restart the database at all!  (This scenario
1281          * has actually happened in the field several times with 7.1 releases.
1282          * Note that we cannot get here while InRedo is true, but if the bad
1283          * page is brought in and marked dirty during recovery then
1284          * CreateCheckpoint will try to flush it at the end of recovery.)
1285          *
1286          * The current approach is to ERROR under normal conditions, but only
1287          * NOTICE during recovery, so that the system can be brought up even if
1288          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR
1289          * will be promoted to PANIC since xact.c calls this routine inside a
1290          * critical section.  However, calls from bufmgr.c are not within
1291          * critical sections and so we will not force a restart for a bad LSN
1292          * on a data page.
1293          */
1294         if (XLByteLT(LogwrtResult.Flush, record))
1295                 elog(InRecovery ? NOTICE : ERROR,
1296                          "XLogFlush: request %X/%X is not satisfied --- flushed only to %X/%X",
1297                          record.xlogid, record.xrecoff,
1298                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1299 }
1300
1301 /*
1302  * Create a new XLOG file segment, or open a pre-existing one.
1303  *
1304  * log, seg: identify segment to be created/opened.
1305  *
1306  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1307  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1308  * file was used.
1309  *
1310  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1311  * place.  This should be TRUE except during bootstrap log creation.  The
1312  * caller must *not* hold the lock at call.
1313  * Returns FD of opened file.  Any failure to open, create, fill, fsync or
1314  * close a segment file is reported with elog(PANIC).
1315  */
1316 static int
1317 XLogFileInit(uint32 log, uint32 seg,
1318                          bool *use_existent, bool use_lock)
1319 {
1320         char            path[MAXPGPATH];        /* final name of the segment */
1321         char            tmppath[MAXPGPATH];     /* per-process temp name used while filling */
1322         char            zbuffer[BLCKSZ];        /* block of zeroes written repeatedly */
1323         int                     fd;
1324         int                     nbytes;
1325
1326         XLogFileName(path, log, seg);
1327
1328         /*
1329          * Try to use existent file (checkpoint maker may have created it
1330          * already)
1331          */
1332         if (*use_existent)
1333         {
1334                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1335                                                    S_IRUSR | S_IWUSR);
1336                 if (fd < 0)
1337                 {
1338                         if (errno != ENOENT)    /* a missing file just means we must create it */
1339                                 elog(PANIC, "open of %s (log file %u, segment %u) failed: %m",
1340                                          path, log, seg);
1341                 }
1342                 else
1343                         return (fd);
1344         }
1345
1346         /*
1347          * Initialize an empty (all zeroes) segment.  NOTE: it is possible
1348          * that another process is doing the same thing.  If so, we will end
1349          * up pre-creating an extra log segment.  That seems OK, and better
1350          * than holding the lock throughout this lengthy process.
1351          */
1352         snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d",
1353                          XLogDir, (int) getpid());
1354
1355         unlink(tmppath);        /* discard any stale temp file; failure is ignored */
1356
1357         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1358         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1359                                            S_IRUSR | S_IWUSR);
1360         if (fd < 0)
1361                 elog(PANIC, "creation of file %s failed: %m", tmppath);
1362
1363         /*
1364          * Zero-fill the file.  We have to do this the hard way to ensure that
1365          * all the file space has really been allocated --- on platforms that
1366          * allow "holes" in files, just seeking to the end doesn't allocate
1367          * intermediate space.  This way, we know that we have all the space
1368          * and (after the fsync below) that all the indirect blocks are down
1369          * on disk.  Therefore, fdatasync(2) or O_DSYNC will be sufficient to
1370          * sync future writes to the log file.
1371          */
1372         MemSet(zbuffer, 0, sizeof(zbuffer));
1373         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
1374         {
1375                 errno = 0;
1376                 if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
1377                 {
1378                         int                     save_errno = errno;
1379
1380                         /*
1381                          * If we fail to make the file, delete it to release disk
1382                          * space
1383                          */
1384                         unlink(tmppath);
1385                         /* if write didn't set errno, assume problem is no disk space */
1386                         errno = save_errno ? save_errno : ENOSPC;
1387
1388                         elog(PANIC, "ZeroFill failed to write %s: %m", tmppath);
1389                 }
1390         }
1391
1392         if (pg_fsync(fd) != 0)
1393                 elog(PANIC, "fsync of file %s failed: %m", tmppath);
1394         if (close(fd) != 0)     /* a failed close may hide an I/O error; don't ignore it */
1395                 elog(PANIC, "close of file %s failed: %m", tmppath);
1396
1397         /*
1398          * Now move the segment into place with its final name.
1399          *
1400          * If caller didn't want to use a pre-existing file, get rid of any
1401          * pre-existing file.  Otherwise, cope with possibility that someone
1402          * else has created the file while we were filling ours: if so, use
1403          * ours to pre-create a future log segment.
1404          */
1405         if (!InstallXLogFileSegment(log, seg, tmppath,
1406                                                                 *use_existent, XLOGfiles + XLOGfileslop,
1407                                                                 use_lock))
1408         {
1409                 /* No need for any more future segments... */
1410                 unlink(tmppath);
1411         }
1412
1413         /* Set flag to tell caller there was no existent file */
1414         *use_existent = false;
1415
1416         /* Now open original target segment (might not be file I just made) */
1417         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1418                                            S_IRUSR | S_IWUSR);
1419         if (fd < 0)
1420                 elog(PANIC, "open of %s (log file %u, segment %u) failed: %m",
1421                          path, log, seg);
1422
1423         return (fd);
1424 }
1425
1426 /*
1427  * Install a new XLOG segment file as a current or future log segment.
1428  *
1429  * This is used both to install a newly-created segment (which has a temp
1430  * filename while it's being created) and to recycle an old segment.
1431  *
1432  * log, seg: identify segment to install as (or first possible target).
1433  *
1434  * tmppath: initial name of file to install.  It will be renamed into place.
1435  *
1436  * find_free: if TRUE, install the new segment at the first empty log/seg
1437  * number at or after the passed numbers.  If FALSE, install the new segment
1438  * exactly where specified, deleting any existing segment file there.
1439  *
1440  * max_advance: maximum number of log/seg slots to advance past the starting
1441  * point.  Fail if no free slot is found in this range.  (Irrelevant if
1442  * find_free is FALSE.)
1443  *
1444  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1445  * place.  This should be TRUE except during bootstrap log creation.  The
1446  * caller must *not* hold the lock at call.
1447  *
1448  * Returns TRUE if file installed, FALSE if not installed because of
1449  * exceeding max_advance limit.  (Any other kind of failure causes elog().)
1450  */
1451 static bool
1452 InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
1453                                            bool find_free, int max_advance,
1454                                            bool use_lock)
1455 {
1456         char            path[MAXPGPATH];
1457         int                     fd;
1458
1459         XLogFileName(path, log, seg);
1460
1461         /*
1462          * We want to be sure that only one process does this at a time.
1463          */
1464         if (use_lock)
1465                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1466
1467         if (!find_free)
1468         {
1469                 /* Force installation: get rid of any pre-existing segment file */
1470                 unlink(path);
1471         }
1472         else
1473         {
1474                 /* Find a free slot to put it in */
1475                 while ((fd = BasicOpenFile(path, O_RDWR | PG_BINARY,
1476                                                                    S_IRUSR | S_IWUSR)) >= 0)
1477                 {
1478                         close(fd);
1479                         if (--max_advance < 0)
1480                         {
1481                                 /* Failed to find a free slot within specified range */
1482                                 if (use_lock)
1483                                         LWLockRelease(ControlFileLock);
1484                                 return false;
1485                         }
1486                         NextLogSeg(log, seg);
1487                         XLogFileName(path, log, seg);
1488                 }
1489         }
1490
1491         /*
1492          * Prefer link() to rename() here just to be really sure that we don't
1493          * overwrite an existing logfile.  However, there shouldn't be one, so
1494          * rename() is an acceptable substitute except for the truly paranoid.
1495          */
1496 #ifndef __BEOS__
1497         if (link(tmppath, path) < 0)
1498                 elog(PANIC, "link from %s to %s (initialization of log file %u, segment %u) failed: %m",
1499                          tmppath, path, log, seg);
1500         unlink(tmppath);
1501 #else
1502         if (rename(tmppath, path) < 0)
1503                 elog(PANIC, "rename from %s to %s (initialization of log file %u, segment %u) failed: %m",
1504                          tmppath, path, log, seg);
1505 #endif
1506
1507         if (use_lock)
1508                 LWLockRelease(ControlFileLock);
1509
1510         return true;
1511 }
1512
1513 /*
1514  * Open a pre-existing logfile segment.
1515  */
1516 static int
1517 XLogFileOpen(uint32 log, uint32 seg, bool econt)
1518 {
1519         char            path[MAXPGPATH];
1520         int                     fd;
1521
1522         XLogFileName(path, log, seg);
1523
1524         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1525                                            S_IRUSR | S_IWUSR);
1526         if (fd < 0)
1527         {
1528                 if (econt && errno == ENOENT)
1529                 {
1530                         elog(LOG, "open of %s (log file %u, segment %u) failed: %m",
1531                                  path, log, seg);
1532                         return (fd);
1533                 }
1534                 elog(PANIC, "open of %s (log file %u, segment %u) failed: %m",
1535                          path, log, seg);
1536         }
1537
1538         return (fd);
1539 }
1540
1541 /*
1542  * Preallocate log files beyond the specified log endpoint, according to
1543  * the XLOGfile user parameter.
1544  */
1545 static void
1546 PreallocXlogFiles(XLogRecPtr endptr)
1547 {
1548         uint32          _logId;
1549         uint32          _logSeg;
1550         int                     lf;
1551         bool            use_existent;
1552         int                     i;
1553
1554         XLByteToPrevSeg(endptr, _logId, _logSeg);
1555         if (XLOGfiles > 0)
1556         {
1557                 for (i = 1; i <= XLOGfiles; i++)
1558                 {
1559                         NextLogSeg(_logId, _logSeg);
1560                         use_existent = true;
1561                         lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
1562                         close(lf);
1563                 }
1564         }
1565         else if ((endptr.xrecoff - 1) % XLogSegSize >=
1566                          (uint32) (0.75 * XLogSegSize))
1567         {
1568                 NextLogSeg(_logId, _logSeg);
1569                 use_existent = true;
1570                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
1571                 close(lf);
1572         }
1573 }
1574
1575 /*
1576  * Remove or move offline all log files older or equal to passed log/seg#
1577  *
1578  * endptr is current (or recent) end of xlog; this is used to determine
1579  * whether we want to recycle rather than delete no-longer-wanted log files.
1580  */
1581 static void
1582 MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
1583 {
1584         uint32          endlogId;
1585         uint32          endlogSeg;
1586         DIR                *xldir;
1587         struct dirent *xlde;
1588         char            lastoff[32];
1589         char            path[MAXPGPATH];
1590
1591         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
1592
1593         xldir = opendir(XLogDir);
1594         if (xldir == NULL)
1595                 elog(PANIC, "could not open transaction log directory (%s): %m",
1596                          XLogDir);
1597
1598         sprintf(lastoff, "%08X%08X", log, seg);
1599
1600         errno = 0;
1601         while ((xlde = readdir(xldir)) != NULL)
1602         {
1603                 if (strlen(xlde->d_name) == 16 &&
1604                         strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
1605                         strcmp(xlde->d_name, lastoff) <= 0)
1606                 {
1607                         snprintf(path, MAXPGPATH, "%s/%s", XLogDir, xlde->d_name);
1608                         if (XLOG_archive_dir[0])
1609                         {
1610                                 elog(LOG, "archiving transaction log file %s",
1611                                          xlde->d_name);
1612                                 elog(NOTICE, "archiving log files is not implemented!");
1613                         }
1614                         else
1615                         {
1616                                 /*
1617                                  * Before deleting the file, see if it can be recycled as
1618                                  * a future log segment.  We allow recycling segments up
1619                                  * to XLOGfiles + XLOGfileslop segments beyond the current
1620                                  * XLOG location.
1621                                  */
1622                                 if (InstallXLogFileSegment(endlogId, endlogSeg, path,
1623                                                                                    true, XLOGfiles + XLOGfileslop,
1624                                                                                    true))
1625                                 {
1626                                         elog(LOG, "recycled transaction log file %s",
1627                                                  xlde->d_name);
1628                                 }
1629                                 else
1630                                 {
1631                                         /* No need for any more future segments... */
1632                                         elog(LOG, "removing transaction log file %s",
1633                                                  xlde->d_name);
1634                                         unlink(path);
1635                                 }
1636                         }
1637                 }
1638                 errno = 0;
1639         }
1640         if (errno)
1641                 elog(PANIC, "could not read transaction log directory (%s): %m",
1642                          XLogDir);
1643         closedir(xldir);
1644 }
1645
1646 /*
1647  * Restore the backup blocks present in an XLOG record, if any.
1648  *
1649  * We assume all of the record has been read into memory at *record.
1650  */
1651 static void
1652 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
1653 {
1654         Relation        reln;
1655         Buffer          buffer;
1656         Page            page;
1657         BkpBlock        bkpb;
1658         char       *blk;
1659         int                     i;
1660
1661         blk = (char *) XLogRecGetData(record) + record->xl_len;
1662         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1663         {
1664                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1665                         continue;
1666
1667                 memcpy((char *) &bkpb, blk, sizeof(BkpBlock));
1668                 blk += sizeof(BkpBlock);
1669
1670                 reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node);
1671
1672                 if (reln)
1673                 {
1674                         buffer = XLogReadBuffer(true, reln, bkpb.block);
1675                         if (BufferIsValid(buffer))
1676                         {
1677                                 page = (Page) BufferGetPage(buffer);
1678                                 memcpy((char *) page, blk, BLCKSZ);
1679                                 PageSetLSN(page, lsn);
1680                                 PageSetSUI(page, ThisStartUpID);
1681                                 UnlockAndWriteBuffer(buffer);
1682                         }
1683                 }
1684
1685                 blk += BLCKSZ;
1686         }
1687 }
1688
1689 /*
1690  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
1691  * record (other than to the minimal extent of computing the amount of
1692  * data to read in) until we've checked the CRCs.
1693  *
1694  * We assume all of the record has been read into memory at *record.
1695  */
1696 static bool
1697 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
1698 {
1699         crc64           crc;
1700         crc64           cbuf;
1701         int                     i;
1702         uint32          len = record->xl_len;
1703         char       *blk;
1704
1705         /* Check CRC of rmgr data and record header */
1706         INIT_CRC64(crc);
1707         COMP_CRC64(crc, XLogRecGetData(record), len);
1708         COMP_CRC64(crc, (char *) record + sizeof(crc64),
1709                            SizeOfXLogRecord - sizeof(crc64));
1710         FIN_CRC64(crc);
1711
1712         if (!EQ_CRC64(record->xl_crc, crc))
1713         {
1714                 elog(emode, "ReadRecord: bad resource manager data checksum in record at %X/%X",
1715                          recptr.xlogid, recptr.xrecoff);
1716                 return (false);
1717         }
1718
1719         /* Check CRCs of backup blocks, if any */
1720         blk = (char *) XLogRecGetData(record) + len;
1721         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1722         {
1723                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1724                         continue;
1725
1726                 INIT_CRC64(crc);
1727                 COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ);
1728                 COMP_CRC64(crc, blk + sizeof(crc64),
1729                                    sizeof(BkpBlock) - sizeof(crc64));
1730                 FIN_CRC64(crc);
1731                 memcpy((char *) &cbuf, blk, sizeof(crc64));             /* don't assume
1732                                                                                                                  * alignment */
1733
1734                 if (!EQ_CRC64(cbuf, crc))
1735                 {
1736                         elog(emode, "ReadRecord: bad checksum of backup block %d in record at %X/%X",
1737                                  i + 1, recptr.xlogid, recptr.xrecoff);
1738                         return (false);
1739                 }
1740                 blk += sizeof(BkpBlock) + BLCKSZ;
1741         }
1742
1743         return (true);
1744 }
1745
1746 /*
1747  * Attempt to read an XLOG record.
1748  *
1749  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
1750  * try to read a record just after the last one previously read.
1751  *
1752  * If no valid record is available, returns NULL, or fails if emode is PANIC.
1753  * (emode must be either PANIC or LOG.)
1754  *
1755  * buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long.  It is needed
1756  * to reassemble a record that crosses block boundaries.  Note that on
1757  * successful return, the returned record pointer always points at buffer.
1758  */
1759 static XLogRecord *
1760 ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer)
1761 {
1762         XLogRecord *record;
1763         XLogRecPtr      tmpRecPtr = EndRecPtr;
1764         uint32          len,
1765                                 total_len;
1766         uint32          targetPageOff;
1767         unsigned        i;
1768         bool            nextmode = false;
1769
1770         if (readBuf == NULL)
1771         {
1772                 /*
1773                  * First time through, permanently allocate readBuf.  We do it
1774                  * this way, rather than just making a static array, for two
1775                  * reasons: (1) no need to waste the storage in most
1776                  * instantiations of the backend; (2) a static char array isn't
1777                  * guaranteed to have any particular alignment, whereas malloc()
1778                  * will provide MAXALIGN'd storage.
1779                  */
1780                 readBuf = (char *) malloc(BLCKSZ);
1781                 Assert(readBuf != NULL);
1782         }
1783
1784         if (RecPtr == NULL)
1785         {
1786                 RecPtr = &tmpRecPtr;
1787                 nextmode = true;
1788                 /* fast case if next record is on same page */
1789                 if (nextRecord != NULL)
1790                 {
1791                         record = nextRecord;
1792                         goto got_record;
1793                 }
1794                 /* align old recptr to next page */
1795                 if (tmpRecPtr.xrecoff % BLCKSZ != 0)
1796                         tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
1797                 if (tmpRecPtr.xrecoff >= XLogFileSize)
1798                 {
1799                         (tmpRecPtr.xlogid)++;
1800                         tmpRecPtr.xrecoff = 0;
1801                 }
1802                 tmpRecPtr.xrecoff += SizeOfXLogPHD;
1803         }
1804         else if (!XRecOffIsValid(RecPtr->xrecoff))
1805                 elog(PANIC, "ReadRecord: invalid record offset at %X/%X",
1806                          RecPtr->xlogid, RecPtr->xrecoff);
1807
1808         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
1809         {
1810                 close(readFile);
1811                 readFile = -1;
1812         }
1813         XLByteToSeg(*RecPtr, readId, readSeg);
1814         if (readFile < 0)
1815         {
1816                 readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1817                 if (readFile < 0)
1818                         goto next_record_is_invalid;
1819                 readOff = (uint32) (-1);        /* force read to occur below */
1820         }
1821
1822         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / BLCKSZ) * BLCKSZ;
1823         if (readOff != targetPageOff)
1824         {
1825                 readOff = targetPageOff;
1826                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
1827                 {
1828                         elog(emode, "ReadRecord: lseek of log file %u, segment %u, offset %u failed: %m",
1829                                  readId, readSeg, readOff);
1830                         goto next_record_is_invalid;
1831                 }
1832                 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
1833                 {
1834                         elog(emode, "ReadRecord: read of log file %u, segment %u, offset %u failed: %m",
1835                                  readId, readSeg, readOff);
1836                         goto next_record_is_invalid;
1837                 }
1838                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, nextmode))
1839                         goto next_record_is_invalid;
1840         }
1841         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
1842                 RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
1843         {
1844                 elog(emode, "ReadRecord: contrecord is requested by %X/%X",
1845                          RecPtr->xlogid, RecPtr->xrecoff);
1846                 goto next_record_is_invalid;
1847         }
1848         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
1849
1850 got_record:;
1851
1852         /*
1853          * Currently, xl_len == 0 must be bad data, but that might not be true
1854          * forever.  See note in XLogInsert.
1855          */
1856         if (record->xl_len == 0)
1857         {
1858                 elog(emode, "ReadRecord: record with zero length at %X/%X",
1859                          RecPtr->xlogid, RecPtr->xrecoff);
1860                 goto next_record_is_invalid;
1861         }
1862
1863         /*
1864          * Compute total length of record including any appended backup
1865          * blocks.
1866          */
1867         total_len = SizeOfXLogRecord + record->xl_len;
1868         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1869         {
1870                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1871                         continue;
1872                 total_len += sizeof(BkpBlock) + BLCKSZ;
1873         }
1874
1875         /*
1876          * Make sure it will fit in buffer (currently, it is mechanically
1877          * impossible for this test to fail, but it seems like a good idea
1878          * anyway).
1879          */
1880         if (total_len > _INTL_MAXLOGRECSZ)
1881         {
1882                 elog(emode, "ReadRecord: record length %u at %X/%X too long",
1883                          total_len, RecPtr->xlogid, RecPtr->xrecoff);
1884                 goto next_record_is_invalid;
1885         }
1886         if (record->xl_rmid > RM_MAX_ID)
1887         {
1888                 elog(emode, "ReadRecord: invalid resource manager id %u at %X/%X",
1889                          record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
1890                 goto next_record_is_invalid;
1891         }
1892         nextRecord = NULL;
1893         len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
1894         if (total_len > len)
1895         {
1896                 /* Need to reassemble record */
1897                 XLogContRecord *contrecord;
1898                 uint32          gotlen = len;
1899
1900                 memcpy(buffer, record, len);
1901                 record = (XLogRecord *) buffer;
1902                 buffer += len;
1903                 for (;;)
1904                 {
1905                         readOff += BLCKSZ;
1906                         if (readOff >= XLogSegSize)
1907                         {
1908                                 close(readFile);
1909                                 readFile = -1;
1910                                 NextLogSeg(readId, readSeg);
1911                                 readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1912                                 if (readFile < 0)
1913                                         goto next_record_is_invalid;
1914                                 readOff = 0;
1915                         }
1916                         if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
1917                         {
1918                                 elog(emode, "ReadRecord: read of log file %u, segment %u, offset %u failed: %m",
1919                                          readId, readSeg, readOff);
1920                                 goto next_record_is_invalid;
1921                         }
1922                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, true))
1923                                 goto next_record_is_invalid;
1924                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
1925                         {
1926                                 elog(emode, "ReadRecord: there is no ContRecord flag in log file %u, segment %u, offset %u",
1927                                          readId, readSeg, readOff);
1928                                 goto next_record_is_invalid;
1929                         }
1930                         contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD);
1931                         if (contrecord->xl_rem_len == 0 ||
1932                                 total_len != (contrecord->xl_rem_len + gotlen))
1933                         {
1934                                 elog(emode, "ReadRecord: invalid ContRecord length %u in log file %u, segment %u, offset %u",
1935                                          contrecord->xl_rem_len, readId, readSeg, readOff);
1936                                 goto next_record_is_invalid;
1937                         }
1938                         len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
1939                         if (contrecord->xl_rem_len > len)
1940                         {
1941                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
1942                                 gotlen += len;
1943                                 buffer += len;
1944                                 continue;
1945                         }
1946                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
1947                                    contrecord->xl_rem_len);
1948                         break;
1949                 }
1950                 if (!RecordIsValid(record, *RecPtr, emode))
1951                         goto next_record_is_invalid;
1952                 if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD +
1953                         SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len))
1954                 {
1955                         nextRecord = (XLogRecord *) ((char *) contrecord +
1956                                 SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len));
1957                 }
1958                 EndRecPtr.xlogid = readId;
1959                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
1960                         SizeOfXLogPHD + SizeOfXLogContRecord +
1961                         MAXALIGN(contrecord->xl_rem_len);
1962                 ReadRecPtr = *RecPtr;
1963                 return record;
1964         }
1965
1966         /* Record does not cross a page boundary */
1967         if (!RecordIsValid(record, *RecPtr, emode))
1968                 goto next_record_is_invalid;
1969         if (BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % BLCKSZ +
1970                 MAXALIGN(total_len))
1971                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
1972         EndRecPtr.xlogid = RecPtr->xlogid;
1973         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
1974         ReadRecPtr = *RecPtr;
1975         memcpy(buffer, record, total_len);
1976         return (XLogRecord *) buffer;
1977
1978 next_record_is_invalid:;
1979         close(readFile);
1980         readFile = -1;
1981         nextRecord = NULL;
1982         return NULL;
1983 }
1984
1985 /*
1986  * Check whether the xlog header of a page just read in looks valid.
1987  *
1988  * This is just a convenience subroutine to avoid duplicated code in
1989  * ReadRecord.  It's not intended for use from anywhere else.
1990  */
1991 static bool
1992 ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
1993 {
1994         XLogRecPtr      recaddr;
1995
1996         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
1997         {
1998                 elog(emode, "ReadRecord: invalid magic number %04X in log file %u, segment %u, offset %u",
1999                          hdr->xlp_magic, readId, readSeg, readOff);
2000                 return false;
2001         }
2002         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
2003         {
2004                 elog(emode, "ReadRecord: invalid info bits %04X in log file %u, segment %u, offset %u",
2005                          hdr->xlp_info, readId, readSeg, readOff);
2006                 return false;
2007         }
2008         recaddr.xlogid = readId;
2009         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
2010         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
2011         {
2012                 elog(emode, "ReadRecord: unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
2013                          hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
2014                          readId, readSeg, readOff);
2015                 return false;
2016         }
2017
2018         /*
2019          * We disbelieve a SUI less than the previous page's SUI, or more than
2020          * a few counts greater.  In theory as many as 512 shutdown checkpoint
2021          * records could appear on a 32K-sized xlog page, so that's the most
2022          * differential there could legitimately be.
2023          *
2024          * Note this check can only be applied when we are reading the next page
2025          * in sequence, so ReadRecord passes a flag indicating whether to
2026          * check.
2027          */
2028         if (checkSUI)
2029         {
2030                 if (hdr->xlp_sui < lastReadSUI ||
2031                         hdr->xlp_sui > lastReadSUI + 512)
2032                 {
2033                         /* translator: SUI = startup id */
2034                         elog(emode, "ReadRecord: out-of-sequence SUI %u (after %u) in log file %u, segment %u, offset %u",
2035                                  hdr->xlp_sui, lastReadSUI, readId, readSeg, readOff);
2036                         return false;
2037                 }
2038         }
2039         lastReadSUI = hdr->xlp_sui;
2040         return true;
2041 }
2042
2043 /*
2044  * I/O routines for pg_control
2045  *
2046  * *ControlFile is a buffer in shared memory that holds an image of the
2047  * contents of pg_control.      WriteControlFile() initializes pg_control
2048  * given a preloaded buffer, ReadControlFile() loads the buffer from
2049  * the pg_control file (during postmaster or standalone-backend startup),
2050  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
2051  *
2052  * For simplicity, WriteControlFile() initializes the fields of pg_control
2053  * that are related to checking backend/database compatibility, and
2054  * ReadControlFile() verifies they are correct.  We could split out the
2055  * I/O and compatibility-check functions, but there seems no need currently.
2056  */
2057
2058 void
2059 XLOGPathInit(void)
2060 {
2061         /* Init XLOG file paths */
2062         snprintf(XLogDir, MAXPGPATH, "%s/pg_xlog", DataDir);
2063         snprintf(ControlFilePath, MAXPGPATH, "%s/global/pg_control", DataDir);
2064 }
2065
2066 static void
2067 WriteControlFile(void)
2068 {
2069         int                     fd;
2070         char            buffer[BLCKSZ]; /* need not be aligned */
2071
2072 #ifdef USE_LOCALE
2073         char       *localeptr;
2074 #endif
2075
2076         /*
2077          * Initialize version and compatibility-check fields
2078          */
2079         ControlFile->pg_control_version = PG_CONTROL_VERSION;
2080         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
2081         ControlFile->blcksz = BLCKSZ;
2082         ControlFile->relseg_size = RELSEG_SIZE;
2083 #ifdef USE_LOCALE
2084         localeptr = setlocale(LC_COLLATE, NULL);
2085         if (!localeptr)
2086                 elog(PANIC, "invalid LC_COLLATE setting");
2087         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
2088         localeptr = setlocale(LC_CTYPE, NULL);
2089         if (!localeptr)
2090                 elog(PANIC, "invalid LC_CTYPE setting");
2091         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
2092
2093         /*
2094          * Issue warning notice if initdb'ing in a locale that will not permit
2095          * LIKE index optimization.  This is not a clean place to do it, but I
2096          * don't see a better place either...
2097          */
2098         if (!locale_is_like_safe())
2099                 elog(NOTICE, "Initializing database with %s collation order."
2100                          "\n\tThis locale setting will prevent use of index optimization for"
2101                          "\n\tLIKE and regexp searches.  If you are concerned about speed of"
2102                   "\n\tsuch queries, you may wish to set LC_COLLATE to \"C\" and"
2103                          "\n\tre-initdb.  For more information see the Administrator's Guide.",
2104                          ControlFile->lc_collate);
2105 #else                                                   /* not USE_LOCALE */
2106         strcpy(ControlFile->lc_collate, "C");
2107         strcpy(ControlFile->lc_ctype, "C");
2108 #endif   /* not USE_LOCALE */
2109
2110         /* Contents are protected with a CRC */
2111         INIT_CRC64(ControlFile->crc);
2112         COMP_CRC64(ControlFile->crc,
2113                            (char *) ControlFile + sizeof(crc64),
2114                            sizeof(ControlFileData) - sizeof(crc64));
2115         FIN_CRC64(ControlFile->crc);
2116
2117         /*
2118          * We write out BLCKSZ bytes into pg_control, zero-padding the excess
2119          * over sizeof(ControlFileData).  This reduces the odds of
2120          * premature-EOF errors when reading pg_control.  We'll still fail
2121          * when we check the contents of the file, but hopefully with a more
2122          * specific error than "couldn't read pg_control".
2123          */
2124         if (sizeof(ControlFileData) > BLCKSZ)
2125                 elog(PANIC, "sizeof(ControlFileData) is larger than BLCKSZ; fix either one");
2126
2127         memset(buffer, 0, BLCKSZ);
2128         memcpy(buffer, ControlFile, sizeof(ControlFileData));
2129
2130         fd = BasicOpenFile(ControlFilePath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2131                                            S_IRUSR | S_IWUSR);
2132         if (fd < 0)
2133                 elog(PANIC, "WriteControlFile: could not create control file (%s): %m",
2134                          ControlFilePath);
2135
2136         errno = 0;
2137         if (write(fd, buffer, BLCKSZ) != BLCKSZ)
2138         {
2139                 /* if write didn't set errno, assume problem is no disk space */
2140                 if (errno == 0)
2141                         errno = ENOSPC;
2142                 elog(PANIC, "WriteControlFile: write to control file failed: %m");
2143         }
2144
2145         if (pg_fsync(fd) != 0)
2146                 elog(PANIC, "WriteControlFile: fsync of control file failed: %m");
2147
2148         close(fd);
2149 }
2150
2151 static void
2152 ReadControlFile(void)
2153 {
2154         crc64           crc;
2155         int                     fd;
2156
2157         /*
2158          * Read data...
2159          */
2160         fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
2161         if (fd < 0)
2162                 elog(PANIC, "could not open control file (%s): %m", ControlFilePath);
2163
2164         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
2165                 elog(PANIC, "read from control file failed: %m");
2166
2167         close(fd);
2168
2169         /*
2170          * Check for expected pg_control format version.  If this is wrong,
2171          * the CRC check will likely fail because we'll be checking the wrong
2172          * number of bytes.  Complaining about wrong version will probably be
2173          * more enlightening than complaining about wrong CRC.
2174          */
2175         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
2176                 elog(PANIC,
2177                          "The database cluster was initialized with PG_CONTROL_VERSION %d,\n"
2178                          "\tbut the server was compiled with PG_CONTROL_VERSION %d.\n"
2179                          "\tIt looks like you need to initdb.",
2180                          ControlFile->pg_control_version, PG_CONTROL_VERSION);
2181
2182         /* Now check the CRC. */
2183         INIT_CRC64(crc);
2184         COMP_CRC64(crc,
2185                            (char *) ControlFile + sizeof(crc64),
2186                            sizeof(ControlFileData) - sizeof(crc64));
2187         FIN_CRC64(crc);
2188
2189         if (!EQ_CRC64(crc, ControlFile->crc))
2190                 elog(PANIC, "invalid checksum in control file");
2191
2192         /*
2193          * Do compatibility checking immediately.  We do this here for 2
2194          * reasons:
2195          *
2196          * (1) if the database isn't compatible with the backend executable, we
2197          * want to abort before we can possibly do any damage;
2198          *
2199          * (2) this code is executed in the postmaster, so the setlocale() will
2200          * propagate to forked backends, which aren't going to read this file
2201          * for themselves.      (These locale settings are considered critical
2202          * compatibility items because they can affect sort order of indexes.)
2203          */
2204         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
2205                 elog(PANIC,
2206                          "The database cluster was initialized with CATALOG_VERSION_NO %d,\n"
2207                    "\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n"
2208                          "\tIt looks like you need to initdb.",
2209                          ControlFile->catalog_version_no, CATALOG_VERSION_NO);
2210         if (ControlFile->blcksz != BLCKSZ)
2211                 elog(PANIC,
2212                          "The database cluster was initialized with BLCKSZ %d,\n"
2213                          "\tbut the backend was compiled with BLCKSZ %d.\n"
2214                          "\tIt looks like you need to initdb.",
2215                          ControlFile->blcksz, BLCKSZ);
2216         if (ControlFile->relseg_size != RELSEG_SIZE)
2217                 elog(PANIC,
2218                          "The database cluster was initialized with RELSEG_SIZE %d,\n"
2219                          "\tbut the backend was compiled with RELSEG_SIZE %d.\n"
2220                          "\tIt looks like you need to initdb.",
2221                          ControlFile->relseg_size, RELSEG_SIZE);
2222 #ifdef USE_LOCALE
2223         if (setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
2224                 elog(PANIC,
2225                    "The database cluster was initialized with LC_COLLATE '%s',\n"
2226                          "\twhich is not recognized by setlocale().\n"
2227                          "\tIt looks like you need to initdb.",
2228                          ControlFile->lc_collate);
2229         if (setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
2230                 elog(PANIC,
2231                          "The database cluster was initialized with LC_CTYPE '%s',\n"
2232                          "\twhich is not recognized by setlocale().\n"
2233                          "\tIt looks like you need to initdb.",
2234                          ControlFile->lc_ctype);
2235 #else                                                   /* not USE_LOCALE */
2236         if (strcmp(ControlFile->lc_collate, "C") != 0 ||
2237                 strcmp(ControlFile->lc_ctype, "C") != 0)
2238                 elog(PANIC,
2239                 "The database cluster was initialized with LC_COLLATE '%s' and\n"
2240                          "\tLC_CTYPE '%s', but the server was compiled without locale support.\n"
2241                          "\tIt looks like you need to initdb or recompile.",
2242                          ControlFile->lc_collate, ControlFile->lc_ctype);
2243 #endif   /* not USE_LOCALE */
2244 }
2245
2246 void
2247 UpdateControlFile(void)
2248 {
2249         int                     fd;
2250
2251         INIT_CRC64(ControlFile->crc);
2252         COMP_CRC64(ControlFile->crc,
2253                            (char *) ControlFile + sizeof(crc64),
2254                            sizeof(ControlFileData) - sizeof(crc64));
2255         FIN_CRC64(ControlFile->crc);
2256
2257         fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
2258         if (fd < 0)
2259                 elog(PANIC, "could not open control file (%s): %m", ControlFilePath);
2260
2261         errno = 0;
2262         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
2263         {
2264                 /* if write didn't set errno, assume problem is no disk space */
2265                 if (errno == 0)
2266                         errno = ENOSPC;
2267                 elog(PANIC, "write to control file failed: %m");
2268         }
2269
2270         if (pg_fsync(fd) != 0)
2271                 elog(PANIC, "fsync of control file failed: %m");
2272
2273         close(fd);
2274 }
2275
2276 /*
2277  * Initialization of shared memory for XLOG
2278  */
2279
2280 int
2281 XLOGShmemSize(void)
2282 {
2283         if (XLOGbuffers < MinXLOGbuffers)
2284                 XLOGbuffers = MinXLOGbuffers;
2285
2286         return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
2287                 + BLCKSZ * XLOGbuffers +
2288                 MAXALIGN(sizeof(ControlFileData));
2289 }
2290
2291 void
2292 XLOGShmemInit(void)
2293 {
2294         bool            found;
2295
2296         /* this must agree with space requested by XLOGShmemSize() */
2297         if (XLOGbuffers < MinXLOGbuffers)
2298                 XLOGbuffers = MinXLOGbuffers;
2299
2300         XLogCtl = (XLogCtlData *)
2301                 ShmemInitStruct("XLOG Ctl",
2302                                                 MAXALIGN(sizeof(XLogCtlData) +
2303                                                                  sizeof(XLogRecPtr) * XLOGbuffers)
2304                                                 + BLCKSZ * XLOGbuffers,
2305                                                 &found);
2306         Assert(!found);
2307         ControlFile = (ControlFileData *)
2308                 ShmemInitStruct("Control File", sizeof(ControlFileData), &found);
2309         Assert(!found);
2310
2311         memset(XLogCtl, 0, sizeof(XLogCtlData));
2312
2313         /*
2314          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be
2315          * a multiple of the alignment for same, so no extra alignment padding
2316          * is needed here.
2317          */
2318         XLogCtl->xlblocks = (XLogRecPtr *)
2319                 (((char *) XLogCtl) + sizeof(XLogCtlData));
2320         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
2321
2322         /*
2323          * Here, on the other hand, we must MAXALIGN to ensure the page
2324          * buffers have worst-case alignment.
2325          */
2326         XLogCtl->pages =
2327                 ((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
2328                                                                           sizeof(XLogRecPtr) * XLOGbuffers);
2329         memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
2330
2331         /*
2332          * Do basic initialization of XLogCtl shared data. (StartupXLOG will
2333          * fill in additional info.)
2334          */
2335         XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
2336         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
2337         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
2338         SpinLockInit(&XLogCtl->info_lck);
2339
2340         /*
2341          * If we are not in bootstrap mode, pg_control should already exist.
2342          * Read and validate it immediately (see comments in ReadControlFile()
2343          * for the reasons why).
2344          */
2345         if (!IsBootstrapProcessingMode())
2346                 ReadControlFile();
2347 }
2348
2349 /*
2350  * This func must be called ONCE on system install.  It creates pg_control
2351  * and the initial XLOG segment.
2352  */
2353 void
2354 BootStrapXLOG(void)
2355 {
2356         CheckPoint      checkPoint;
2357         char       *buffer;
2358         XLogPageHeader page;
2359         XLogRecord *record;
2360         bool            use_existent;
2361         crc64           crc;
2362
2363         /* Use malloc() to ensure buffer is MAXALIGNED */
2364         buffer = (char *) malloc(BLCKSZ);
2365         page = (XLogPageHeader) buffer;
2366
2367         checkPoint.redo.xlogid = 0;
2368         checkPoint.redo.xrecoff = SizeOfXLogPHD;
2369         checkPoint.undo = checkPoint.redo;
2370         checkPoint.ThisStartUpID = 0;
2371         checkPoint.nextXid = FirstNormalTransactionId;
2372         checkPoint.nextOid = BootstrapObjectIdData;
2373         checkPoint.time = time(NULL);
2374
2375         ShmemVariableCache->nextXid = checkPoint.nextXid;
2376         ShmemVariableCache->nextOid = checkPoint.nextOid;
2377         ShmemVariableCache->oidCount = 0;
2378
2379         memset(buffer, 0, BLCKSZ);
2380         page->xlp_magic = XLOG_PAGE_MAGIC;
2381         page->xlp_info = 0;
2382         page->xlp_sui = checkPoint.ThisStartUpID;
2383         page->xlp_pageaddr.xlogid = 0;
2384         page->xlp_pageaddr.xrecoff = 0;
2385         record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
2386         record->xl_prev.xlogid = 0;
2387         record->xl_prev.xrecoff = 0;
2388         record->xl_xact_prev = record->xl_prev;
2389         record->xl_xid = InvalidTransactionId;
2390         record->xl_len = sizeof(checkPoint);
2391         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
2392         record->xl_rmid = RM_XLOG_ID;
2393         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
2394
2395         INIT_CRC64(crc);
2396         COMP_CRC64(crc, &checkPoint, sizeof(checkPoint));
2397         COMP_CRC64(crc, (char *) record + sizeof(crc64),
2398                            SizeOfXLogRecord - sizeof(crc64));
2399         FIN_CRC64(crc);
2400         record->xl_crc = crc;
2401
2402         use_existent = false;
2403         openLogFile = XLogFileInit(0, 0, &use_existent, false);
2404
2405         errno = 0;
2406         if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
2407         {
2408                 /* if write didn't set errno, assume problem is no disk space */
2409                 if (errno == 0)
2410                         errno = ENOSPC;
2411                 elog(PANIC, "BootStrapXLOG failed to write log file: %m");
2412         }
2413
2414         if (pg_fsync(openLogFile) != 0)
2415                 elog(PANIC, "BootStrapXLOG failed to fsync log file: %m");
2416
2417         close(openLogFile);
2418         openLogFile = -1;
2419
2420         memset(ControlFile, 0, sizeof(ControlFileData));
2421         /* Initialize pg_control status fields */
2422         ControlFile->state = DB_SHUTDOWNED;
2423         ControlFile->time = checkPoint.time;
2424         ControlFile->logId = 0;
2425         ControlFile->logSeg = 1;
2426         ControlFile->checkPoint = checkPoint.redo;
2427         ControlFile->checkPointCopy = checkPoint;
2428         /* some additional ControlFile fields are set in WriteControlFile() */
2429
2430         WriteControlFile();
2431
2432         /* Bootstrap the commit log, too */
2433         BootStrapCLOG();
2434 }
2435
/*
 * str_time
 *
 * Format tnow as local time, "YYYY-MM-DD HH:MM:SS TZ", for log messages.
 *
 * The result points to a static buffer that is overwritten by the next
 * call.  If the time cannot be converted or formatted, the buffer holds
 * the placeholder "(invalid time)" instead.
 */
static char *
str_time(time_t tnow)
{
	static char buf[32];
	struct tm  *tm = localtime(&tnow);

	/*
	 * localtime() may return NULL for an unrepresentable time, and
	 * strftime() returns 0 (leaving buf unspecified) if the result does
	 * not fit; neither case may leak through to the caller.
	 */
	if (tm == NULL ||
		strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S %Z", tm) == 0)
		strcpy(buf, "(invalid time)");

	return buf;
}
2447
2448 /*
2449  * This must be called ONCE during postmaster or standalone-backend startup
2450  */
2451 void
2452 StartupXLOG(void)
2453 {
2454         XLogCtlInsert *Insert;
2455         CheckPoint      checkPoint;
2456         bool            wasShutdown;
2457         XLogRecPtr      RecPtr,
2458                                 LastRec,
2459                                 checkPointLoc,
2460                                 EndOfLog;
2461         XLogRecord *record;
2462         char       *buffer;
2463
2464         /* Use malloc() to ensure record buffer is MAXALIGNED */
2465         buffer = (char *) malloc(_INTL_MAXLOGRECSZ);
2466
2467         CritSectionCount++;
2468
2469         /*
2470          * Read control file and check XLOG status looks valid.
2471          *
2472          * Note: in most control paths, *ControlFile is already valid and we need
2473          * not do ReadControlFile() here, but might as well do it to be sure.
2474          */
2475         ReadControlFile();
2476
2477         if (ControlFile->logSeg == 0 ||
2478                 ControlFile->state < DB_SHUTDOWNED ||
2479                 ControlFile->state > DB_IN_PRODUCTION ||
2480                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
2481                 elog(PANIC, "control file context is broken");
2482
2483         if (ControlFile->state == DB_SHUTDOWNED)
2484                 elog(LOG, "database system was shut down at %s",
2485                          str_time(ControlFile->time));
2486         else if (ControlFile->state == DB_SHUTDOWNING)
2487                 elog(LOG, "database system shutdown was interrupted at %s",
2488                          str_time(ControlFile->time));
2489         else if (ControlFile->state == DB_IN_RECOVERY)
2490                 elog(LOG, "database system was interrupted being in recovery at %s\n"
2491                          "\tThis probably means that some data blocks are corrupted\n"
2492                          "\tand you will have to use the last backup for recovery.",
2493                          str_time(ControlFile->time));
2494         else if (ControlFile->state == DB_IN_PRODUCTION)
2495                 elog(LOG, "database system was interrupted at %s",
2496                          str_time(ControlFile->time));
2497
2498         /*
2499          * Get the last valid checkpoint record.  If the latest one according
2500          * to pg_control is broken, try the next-to-last one.
2501          */
2502         record = ReadCheckpointRecord(ControlFile->checkPoint, 1, buffer);
2503         if (record != NULL)
2504         {
2505                 checkPointLoc = ControlFile->checkPoint;
2506                 elog(LOG, "checkpoint record is at %X/%X",
2507                          checkPointLoc.xlogid, checkPointLoc.xrecoff);
2508         }
2509         else
2510         {
2511                 record = ReadCheckpointRecord(ControlFile->prevCheckPoint, 2, buffer);
2512                 if (record != NULL)
2513                 {
2514                         checkPointLoc = ControlFile->prevCheckPoint;
2515                         elog(LOG, "using previous checkpoint record at %X/%X",
2516                                  checkPointLoc.xlogid, checkPointLoc.xrecoff);
2517                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
2518                 }
2519                 else
2520                         elog(PANIC, "unable to locate a valid checkpoint record");
2521         }
2522         LastRec = RecPtr = checkPointLoc;
2523         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
2524         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
2525
2526         elog(LOG, "redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
2527                  checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
2528                  checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
2529                  wasShutdown ? "TRUE" : "FALSE");
2530         elog(LOG, "next transaction id: %u; next oid: %u",
2531                  checkPoint.nextXid, checkPoint.nextOid);
2532         if (!TransactionIdIsNormal(checkPoint.nextXid))
2533                 elog(PANIC, "invalid next transaction id");
2534
2535         ShmemVariableCache->nextXid = checkPoint.nextXid;
2536         ShmemVariableCache->nextOid = checkPoint.nextOid;
2537         ShmemVariableCache->oidCount = 0;
2538
2539         ThisStartUpID = checkPoint.ThisStartUpID;
2540         RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
2541                 XLogCtl->RedoRecPtr = checkPoint.redo;
2542
2543         if (XLByteLT(RecPtr, checkPoint.redo))
2544                 elog(PANIC, "invalid redo in checkpoint record");
2545         if (checkPoint.undo.xrecoff == 0)
2546                 checkPoint.undo = RecPtr;
2547
2548         if (XLByteLT(checkPoint.undo, RecPtr) ||
2549                 XLByteLT(checkPoint.redo, RecPtr))
2550         {
2551                 if (wasShutdown)
2552                         elog(PANIC, "invalid redo/undo record in shutdown checkpoint");
2553                 InRecovery = true;
2554         }
2555         else if (ControlFile->state != DB_SHUTDOWNED)
2556                 InRecovery = true;
2557
2558         /* REDO */
2559         if (InRecovery)
2560         {
2561                 elog(LOG, "database system was not properly shut down; "
2562                          "automatic recovery in progress");
2563                 ControlFile->state = DB_IN_RECOVERY;
2564                 ControlFile->time = time(NULL);
2565                 UpdateControlFile();
2566
2567                 XLogInitRelationCache();
2568
2569                 /* Is REDO required ? */
2570                 if (XLByteLT(checkPoint.redo, RecPtr))
2571                         record = ReadRecord(&(checkPoint.redo), PANIC, buffer);
2572                 else
2573                 {
2574                         /* read past CheckPoint record */
2575                         record = ReadRecord(NULL, LOG, buffer);
2576                 }
2577
2578                 if (record != NULL)
2579                 {
2580                         InRedo = true;
2581                         elog(LOG, "redo starts at %X/%X",
2582                                  ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2583                         do
2584                         {
2585                                 /* nextXid must be beyond record's xid */
2586                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
2587                                                                                         ShmemVariableCache->nextXid))
2588                                 {
2589                                         ShmemVariableCache->nextXid = record->xl_xid;
2590                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
2591                                 }
2592                                 if (XLOG_DEBUG)
2593                                 {
2594                                         char            buf[8192];
2595
2596                                         sprintf(buf, "REDO @ %X/%X; LSN %X/%X: ",
2597                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
2598                                                         EndRecPtr.xlogid, EndRecPtr.xrecoff);
2599                                         xlog_outrec(buf, record);
2600                                         strcat(buf, " - ");
2601                                         RmgrTable[record->xl_rmid].rm_desc(buf,
2602                                                                 record->xl_info, XLogRecGetData(record));
2603                                         elog(LOG, "%s", buf);
2604                                 }
2605
2606                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
2607                                         RestoreBkpBlocks(record, EndRecPtr);
2608
2609                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
2610                                 record = ReadRecord(NULL, LOG, buffer);
2611                         } while (record != NULL);
2612                         elog(LOG, "redo done at %X/%X",
2613                                  ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2614                         LastRec = ReadRecPtr;
2615                         InRedo = false;
2616                 }
2617                 else
2618                         elog(LOG, "redo is not required");
2619         }
2620
2621         /*
2622          * Init xlog buffer cache using the block containing the last valid
2623          * record from the previous incarnation.
2624          */
2625         record = ReadRecord(&LastRec, PANIC, buffer);
2626         EndOfLog = EndRecPtr;
2627         XLByteToPrevSeg(EndOfLog, openLogId, openLogSeg);
2628         openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
2629         openLogOff = 0;
2630         ControlFile->logId = openLogId;
2631         ControlFile->logSeg = openLogSeg + 1;
2632         Insert = &XLogCtl->Insert;
2633         Insert->PrevRecord = LastRec;
2634
2635         /*
2636          * If the next record will go to the new page then initialize for that
2637          * one.
2638          */
2639         if ((BLCKSZ - EndOfLog.xrecoff % BLCKSZ) < SizeOfXLogRecord)
2640                 EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
2641         if (EndOfLog.xrecoff % BLCKSZ == 0)
2642         {
2643                 XLogRecPtr      NewPageEndPtr;
2644
2645                 NewPageEndPtr = EndOfLog;
2646                 if (NewPageEndPtr.xrecoff >= XLogFileSize)
2647                 {
2648                         /* crossing a logid boundary */
2649                         NewPageEndPtr.xlogid += 1;
2650                         NewPageEndPtr.xrecoff = BLCKSZ;
2651                 }
2652                 else
2653                         NewPageEndPtr.xrecoff += BLCKSZ;
2654                 XLogCtl->xlblocks[0] = NewPageEndPtr;
2655                 Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
2656                 if (InRecovery)
2657                         Insert->currpage->xlp_sui = ThisStartUpID;
2658                 else
2659                         Insert->currpage->xlp_sui = ThisStartUpID + 1;
2660                 Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
2661                 Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
2662                 /* rest of buffer was zeroed in XLOGShmemInit */
2663                 Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
2664         }
2665         else
2666         {
2667                 XLogCtl->xlblocks[0].xlogid = openLogId;
2668                 XLogCtl->xlblocks[0].xrecoff =
2669                         ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
2670
2671                 /*
2672                  * Tricky point here: readBuf contains the *last* block that the
2673                  * LastRec record spans, not the one it starts in.      The last block
2674                  * is indeed the one we want to use.
2675                  */
2676                 Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
2677                 memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
2678                 Insert->currpos = (char *) Insert->currpage +
2679                         (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
2680                 /* Make sure rest of page is zero */
2681                 memset(Insert->currpos, 0, INSERT_FREESPACE(Insert));
2682         }
2683
2684         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
2685
2686         XLogCtl->Write.LogwrtResult = LogwrtResult;
2687         Insert->LogwrtResult = LogwrtResult;
2688         XLogCtl->LogwrtResult = LogwrtResult;
2689
2690         XLogCtl->LogwrtRqst.Write = EndOfLog;
2691         XLogCtl->LogwrtRqst.Flush = EndOfLog;
2692
2693 #ifdef NOT_USED
2694         /* UNDO */
2695         if (InRecovery)
2696         {
2697                 RecPtr = ReadRecPtr;
2698                 if (XLByteLT(checkPoint.undo, RecPtr))
2699                 {
2700                         elog(LOG, "undo starts at %X/%X",
2701                                  RecPtr.xlogid, RecPtr.xrecoff);
2702                         do
2703                         {
2704                                 record = ReadRecord(&RecPtr, PANIC, buffer);
2705                                 if (TransactionIdIsValid(record->xl_xid) &&
2706                                         !TransactionIdDidCommit(record->xl_xid))
2707                                         RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
2708                                 RecPtr = record->xl_prev;
2709                         } while (XLByteLE(checkPoint.undo, RecPtr));
2710                         elog(LOG, "undo done at %X/%X",
2711                                  ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2712                 }
2713                 else
2714                         elog(LOG, "undo is not required");
2715         }
2716 #endif
2717
2718         if (InRecovery)
2719         {
2720                 /*
2721                  * In case we had to use the secondary checkpoint, make sure that
2722                  * it will still be shown as the secondary checkpoint after this
2723                  * CreateCheckPoint operation; we don't want the broken primary
2724                  * checkpoint to become prevCheckPoint...
2725                  */
2726                 ControlFile->checkPoint = checkPointLoc;
2727                 CreateCheckPoint(true);
2728                 XLogCloseRelationCache();
2729         }
2730
2731         /*
2732          * Preallocate additional log files, if wanted.
2733          */
2734         PreallocXlogFiles(EndOfLog);
2735
2736         InRecovery = false;
2737
2738         ControlFile->state = DB_IN_PRODUCTION;
2739         ControlFile->time = time(NULL);
2740         UpdateControlFile();
2741
2742         ThisStartUpID++;
2743         XLogCtl->ThisStartUpID = ThisStartUpID;
2744
2745         /* Start up the commit log, too */
2746         StartupCLOG();
2747
2748         elog(LOG, "database system is ready");
2749         CritSectionCount--;
2750
2751         /* Shut down readFile facility, free space */
2752         if (readFile >= 0)
2753         {
2754                 close(readFile);
2755                 readFile = -1;
2756         }
2757         if (readBuf)
2758         {
2759                 free(readBuf);
2760                 readBuf = NULL;
2761         }
2762
2763         free(buffer);
2764 }
2765
2766 /*
2767  * Subroutine to try to fetch and validate a prior checkpoint record.
2768  * whichChkpt = 1 for "primary", 2 for "secondary", merely informative
2769  */
2770 static XLogRecord *
2771 ReadCheckpointRecord(XLogRecPtr RecPtr,
2772                                          int whichChkpt,
2773                                          char *buffer)
2774 {
2775         XLogRecord *record;
2776
2777         if (!XRecOffIsValid(RecPtr.xrecoff))
2778         {
2779                 elog(LOG, (whichChkpt == 1 ?
2780                                    "invalid primary checkpoint link in control file" :
2781                                    "invalid secondary checkpoint link in control file"));
2782                 return NULL;
2783         }
2784
2785         record = ReadRecord(&RecPtr, LOG, buffer);
2786
2787         if (record == NULL)
2788         {
2789                 elog(LOG, (whichChkpt == 1 ?
2790                                    "invalid primary checkpoint record" :
2791                                    "invalid secondary checkpoint record"));
2792                 return NULL;
2793         }
2794         if (record->xl_rmid != RM_XLOG_ID)
2795         {
2796                 elog(LOG, (whichChkpt == 1 ?
2797                          "invalid resource manager id in primary checkpoint record" :
2798                   "invalid resource manager id in secondary checkpoint record"));
2799                 return NULL;
2800         }
2801         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
2802                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
2803         {
2804                 elog(LOG, (whichChkpt == 1 ?
2805                                    "invalid xl_info in primary checkpoint record" :
2806                                    "invalid xl_info in secondary checkpoint record"));
2807                 return NULL;
2808         }
2809         if (record->xl_len != sizeof(CheckPoint))
2810         {
2811                 elog(LOG, (whichChkpt == 1 ?
2812                                    "invalid length of primary checkpoint record" :
2813                                    "invalid length of secondary checkpoint record"));
2814                 return NULL;
2815         }
2816         return record;
2817 }
2818
2819 /*
2820  * Postmaster uses this to initialize ThisStartUpID & RedoRecPtr from
2821  * XLogCtlData located in shmem after successful startup.
2822  */
2823 void
2824 SetThisStartUpID(void)
2825 {
2826         ThisStartUpID = XLogCtl->ThisStartUpID;
2827         RedoRecPtr = XLogCtl->RedoRecPtr;
2828 }
2829
2830 /*
2831  * CheckPoint process called by postmaster saves copy of new RedoRecPtr
2832  * in shmem (using SetRedoRecPtr).      When checkpointer completes, postmaster
2833  * calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that
2834  * subsequently-spawned backends will start out with a reasonably up-to-date
2835  * local RedoRecPtr.  Since these operations are not protected by any lock
2836  * and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these
2837  * routines at other times!
2838  *
2839  * Note: once spawned, a backend must update its local RedoRecPtr from
2840  * XLogCtl->Insert.RedoRecPtr while holding the insert lock.  This is
2841  * done in XLogInsert().
2842  */
2843 void
2844 SetRedoRecPtr(void)
2845 {
2846         XLogCtl->RedoRecPtr = RedoRecPtr;
2847 }
2848
2849 void
2850 GetRedoRecPtr(void)
2851 {
2852         RedoRecPtr = XLogCtl->RedoRecPtr;
2853 }
2854
2855 /*
2856  * This must be called ONCE during postmaster or standalone-backend shutdown
2857  */
2858 void
2859 ShutdownXLOG(void)
2860 {
2861         elog(LOG, "shutting down");
2862
2863         /* suppress in-transaction check in CreateCheckPoint */
2864         MyLastRecPtr.xrecoff = 0;
2865
2866         CritSectionCount++;
2867         CreateDummyCaches();
2868         CreateCheckPoint(true);
2869         ShutdownCLOG();
2870         CritSectionCount--;
2871
2872         elog(LOG, "database system is shut down");
2873 }
2874
2875 /*
2876  * Perform a checkpoint --- either during shutdown, or on-the-fly
2877  */
2878 void
2879 CreateCheckPoint(bool shutdown)
2880 {
2881         CheckPoint      checkPoint;
2882         XLogRecPtr      recptr;
2883         XLogCtlInsert *Insert = &XLogCtl->Insert;
2884         XLogRecData rdata;
2885         uint32          freespace;
2886         uint32          _logId;
2887         uint32          _logSeg;
2888
2889         if (MyLastRecPtr.xrecoff != 0)
2890                 elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block");
2891
2892         /*
2893          * The CheckpointLock can be held for quite a while, which is not good
2894          * because we won't respond to a cancel/die request while waiting for
2895          * an LWLock.  (But the alternative of using a regular lock won't work
2896          * for background checkpoint processes, which are not regular
2897          * backends.) So, rather than use a plain LWLockAcquire, use this
2898          * kluge to allow an interrupt to be accepted while we are waiting:
2899          */
2900         while (!LWLockConditionalAcquire(CheckpointLock, LW_EXCLUSIVE))
2901         {
2902                 CHECK_FOR_INTERRUPTS();
2903                 sleep(1);
2904         }
2905
2906         START_CRIT_SECTION();
2907
2908         if (shutdown)
2909         {
2910                 ControlFile->state = DB_SHUTDOWNING;
2911                 ControlFile->time = time(NULL);
2912                 UpdateControlFile();
2913         }
2914
2915         memset(&checkPoint, 0, sizeof(checkPoint));
2916         checkPoint.ThisStartUpID = ThisStartUpID;
2917         checkPoint.time = time(NULL);
2918
2919         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
2920
2921         /*
2922          * If this isn't a shutdown, and we have not inserted any XLOG records
2923          * since the start of the last checkpoint, skip the checkpoint.  The
2924          * idea here is to avoid inserting duplicate checkpoints when the
2925          * system is idle.      That wastes log space, and more importantly it
2926          * exposes us to possible loss of both current and previous checkpoint
2927          * records if the machine crashes just as we're writing the update.
2928          * (Perhaps it'd make even more sense to checkpoint only when the
2929          * previous checkpoint record is in a different xlog page?)
2930          *
2931          * We have to make two tests to determine that nothing has happened since
2932          * the start of the last checkpoint: current insertion point must
2933          * match the end of the last checkpoint record, and its redo pointer
2934          * must point to itself.
2935          */
2936         if (!shutdown)
2937         {
2938                 XLogRecPtr      curInsert;
2939
2940                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
2941                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
2942                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
2943                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
2944                         ControlFile->checkPoint.xlogid ==
2945                         ControlFile->checkPointCopy.redo.xlogid &&
2946                         ControlFile->checkPoint.xrecoff ==
2947                         ControlFile->checkPointCopy.redo.xrecoff)
2948                 {
2949                         LWLockRelease(WALInsertLock);
2950                         LWLockRelease(CheckpointLock);
2951                         END_CRIT_SECTION();
2952                         return;
2953                 }
2954         }
2955
2956         /*
2957          * Compute new REDO record ptr = location of next XLOG record.
2958          *
2959          * NB: this is NOT necessarily where the checkpoint record itself will
2960          * be, since other backends may insert more XLOG records while we're
2961          * off doing the buffer flush work.  Those XLOG records are logically
2962          * after the checkpoint, even though physically before it.      Got that?
2963          */
2964         freespace = INSERT_FREESPACE(Insert);
2965         if (freespace < SizeOfXLogRecord)
2966         {
2967                 (void) AdvanceXLInsertBuffer();
2968                 /* OK to ignore update return flag, since we will do flush anyway */
2969                 freespace = BLCKSZ - SizeOfXLogPHD;
2970         }
2971         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
2972
2973         /*
2974          * Here we update the shared RedoRecPtr for future XLogInsert calls;
2975          * this must be done while holding the insert lock.
2976          */
2977         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
2978
2979         /*
2980          * Get UNDO record ptr - this is oldest of PROC->logRec values. We do
2981          * this while holding insert lock to ensure that we won't miss any
2982          * about-to-commit transactions (UNDO must include all xacts that have
2983          * commits after REDO point).
2984          *
2985          * XXX temporarily ifdef'd out to avoid three-way deadlock condition:
2986          * GetUndoRecPtr needs to grab SInvalLock to ensure that it is looking
2987          * at a stable set of proc records, but grabbing SInvalLock while holding
2988          * WALInsertLock is no good.  GetNewTransactionId may cause a WAL record
2989          * to be written while holding XidGenLock, and GetSnapshotData needs to
2990          * get XidGenLock while holding SInvalLock, so there's a risk of deadlock.
2991          * Need to find a better solution.  See pgsql-hackers discussion of
2992          * 17-Dec-01.
2993          */
2994 #ifdef NOT_USED
2995         checkPoint.undo = GetUndoRecPtr();
2996
2997         if (shutdown && checkPoint.undo.xrecoff != 0)
2998                 elog(PANIC, "active transaction while database system is shutting down");
2999 #endif
3000
3001         /*
3002          * Now we can release insert lock, allowing other xacts to proceed
3003          * even while we are flushing disk buffers.
3004          */
3005         LWLockRelease(WALInsertLock);
3006
3007         LWLockAcquire(XidGenLock, LW_SHARED);
3008         checkPoint.nextXid = ShmemVariableCache->nextXid;
3009         LWLockRelease(XidGenLock);
3010
3011         LWLockAcquire(OidGenLock, LW_SHARED);
3012         checkPoint.nextOid = ShmemVariableCache->nextOid;
3013         if (!shutdown)
3014                 checkPoint.nextOid += ShmemVariableCache->oidCount;
3015         LWLockRelease(OidGenLock);
3016
3017         /*
3018          * Having constructed the checkpoint record, ensure all shmem disk
3019          * buffers are flushed to disk.
3020          */
3021         FlushBufferPool();
3022
3023         /* And commit-log buffers, too */
3024         CheckPointCLOG();
3025
3026         /*
3027          * Now insert the checkpoint record into XLOG.
3028          */
3029         rdata.buffer = InvalidBuffer;
3030         rdata.data = (char *) (&checkPoint);
3031         rdata.len = sizeof(checkPoint);
3032         rdata.next = NULL;
3033
3034         recptr = XLogInsert(RM_XLOG_ID,
3035                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
3036                                                 XLOG_CHECKPOINT_ONLINE,
3037                                                 &rdata);
3038
3039         XLogFlush(recptr);
3040
3041         /*
3042          * We now have ProcLastRecPtr = start of actual checkpoint record,
3043          * recptr = end of actual checkpoint record.
3044          */
3045         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
3046                 elog(PANIC, "concurrent transaction log activity while database system is shutting down");
3047
3048         /*
3049          * Select point at which we can truncate the log, which we base on the
3050          * prior checkpoint's earliest info.
3051          *
3052          * With UNDO support: oldest item is redo or undo, whichever is older;
3053          * but watch out for case that undo = 0.
3054          *
3055          * Without UNDO support: just use the redo pointer.  This allows xlog
3056          * space to be freed much faster when there are long-running
3057          * transactions.
3058          */
3059 #ifdef NOT_USED
3060         if (ControlFile->checkPointCopy.undo.xrecoff != 0 &&
3061                 XLByteLT(ControlFile->checkPointCopy.undo,
3062                                  ControlFile->checkPointCopy.redo))
3063                 XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg);
3064         else
3065 #endif
3066                 XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
3067
3068         /*
3069          * Update the control file.
3070          */
3071         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
3072         if (shutdown)
3073                 ControlFile->state = DB_SHUTDOWNED;
3074         ControlFile->prevCheckPoint = ControlFile->checkPoint;
3075         ControlFile->checkPoint = ProcLastRecPtr;
3076         ControlFile->checkPointCopy = checkPoint;
3077         ControlFile->time = time(NULL);
3078         UpdateControlFile();
3079         LWLockRelease(ControlFileLock);
3080
3081         /*
3082          * Delete offline log files (those no longer needed even for previous
3083          * checkpoint).
3084          */
3085         if (_logId || _logSeg)
3086         {
3087                 PrevLogSeg(_logId, _logSeg);
3088                 MoveOfflineLogs(_logId, _logSeg, recptr);
3089         }
3090
3091         /*
3092          * Make more log segments if needed.  (Do this after deleting offline
3093          * log segments, to avoid having peak disk space usage higher than
3094          * necessary.)
3095          */
3096         if (!shutdown)
3097                 PreallocXlogFiles(recptr);
3098
3099         LWLockRelease(CheckpointLock);
3100
3101         END_CRIT_SECTION();
3102 }
3103
3104 /*
3105  * Write a NEXTOID log record
3106  */
3107 void
3108 XLogPutNextOid(Oid nextOid)
3109 {
3110         XLogRecData rdata;
3111
3112         rdata.buffer = InvalidBuffer;
3113         rdata.data = (char *) (&nextOid);
3114         rdata.len = sizeof(Oid);
3115         rdata.next = NULL;
3116         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
3117 }
3118
3119 /*
3120  * XLOG resource manager's routines
3121  */
3122 void
3123 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
3124 {
3125         uint8           info = record->xl_info & ~XLR_INFO_MASK;
3126
3127         if (info == XLOG_NEXTOID)
3128         {
3129                 Oid                     nextOid;
3130
3131                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
3132                 if (ShmemVariableCache->nextOid < nextOid)
3133                 {
3134                         ShmemVariableCache->nextOid = nextOid;
3135                         ShmemVariableCache->oidCount = 0;
3136                 }
3137         }
3138         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
3139         {
3140                 CheckPoint      checkPoint;
3141
3142                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
3143                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
3144                 ShmemVariableCache->nextXid = checkPoint.nextXid;
3145                 ShmemVariableCache->nextOid = checkPoint.nextOid;
3146                 ShmemVariableCache->oidCount = 0;
3147         }
3148         else if (info == XLOG_CHECKPOINT_ONLINE)
3149         {
3150                 CheckPoint      checkPoint;
3151
3152                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
3153                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
3154                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
3155                                                                   checkPoint.nextXid))
3156                         ShmemVariableCache->nextXid = checkPoint.nextXid;
3157                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
3158                 {
3159                         ShmemVariableCache->nextOid = checkPoint.nextOid;
3160                         ShmemVariableCache->oidCount = 0;
3161                 }
3162         }
3163 }
3164
3165 void
3166 xlog_undo(XLogRecPtr lsn, XLogRecord *record)
3167 {
3168 }
3169
3170 void
3171 xlog_desc(char *buf, uint8 xl_info, char *rec)
3172 {
3173         uint8           info = xl_info & ~XLR_INFO_MASK;
3174
3175         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
3176                 info == XLOG_CHECKPOINT_ONLINE)
3177         {
3178                 CheckPoint *checkpoint = (CheckPoint *) rec;
3179
3180                 sprintf(buf + strlen(buf), "checkpoint: redo %X/%X; undo %X/%X; "
3181                                 "sui %u; xid %u; oid %u; %s",
3182                                 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
3183                                 checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
3184                                 checkpoint->ThisStartUpID, checkpoint->nextXid,
3185                                 checkpoint->nextOid,
3186                          (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
3187         }
3188         else if (info == XLOG_NEXTOID)
3189         {
3190                 Oid                     nextOid;
3191
3192                 memcpy(&nextOid, rec, sizeof(Oid));
3193                 sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
3194         }
3195         else
3196                 strcat(buf, "UNKNOWN");
3197 }
3198
3199 static void
3200 xlog_outrec(char *buf, XLogRecord *record)
3201 {
3202         int                     bkpb;
3203         int                     i;
3204
3205         sprintf(buf + strlen(buf), "prev %X/%X; xprev %X/%X; xid %u",
3206                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3207                         record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
3208                         record->xl_xid);
3209
3210         for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++)
3211         {
3212                 if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
3213                         continue;
3214                 bkpb++;
3215         }
3216
3217         if (bkpb)
3218                 sprintf(buf + strlen(buf), "; bkpb %d", bkpb);
3219
3220         sprintf(buf + strlen(buf), ": %s",
3221                         RmgrTable[record->xl_rmid].rm_name);
3222 }
3223
3224
3225 /*
3226  * GUC support routines
3227  */
3228
/*
 * check_xlog_sync_method
 *
 * Return true if "method" names a sync method available in this build,
 * false otherwise.  The comparison ignores case.  "fsync" is always
 * accepted; "fdatasync", "open_sync" and "open_datasync" are accepted only
 * when HAVE_FDATASYNC, OPEN_SYNC_FLAG and OPEN_DATASYNC_FLAG,
 * respectively, are defined.
 */
bool
check_xlog_sync_method(const char *method)
{
	static const char *const supported[] = {
		"fsync",
#ifdef HAVE_FDATASYNC
		"fdatasync",
#endif
#ifdef OPEN_SYNC_FLAG
		"open_sync",
#endif
#ifdef OPEN_DATASYNC_FLAG
		"open_datasync",
#endif
	};
	int			nsupported = (int) (sizeof(supported) / sizeof(supported[0]));
	int			i;

	for (i = 0; i < nsupported; i++)
	{
		if (strcasecmp(method, supported[i]) == 0)
			return true;
	}

	return false;
}
3248
3249 void
3250 assign_xlog_sync_method(const char *method)
3251 {
3252         int                     new_sync_method;
3253         int                     new_sync_bit;
3254
3255         if (strcasecmp(method, "fsync") == 0)
3256         {
3257                 new_sync_method = SYNC_METHOD_FSYNC;
3258                 new_sync_bit = 0;
3259         }
3260 #ifdef HAVE_FDATASYNC
3261         else if (strcasecmp(method, "fdatasync") == 0)
3262         {
3263                 new_sync_method = SYNC_METHOD_FDATASYNC;
3264                 new_sync_bit = 0;
3265         }
3266 #endif
3267 #ifdef OPEN_SYNC_FLAG
3268         else if (strcasecmp(method, "open_sync") == 0)
3269         {
3270                 new_sync_method = SYNC_METHOD_OPEN;
3271                 new_sync_bit = OPEN_SYNC_FLAG;
3272         }
3273 #endif
3274 #ifdef OPEN_DATASYNC_FLAG
3275         else if (strcasecmp(method, "open_datasync") == 0)
3276         {
3277                 new_sync_method = SYNC_METHOD_OPEN;
3278                 new_sync_bit = OPEN_DATASYNC_FLAG;
3279         }
3280 #endif
3281         else
3282         {
3283                 /* Can't get here unless guc.c screwed up */
3284                 elog(ERROR, "bogus wal_sync_method %s", method);
3285                 new_sync_method = 0;    /* keep compiler quiet */
3286                 new_sync_bit = 0;
3287         }
3288
3289         if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
3290         {
3291                 /*
3292                  * To ensure that no blocks escape unsynced, force an fsync on the
3293                  * currently open log segment (if any).  Also, if the open flag is
3294                  * changing, close the log file so it will be reopened (with new
3295                  * flag bit) at next use.
3296                  */
3297                 if (openLogFile >= 0)
3298                 {
3299                         if (pg_fsync(openLogFile) != 0)
3300                                 elog(PANIC, "fsync of log file %u, segment %u failed: %m",
3301                                          openLogId, openLogSeg);
3302                         if (open_sync_bit != new_sync_bit)
3303                         {
3304                                 if (close(openLogFile) != 0)
3305                                         elog(PANIC, "close of log file %u, segment %u failed: %m",
3306                                                  openLogId, openLogSeg);
3307                                 openLogFile = -1;
3308                         }
3309                 }
3310                 sync_method = new_sync_method;
3311                 open_sync_bit = new_sync_bit;
3312         }
3313 }
3314
3315
3316 /*
3317  * Issue appropriate kind of fsync (if any) on the current XLOG output file
3318  */
3319 static void
3320 issue_xlog_fsync(void)
3321 {
3322         switch (sync_method)
3323         {
3324                 case SYNC_METHOD_FSYNC:
3325                         if (pg_fsync(openLogFile) != 0)
3326                                 elog(PANIC, "fsync of log file %u, segment %u failed: %m",
3327                                          openLogId, openLogSeg);
3328                         break;
3329 #ifdef HAVE_FDATASYNC
3330                 case SYNC_METHOD_FDATASYNC:
3331                         if (pg_fdatasync(openLogFile) != 0)
3332                                 elog(PANIC, "fdatasync of log file %u, segment %u failed: %m",
3333                                          openLogId, openLogSeg);
3334                         break;
3335 #endif
3336                 case SYNC_METHOD_OPEN:
3337                         /* write synced it already */
3338                         break;
3339                 default:
3340                         elog(PANIC, "bogus wal_sync_method %d", sync_method);
3341                         break;
3342         }
3343 }