]> granicus.if.org Git - postgresql/blob - src/backend/access/transam/xlog.c
428ca7998e85a4a23d41ffa52f891220ded06d75
[postgresql] / src / backend / access / transam / xlog.c
1 #include <fcntl.h>
2 #include <unistd.h>
3 #include <errno.h>
4 #include <sys/stat.h>
5 #include <sys/time.h>
6
7 #include "postgres.h"
8 #include "access/xlog.h"
9 #include "access/xact.h"
10 #include "storage/sinval.h"
11 #include "storage/proc.h"
12 #include "storage/spin.h"
13 #include "storage/s_lock.h"
14
/*
 * Forward declarations of functions with external linkage.
 * UpdateControlFile() is called from XLogWrite() while ControlFileLockId
 * is held.
 */
15 void            UpdateControlFile(void);
16 int                     XLOGShmemSize(void);
17 void            XLOGShmemInit(void);
18 void            BootStrapXLOG(void);
19 void            StartupXLOG(void);
20 void            ShutdownXLOG(void);
21 void            CreateCheckPoint(bool shutdown);
22
/* Directory prefix that XLogFileName() puts in front of every log file name */
23 char            XLogDir[MAXPGPATH+1];
24 char            ControlFilePath[MAXPGPATH+1];
25 uint32          XLOGbuffers = 0;
/*
 * Start position of the last record inserted by this backend; XLogInsert()
 * stores it into xl_xact_prev of the next non-RM_XLOG_ID record and then
 * overwrites it with the new record's position.
 */
26 XLogRecPtr      MyLastRecPtr = {0, 0};
27 bool            StopIfError = false;
28
/* ControlFileLockId is taken by XLogWrite() around control file updates */
29 SPINLOCK        ControlFileLockId;
30 SPINLOCK        XidGenLockId;
31
/* Called by XLogFileInit() when open() fails with EMFILE or ENFILE */
32 extern bool                             ReleaseDataFile(void);
33
34 extern VariableCache    ShmemVariableCache;
35
/* NOTE(review): presumably the lower bound for XLOGbuffers; not used in the code visible here */
36 #define MinXLOGbuffers  4
37
/*
 * Log writer request: how far the log is wanted to be written and flushed.
 * One copy lives in shared memory (XLogCtlData.LgwrRqst, guarded by
 * info_lck); each backend also keeps a private copy (static LgwrRqst).
 */
38 typedef struct XLgwrRqst
39 {
40         XLogRecPtr              Write;          /* byte (1-based) to write out */
41         XLogRecPtr              Flush;          /* byte (1-based) to flush */
42 } XLgwrRqst;
43
/*
 * Log writer result: how far the log has actually been written and flushed.
 * Copies exist in XLogCtlData (guarded by info_lck), in XLogCtlInsert, in
 * XLogCtlWrite and, per backend, in the static LgwrResult.
 */
44 typedef struct XLgwrResult
45 {
46         XLogRecPtr              Write;          /* bytes written out */
47         XLogRecPtr              Flush;          /* bytes flushed */
48 } XLgwrResult;
49
/*
 * Insert-side state of the log cache.  The code in this file reads and
 * modifies it only after acquiring XLogCtl->insert_lck.
 */
50 typedef struct XLogCtlInsert
51 {
52         XLgwrResult             LgwrResult;     /* copy saved by GetFreeXLBuffer() */
53         XLogRecPtr              PrevRecord;     /* copied into xl_prev of each new record */
54         uint16                  curridx;        /* current block index in cache */
55         XLogPageHeader  currpage;       /* header of the page being filled */
56         char               *currpos;    /* next free byte within currpage */
57 } XLogCtlInsert;
58
/*
 * Write-side state of the log cache.  The code in this file reads and
 * modifies it only after acquiring XLogCtl->lgwr_lck.
 */
59 typedef struct XLogCtlWrite
60 {
61         XLgwrResult             LgwrResult;     /* result of the last XLogWrite()/XLogFlush() */
62         uint16                  curridx;        /* index of next block to write */
63 } XLogCtlWrite;
64
/*
 * Control structure for the XLOG page cache, reached through XLogCtl.
 *
 * NOTE(review): the three slock_t members exist only when HAS_TEST_AND_SET
 * is defined, yet the functions below use them unconditionally.
 */
65 typedef struct XLogCtlData
66 {
67         XLogCtlInsert   Insert;         /* used while holding insert_lck */
68         XLgwrRqst               LgwrRqst;       /* accessed under info_lck */
69         XLgwrResult             LgwrResult;     /* accessed under info_lck */
70         XLogCtlWrite    Write;          /* used while holding lgwr_lck */
71         char               *pages;      /* cache pages; block i starts at pages + i * BLCKSZ */
72         XLogRecPtr         *xlblocks;   /* 1st byte ptr-s + BLCKSZ */
73         uint32                  XLogCacheByte;  /* XLogInsert() compares half of this with the unwritten backlog */
74         uint32                  XLogCacheBlck;  /* highest block index (wrap point of NextBufIdx/PrevBufIdx) */
75 #ifdef HAS_TEST_AND_SET
76         slock_t                 insert_lck;
77         slock_t                 info_lck;
78         slock_t                 lgwr_lck;
79 #endif
80 } XLogCtlData;
81
82 static XLogCtlData                 *XLogCtl = NULL;
83
/*
 * Database state as stored in ControlFileData.state.
 * NOTE(review): the enumerators are not used in the code visible here; the
 * trailing comments are read off the names only -- confirm against the
 * startup/shutdown code.
 */
84 typedef enum DBState
85 {
86         DB_STARTUP = 0,
87         DB_SHUTDOWNED,          /* presumably: shut down cleanly */
88         DB_SHUTDOWNING,         /* presumably: shutdown in progress */
89         DB_IN_RECOVERY,
90         DB_IN_PRODUCTION
91 } DBState;
92
/*
 * Contents of the control file, reached through ControlFile.  XLogWrite()
 * updates logId, logSeg and time (holding ControlFileLockId) each time it
 * switches to a new log segment, then calls UpdateControlFile().
 */
93 typedef struct ControlFileData
94 {
95         uint32                  logId;                  /* current log file id */
96         uint32                  logSeg;                 /* current log file segment (1-based) */
97         XLogRecPtr              checkPoint;             /* last check point record ptr */
98         time_t                  time;                   /* time stamp of last modification */
99         DBState                 state;                  /* one of the DBState values */
100
101         /*
102          * following data used to make sure that configurations for this DB
103          * do not conflict with the backend
104          */
105         uint32                  blcksz;                 /* block size for this DB */
106         uint32                  relseg_size;            /* segmented file's block number */
107         /* MORE DATA FOLLOWS AT THE END OF THIS STRUCTURE
108          * - locations of data dirs 
109          */
110 } ControlFileData;
111
112 static ControlFileData     *ControlFile = NULL;
113
/*
 * Checkpoint record contents: the REDO start point, the UNDO end point and
 * the next transaction id / object id values.
 */
114 typedef struct CheckPoint
115 {
116         XLogRecPtr              redo;           /* next RecPtr available when we */
117                                                                 /* began to create CheckPoint */
118                                                                 /* (i.e. REDO start point) */
119         XLogRecPtr              undo;           /* first record of oldest in-progress */
120                                                                 /* transaction when we started */
121                                                                 /* (i.e. UNDO end point) */
122         TransactionId   nextXid;
123         Oid                             nextOid;
124 } CheckPoint;
125
126 /* 
127  * We break each log file into 16 MB segments.
 *
 * XLogLastSeg is 0xffffffff / XLogSegSize = 255, so XLogFileSize (the
 * largest xrecoff used within one log id) is 255 * 16 MB = 0xFF000000.
 * InitXLBuffer moves on to the next log id once a block ends at
 * XLogFileSize.
128  */
129 #define XLogSegSize             (16*1024*1024)
130 #define XLogLastSeg             (0xffffffff / XLogSegSize)
131 #define XLogFileSize    (XLogLastSeg * XLogSegSize)
132
/*
 * XLogFileName - format the path of log file 'log', segment 'seg' into 'path'.
 *
 * The result is XLogDir, SEP_CHAR, then 'log' and 'seg' as eight uppercase
 * hex digits each.  'path' must provide at least MAXPGPATH + 1 bytes (the
 * size XLogFileInit uses).  The directory part is cut off at MAXPGPATH - 17
 * characters so that it, the separator and the 16 hex digits never exceed
 * MAXPGPATH characters plus the terminating NUL; a limit of MAXPGPATH would
 * let a long XLogDir overrun the buffer.
 */
#define XLogFileName(path, log, seg)    \
                        sprintf(path, "%.*s%c%08X%08X",         \
                        (int) (MAXPGPATH - 17), XLogDir, SEP_CHAR, log, seg)
