]> granicus.if.org Git - postgresql/commitdiff
Rearrange XLogFileInit so that control-file spinlock is not held while filling
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 17 Mar 2001 20:54:13 +0000 (20:54 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 17 Mar 2001 20:54:13 +0000 (20:54 +0000)
the new log file with zeroes, only while renaming it into place.  This should
prevent problems with 'stuck spinlock' errors under heavy load.

src/backend/access/transam/xlog.c

index d2d75d652c911fb553c04f783029078a6d29e0ee..5bbc4812ff8922700bc7d4798f568525e652fe4b 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.59 2001/03/16 05:44:33 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.60 2001/03/17 20:54:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -86,7 +86,7 @@
 /* Max time to wait to acquire XLog activity locks */
 #define XLOG_LOCK_TIMEOUT                      (5*60*1000000) /* 5 minutes */
 /* Max time to wait to acquire checkpoint lock */
-#define CHECKPOINT_LOCK_TIMEOUT                (10*60*1000000) /* 10 minutes */
+#define CHECKPOINT_LOCK_TIMEOUT                (20*60*1000000) /* 20 minutes */
 
 /* User-settable parameters */
 int                    CheckPointSegments = 3;
@@ -335,10 +335,6 @@ static ControlFileData *ControlFile = NULL;
                        snprintf(path, MAXPGPATH, "%s%c%08X%08X",       \
                                         XLogDir, SEP_CHAR, log, seg)
 
-#define XLogTempFileName(path, log, seg)       \
-                       snprintf(path, MAXPGPATH, "%s%cT%08X%08X",      \
-                                        XLogDir, SEP_CHAR, log, seg)
-
 #define PrevBufIdx(idx)                \
                (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
 
@@ -401,7 +397,8 @@ static bool InRedo = false;
 
 static bool AdvanceXLInsertBuffer(void);
 static void XLogWrite(XLogwrtRqst WriteRqst);
-static int     XLogFileInit(uint32 log, uint32 seg, bool *usexistent);
+static int     XLogFileInit(uint32 log, uint32 seg,
+                                                bool *use_existent, bool use_lock);
 static int     XLogFileOpen(uint32 log, uint32 seg, bool econt);
 static void PreallocXlogFiles(XLogRecPtr endptr);
 static void MoveOfflineLogs(uint32 log, uint32 seg);
@@ -960,7 +957,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
        XLogCtlWrite *Write = &XLogCtl->Write;
        char       *from;
        bool            ispartialpage;
-       bool            usexistent;
+       bool            use_existent;
 
        /* Update local LogwrtResult (caller probably did this already, but...) */
        LogwrtResult = Write->LogwrtResult;
@@ -994,12 +991,18 @@ XLogWrite(XLogwrtRqst WriteRqst)
                        }
                        XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
 
-                       /* create/use new log file; need lock in case creating */
-                       SpinAcquire(ControlFileLockId);
-                       usexistent = true;
-                       openLogFile = XLogFileInit(openLogId, openLogSeg, &usexistent);
+                       /* create/use new log file */
+                       use_existent = true;
+                       openLogFile = XLogFileInit(openLogId, openLogSeg,
+                                                                          &use_existent, true);
                        openLogOff = 0;
+
+                       if (!use_existent)      /* there was no precreated file */
+                               elog(LOG, "XLogWrite: new log file created - "
+                                        "consider increasing WAL_FILES");
+
                        /* update pg_control, unless someone else already did */
+                       SpinAcquire(ControlFileLockId);
                        if (ControlFile->logId != openLogId ||
                                ControlFile->logSeg != openLogSeg + 1)
                        {
@@ -1007,28 +1010,23 @@ XLogWrite(XLogwrtRqst WriteRqst)
                                ControlFile->logSeg = openLogSeg + 1;
                                ControlFile->time = time(NULL);
                                UpdateControlFile();
+                               /*
+                                * Signal postmaster to start a checkpoint if it's been too
+                                * long since the last one.  (We look at local copy of
+                                * RedoRecPtr which might be a little out of date, but should
+                                * be close enough for this purpose.)
+                                */
+                               if (IsUnderPostmaster &&
+                                       (openLogId != RedoRecPtr.xlogid ||
+                                        openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
+                                        (uint32) CheckPointSegments))
+                               {
+                                       if (XLOG_DEBUG)
+                                               fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
+                                       kill(getppid(), SIGUSR1);
+                               }
                        }
                        SpinRelease(ControlFileLockId);
-
-                       if (!usexistent)        /* there was no precreated file */
-                               elog(LOG, "XLogWrite: new log file created - "
-                                        "consider increasing WAL_FILES");
-
-                       /*
-                        * Signal postmaster to start a checkpoint if it's been too
-                        * long since the last one.  (We look at local copy of RedoRecPtr
-                        * which might be a little out of date, but should be close enough
-                        * for this purpose.)
-                        */
-                       if (IsUnderPostmaster &&
-                               (openLogId != RedoRecPtr.xlogid ||
-                                openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
-                                (uint32) CheckPointSegments))
-                       {
-                               if (XLOG_DEBUG)
-                                       fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
-                               kill(getppid(), SIGUSR1);
-                       }
                }
 
                if (openLogFile < 0)
