]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xlog.c
Add a notion of a 'catalog version number' that can indicate
[postgresql] / src / backend / access / transam / xlog.c
1 /*------------------------------------------------------------------------- 
2  *
3  * xlog.c
4  *
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.6 1999/10/24 20:42:27 tgl Exp $
9  *
10  *-------------------------------------------------------------------------
11  */
12 #include <fcntl.h>
13 #include <unistd.h>
14 #include <errno.h>
15 #include <sys/stat.h>
16 #include <sys/time.h>
17
18 #include "postgres.h"
19
20 #include "access/xlog.h"
21 #include "access/xact.h"
22 #include "catalog/catversion.h"
23 #include "storage/sinval.h"
24 #include "storage/proc.h"
25 #include "storage/spin.h"
26 #include "storage/s_lock.h"
27
/*
 * Forward declarations of the non-static routines of this module.
 * Their definitions are not in the part of the file shown here; of these,
 * only UpdateControlFile() is called in the visible code (from XLogWrite(),
 * while ControlFileLockId is held).
 */
28 void            UpdateControlFile(void);
29 int                     XLOGShmemSize(void);
30 void            XLOGShmemInit(void);
31 void            BootStrapXLOG(void);
32 void            StartupXLOG(void);
33 void            ShutdownXLOG(void);
34 void            CreateCheckPoint(bool shutdown);
35
/* Directory prefix used by XLogFileName() when building log segment paths. */
36 char            XLogDir[MAXPGPATH+1];
/* Presumably the path of the control file -- not referenced in the visible code. */
37 char            ControlFilePath[MAXPGPATH+1];
/* Presumably the configured number of XLOG cache buffers -- TODO confirm. */
38 uint32          XLOGbuffers = 0;
/*
 * Start position of the last record this backend inserted with XLogInsert();
 * {0, 0} until the first insert.  It is also copied into xl_xact_prev of
 * every new record whose rmid is not RM_XLOG_ID.
 */
39 XLogRecPtr      MyLastRecPtr = {0, 0};
/* Not referenced in the visible code. */
40 bool            StopIfError = false;
41
/* Held around updates of ControlFile (see XLogWrite()). */
42 SPINLOCK        ControlFileLockId;
/* Not referenced in the visible code; presumably guards xid generation. */
43 SPINLOCK        XidGenLockId;
44
/*
 * Called by XLogFileInit() when open() fails with EMFILE/ENFILE; a false
 * result is treated there as "no data file could be closed".
 */
45 extern bool                             ReleaseDataFile(void);
46
47 extern VariableCache    ShmemVariableCache;
48
/* Presumably the lower bound applied to XLOGbuffers -- TODO confirm. */
49 #define MinXLOGbuffers  4
50
/*
 * Log writer request: how far the log has been asked to be written out and
 * flushed.  A backend-local copy lives in the static LgwrRqst; the shared
 * copy is XLogCtl->LgwrRqst and is accessed under info_lck.
 */
51 typedef struct XLgwrRqst
52 {
53         XLogRecPtr              Write;          /* byte (1-based) to write out */
54         XLogRecPtr              Flush;          /* byte (1-based) to flush */
55 } XLgwrRqst;
56
/*
 * Log writer result: how far the log has actually been written and flushed.
 * A backend-local copy lives in the static LgwrResult; shared copies are
 * kept in XLogCtl->LgwrResult (accessed under info_lck) and in
 * XLogCtl->Write.LgwrResult (read and written by holders of lgwr_lck).
 */
57 typedef struct XLgwrResult
58 {
59         XLogRecPtr              Write;          /* bytes written out */
60         XLogRecPtr              Flush;          /* bytes flushed */
61 } XLgwrResult;
62
/*
 * Shared state of the insert position in the XLOG cache.  In the visible
 * code it is only touched while insert_lck is held.
 */
63 typedef struct XLogCtlInsert
64 {
           /* copy of LgwrResult stored by GetFreeXLBuffer() before it reuses a page */
65         XLgwrResult             LgwrResult;
           /* copied into xl_prev of each new record by XLogInsert() */
66         XLogRecPtr              PrevRecord;
67         uint16                  curridx;        /* current block index in cache */
           /* header of the cache page currently being filled */
68         XLogPageHeader  currpage;
           /* next free byte inside currpage */
69         char               *currpos;
70 } XLogCtlInsert;
71
/*
 * Shared state of the write position in the XLOG cache.  In the visible
 * code it is read and updated by whoever holds lgwr_lck.
 */
72 typedef struct XLogCtlWrite
73 {
           /* result of the most recent XLogWrite()/XLogFlush() work */
74         XLgwrResult             LgwrResult;
75         uint16                  curridx;        /* index of next block to write */
76 } XLogCtlWrite;
77
/*
 * Shared control structure for the XLOG cache.
 *
 * Locking as used by the visible code:
 *   insert_lck - held while records are added to the cache (Insert)
 *   info_lck   - held while reading or updating LgwrRqst and LgwrResult
 *   lgwr_lck   - held while writing or fsyncing the log (Write)
 */
78 typedef struct XLogCtlData
79 {
80         XLogCtlInsert   Insert;
81         XLgwrRqst               LgwrRqst;
82         XLgwrResult             LgwrResult;
83         XLogCtlWrite    Write;
           /* cache pages; page number idx starts at pages + idx * BLCKSZ */
84         char               *pages;
85         XLogRecPtr         *xlblocks;   /* 1st byte ptr-s + BLCKSZ */
           /* XLogInsert() tries to write the cache once half this many bytes are pending */
86         uint32                  XLogCacheByte;
           /* highest valid buffer index: NextBufIdx() wraps to 0 after it */
87         uint32                  XLogCacheBlck;
88 #ifdef HAS_TEST_AND_SET
89         slock_t                 insert_lck;
90         slock_t                 info_lck;
91         slock_t                 lgwr_lck;
92 #endif
93 } XLogCtlData;
94
/* Pointer to the control structure; NULL until set up (not in the visible code). */
95 static XLogCtlData                 *XLogCtl = NULL;
96
/*
 * Database state as recorded in ControlFileData.state.  The states are not
 * used in the visible code; the meanings below are inferred from the names
 * only -- TODO confirm against StartupXLOG()/ShutdownXLOG().
 */
97 typedef enum DBState
98 {
99         DB_STARTUP = 0,
100         DB_SHUTDOWNED,          /* presumably: shut down cleanly */
101         DB_SHUTDOWNING,         /* presumably: shutdown in progress */
102         DB_IN_RECOVERY,
103         DB_IN_PRODUCTION
104 } DBState;
105
/*
 * Contents of the control file.  XLogWrite() updates logId, logSeg and time
 * (under ControlFileLockId) each time it switches to a new log segment and
 * then calls UpdateControlFile().
 */
106 typedef struct ControlFileData
107 {
108         uint32                  logId;                  /* current log file id */
109         uint32                  logSeg;                 /* current log file segment (1-based) */
110         XLogRecPtr              checkPoint;             /* last check point record ptr */
111         time_t                  time;                   /* time stamp of last modification */
112         DBState                 state;                  /* one of the DBState values */
113
114         /*
115          * this data is used to make sure that configuration of this DB
116          * is compatible with the current backend
117          */
118         uint32                  blcksz;                 /* block size for this DB */
119         uint32                  relseg_size;    /* blocks per segment of large relation */
120         uint32                  catalog_version_no;     /* internal version number */
121
122         /*
123          * MORE DATA FOLLOWS AT THE END OF THIS STRUCTURE
124          * - locations of data dirs 
125          */
126 } ControlFileData;
127
/* Pointer to the control file image; NULL until set up (not in the visible code). */
128 static ControlFileData     *ControlFile = NULL;
129
/*
 * Body of a checkpoint record.  The structure is not used in the visible
 * part of the file.
 */
130 typedef struct CheckPoint
131 {
132         XLogRecPtr              redo;           /* next RecPtr available when we */
133                                                                 /* began to create CheckPoint */
134                                                                 /* (i.e. REDO start point) */
135         XLogRecPtr              undo;           /* first record of oldest in-progress */
136                                                                 /* transaction when we started */
137                                                                 /* (i.e. UNDO end point) */
            /* presumably the next transaction id / object id to assign -- TODO confirm */
138         TransactionId   nextXid;
139         Oid                             nextOid;
140 } CheckPoint;
141
142 /* 
143  * We break each log file in 16Mb segments 
144  */
/*
 * XLogSegSize  - size of one segment file, in bytes.
 * XLogLastSeg  - number of whole segments that fit below 0xffffffff.
 * XLogFileSize - number of bytes addressed within one log id; when a page's
 *                end position reaches this value, InitXLBuffer moves on to
 *                the next xlogid.
 */
