* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.70 2001/06/21 19:45:45 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.71 2001/07/19 02:12:34 tgl Exp $
*
*-------------------------------------------------------------------------
*/
/* User-settable parameters */
int CheckPointSegments = 3;
int XLOGbuffers = 8;
-int XLOGfiles = 0; /* how many files to pre-allocate during
- * ckpt */
+int XLOGfiles = 0; /* # of files to preallocate during ckpt */
int XLOG_DEBUG = 0;
char *XLOG_sync_method = NULL;
const char XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
char XLOG_archive_dir[MAXPGPATH]; /* null string means
* delete 'em */
+/*
+ * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
+ * preallocated XLOG segments --- we try to have at least XLOGfiles advance
+ * segments but no more than XLOGfiles+XLOGfileslop segments. This could
+ * be made a separate GUC variable, but at present I think it's sufficient
+ * to hardwire it as 2*CheckPointSegments+1. Under normal conditions, a
+ * checkpoint will free no more than 2*CheckPointSegments log segments, and
+ * we want to recycle all of them; the +1 allows boundary cases to happen
+ * without wasting a delete/create-segment cycle.
+ */
+
+#define XLOGfileslop (2*CheckPointSegments + 1)
+
+
/* these are derived from XLOG_sync_method by assign_xlog_sync_method */
static int sync_method = DEFAULT_SYNC_METHOD;
static int open_sync_bit = DEFAULT_SYNC_FLAGBIT;
-#define MinXLOGbuffers 4
-
#define XLOG_SYNC_BIT (enableFsync ? open_sync_bit : 0)
+#define MinXLOGbuffers 4
+
/*
* ThisStartUpID will be same in all backends --- it identifies current
static void XLogWrite(XLogwrtRqst WriteRqst);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
+static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
+ bool find_free, int max_advance,
+ bool use_lock);
static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
static void PreallocXlogFiles(XLogRecPtr endptr);
-static void MoveOfflineLogs(uint32 log, uint32 seg);
+static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
bool update_needed = true;
XLogRecPtr OldPageRqstPtr;
XLogwrtRqst WriteRqst;
+ XLogRecPtr NewPageEndPtr;
+ XLogPageHeader NewPage;
/* Use Insert->LogwrtResult copy if it's more fresh */
if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
* Now the next buffer slot is free and we can set it up to be the
* next output page.
*/
- if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize)
+ NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
+ if (NewPageEndPtr.xrecoff >= XLogFileSize)
{
/* crossing a logid boundary */
- XLogCtl->xlblocks[nextidx].xlogid =
- XLogCtl->xlblocks[Insert->curridx].xlogid + 1;
- XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ;
+ NewPageEndPtr.xlogid += 1;
+ NewPageEndPtr.xrecoff = BLCKSZ;
}
else
{
- XLogCtl->xlblocks[nextidx].xlogid =
- XLogCtl->xlblocks[Insert->curridx].xlogid;
- XLogCtl->xlblocks[nextidx].xrecoff =
- XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ;
+ NewPageEndPtr.xrecoff += BLCKSZ;
}
+ XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
+ NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
Insert->curridx = nextidx;
- Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
- Insert->currpos = ((char *) Insert->currpage) + SizeOfXLogPHD;
+ Insert->currpage = NewPage;
+ Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;
/*
* Be sure to re-zero the buffer so that bytes beyond what we've
* written will look like zeroes and not valid XLOG records...
*/
- MemSet((char *) Insert->currpage, 0, BLCKSZ);
- Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
- /* Insert->currpage->xlp_info = 0; *//* done by memset */
- Insert->currpage->xlp_sui = ThisStartUpID;
+ MemSet((char *) NewPage, 0, BLCKSZ);
+
+ /* And fill the new page's header */
+ NewPage->xlp_magic = XLOG_PAGE_MAGIC;
+ /* NewPage->xlp_info = 0; */ /* done by memset */
+ NewPage->xlp_sui = ThisStartUpID;
+ NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
+ NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
return update_needed;
}
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
- char targpath[MAXPGPATH];
char zbuffer[BLCKSZ];
- uint32 targlog,
- targseg;
int fd;
int nbytes;
close(fd);
/*
- * Now move the segment into place with its final name. We want to be
- * sure that only one process does this at a time.
- */
- if (use_lock)
- SpinAcquire(ControlFileLockId);
-
- /*
+ * Now move the segment into place with its final name.
+ *
* If caller didn't want to use a pre-existing file, get rid of any
* pre-existing file. Otherwise, cope with possibility that someone
* else has created the file while we were filling ours: if so, use
* ours to pre-create a future log segment.
*/
- targlog = log;
- targseg = seg;
- strcpy(targpath, path);
+ if (!InstallXLogFileSegment(log, seg, tmppath,
+ *use_existent, XLOGfiles + XLOGfileslop,
+ use_lock))
+ {
+ /* No need for any more future segments... */
+ unlink(tmppath);
+ }
+
+ /* Set flag to tell caller there was no existent file */
+ *use_existent = false;
+
+ /* Now open original target segment (might not be file I just made) */
+ fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
+ path, log, seg);
+
+ return (fd);
+}
+
+/*
+ * Install a new XLOG segment file as a current or future log segment.
+ *
+ * This is used both to install a newly-created segment (which has a temp
+ * filename while it's being created) and to recycle an old segment.
+ *
+ * log, seg: identify segment to install as (or first possible target).
+ *
+ * tmppath: initial name of file to install. It will be renamed into place.
+ *
+ * find_free: if TRUE, install the new segment at the first empty log/seg
+ * number at or after the passed numbers. If FALSE, install the new segment
+ * exactly where specified, deleting any existing segment file there.
+ *
+ * max_advance: maximum number of log/seg slots to advance past the starting
+ * point. Fail if no free slot is found in this range. (Irrelevant if
+ * find_free is FALSE.)
+ *
+ * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
+ * place. This should be TRUE except during bootstrap log creation. The
+ * caller must *not* hold the spinlock at call.
+ *
+ * Returns TRUE if file installed, FALSE if not installed because of
+ * exceeding max_advance limit. (Any other kind of failure causes elog().)
+ */
+static bool
+InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
+ bool find_free, int max_advance,
+ bool use_lock)
+{
+ char path[MAXPGPATH];
+ int fd;
+
+ XLogFileName(path, log, seg);
+
+ /*
+ * We want to be sure that only one process does this at a time.
+ */
+ if (use_lock)
+ SpinAcquire(ControlFileLockId);
- if (!*use_existent)
- unlink(targpath);
+ if (!find_free)
+ {
+ /* Force installation: get rid of any pre-existing segment file */
+ unlink(path);
+ }
else
{
- while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
+ /* Find a free slot to put it in */
+ while ((fd = BasicOpenFile(path, O_RDWR | PG_BINARY,
S_IRUSR | S_IWUSR)) >= 0)
{
close(fd);
- NextLogSeg(targlog, targseg);
- XLogFileName(targpath, targlog, targseg);
+ if (--max_advance < 0)
+ {
+ /* Failed to find a free slot within specified range */
+ if (use_lock)
+ SpinRelease(ControlFileLockId);
+ return false;
+ }
+ NextLogSeg(log, seg);
+ XLogFileName(path, log, seg);
}
}
* rename() is an acceptable substitute except for the truly paranoid.
*/
#ifndef __BEOS__
- if (link(tmppath, targpath) < 0)
+ if (link(tmppath, path) < 0)
elog(STOP, "link from %s to %s (initialization of log file %u, segment %u) failed: %m",
- tmppath, targpath, targlog, targseg);
+ tmppath, path, log, seg);
unlink(tmppath);
#else
- if (rename(tmppath, targpath) < 0)
+ if (rename(tmppath, path) < 0)
elog(STOP, "rename from %s to %s (initialization of log file %u, segment %u) failed: %m",
- tmppath, targpath targlog, targseg);
+ tmppath, path, log, seg);
#endif
if (use_lock)
SpinRelease(ControlFileLockId);
- /* Set flag to tell caller there was no existent file */
- *use_existent = false;
-
- /* Now open original target segment (might not be file I just made) */
- fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
- S_IRUSR | S_IWUSR);
- if (fd < 0)
- elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
- path, log, seg);
-
- return (fd);
+ return true;
}
/*
/*
* Remove or move offline all log files older or equal to passed log/seg#
+ *
+ * endptr is current (or recent) end of xlog; this is used to determine
+ * whether we want to recycle rather than delete no-longer-wanted log files.
*/
static void
-MoveOfflineLogs(uint32 log, uint32 seg)
+MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
{
+ uint32 endlogId;
+ uint32 endlogSeg;
DIR *xldir;
struct dirent *xlde;
char lastoff[32];
char path[MAXPGPATH];
- Assert(XLOG_archive_dir[0] == 0); /* not implemented yet */
+ XLByteToPrevSeg(endptr, endlogId, endlogSeg);
xldir = opendir(XLogDir);
if (xldir == NULL)
- elog(STOP, "could not open transaction log directory (%s): %m", XLogDir);
+ elog(STOP, "could not open transaction log directory (%s): %m",
+ XLogDir);
sprintf(lastoff, "%08X%08X", log, seg);
strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
strcmp(xlde->d_name, lastoff) <= 0)
{
+ sprintf(path, "%s/%s", XLogDir, xlde->d_name);
if (XLOG_archive_dir[0])
- elog(LOG, "archiving transaction log file %s", xlde->d_name);
+ {
+ elog(LOG, "archiving transaction log file %s",
+ xlde->d_name);
+ elog(NOTICE, "archiving log files is not implemented!");
+ }
else
- elog(LOG, "removing transaction log file %s", xlde->d_name);
-
- sprintf(path, "%s/%s", XLogDir, xlde->d_name);
- if (XLOG_archive_dir[0] == 0)
- unlink(path);
+ {
+ /*
+ * Before deleting the file, see if it can be recycled as
+ * a future log segment. We allow recycling segments up to
+ * XLOGfiles + XLOGfileslop segments beyond the current
+ * XLOG location.
+ */
+ if (InstallXLogFileSegment(endlogId, endlogSeg, path,
+ true, XLOGfiles + XLOGfileslop,
+ true))
+ {
+ elog(LOG, "recycled transaction log file %s",
+ xlde->d_name);
+ }
+ else
+ {
+ /* No need for any more future segments... */
+ elog(LOG, "removing transaction log file %s",
+ xlde->d_name);
+ unlink(path);
+ }
+ }
}
errno = 0;
}
if (errno)
- elog(STOP, "could not read transaction log directory (%s): %m", XLogDir);
+ elog(STOP, "could not read transaction log directory (%s): %m",
+ XLogDir);
closedir(xldir);
}
static bool
ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
{
+ XLogRecPtr recaddr;
+
if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
{
elog(emode, "ReadRecord: invalid magic number %04X in log file %u, segment %u, offset %u",
hdr->xlp_info, readId, readSeg, readOff);
return false;
}
+ recaddr.xlogid = readId;
+ recaddr.xrecoff = readSeg * XLogSegSize + readOff;
+ if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
+ {
+ elog(emode, "ReadRecord: unexpected pageaddr (%u, %u) in log file %u, segment %u, offset %u",
+ hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
+ readId, readSeg, readOff);
+ return false;
+ }
/*
* We disbelieve a SUI less than the previous page's SUI, or more than
page->xlp_magic = XLOG_PAGE_MAGIC;
page->xlp_info = 0;
page->xlp_sui = checkPoint.ThisStartUpID;
+ page->xlp_pageaddr.xlogid = 0;
+ page->xlp_pageaddr.xrecoff = 0;
record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
record->xl_prev.xlogid = 0;
record->xl_prev.xrecoff = 0;
EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
if (EndOfLog.xrecoff % BLCKSZ == 0)
{
- if (EndOfLog.xrecoff >= XLogFileSize)
+ XLogRecPtr NewPageEndPtr;
+
+ NewPageEndPtr = EndOfLog;
+ if (NewPageEndPtr.xrecoff >= XLogFileSize)
{
- XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid + 1;
- XLogCtl->xlblocks[0].xrecoff = BLCKSZ;
+ /* crossing a logid boundary */
+ NewPageEndPtr.xlogid += 1;
+ NewPageEndPtr.xrecoff = BLCKSZ;
}
else
{
- XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid;
- XLogCtl->xlblocks[0].xrecoff = EndOfLog.xrecoff + BLCKSZ;
+ NewPageEndPtr.xrecoff += BLCKSZ;
}
- Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
+ XLogCtl->xlblocks[0] = NewPageEndPtr;
Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
if (InRecovery)
Insert->currpage->xlp_sui = ThisStartUpID;
else
Insert->currpage->xlp_sui = ThisStartUpID + 1;
+ Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
+ Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
/* rest of buffer was zeroed in XLOGShmemInit */
+ Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
}
else
{
if (_logId || _logSeg)
{
PrevLogSeg(_logId, _logSeg);
- MoveOfflineLogs(_logId, _logSeg);
+ MoveOfflineLogs(_logId, _logSeg, recptr);
}
/*