@@ -1230,14 +1228,28 @@ XLogFlush(XLogRecPtr record)
 /*
  * Create a new XLOG file segment, or open a pre-existing one.
  *
+ * log, seg: identify segment to be created/opened.
+ *
+ * *use_existent: if TRUE, OK to use a pre-existing file (else, any
+ * pre-existing file will be deleted).  On return, TRUE if a pre-existing
+ * file was used.
+ *
+ * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
+ * place.  This should be TRUE except during bootstrap log creation.  The
+ * caller must *not* hold the spinlock at call.
+ *
  * Returns FD of opened file.
  */
 static int
-XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
+XLogFileInit(uint32 log, uint32 seg,
+                        bool *use_existent, bool use_lock)
 {
        char            path[MAXPGPATH];
-       char            tpath[MAXPGPATH];
+       char            tmppath[MAXPGPATH];
+       char            targpath[MAXPGPATH];
        char            zbuffer[BLCKSZ];
+       uint32          targlog,
+                               targseg;
        int                     fd;
        int                     nbytes;
 
@@ -1246,7 +1258,7 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
        /*
         * Try to use existent file (checkpoint maker may have created it already)
         */
-       if (*usexistent)
+       if (*use_existent)
        {
                fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
                                                   S_IRUSR | S_IWUSR);
@@ -1258,20 +1270,24 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
                }
                else
                        return(fd);
-               /* Set flag to tell caller there was no existent file */
-               *usexistent = false;
        }
 
-       XLogTempFileName(tpath, log, seg);
-       unlink(tpath);
-       unlink(path);
+       /*
+        * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
+        * another process is doing the same thing.  If so, we will end up
+        * pre-creating an extra log segment.  That seems OK, and better than
+        * holding the spinlock throughout this lengthy process.
+        */
+       snprintf(tmppath, MAXPGPATH, "%s%cxlogtemp.%d",
+                        XLogDir, SEP_CHAR, (int) getpid());
+
+       unlink(tmppath);
 
        /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
-       fd = BasicOpenFile(tpath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
+       fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
-               elog(STOP, "InitCreate(logfile %u seg %u) failed: %m",
-                        log, seg);
+               elog(STOP, "InitCreate(%s) failed: %m", tmppath);
 
        /*
         * Zero-fill the file.  We have to do this the hard way to ensure that
@@ -1290,36 +1306,73 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
                        int             save_errno = errno;
 
                        /* If we fail to make the file, delete it to release disk space */
-                       unlink(tpath);
+                       unlink(tmppath);
                        errno = save_errno;
 
-                       elog(STOP, "ZeroFill(logfile %u seg %u) failed: %m",
-                                log, seg);
+                       elog(STOP, "ZeroFill(%s) failed: %m", tmppath);
                }
        }
 
        if (pg_fsync(fd) != 0)