145 #define XLogSegSize             (16*1024*1024)
146 #define XLogLastSeg             (0xffffffff / XLogSegSize)
147 #define XLogFileSize    (XLogLastSeg * XLogSegSize)
148
/*
 * Build the path of log segment (log, seg) into `path`: at most MAXPGPATH
 * characters of XLogDir, then SEP_CHAR, then log and seg as two 8-digit
 * upper-case hex numbers.
 *
 * NOTE(review): sprintf does not bound the output.  With a MAXPGPATH+1 byte
 * destination (as in XLogFileInit()) a very long XLogDir could overflow it
 * -- confirm that XLogDir is limited elsewhere.
 */
149 #define XLogFileName(path, log, seg)    \
150                         sprintf(path, "%.*s%c%08X%08X",         \
151                         MAXPGPATH, XLogDir, SEP_CHAR, log, seg)
152
/*
 * Step backward / forward through the ring of XLOG cache buffers.
 * Valid indexes are 0 .. XLogCtl->XLogCacheBlck; both macros wrap around.
 *
 * The argument is parenthesized so that an expression argument keeps its
 * meaning; note that it is still evaluated twice, so it must not have side
 * effects.
 */
#define PrevBufIdx(curridx)		\
		(((curridx) == 0) ? XLogCtl->XLogCacheBlck : ((curridx) - 1))

#define NextBufIdx(curridx)		\
		(((curridx) == XLogCtl->XLogCacheBlck) ? 0 : ((curridx) + 1))
158
/*
 * Comparisons of two log positions (anything with xlogid and xrecoff
 * members, i.e. XLogRecPtr values): xlogid is the major key and xrecoff
 * the minor key.
 *
 *   XLByteLT(left, right)  - left <  right
 *   XLByteLE(left, right)  - left <= right
 *   XLByteEQ(left, right)  - left == right
 *
 * Arguments are parenthesized so that expressions such as *ptr can be
 * passed; they are evaluated more than once, so they must be free of side
 * effects.
 */
#define XLByteLT(left, right)		\
			((right).xlogid > (left).xlogid || \
			((right).xlogid == (left).xlogid && (right).xrecoff > (left).xrecoff))

#define XLByteLE(left, right)		\
			((right).xlogid > (left).xlogid || \
			((right).xlogid == (left).xlogid && (right).xrecoff >= (left).xrecoff))

#define XLByteEQ(left, right)		\
			((right).xlogid == (left).xlogid && (right).xrecoff == (left).xrecoff)
169
/*
 * Make cache buffer `curridx` the current insert page.
 *
 * The macro expects a local variable `Insert` (pointing to XLogCtl->Insert)
 * to be in scope at the point of use.  In order, it:
 *   - sets xlblocks[curridx] to the end position of the new page, derived
 *     from the end position of the page that is current so far
 *     (xlblocks[Insert->curridx]): one BLCKSZ further, or, if that page
 *     ended exactly at XLogFileSize, offset BLCKSZ of the next xlogid;
 *   - switches Insert->curridx / currpage to the new buffer;
 *   - places Insert->currpos just after the page header;
 *   - stamps the page header with XLOG_PAGE_MAGIC and clears xlp_info.
 *
 * The order matters: xlblocks[] must be computed before Insert->curridx is
 * changed.  `curridx` is evaluated several times and is not parenthesized,
 * so pass a plain variable.
 */
