granicus.if.org Git - postgresql/commitdiff
Arrange to recycle old XLOG log segment files as new segment files,
author: Tom Lane <tgl@sss.pgh.pa.us>
Thu, 19 Jul 2001 02:12:35 +0000 (02:12 +0000)
committer: Tom Lane <tgl@sss.pgh.pa.us>
Thu, 19 Jul 2001 02:12:35 +0000 (02:12 +0000)
rather than deleting them only to have to create more.  Steady state
is 2*CHECKPOINT_SEGMENTS + WAL_FILES + 1 segment files, which will
simply be renamed rather than constantly deleted and recreated.
To make this safe, added current XLOG file/offset number to page
header of XLOG pages, so that an un-overwritten page from an old
incarnation of a logfile can be reliably told from a valid page.
This change means that if you try to restart postmaster in a CVS-tip
database after installing the change, you'll get a complaint about
bad XLOG page magic number.  If you don't want to initdb, run
contrib/pg_resetxlog (and be sure you shut down the old postmaster
cleanly).

contrib/pg_resetxlog/pg_resetxlog.c
src/backend/access/transam/xlog.c
src/include/access/xlog.h

index f8c81b5c55dddcb1ea3983f35eb1cf574dc5f40d..6d32160905d4834c2a0f9e5f392495f6f1b1a4e3 100644 (file)
@@ -23,7 +23,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/contrib/pg_resetxlog/Attic/pg_resetxlog.c,v 1.5 2001/06/06 17:07:38 tgl Exp $
+ * $Header: /cvsroot/pgsql/contrib/pg_resetxlog/Attic/pg_resetxlog.c,v 1.6 2001/07/19 02:12:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -857,6 +857,10 @@ WriteEmptyXLOG(void)
        page->xlp_magic = XLOG_PAGE_MAGIC;
        page->xlp_info = 0;
        page->xlp_sui = ControlFile.checkPointCopy.ThisStartUpID;
+       page->xlp_pageaddr.xlogid =
+               ControlFile.checkPointCopy.redo.xlogid;
+       page->xlp_pageaddr.xrecoff =
+               ControlFile.checkPointCopy.redo.xrecoff - SizeOfXLogPHD;
        record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
        record->xl_prev.xlogid = 0;
        record->xl_prev.xrecoff = 0;
index 94ba140b3ad3fa35aab037031e0231d0f2572961..3251fb2afdbe5713b5068cafaa77eeb28d5f9035 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.70 2001/06/21 19:45:45 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.71 2001/07/19 02:12:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 /* User-settable parameters */
 int                    CheckPointSegments = 3;
 int                    XLOGbuffers = 8;
-int                    XLOGfiles = 0;          /* how many files to pre-allocate during
-                                                                * ckpt */
+int                    XLOGfiles = 0;          /* # of files to preallocate during ckpt */
 int                    XLOG_DEBUG = 0;
 char      *XLOG_sync_method = NULL;
 const char     XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
 char           XLOG_archive_dir[MAXPGPATH];            /* null string means
                                                                                                 * delete 'em */
 
+/*
+ * XLOGfileslop is used in the code as the allowed "fuzz" in the number of 
+ * preallocated XLOG segments --- we try to have at least XLOGfiles advance
+ * segments but no more than XLOGfiles+XLOGfileslop segments.  This could
+ * be made a separate GUC variable, but at present I think it's sufficient
+ * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
+ * checkpoint will free no more than 2*CheckPointSegments log segments, and
+ * we want to recycle all of them; the +1 allows boundary cases to happen
+ * without wasting a delete/create-segment cycle.
+ */
+
+#define XLOGfileslop   (2*CheckPointSegments + 1)
+
+
 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
 static int     sync_method = DEFAULT_SYNC_METHOD;
 static int     open_sync_bit = DEFAULT_SYNC_FLAGBIT;
 
-#define MinXLOGbuffers 4
-
 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
 
+#define MinXLOGbuffers 4
+
 
 /*
  * ThisStartUpID will be same in all backends --- it identifies current
@@ -405,9 +418,12 @@ static bool AdvanceXLInsertBuffer(void);
 static void XLogWrite(XLogwrtRqst WriteRqst);
 static int XLogFileInit(uint32 log, uint32 seg,
                         bool *use_existent, bool use_lock);
+static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
+                                                                  bool find_free, int max_advance,
+                                                                  bool use_lock);
 static int     XLogFileOpen(uint32 log, uint32 seg, bool econt);
 static void PreallocXlogFiles(XLogRecPtr endptr);
-static void MoveOfflineLogs(uint32 log, uint32 seg);
+static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
@@ -856,6 +872,8 @@ AdvanceXLInsertBuffer(void)
        bool            update_needed = true;
        XLogRecPtr      OldPageRqstPtr;
        XLogwrtRqst WriteRqst;
+       XLogRecPtr      NewPageEndPtr;
+       XLogPageHeader NewPage;
 
        /* Use Insert->LogwrtResult copy if it's more fresh */
        if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