-               elog(STOP, "fsync(logfile %u seg %u) failed: %m",
-                        log, seg);
+               elog(STOP, "fsync(%s) failed: %m", tmppath);
 
        close(fd);
 
        /*
-        * Prefer link() to rename() here just to be sure that we don't overwrite
-        * an existing logfile.  However, there shouldn't be one, so rename()
-        * is an acceptable substitute except for the truly paranoid.
+        * Now move the segment into place with its final name.  We want to be
+        * sure that only one process does this at a time.
+        */
+       if (use_lock)
+               SpinAcquire(ControlFileLockId);
+
+       /*
+        * If caller didn't want to use a pre-existing file, get rid of any
+        * pre-existing file.  Otherwise, cope with possibility that someone
+        * else has created the file while we were filling ours: if so, use
+        * ours to pre-create a future log segment.
+        */
+       targlog = log;
+       targseg = seg;
+       strcpy(targpath, path);
+
+       if (! *use_existent)
+       {
+               unlink(targpath);
+       }
+       else
+       {
+               while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
+                                                                  S_IRUSR | S_IWUSR)) >= 0)
+               {
+                       close(fd);
+                       NextLogSeg(targlog, targseg);
+                       XLogFileName(targpath, targlog, targseg);
+               }
+       }
+
+       /*
+        * Prefer link() to rename() here just to be really sure that we don't
+        * overwrite an existing logfile.  However, there shouldn't be one, so
+        * rename() is an acceptable substitute except for the truly paranoid.
         */
 #ifndef __BEOS__
-       if (link(tpath, path) < 0)
+       if (link(tmppath, targpath) < 0)
                elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
-                        log, seg);
-       unlink(tpath);
+                        targlog, targseg);
+       unlink(tmppath);
 #else
-       if (rename(tpath, path) < 0)
+       if (rename(tmppath, targpath) < 0)
                elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
-                        log, seg);
+                        targlog, targseg);
 #endif
 
+       if (use_lock)
+               SpinRelease(ControlFileLockId);
+
+       /* Set flag to tell caller there was no existent file */
+       *use_existent = false;
+
+       /* Now open original target segment (might not be file I just made) */
        fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
@@ -1367,8 +1420,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
        uint32          _logId;
        uint32          _logSeg;
        int                     lf;
-       bool            usexistent;
-       struct timeval delay;
+       bool            use_existent;
        int                     i;
 
        XLByteToPrevSeg(endptr, _logId, _logSeg);
@@ -1376,30 +1428,19 @@ PreallocXlogFiles(XLogRecPtr endptr)
        {
                for (i = 1; i <= XLOGfiles; i++)
                {
-                       usexistent = true;
                        NextLogSeg(_logId, _logSeg);
-                       SpinAcquire(ControlFileLockId);
-                       lf = XLogFileInit(_logId, _logSeg, &usexistent);
+                       use_existent = true;
+                       lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
                        close(lf);
-                       SpinRelease(ControlFileLockId);
-                       /*
-                        * Give up ControlFileLockId for 1/50 sec to let other
-                        * backends switch to new log file in XLogWrite()
-                        */
-                       delay.tv_sec = 0;
-                       delay.tv_usec = 20000;
-                       (void) select(0, NULL, NULL, NULL, &delay);
                }
        }
        else if ((endptr.xrecoff - 1) % XLogSegSize >=
                         (uint32) (0.75 * XLogSegSize))
        {
-               usexistent = true;
                NextLogSeg(_logId, _logSeg);
-               SpinAcquire(ControlFileLockId);
-               lf = XLogFileInit(_logId, _logSeg, &usexistent);
+               use_existent = true;
+               lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
                close(lf);
-               SpinRelease(ControlFileLockId);
        }
 }
 
@@ -2103,7 +2144,7 @@ BootStrapXLOG(void)
        char       *buffer;
        XLogPageHeader page;
        XLogRecord *record;
-       bool        usexistent = false;
+       bool        use_existent;
        crc64           crc;
 
        /* Use malloc() to ensure buffer is MAXALIGNED */
@@ -2144,7 +2185,8 @@ BootStrapXLOG(void)
        FIN_CRC64(crc);
        record->xl_crc = crc;
 
-       openLogFile = XLogFileInit(0, 0, &usexistent);
+       use_existent = false;
+       openLogFile = XLogFileInit(0, 0, &use_existent, false);
 
        if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
                elog(STOP, "BootStrapXLOG failed to write logfile: %m");