136
/*
 * PrevBufIdx / NextBufIdx - index of the cache block before / after
 * 'curridx', wrapping around at XLogCtl->XLogCacheBlck (the highest index).
 * The argument is parenthesized so that any expression can be passed; it is
 * still evaluated twice, so it must be free of side effects.
 */
#define PrevBufIdx(curridx)             \
                (((curridx) == 0) ? XLogCtl->XLogCacheBlck : ((curridx) - 1))

#define NextBufIdx(curridx)             \
                (((curridx) == XLogCtl->XLogCacheBlck) ? 0 : ((curridx) + 1))
142
/*
 * Comparisons of two XLogRecPtr values, ordered by xlogid first and xrecoff
 * second: XLByteLT is left < right, XLByteLE is left <= right, XLByteEQ is
 * left == right.  Arguments are parenthesized so that expressions such as
 * *ptr can be passed; each argument is evaluated more than once.
 */
#define XLByteLT(left, right)           \
                        ((right).xlogid > (left).xlogid || \
                        ((right).xlogid == (left).xlogid && (right).xrecoff > (left).xrecoff))

#define XLByteLE(left, right)           \
                        ((right).xlogid > (left).xlogid || \
                        ((right).xlogid == (left).xlogid && (right).xrecoff >= (left).xrecoff))

#define XLByteEQ(left, right)           \
                        ((right).xlogid == (left).xlogid && (right).xrecoff == (left).xrecoff)
153
/*
 * InitXLBuffer(curridx) - make cache block 'curridx' the current insert page.
 *
 * Requires a local variable 'Insert' (XLogCtlInsert *) in the caller's scope.
 * In this order, the comma expression:
 *   - sets xlblocks[curridx] to one block past xlblocks[Insert->curridx];
 *     when the old block ended at XLogFileSize the new one ends at BLCKSZ
 *     of the next xlogid;
 *   - points Insert->curridx, Insert->currpage and Insert->currpos (just
 *     past the page header) at the new block;
 *   - stamps the page header with XLOG_PAGE_MAGIC and xlp_info = 0.
 * Page contents after the header are not cleared.
 *
 * 'curridx' is expanded several times and not parenthesized: pass a plain,
 * side-effect-free value.  The xlogid step re-reads
 * xlblocks[Insert->curridx].xrecoff after xlblocks[curridx].xrecoff has been
 * stored, so curridx must differ from Insert->curridx.
 */