@@ -930,32 +948,35 @@ AdvanceXLInsertBuffer(void)
         * Now the next buffer slot is free and we can set it up to be the
         * next output page.
         */
-       if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize)
+       NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
+       if (NewPageEndPtr.xrecoff >= XLogFileSize)
        {
                /* crossing a logid boundary */
-               XLogCtl->xlblocks[nextidx].xlogid =
-                       XLogCtl->xlblocks[Insert->curridx].xlogid + 1;
-               XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ;
+               NewPageEndPtr.xlogid += 1;
+               NewPageEndPtr.xrecoff = BLCKSZ;
        }
        else
        {
-               XLogCtl->xlblocks[nextidx].xlogid =
-                       XLogCtl->xlblocks[Insert->curridx].xlogid;
-               XLogCtl->xlblocks[nextidx].xrecoff =
-                       XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ;
+               NewPageEndPtr.xrecoff += BLCKSZ;
        }
+       XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
+       NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
        Insert->curridx = nextidx;
-       Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
-       Insert->currpos = ((char *) Insert->currpage) + SizeOfXLogPHD;
+       Insert->currpage = NewPage;
+       Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;
 
        /*
         * Be sure to re-zero the buffer so that bytes beyond what we've
         * written will look like zeroes and not valid XLOG records...
         */
-       MemSet((char *) Insert->currpage, 0, BLCKSZ);
-       Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
-       /* Insert->currpage->xlp_info = 0; *//* done by memset */
-       Insert->currpage->xlp_sui = ThisStartUpID;
+       MemSet((char *) NewPage, 0, BLCKSZ);
+
+       /* And fill the new page's header */
+       NewPage->xlp_magic = XLOG_PAGE_MAGIC;
+       /* NewPage->xlp_info = 0; */                    /* done by memset */
+       NewPage->xlp_sui = ThisStartUpID;
+       NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
+       NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
 
        return update_needed;
 }
@@ -1273,10 +1294,7 @@ XLogFileInit(uint32 log, uint32 seg,
 {
        char            path[MAXPGPATH];
        char            tmppath[MAXPGPATH];
-       char            targpath[MAXPGPATH];
        char            zbuffer[BLCKSZ];
-       uint32          targlog,
-                               targseg;
        int                     fd;
        int                     nbytes;
 
@@ -1352,32 +1370,96 @@ XLogFileInit(uint32 log, uint32 seg,
        close(fd);
 
        /*
-        * Now move the segment into place with its final name.  We want to be
-        * sure that only one process does this at a time.
-        */
-       if (use_lock)
-               SpinAcquire(ControlFileLockId);
-
-       /*
+        * Now move the segment into place with its final name.
+        *
         * If caller didn't want to use a pre-existing file, get rid of any
         * pre-existing file.  Otherwise, cope with possibility that someone
         * else has created the file while we were filling ours: if so, use
         * ours to pre-create a future log segment.
         */
-       targlog = log;
-       targseg = seg;
-       strcpy(targpath, path);
+       if (!InstallXLogFileSegment(log, seg, tmppath,
+                                                               *use_existent, XLOGfiles + XLOGfileslop,
+                                                               use_lock))
+       {
+               /* No need for any more future segments... */
+               unlink(tmppath);
+       }
+
+       /* Set flag to tell caller there was no existent file */
+       *use_existent = false;
+
+       /* Now open original target segment (might not be file I just made) */
+       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
+                                          S_IRUSR | S_IWUSR);
+       if (fd < 0)
+               elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
+                        path, log, seg);
+
+       return (fd);
+}
+
+/*
+ * Install a new XLOG segment file as a current or future log segment.
+ *
+ * This is used both to install a newly-created segment (which has a temp
+ * filename while it's being created) and to recycle an old segment.
+ *
+ * log, seg: identify segment to install as (or first possible target).
+ *
+ * tmppath: initial name of file to install.  It will be renamed into place.
+ *
+ * find_free: if TRUE, install the new segment at the first empty log/seg
+ * number at or after the passed numbers.  If FALSE, install the new segment
+ * exactly where specified, deleting any existing segment file there.
+ *
+ * max_advance: maximum number of log/seg slots to advance past the starting
+ * point.  Fail if no free slot is found in this range.  (Irrelevant if
+ * find_free is FALSE.)
+ *
+ * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
+ * place.  This should be TRUE except during bootstrap log creation.  The
+ * caller must *not* hold the spinlock at call.
+ *
+ * Returns TRUE if file installed, FALSE if not installed because of
+ * exceeding max_advance limit.  (Any other kind of failure causes elog().)
+ */
+static bool
+InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
+                                          bool find_free, int max_advance,
+                                          bool use_lock)
+{
+       char            path[MAXPGPATH];
+       int                     fd;
+
+       XLogFileName(path, log, seg);
+
+       /*
+        * We want to be sure that only one process does this at a time.
+        */
+       if (use_lock)
+               SpinAcquire(ControlFileLockId);
 
-       if (!*use_existent)
-               unlink(targpath);
+       if (!find_free)
+       {
+               /* Force installation: get rid of any pre-existing segment file */
+               unlink(path);
+       }
        else
        {
-               while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
+               /* Find a free slot to put it in */
+               while ((fd = BasicOpenFile(path, O_RDWR | PG_BINARY,
                                                                   S_IRUSR | S_IWUSR)) >= 0)
                {
                        close(fd);
-                       NextLogSeg(targlog, targseg);
-                       XLogFileName(targpath, targlog, targseg);
+                       if (--max_advance < 0)
+                       {
+                               /* Failed to find a free slot within specified range */
+                               if (use_lock)
+                                       SpinRelease(ControlFileLockId);
+                               return false;
+                       }
+                       NextLogSeg(log, seg);
+                       XLogFileName(path, log, seg);
                }
        }
 
@@ -1387,30 +1469,20 @@ XLogFileInit(uint32 log, uint32 seg,
         * rename() is an acceptable substitute except for the truly paranoid.
         */
 #ifndef __BEOS__
-       if (link(tmppath, targpath) < 0)
+       if (link(tmppath, path) < 0)
                elog(STOP, "link from %s to %s (initialization of log file %u, segment %u) failed: %m",
-                        tmppath, targpath, targlog, targseg);
+                        tmppath, path, log, seg);
        unlink(tmppath);
 #else
-       if (rename(tmppath, targpath) < 0)
+       if (rename(tmppath, path) < 0)
                elog(STOP, "rename from %s to %s (initialization of log file %u, segment %u) failed: %m",
-                        tmppath, targpath targlog, targseg);
+                        tmppath, path, log, seg);
 #endif
 
        if (use_lock)
                SpinRelease(ControlFileLockId);
 