170 #define InitXLBuffer(curridx)   (\
171                                 XLogCtl->xlblocks[curridx].xrecoff = \
172                                 (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
173                                 BLCKSZ : (XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ), \
174                                 XLogCtl->xlblocks[curridx].xlogid = \
175                                 (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
176                                 (XLogCtl->xlblocks[Insert->curridx].xlogid + 1) : \
177                                 XLogCtl->xlblocks[Insert->curridx].xlogid, \
178                                 Insert->curridx = curridx, \
179                                 Insert->currpage = (XLogPageHeader) (XLogCtl->pages + curridx * BLCKSZ), \
180                                 Insert->currpos = \
181                                         ((char*) Insert->currpage) + SizeOfXLogPHD, \
182                                 Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC, \
183                                 Insert->currpage->xlp_info = 0 \
184                                 )
185
/*
 * True if a record header may start at byte offset `xrecoff`: the offset
 * within its page must lie past the page header, and at least
 * SizeOfXLogRecord bytes must remain up to the end of the page.
 *
 * The argument is parenthesized so that an expression such as (a + b) is
 * taken modulo BLCKSZ as a whole; it is evaluated twice.
 */
#define XRecOffIsValid(xrecoff) \
		((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \
		(BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord)
189
/*
 * File-local helpers.
 *   GetFreeXLBuffer - make the next cache page available for inserting
 *   XLogWrite       - write cache pages out to the log file
 *   XLogFileInit    - create a new log segment file
 *   XLogFileOpen    - open a log segment file (definition not visible here)
 *   ReadRecord      - definition not visible here
 *   str_time        - definition not visible here
 */
190 static void                             GetFreeXLBuffer(void);
191 static void                             XLogWrite(char *buffer);
192 static int                              XLogFileInit(uint32 log, uint32 seg);
193 static int                              XLogFileOpen(uint32 log, uint32 seg, bool econt);
194 static XLogRecord          *ReadRecord(XLogRecPtr *RecPtr, char *buffer);
195 static char                        *str_time(time_t tnow);
196
/*
 * Backend-local copies of the log writer result and request.  They are
 * refreshed from, and merged into, the shared copies in XLogCtl at the
 * points where the corresponding spinlocks are held.
 */
197 static XLgwrResult              LgwrResult = {{0, 0}, {0, 0}};
198 static XLgwrRqst                LgwrRqst = {{0, 0}, {0, 0}};
199
/*
 * Log segment this backend currently has open for writing:
 *   logFile - file descriptor, or -1 if none is open
 *   logId   - log file id of that segment
 *   logSeg  - segment number, computed as (xrecoff - 1) / XLogSegSize
 *   logOff  - file offset just after the last block written
 */
200 static int                              logFile = -1;
201 static uint32                   logId = 0;
202 static uint32                   logSeg = 0;
203 static uint32                   logOff = 0;
204
/*
 * State for reading the log.  None of it is used in the visible part of
 * the file; presumably it belongs to ReadRecord() -- TODO confirm.
 */
205 static XLogRecPtr               ReadRecPtr;
206 static XLogRecPtr               EndRecPtr;
207 static int                              readFile = -1;
208 static uint32                   readId = 0;
209 static uint32                   readSeg = 0;
210 static uint32                   readOff = 0;
211 static char                             readBuf[BLCKSZ];
212 static XLogRecord          *nextRecord = NULL;
213
/*
 * XLogInsert
 *
 * Append one log record for resource manager `rmid` to the shared XLOG
 * cache.  The record data is `hdr` (hdrlen bytes) immediately followed by
 * `buf` (buflen bytes).
 *
 * The record header and as much data as fits go into the current cache
 * page; any remainder is continued on the following page(s) as subrecords,
 * with XLR_TO_BE_CONTINUED set on every piece except the last.
 *
 * Side effects:
 *   - holds insert_lck from acquisition until just before the shared write
 *     request is updated;
 *   - sets MyLastRecPtr to the position of the new record's header;
 *   - for the first record of this backend whose rmid is not RM_XLOG_ID
 *     (MyLastRecPtr.xrecoff still 0), stores that position in
 *     MyProc->logRec under SInvalLock;
 *   - updates the local LgwrRqst.Write and, when a page was filled, the
 *     shared XLogCtl->LgwrRqst.Write.
 *
 * Returns the position computed in RecPtr at the end: header position plus
 * xl_len when the record fits on one page, otherwise a position inside the
 * last page used (see the computation below).
 *
 * Calls elog(STOP, ...) if hdrlen + buflen is 0 or exceeds MAXLOGRECSZ.
 */
214 XLogRecPtr
215 XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
216 {
217         XLogCtlInsert      *Insert = &XLogCtl->Insert;
218         XLogRecord                 *record;
219         XLogSubRecord      *subrecord;
220         XLogRecPtr                      RecPtr;
221         uint32                          len = hdrlen + buflen,
222                                                 freespace,
223                                                 wlen;
224         uint16                          curridx;
            /* true if the shared LgwrRqst.Write still has to be updated at the end */
225         bool                            updrqst = false;
226
227         if (len == 0 || len > MAXLOGRECSZ)
228                 elog(STOP, "XLogInsert: invalid record len %u", len);
229
230         /* obtain xlog insert lock */
231         if (TAS(&(XLogCtl->insert_lck)))        /* busy */
232         {
233                 bool            do_lgwr = true;
234                 unsigned        i = 0;
235
236                 for ( ; ; )
237                 {
238                         /* try to read LgwrResult while waiting for insert lock */
239                         if (!TAS(&(XLogCtl->info_lck)))
240                         {
241                                 LgwrRqst = XLogCtl->LgwrRqst;
242                                 LgwrResult = XLogCtl->LgwrResult;
243                                 S_UNLOCK(&(XLogCtl->info_lck));
244                                 /*
245                                  * If cache is half filled then try to acquire lgwr lock
246                                  * and do LGWR work, but only once.
247                                  */
248                                 if (do_lgwr && 
249                                         (LgwrRqst.Write.xlogid != LgwrResult.Write.xlogid || 
250                                         (LgwrRqst.Write.xrecoff - LgwrResult.Write.xrecoff >=
251                                         XLogCtl->XLogCacheByte / 2)))
252                                 {
253                                         if (!TAS(&(XLogCtl->lgwr_lck)))
254                                         {
                                                    /* re-read both values now that the writer lock is ours */
255                                                 LgwrResult = XLogCtl->Write.LgwrResult;
256                                                 if (!TAS(&(XLogCtl->info_lck)))
257                                                 {
258                                                         LgwrRqst = XLogCtl->LgwrRqst;
259                                                         S_UNLOCK(&(XLogCtl->info_lck));
260                                                 }
261                                                 if (XLByteLT(LgwrResult.Write, LgwrRqst.Write))
262                                                 {
263                                                         XLogWrite(NULL);
264                                                         do_lgwr = false;
265                                                 }
266                                                 S_UNLOCK(&(XLogCtl->lgwr_lck));
267                                         }
268                                 }
269                         }
270                         s_lock_sleep(i++);
271                         if (!TAS(&(XLogCtl->insert_lck)))
272                                 break;
273                 }
274         }
275
            /* insert_lck is held from here on; freespace = bytes left in the current page */
276         freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
277         if (freespace < SizeOfXLogRecord)
278         {
                    /*
                     * Not even a record header fits: start a new page.  If the next
                     * buffer is known (from our cached LgwrResult) to be written out
                     * already it is reused directly, otherwise GetFreeXLBuffer()
                     * makes it available.
                     */
279                 curridx = NextBufIdx(Insert->curridx);
280                 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
281                         InitXLBuffer(curridx);
282                 else 
283                         GetFreeXLBuffer();
284                 freespace = BLCKSZ - SizeOfXLogPHD;
285         }
286         else
287                 curridx = Insert->curridx;
288
            /* fill in the record header at the current insert position */
289         freespace -= SizeOfXLogRecord;
290         record = (XLogRecord*) Insert->currpos;
291         record->xl_prev = Insert->PrevRecord;
292         if (rmid != RM_XLOG_ID)
293                 record->xl_xact_prev = MyLastRecPtr;
294         else
295         {
296                 record->xl_xact_prev.xlogid = 0;
297                 record->xl_xact_prev.xrecoff = 0;
298         }
299         record->xl_xid = GetCurrentTransactionId();
            /* xl_len counts only the data bytes that are stored on this page */
300         record->xl_len = (len > freespace) ? freespace : len;
301         record->xl_info = (len > freespace) ? XLR_TO_BE_CONTINUED : 0;
302         record->xl_rmid = rmid;
            /*
             * Position of the record header: xlblocks[] holds the end position
             * of the page, so go back BLCKSZ to the page start and add the
             * offset of currpos within the page.
             */
303         RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
304         RecPtr.xrecoff = 
305                 XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + 
306                 Insert->currpos - ((char*) Insert->currpage);
307         if (MyLastRecPtr.xrecoff == 0 && rmid != RM_XLOG_ID)
308         {
309                 SpinAcquire(SInvalLock);
310                 MyProc->logRec = RecPtr;
311                 SpinRelease(SInvalLock);
312         }
313         MyLastRecPtr = RecPtr;
314         RecPtr.xrecoff += record->xl_len;
315         Insert->currpos += SizeOfXLogRecord;
316         if (freespace > 0)
317         {
                    /* copy as much of hdr, then of buf, as fits on this page */
318                 wlen = (hdrlen > freespace) ? freespace : hdrlen;
319                 memcpy(Insert->currpos, hdr, wlen);
320                 freespace -= wlen;
321                 hdrlen -= wlen;
322                 hdr += wlen;
323                 Insert->currpos += wlen;
324                 if (buflen > 0 && freespace > 0)
325                 {
326                         wlen = (buflen > freespace) ? freespace : buflen;
327                         memcpy(Insert->currpos, buf, wlen);
328                         freespace -= wlen;
329                         buflen -= wlen;
330                         buf += wlen;
331                         Insert->currpos += wlen;
332                 }
                    /* round the insert position up to a DOUBLEALIGN boundary within the page */
333                 Insert->currpos = ((char*)Insert->currpage) + 
334                                         DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage));
                    /* bytes that still have to be stored on following pages */
335                 len = hdrlen + buflen;
336         }
337
338         if (len != 0)
339         {
                    /* each pass through nbuf stores one subrecord on a fresh page */
340 nbuf:
341                 curridx = NextBufIdx(curridx);
342                 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
343                 {
344                         InitXLBuffer(curridx);
345                         updrqst = true;
346                 }
347                 else
348                 {
                            /* GetFreeXLBuffer() stores the write request in shared memory itself */
349                         GetFreeXLBuffer();
350                         updrqst = false;
351                 }
352                 freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord;
353                 Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD;
354                 subrecord = (XLogSubRecord*) Insert->currpos;
355                 Insert->currpos += SizeOfXLogSubRecord;
356                 if (hdrlen > freespace)
357                 {
                            /* the rest of hdr alone fills this page: continue on the next one */
358                         subrecord->xl_len = freespace;
359                         subrecord->xl_info = XLR_TO_BE_CONTINUED;
360                         memcpy(Insert->currpos, hdr, freespace);
361                         hdrlen -= freespace;
362                         hdr += freespace;
363                         goto nbuf;
364                 }
365                 else if (hdrlen > 0)
366                 {
367                         subrecord->xl_len = hdrlen;
368                         memcpy(Insert->currpos, hdr, hdrlen);
369                         Insert->currpos += hdrlen;
370                         freespace -= hdrlen;
371                         hdrlen = 0;
372                 }
373                 else
374                         subrecord->xl_len = 0;
375                 if (buflen > freespace)
376                 {
                            /* buf does not fit into what is left of this page: continue on the next one */
377                         subrecord->xl_len += freespace;
378                         subrecord->xl_info = XLR_TO_BE_CONTINUED;
379                         memcpy(Insert->currpos, buf, freespace);
380                         buflen -= freespace;
381                         buf += freespace;
382                         goto nbuf;
383                 }
384                 else if (buflen > 0)
385                 {
386                         subrecord->xl_len += buflen;
387                         memcpy(Insert->currpos, buf, buflen);
388                         Insert->currpos += buflen;
389                 }
                    /* last piece of the record: not continued */
390                 subrecord->xl_info = 0;
391                 RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
392                 RecPtr.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - 
393                                 BLCKSZ + SizeOfXLogPHD + subrecord->xl_len;
394                 Insert->currpos = ((char*)Insert->currpage) + 
395                                         DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage));
396         }
397         freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
398         /*
399          * All done! Update global LgwrRqst if some block was filled up.
400          */
401         if (freespace < SizeOfXLogRecord)
402                 updrqst = true;         /* curridx is filled and available for writing out */
403         else
                    /* current page can still take records: the last full page is the previous one */
404                 curridx = PrevBufIdx(curridx);
405         LgwrRqst.Write = XLogCtl->xlblocks[curridx];
406
407         S_UNLOCK(&(XLogCtl->insert_lck));
408
409         if (updrqst)
410         {
411                 unsigned        i = 0;
412
                    /* spin for info_lck; the shared write request is only ever moved forward */
413                 for ( ; ; )
414                 {
415                         if (!TAS(&(XLogCtl->info_lck)))
416                         {
417                                 if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write))
418                                         XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
419                                 S_UNLOCK(&(XLogCtl->info_lck));
420                                 break;
421                         }
422                         s_lock_sleep(i++);
423                 }
424         }
425
426         return (RecPtr);
427 }       
428
/*
 * XLogFlush
 *
 * Ensure that the log is flushed to disk at least up to `record`.
 *
 * Returns at once if the backend-local LgwrResult.Flush already covers
 * `record`.  Otherwise it loops, sleeping with s_lock_sleep() between
 * attempts, and on each pass tries (without blocking) to:
 *   1. take info_lck: refresh LgwrResult, return if the flush has been
 *      done by someone else, raise the shared flush request to `record`
 *      and pick up a larger shared write request;
 *   2. take insert_lck: extend the write request to the current insert
 *      position, copying a partially filled page into a local buffer;
 *   3. take lgwr_lck: call XLogWrite() if something still has to be
 *      written, or leave the loop (still holding lgwr_lck) if everything
 *      is written but not yet flushed.
 * After the loop the segment file holding the last written byte is
 * fsync'ed and the result is published in shared memory.
 *
 * Calls elog(STOP, ...) if closing or fsyncing the log file fails, or if
 * XLogWrite() did not flush far enough.
 */