154 #define InitXLBuffer(curridx)   (\
155                                 XLogCtl->xlblocks[curridx].xrecoff = \
156                                 (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
157                                 BLCKSZ : (XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ), \
158                                 XLogCtl->xlblocks[curridx].xlogid = \
159                                 (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
160                                 (XLogCtl->xlblocks[Insert->curridx].xlogid + 1) : \
161                                 XLogCtl->xlblocks[Insert->curridx].xlogid, \
162                                 Insert->curridx = curridx, \
163                                 Insert->currpage = (XLogPageHeader) (XLogCtl->pages + curridx * BLCKSZ), \
164                                 Insert->currpos = \
165                                         ((char*) Insert->currpage) + SizeOfXLogPHD, \
166                                 Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC, \
167                                 Insert->currpage->xlp_info = 0 \
168                                 )
169
/*
 * XRecOffIsValid - true when 'xrecoff' can be the start of a record: its
 * offset within the page lies past the page header and leaves room for at
 * least a record header before the end of the page.  The argument is
 * parenthesized so that sums such as base + off are handled correctly; it is
 * evaluated twice.
 */
#define XRecOffIsValid(xrecoff) \
                ((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \
                (BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord)
173
/* Prototypes of file-local helpers */
174 static void                             GetFreeXLBuffer(void);
175 static void                             XLogWrite(char *buffer);
176 static int                              XLogFileInit(uint32 log, uint32 seg);
177 static int                              XLogFileOpen(uint32 log, uint32 seg, bool econt);
178 static XLogRecord          *ReadRecord(XLogRecPtr *RecPtr, char *buffer);
179 static char                        *str_time(time_t tnow);
180
/*
 * This backend's private copies of the shared request/result values; they
 * are refreshed from XLogCtl under info_lck or lgwr_lck.
 */
181 static XLgwrResult              LgwrResult = {{0, 0}, {0, 0}};
182 static XLgwrRqst                LgwrRqst = {{0, 0}, {0, 0}};
183
/*
 * Log segment this backend currently has open for writing: descriptor
 * (-1 when none), log id, segment number and current file offset.
 */
184 static int                              logFile = -1;
185 static uint32                   logId = 0;
186 static uint32                   logSeg = 0;
187 static uint32                   logOff = 0;
188
/*
 * NOTE(review): the variables below are not used in the code visible here;
 * judging by their names they are the read-side state for ReadRecord() --
 * confirm.
 */
189 static XLogRecPtr               ReadRecPtr;
190 static XLogRecPtr               EndRecPtr;
191 static int                              readFile = -1;
192 static uint32                   readId = 0;
193 static uint32                   readSeg = 0;
194 static uint32                   readOff = 0;
195 static char                             readBuf[BLCKSZ];
196 static XLogRecord          *nextRecord = NULL;
197
/*
 * XLogInsert - copy one log record into the shared XLOG page cache.
 *
 *      rmid            resource manager id stored in the record header
 *      hdr, hdrlen     first part of the record data
 *      buf, buflen     second part, copied immediately after hdr
 *
 * hdrlen + buflen must be nonzero and at most MAXLOGRECSZ, otherwise
 * elog(STOP) is raised.  The record header and as much data as fits go at
 * the current insert position; the rest is continued on the following
 * page(s) as XLogSubRecords, each such page being flagged
 * XLP_FIRST_IS_SUBRECORD and each unfinished piece XLR_TO_BE_CONTINUED.
 *
 * Side effects: MyLastRecPtr is set to the start of the new record; for the
 * first non-RM_XLOG_ID record of this backend (MyLastRecPtr.xrecoff == 0)
 * the start is also stored in MyProc->logRec under SInvalLock; the shared
 * LgwrRqst.Write is advanced when a cache block became complete.
 *
 * Returns a position computed from the last piece written (see the
 * NOTE(review) comments below on how it is derived).
 */
198 XLogRecPtr
199 XLogInsert(RmgrId rmid, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
200 {
201         XLogCtlInsert      *Insert = &XLogCtl->Insert;
202         XLogRecord                 *record;
203         XLogSubRecord      *subrecord;
204         XLogRecPtr                      RecPtr;
205         uint32                          len = hdrlen + buflen,
206                                                 freespace,
207                                                 wlen;
208         uint16                          curridx;
209         bool                            updrqst = false;
210
211         if (len == 0 || len > MAXLOGRECSZ)
212                 elog(STOP, "XLogInsert: invalid record len %u", len);
213
214         /* obtain xlog insert lock */
215         if (TAS(&(XLogCtl->insert_lck)))        /* busy */
216         {
217                 bool            do_lgwr = true;
218                 unsigned        i = 0;
219
                /*
                 * Spin until the insert lock is ours.  While waiting, refresh
                 * the private LgwrRqst/LgwrResult copies and, at most once,
                 * write out filled blocks on behalf of the log writer.
                 */
220                 for ( ; ; )
221                 {
222                         /* try to read LgwrResult while waiting for insert lock */
223                         if (!TAS(&(XLogCtl->info_lck)))
224                         {
225                                 LgwrRqst = XLogCtl->LgwrRqst;
226                                 LgwrResult = XLogCtl->LgwrResult;
227                                 S_UNLOCK(&(XLogCtl->info_lck));
228                                 /*
229                                  * If cache is half filled then try to acquire lgwr lock
230                                  * and do LGWR work, but only once.
231                                  */
232                                 if (do_lgwr && 
233                                         (LgwrRqst.Write.xlogid != LgwrResult.Write.xlogid || 
234                                         (LgwrRqst.Write.xrecoff - LgwrResult.Write.xrecoff >=
235                                         XLogCtl->XLogCacheByte / 2)))
236                                 {
237                                         if (!TAS(&(XLogCtl->lgwr_lck)))
238                                         {
                                                /* the writer's own result is the authoritative one */
239                                                 LgwrResult = XLogCtl->Write.LgwrResult;
240                                                 if (!TAS(&(XLogCtl->info_lck)))
241                                                 {
242                                                         LgwrRqst = XLogCtl->LgwrRqst;
243                                                         S_UNLOCK(&(XLogCtl->info_lck));
244                                                 }
245                                                 if (XLByteLT(LgwrResult.Write, LgwrRqst.Write))
246                                                 {
247                                                         XLogWrite(NULL);
248                                                         do_lgwr = false;
249                                                 }
250                                                 S_UNLOCK(&(XLogCtl->lgwr_lck));
251                                         }
252                                 }
253                         }
254                         s_lock_sleep(i++);
255                         if (!TAS(&(XLogCtl->insert_lck)))
256                                 break;
257                 }
258         }
259
        /* bytes left on the current page */
260         freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
        /* no room for even a record header: start the record on the next page */
261         if (freespace < SizeOfXLogRecord)
262         {
263                 curridx = NextBufIdx(Insert->curridx);
                /* reuse the next block directly if it is known to be written out */
264                 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
265                         InitXLBuffer(curridx);
266                 else 
267                         GetFreeXLBuffer();
268                 freespace = BLCKSZ - SizeOfXLogPHD;
269         }
270         else
271                 curridx = Insert->curridx;
272
        /* build the record header in place; freespace now counts data bytes only */
273         freespace -= SizeOfXLogRecord;
274         record = (XLogRecord*) Insert->currpos;
275         record->xl_prev = Insert->PrevRecord;
276         if (rmid != RM_XLOG_ID)
277                 record->xl_xact_prev = MyLastRecPtr;
278         else
279         {
280                 record->xl_xact_prev.xlogid = 0;
281                 record->xl_xact_prev.xrecoff = 0;
282         }
283         record->xl_xid = GetCurrentTransactionId();
284         record->xl_len = (len > freespace) ? freespace : len;
285         record->xl_info = (len > freespace) ? XLR_TO_BE_CONTINUED : 0;
286         record->xl_rmid = rmid;
        /* RecPtr = position of the record header (xlblocks[] holds block end) */
287         RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
288         RecPtr.xrecoff = 
289                 XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + 
290                 Insert->currpos - ((char*) Insert->currpage);
291         if (MyLastRecPtr.xrecoff == 0 && rmid != RM_XLOG_ID)
292         {
293                 SpinAcquire(SInvalLock);
294                 MyProc->logRec = RecPtr;
295                 SpinRelease(SInvalLock);
296         }
297         MyLastRecPtr = RecPtr;
        /*
         * NOTE(review): only xl_len is added here, not SizeOfXLogRecord, so
         * this is short of the record's last byte by the header size --
         * confirm that this is what callers of XLogFlush() expect.
         */
298         RecPtr.xrecoff += record->xl_len;
299         Insert->currpos += SizeOfXLogRecord;
        /* copy as much of hdr, then buf, as fits on this page */
300         if (freespace > 0)
301         {
302                 wlen = (hdrlen > freespace) ? freespace : hdrlen;
303                 memcpy(Insert->currpos, hdr, wlen);
304                 freespace -= wlen;
305                 hdrlen -= wlen;
306                 hdr += wlen;
307                 Insert->currpos += wlen;
308                 if (buflen > 0 && freespace > 0)
309                 {
310                         wlen = (buflen > freespace) ? freespace : buflen;
311                         memcpy(Insert->currpos, buf, wlen);
312                         freespace -= wlen;
313                         buflen -= wlen;
314                         buf += wlen;
315                         Insert->currpos += wlen;
316                 }
317                 Insert->currpos = ((char*)Insert->currpage) + 
318                                         DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage));
                /* what is still to be written on following pages */
319                 len = hdrlen + buflen;
320         }
321
322         if (len != 0)
323         {
                /* one pass per continuation page; re-entered via goto while data remains */
324 nbuf:
325                 curridx = NextBufIdx(curridx);
326                 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
327                 {
328                         InitXLBuffer(curridx);
329                         updrqst = true;
330                 }
331                 else
332                 {
                        /* GetFreeXLBuffer() has already published LgwrRqst.Write */
333                         GetFreeXLBuffer();
334                         updrqst = false;
335                 }
336                 freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord;
337                 Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD;
338                 subrecord = (XLogSubRecord*) Insert->currpos;
339                 Insert->currpos += SizeOfXLogSubRecord;
340                 if (hdrlen > freespace)
341                 {
                        /* rest of hdr fills the whole page */
342                         subrecord->xl_len = freespace;
343                         subrecord->xl_info = XLR_TO_BE_CONTINUED;
344                         memcpy(Insert->currpos, hdr, freespace);
345                         hdrlen -= freespace;
346                         hdr += freespace;
347                         goto nbuf;
348                 }
349                 else if (hdrlen > 0)
350                 {
351                         subrecord->xl_len = hdrlen;
352                         memcpy(Insert->currpos, hdr, hdrlen);
353                         Insert->currpos += hdrlen;
354                         freespace -= hdrlen;
355                         hdrlen = 0;
356                 }
357                 else
358                         subrecord->xl_len = 0;
359                 if (buflen > freespace)
360                 {
                        /* rest of buf fills what is left of the page */
361                         subrecord->xl_len += freespace;
362                         subrecord->xl_info = XLR_TO_BE_CONTINUED;
363                         memcpy(Insert->currpos, buf, freespace);
364                         buflen -= freespace;
365                         buf += freespace;
366                         goto nbuf;
367                 }
368                 else if (buflen > 0)
369                 {
370                         subrecord->xl_len += buflen;
371                         memcpy(Insert->currpos, buf, buflen);
372                         Insert->currpos += buflen;
373                 }
                /* last piece of the record */
374                 subrecord->xl_info = 0;
375                 RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
                /*
                 * NOTE(review): SizeOfXLogSubRecord is not included in this
                 * offset -- confirm, as for the single-page case above.
                 */
376                 RecPtr.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - 
377                                 BLCKSZ + SizeOfXLogPHD + subrecord->xl_len;
378                 Insert->currpos = ((char*)Insert->currpage) + 
379                                         DOUBLEALIGN(Insert->currpos - ((char*)Insert->currpage));
380         }
381         freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
382         /*
383          * All done! Update global LgwrRqst if some block was filled up.
384          */
385         if (freespace < SizeOfXLogRecord)
386                 updrqst = true;         /* curridx is filled and available for writing out */
387         else
                /* current page can still take records; only the block before it is complete */
388                 curridx = PrevBufIdx(curridx);
389         LgwrRqst.Write = XLogCtl->xlblocks[curridx];
390
391         S_UNLOCK(&(XLogCtl->insert_lck));
392
393         if (updrqst)
394         {
395                 unsigned        i = 0;
396
                /* publish the new write request, never moving the shared value backwards */
397                 for ( ; ; )
398                 {
399                         if (!TAS(&(XLogCtl->info_lck)))
400                         {
401                                 if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write))
402                                         XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
403                                 S_UNLOCK(&(XLogCtl->info_lck));
404                                 break;
405                         }
406                         s_lock_sleep(i++);
407                 }
408         }
409
410         return (RecPtr);
411 }       
412
/*
 * XLogFlush - return only once the log is flushed up to 'record'.
 *
 * Returns at once when this backend's LgwrResult.Flush already covers
 * 'record'.  Otherwise it loops, using only non-blocking TAS attempts and
 * sleeping between rounds:
 *   1. under info_lck: refresh LgwrResult (return if flushed), raise the
 *      shared flush request to 'record', pick up a larger write request;
 *   2. under insert_lck: extend the write request to cover the current
 *      insert page -- the whole page if it is full, otherwise a zero-padded
 *      private copy of its used part;
 *   3. under lgwr_lck: write (and thereby flush) via XLogWrite(), or, if
 *      everything requested is already written, leave the loop still holding
 *      lgwr_lck and fsync the current segment below.
 * elog(STOP) is raised if XLogWrite() did not flush far enough or on any
 * close/fsync failure.
 */