-       /* Set flag to tell caller there was no existent file */
-       *use_existent = false;
-
-       /* Now open original target segment (might not be file I just made) */
-       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
-                                          S_IRUSR | S_IWUSR);
-       if (fd < 0)
-               elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
-                        path, log, seg);
-
-       return (fd);
+       return true;
 }
 
 /*
@@ -1477,20 +1549,26 @@ PreallocXlogFiles(XLogRecPtr endptr)
 
 /*
  * Remove or move offline all log files older or equal to passed log/seg#
+ *
+ * endptr is current (or recent) end of xlog; this is used to determine
+ * whether we want to recycle rather than delete no-longer-wanted log files.
  */
 static void
-MoveOfflineLogs(uint32 log, uint32 seg)
+MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
 {
+       uint32          endlogId;
+       uint32          endlogSeg;
        DIR                *xldir;
        struct dirent *xlde;
        char            lastoff[32];
        char            path[MAXPGPATH];
 
-       Assert(XLOG_archive_dir[0] == 0);       /* not implemented yet */
+       XLByteToPrevSeg(endptr, endlogId, endlogSeg);
 
        xldir = opendir(XLogDir);
        if (xldir == NULL)
-               elog(STOP, "could not open transaction log directory (%s): %m", XLogDir);
+               elog(STOP, "could not open transaction log directory (%s): %m",
+                        XLogDir);
 
        sprintf(lastoff, "%08X%08X", log, seg);
 
@@ -1501,19 +1579,42 @@ MoveOfflineLogs(uint32 log, uint32 seg)
                        strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
                        strcmp(xlde->d_name, lastoff) <= 0)
                {
+                       sprintf(path, "%s/%s", XLogDir, xlde->d_name);
                        if (XLOG_archive_dir[0])
-                               elog(LOG, "archiving transaction log file %s", xlde->d_name);
+                       {
+                               elog(LOG, "archiving transaction log file %s",
+                                        xlde->d_name);
+                               elog(NOTICE, "archiving log files is not implemented!");
+                       }
                        else
-                               elog(LOG, "removing transaction log file %s", xlde->d_name);
-
-                       sprintf(path, "%s/%s", XLogDir, xlde->d_name);
-                       if (XLOG_archive_dir[0] == 0)
-                               unlink(path);
+                       {
+                               /*
+                                * Before deleting the file, see if it can be recycled as
+                                * a future log segment.  We allow recycling segments up to
+                                * XLOGfiles + XLOGfileslop segments beyond the current
+                                * XLOG location.
+                                */
+                               if (InstallXLogFileSegment(endlogId, endlogSeg, path,
+                                                                                  true, XLOGfiles + XLOGfileslop,
+                                                                                  true))
+                               {
+                                       elog(LOG, "recycled transaction log file %s",
+                                                xlde->d_name);
+                               }
+                               else
+                               {
+                                       /* No need for any more future segments... */
+                                       elog(LOG, "removing transaction log file %s",
+                                                xlde->d_name);
+                                       unlink(path);
+                               }
+                       }
                }
                errno = 0;
        }
        if (errno)