429 void
430 XLogFlush(XLogRecPtr record)
431 {
432         XLogRecPtr              WriteRqst;
            /* private copy of the partially filled current page (see below) */
433         char                    buffer[BLCKSZ];
            /* points to `buffer` while its contents are valid for WriteRqst, else NULL */
434         char               *usebuf = NULL;
435         unsigned                i = 0;
            /* set once the insert position has been captured; stays set afterwards */
436         bool                    force_lgwr = false;
437
            /* fast path: judged from the backend-local copy, without any lock */
438         if (XLByteLE(record, LgwrResult.Flush))
439                 return;
440         WriteRqst = LgwrRqst.Write;
441         for ( ; ; )
442         {
443                 /* try to read LgwrResult */
444                 if (!TAS(&(XLogCtl->info_lck)))
445                 {
446                         LgwrResult = XLogCtl->LgwrResult;
447                         if (XLByteLE(record, LgwrResult.Flush))
448                         {
449                                 S_UNLOCK(&(XLogCtl->info_lck));
450                                 return;
451                         }
452                         if (XLByteLT(XLogCtl->LgwrRqst.Flush, record))
453                                 XLogCtl->LgwrRqst.Flush = record;
454                         if (XLByteLT(WriteRqst, XLogCtl->LgwrRqst.Write))
455                         {
                                    /* a larger request exists: our page copy no longer matches it */
456                                 WriteRqst = XLogCtl->LgwrRqst.Write;
457                                 usebuf = NULL;
458                         }
459                         S_UNLOCK(&(XLogCtl->info_lck));
460                 }
461                 /* if something was added to log cache then try to flush this too */
462                 if (!TAS(&(XLogCtl->insert_lck)))
463                 {
464                         XLogCtlInsert      *Insert = &XLogCtl->Insert;
465                         uint32                          freespace = 
466                                         ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
467
468                         if (freespace < SizeOfXLogRecord)       /* buffer is full */
469                         {
470                                 usebuf = NULL;
471                                 LgwrRqst.Write = WriteRqst = XLogCtl->xlblocks[Insert->curridx];
472                         }
473                         else
474                         {
                                    /*
                                     * Page is only partly filled: copy the used part, zero
                                     * the rest, and request a write up to the current
                                     * insert position (page start + offset of currpos).
                                     */
475                                 usebuf = buffer;
476                                 memcpy(usebuf, Insert->currpage, BLCKSZ - freespace);
477                                 memset(usebuf + BLCKSZ - freespace, 0, freespace);
478                                 WriteRqst = XLogCtl->xlblocks[Insert->curridx];
479                                 WriteRqst.xrecoff = WriteRqst.xrecoff - BLCKSZ + 
480                                                 Insert->currpos - ((char*) Insert->currpage);
481                         }
482                         S_UNLOCK(&(XLogCtl->insert_lck));
483                         force_lgwr = true;
484                 }
                    /*
                     * Try to do the writing ourselves if the insert position was
                     * captured, or if the write request is at least BLCKSZ beyond
                     * `record`.
                     */
485                 if (force_lgwr || WriteRqst.xlogid > record.xlogid || 
486                         (WriteRqst.xlogid == record.xlogid && 
487                          WriteRqst.xrecoff >= record.xrecoff + BLCKSZ))
488                 {
489                         if (!TAS(&(XLogCtl->lgwr_lck)))
490                         {
491                                 LgwrResult = XLogCtl->Write.LgwrResult;
492                                 if (XLByteLE(record, LgwrResult.Flush))
493                                 {
494                                         S_UNLOCK(&(XLogCtl->lgwr_lck));
495                                         return;
496                                 }
497                                 if (XLByteLT(LgwrResult.Write, WriteRqst))
498                                 {
499                                         LgwrRqst.Flush = LgwrRqst.Write = WriteRqst;
500                                         XLogWrite(usebuf);
501                                         S_UNLOCK(&(XLogCtl->lgwr_lck));
502                                         if (XLByteLT(LgwrResult.Flush, record))
503                                                 elog(STOP, "XLogFlush: request is not satisfyed");
504                                         return;
505                                 }
                                    /* already written but not flushed: fsync below, lgwr_lck stays held */
506                                 break;
507                         }
508                 }
509                 s_lock_sleep(i++);
510         }
511
            /* make sure logFile refers to the segment that contains the last written byte */
512         if (logFile >= 0 && (LgwrResult.Write.xlogid != logId || 
513                 (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg))
514         {
515                 if (close(logFile) != 0)
516                         elog(STOP, "Close(logfile %u seg %u) failed: %d", 
517                                                 logId, logSeg, errno);
518                 logFile = -1;
519         }
520
521         if (logFile < 0)
522         {
523                 logId = LgwrResult.Write.xlogid;
524                 logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
525                 logOff = 0;
526                 logFile = XLogFileOpen(logId, logSeg, false);
527         }
528
529         if (fsync(logFile) != 0)
530                 elog(STOP, "Fsync(logfile %u seg %u) failed: %d", 
531                                         logId, logSeg, errno);
532         LgwrResult.Flush = LgwrResult.Write;
533
            /* publish the result; the shared write request is never left behind it */
534         for (i = 0; ; )
535         {
536                 if (!TAS(&(XLogCtl->info_lck)))
537                 {
538                         XLogCtl->LgwrResult = LgwrResult;
539                         if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
540                                 XLogCtl->LgwrRqst.Write = LgwrResult.Write;
541                         S_UNLOCK(&(XLogCtl->info_lck));
542                         break;
543                 }
544                 s_lock_sleep(i++);
545         }
546         XLogCtl->Write.LgwrResult = LgwrResult;
547
548         S_UNLOCK(&(XLogCtl->lgwr_lck));
549         return;
550
551 }
552
553 static void
554 GetFreeXLBuffer()
555 {
556         XLogCtlInsert      *Insert = &XLogCtl->Insert;
557         XLogCtlWrite       *Write = &XLogCtl->Write;
558         uint16                          curridx = NextBufIdx(Insert->curridx);
559
560         LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx];
561         for ( ; ; )
562         {
563                 if (!TAS(&(XLogCtl->info_lck)))
564                 {
565                         LgwrResult = XLogCtl->LgwrResult;
566                         XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
567                         S_UNLOCK(&(XLogCtl->info_lck));
568                         if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
569                         {
570                                 Insert->LgwrResult = LgwrResult;
571                                 InitXLBuffer(curridx);
572                                 return;
573                         }
574                 }
575                 /*
576                  * LgwrResult lock is busy or un-updated. Try to acquire lgwr lock
577                  * and write full blocks.
578                  */
579                 if (!TAS(&(XLogCtl->lgwr_lck)))
580                 {
581                         LgwrResult = Write->LgwrResult;
582                         if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
583                         {
584                                 S_UNLOCK(&(XLogCtl->lgwr_lck));
585                                 Insert->LgwrResult = LgwrResult;
586                                 InitXLBuffer(curridx);
587                                 return;
588                         }
589                         /* 
590                          * Have to write buffers while holding insert lock -
591                          * not good...
592                          */
593                         XLogWrite(NULL);
594                         S_UNLOCK(&(XLogCtl->lgwr_lck));
595                         Insert->LgwrResult = LgwrResult;
596                         InitXLBuffer(curridx);
597                         return;
598                 }
599         }
600
601         return;
602 }
603
/*
 * XLogWrite
 *
 * Write cache pages to the log file until the backend-local
 * LgwrResult.Write has reached LgwrRqst.Write, and fsync if the flush
 * request (LgwrRqst.Flush) is covered by what has been written.  All
 * callers in this file hold lgwr_lck and have loaded LgwrResult from
 * XLogCtl->Write.LgwrResult beforehand.
 *
 * buffer: NULL, or a caller-made copy of the partially filled current
 *         page.  It is written instead of the cache page when the write
 *         request ends inside the page being written; in that case the
 *         write position is set to the request and Write->curridx is not
 *         advanced.
 *
 * Switches (and creates, via XLogFileInit()) log segment files as needed,
 * recording each new segment in the control file.  On return the result is
 * stored in XLogCtl->LgwrResult (under info_lck) and in Write->LgwrResult.
 *
 * Calls elog(STOP, ...) on any I/O failure and if nothing was written.
 */
