1 /*-------------------------------------------------------------------------
6 * Copyright (c) 1994, Regents of the University of California
8 * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.6 1999/10/24 20:42:27 tgl Exp $
10 *-------------------------------------------------------------------------
20 #include "access/xlog.h"
21 #include "access/xact.h"
22 #include "catalog/catversion.h"
23 #include "storage/sinval.h"
24 #include "storage/proc.h"
25 #include "storage/spin.h"
26 #include "storage/s_lock.h"
28 void UpdateControlFile(void);
29 int XLOGShmemSize(void);
30 void XLOGShmemInit(void);
31 void BootStrapXLOG(void);
32 void StartupXLOG(void);
33 void ShutdownXLOG(void);
34 void CreateCheckPoint(bool shutdown);
36 char XLogDir[MAXPGPATH+1];
37 char ControlFilePath[MAXPGPATH+1];
38 uint32 XLOGbuffers = 0;
39 XLogRecPtr MyLastRecPtr = {0, 0};
40 bool StopIfError = false;
42 SPINLOCK ControlFileLockId;
43 SPINLOCK XidGenLockId;
45 extern bool ReleaseDataFile(void);
47 extern VariableCache ShmemVariableCache;
49 #define MinXLOGbuffers 4
51 typedef struct XLgwrRqst
53 XLogRecPtr Write; /* byte (1-based) to write out */
54 XLogRecPtr Flush; /* byte (1-based) to flush */
57 typedef struct XLgwrResult
59 XLogRecPtr Write; /* bytes written out */
60 XLogRecPtr Flush; /* bytes flushed */
63 typedef struct XLogCtlInsert
65 XLgwrResult LgwrResult;
66 XLogRecPtr PrevRecord;
67 uint16 curridx; /* current block index in cache */
68 XLogPageHeader currpage;
72 typedef struct XLogCtlWrite
74 XLgwrResult LgwrResult;
75 uint16 curridx; /* index of next block to write */
78 typedef struct XLogCtlData
82 XLgwrResult LgwrResult;
85 XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */
88 #ifdef HAS_TEST_AND_SET
95 static XLogCtlData *XLogCtl = NULL;
106 typedef struct ControlFileData
108 uint32 logId; /* current log file id */
109 uint32 logSeg; /* current log file segment (1-based) */
110 XLogRecPtr checkPoint; /* last check point record ptr */
111 time_t time; /* time stamp of last modification */
115 * this data is used to make sure that configuration of this DB
116 * is compatible with the current backend
118 uint32 blcksz; /* block size for this DB */
119 uint32 relseg_size; /* blocks per segment of large relation */
120 uint32 catalog_version_no; /* internal version number */
123 * MORE DATA FOLLOWS AT THE END OF THIS STRUCTURE
124 * - locations of data dirs
128 static ControlFileData *ControlFile = NULL;
130 typedef struct CheckPoint
132 XLogRecPtr redo; /* next RecPtr available when we */
133 /* began to create CheckPoint */
134 /* (i.e. REDO start point) */
135 XLogRecPtr undo; /* first record of oldest in-progress */
136 /* transaction when we started */
137 /* (i.e. UNDO end point) */
138 TransactionId nextXid;
143 * We break each log file in 16Mb segments
145 #define XLogSegSize (16*1024*1024)
146 #define XLogLastSeg (0xffffffff / XLogSegSize)
147 #define XLogFileSize (XLogLastSeg * XLogSegSize)
149 #define XLogFileName(path, log, seg) \
150 sprintf(path, "%.*s%c%08X%08X", \
151 MAXPGPATH, XLogDir, SEP_CHAR, log, seg)
153 #define PrevBufIdx(curridx) \
154 ((curridx == 0) ? XLogCtl->XLogCacheBlck : (curridx - 1))
156 #define NextBufIdx(curridx) \
157 ((curridx == XLogCtl->XLogCacheBlck) ? 0 : (curridx + 1))
159 #define XLByteLT(left, right) \
160 (right.xlogid > left.xlogid || \
161 (right.xlogid == left.xlogid && right.xrecoff > left.xrecoff))
163 #define XLByteLE(left, right) \
164 (right.xlogid > left.xlogid || \
165 (right.xlogid == left.xlogid && right.xrecoff >= left.xrecoff))
167 #define XLByteEQ(left, right) \
168 (right.xlogid == left.xlogid && right.xrecoff == left.xrecoff)
170 #define InitXLBuffer(curridx) (\
171 XLogCtl->xlblocks[curridx].xrecoff = \
172 (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
173 BLCKSZ : (XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ), \
174 XLogCtl->xlblocks[curridx].xlogid = \
175 (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
176 (XLogCtl->xlblocks[Insert->curridx].xlogid + 1) : \
177 XLogCtl->xlblocks[Insert->curridx].xlogid, \
178 Insert->curridx = curridx, \
179 Insert->currpage = (XLogPageHeader) (XLogCtl->pages + curridx * BLCKSZ), \
181 ((char*) Insert->currpage) + SizeOfXLogPHD, \
182 Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC, \
183 Insert->currpage->xlp_info = 0 \
186 #define XRecOffIsValid(xrecoff) \
187 (xrecoff % BLCKSZ >= SizeOfXLogPHD && \
188 (BLCKSZ - xrecoff % BLCKSZ) >= SizeOfXLogRecord)
190 static void GetFreeXLBuffer(void);
191 static void XLogWrite(char *buffer);
192 static int XLogFileInit(uint32 log, uint32 seg);
193 static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
194 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer);
195 static char *str_time(time_t tnow);
197 static XLgwrResult LgwrResult = {{0, 0}, {0, 0}};
198 static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}};
200 static int logFile = -1;
201 static uint32 logId = 0;
202 static uint32 logSeg = 0;
203 static uint32 logOff = 0;
205 static XLogRecPtr ReadRecPtr;
206 static XLogRecPtr EndRecPtr;
207 static int readFile = -1;
208 static uint32 readId = 0;
209 static uint32 readSeg = 0;
210 static uint32 readOff = 0;
211 static char readBuf[BLCKSZ];
212 static XLogRecord *nextRecord = NULL;
215 XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
217 XLogCtlInsert *Insert = &XLogCtl->Insert;
219 XLogSubRecord *subrecord;
221 uint32 len = hdrlen + buflen,
225 bool updrqst = false;
227 if (len == 0 || len > MAXLOGRECSZ)
228 elog(STOP, "XLogInsert: invalid record len %u", len);
230 /* obtain xlog insert lock */
231 if (TAS(&(XLogCtl->insert_lck))) /* busy */
238 /* try to read LgwrResult while waiting for insert lock */
239 if (!TAS(&(XLogCtl->info_lck)))
241 LgwrRqst = XLogCtl->LgwrRqst;
242 LgwrResult = XLogCtl->LgwrResult;
243 S_UNLOCK(&(XLogCtl->info_lck));
245 * If cache is half filled then try to acquire lgwr lock
246 * and do LGWR work, but only once.
249 (LgwrRqst.Write.xlogid != LgwrResult.Write.xlogid ||
250 (LgwrRqst.Write.xrecoff - LgwrResult.Write.xrecoff >=
251 XLogCtl->XLogCacheByte / 2)))
253 if (!TAS(&(XLogCtl->lgwr_lck)))
255 LgwrResult = XLogCtl->Write.LgwrResult;
256 if (!TAS(&(XLogCtl->info_lck)))
258 LgwrRqst = XLogCtl->LgwrRqst;
259 S_UNLOCK(&(XLogCtl->info_lck));
261 if (XLByteLT(LgwrResult.Write, LgwrRqst.Write))
266 S_UNLOCK(&(XLogCtl->lgwr_lck));
271 if (!TAS(&(XLogCtl->insert_lck)))
276 freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
277 if (freespace < SizeOfXLogRecord)
279 curridx = NextBufIdx(Insert->curridx);
280 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
281 InitXLBuffer(curridx);
284 freespace = BLCKSZ - SizeOfXLogPHD;
287 curridx = Insert->curridx;
289 freespace -= SizeOfXLogRecord;
290 record = (XLogRecord*) Insert->currpos;
291 record->xl_prev = Insert->PrevRecord;
292 if (rmid != RM_XLOG_ID)
293 record->xl_xact_prev = MyLastRecPtr;
296 record->xl_xact_prev.xlogid = 0;
297 record->xl_xact_prev.xrecoff = 0;
299 record->xl_xid = GetCurrentTransactionId();
300 record->xl_len = (len > freespace) ? freespace : len;
301 record->xl_info = (len > freespace) ? XLR_TO_BE_CONTINUED : 0;
302 record->xl_rmid = rmid;
303 RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
305 XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
306 Insert->currpos - ((char*) Insert->currpage);
307 if (MyLastRecPtr.xrecoff == 0 && rmid != RM_XLOG_ID)
309 SpinAcquire(SInvalLock);
310 MyProc->logRec = RecPtr;
311 SpinRelease(SInvalLock);
313 MyLastRecPtr = RecPtr;
314 RecPtr.xrecoff += record->xl_len;
315 Insert->currpos += SizeOfXLogRecord;
318 wlen = (hdrlen > freespace) ? freespace : hdrlen;
319 memcpy(Insert->currpos, hdr, wlen);
323 Insert->currpos += wlen;
324 if (buflen > 0 && freespace > 0)
326 wlen = (buflen > freespace) ? freespace : buflen;
327 memcpy(Insert->currpos, buf, wlen);
331 Insert->currpos += wlen;
333 Insert->currpos = ((char*)Insert->currpage) +
334 DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage));
335 len = hdrlen + buflen;
341 curridx = NextBufIdx(curridx);
342 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
344 InitXLBuffer(curridx);
352 freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord;
353 Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD;
354 subrecord = (XLogSubRecord*) Insert->currpos;
355 Insert->currpos += SizeOfXLogSubRecord;
356 if (hdrlen > freespace)
358 subrecord->xl_len = freespace;
359 subrecord->xl_info = XLR_TO_BE_CONTINUED;
360 memcpy(Insert->currpos, hdr, freespace);
367 subrecord->xl_len = hdrlen;
368 memcpy(Insert->currpos, hdr, hdrlen);
369 Insert->currpos += hdrlen;
374 subrecord->xl_len = 0;
375 if (buflen > freespace)
377 subrecord->xl_len += freespace;
378 subrecord->xl_info = XLR_TO_BE_CONTINUED;
379 memcpy(Insert->currpos, buf, freespace);
386 subrecord->xl_len += buflen;
387 memcpy(Insert->currpos, buf, buflen);
388 Insert->currpos += buflen;
390 subrecord->xl_info = 0;
391 RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
392 RecPtr.xrecoff = XLogCtl->xlblocks[curridx].xrecoff -
393 BLCKSZ + SizeOfXLogPHD + subrecord->xl_len;
394 Insert->currpos = ((char*)Insert->currpage) +
395 DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage));
397 freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
399 * All done! Update global LgwrRqst if some block was filled up.
401 if (freespace < SizeOfXLogRecord)
402 updrqst = true; /* curridx is filled and available for writing out */
404 curridx = PrevBufIdx(curridx);
405 LgwrRqst.Write = XLogCtl->xlblocks[curridx];
407 S_UNLOCK(&(XLogCtl->insert_lck));
415 if (!TAS(&(XLogCtl->info_lck)))
417 if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write))
418 XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
419 S_UNLOCK(&(XLogCtl->info_lck));
430 XLogFlush(XLogRecPtr record)
432 XLogRecPtr WriteRqst;
436 bool force_lgwr = false;
438 if (XLByteLE(record, LgwrResult.Flush))
440 WriteRqst = LgwrRqst.Write;
443 /* try to read LgwrResult */
444 if (!TAS(&(XLogCtl->info_lck)))
446 LgwrResult = XLogCtl->LgwrResult;
447 if (XLByteLE(record, LgwrResult.Flush))
449 S_UNLOCK(&(XLogCtl->info_lck));
452 if (XLByteLT(XLogCtl->LgwrRqst.Flush, record))
453 XLogCtl->LgwrRqst.Flush = record;
454 if (XLByteLT(WriteRqst, XLogCtl->LgwrRqst.Write))
456 WriteRqst = XLogCtl->LgwrRqst.Write;
459 S_UNLOCK(&(XLogCtl->info_lck));
461 /* if something was added to log cache then try to flush this too */
462 if (!TAS(&(XLogCtl->insert_lck)))
464 XLogCtlInsert *Insert = &XLogCtl->Insert;
466 ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
468 if (freespace < SizeOfXLogRecord) /* buffer is full */
471 LgwrRqst.Write = WriteRqst = XLogCtl->xlblocks[Insert->curridx];
476 memcpy(usebuf, Insert->currpage, BLCKSZ - freespace);
477 memset(usebuf + BLCKSZ - freespace, 0, freespace);
478 WriteRqst = XLogCtl->xlblocks[Insert->curridx];
479 WriteRqst.xrecoff = WriteRqst.xrecoff - BLCKSZ +
480 Insert->currpos - ((char*) Insert->currpage);
482 S_UNLOCK(&(XLogCtl->insert_lck));
485 if (force_lgwr || WriteRqst.xlogid > record.xlogid ||
486 (WriteRqst.xlogid == record.xlogid &&
487 WriteRqst.xrecoff >= record.xrecoff + BLCKSZ))
489 if (!TAS(&(XLogCtl->lgwr_lck)))
491 LgwrResult = XLogCtl->Write.LgwrResult;
492 if (XLByteLE(record, LgwrResult.Flush))
494 S_UNLOCK(&(XLogCtl->lgwr_lck));
497 if (XLByteLT(LgwrResult.Write, WriteRqst))
499 LgwrRqst.Flush = LgwrRqst.Write = WriteRqst;
501 S_UNLOCK(&(XLogCtl->lgwr_lck));
502 if (XLByteLT(LgwrResult.Flush, record))
503 elog(STOP, "XLogFlush: request is not satisfyed");
512 if (logFile >= 0 && (LgwrResult.Write.xlogid != logId ||
513 (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg))
515 if (close(logFile) != 0)
516 elog(STOP, "Close(logfile %u seg %u) failed: %d",
517 logId, logSeg, errno);
523 logId = LgwrResult.Write.xlogid;
524 logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
526 logFile = XLogFileOpen(logId, logSeg, false);
529 if (fsync(logFile) != 0)
530 elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
531 logId, logSeg, errno);
532 LgwrResult.Flush = LgwrResult.Write;
536 if (!TAS(&(XLogCtl->info_lck)))
538 XLogCtl->LgwrResult = LgwrResult;
539 if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
540 XLogCtl->LgwrRqst.Write = LgwrResult.Write;
541 S_UNLOCK(&(XLogCtl->info_lck));
546 XLogCtl->Write.LgwrResult = LgwrResult;
548 S_UNLOCK(&(XLogCtl->lgwr_lck));
556 XLogCtlInsert *Insert = &XLogCtl->Insert;
557 XLogCtlWrite *Write = &XLogCtl->Write;
558 uint16 curridx = NextBufIdx(Insert->curridx);
560 LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx];
563 if (!TAS(&(XLogCtl->info_lck)))
565 LgwrResult = XLogCtl->LgwrResult;
566 XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
567 S_UNLOCK(&(XLogCtl->info_lck));
568 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
570 Insert->LgwrResult = LgwrResult;
571 InitXLBuffer(curridx);
576 * LgwrResult lock is busy or un-updated. Try to acquire lgwr lock
577 * and write full blocks.
579 if (!TAS(&(XLogCtl->lgwr_lck)))
581 LgwrResult = Write->LgwrResult;
582 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
584 S_UNLOCK(&(XLogCtl->lgwr_lck));
585 Insert->LgwrResult = LgwrResult;
586 InitXLBuffer(curridx);
590 * Have to write buffers while holding insert lock -
594 S_UNLOCK(&(XLogCtl->lgwr_lck));
595 Insert->LgwrResult = LgwrResult;
596 InitXLBuffer(curridx);
605 XLogWrite(char *buffer)
607 XLogCtlWrite *Write = &XLogCtl->Write;
612 for ( ; XLByteLT(LgwrResult.Write, LgwrRqst.Write); )
614 LgwrResult.Write = XLogCtl->xlblocks[Write->curridx];
615 if (LgwrResult.Write.xlogid != logId ||
616 (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg)
620 if (fsync(logFile) != 0)
621 elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
622 logId, logSeg, errno);
623 if (LgwrResult.Write.xlogid != logId)
624 LgwrResult.Flush.xrecoff = XLogFileSize;
626 LgwrResult.Flush.xrecoff = LgwrResult.Write.xrecoff - BLCKSZ;
627 LgwrResult.Flush.xlogid = logId;
628 if (!TAS(&(XLogCtl->info_lck)))
630 XLogCtl->LgwrResult.Flush = LgwrResult.Flush;
631 XLogCtl->LgwrResult.Write = LgwrResult.Flush;
632 if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Flush))
633 XLogCtl->LgwrRqst.Write = LgwrResult.Flush;
634 if (XLByteLT(XLogCtl->LgwrRqst.Flush, LgwrResult.Flush))
635 XLogCtl->LgwrRqst.Flush = LgwrResult.Flush;
636 S_UNLOCK(&(XLogCtl->info_lck));
641 if (close(logFile) != 0)
642 elog(STOP, "Close(logfile %u seg %u) failed: %d",
643 logId, logSeg, errno);
646 logId = LgwrResult.Write.xlogid;
647 logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
649 logFile = XLogFileInit(logId, logSeg);
650 SpinAcquire(ControlFileLockId);
651 ControlFile->logId = logId;
652 ControlFile->logSeg = logSeg + 1;
653 ControlFile->time = time(NULL);
655 SpinRelease(ControlFileLockId);
660 logId = LgwrResult.Write.xlogid;
661 logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
663 logFile = XLogFileOpen(logId, logSeg, false);
666 if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
668 logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
669 if (lseek(logFile, (off_t)logOff, SEEK_SET) < 0)
670 elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d",
671 logId, logSeg, logOff, errno);
674 if (buffer != NULL && XLByteLT(LgwrRqst.Write, LgwrResult.Write))
677 from = XLogCtl->pages + Write->curridx * BLCKSZ;
679 if (write(logFile, from, BLCKSZ) != BLCKSZ)
680 elog(STOP, "Write(logfile %u seg %u off %u) failed: %d",
681 logId, logSeg, logOff, errno);
687 Write->curridx = NextBufIdx(Write->curridx);
689 LgwrResult.Write = LgwrRqst.Write;
692 elog(STOP, "XLogWrite: nothing written");
694 if (XLByteLT(LgwrResult.Flush, LgwrRqst.Flush) &&
695 XLByteLE(LgwrRqst.Flush, LgwrResult.Write))
697 if (fsync(logFile) != 0)
698 elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
699 logId, logSeg, errno);
700 LgwrResult.Flush = LgwrResult.Write;
705 if (!TAS(&(XLogCtl->info_lck)))
707 XLogCtl->LgwrResult = LgwrResult;
708 if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
709 XLogCtl->LgwrRqst.Write = LgwrResult.Write;
710 S_UNLOCK(&(XLogCtl->info_lck));
715 Write->LgwrResult = LgwrResult;
719 XLogFileInit(uint32 log, uint32 seg)
721 char path[MAXPGPATH+1];
724 XLogFileName(path, log, seg);
728 fd = open(path, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
729 if (fd < 0 && (errno == EMFILE || errno == ENFILE))
732 if (!ReleaseDataFile())
733 elog(STOP, "Create(logfile %u seg %u) failed: %d (and no one data file can be closed)",
738 elog(STOP, "Init(logfile %u seg %u) failed: %d",
739 logId, logSeg, errno);
741 if (lseek(fd, XLogSegSize - 1, SEEK_SET) != (off_t) (XLogSegSize - 1))
742 elog(STOP, "Lseek(logfile %u seg %u) failed: %d",
743 logId, logSeg, errno);
745 if (write(fd, "", 1) != 1)
746 elog(STOP, "Init(logfile %u seg %u) failed: %d",
747 logId, logSeg, errno);
750 elog(STOP, "Fsync(logfile %u seg %u) failed: %d",
751 logId, logSeg, errno);
753 if (lseek(fd, 0, SEEK_SET) < 0)
754 elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d",
761 XLogFileOpen(uint32 log, uint32 seg, bool econt)
763 char path[MAXPGPATH+1];
766 XLogFileName(path, log, seg);
769 fd = open(path, O_RDWR);
770 if (fd < 0 && (errno == EMFILE || errno == ENFILE))
773 if (!ReleaseDataFile())
774 elog(STOP, "Open(logfile %u seg %u) failed: %d (and no one data file can be closed)",
780 if (econt && errno == ENOENT)
782 elog(LOG, "Open(logfile %u seg %u) failed: file doesn't exist",
786 elog(STOP, "Open(logfile %u seg %u) failed: %d",
787 logId, logSeg, errno);
794 ReadRecord(XLogRecPtr *RecPtr, char *buffer)
797 XLogRecPtr tmpRecPtr = EndRecPtr;
798 bool nextmode = (RecPtr == NULL);
799 int emode = (nextmode) ? LOG : STOP;
805 if (nextRecord != NULL)
810 if (tmpRecPtr.xrecoff % BLCKSZ != 0)
811 tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
812 if (tmpRecPtr.xrecoff >= XLogFileSize)
814 (tmpRecPtr.xlogid)++;
815 tmpRecPtr.xrecoff = 0;
817 tmpRecPtr.xrecoff += SizeOfXLogPHD;
819 else if (!XRecOffIsValid(RecPtr->xrecoff))
820 elog(STOP, "ReadRecord: invalid record offset in (%u, %u)",
821 RecPtr->xlogid, RecPtr->xrecoff);
823 if (readFile >= 0 && (RecPtr->xlogid != readId ||
824 RecPtr->xrecoff / XLogSegSize != readSeg))
829 readId = RecPtr->xlogid;
830 readSeg = RecPtr->xrecoff / XLogSegSize;
834 readFile = XLogFileOpen(readId, readSeg, nextmode);
836 goto next_record_is_invalid;
839 if (noBlck || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ)
841 readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ;
842 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
843 elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
844 readId, readSeg, readOff, errno);
845 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
846 elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d",
847 readId, readSeg, readOff, errno);
848 if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
850 elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
851 ((XLogPageHeader)readBuf)->xlp_magic,
852 readId, readSeg, readOff);
853 goto next_record_is_invalid;
856 if ((((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) &&
857 RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
859 elog(emode, "ReadRecord: subrecord is requested by (%u, %u)",
860 RecPtr->xlogid, RecPtr->xrecoff);
861 goto next_record_is_invalid;
863 record = (XLogRecord*)((char*) readBuf + RecPtr->xrecoff % BLCKSZ);
866 if (record->xl_len == 0 || record->xl_len >
867 (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord))
869 elog(emode, "ReadRecord: invalid record len %u in (%u, %u)",
870 record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
871 goto next_record_is_invalid;
873 if (record->xl_rmid > RM_MAX_ID)
875 elog(emode, "ReadRecord: invalid resource managed id %u in (%u, %u)",
876 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
877 goto next_record_is_invalid;
880 if (record->xl_info & XLR_TO_BE_CONTINUED)
882 XLogSubRecord *subrecord;
883 uint32 len = record->xl_len;
885 if (record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord != BLCKSZ)
887 elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)",
888 record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
889 goto next_record_is_invalid;
891 memcpy(buffer, record, record->xl_len + SizeOfXLogRecord);
892 record = (XLogRecord*) buffer;
893 buffer += record->xl_len + SizeOfXLogRecord;
897 if (readOff == XLogSegSize / BLCKSZ)
900 if (readSeg == XLogLastSeg)
907 readFile = XLogFileOpen(readId, readSeg, nextmode);
909 goto next_record_is_invalid;
911 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
912 elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d",
913 readId, readSeg, readOff, errno);
914 if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
916 elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
917 ((XLogPageHeader)readBuf)->xlp_magic,
918 readId, readSeg, readOff);
919 goto next_record_is_invalid;
921 if (!(((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD))
923 elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u",
924 readId, readSeg, readOff);
925 goto next_record_is_invalid;
927 subrecord = (XLogSubRecord*)((char*) readBuf + SizeOfXLogPHD);
928 if (subrecord->xl_len == 0 || subrecord->xl_len >
929 (BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord))
931 elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u",
932 subrecord->xl_len, readId, readSeg, readOff);
933 goto next_record_is_invalid;
935 len += subrecord->xl_len;
936 if (len > MAXLOGRECSZ)
938 elog(emode, "ReadRecord: too long record len %u in (%u, %u)",
939 len, RecPtr->xlogid, RecPtr->xrecoff);
940 goto next_record_is_invalid;
942 memcpy(buffer, (char*)subrecord + SizeOfXLogSubRecord, subrecord->xl_len);
943 buffer += subrecord->xl_len;
944 if (subrecord->xl_info & XLR_TO_BE_CONTINUED)
946 if (subrecord->xl_len +
947 SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ)
949 elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u",
950 subrecord->xl_len, readId, readSeg, readOff);
951 goto next_record_is_invalid;
957 if (BLCKSZ - SizeOfXLogRecord >=
958 subrecord->xl_len + SizeOfXLogPHD + SizeOfXLogSubRecord)
960 nextRecord = (XLogRecord*)
961 ((char*)subrecord + subrecord->xl_len + SizeOfXLogSubRecord);
963 EndRecPtr.xlogid = readId;
964 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ +
965 SizeOfXLogPHD + SizeOfXLogSubRecord + subrecord->xl_len;
966 ReadRecPtr = *RecPtr;
969 if (BLCKSZ - SizeOfXLogRecord >=
970 record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord)
972 nextRecord = (XLogRecord*)((char*)record + record->xl_len + SizeOfXLogRecord);
974 EndRecPtr.xlogid = RecPtr->xlogid;
975 EndRecPtr.xrecoff = RecPtr->xrecoff + record->xl_len + SizeOfXLogRecord;
976 ReadRecPtr = *RecPtr;
980 next_record_is_invalid:;
984 memset(buffer, 0, SizeOfXLogRecord);
985 record = (XLogRecord*) buffer;
987 * If we assumed that next record began on the same page where
988 * previous one ended - zero end of page.
990 if (XLByteEQ(tmpRecPtr, EndRecPtr))
992 Assert (EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) &&
993 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord);
994 readId = EndRecPtr.xlogid;
995 readSeg = EndRecPtr.xrecoff / XLogSegSize;
996 readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
997 elog(LOG, "Formating logfile %u seg %u block %u at offset %u",
998 readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ);
999 readFile = XLogFileOpen(readId, readSeg, false);
1000 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
1001 elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
1002 readId, readSeg, readOff, errno);
1003 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
1004 elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d",
1005 readId, readSeg, readOff, errno);
1006 memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0,
1007 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ);
1008 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
1009 elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
1010 readId, readSeg, readOff, errno);
1011 if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
1012 elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d",
1013 readId, readSeg, readOff, errno);
1018 Assert (EndRecPtr.xrecoff % BLCKSZ == 0 ||
1019 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord);
1020 readId = tmpRecPtr.xlogid;
1021 readSeg = tmpRecPtr.xrecoff / XLogSegSize;
1022 readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
1023 Assert(readOff > 0);
1027 if (!XLByteEQ(tmpRecPtr, EndRecPtr))
1028 elog(LOG, "Formating logfile %u seg %u block %u at offset 0",
1029 readId, readSeg, readOff);
1031 memset(readBuf, 0, BLCKSZ);
1032 readFile = XLogFileOpen(readId, readSeg, false);
1033 if (lseek(readFile, (off_t)readOff, SEEK_SET) < 0)
1034 elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d",
1035 readId, readSeg, readOff, errno);
1036 while (readOff < XLogSegSize)
1038 if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
1039 elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d",
1040 readId, readSeg, readOff, errno);
1046 if (fsync(readFile) < 0)
1047 elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %d",
1048 readId, readSeg, errno);
1053 readId = EndRecPtr.xlogid;
1054 readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1;
1055 elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1);
1056 if (ControlFile->logId != readId || ControlFile->logSeg != readSeg)
1058 elog(LOG, "Set logId/logSeg in control file");
1059 ControlFile->logId = readId;
1060 ControlFile->logSeg = readSeg;
1061 ControlFile->time = time(NULL);
1062 UpdateControlFile();
1064 if (readSeg == XLogLastSeg)
1070 char path[MAXPGPATH+1];
1072 XLogFileName(path, readId, readSeg);
1085 fd = open(ControlFilePath, O_RDWR);
1086 if (fd < 0 && (errno == EMFILE || errno == ENFILE))
1089 if (!ReleaseDataFile())
1090 elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)",
1095 elog(STOP, "Open(cntlfile) failed: %d", errno);
1097 if (write(fd, ControlFile, BLCKSZ) != BLCKSZ)
1098 elog(STOP, "Write(cntlfile) failed: %d", errno);
1101 elog(STOP, "Fsync(cntlfile) failed: %d", errno);
1111 if (XLOGbuffers < MinXLOGbuffers)
1112 XLOGbuffers = MinXLOGbuffers;
1114 return(sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
1115 sizeof(XLogRecPtr) * XLOGbuffers + BLCKSZ);
1123 if (XLOGbuffers < MinXLOGbuffers)
1124 XLOGbuffers = MinXLOGbuffers;
1126 ControlFile = (ControlFileData*)
1127 ShmemInitStruct("Control File", BLCKSZ, &found);
1129 XLogCtl = (XLogCtlData*)
1130 ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
1131 sizeof(XLogRecPtr) * XLOGbuffers, &found);
1136 * This func must be called ONCE on system install
1142 char buffer[BLCKSZ];
1143 XLogPageHeader page = (XLogPageHeader)buffer;
1144 CheckPoint checkPoint;
1147 fd = open(ControlFilePath, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
1149 elog(STOP, "BootStrapXLOG failed to create control file (%s): %d",
1150 ControlFilePath, errno);
1152 logFile = XLogFileInit(0, 0);
1154 checkPoint.redo.xlogid = 0;
1155 checkPoint.redo.xrecoff = SizeOfXLogPHD;
1156 checkPoint.undo = checkPoint.redo;
1157 checkPoint.nextXid = FirstTransactionId;
1158 checkPoint.nextOid = BootstrapObjectIdData;
1160 memset(buffer, 0, BLCKSZ);
1161 page->xlp_magic = XLOG_PAGE_MAGIC;
1163 record = (XLogRecord*) ((char*)page + SizeOfXLogPHD);
1164 record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0;
1165 record->xl_xact_prev = record->xl_prev;
1166 record->xl_xid = InvalidTransactionId;
1167 record->xl_len = sizeof(checkPoint);
1168 record->xl_info = 0;
1169 record->xl_rmid = RM_XLOG_ID;
1170 memcpy((char*)record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint));
1172 if (write(logFile, buffer, BLCKSZ) != BLCKSZ)
1173 elog(STOP, "BootStrapXLOG failed to write logfile: %d", errno);
1175 if (fsync(logFile) != 0)
1176 elog(STOP, "BootStrapXLOG failed to fsync logfile: %d", errno);
1181 memset(buffer, 0, BLCKSZ);
1182 ControlFile = (ControlFileData*) buffer;
1183 ControlFile->logId = 0;
1184 ControlFile->logSeg = 1;
1185 ControlFile->checkPoint = checkPoint.redo;
1186 ControlFile->time = time(NULL);
1187 ControlFile->state = DB_SHUTDOWNED;
1188 ControlFile->blcksz = BLCKSZ;
1189 ControlFile->relseg_size = RELSEG_SIZE;
1190 ControlFile->catalog_version_no = CATALOG_VERSION_NO;
1192 if (write(fd, buffer, BLCKSZ) != BLCKSZ)
1193 elog(STOP, "BootStrapXLOG failed to write control file: %d", errno);
1196 elog(STOP, "BootStrapXLOG failed to fsync control file: %d", errno);
1202 str_time(time_t tnow)
1204 char *result = ctime(&tnow);
1205 char *p = strchr(result, '\n');
1214 * This func must be called ONCE on system startup
1219 XLogCtlInsert *Insert;
1220 CheckPoint checkPoint;
1224 char buffer[MAXLOGRECSZ+SizeOfXLogRecord];
1227 bool sie_saved = false;
1229 elog(LOG, "Data Base System is starting up at %s", str_time(time(NULL)));
1231 XLogCtl->xlblocks = (XLogRecPtr*) (((char *)XLogCtl) + sizeof(XLogCtlData));
1232 XLogCtl->pages = ((char *)XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers);
1233 XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
1234 XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
1235 memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
1236 XLogCtl->LgwrRqst = LgwrRqst;
1237 XLogCtl->LgwrResult = LgwrResult;
1238 XLogCtl->Insert.LgwrResult = LgwrResult;
1239 XLogCtl->Insert.curridx = 0;
1240 XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
1241 XLogCtl->Write.LgwrResult = LgwrResult;
1242 XLogCtl->Write.curridx = 0;
1243 S_INIT_LOCK(&(XLogCtl->insert_lck));
1244 S_INIT_LOCK(&(XLogCtl->info_lck));
1245 S_INIT_LOCK(&(XLogCtl->lgwr_lck));
1248 * Open/read Control file
1251 fd = open(ControlFilePath, O_RDWR);
1252 if (fd < 0 && (errno == EMFILE || errno == ENFILE))
1255 if (!ReleaseDataFile())
1256 elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)",
1261 elog(STOP, "Open(cntlfile) failed: %d", errno);
1263 if (read(fd, ControlFile, BLCKSZ) != BLCKSZ)
1264 elog(STOP, "Read(cntlfile) failed: %d", errno);
1268 if (ControlFile->logSeg == 0 ||
1269 ControlFile->time <= 0 ||
1270 ControlFile->state < DB_SHUTDOWNED ||
1271 ControlFile->state > DB_IN_PRODUCTION ||
1272 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
1273 elog(STOP, "Control file context is broken");
1275 /* Check for incompatible database */
1276 if (ControlFile->blcksz != BLCKSZ)
1277 elog(STOP, "database was initialized with BLCKSZ %d,\n\tbut the backend was compiled with BLCKSZ %d.\n\tlooks like you need to initdb.",
1278 ControlFile->blcksz, BLCKSZ);
1279 if (ControlFile->relseg_size != RELSEG_SIZE)
1280 elog(STOP, "database was initialized with RELSEG_SIZE %d,\n\tbut the backend was compiled with RELSEG_SIZE %d.\n\tlooks like you need to initdb.",
1281 ControlFile->relseg_size, RELSEG_SIZE);
1282 if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
1283 elog(STOP, "database was initialized with CATALOG_VERSION_NO %d,\n\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n\tlooks like you need to initdb.",
1284 ControlFile->catalog_version_no, CATALOG_VERSION_NO);
1286 if (ControlFile->state == DB_SHUTDOWNED)
1287 elog(LOG, "Data Base System was shutdowned at %s",
1288 str_time(ControlFile->time));
1289 else if (ControlFile->state == DB_SHUTDOWNING)
1290 elog(LOG, "Data Base System was interrupted when shutting down at %s",
1291 str_time(ControlFile->time));
1292 else if (ControlFile->state == DB_IN_RECOVERY)
1294 elog(LOG, "Data Base System was interrupted being in recovery at %s\n"
1295 "\tThis propably means that some data blocks are corrupted\n"
1296 "\tAnd you will have to use last backup for recovery",
1297 str_time(ControlFile->time));
1299 else if (ControlFile->state == DB_IN_PRODUCTION)
1300 elog(LOG, "Data Base System was interrupted being in production at %s",
1301 str_time(ControlFile->time));
1303 LastRec = RecPtr = ControlFile->checkPoint;
1304 if (!XRecOffIsValid(RecPtr.xrecoff))
1305 elog(STOP, "Invalid checkPoint in control file");
1306 elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff);
1308 record = ReadRecord(&RecPtr, buffer);
1309 if (record->xl_rmid != RM_XLOG_ID)
1310 elog(STOP, "Invalid RMID in checkPoint record");
1311 if (record->xl_len != sizeof(checkPoint))
1312 elog(STOP, "Invalid length of checkPoint record");
1313 checkPoint = *((CheckPoint*)((char*)record + SizeOfXLogRecord));
1315 elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u)",
1316 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
1317 checkPoint.undo.xlogid, checkPoint.undo.xrecoff);
1318 elog(LOG, "NextTransactionId: %u; NextOid: %u",
1319 checkPoint.nextXid, checkPoint.nextOid);
1320 if (checkPoint.nextXid < FirstTransactionId ||
1321 checkPoint.nextOid < BootstrapObjectIdData)
1323 elog(STOP, "Invalid NextTransactionId/NextOid");
1325 elog(LOG, "Invalid NextTransactionId/NextOid");
1329 ShmemVariableCache->nextXid = checkPoint.nextXid;
1330 ShmemVariableCache->nextOid = checkPoint.nextOid;
1333 if (XLByteLT(RecPtr, checkPoint.redo))
1334 elog(STOP, "Invalid redo in checkPoint record");
1335 if (checkPoint.undo.xrecoff == 0)
1336 checkPoint.undo = RecPtr;
1337 if (XLByteLT(RecPtr, checkPoint.undo))
1338 elog(STOP, "Invalid undo in checkPoint record");
1340 if (XLByteLT(checkPoint.undo, RecPtr) || XLByteLT(checkPoint.redo, RecPtr))
1342 if (ControlFile->state == DB_SHUTDOWNED)
1343 elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
1346 else if (ControlFile->state != DB_SHUTDOWNED)
1351 elog(LOG, "The DataBase system was not properly shutdowned\n"
1352 "\tAutomatic recovery is in progress...");
1353 ControlFile->state = DB_IN_RECOVERY;
1354 ControlFile->time = time(NULL);
1355 UpdateControlFile();
1357 sie_saved = StopIfError;
1360 /* Is REDO required ? */
1361 if (XLByteLT(checkPoint.redo, RecPtr))
1362 record = ReadRecord(&(checkPoint.redo), buffer);
1363 else /* read past CheckPoint record */
1364 record = ReadRecord(NULL, buffer);
1367 if (record->xl_len != 0)
1369 elog(LOG, "Redo starts at (%u, %u)",
1370 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1374 if (record->xl_xid >= ShmemVariableCache->nextXid)
1375 ShmemVariableCache->nextXid = record->xl_xid + 1;
1377 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
1378 record = ReadRecord(NULL, buffer);
1379 } while (record->xl_len != 0);
1380 elog(LOG, "Redo done at (%u, %u)",
1381 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1382 LastRec = ReadRecPtr;
1386 elog(LOG, "Redo is not required");
1391 RecPtr = ReadRecPtr;
1392 if (XLByteLT(checkPoint.undo, RecPtr))
1394 elog(LOG, "Undo starts at (%u, %u)",
1395 RecPtr.xlogid, RecPtr.xrecoff);
1398 record = ReadRecord(&RecPtr, buffer);
1399 if (TransactionIdIsValid(record->xl_xid) &&
1400 !TransactionIdDidCommit(record->xl_xid))
1401 RmgrTable[record->xl_rmid].rm_undo(record);
1402 RecPtr = record->xl_prev;
1403 } while (XLByteLE(checkPoint.undo, RecPtr));
1404 elog(LOG, "Undo done at (%u, %u)",
1405 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1409 elog(LOG, "Undo is not required");
1414 /* Init xlog buffer cache */
1415 record = ReadRecord(&LastRec, buffer);
1416 logId = EndRecPtr.xlogid;
1417 logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
1419 logFile = XLogFileOpen(logId, logSeg, false);
1420 XLogCtl->xlblocks[0].xlogid = logId;
1421 XLogCtl->xlblocks[0].xrecoff =
1422 ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
1423 Insert = &XLogCtl->Insert;
1424 memcpy((char*)(Insert->currpage), readBuf, BLCKSZ);
1425 Insert->currpos = ((char*) Insert->currpage) +
1426 (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
1427 Insert->PrevRecord = ControlFile->checkPoint;
1434 * Let resource managers know that recovery is done
1436 for (i = 0; i <= RM_MAX_ID; i++)
1437 RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL);
1438 CreateCheckPoint(true);
1439 StopIfError = sie_saved;
1442 ControlFile->state = DB_IN_PRODUCTION;
1443 ControlFile->time = time(NULL);
1444 UpdateControlFile();
1446 elog(LOG, "Data Base System is in production state at %s", str_time(time(NULL)));
1452 * This func must be called ONCE on system shutdown
1458 elog(LOG, "Data Base System is shutting down at %s", str_time(time(NULL)));
1460 CreateCheckPoint(true);
1462 elog(LOG, "Data Base System is shutdowned at %s", str_time(time(NULL)));
1466 CreateCheckPoint(bool shutdown)
1468 CheckPoint checkPoint;
1470 XLogCtlInsert *Insert = &XLogCtl->Insert;
1474 memset(&checkPoint, 0, sizeof(checkPoint));
1477 ControlFile->state = DB_SHUTDOWNING;
1478 ControlFile->time = time(NULL);
1479 UpdateControlFile();
1482 /* Get REDO record ptr */
1483 while (TAS(&(XLogCtl->insert_lck)))
1485 struct timeval delay = {0, 5000};
1488 elog(STOP, "XLog insert lock is busy while data base is shutting down");
1489 (void) select(0, NULL, NULL, NULL, &delay);
1491 freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
1492 if (freespace < SizeOfXLogRecord)
1494 curridx = NextBufIdx(Insert->curridx);
1495 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
1496 InitXLBuffer(curridx);
1499 freespace = BLCKSZ - SizeOfXLogPHD;
1502 curridx = Insert->curridx;
1503 checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid;
1504 checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
1505 Insert->currpos - ((char*) Insert->currpage);
1506 S_UNLOCK(&(XLogCtl->insert_lck));
1508 SpinAcquire(XidGenLockId);
1509 checkPoint.nextXid = ShmemVariableCache->nextXid;
1510 SpinRelease(XidGenLockId);
1511 SpinAcquire(OidGenLockId);
1512 checkPoint.nextOid = ShmemVariableCache->nextOid;
1513 SpinRelease(OidGenLockId);
1517 /* Get UNDO record ptr */
1518 checkPoint.undo.xrecoff = 0;
1520 if (shutdown && checkPoint.undo.xrecoff != 0)
1521 elog(STOP, "Active transaction while data base is shutting down");
1523 recptr = XLogInsert(RM_XLOG_ID, (char*)&checkPoint, sizeof(checkPoint), NULL, 0);
1525 if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr))
1526 elog(STOP, "XLog concurrent activity while data base is shutting down");
1530 SpinAcquire(ControlFileLockId);
1532 ControlFile->state = DB_SHUTDOWNED;
1533 ControlFile->checkPoint = MyLastRecPtr;
1534 ControlFile->time = time(NULL);
1535 UpdateControlFile();
1536 SpinRelease(ControlFileLockId);