413 void
414 XLogFlush(XLogRecPtr record)
415 {
416         XLogRecPtr              WriteRqst;
417         char                    buffer[BLCKSZ];
418         char               *usebuf = NULL;
419         unsigned                i = 0;
420         bool                    force_lgwr = false;
421
422         if (XLByteLE(record, LgwrResult.Flush))
423                 return;
424         WriteRqst = LgwrRqst.Write;
425         for ( ; ; )
426         {
427                 /* try to read LgwrResult */
428                 if (!TAS(&(XLogCtl->info_lck)))
429                 {
430                         LgwrResult = XLogCtl->LgwrResult;
431                         if (XLByteLE(record, LgwrResult.Flush))
432                         {
433                                 S_UNLOCK(&(XLogCtl->info_lck));
434                                 return;
435                         }
436                         if (XLByteLT(XLogCtl->LgwrRqst.Flush, record))
437                                 XLogCtl->LgwrRqst.Flush = record;
438                         if (XLByteLT(WriteRqst, XLogCtl->LgwrRqst.Write))
439                         {
                                /* the shared request is further along; a stale page copy must not be used */
440                                 WriteRqst = XLogCtl->LgwrRqst.Write;
441                                 usebuf = NULL;
442                         }
443                         S_UNLOCK(&(XLogCtl->info_lck));
444                 }
445                 /* if something was added to log cache then try to flush this too */
446                 if (!TAS(&(XLogCtl->insert_lck)))
447                 {
448                         XLogCtlInsert      *Insert = &XLogCtl->Insert;
449                         uint32                          freespace = 
450                                         ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
451
452                         if (freespace < SizeOfXLogRecord)       /* buffer is full */
453                         {
454                                 usebuf = NULL;
455                                 LgwrRqst.Write = WriteRqst = XLogCtl->xlblocks[Insert->curridx];
456                         }
457                         else
458                         {
                                /*
                                 * Page is only partly filled: snapshot the used part,
                                 * zero the rest, and request a write up to currpos.
                                 */
459                                 usebuf = buffer;
460                                 memcpy(usebuf, Insert->currpage, BLCKSZ - freespace);
461                                 memset(usebuf + BLCKSZ - freespace, 0, freespace);
462                                 WriteRqst = XLogCtl->xlblocks[Insert->curridx];
463                                 WriteRqst.xrecoff = WriteRqst.xrecoff - BLCKSZ + 
464                                                 Insert->currpos - ((char*) Insert->currpage);
465                         }
466                         S_UNLOCK(&(XLogCtl->insert_lck));
                        /* once set, this stays true for all later rounds */
467                         force_lgwr = true;
468                 }
                /*
                 * Act as log writer if we examined the insert page above, or if
                 * the write request is at least a block past 'record'.
                 */
469                 if (force_lgwr || WriteRqst.xlogid > record.xlogid || 
470                         (WriteRqst.xlogid == record.xlogid && 
471                          WriteRqst.xrecoff >= record.xrecoff + BLCKSZ))
472                 {
473                         if (!TAS(&(XLogCtl->lgwr_lck)))
474                         {
475                                 LgwrResult = XLogCtl->Write.LgwrResult;
476                                 if (XLByteLE(record, LgwrResult.Flush))
477                                 {
478                                         S_UNLOCK(&(XLogCtl->lgwr_lck));
479                                         return;
480                                 }
481                                 if (XLByteLT(LgwrResult.Write, WriteRqst))
482                                 {
                                        /* requesting Flush == Write makes XLogWrite() fsync as well */
483                                         LgwrRqst.Flush = LgwrRqst.Write = WriteRqst;
484                                         XLogWrite(usebuf);
485                                         S_UNLOCK(&(XLogCtl->lgwr_lck));
486                                         if (XLByteLT(LgwrResult.Flush, record))
487                                                 elog(STOP, "XLogFlush: request is not satisfyed");
488                                         return;
489                                 }
                                /* already written but not flushed: leave the loop holding lgwr_lck */
490                                 break;
491                         }
492                 }
493                 s_lock_sleep(i++);
494         }
495
        /* lgwr_lck is held from here on; make sure the right segment is open */
496         if (logFile >= 0 && (LgwrResult.Write.xlogid != logId || 
497                 (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg))
498         {
499                 if (close(logFile) != 0)
500                         elog(STOP, "Close(logfile %u seg %u) failed: %d", 
501                                                 logId, logSeg, errno);
502                 logFile = -1;
503         }
504
505         if (logFile < 0)
506         {
507                 logId = LgwrResult.Write.xlogid;
508                 logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
509                 logOff = 0;
510                 logFile = XLogFileOpen(logId, logSeg, false);
511         }
512
513         if (fsync(logFile) != 0)
514                 elog(STOP, "Fsync(logfile %u seg %u) failed: %d", 
515                                         logId, logSeg, errno);
516         LgwrResult.Flush = LgwrResult.Write;
517
        /* publish the result, waiting for info_lck as long as it takes */
518         for (i = 0; ; )
519         {
520                 if (!TAS(&(XLogCtl->info_lck)))
521                 {
522                         XLogCtl->LgwrResult = LgwrResult;
523                         if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
524                                 XLogCtl->LgwrRqst.Write = LgwrResult.Write;
525                         S_UNLOCK(&(XLogCtl->info_lck));
526                         break;
527                 }
528                 s_lock_sleep(i++);
529         }
530         XLogCtl->Write.LgwrResult = LgwrResult;
531
532         S_UNLOCK(&(XLogCtl->lgwr_lck));
533         return;
534
535 }
536
537 static void
538 GetFreeXLBuffer()
539 {
540         XLogCtlInsert      *Insert = &XLogCtl->Insert;
541         XLogCtlWrite       *Write = &XLogCtl->Write;
542         uint16                          curridx = NextBufIdx(Insert->curridx);
543
544         LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx];
545         for ( ; ; )
546         {
547                 if (!TAS(&(XLogCtl->info_lck)))
548                 {
549                         LgwrResult = XLogCtl->LgwrResult;
550                         XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
551                         S_UNLOCK(&(XLogCtl->info_lck));
552                         if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
553                         {
554                                 Insert->LgwrResult = LgwrResult;
555                                 InitXLBuffer(curridx);
556                                 return;
557                         }
558                 }
559                 /*
560                  * LgwrResult lock is busy or un-updated. Try to acquire lgwr lock
561                  * and write full blocks.
562                  */
563                 if (!TAS(&(XLogCtl->lgwr_lck)))
564                 {
565                         LgwrResult = Write->LgwrResult;
566                         if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
567                         {
568                                 S_UNLOCK(&(XLogCtl->lgwr_lck));
569                                 Insert->LgwrResult = LgwrResult;
570                                 InitXLBuffer(curridx);
571                                 return;
572                         }
573                         /* 
574                          * Have to write buffers while holding insert lock -
575                          * not good...
576                          */
577                         XLogWrite(NULL);
578                         S_UNLOCK(&(XLogCtl->lgwr_lck));
579                         Insert->LgwrResult = LgwrResult;
580                         InitXLBuffer(curridx);
581                         return;
582                 }
583         }
584
585         return;
586 }
587
/*
 * XLogWrite - write cache blocks to the log until the private
 * LgwrResult.Write has reached LgwrRqst.Write, and fsync if the private
 * LgwrRqst.Flush asks for it.
 *
 * All callers in this file hold lgwr_lck and have set LgwrResult/LgwrRqst
 * beforehand.  'buffer', when not NULL, is the caller's zero-padded copy of
 * the partly filled insert page; it is written instead of the shared page
 * for the block that extends beyond LgwrRqst.Write.
 *
 * Switches to (and creates) a new segment file when a block belongs to a
 * different log id/segment than the one currently open, updating the control
 * file.  On return the results are stored in XLogCtl->LgwrResult (under
 * info_lck) and in Write->LgwrResult.  Raises elog(STOP) on any I/O failure
 * or if there was nothing to write.
 */
588 static void
589 XLogWrite(char *buffer)
590 {
591         XLogCtlWrite   *Write = &XLogCtl->Write;
592         char               *from;
593         uint32                  wcnt = 0;
594         int                             i = 0;
595
596         for ( ; XLByteLT(LgwrResult.Write,      LgwrRqst.Write); )
597         {
                /* end position of the block about to be written */
598                 LgwrResult.Write = XLogCtl->xlblocks[Write->curridx];
599                 if (LgwrResult.Write.xlogid != logId || 
600                         (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg)
601                 {
                        /* block belongs to another segment: finish the old file first */
602                         if (wcnt > 0)
603                         {
604                                 if (fsync(logFile) != 0)
605                                         elog(STOP, "Fsync(logfile %u seg %u) failed: %d", 
606                                                                 logId, logSeg, errno);
                                /* everything up to the end of the old segment is now flushed */
607                                 if (LgwrResult.Write.xlogid != logId)
608                                         LgwrResult.Flush.xrecoff = XLogFileSize;
609                                 else
610                                         LgwrResult.Flush.xrecoff = LgwrResult.Write.xrecoff - BLCKSZ;
611                                 LgwrResult.Flush.xlogid = logId;
                                /* single attempt only: skipped if info_lck is busy */
612                                 if (!TAS(&(XLogCtl->info_lck)))
613                                 {
614                                         XLogCtl->LgwrResult.Flush = LgwrResult.Flush;
615                                         XLogCtl->LgwrResult.Write = LgwrResult.Flush;
616                                         if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Flush))
617                                                 XLogCtl->LgwrRqst.Write = LgwrResult.Flush;
618                                         if (XLByteLT(XLogCtl->LgwrRqst.Flush, LgwrResult.Flush))
619                                                 XLogCtl->LgwrRqst.Flush = LgwrResult.Flush;
620                                         S_UNLOCK(&(XLogCtl->info_lck));
621                                 }
622                         }
623                         if (logFile >= 0)
624                         {
625                                 if (close(logFile) != 0)
626                                         elog(STOP, "Close(logfile %u seg %u) failed: %d", 
627                                                                 logId, logSeg, errno);
628                                 logFile = -1;
629                         }
630                         logId = LgwrResult.Write.xlogid;
631                         logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
632                         logOff = 0;
633                         logFile = XLogFileInit(logId, logSeg);
634                         SpinAcquire(ControlFileLockId);
635                         ControlFile->logId = logId;
                        /* ControlFile->logSeg is 1-based, logSeg is 0-based */
636                         ControlFile->logSeg = logSeg + 1;
637                         ControlFile->time = time(NULL);
638                         UpdateControlFile();
639                         SpinRelease(ControlFileLockId);
640                 }
641
642                 if (logFile < 0)
643                 {
644                         logId = LgwrResult.Write.xlogid;
645                         logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
646                         logOff = 0;
647                         logFile = XLogFileOpen(logId, logSeg, false);
648                 }
649
                /* seek only when the file position is not already at this block */
650                 if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
651                 {
652                         logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
653                         if (lseek(logFile, (off_t)logOff, SEEK_SET) < 0)
654                                 elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d", 
655                                                         logId, logSeg, logOff, errno);
656                 }
657
                /* the request ends inside this block: write the caller's copy of it */