604 static void
605 XLogWrite(char *buffer)
606 {
607         XLogCtlWrite   *Write = &XLogCtl->Write;
608         char               *from;
            /* number of pages written by this call */
609         uint32                  wcnt = 0;
610         int                             i = 0;
611
612         for ( ; XLByteLT(LgwrResult.Write,      LgwrRqst.Write); )
613         {
                    /* end position of the page about to be written */
614                 LgwrResult.Write = XLogCtl->xlblocks[Write->curridx];
                    /* does the page belong to a different segment than the one in logId/logSeg? */
615                 if (LgwrResult.Write.xlogid != logId || 
616                         (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg)
617                 {
618                         if (wcnt > 0)
619                         {
                                    /*
                                     * Pages were written to the old segment by this call:
                                     * flush them and record how far the log is flushed now
                                     * -- the end of the old log file if the log id changes,
                                     * else the start of the page about to be written.
                                     */
620                                 if (fsync(logFile) != 0)
621                                         elog(STOP, "Fsync(logfile %u seg %u) failed: %d", 
622                                                                 logId, logSeg, errno);
623                                 if (LgwrResult.Write.xlogid != logId)
624                                         LgwrResult.Flush.xrecoff = XLogFileSize;
625                                 else
626                                         LgwrResult.Flush.xrecoff = LgwrResult.Write.xrecoff - BLCKSZ;
627                                 LgwrResult.Flush.xlogid = logId;
                                    /* single attempt only: skipped if info_lck is busy */
628                                 if (!TAS(&(XLogCtl->info_lck)))
629                                 {
630                                         XLogCtl->LgwrResult.Flush = LgwrResult.Flush;
631                                         XLogCtl->LgwrResult.Write = LgwrResult.Flush;
632                                         if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Flush))
633                                                 XLogCtl->LgwrRqst.Write = LgwrResult.Flush;
634                                         if (XLByteLT(XLogCtl->LgwrRqst.Flush, LgwrResult.Flush))
635                                                 XLogCtl->LgwrRqst.Flush = LgwrResult.Flush;
636                                         S_UNLOCK(&(XLogCtl->info_lck));
637                                 }
638                         }
639                         if (logFile >= 0)
640                         {
641                                 if (close(logFile) != 0)
642                                         elog(STOP, "Close(logfile %u seg %u) failed: %d", 
643                                                                 logId, logSeg, errno);
644                                 logFile = -1;
645                         }
                            /* create the new segment and note it in the control file (logSeg is 1-based there) */
646                         logId = LgwrResult.Write.xlogid;
647                         logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
648                         logOff = 0;
649                         logFile = XLogFileInit(logId, logSeg);
650                         SpinAcquire(ControlFileLockId);
651                         ControlFile->logId = logId;
652                         ControlFile->logSeg = logSeg + 1;
653                         ControlFile->time = time(NULL);
654                         UpdateControlFile();
655                         SpinRelease(ControlFileLockId);
656                 }
657
                    /* same segment as before, but no file is open (e.g. closed by XLogFlush()) */
658                 if (logFile < 0)
659                 {
660                         logId = LgwrResult.Write.xlogid;
661                         logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
662                         logOff = 0;
663                         logFile = XLogFileOpen(logId, logSeg, false);
664                 }
665
                    /* seek only if the file position is not already at the start of this page */
666                 if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
667                 {
668                         logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
669                         if (lseek(logFile, (off_t)logOff, SEEK_SET) < 0)
670                                 elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d", 
671                                                         logId, logSeg, logOff, errno);
672                 }
673
                    /* request ends inside this page: write the caller's copy, if one was given */
674                 if (buffer != NULL && XLByteLT(LgwrRqst.Write, LgwrResult.Write))
675                         from = buffer;
676                 else
677                         from = XLogCtl->pages + Write->curridx * BLCKSZ;
678
679                 if (write(logFile, from, BLCKSZ) != BLCKSZ)
680                         elog(STOP, "Write(logfile %u seg %u off %u) failed: %d", 
681                                                 logId, logSeg, logOff, errno);
682
683                 wcnt++;
684                 logOff += BLCKSZ;
685
686                 if (from != buffer)
687                         Write->curridx = NextBufIdx(Write->curridx);
688                 else
                            /* partial page: it stays the next block to write; this ends the loop */
689                         LgwrResult.Write = LgwrRqst.Write;
690         }
691         if (wcnt == 0)
692                 elog(STOP, "XLogWrite: nothing written");
693
            /* fsync only if a flush is pending and fully covered by what has been written */
694         if (XLByteLT(LgwrResult.Flush, LgwrRqst.Flush) && 
695                 XLByteLE(LgwrRqst.Flush, LgwrResult.Write))
696         {
697                 if (fsync(logFile) != 0)
698                         elog(STOP, "Fsync(logfile %u seg %u) failed: %d", 
699                                                 logId, logSeg, errno);
700                 LgwrResult.Flush = LgwrResult.Write;
701         }
702
            /* publish the result; the shared write request is never left behind it */