-               elog(STOP, "could not read transaction log directory (%s): %m", XLogDir);
+               elog(STOP, "could not read transaction log directory (%s): %m",
+                        XLogDir);
        closedir(xldir);
 }
 
@@ -1866,6 +1967,8 @@ next_record_is_invalid:;
 static bool
 ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
 {
+       XLogRecPtr      recaddr;
+
        if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
        {
                elog(emode, "ReadRecord: invalid magic number %04X in log file %u, segment %u, offset %u",
@@ -1878,6 +1981,15 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
                         hdr->xlp_info, readId, readSeg, readOff);
                return false;
        }
+       recaddr.xlogid = readId;
+       recaddr.xrecoff = readSeg * XLogSegSize + readOff;
+       if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
+       {
+               elog(emode, "ReadRecord: unexpected pageaddr (%u, %u) in log file %u, segment %u, offset %u",
+                        hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
+                        readId, readSeg, readOff);
+               return false;
+       }
 
        /*
         * We disbelieve a SUI less than the previous page's SUI, or more than
@@ -2248,6 +2360,8 @@ BootStrapXLOG(void)
        page->xlp_magic = XLOG_PAGE_MAGIC;
        page->xlp_info = 0;
        page->xlp_sui = checkPoint.ThisStartUpID;
+       page->xlp_pageaddr.xlogid = 0;
+       page->xlp_pageaddr.xrecoff = 0;
        record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
        record->xl_prev.xlogid = 0;
        record->xl_prev.xrecoff = 0;
@@ -2500,23 +2614,29 @@ StartupXLOG(void)
                EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
        if (EndOfLog.xrecoff % BLCKSZ == 0)
        {
-               if (EndOfLog.xrecoff >= XLogFileSize)
+               XLogRecPtr      NewPageEndPtr;
+
+               NewPageEndPtr = EndOfLog;
+               if (NewPageEndPtr.xrecoff >= XLogFileSize)
                {
-                       XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid + 1;
-                       XLogCtl->xlblocks[0].xrecoff = BLCKSZ;
+                       /* crossing a logid boundary */
+                       NewPageEndPtr.xlogid += 1;
+                       NewPageEndPtr.xrecoff = BLCKSZ;
                }
                else
                {
-                       XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid;
-                       XLogCtl->xlblocks[0].xrecoff = EndOfLog.xrecoff + BLCKSZ;
+                       NewPageEndPtr.xrecoff += BLCKSZ;
                }
-               Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
+               XLogCtl->xlblocks[0] = NewPageEndPtr;
                Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
                if (InRecovery)
                        Insert->currpage->xlp_sui = ThisStartUpID;
                else
                        Insert->currpage->xlp_sui = ThisStartUpID + 1;
+               Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
+               Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
                /* rest of buffer was zeroed in XLOGShmemInit */
+               Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
        }
        else
        {
@@ -2916,7 +3036,7 @@ CreateCheckPoint(bool shutdown)
        if (_logId || _logSeg)
        {
                PrevLogSeg(_logId, _logSeg);
-               MoveOfflineLogs(_logId, _logSeg);
+               MoveOfflineLogs(_logId, _logSeg, recptr);
        }
 
        /*
index 41a8d84dade4a667df4a8fffd5ef4904f22f3983..73a60b2e0ce018e19b31aab40594f378d3831dff 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xlog.h,v 1.23 2001/03/22 04:00:32 momjian Exp $
+ * $Id: xlog.h,v 1.24 2001/07/19 02:12:35 tgl Exp $
  */
 #ifndef XLOG_H
 #define XLOG_H
@@ -109,13 +109,14 @@ typedef struct XLogContRecord
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD058 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD059 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
        uint16          xlp_magic;              /* magic value for correctness checks */
        uint16          xlp_info;               /* flag bits, see below */
        StartUpID       xlp_sui;                /* StartUpID of first record on page */
+       XLogRecPtr      xlp_pageaddr;   /* XLOG address of this page */
 } XLogPageHeaderData;
 
 #define SizeOfXLogPHD  MAXALIGN(sizeof(XLogPageHeaderData))