658                 if (buffer != NULL && XLByteLT(LgwrRqst.Write, LgwrResult.Write))
659                         from = buffer;
660                 else
661                         from = XLogCtl->pages + Write->curridx * BLCKSZ;
662
663                 if (write(logFile, from, BLCKSZ) != BLCKSZ)
664                         elog(STOP, "Write(logfile %u seg %u off %u) failed: %d", 
665                                                 logId, logSeg, logOff, errno);
666
667                 wcnt++;
668                 logOff += BLCKSZ;
669
                /*
                 * A shared page was written completely, so move on to the next
                 * block; a partial page stays current and the written position
                 * is set to the request, which ends the loop.
                 */
670                 if (from != buffer)
671                         Write->curridx = NextBufIdx(Write->curridx);
672                 else
673                         LgwrResult.Write = LgwrRqst.Write;
674         }
675         if (wcnt == 0)
676                 elog(STOP, "XLogWrite: nothing written");
677
        /* fsync only if a flush is pending and what was written covers it */
678         if (XLByteLT(LgwrResult.Flush, LgwrRqst.Flush) && 
679                 XLByteLE(LgwrRqst.Flush, LgwrResult.Write))
680         {
681                 if (fsync(logFile) != 0)
682                         elog(STOP, "Fsync(logfile %u seg %u) failed: %d", 
683                                                 logId, logSeg, errno);
684                 LgwrResult.Flush = LgwrResult.Write;
685         }
686
        /* publish the result, waiting for info_lck as long as it takes */