703         for ( ; ; )
704         {
705                 if (!TAS(&(XLogCtl->info_lck)))
706                 {
707                         XLogCtl->LgwrResult = LgwrResult;
708                         if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
709                                 XLogCtl->LgwrRqst.Write = LgwrResult.Write;
710                         S_UNLOCK(&(XLogCtl->info_lck));
711                         break;
712                 }
713                 s_lock_sleep(i++);
714         }
715         Write->LgwrResult = LgwrResult;
716 }
717
718 static int
719 XLogFileInit(uint32 log, uint32 seg)
720 {
721         char    path[MAXPGPATH+1];
722         int             fd;
723
724         XLogFileName(path, log, seg);
725         unlink(path);
726
727 tryAgain:
728         fd = open(path, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
729         if (fd < 0 && (errno == EMFILE || errno == ENFILE))
730         {
731                 fd = errno;
732                 if (!ReleaseDataFile())
733                         elog(STOP, "Create(logfile %u seg %u) failed: %d (and no one data file can be closed)", 
734                                                 logId, logSeg, fd);
735                 goto tryAgain;
736         }
737         if (fd < 0)
738                 elog(STOP, "Init(logfile %u seg %u) failed: %d", 
739                                         logId, logSeg, errno);
740
741         if (lseek(fd, XLogSegSize - 1, SEEK_SET) != (off_t) (XLogSegSize - 1))
742                 elog(STOP, "Lseek(logfile %u seg %u) failed: %d", 
743                                         logId, logSeg, errno);
744
745         if (write(fd, "", 1) != 1)
746                 elog(STOP, "Init(logfile %u seg %u) failed: %d", 
747                                         logId, logSeg, errno);
748
749         if (fsync(fd) != 0)
750                 elog(STOP, "Fsync(logfile %u seg %u) failed: %d", 
751                                         logId, logSeg, errno);
752
753         if (lseek(fd, 0, SEEK_SET) < 0)
754                         elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d", 
755                                                 log, seg, 0, errno);
756
757         return(fd);
758 }
759
760 static int
761 XLogFileOpen(uint32 log, uint32 seg, bool econt)
762 {
763         char    path[MAXPGPATH+1];
764         int             fd;
765
766         XLogFileName(path, log, seg);
767
768 tryAgain:
769         fd = open(path, O_RDWR);
770         if (fd < 0 && (errno == EMFILE || errno == ENFILE))
771         {
772                 fd = errno;
773                 if (!ReleaseDataFile())
774                         elog(STOP, "Open(logfile %u seg %u) failed: %d (and no one data file can be closed)", 
775                                                 logId, logSeg, fd);
776                 goto tryAgain;
777         }
778         if (fd < 0)
779         {
780                 if (econt && errno == ENOENT)
781                 {
782                         elog(LOG, "Open(logfile %u seg %u) failed: file doesn't exist",
783                                                 logId, logSeg);
784                         return (fd);
785                 }
786                 elog(STOP, "Open(logfile %u seg %u) failed: %d", 
787                                         logId, logSeg, errno);
788         }
789
790         return(fd);
791 }
792
793 static XLogRecord*
794 ReadRecord(XLogRecPtr *RecPtr, char *buffer)
795 {
796         XLogRecord         *record;
797         XLogRecPtr              tmpRecPtr = EndRecPtr;
798         bool                    nextmode = (RecPtr == NULL);
799         int                             emode = (nextmode) ? LOG : STOP;
800         bool                    noBlck = false;
801
802         if (nextmode)
803         {
804                 RecPtr = &tmpRecPtr;
805                 if (nextRecord != NULL)
806                 {
807                         record = nextRecord;
808                         goto got_record;
809                 }
810                 if (tmpRecPtr.xrecoff % BLCKSZ != 0)
811                         tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
812                 if (tmpRecPtr.xrecoff >= XLogFileSize)
813                 {
814                         (tmpRecPtr.xlogid)++;
815                         tmpRecPtr.xrecoff = 0;
816                 }
817                 tmpRecPtr.xrecoff += SizeOfXLogPHD;
818         }
819         else if (!XRecOffIsValid(RecPtr->xrecoff))
820                 elog(STOP, "ReadRecord: invalid record offset in (%u, %u)",
821                                         RecPtr->xlogid, RecPtr->xrecoff);
822
823         if (readFile >= 0 && (RecPtr->xlogid != readId || 
824                 RecPtr->xrecoff / XLogSegSize != readSeg))
825         {
826                 close(readFile);
827                 readFile = -1;
828         }
829         readId = RecPtr->xlogid;
830         readSeg = RecPtr->xrecoff / XLogSegSize;
831         if (readFile < 0)
832         {
833                 noBlck = true;
834                 readFile = XLogFileOpen(readId, readSeg, nextmode);
835                 if (readFile < 0)
836                         goto next_record_is_invalid;
837         }
838
839         if (noBlck || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ)
840         {
841                 readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ;
842                 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
843                         elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", 
844                                                 readId, readSeg, readOff, errno);
845                 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
846                         elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", 
847                                                 readId, readSeg, readOff, errno);
848                 if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
849                 {
850                         elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
851                                 ((XLogPageHeader)readBuf)->xlp_magic,
852                                 readId, readSeg, readOff);
853                         goto next_record_is_invalid;
854                 }
855         }
856         if ((((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) && 
857                 RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
858         {
859                 elog(emode, "ReadRecord: subrecord is requested by (%u, %u)",
860                                         RecPtr->xlogid, RecPtr->xrecoff);
861                 goto next_record_is_invalid;
862         }
863         record = (XLogRecord*)((char*) readBuf + RecPtr->xrecoff % BLCKSZ);
864
865 got_record:;
866         if (record->xl_len == 0 || record->xl_len > 
867                 (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord))
868         {
869                 elog(emode, "ReadRecord: invalid record len %u in (%u, %u)",
870                                         record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
871                 goto next_record_is_invalid;
872         }
873         if (record->xl_rmid > RM_MAX_ID)
874         {
875                 elog(emode, "ReadRecord: invalid resource managed id %u in (%u, %u)",
876                                         record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
877                 goto next_record_is_invalid;
878         }
879         nextRecord = NULL;
880         if (record->xl_info & XLR_TO_BE_CONTINUED)
881         {
882                 XLogSubRecord      *subrecord;
883                 uint32                          len = record->xl_len;
884
885                 if (record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord != BLCKSZ)
886                 {
887                         elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)",
888                                                 record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
889                         goto next_record_is_invalid;
890                 }
891                 memcpy(buffer, record, record->xl_len + SizeOfXLogRecord);
892                 record = (XLogRecord*) buffer;
893                 buffer += record->xl_len + SizeOfXLogRecord;
894                 for ( ; ; )
895                 {
896                         readOff++;
897                         if (readOff == XLogSegSize / BLCKSZ)
898                         {
899                                 readSeg++;
900                                 if (readSeg == XLogLastSeg)
901                                 {
902                                         readSeg = 0;
903                                         readId++;
904                                 }
905                                 close(readFile);
906                                 readOff = 0;
907                                 readFile = XLogFileOpen(readId, readSeg, nextmode);
908                                 if (readFile < 0)
909                                         goto next_record_is_invalid;
910                         }
911                         if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
912                                 elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", 
913                                                         readId, readSeg, readOff, errno);
914                         if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
915                         {
916                                 elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
917                                         ((XLogPageHeader)readBuf)->xlp_magic,
918                                         readId, readSeg, readOff);
919                                 goto next_record_is_invalid;
920                         }
921                         if (!(((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD))
922                         {
923                                 elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u",
924                                                         readId, readSeg, readOff);
925                                 goto next_record_is_invalid;
926                         }
927                         subrecord = (XLogSubRecord*)((char*) readBuf + SizeOfXLogPHD);
928                         if (subrecord->xl_len == 0 || subrecord->xl_len > 
929                                 (BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord))
930                         {
931                                 elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u",
932                                                         subrecord->xl_len, readId, readSeg, readOff);
933                                 goto next_record_is_invalid;
934                         }
935                         len += subrecord->xl_len;
936                         if (len > MAXLOGRECSZ)
937                         {
938                                 elog(emode, "ReadRecord: too long record len %u in (%u, %u)",
939                                                         len, RecPtr->xlogid, RecPtr->xrecoff);
940                                 goto next_record_is_invalid;
941                         }
942                         memcpy(buffer, (char*)subrecord + SizeOfXLogSubRecord, subrecord->xl_len);
943                         buffer += subrecord->xl_len;
944                         if (subrecord->xl_info & XLR_TO_BE_CONTINUED)
945                         {
946                                 if (subrecord->xl_len + 
947                                         SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ)
948                                 {
949                                         elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u",
950                                                                 subrecord->xl_len, readId, readSeg, readOff);
951                                         goto next_record_is_invalid;
952                                 }
953                                 continue;
954                         }
955                         break;
956                 }
957                 if (BLCKSZ - SizeOfXLogRecord >= 
958                         subrecord->xl_len + SizeOfXLogPHD + SizeOfXLogSubRecord)
959                 {
960                         nextRecord = (XLogRecord*)
961                                 ((char*)subrecord + subrecord->xl_len + SizeOfXLogSubRecord);
962                 }
963                 EndRecPtr.xlogid = readId;
964                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + 
965                         SizeOfXLogPHD + SizeOfXLogSubRecord + subrecord->xl_len;
966                 ReadRecPtr = *RecPtr;
967                 return(record);
968         }
969         if (BLCKSZ - SizeOfXLogRecord >= 
970                 record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord)
971         {
972                 nextRecord = (XLogRecord*)((char*)record + record->xl_len + SizeOfXLogRecord);
973         }
974         EndRecPtr.xlogid = RecPtr->xlogid;
975         EndRecPtr.xrecoff = RecPtr->xrecoff + record->xl_len + SizeOfXLogRecord;
976         ReadRecPtr = *RecPtr;
977
978         return(record);
979
980 next_record_is_invalid:;
981         close(readFile);
982         readFile = -1;
983         nextRecord = NULL;
984         memset(buffer, 0, SizeOfXLogRecord);
985         record = (XLogRecord*) buffer;
986         /*
987          * If we assumed that next record began on the same page where
988          * previous one ended - zero end of page.
989          */
990         if (XLByteEQ(tmpRecPtr, EndRecPtr))
991         {
992                 Assert (EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) && 
993                                 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord);
994                 readId = EndRecPtr.xlogid;
995                 readSeg = EndRecPtr.xrecoff / XLogSegSize;
996                 readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
997                 elog(LOG, "Formating logfile %u seg %u block %u at offset %u",
998                                         readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ);
999                 readFile = XLogFileOpen(readId, readSeg, false);
1000                 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
1001                         elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", 
1002                                                 readId, readSeg, readOff, errno);
1003                 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
1004                         elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", 
1005                                                 readId, readSeg, readOff, errno);
1006                 memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0, 
1007                                 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ);
1008                 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
1009                         elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", 
1010                                                 readId, readSeg, readOff, errno);
1011                 if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
1012                         elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", 
1013                                                 readId, readSeg, readOff, errno);
1014                 readOff++;
1015         }
1016         else
1017         {
1018                 Assert (EndRecPtr.xrecoff % BLCKSZ == 0 || 
1019                                 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord);
1020                 readId = tmpRecPtr.xlogid;
1021                 readSeg = tmpRecPtr.xrecoff / XLogSegSize;
1022                 readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
1023                 Assert(readOff > 0);
1024         }
1025         if (readOff > 0)
1026         {
1027                 if (!XLByteEQ(tmpRecPtr, EndRecPtr))
1028                         elog(LOG, "Formating logfile %u seg %u block %u at offset 0",
1029                                                 readId, readSeg, readOff);
1030                 readOff *= BLCKSZ;
1031                 memset(readBuf, 0, BLCKSZ);
1032                 readFile = XLogFileOpen(readId, readSeg, false);
1033                 if (lseek(readFile, (off_t)readOff, SEEK_SET) < 0)
1034                         elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", 
1035                                                 readId, readSeg, readOff, errno);
1036                 while (readOff < XLogSegSize)
1037                 {
1038                         if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
1039                                 elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", 
1040                                                         readId, readSeg, readOff, errno);
1041                         readOff += BLCKSZ;
1042                 }
1043         }
1044         if (readFile >= 0)
1045         {
1046                 if (fsync(readFile) < 0)
1047                         elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %d", 
1048                                                         readId, readSeg, errno);
1049                 close(readFile);
1050                 readFile = -1;
1051         }
1052
1053         readId = EndRecPtr.xlogid;
1054         readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1;
1055         elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1);
1056         if (ControlFile->logId != readId || ControlFile->logSeg != readSeg)
1057         {
1058                 elog(LOG, "Set logId/logSeg in control file");
1059                 ControlFile->logId = readId;
1060                 ControlFile->logSeg = readSeg;
1061                 ControlFile->time = time(NULL);
1062                 UpdateControlFile();
1063         }
1064         if (readSeg == XLogLastSeg)
1065         {
1066                 readSeg = 0;
1067                 readId++;
1068         }
1069         {
1070                 char    path[MAXPGPATH+1];
1071
1072                 XLogFileName(path, readId, readSeg);
1073                 unlink(path);
1074         }
1075
1076         return(record);
1077 }
1078
1079 void
1080 UpdateControlFile()
1081 {
1082         int             fd;
1083
1084 tryAgain:
1085         fd = open(ControlFilePath, O_RDWR);
1086         if (fd < 0 && (errno == EMFILE || errno == ENFILE))
1087         {
1088                 fd = errno;
1089                 if (!ReleaseDataFile())
1090                         elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)", 
1091                                                 fd);
1092                 goto tryAgain;
1093         }
1094         if (fd < 0)
1095                 elog(STOP, "Open(cntlfile) failed: %d", errno);
1096
1097         if (write(fd, ControlFile, BLCKSZ) != BLCKSZ)
1098                 elog(STOP, "Write(cntlfile) failed: %d", errno);
1099
1100         if (fsync(fd) != 0)
1101                 elog(STOP, "Fsync(cntlfile) failed: %d", errno);
1102
1103         close(fd);
1104
1105         return;
1106 }
1107
1108 int
1109 XLOGShmemSize()
1110 {
1111         if (XLOGbuffers < MinXLOGbuffers)
1112                 XLOGbuffers = MinXLOGbuffers;
1113
1114         return(sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + 
1115                         sizeof(XLogRecPtr) * XLOGbuffers + BLCKSZ);
1116 }
1117
1118 void
1119 XLOGShmemInit(void)
1120 {
1121         bool                            found;
1122
1123         if (XLOGbuffers < MinXLOGbuffers)
1124                 XLOGbuffers = MinXLOGbuffers;
1125
1126         ControlFile = (ControlFileData*) 
1127                 ShmemInitStruct("Control File", BLCKSZ, &found);
1128         Assert(!found);
1129         XLogCtl = (XLogCtlData*)
1130                 ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + 
1131                                                 sizeof(XLogRecPtr) * XLOGbuffers, &found);
1132         Assert(!found);
1133 }
1134
1135 /*
1136  * This func must be called ONCE on system install
1137  */
1138 void
1139 BootStrapXLOG()
1140 {
1141         int                             fd;
1142         char                    buffer[BLCKSZ];
1143         XLogPageHeader  page = (XLogPageHeader)buffer;
1144         CheckPoint              checkPoint;
1145         XLogRecord         *record;
1146
1147         fd = open(ControlFilePath, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
1148         if (fd < 0)
1149                 elog(STOP, "BootStrapXLOG failed to create control file (%s): %d", 
1150                                         ControlFilePath, errno);
1151
1152         logFile = XLogFileInit(0, 0);
1153
1154         checkPoint.redo.xlogid = 0;
1155         checkPoint.redo.xrecoff = SizeOfXLogPHD;
1156         checkPoint.undo = checkPoint.redo;
1157         checkPoint.nextXid = FirstTransactionId;
1158         checkPoint.nextOid =  BootstrapObjectIdData;
1159
1160         memset(buffer, 0, BLCKSZ);
1161         page->xlp_magic = XLOG_PAGE_MAGIC;
1162         page->xlp_info = 0;
1163         record = (XLogRecord*) ((char*)page + SizeOfXLogPHD);
1164         record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0;
1165         record->xl_xact_prev = record->xl_prev;
1166         record->xl_xid = InvalidTransactionId;
1167         record->xl_len = sizeof(checkPoint);
1168         record->xl_info = 0;
1169         record->xl_rmid = RM_XLOG_ID;
1170         memcpy((char*)record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint));
1171
1172         if (write(logFile, buffer, BLCKSZ) != BLCKSZ)
1173                 elog(STOP, "BootStrapXLOG failed to write logfile: %d", errno);
1174
1175         if (fsync(logFile) != 0)
1176                 elog(STOP, "BootStrapXLOG failed to fsync logfile: %d", errno);
1177
1178         close(logFile);
1179         logFile = -1;
1180
1181         memset(buffer, 0, BLCKSZ);
1182         ControlFile = (ControlFileData*) buffer;
1183         ControlFile->logId = 0;
1184         ControlFile->logSeg = 1;
1185         ControlFile->checkPoint = checkPoint.redo;
1186         ControlFile->time = time(NULL);
1187         ControlFile->state = DB_SHUTDOWNED;
1188         ControlFile->blcksz = BLCKSZ;
1189         ControlFile->relseg_size = RELSEG_SIZE;
1190         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
1191
1192         if (write(fd, buffer, BLCKSZ) != BLCKSZ)
1193                 elog(STOP, "BootStrapXLOG failed to write control file: %d", errno);
1194
1195         if (fsync(fd) != 0)
1196                 elog(STOP, "BootStrapXLOG failed to fsync control file: %d", errno);
1197
1198         close(fd);
1199 }
1200
/*
 * str_time
 *		Format tnow with ctime() and strip the trailing newline.
 *
 * The returned string lives in static storage (ctime()'s own buffer, or a
 * fixed placeholder), so it must not be freed and is overwritten by a later
 * call.  ctime() may return NULL for a time it cannot represent; in that
 * case the placeholder "(unknown time)" is returned instead of passing NULL
 * to strchr().
 */