687         for ( ; ; )
688         {
689                 if (!TAS(&(XLogCtl->info_lck)))
690                 {
691                         XLogCtl->LgwrResult = LgwrResult;
692                         if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
693                                 XLogCtl->LgwrRqst.Write = LgwrResult.Write;
694                         S_UNLOCK(&(XLogCtl->info_lck));
695                         break;
696                 }
697                 s_lock_sleep(i++);
698         }
699         Write->LgwrResult = LgwrResult;
700 }
701
702 static int
703 XLogFileInit(uint32 log, uint32 seg)
704 {
705         char    path[MAXPGPATH+1];
706         int             fd;
707
708         XLogFileName(path, log, seg);
709         unlink(path);
710
711 tryAgain:
712         fd = open(path, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
713         if (fd < 0 && (errno == EMFILE || errno == ENFILE))
714         {
715                 fd = errno;
716                 if (!ReleaseDataFile())
717                         elog(STOP, "Create(logfile %u seg %u) failed: %d (and no one data file can be closed)", 
718                                                 logId, logSeg, fd);
719                 goto tryAgain;
720         }
721         if (fd < 0)
722                 elog(STOP, "Init(logfile %u seg %u) failed: %d", 
723                                         logId, logSeg, errno);
724
725         if (lseek(fd, XLogSegSize - 1, SEEK_SET) != (off_t) (XLogSegSize - 1))
726                 elog(STOP, "Lseek(logfile %u seg %u) failed: %d", 
727                                         logId, logSeg, errno);
728
729         if (write(fd, "", 1) != 1)
730                 elog(STOP, "Init(logfile %u seg %u) failed: %d", 
731                                         logId, logSeg, errno);
732
733         if (fsync(fd) != 0)
734                 elog(STOP, "Fsync(logfile %u seg %u) failed: %d", 
735                                         logId, logSeg, errno);
736
737         if (lseek(fd, 0, SEEK_SET) < 0)
738                         elog(STOP, "Lseek(logfile %u seg %u off %u) failed: %d", 
739                                                 log, seg, 0, errno);
740
741         return(fd);
742 }
743
744 static int
745 XLogFileOpen(uint32 log, uint32 seg, bool econt)
746 {
747         char    path[MAXPGPATH+1];
748         int             fd;
749
750         XLogFileName(path, log, seg);
751
752 tryAgain:
753         fd = open(path, O_RDWR);
754         if (fd < 0 && (errno == EMFILE || errno == ENFILE))
755         {
756                 fd = errno;
757                 if (!ReleaseDataFile())
758                         elog(STOP, "Open(logfile %u seg %u) failed: %d (and no one data file can be closed)", 
759                                                 logId, logSeg, fd);
760                 goto tryAgain;
761         }
762         if (fd < 0)
763         {
764                 if (econt && errno == ENOENT)
765                 {
766                         elog(LOG, "Open(logfile %u seg %u) failed: file doesn't exist",
767                                                 logId, logSeg);
768                         return (fd);
769                 }
770                 elog(STOP, "Open(logfile %u seg %u) failed: %d", 
771                                         logId, logSeg, errno);
772         }
773
774         return(fd);
775 }
776
777 static XLogRecord*
778 ReadRecord(XLogRecPtr *RecPtr, char *buffer)
779 {
780         XLogRecord         *record;
781         XLogRecPtr              tmpRecPtr = EndRecPtr;
782         bool                    nextmode = (RecPtr == NULL);
783         int                             emode = (nextmode) ? LOG : STOP;
784         bool                    noBlck = false;
785
786         if (nextmode)
787         {
788                 RecPtr = &tmpRecPtr;
789                 if (nextRecord != NULL)
790                 {
791                         record = nextRecord;
792                         goto got_record;
793                 }
794                 if (tmpRecPtr.xrecoff % BLCKSZ != 0)
795                         tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
796                 if (tmpRecPtr.xrecoff >= XLogFileSize)
797                 {
798                         (tmpRecPtr.xlogid)++;
799                         tmpRecPtr.xrecoff = 0;
800                 }
801                 tmpRecPtr.xrecoff += SizeOfXLogPHD;
802         }
803         else if (!XRecOffIsValid(RecPtr->xrecoff))
804                 elog(STOP, "ReadRecord: invalid record offset in (%u, %u)",
805                                         RecPtr->xlogid, RecPtr->xrecoff);
806
807         if (readFile >= 0 && (RecPtr->xlogid != readId || 
808                 RecPtr->xrecoff / XLogSegSize != readSeg))
809         {
810                 close(readFile);
811                 readFile = -1;
812         }
813         readId = RecPtr->xlogid;
814         readSeg = RecPtr->xrecoff / XLogSegSize;
815         if (readFile < 0)
816         {
817                 noBlck = true;
818                 readFile = XLogFileOpen(readId, readSeg, nextmode);
819                 if (readFile < 0)
820                         goto next_record_is_invalid;
821         }
822
823         if (noBlck || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ)
824         {
825                 readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ;
826                 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
827                         elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", 
828                                                 readId, readSeg, readOff, errno);
829                 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
830                         elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", 
831                                                 readId, readSeg, readOff, errno);
832                 if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
833                 {
834                         elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
835                                 ((XLogPageHeader)readBuf)->xlp_magic,
836                                 readId, readSeg, readOff);
837                         goto next_record_is_invalid;
838                 }
839         }
840         if ((((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) && 
841                 RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
842         {
843                 elog(emode, "ReadRecord: subrecord is requested by (%u, %u)",
844                                         RecPtr->xlogid, RecPtr->xrecoff);
845                 goto next_record_is_invalid;
846         }
847         record = (XLogRecord*)((char*) readBuf + RecPtr->xrecoff % BLCKSZ);
848
849 got_record:;
850         if (record->xl_len == 0 || record->xl_len > 
851                 (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord))
852         {
853                 elog(emode, "ReadRecord: invalid record len %u in (%u, %u)",
854                                         record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
855                 goto next_record_is_invalid;
856         }
857         if (record->xl_rmid > RM_MAX_ID)
858         {
859                 elog(emode, "ReadRecord: invalid resource managed id %u in (%u, %u)",
860                                         record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
861                 goto next_record_is_invalid;
862         }
863         nextRecord = NULL;
864         if (record->xl_info & XLR_TO_BE_CONTINUED)
865         {
866                 XLogSubRecord      *subrecord;
867                 uint32                          len = record->xl_len;
868
869                 if (record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord != BLCKSZ)
870                 {
871                         elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)",
872                                                 record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
873                         goto next_record_is_invalid;
874                 }
875                 memcpy(buffer, record, record->xl_len + SizeOfXLogRecord);
876                 record = (XLogRecord*) buffer;
877                 buffer += record->xl_len + SizeOfXLogRecord;
878                 for ( ; ; )
879                 {
880                         readOff++;
881                         if (readOff == XLogSegSize / BLCKSZ)
882                         {
883                                 readSeg++;
884                                 if (readSeg == XLogLastSeg)
885                                 {
886                                         readSeg = 0;
887                                         readId++;
888                                 }
889                                 close(readFile);
890                                 readOff = 0;
891                                 readFile = XLogFileOpen(readId, readSeg, nextmode);
892                                 if (readFile < 0)
893                                         goto next_record_is_invalid;
894                         }
895                         if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
896                                 elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", 
897                                                         readId, readSeg, readOff, errno);
898                         if (((XLogPageHeader)readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
899                         {
900                                 elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
901                                         ((XLogPageHeader)readBuf)->xlp_magic,
902                                         readId, readSeg, readOff);
903                                 goto next_record_is_invalid;
904                         }
905                         if (!(((XLogPageHeader)readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD))
906                         {
907                                 elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u",
908                                                         readId, readSeg, readOff);
909                                 goto next_record_is_invalid;
910                         }
911                         subrecord = (XLogSubRecord*)((char*) readBuf + SizeOfXLogPHD);
912                         if (subrecord->xl_len == 0 || subrecord->xl_len > 
913                                 (BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord))
914                         {
915                                 elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u",
916                                                         subrecord->xl_len, readId, readSeg, readOff);
917                                 goto next_record_is_invalid;
918                         }
919                         len += subrecord->xl_len;
920                         if (len > MAXLOGRECSZ)
921                         {
922                                 elog(emode, "ReadRecord: too long record len %u in (%u, %u)",
923                                                         len, RecPtr->xlogid, RecPtr->xrecoff);
924                                 goto next_record_is_invalid;
925                         }
926                         memcpy(buffer, (char*)subrecord + SizeOfXLogSubRecord, subrecord->xl_len);
927                         buffer += subrecord->xl_len;
928                         if (subrecord->xl_info & XLR_TO_BE_CONTINUED)
929                         {
930                                 if (subrecord->xl_len + 
931                                         SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ)
932                                 {
933                                         elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u",
934                                                                 subrecord->xl_len, readId, readSeg, readOff);
935                                         goto next_record_is_invalid;
936                                 }
937                                 continue;
938                         }
939                         break;
940                 }
941                 if (BLCKSZ - SizeOfXLogRecord >= 
942                         subrecord->xl_len + SizeOfXLogPHD + SizeOfXLogSubRecord)
943                 {
944                         nextRecord = (XLogRecord*)
945                                 ((char*)subrecord + subrecord->xl_len + SizeOfXLogSubRecord);
946                 }
947                 EndRecPtr.xlogid = readId;
948                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + 
949                         SizeOfXLogPHD + SizeOfXLogSubRecord + subrecord->xl_len;
950                 ReadRecPtr = *RecPtr;
951                 return(record);
952         }
953         if (BLCKSZ - SizeOfXLogRecord >= 
954                 record->xl_len + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord)
955         {
956                 nextRecord = (XLogRecord*)((char*)record + record->xl_len + SizeOfXLogRecord);
957         }
958         EndRecPtr.xlogid = RecPtr->xlogid;
959         EndRecPtr.xrecoff = RecPtr->xrecoff + record->xl_len + SizeOfXLogRecord;
960         ReadRecPtr = *RecPtr;
961
962         return(record);
963
964 next_record_is_invalid:;
965         close(readFile);
966         readFile = -1;
967         nextRecord = NULL;
968         memset(buffer, 0, SizeOfXLogRecord);
969         record = (XLogRecord*) buffer;
970         /*
971          * If we assumed that next record began on the same page where
972          * previous one ended - zero end of page.
973          */
974         if (XLByteEQ(tmpRecPtr, EndRecPtr))
975         {
976                 Assert (EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) && 
977                                 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord);
978                 readId = EndRecPtr.xlogid;
979                 readSeg = EndRecPtr.xrecoff / XLogSegSize;
980                 readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
981                 elog(LOG, "Formating logfile %u seg %u block %u at offset %u",
982                                         readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ);
983                 readFile = XLogFileOpen(readId, readSeg, false);
984                 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
985                         elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", 
986                                                 readId, readSeg, readOff, errno);
987                 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
988                         elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %d", 
989                                                 readId, readSeg, readOff, errno);
990                 memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0, 
991                                 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ);
992                 if (lseek(readFile, (off_t)(readOff * BLCKSZ), SEEK_SET) < 0)
993                         elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", 
994                                                 readId, readSeg, readOff, errno);
995                 if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
996                         elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", 
997                                                 readId, readSeg, readOff, errno);
998                 readOff++;
999         }
1000         else
1001         {
1002                 Assert (EndRecPtr.xrecoff % BLCKSZ == 0 || 
1003                                 BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord);
1004                 readId = tmpRecPtr.xlogid;
1005                 readSeg = tmpRecPtr.xrecoff / XLogSegSize;
1006                 readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
1007                 Assert(readOff > 0);
1008         }
1009         if (readOff > 0)
1010         {
1011                 if (!XLByteEQ(tmpRecPtr, EndRecPtr))
1012                         elog(LOG, "Formating logfile %u seg %u block %u at offset 0",
1013                                                 readId, readSeg, readOff);
1014                 readOff *= BLCKSZ;
1015                 memset(readBuf, 0, BLCKSZ);
1016                 readFile = XLogFileOpen(readId, readSeg, false);
1017                 if (lseek(readFile, (off_t)readOff, SEEK_SET) < 0)
1018                         elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %d", 
1019                                                 readId, readSeg, readOff, errno);
1020                 while (readOff < XLogSegSize)
1021                 {
1022                         if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
1023                                 elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %d", 
1024                                                         readId, readSeg, readOff, errno);
1025                         readOff += BLCKSZ;
1026                 }
1027         }
1028         if (readFile >= 0)
1029         {
1030                 if (fsync(readFile) < 0)
1031                         elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %d", 
1032                                                         readId, readSeg, errno);
1033                 close(readFile);
1034                 readFile = -1;
1035         }
1036
1037         readId = EndRecPtr.xlogid;
1038         readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1;
1039         elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1);
1040         if (ControlFile->logId != readId || ControlFile->logSeg != readSeg)
1041         {
1042                 elog(LOG, "Set logId/logSeg in control file");
1043                 ControlFile->logId = readId;
1044                 ControlFile->logSeg = readSeg;
1045                 ControlFile->time = time(NULL);
1046                 UpdateControlFile();
1047         }
1048         if (readSeg == XLogLastSeg)
1049         {
1050                 readSeg = 0;
1051                 readId++;
1052         }
1053         {
1054                 char    path[MAXPGPATH+1];
1055
1056                 XLogFileName(path, readId, readSeg);
1057                 unlink(path);
1058         }
1059
1060         return(record);
1061 }
1062
1063 void
1064 UpdateControlFile()
1065 {
1066         int             fd;
1067
1068 tryAgain:
1069         fd = open(ControlFilePath, O_RDWR);
1070         if (fd < 0 && (errno == EMFILE || errno == ENFILE))
1071         {
1072                 fd = errno;
1073                 if (!ReleaseDataFile())
1074                         elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)", 
1075                                                 fd);
1076                 goto tryAgain;
1077         }
1078         if (fd < 0)
1079                 elog(STOP, "Open(cntlfile) failed: %d", errno);
1080
1081         if (write(fd, ControlFile, BLCKSZ) != BLCKSZ)
1082                 elog(STOP, "Write(cntlfile) failed: %d", errno);
1083
1084         if (fsync(fd) != 0)
1085                 elog(STOP, "Fsync(cntlfile) failed: %d", errno);
1086
1087         close(fd);
1088
1089         return;
1090 }
1091
1092 int
1093 XLOGShmemSize()
1094 {
1095         if (XLOGbuffers < MinXLOGbuffers)
1096                 XLOGbuffers = MinXLOGbuffers;
1097
1098         return(sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + 
1099                         sizeof(XLogRecPtr) * XLOGbuffers + BLCKSZ);
1100 }
1101
1102 void
1103 XLOGShmemInit(void)
1104 {
1105         bool                            found;
1106
1107         if (XLOGbuffers < MinXLOGbuffers)
1108                 XLOGbuffers = MinXLOGbuffers;
1109
1110         ControlFile = (ControlFileData*) 
1111                 ShmemInitStruct("Control File", BLCKSZ, &found);
1112         Assert(!found);
1113         XLogCtl = (XLogCtlData*)
1114                 ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + 
1115                                                 sizeof(XLogRecPtr) * XLOGbuffers, &found);
1116         Assert(!found);
1117 }
1118
1119 /*
1120  * This func must be called ONCE on system install
1121  */
1122 void
1123 BootStrapXLOG()
1124 {
1125         int                             fd;
1126         char                    buffer[BLCKSZ];
1127         XLogPageHeader  page = (XLogPageHeader)buffer;
1128         CheckPoint              checkPoint;
1129         XLogRecord         *record;
1130
1131         fd = open(ControlFilePath, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
1132         if (fd < 0)
1133                 elog(STOP, "BootStrapXLOG failed to create control file (%s): %d", 
1134                                         ControlFilePath, errno);
1135
1136         logFile = XLogFileInit(0, 0);
1137
1138         checkPoint.redo.xlogid = 0;
1139         checkPoint.redo.xrecoff = SizeOfXLogPHD;
1140         checkPoint.undo = checkPoint.redo;
1141         checkPoint.nextXid = FirstTransactionId;
1142         checkPoint.nextOid =  BootstrapObjectIdData;
1143
1144         memset(buffer, 0, BLCKSZ);
1145         page->xlp_magic = XLOG_PAGE_MAGIC;
1146         page->xlp_info = 0;
1147         record = (XLogRecord*) ((char*)page + SizeOfXLogPHD);
1148         record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0;
1149         record->xl_xact_prev = record->xl_prev;
1150         record->xl_xid = InvalidTransactionId;
1151         record->xl_len = sizeof(checkPoint);
1152         record->xl_info = 0;
1153         record->xl_rmid = RM_XLOG_ID;
1154         memcpy((char*)record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint));
1155
1156         if (write(logFile, buffer, BLCKSZ) != BLCKSZ)
1157                 elog(STOP, "BootStrapXLOG failed to write logfile: %d", errno);
1158
1159         if (fsync(logFile) != 0)
1160                 elog(STOP, "BootStrapXLOG failed to fsync logfile: %d", errno);
1161
1162         close(logFile);
1163         logFile = -1;
1164
1165         memset(buffer, 0, BLCKSZ);
1166         ControlFile = (ControlFileData*) buffer;
1167         ControlFile->logId = 0;
1168         ControlFile->logSeg = 1;
1169         ControlFile->checkPoint = checkPoint.redo;
1170         ControlFile->time = time(NULL);
1171         ControlFile->state = DB_SHUTDOWNED;
1172         ControlFile->blcksz = BLCKSZ;
1173         ControlFile->relseg_size = RELSEG_SIZE;
1174
1175         if (write(fd, buffer, BLCKSZ) != BLCKSZ)
1176                 elog(STOP, "BootStrapXLOG failed to write control file: %d", errno);
1177
1178         if (fsync(fd) != 0)
1179                 elog(STOP, "BootStrapXLOG failed to fsync control file: %d", errno);
1180
1181         close(fd);
1182
1183         return;
1184
1185 }
1186
/*
 * Format tnow with ctime() and strip the trailing newline.
 *
 * The result points to ctime()'s static storage (or to a constant string
 * when ctime() cannot represent the value), so it is only valid until the
 * next call and must not be freed.
 */