static char*
str_time(time_t tnow)
{
	static char	unknown[] = "(unknown time)";
	char	   *result = ctime(&tnow);
	char	   *p;

	if (result == NULL)
		return(unknown);

	p = strchr(result, '\n');
	if (p != NULL)
		*p = 0;

	return(result);
}
1212
1213 /*
1214  * This func must be called ONCE on system startup
1215  */
1216 void
1217 StartupXLOG()
1218 {
1219         XLogCtlInsert      *Insert;
1220         CheckPoint                      checkPoint;
1221         XLogRecPtr                      RecPtr,
1222                                                 LastRec;
1223         XLogRecord                 *record;
1224         char                            buffer[MAXLOGRECSZ+SizeOfXLogRecord];
1225         int                                     fd;
1226         int                                     recovery = 0;
1227         bool                            sie_saved = false;
1228
1229         elog(LOG, "Data Base System is starting up at %s", str_time(time(NULL)));
1230
1231         XLogCtl->xlblocks = (XLogRecPtr*) (((char *)XLogCtl) + sizeof(XLogCtlData));
1232         XLogCtl->pages = ((char *)XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers);
1233         XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
1234         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
1235         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
1236         XLogCtl->LgwrRqst = LgwrRqst;
1237         XLogCtl->LgwrResult = LgwrResult;
1238         XLogCtl->Insert.LgwrResult = LgwrResult;
1239         XLogCtl->Insert.curridx = 0;
1240         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
1241         XLogCtl->Write.LgwrResult = LgwrResult;
1242         XLogCtl->Write.curridx = 0;
1243         S_INIT_LOCK(&(XLogCtl->insert_lck));
1244         S_INIT_LOCK(&(XLogCtl->info_lck));
1245         S_INIT_LOCK(&(XLogCtl->lgwr_lck));
1246
1247         /*
1248          * Open/read Control file
1249          */
1250 tryAgain:
1251         fd = open(ControlFilePath, O_RDWR);
1252         if (fd < 0 && (errno == EMFILE || errno == ENFILE))
1253         {
1254                 fd = errno;
1255                 if (!ReleaseDataFile())
1256                         elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)", 
1257                                                 fd);
1258                 goto tryAgain;
1259         }
1260         if (fd < 0)
1261                 elog(STOP, "Open(cntlfile) failed: %d", errno);
1262
1263         if (read(fd, ControlFile, BLCKSZ) != BLCKSZ)
1264                 elog(STOP, "Read(cntlfile) failed: %d", errno);
1265
1266         close(fd);
1267
1268         if (ControlFile->logSeg == 0 || 
1269                 ControlFile->time <= 0 || 
1270                 ControlFile->state < DB_SHUTDOWNED || 
1271                 ControlFile->state > DB_IN_PRODUCTION || 
1272                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
1273                 elog(STOP, "Control file context is broken");
1274
1275         /* Check for incompatible database */
1276         if (ControlFile->blcksz != BLCKSZ)
1277                 elog(STOP, "database was initialized with BLCKSZ %d,\n\tbut the backend was compiled with BLCKSZ %d.\n\tlooks like you need to initdb.",
1278                          ControlFile->blcksz, BLCKSZ);
1279         if (ControlFile->relseg_size != RELSEG_SIZE)
1280                 elog(STOP, "database was initialized with RELSEG_SIZE %d,\n\tbut the backend was compiled with RELSEG_SIZE %d.\n\tlooks like you need to initdb.",
1281                          ControlFile->relseg_size, RELSEG_SIZE);
1282         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
1283                 elog(STOP, "database was initialized with CATALOG_VERSION_NO %d,\n\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n\tlooks like you need to initdb.",
1284                          ControlFile->catalog_version_no, CATALOG_VERSION_NO);
1285
1286         if (ControlFile->state == DB_SHUTDOWNED)
1287                 elog(LOG, "Data Base System was shutdowned at %s",
1288                                         str_time(ControlFile->time));
1289         else if (ControlFile->state == DB_SHUTDOWNING)
1290                 elog(LOG, "Data Base System was interrupted when shutting down at %s",
1291                                         str_time(ControlFile->time));
1292         else if (ControlFile->state == DB_IN_RECOVERY)
1293         {
1294                 elog(LOG, "Data Base System was interrupted being in recovery at %s\n"
1295                                   "\tThis propably means that some data blocks are corrupted\n"
1296                                   "\tAnd you will have to use last backup for recovery",
1297                                         str_time(ControlFile->time));
1298         }
1299         else if (ControlFile->state == DB_IN_PRODUCTION)
1300                 elog(LOG, "Data Base System was interrupted being in production at %s",
1301                                         str_time(ControlFile->time));
1302
1303         LastRec = RecPtr = ControlFile->checkPoint;
1304         if (!XRecOffIsValid(RecPtr.xrecoff))
1305                 elog(STOP, "Invalid checkPoint in control file");
1306         elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff);
1307
1308         record = ReadRecord(&RecPtr, buffer);
1309         if (record->xl_rmid != RM_XLOG_ID)
1310                 elog(STOP, "Invalid RMID in checkPoint record");
1311         if (record->xl_len != sizeof(checkPoint))
1312                 elog(STOP, "Invalid length of checkPoint record");
1313         checkPoint = *((CheckPoint*)((char*)record + SizeOfXLogRecord));
1314
1315         elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u)",
1316                                 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
1317                                 checkPoint.undo.xlogid, checkPoint.undo.xrecoff);
1318         elog(LOG, "NextTransactionId: %u; NextOid: %u",
1319                                 checkPoint.nextXid, checkPoint.nextOid);
1320         if (checkPoint.nextXid < FirstTransactionId || 
1321                 checkPoint.nextOid < BootstrapObjectIdData)
1322 #ifdef XLOG
1323                 elog(STOP, "Invalid NextTransactionId/NextOid");
1324 #else
1325                 elog(LOG, "Invalid NextTransactionId/NextOid");
1326 #endif
1327
1328 #ifdef XLOG
1329         ShmemVariableCache->nextXid = checkPoint.nextXid;
1330         ShmemVariableCache->nextOid = checkPoint.nextOid;
1331 #endif
1332
1333         if (XLByteLT(RecPtr, checkPoint.redo))
1334                 elog(STOP, "Invalid redo in checkPoint record");
1335         if (checkPoint.undo.xrecoff == 0)
1336                 checkPoint.undo = RecPtr;
1337         if (XLByteLT(RecPtr, checkPoint.undo))
1338                 elog(STOP, "Invalid undo in checkPoint record");
1339
1340         if (XLByteLT(checkPoint.undo, RecPtr) || XLByteLT(checkPoint.redo, RecPtr))
1341         {
1342                 if (ControlFile->state == DB_SHUTDOWNED)
1343                         elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
1344                 recovery = 2;
1345         }
1346         else if (ControlFile->state != DB_SHUTDOWNED)
1347                 recovery = 2;
1348
1349         if (recovery > 0)
1350         {
1351                 elog(LOG, "The DataBase system was not properly shutdowned\n"
1352                                         "\tAutomatic recovery is in progress...");
1353                 ControlFile->state = DB_IN_RECOVERY;
1354                 ControlFile->time = time(NULL);
1355                 UpdateControlFile();
1356
1357                 sie_saved = StopIfError;
1358                 StopIfError = true;
1359
1360                 /* Is REDO required ? */
1361                 if (XLByteLT(checkPoint.redo, RecPtr))
1362                         record = ReadRecord(&(checkPoint.redo), buffer);
1363                 else    /* read past CheckPoint record */
1364                         record = ReadRecord(NULL, buffer);
1365
1366                 /* REDO */
1367                 if (record->xl_len != 0)
1368                 {
1369                         elog(LOG, "Redo starts at (%u, %u)", 
1370                                                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1371                         do
1372                         {
1373 #ifdef XLOG
1374                                 if (record->xl_xid >= ShmemVariableCache->nextXid)
1375                                         ShmemVariableCache->nextXid = record->xl_xid + 1;
1376 #endif
1377                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
1378                                 record = ReadRecord(NULL, buffer);
1379                         } while (record->xl_len != 0);
1380                         elog(LOG, "Redo done at (%u, %u)", 
1381                                                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1382                         LastRec = ReadRecPtr;
1383                 }
1384                 else
1385                 {
1386                         elog(LOG, "Redo is not required");
1387                         recovery--;
1388                 }
1389
1390                 /* UNDO */
1391                 RecPtr = ReadRecPtr;
1392                 if (XLByteLT(checkPoint.undo, RecPtr))
1393                 {
1394                         elog(LOG, "Undo starts at (%u, %u)", 
1395                                                 RecPtr.xlogid, RecPtr.xrecoff);
1396                         do
1397                         {
1398                                 record = ReadRecord(&RecPtr, buffer);
1399                                 if (TransactionIdIsValid(record->xl_xid) && 
1400                                         !TransactionIdDidCommit(record->xl_xid))
1401                                         RmgrTable[record->xl_rmid].rm_undo(record);
1402                                 RecPtr = record->xl_prev;
1403                         } while (XLByteLE(checkPoint.undo, RecPtr));
1404                         elog(LOG, "Undo done at (%u, %u)", 
1405                                                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1406                 }
1407                 else
1408                 {
1409                         elog(LOG, "Undo is not required");
1410                         recovery--;
1411                 }
1412         }
1413
1414         /* Init xlog buffer cache */
1415         record = ReadRecord(&LastRec, buffer);
1416         logId = EndRecPtr.xlogid;
1417         logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
1418         logOff = 0;
1419         logFile = XLogFileOpen(logId, logSeg, false);
1420         XLogCtl->xlblocks[0].xlogid = logId;
1421         XLogCtl->xlblocks[0].xrecoff = 
1422                         ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
1423         Insert = &XLogCtl->Insert;
1424         memcpy((char*)(Insert->currpage), readBuf, BLCKSZ);
1425         Insert->currpos = ((char*) Insert->currpage) + 
1426                 (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
1427         Insert->PrevRecord = ControlFile->checkPoint;
1428
1429         if (recovery > 0)
1430         {
1431                 int             i;
1432
1433                 /* 
1434                  * Let resource managers know that recovery is done
1435                  */
1436                 for (i = 0; i <= RM_MAX_ID; i++)
1437                         RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL);
1438                 CreateCheckPoint(true);
1439                 StopIfError = sie_saved;
1440         }
1441
1442         ControlFile->state = DB_IN_PRODUCTION;
1443         ControlFile->time = time(NULL);
1444         UpdateControlFile();
1445
1446         elog(LOG, "Data Base System is in production state at %s", str_time(time(NULL)));
1447
1448         return;
1449 }
1450
1451 /*
1452  * This func must be called ONCE on system shutdown
1453  */
1454 void
1455 ShutdownXLOG()
1456 {
1457
1458         elog(LOG, "Data Base System is shutting down at %s", str_time(time(NULL)));
1459
1460         CreateCheckPoint(true);
1461
1462         elog(LOG, "Data Base System is shutdowned at %s", str_time(time(NULL)));
1463 }
1464
1465 void
1466 CreateCheckPoint(bool shutdown)
1467 {
1468         CheckPoint                      checkPoint;
1469         XLogRecPtr                      recptr;
1470         XLogCtlInsert      *Insert = &XLogCtl->Insert;
1471         uint32                          freespace;
1472         uint16                          curridx;
1473
1474         memset(&checkPoint, 0, sizeof(checkPoint));
1475         if (shutdown)
1476         {
1477                 ControlFile->state = DB_SHUTDOWNING;
1478                 ControlFile->time = time(NULL);
1479                 UpdateControlFile();
1480         }
1481
1482         /* Get REDO record ptr */
1483         while (TAS(&(XLogCtl->insert_lck)))
1484         {
1485                 struct timeval delay = {0, 5000};
1486
1487                 if (shutdown)
1488                         elog(STOP, "XLog insert lock is busy while data base is shutting down");
1489                 (void) select(0, NULL, NULL, NULL, &delay);
1490         }
1491         freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
1492         if (freespace < SizeOfXLogRecord)
1493         {
1494                 curridx = NextBufIdx(Insert->curridx);
1495                 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
1496                         InitXLBuffer(curridx);
1497                 else 
1498                         GetFreeXLBuffer();
1499                 freespace = BLCKSZ - SizeOfXLogPHD;
1500         }
1501         else
1502                 curridx = Insert->curridx;
1503         checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid;
1504         checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + 
1505                                                                 Insert->currpos - ((char*) Insert->currpage);
1506         S_UNLOCK(&(XLogCtl->insert_lck));
1507
1508         SpinAcquire(XidGenLockId);
1509         checkPoint.nextXid = ShmemVariableCache->nextXid;
1510         SpinRelease(XidGenLockId);
1511         SpinAcquire(OidGenLockId);
1512         checkPoint.nextOid = ShmemVariableCache->nextOid;
1513         SpinRelease(OidGenLockId);
1514
1515         FlushBufferPool();
1516
1517         /* Get UNDO record ptr */
1518         checkPoint.undo.xrecoff = 0;
1519
1520         if (shutdown && checkPoint.undo.xrecoff != 0)
1521                 elog(STOP, "Active transaction while data base is shutting down");
1522
1523         recptr = XLogInsert(RM_XLOG_ID, (char*)&checkPoint, sizeof(checkPoint), NULL, 0);
1524
1525         if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr))
1526                 elog(STOP, "XLog concurrent activity while data base is shutting down");
1527
1528         XLogFlush(recptr);
1529
1530         SpinAcquire(ControlFileLockId);
1531         if (shutdown)
1532                 ControlFile->state = DB_SHUTDOWNED;
1533         ControlFile->checkPoint = MyLastRecPtr;
1534         ControlFile->time = time(NULL);
1535         UpdateControlFile();
1536         SpinRelease(ControlFileLockId);
1537
1538         return;
1539 }