static char*
str_time(time_t tnow)
{
	char   *result = ctime(&tnow);
	char   *p;

	/* ctime() returns NULL for a time it cannot convert */
	if (result == NULL)
		return("(invalid time)");

	p = strchr(result, '\n');
	if (p != NULL)
		*p = 0;

	return(result);
}
1198
1199 /*
1200  * This func must be called ONCE on system startup
1201  */
1202 void
1203 StartupXLOG()
1204 {
1205         XLogCtlInsert      *Insert;
1206         CheckPoint                      checkPoint;
1207         XLogRecPtr                      RecPtr,
1208                                                 LastRec;
1209         XLogRecord                 *record;
1210         char                            buffer[MAXLOGRECSZ+SizeOfXLogRecord];
1211         int                                     fd;
1212         int                                     recovery = 0;
1213         bool                            sie_saved = false;
1214
1215         elog(LOG, "Data Base System is starting up at %s", str_time(time(NULL)));
1216
1217         XLogCtl->xlblocks = (XLogRecPtr*) (((char *)XLogCtl) + sizeof(XLogCtlData));
1218         XLogCtl->pages = ((char *)XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers);
1219         XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
1220         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
1221         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
1222         XLogCtl->LgwrRqst = LgwrRqst;
1223         XLogCtl->LgwrResult = LgwrResult;
1224         XLogCtl->Insert.LgwrResult = LgwrResult;
1225         XLogCtl->Insert.curridx = 0;
1226         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
1227         XLogCtl->Write.LgwrResult = LgwrResult;
1228         XLogCtl->Write.curridx = 0;
1229         S_INIT_LOCK(&(XLogCtl->insert_lck));
1230         S_INIT_LOCK(&(XLogCtl->info_lck));
1231         S_INIT_LOCK(&(XLogCtl->lgwr_lck));
1232
1233         /*
1234          * Open/read Control file
1235          */
1236 tryAgain:
1237         fd = open(ControlFilePath, O_RDWR);
1238         if (fd < 0 && (errno == EMFILE || errno == ENFILE))
1239         {
1240                 fd = errno;
1241                 if (!ReleaseDataFile())
1242                         elog(STOP, "Open(cntlfile) failed: %d (and no one data file can be closed)", 
1243                                                 fd);
1244                 goto tryAgain;
1245         }
1246         if (fd < 0)
1247                 elog(STOP, "Open(cntlfile) failed: %d", errno);
1248
1249         if (read(fd, ControlFile, BLCKSZ) != BLCKSZ)
1250                 elog(STOP, "Read(cntlfile) failed: %d", errno);
1251
1252         close(fd);
1253
1254         if (ControlFile->logSeg == 0 || 
1255                 ControlFile->time <= 0 || 
1256                 ControlFile->state < DB_SHUTDOWNED || 
1257                 ControlFile->state > DB_IN_PRODUCTION || 
1258                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
1259                 elog(STOP, "Control file context is broken");
1260
1261         if (ControlFile->blcksz != BLCKSZ)
1262                 elog(STOP, "database was initialized in BLCKSZ(%d), but the backend was compiled in BLCKSZ(%d)",ControlFile->blcksz,BLCKSZ);
1263
1264         if (ControlFile->relseg_size != RELSEG_SIZE)
1265                 elog(STOP, "database was initialized in RELSEG_SIZE(%d), but the backend was compiled in RELSEG_SIZE(%d)",ControlFile->relseg_size, RELSEG_SIZE);
1266
1267         if (ControlFile->state == DB_SHUTDOWNED)
1268                 elog(LOG, "Data Base System was shutdowned at %s",
1269                                         str_time(ControlFile->time));
1270         else if (ControlFile->state == DB_SHUTDOWNING)
1271                 elog(LOG, "Data Base System was interrupted when shutting down at %s",
1272                                         str_time(ControlFile->time));
1273         else if (ControlFile->state == DB_IN_RECOVERY)
1274         {
1275                 elog(LOG, "Data Base System was interrupted being in recovery at %s\n"
1276                                   "\tThis propably means that some data blocks are corrupted\n"
1277                                   "\tAnd you will have to use last backup for recovery",
1278                                         str_time(ControlFile->time));
1279         }
1280         else if (ControlFile->state == DB_IN_PRODUCTION)
1281                 elog(LOG, "Data Base System was interrupted being in production at %s",
1282                                         str_time(ControlFile->time));
1283
1284         LastRec = RecPtr = ControlFile->checkPoint;
1285         if (!XRecOffIsValid(RecPtr.xrecoff))
1286                 elog(STOP, "Invalid checkPoint in control file");
1287         elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff);
1288
1289         record = ReadRecord(&RecPtr, buffer);
1290         if (record->xl_rmid != RM_XLOG_ID)
1291                 elog(STOP, "Invalid RMID in checkPoint record");
1292         if (record->xl_len != sizeof(checkPoint))
1293                 elog(STOP, "Invalid length of checkPoint record");
1294         checkPoint = *((CheckPoint*)((char*)record + SizeOfXLogRecord));
1295
1296         elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u)",
1297                                 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
1298                                 checkPoint.undo.xlogid, checkPoint.undo.xrecoff);
1299         elog(LOG, "NextTransactionId: %u; NextOid: %u",
1300                                 checkPoint.nextXid, checkPoint.nextOid);
1301         if (checkPoint.nextXid < FirstTransactionId || 
1302                 checkPoint.nextOid < BootstrapObjectIdData)
1303 #ifdef XLOG
1304                 elog(STOP, "Invalid NextTransactionId/NextOid");
1305 #else
1306                 elog(LOG, "Invalid NextTransactionId/NextOid");
1307 #endif
1308
1309 #ifdef XLOG
1310         ShmemVariableCache->nextXid = checkPoint.nextXid;
1311         ShmemVariableCache->nextOid = checkPoint.nextOid;
1312 #endif
1313
1314         if (XLByteLT(RecPtr, checkPoint.redo))
1315                 elog(STOP, "Invalid redo in checkPoint record");
1316         if (checkPoint.undo.xrecoff == 0)
1317                 checkPoint.undo = RecPtr;
1318         if (XLByteLT(RecPtr, checkPoint.undo))
1319                 elog(STOP, "Invalid undo in checkPoint record");
1320
1321         if (XLByteLT(checkPoint.undo, RecPtr) || XLByteLT(checkPoint.redo, RecPtr))
1322         {
1323                 if (ControlFile->state == DB_SHUTDOWNED)
1324                         elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
1325                 recovery = 2;
1326         }
1327         else if (ControlFile->state != DB_SHUTDOWNED)
1328                 recovery = 2;
1329
1330         if (recovery > 0)
1331         {
1332                 elog(LOG, "The DataBase system was not properly shutdowned\n"
1333                                         "\tAutomatic recovery is in progress...");
1334                 ControlFile->state = DB_IN_RECOVERY;
1335                 ControlFile->time = time(NULL);
1336                 UpdateControlFile();
1337
1338                 sie_saved = StopIfError;
1339                 StopIfError = true;
1340
1341                 /* Is REDO required ? */
1342                 if (XLByteLT(checkPoint.redo, RecPtr))
1343                         record = ReadRecord(&(checkPoint.redo), buffer);
1344                 else    /* read past CheckPoint record */
1345                         record = ReadRecord(NULL, buffer);
1346
1347                 /* REDO */
1348                 if (record->xl_len != 0)
1349                 {
1350                         elog(LOG, "Redo starts at (%u, %u)", 
1351                                                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1352                         do
1353                         {
1354 #ifdef XLOG
1355                                 if (record->xl_xid >= ShmemVariableCache->nextXid)
1356                                         ShmemVariableCache->nextXid = record->xl_xid + 1;
1357 #endif
1358                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
1359                                 record = ReadRecord(NULL, buffer);
1360                         } while (record->xl_len != 0);
1361                         elog(LOG, "Redo done at (%u, %u)", 
1362                                                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1363                         LastRec = ReadRecPtr;
1364                 }
1365                 else
1366                 {
1367                         elog(LOG, "Redo is not required");
1368                         recovery--;
1369                 }
1370
1371                 /* UNDO */
1372                 RecPtr = ReadRecPtr;
1373                 if (XLByteLT(checkPoint.undo, RecPtr))
1374                 {
1375                         elog(LOG, "Undo starts at (%u, %u)", 
1376                                                 RecPtr.xlogid, RecPtr.xrecoff);
1377                         do
1378                         {
1379                                 record = ReadRecord(&RecPtr, buffer);
1380                                 if (TransactionIdIsValid(record->xl_xid) && 
1381                                         !TransactionIdDidCommit(record->xl_xid))
1382                                         RmgrTable[record->xl_rmid].rm_undo(record);
1383                                 RecPtr = record->xl_prev;
1384                         } while (XLByteLE(checkPoint.undo, RecPtr));
1385                         elog(LOG, "Undo done at (%u, %u)", 
1386                                                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
1387                 }
1388                 else
1389                 {
1390                         elog(LOG, "Undo is not required");
1391                         recovery--;
1392                 }
1393         }
1394
1395         /* Init xlog buffer cache */
1396         record = ReadRecord(&LastRec, buffer);
1397         logId = EndRecPtr.xlogid;
1398         logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
1399         logOff = 0;
1400         logFile = XLogFileOpen(logId, logSeg, false);
1401         XLogCtl->xlblocks[0].xlogid = logId;
1402         XLogCtl->xlblocks[0].xrecoff = 
1403                         ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
1404         Insert = &XLogCtl->Insert;
1405         memcpy((char*)(Insert->currpage), readBuf, BLCKSZ);
1406         Insert->currpos = ((char*) Insert->currpage) + 
1407                 (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
1408         Insert->PrevRecord = ControlFile->checkPoint;
1409
1410         if (recovery > 0)
1411         {
1412                 int             i;
1413
1414                 /* 
1415                  * Let resource managers know that recovery is done
1416                  */
1417                 for (i = 0; i <= RM_MAX_ID; i++)
1418                         RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, NULL);
1419                 CreateCheckPoint(true);
1420                 StopIfError = sie_saved;
1421         }
1422
1423         ControlFile->state = DB_IN_PRODUCTION;
1424         ControlFile->time = time(NULL);
1425         UpdateControlFile();
1426
1427         elog(LOG, "Data Base System is in production state at %s", str_time(time(NULL)));
1428
1429         return;
1430 }
1431
1432 /*
1433  * This func must be called ONCE on system shutdown
1434  */
1435 void
1436 ShutdownXLOG()
1437 {
1438
1439         elog(LOG, "Data Base System is shutting down at %s", str_time(time(NULL)));
1440
1441         CreateCheckPoint(true);
1442
1443         elog(LOG, "Data Base System is shutdowned at %s", str_time(time(NULL)));
1444 }
1445
1446 void
1447 CreateCheckPoint(bool shutdown)
1448 {
1449         CheckPoint                      checkPoint;
1450         XLogRecPtr                      recptr;
1451         XLogCtlInsert      *Insert = &XLogCtl->Insert;
1452         uint32                          freespace;
1453         uint16                          curridx;
1454
1455         memset(&checkPoint, 0, sizeof(checkPoint));
1456         if (shutdown)
1457         {
1458                 ControlFile->state = DB_SHUTDOWNING;
1459                 ControlFile->time = time(NULL);
1460                 UpdateControlFile();
1461         }
1462
1463         /* Get REDO record ptr */
1464         while (TAS(&(XLogCtl->insert_lck)))
1465         {
1466                 struct timeval delay = {0, 5000};
1467
1468                 if (shutdown)
1469                         elog(STOP, "XLog insert lock is busy while data base is shutting down");
1470                 (void) select(0, NULL, NULL, NULL, &delay);
1471         }
1472         freespace = ((char*) Insert->currpage) + BLCKSZ - Insert->currpos;
1473         if (freespace < SizeOfXLogRecord)
1474         {
1475                 curridx = NextBufIdx(Insert->curridx);
1476                 if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
1477                         InitXLBuffer(curridx);
1478                 else 
1479                         GetFreeXLBuffer();
1480                 freespace = BLCKSZ - SizeOfXLogPHD;
1481         }
1482         else
1483                 curridx = Insert->curridx;
1484         checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid;
1485         checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + 
1486                                                                 Insert->currpos - ((char*) Insert->currpage);
1487         S_UNLOCK(&(XLogCtl->insert_lck));
1488
1489         SpinAcquire(XidGenLockId);
1490         checkPoint.nextXid = ShmemVariableCache->nextXid;
1491         SpinRelease(XidGenLockId);
1492         SpinAcquire(OidGenLockId);
1493         checkPoint.nextOid = ShmemVariableCache->nextOid;
1494         SpinRelease(OidGenLockId);
1495
1496         FlushBufferPool();
1497
1498         /* Get UNDO record ptr */
1499         checkPoint.undo.xrecoff = 0;
1500
1501         if (shutdown && checkPoint.undo.xrecoff != 0)
1502                 elog(STOP, "Active transaction while data base is shutting down");
1503
1504         recptr = XLogInsert(RM_XLOG_ID, (char*)&checkPoint, sizeof(checkPoint), NULL, 0);
1505
1506         if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr))
1507                 elog(STOP, "XLog concurrent activity while data base is shutting down");
1508
1509         XLogFlush(recptr);
1510
1511         SpinAcquire(ControlFileLockId);
1512         if (shutdown)
1513                 ControlFile->state = DB_SHUTDOWNED;
1514         ControlFile->checkPoint = MyLastRecPtr;
1515         ControlFile->time = time(NULL);
1516         UpdateControlFile();
1517         SpinRelease(ControlFileLockId);
1518
1519         return;
1520 }