From 4d14fe0048cf80052a3ba2053560f8aab1bb1b22 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Tue, 13 Mar 2001 01:17:06 +0000 Subject: [PATCH] XLOG (and related) changes: * Store two past checkpoint locations, not just one, in pg_control. On startup, we fall back to the older checkpoint if the newer one is unreadable. Also, a physical copy of the newest checkpoint record is kept in pg_control for possible use in disaster recovery (ie, complete loss of pg_xlog). Also add a version number for pg_control itself. Remove archdir from pg_control; it ought to be a GUC parameter, not a special case (not that it's implemented yet anyway). * Suppress successive checkpoint records when nothing has been entered in the WAL log since the last one. This is not so much to avoid I/O as to make it actually useful to keep track of the last two checkpoints. If the things are right next to each other then there's not a lot of redundancy gained... * Change CRC scheme to a true 64-bit CRC, not a pair of 32-bit CRCs on alternate bytes. Polynomial borrowed from ECMA DLT1 standard. * Fix XLOG record length handling so that it will work at BLCKSZ = 32k. * Change XID allocation to work more like OID allocation. (This is of dubious necessity, but I think it's a good idea anyway.) * Fix a number of minor bugs, such as off-by-one logic for XLOG file wraparound at the 4 gig mark. * Add documentation and clean up some coding infelicities; move file format declarations out to include files where planned contrib utilities can get at them. * Checkpoint will now occur every CHECKPOINT_SEGMENTS log segments or every CHECKPOINT_TIMEOUT seconds, whichever comes first. It is also possible to force a checkpoint by sending SIGUSR1 to the postmaster (undocumented feature...) * Defend against kill -9 postmaster by storing shmem block's key and ID in postmaster.pid lockfile, and checking at startup to ensure that no processes are still connected to old shmem block (if it still exists). * Switch backends to accept SIGQUIT rather than SIGUSR1 for emergency stop, for symmetry with postmaster and xlog utilities. Clean up signal handling in bootstrap.c so that xlog utilities launched by postmaster will react to signals better. * Standalone bootstrap now grabs lockfile in target directory, as added insurance against running it in parallel with live postmaster. --- doc/src/sgml/ref/checkpoint.sgml | 13 +- doc/src/sgml/runtime.sgml | 32 +- doc/src/sgml/wal.sgml | 15 +- src/backend/access/transam/transam.c | 3 +- src/backend/access/transam/varsup.c | 38 +- src/backend/access/transam/xact.c | 24 +- src/backend/access/transam/xlog.c | 2621 ++++++++++------- src/backend/access/transam/xlogutils.c | 4 +- src/backend/bootstrap/bootstrap.c | 72 +- src/backend/port/beos/shm.c | 23 +- src/backend/port/qnx4/shm.c | 25 +- src/backend/postmaster/postmaster.c | 159 +- src/backend/storage/ipc/ipc.c | 40 +- src/backend/tcop/postgres.c | 41 +- src/backend/utils/hash/Makefile | 4 +- src/backend/utils/hash/pg_crc.c | 417 +++ src/backend/utils/init/globals.c | 52 +- src/backend/utils/init/miscinit.c | 180 +- src/backend/utils/misc/guc.c | 10 +- src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/access/transam.h | 18 +- src/include/access/xlog.h | 171 +- src/include/access/xlogdefs.h | 41 +- src/include/access/xlogutils.h | 10 + src/include/catalog/pg_control.h | 115 + src/include/miscadmin.h | 4 +- src/include/storage/ipc.h | 4 +- src/include/tcop/tcopprot.h | 3 +- src/include/utils/pg_crc.h | 115 + 29 files changed, 2842 insertions(+), 1413 deletions(-) create mode 100644 src/backend/utils/hash/pg_crc.c create mode 100644 src/include/catalog/pg_control.h create mode 100644 src/include/utils/pg_crc.h diff --git a/doc/src/sgml/ref/checkpoint.sgml b/doc/src/sgml/ref/checkpoint.sgml index 021035888b..5336c3f552 100644 --- a/doc/src/sgml/ref/checkpoint.sgml +++ b/doc/src/sgml/ref/checkpoint.sgml @@ -1,4 +1,4 @@ - + @@ -26,11 +26,12 @@ CHECKPOINT Write-Ahead Logging (WAL) puts a checkpoint in the transaction log - every 300 seconds by default. (This may be changed by the run-time - configuration option CHECKPOINT_TIMEOUT.) - The CHECKPOINT command forces a checkpoint at - the point at which the command is issued. The next automatic - checkpoint will still happen after the original cycle expires. + every so often. (To adjust the automatic checkpoint interval, see + the run-time + configuration options CHECKPOINT_SEGMENTS + and CHECKPOINT_TIMEOUT.) + The CHECKPOINT command forces an immediate checkpoint + when the command is issued, without waiting for a scheduled checkpoint. diff --git a/doc/src/sgml/runtime.sgml b/doc/src/sgml/runtime.sgml index b23dcbf5a6..f321cea669 100644 --- a/doc/src/sgml/runtime.sgml +++ b/doc/src/sgml/runtime.sgml @@ -1,5 +1,5 @@ @@ -976,6 +976,11 @@ env PGOPTIONS='-c geqo=off' psql fsyncs because of performance problems, you may wish to reconsider your choice. + + + This option can only be set at server start or in the + postgresql.conf file. + @@ -1192,11 +1197,25 @@ env PGOPTIONS='-c geqo=off' psql tuning. + + CHECKPOINT_SEGMENTS (integer) + + + Maximum distance between automatic WAL checkpoints, in logfile + segments (each segment is normally 16 megabytes). + This option can only be set at server start or in the + postgresql.conf file. + + + + CHECKPOINT_TIMEOUT (integer) - Frequency of automatic WAL checkpoints, in seconds. + Maximum time between automatic WAL checkpoints, in seconds. + This option can only be set at server start or in the + postgresql.conf file. @@ -1205,8 +1224,8 @@ env PGOPTIONS='-c geqo=off' psql WAL_BUFFERS (integer) - Number of buffers for WAL. This option can only be set at - server start. + Number of disk-page buffers for WAL log. This option can only be set + at server start. @@ -1226,7 +1245,8 @@ env PGOPTIONS='-c geqo=off' psql Number of log files that are created in advance at checkpoint - time. This option can only be set at server start. + time. This option can only be set at server start or in the + postgresql.conf file. @@ -1909,7 +1929,7 @@ default:\ This is the Immediate Shutdown which - will cause the postmaster to send a SIGUSR1 to all backends and + will cause the postmaster to send a SIGQUIT to all backends and exit immediately (without properly shutting down the database system). When WAL is implemented, this will lead to recovery on start-up. Right now it's not recommendable to use this option. diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml index e706ee271c..c92ccd9d23 100644 --- a/doc/src/sgml/wal.sgml +++ b/doc/src/sgml/wal.sgml @@ -1,4 +1,4 @@ - + Write-Ahead Logging (<acronym>WAL</acronym>) @@ -257,7 +257,7 @@ The WAL log is held on the disk as a set of 16 MB files called segments. By default a new segment is created only if more than 75% of the current segment is - used. One can instruct the server to create up to 64 log segments + used. One can instruct the server to pre-create up to 64 log segments at checkpoint time by modifying the WAL_FILES configuration parameter. @@ -272,11 +272,12 @@ - By default, the postmaster spawns a special backend process to - create the next checkpoint 300 seconds after the previous - checkpoint's creation. One can change this interval by modifying - the CHECKPOINT_TIMEOUT parameter. It is also - possible to force a checkpoint by using the SQL command + The postmaster spawns a special backend process every so often + to create the next checkpoint. A checkpoint is created every + CHECKPOINT_SEGMENTS log segments, or every + CHECKPOINT_TIMEOUT seconds, whichever comes first. + The default settings are 3 segments and 300 seconds respectively. + It is also possible to force a checkpoint by using the SQL command CHECKPOINT. diff --git a/src/backend/access/transam/transam.c b/src/backend/access/transam/transam.c index dcac8ac2e0..ec429c194b 100644 --- a/src/backend/access/transam/transam.c +++ b/src/backend/access/transam/transam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/transam/transam.c,v 1.39 2001/01/24 19:42:51 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/transam.c,v 1.40 2001/03/13 01:17:05 tgl Exp $ * * NOTES * This file contains the high level access-method interface to the @@ -430,6 +430,7 @@ InitializeTransactionLog(void) Assert(!IsUnderPostmaster && ShmemVariableCache->nextXid <= FirstTransactionId); ShmemVariableCache->nextXid = FirstTransactionId; + ShmemVariableCache->xidCount = 0; /* force an XLOG rec right away */ } else if (RecoveryCheckingEnabled()) { diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 885e8e236c..e4271f5fa8 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -6,7 +6,7 @@ * Copyright (c) 2000, PostgreSQL Global Development Group * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.35 2001/01/24 19:42:51 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.36 2001/03/13 01:17:05 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -14,12 +14,17 @@ #include "postgres.h" #include "access/transam.h" +#include "access/xlog.h" #include "storage/proc.h" -SPINLOCK OidGenLockId; -extern SPINLOCK XidGenLockId; -extern void XLogPutNextOid(Oid nextOid); +/* Number of XIDs and OIDs to prefetch (preallocate) per XLOG write */ +#define VAR_XID_PREFETCH 1024 +#define VAR_OID_PREFETCH 8192 + +/* Spinlocks for serializing generation of XIDs and OIDs, respectively */ +SPINLOCK XidGenLockId; +SPINLOCK OidGenLockId; /* pointer to "variable cache" in shared memory (set up by shmem.c) */ VariableCache ShmemVariableCache = NULL; @@ -38,23 +43,31 @@ GetNewTransactionId(TransactionId *xid) } SpinAcquire(XidGenLockId); + + /* If we run out of logged for use xids then we must log more */ + if (ShmemVariableCache->xidCount == 0) + { + XLogPutNextXid(ShmemVariableCache->nextXid + VAR_XID_PREFETCH); + ShmemVariableCache->xidCount = VAR_XID_PREFETCH; + } + *xid = ShmemVariableCache->nextXid; - (ShmemVariableCache->nextXid)++; - if (MyProc != (PROC *) NULL) - MyProc->xid = *xid; + (ShmemVariableCache->nextXid)++; + (ShmemVariableCache->xidCount)--; SpinRelease(XidGenLockId); + if (MyProc != (PROC *) NULL) + MyProc->xid = *xid; } /* - * Like GetNewTransactionId reads nextXid but don't fetch it. + * Read nextXid but don't allocate it. */ void ReadNewTransactionId(TransactionId *xid) { - /* * During bootstrap initialization, we return the special * bootstrap transaction id. @@ -68,7 +81,6 @@ ReadNewTransactionId(TransactionId *xid) SpinAcquire(XidGenLockId); *xid = ShmemVariableCache->nextXid; SpinRelease(XidGenLockId); - } /* ---------------------------------------------------------------- @@ -76,7 +88,6 @@ ReadNewTransactionId(TransactionId *xid) * ---------------------------------------------------------------- */ -#define VAR_OID_PREFETCH 8192 static Oid lastSeenOid = InvalidOid; void @@ -84,7 +95,7 @@ GetNewObjectId(Oid *oid_return) { SpinAcquire(OidGenLockId); - /* If we run out of logged for use oids then we log more */ + /* If we run out of logged for use oids then we must log more */ if (ShmemVariableCache->oidCount == 0) { XLogPutNextOid(ShmemVariableCache->nextOid + VAR_OID_PREFETCH); @@ -103,11 +114,11 @@ GetNewObjectId(Oid *oid_return) void CheckMaxObjectId(Oid assigned_oid) { - if (lastSeenOid != InvalidOid && assigned_oid < lastSeenOid) return; SpinAcquire(OidGenLockId); + if (assigned_oid < ShmemVariableCache->nextOid) { lastSeenOid = ShmemVariableCache->nextOid - 1; @@ -138,5 +149,4 @@ CheckMaxObjectId(Oid assigned_oid) ShmemVariableCache->nextOid = assigned_oid + 1; SpinRelease(OidGenLockId); - } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 0af2582658..1331c8e983 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.98 2001/02/26 00:50:07 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.99 2001/03/13 01:17:05 tgl Exp $ * * NOTES * Transaction aborts can now occur two ways: @@ -706,14 +706,18 @@ RecordTransactionCommit() } XLogFlush(recptr); + + /* Break the chain of back-links in the XLOG records I output */ MyLastRecPtr.xrecoff = 0; TransactionIdCommit(xid); - MyProc->logRec.xrecoff = 0; END_CRIT_SECTION(); } + /* Show myself as out of the transaction in PROC array */ + MyProc->logRec.xrecoff = 0; + if (leak) ResetBufferPool(true); } @@ -802,6 +806,10 @@ RecordTransactionAbort(void) { TransactionId xid = GetCurrentTransactionId(); + /* + * Double check here is to catch case that we aborted partway through + * RecordTransactionCommit ... + */ if (MyLastRecPtr.xrecoff != 0 && !TransactionIdDidCommit(xid)) { XLogRecData rdata; @@ -815,13 +823,19 @@ RecordTransactionAbort(void) rdata.next = NULL; START_CRIT_SECTION(); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata); TransactionIdAbort(xid); - MyProc->logRec.xrecoff = 0; + END_CRIT_SECTION(); } + /* Break the chain of back-links in the XLOG records I output */ + MyLastRecPtr.xrecoff = 0; + /* Show myself as out of the transaction in PROC array */ + MyProc->logRec.xrecoff = 0; + /* * Tell bufmgr and smgr to release resources. */ @@ -1187,10 +1201,6 @@ AbortTransaction(void) AtEOXact_CatCache(false); AtAbort_Memory(); AtEOXact_Files(); - - /* Here we'll rollback xaction changes */ - MyLastRecPtr.xrecoff = 0; - AtAbort_Locks(); SharedBufferChanged = false; /* safest place to do it */ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index fc861aba12..3d6b8255e9 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.55 2001/02/26 00:50:07 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.56 2001/03/13 01:17:05 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -14,6 +14,7 @@ #include "postgres.h" #include +#include #include #include #include @@ -27,6 +28,7 @@ #include "access/transam.h" #include "access/xact.h" #include "catalog/catversion.h" +#include "catalog/pg_control.h" #include "storage/sinval.h" #include "storage/proc.h" #include "storage/spin.h" @@ -36,7 +38,6 @@ #include "access/xlogutils.h" #include "utils/builtins.h" #include "utils/relcache.h" - #include "miscadmin.h" @@ -45,160 +46,239 @@ /* Max time to wait to acquire checkpoint lock */ #define CHECKPOINT_LOCK_TIMEOUT (10*60*1000000) /* 10 minutes */ - +/* User-settable parameters */ +int CheckPointSegments = 3; int XLOGbuffers = 8; -int XLOGfiles = 0; /* how many files to pre-allocate */ -XLogRecPtr MyLastRecPtr = {0, 0}; -bool InRecovery = false; +int XLOGfiles = 0; /* how many files to pre-allocate during ckpt */ +int XLOG_DEBUG = 0; +char XLOG_archive_dir[MAXPGPATH]; /* null string means delete 'em */ + +#define MinXLOGbuffers 4 + + +/* + * ThisStartUpID will be same in all backends --- it identifies current + * instance of the database system. + */ StartUpID ThisStartUpID = 0; -XLogRecPtr RedoRecPtr; -int XLOG_DEBUG = 0; +/* Are we doing recovery by reading XLOG? */ +bool InRecovery = false; -/* To read/update control file and create new log file */ -SPINLOCK ControlFileLockId; +/* + * MyLastRecPtr points to the start of the last XLOG record inserted by the + * current transaction. If MyLastRecPtr.xrecoff == 0, then we are not in + * a transaction or the transaction has not yet made any loggable changes. + * + * Note that XLOG records inserted outside transaction control are not + * reflected into MyLastRecPtr. + */ +XLogRecPtr MyLastRecPtr = {0, 0}; -/* To generate new xid */ -SPINLOCK XidGenLockId; +/* + * ProcLastRecPtr points to the start of the last XLOG record inserted by the + * current backend. It is updated for all inserts, transaction-controlled + * or not. + */ +static XLogRecPtr ProcLastRecPtr = {0, 0}; -static char XLogDir[MAXPGPATH]; -static char ControlFilePath[MAXPGPATH]; +/* + * RedoRecPtr is this backend's local copy of the REDO record pointer + * (which is almost but not quite the same as a pointer to the most recent + * CHECKPOINT record). We update this from the shared-memory copy, + * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we + * hold the Insert spinlock). See XLogInsert for details. + */ +static XLogRecPtr RedoRecPtr; -#define MinXLOGbuffers 4 +/* This lock must be held to read/update control file or create new log file */ +SPINLOCK ControlFileLockId; -typedef struct XLgwrRqst +/*---------- + * Shared-memory data structures for XLOG control + * + * LogwrtRqst indicates a byte position that we need to write and/or fsync + * the log up to (all records before that point must be written or fsynced). + * LogwrtResult indicates the byte positions we have already written/fsynced. + * These structs are identical but are declared separately to indicate their + * slightly different functions. + * + * We do a lot of pushups to minimize the amount of access to spinlocked + * shared memory values. There are actually three shared-memory copies of + * LogwrtResult, plus one unshared copy in each backend. Here's how it works: + * XLogCtl->LogwrtResult is protected by info_lck + * XLogCtl->Write.LogwrtResult is protected by logwrt_lck + * XLogCtl->Insert.LogwrtResult is protected by insert_lck + * One must hold the associated spinlock to read or write any of these, but + * of course no spinlock is needed to read/write the unshared LogwrtResult. + * + * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always + * right", since both are updated by a write or flush operation before + * it releases logwrt_lck. The point of keeping XLogCtl->Write.LogwrtResult + * is that it can be examined/modified by code that already holds logwrt_lck + * without needing to grab info_lck as well. + * + * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two, + * but is updated when convenient. Again, it exists for the convenience of + * code that is already holding insert_lck but not the other locks. + * + * The unshared LogwrtResult may lag behind any or all of these, and again + * is updated when convenient. + * + * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst + * (protected by info_lck), but we don't need to cache any copies of it. + * + * Note that this all works because the request and result positions can only + * advance forward, never back up, and so we can easily determine which of two + * values is "more up to date". + *---------- + */ +typedef struct XLogwrtRqst { - XLogRecPtr Write; /* byte (1-based) to write out */ - XLogRecPtr Flush; /* byte (1-based) to flush */ -} XLgwrRqst; + XLogRecPtr Write; /* last byte + 1 to write out */ + XLogRecPtr Flush; /* last byte + 1 to flush */ +} XLogwrtRqst; -typedef struct XLgwrResult +typedef struct XLogwrtResult { - XLogRecPtr Write; /* bytes written out */ - XLogRecPtr Flush; /* bytes flushed */ -} XLgwrResult; + XLogRecPtr Write; /* last byte + 1 written out */ + XLogRecPtr Flush; /* last byte + 1 flushed */ +} XLogwrtResult; +/* + * Shared state data for XLogInsert. + */ typedef struct XLogCtlInsert { - XLgwrResult LgwrResult; - XLogRecPtr PrevRecord; + XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */ + XLogRecPtr PrevRecord; /* start of previously-inserted record */ uint16 curridx; /* current block index in cache */ - XLogPageHeader currpage; - char *currpos; - XLogRecPtr RedoRecPtr; + XLogPageHeader currpage; /* points to header of block in cache */ + char *currpos; /* current insertion point in cache */ + XLogRecPtr RedoRecPtr; /* current redo point for insertions */ } XLogCtlInsert; +/* + * Shared state data for XLogWrite/XLogFlush. + */ typedef struct XLogCtlWrite { - XLgwrResult LgwrResult; - uint16 curridx; /* index of next block to write */ + XLogwrtResult LogwrtResult; /* current value of LogwrtResult */ + uint16 curridx; /* cache index of next block to write */ } XLogCtlWrite; - +/* + * Total shared-memory state for XLOG. + */ typedef struct XLogCtlData { + /* Protected by insert_lck: */ XLogCtlInsert Insert; - XLgwrRqst LgwrRqst; - XLgwrResult LgwrResult; + /* Protected by info_lck: */ + XLogwrtRqst LogwrtRqst; + XLogwrtResult LogwrtResult; + /* Protected by logwrt_lck: */ XLogCtlWrite Write; - char *pages; + /* + * These values do not change after startup, although the pointed-to + * pages and xlblocks values certainly do. Permission to read/write + * the pages and xlblocks values depends on insert_lck and logwrt_lck. + */ + char *pages; /* buffers for unwritten XLOG pages */ XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */ - uint32 XLogCacheByte; - uint32 XLogCacheBlck; + uint32 XLogCacheByte; /* # bytes in xlog buffers */ + uint32 XLogCacheBlck; /* highest allocated xlog buffer index */ StartUpID ThisStartUpID; - XLogRecPtr RedoRecPtr; /* for postmaster */ - slock_t insert_lck; - slock_t info_lck; - slock_t lgwr_lck; + + /* This value is not protected by *any* spinlock... */ + XLogRecPtr RedoRecPtr; /* see SetRedoRecPtr/GetRedoRecPtr */ + + slock_t insert_lck; /* XLogInsert lock */ + slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */ + slock_t logwrt_lck; /* XLogWrite/XLogFlush lock */ slock_t chkp_lck; /* checkpoint lock */ } XLogCtlData; static XLogCtlData *XLogCtl = NULL; /* - * Contents of pg_control + * We maintain an image of pg_control in shared memory. */ - -typedef enum DBState -{ - DB_STARTUP = 0, - DB_SHUTDOWNED, - DB_SHUTDOWNING, - DB_IN_RECOVERY, - DB_IN_PRODUCTION -} DBState; - -#define LOCALE_NAME_BUFLEN 128 - -typedef struct ControlFileData -{ - crc64 crc; - uint32 logId; /* current log file id */ - uint32 logSeg; /* current log file segment (1-based) */ - XLogRecPtr checkPoint; /* last check point record ptr */ - time_t time; /* time stamp of last modification */ - DBState state; /* see enum above */ - - /* - * this data is used to make sure that configuration of this DB is - * compatible with the backend executable - */ - uint32 blcksz; /* block size for this DB */ - uint32 relseg_size; /* blocks per segment of large relation */ - uint32 catalog_version_no; /* internal version number */ - /* active locales --- "C" if compiled without USE_LOCALE: */ - char lc_collate[LOCALE_NAME_BUFLEN]; - char lc_ctype[LOCALE_NAME_BUFLEN]; - - /* - * important directory locations - */ - char archdir[MAXPGPATH]; /* where to move offline log files */ -} ControlFileData; - static ControlFileData *ControlFile = NULL; -typedef struct CheckPoint -{ - XLogRecPtr redo; /* next RecPtr available when we */ - /* began to create CheckPoint */ - /* (i.e. REDO start point) */ - XLogRecPtr undo; /* first record of oldest in-progress */ - /* transaction when we started */ - /* (i.e. UNDO end point) */ - StartUpID ThisStartUpID; - TransactionId nextXid; - Oid nextOid; - bool Shutdown; -} CheckPoint; +/* + * Macros for managing XLogInsert state. In most cases, the calling routine + * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx, + * so these are passed as parameters instead of being fetched via XLogCtl. + */ -#define XLOG_CHECKPOINT 0x00 -#define XLOG_NEXTOID 0x10 +/* Free space remaining in the current xlog page buffer */ +#define INSERT_FREESPACE(Insert) \ + (BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage)) + +/* Construct XLogRecPtr value for current insertion point */ +#define INSERT_RECPTR(recptr,Insert,curridx) \ + ( \ + (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \ + (recptr).xrecoff = \ + XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \ + ) + + +/* Increment an xlogid/segment pair */ +#define NextLogSeg(logId, logSeg) \ + do { \ + if ((logSeg) >= XLogSegsPerFile-1) \ + { \ + (logId)++; \ + (logSeg) = 0; \ + } \ + else \ + (logSeg)++; \ + } while (0) + +/* Decrement an xlogid/segment pair (assume it's not 0,0) */ +#define PrevLogSeg(logId, logSeg) \ + do { \ + if (logSeg) \ + (logSeg)--; \ + else \ + { \ + (logId)--; \ + (logSeg) = XLogSegsPerFile-1; \ + } \ + } while (0) -typedef struct BkpBlock -{ - crc64 crc; - RelFileNode node; - BlockNumber block; -} BkpBlock; +/* + * Compute ID and segment from an XLogRecPtr. + * + * For XLByteToSeg, do the computation at face value. For XLByteToPrevSeg, + * a boundary byte is taken to be in the previous segment. This is suitable + * for deciding which segment to write given a pointer to a record end, + * for example. + */ +#define XLByteToSeg(xlrp, logId, logSeg) \ + ( logId = (xlrp).xlogid, \ + logSeg = (xlrp).xrecoff / XLogSegSize \ + ) +#define XLByteToPrevSeg(xlrp, logId, logSeg) \ + ( logId = (xlrp).xlogid, \ + logSeg = ((xlrp).xrecoff - 1) / XLogSegSize \ + ) /* - * We break each log file in 16Mb segments + * Is an XLogRecPtr within a particular XLOG segment? + * + * For XLByteInSeg, do the computation at face value. For XLByteInPrevSeg, + * a boundary byte is taken to be in the previous segment. */ -#define XLogSegSize ((uint32) (16*1024*1024)) -#define XLogLastSeg (((uint32) 0xffffffff) / XLogSegSize) -#define XLogFileSize (XLogLastSeg * XLogSegSize) - -#define NextLogSeg(_logId, _logSeg) \ -{\ - if (_logSeg >= XLogLastSeg)\ - {\ - _logId++;\ - _logSeg = 0;\ - }\ - else\ - _logSeg++;\ -} +#define XLByteInSeg(xlrp, logId, logSeg) \ + ((xlrp).xlogid == (logId) && \ + (xlrp).xrecoff / XLogSegSize == (logSeg)) + +#define XLByteInPrevSeg(xlrp, logId, logSeg) \ + ((xlrp).xlogid == (logId) && \ + ((xlrp).xrecoff - 1) / XLogSegSize == (logSeg)) #define XLogFileName(path, log, seg) \ @@ -209,120 +289,133 @@ typedef struct BkpBlock snprintf(path, MAXPGPATH, "%s%cT%08X%08X", \ XLogDir, SEP_CHAR, log, seg) -#define PrevBufIdx(curridx) \ - ((curridx == 0) ? XLogCtl->XLogCacheBlck : (curridx - 1)) - -#define NextBufIdx(curridx) \ - ((curridx == XLogCtl->XLogCacheBlck) ? 0 : (curridx + 1)) - -#define InitXLBuffer(curridx) (\ - XLogCtl->xlblocks[curridx].xrecoff = \ - (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \ - BLCKSZ : (XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ), \ - XLogCtl->xlblocks[curridx].xlogid = \ - (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \ - (XLogCtl->xlblocks[Insert->curridx].xlogid + 1) : \ - XLogCtl->xlblocks[Insert->curridx].xlogid, \ - Insert->curridx = curridx, \ - Insert->currpage = (XLogPageHeader) (XLogCtl->pages + curridx * BLCKSZ), \ - Insert->currpos = \ - ((char*) Insert->currpage) + SizeOfXLogPHD, \ - Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC, \ - Insert->currpage->xlp_info = 0 \ - ) +#define PrevBufIdx(idx) \ + (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1)) + +#define NextBufIdx(idx) \ + (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1)) #define XRecOffIsValid(xrecoff) \ - (xrecoff % BLCKSZ >= SizeOfXLogPHD && \ - (BLCKSZ - xrecoff % BLCKSZ) >= SizeOfXLogRecord) - -#define _INTL_MAXLOGRECSZ (3 * MAXLOGRECSZ) - -extern uint32 crc_table[]; -#define INIT_CRC64(crc) (crc.crc1 = 0xffffffff, crc.crc2 = 0xffffffff) -#define FIN_CRC64(crc) (crc.crc1 ^= 0xffffffff, crc.crc2 ^= 0xffffffff) -#define COMP_CRC64(crc, data, len) \ -{\ - uint32 __c1 = crc.crc1;\ - uint32 __c2 = crc.crc2;\ - char *__data = data;\ - uint32 __len = len;\ -\ - while (__len >= 2)\ - {\ - __c1 = crc_table[(__c1 ^ *__data++) & 0xff] ^ (__c1 >> 8);\ - __c2 = crc_table[(__c2 ^ *__data++) & 0xff] ^ (__c2 >> 8);\ - __len -= 2;\ - }\ - if (__len > 0)\ - __c1 = crc_table[(__c1 ^ *__data++) & 0xff] ^ (__c1 >> 8);\ - crc.crc1 = __c1;\ - crc.crc2 = __c2;\ -} + ((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \ + (BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord) -void SetRedoRecPtr(void); -void GetRedoRecPtr(void); +/* + * _INTL_MAXLOGRECSZ: max space needed for a record including header and + * any backup-block data. + */ +#define _INTL_MAXLOGRECSZ (SizeOfXLogRecord + MAXLOGRECSZ + \ + XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) -static void GetFreeXLBuffer(void); -static void XLogWrite(char *buffer); -static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent); -static int XLogFileOpen(uint32 log, uint32 seg, bool econt); -static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer); -static void WriteControlFile(void); -static void ReadControlFile(void); -static char *str_time(time_t tnow); -static void xlog_outrec(char *buf, XLogRecord *record); -static XLgwrResult LgwrResult = {{0, 0}, {0, 0}}; -static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}}; +/* File path names */ +static char XLogDir[MAXPGPATH]; +static char ControlFilePath[MAXPGPATH]; + +/* + * Private, possibly out-of-date copy of shared LogwrtResult. + * See discussion above. + */ +static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}}; -static int logFile = -1; -static uint32 logId = 0; -static uint32 logSeg = 0; -static uint32 logOff = 0; +/* + * openLogFile is -1 or a kernel FD for an open log file segment. + * When it's open, openLogOff is the current seek offset in the file. + * openLogId/openLogSeg identify the segment. These variables are only + * used to write the XLOG, and so will normally refer to the active segment. + */ +static int openLogFile = -1; +static uint32 openLogId = 0; +static uint32 openLogSeg = 0; +static uint32 openLogOff = 0; -static XLogRecPtr ReadRecPtr; -static XLogRecPtr EndRecPtr; +/* + * These variables are used similarly to the ones above, but for reading + * the XLOG. Note, however, that readOff generally represents the offset + * of the page just read, not the seek position of the FD itself, which + * will be just past that page. + */ static int readFile = -1; static uint32 readId = 0; static uint32 readSeg = 0; static uint32 readOff = 0; -static char readBuf[BLCKSZ]; +/* Buffer for currently read page (BLCKSZ bytes) */ +static char *readBuf = NULL; +/* State information for XLOG reading */ +static XLogRecPtr ReadRecPtr; +static XLogRecPtr EndRecPtr; static XLogRecord *nextRecord = NULL; static bool InRedo = false; + +static bool AdvanceXLInsertBuffer(void); +static void XLogWrite(XLogwrtRqst WriteRqst); +static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent); +static int XLogFileOpen(uint32 log, uint32 seg, bool econt); +static void PreallocXlogFiles(XLogRecPtr endptr); +static void MoveOfflineLogs(uint32 log, uint32 seg); +static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer); +static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, + const char *whichChkpt, + char *buffer); +static void WriteControlFile(void); +static void ReadControlFile(void); +static char *str_time(time_t tnow); +static void xlog_outrec(char *buf, XLogRecord *record); + + +/* + * Insert an XLOG record having the specified RMID and info bytes, + * with the body of the record being the data chunk(s) described by + * the rdata list (see xlog.h for notes about rdata). + * + * Returns XLOG pointer to end of record (beginning of next record). + * This can be used as LSN for data pages affected by the logged action. + * (LSN is the XLOG point up to which the XLOG must be flushed to disk + * before the data page can be written out. This implements the basic + * WAL rule "write the log before the data".) + * + * NB: this routine feels free to scribble on the XLogRecData structs, + * though not on the data they reference. This is OK since the XLogRecData + * structs are always just temporaries in the calling code. + */ XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) { XLogCtlInsert *Insert = &XLogCtl->Insert; XLogRecord *record; - XLogSubRecord *subrecord; + XLogContRecord *contrecord; XLogRecPtr RecPtr; + XLogRecPtr WriteRqst; uint32 freespace; uint16 curridx; XLogRecData *rdt; - Buffer dtbuf[2] = {InvalidBuffer, InvalidBuffer}; - bool dtbuf_bkp[2] = {false, false}; - XLogRecData dtbuf_rdt[4]; - BkpBlock dtbuf_xlg[2]; - XLogRecPtr dtbuf_lsn[2]; - crc64 dtbuf_crc[2], - rdata_crc; - uint32 len; + Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; + bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; + BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; + XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS]; + XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS]; + crc64 rdata_crc; + uint32 len, + write_len; unsigned i; - bool updrqst = false; - bool repeat = false; + bool do_logwrt; + bool updrqst; bool no_tran = (rmid == RM_XLOG_ID) ? true : false; if (info & XLR_INFO_MASK) { if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN) elog(STOP, "XLogInsert: invalid info mask %02X", - (info & XLR_INFO_MASK)); + (info & XLR_INFO_MASK)); no_tran = true; info &= ~XLR_INFO_MASK; } + /* + * In bootstrap mode, we don't actually log anything but XLOG resources; + * return a phony record pointer. + */ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { RecPtr.xlogid = 0; @@ -330,180 +423,229 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) return (RecPtr); } + /* + * Here we scan the rdata list, determine which buffers must be backed + * up, and compute the CRC values for the data. Note that the record + * header isn't added into the CRC yet since we don't know the final + * length or info bits quite yet. + * + * We may have to loop back to here if a race condition is detected below. + * We could prevent the race by doing all this work while holding the + * insert spinlock, but it seems better to avoid doing CRC calculations + * while holding the lock. This means we have to be careful about + * modifying the rdata list until we know we aren't going to loop back + * again. The only change we allow ourselves to make earlier is to set + * rdt->data = NULL in list items we have decided we will have to back + * up the whole buffer for. This is OK because we will certainly decide + * the same thing again for those items if we do it over; doing it here + * saves an extra pass over the list later. + */ begin:; + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + dtbuf[i] = InvalidBuffer; + dtbuf_bkp[i] = false; + } + INIT_CRC64(rdata_crc); - for (len = 0, rdt = rdata; ; ) + len = 0; + for (rdt = rdata; ; ) { if (rdt->buffer == InvalidBuffer) { + /* Simple data, just include it */ len += rdt->len; COMP_CRC64(rdata_crc, rdt->data, rdt->len); - if (rdt->next == NULL) - break; - rdt = rdt->next; - continue; } - for (i = 0; i < 2; i++) + else { - if (rdt->buffer == dtbuf[i]) - { - if (dtbuf_bkp[i]) - rdt->data = NULL; - else if (rdt->data) - { - len += rdt->len; - COMP_CRC64(rdata_crc, rdt->data, rdt->len); - } - break; - } - if (dtbuf[i] == InvalidBuffer) + /* Find info for buffer */ + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { - dtbuf[i] = rdt->buffer; - dtbuf_lsn[i] = *((XLogRecPtr*)(BufferGetBlock(rdt->buffer))); - if (XLByteLE(dtbuf_lsn[i], RedoRecPtr)) + if (rdt->buffer == dtbuf[i]) { - crc64 crc; - - dtbuf_bkp[i] = true; - rdt->data = NULL; - INIT_CRC64(crc); - COMP_CRC64(crc, ((char*)BufferGetBlock(dtbuf[i])), BLCKSZ); - dtbuf_crc[i] = crc; + /* Buffer already referenced by earlier list item */ + if (dtbuf_bkp[i]) + rdt->data = NULL; + else if (rdt->data) + { + len += rdt->len; + COMP_CRC64(rdata_crc, rdt->data, rdt->len); + } + break; } - else if (rdt->data) + if (dtbuf[i] == InvalidBuffer) { - len += rdt->len; - COMP_CRC64(rdata_crc, rdt->data, rdt->len); + /* OK, put it in this slot */ + dtbuf[i] = rdt->buffer; + /* + * XXX We assume page LSN is first data on page + */ + dtbuf_lsn[i] = *((XLogRecPtr*)BufferGetBlock(rdt->buffer)); + if (XLByteLE(dtbuf_lsn[i], RedoRecPtr)) + { + crc64 dtcrc; + + dtbuf_bkp[i] = true; + rdt->data = NULL; + INIT_CRC64(dtcrc); + COMP_CRC64(dtcrc, + BufferGetBlock(dtbuf[i]), + BLCKSZ); + dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]); + dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]); + COMP_CRC64(dtcrc, + (char*) &(dtbuf_xlg[i]) + sizeof(crc64), + sizeof(BkpBlock) - sizeof(crc64)); + FIN_CRC64(dtcrc); + dtbuf_xlg[i].crc = dtcrc; + } + else if (rdt->data) + { + len += rdt->len; + COMP_CRC64(rdata_crc, rdt->data, rdt->len); + } + break; } - break; } + if (i >= XLR_MAX_BKP_BLOCKS) + elog(STOP, "XLogInsert: can backup %d blocks at most", + XLR_MAX_BKP_BLOCKS); } - if (i >= 2) - elog(STOP, "XLogInsert: can backup 2 blocks at most"); + /* Break out of loop when rdt points to last list item */ if (rdt->next == NULL) break; rdt = rdt->next; } + /* + * NOTE: the test for len == 0 here is somewhat fishy, since in theory + * all of the rmgr data might have been suppressed in favor of backup + * blocks. Currently, all callers of XLogInsert provide at least some + * not-in-a-buffer data and so len == 0 should never happen, but that + * may not be true forever. If you need to remove the len == 0 check, + * also remove the check for xl_len == 0 in ReadRecord, below. + */ if (len == 0 || len > MAXLOGRECSZ) elog(STOP, "XLogInsert: invalid record len %u", len); START_CRIT_SECTION(); - /* obtain xlog insert lock */ - if (TAS(&(XLogCtl->insert_lck))) /* busy */ - { - bool do_lgwr = true; + /* wait to obtain xlog insert lock */ + do_logwrt = true; - for (i = 0;;) + for (i = 0;;) + { + /* try to update LogwrtResult while waiting for insert lock */ + if (!TAS(&(XLogCtl->info_lck))) { - /* try to read LgwrResult while waiting for insert lock */ - if (!TAS(&(XLogCtl->info_lck))) - { - LgwrRqst = XLogCtl->LgwrRqst; - LgwrResult = XLogCtl->LgwrResult; - S_UNLOCK(&(XLogCtl->info_lck)); + XLogwrtRqst LogwrtRqst; - /* - * If cache is half filled then try to acquire lgwr lock - * and do LGWR work, but only once. - */ - if (do_lgwr && - (LgwrRqst.Write.xlogid != LgwrResult.Write.xlogid || - (LgwrRqst.Write.xrecoff - LgwrResult.Write.xrecoff >= - XLogCtl->XLogCacheByte / 2))) + LogwrtRqst = XLogCtl->LogwrtRqst; + LogwrtResult = XLogCtl->LogwrtResult; + S_UNLOCK(&(XLogCtl->info_lck)); + + /* + * If cache is half filled then try to acquire logwrt lock + * and do LOGWRT work, but only once per XLogInsert call. + * Ignore any fractional blocks in performing this check. + */ + LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ; + if (do_logwrt && + (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid || + (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff + + XLogCtl->XLogCacheByte / 2))) + { + if (!TAS(&(XLogCtl->logwrt_lck))) { - if (!TAS(&(XLogCtl->lgwr_lck))) + LogwrtResult = XLogCtl->Write.LogwrtResult; + if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write)) { - LgwrResult = XLogCtl->Write.LgwrResult; - if (!TAS(&(XLogCtl->info_lck))) - { - LgwrRqst = XLogCtl->LgwrRqst; - S_UNLOCK(&(XLogCtl->info_lck)); - } - if (XLByteLT(LgwrResult.Write, LgwrRqst.Write)) - { - XLogWrite(NULL); - do_lgwr = false; - } - S_UNLOCK(&(XLogCtl->lgwr_lck)); + XLogWrite(LogwrtRqst); + do_logwrt = false; } + S_UNLOCK(&(XLogCtl->logwrt_lck)); } } - S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT); - if (!TAS(&(XLogCtl->insert_lck))) - break; } + if (!TAS(&(XLogCtl->insert_lck))) + break; + S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT); } - /* Race condition: RedoRecPtr was changed */ - RedoRecPtr = Insert->RedoRecPtr; - repeat = false; - for (i = 0; i < 2; i++) + /* + * Check to see if my RedoRecPtr is out of date. If so, may have to + * go back and recompute everything. This can only happen just after a + * checkpoint, so it's better to be slow in this case and fast otherwise. + */ + if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr)) { - if (dtbuf[i] == InvalidBuffer) - continue; - if (dtbuf_bkp[i] == false && - XLByteLE(dtbuf_lsn[i], RedoRecPtr)) + Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr)); + RedoRecPtr = Insert->RedoRecPtr; + + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { - dtbuf[i] = InvalidBuffer; - repeat = true; + if (dtbuf[i] == InvalidBuffer) + continue; + if (dtbuf_bkp[i] == false && + XLByteLE(dtbuf_lsn[i], RedoRecPtr)) + { + /* + * Oops, this buffer now needs to be backed up, but we didn't + * think so above. Start over. + */ + S_UNLOCK(&(XLogCtl->insert_lck)); + END_CRIT_SECTION(); + goto begin; + } } } - if (repeat) - { - S_UNLOCK(&(XLogCtl->insert_lck)); - END_CRIT_SECTION(); - goto begin; - } - /* Attach backup blocks to record data */ - for (i = 0; i < 2; i++) + /* + * Make additional rdata list entries for the backup blocks, so that + * we don't need to special-case them in the write loop. Note that we + * have now irrevocably changed the input rdata list. At the exit of + * this loop, write_len includes the backup block data. + * + * Also set the appropriate info bits to show which buffers were backed + * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct + * buffer value (ignoring InvalidBuffer) appearing in the rdata list. + */ + write_len = len; + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i])) continue; - info |= (XLR_SET_BKP_BLOCK(i)); - - dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]); - dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]); - COMP_CRC64(dtbuf_crc[i], - ((char*)&(dtbuf_xlg[i]) + offsetof(BkpBlock, node)), - (sizeof(BkpBlock) - offsetof(BkpBlock, node))); - FIN_CRC64(dtbuf_crc[i]); - dtbuf_xlg[i].crc = dtbuf_crc[i]; + info |= XLR_SET_BKP_BLOCK(i); rdt->next = &(dtbuf_rdt[2 * i]); - dtbuf_rdt[2 * i].data = (char*)&(dtbuf_xlg[i]); + dtbuf_rdt[2 * i].data = (char*) &(dtbuf_xlg[i]); dtbuf_rdt[2 * i].len = sizeof(BkpBlock); - len += sizeof(BkpBlock); + write_len += sizeof(BkpBlock); rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]); - dtbuf_rdt[2 * i + 1].data = (char*)(BufferGetBlock(dtbuf[i])); + dtbuf_rdt[2 * i + 1].data = (char*) BufferGetBlock(dtbuf[i]); dtbuf_rdt[2 * i + 1].len = BLCKSZ; - len += BLCKSZ; + write_len += BLCKSZ; dtbuf_rdt[2 * i + 1].next = NULL; } - /* Insert record */ + /* Insert record header */ - freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; + updrqst = false; + freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { - curridx = NextBufIdx(Insert->curridx); - if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) - InitXLBuffer(curridx); - else - GetFreeXLBuffer(); + updrqst = AdvanceXLInsertBuffer(); freespace = BLCKSZ - SizeOfXLogPHD; } - else - curridx = Insert->curridx; - freespace -= SizeOfXLogRecord; + curridx = Insert->curridx; record = (XLogRecord *) Insert->currpos; + record->xl_prev = Insert->PrevRecord; if (no_tran) { @@ -514,26 +656,26 @@ begin:; record->xl_xact_prev = MyLastRecPtr; record->xl_xid = GetCurrentTransactionId(); - record->xl_len = len; + record->xl_len = len; /* doesn't include backup blocks */ record->xl_info = info; record->xl_rmid = rmid; - COMP_CRC64(rdata_crc, ((char*)record + offsetof(XLogRecord, xl_prev)), - (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev))); + /* Now we can finish computing the main CRC */ + COMP_CRC64(rdata_crc, (char*) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); FIN_CRC64(rdata_crc); record->xl_crc = rdata_crc; - RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid; - RecPtr.xrecoff = - XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + - Insert->currpos - ((char *) Insert->currpage); + /* Compute record's XLOG location */ + INSERT_RECPTR(RecPtr, Insert, curridx); + + /* If first XLOG record of transaction, save it in PROC array */ if (MyLastRecPtr.xrecoff == 0 && !no_tran) { SpinAcquire(SInvalLock); MyProc->logRec = RecPtr; SpinRelease(SInvalLock); } - Insert->PrevRecord = RecPtr; if (XLOG_DEBUG) { @@ -546,14 +688,22 @@ begin:; strcat(buf, " - "); RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data); } - strcat(buf, "\n"); - write(2, buf, strlen(buf)); + fprintf(stderr, "%s\n", buf); } - MyLastRecPtr = RecPtr; /* begin of record */ + /* Record begin of record in appropriate places */ + if (!no_tran) + MyLastRecPtr = RecPtr; + ProcLastRecPtr = RecPtr; + Insert->PrevRecord = RecPtr; + Insert->currpos += SizeOfXLogRecord; + freespace -= SizeOfXLogRecord; - while (len) + /* + * Append the data, including backup blocks if any + */ + while (write_len) { while (rdata->data == NULL) rdata = rdata->next; @@ -565,13 +715,13 @@ begin:; memcpy(Insert->currpos, rdata->data, freespace); rdata->data += freespace; rdata->len -= freespace; - len -= freespace; + write_len -= freespace; } else { memcpy(Insert->currpos, rdata->data, rdata->len); freespace -= rdata->len; - len -= rdata->len; + write_len -= rdata->len; Insert->currpos += rdata->len; rdata = rdata->next; continue; @@ -579,49 +729,44 @@ begin:; } /* Use next buffer */ - curridx = NextBufIdx(curridx); - if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) - { - InitXLBuffer(curridx); - updrqst = true; - } - else - GetFreeXLBuffer(); - freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord; - Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD; - subrecord = (XLogSubRecord *) Insert->currpos; - subrecord->xl_len = len; - Insert->currpos += SizeOfXLogSubRecord; + updrqst = AdvanceXLInsertBuffer(); + curridx = Insert->curridx; + /* Insert cont-record header */ + Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; + contrecord = (XLogContRecord *) Insert->currpos; + contrecord->xl_rem_len = write_len; + Insert->currpos += SizeOfXLogContRecord; + freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord; } - Insert->currpos = ((char *) Insert->currpage) + - MAXALIGN(Insert->currpos - ((char *) Insert->currpage)); - freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; + /* Ensure next record will be properly aligned */ + Insert->currpos = (char *) Insert->currpage + + MAXALIGN(Insert->currpos - (char *) Insert->currpage); + freespace = INSERT_FREESPACE(Insert); /* - * Begin of the next record will be stored as LSN for - * changed data page... + * The recptr I return is the beginning of the *next* record. + * This will be stored as LSN for changed data pages... */ - RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid; - RecPtr.xrecoff = - XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + - Insert->currpos - ((char *) Insert->currpage); + INSERT_RECPTR(RecPtr, Insert, curridx); - /* Need to update global LgwrRqst if some block was filled up */ + /* Need to update shared LogwrtRqst if some block was filled up */ if (freespace < SizeOfXLogRecord) updrqst = true; /* curridx is filled and available for writing out */ else curridx = PrevBufIdx(curridx); - LgwrRqst.Write = XLogCtl->xlblocks[curridx]; + WriteRqst = XLogCtl->xlblocks[curridx]; S_UNLOCK(&(XLogCtl->insert_lck)); if (updrqst) { S_LOCK(&(XLogCtl->info_lck)); - LgwrResult = XLogCtl->LgwrResult; - if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write)) - XLogCtl->LgwrRqst.Write = LgwrRqst.Write; + /* advance global request to include new block(s) */ + if (XLByteLT(XLogCtl->LogwrtRqst.Write, WriteRqst)) + XLogCtl->LogwrtRqst.Write = WriteRqst; + /* update local result copy while I have the chance */ + LogwrtResult = XLogCtl->LogwrtResult; S_UNLOCK(&(XLogCtl->info_lck)); } @@ -629,327 +774,403 @@ begin:; return (RecPtr); } -void -XLogFlush(XLogRecPtr record) +/* + * Advance the Insert state to the next buffer page, writing out the next + * buffer if it still contains unwritten data. + * + * The global LogwrtRqst.Write pointer needs to be advanced to include the + * just-filled page. If we can do this for free (without an extra spinlock), + * we do so here. Otherwise the caller must do it. We return TRUE if the + * request update still needs to be done, FALSE if we did it internally. + * + * Must be called with insert_lck held. + */ +static bool +AdvanceXLInsertBuffer(void) { - XLogRecPtr WriteRqst; - char buffer[BLCKSZ]; - char *usebuf = NULL; - unsigned spins = 0; - bool force_lgwr = false; + XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogCtlWrite *Write = &XLogCtl->Write; + uint16 nextidx = NextBufIdx(Insert->curridx); + bool update_needed = true; + XLogRecPtr OldPageRqstPtr; + XLogwrtRqst WriteRqst; - if (XLOG_DEBUG) - { - fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n", - (IsBootstrapProcessingMode()) ? "(bootstrap)" : "", - (InRedo) ? "(redo)" : "", - record.xlogid, record.xrecoff, - LgwrResult.Write.xlogid, LgwrResult.Write.xrecoff, - LgwrResult.Flush.xlogid, LgwrResult.Flush.xrecoff); - fflush(stderr); - } + /* Use Insert->LogwrtResult copy if it's more fresh */ + if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write)) + LogwrtResult = Insert->LogwrtResult; - if (InRedo) - return; - if (XLByteLE(record, LgwrResult.Flush)) - return; + /* + * Get ending-offset of the buffer page we need to replace (this may be + * zero if the buffer hasn't been used yet). Fall through if it's already + * written out. + */ + OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; + if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) + { + /* nope, got work to do... */ + unsigned spins = 0; + XLogRecPtr FinishedPageRqstPtr; - START_CRIT_SECTION(); + FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx]; - WriteRqst = LgwrRqst.Write; - for (;;) - { - /* try to read LgwrResult */ - if (!TAS(&(XLogCtl->info_lck))) + for (;;) { - LgwrResult = XLogCtl->LgwrResult; - if (XLByteLE(record, LgwrResult.Flush)) + /* While waiting, try to get info_lck and update LogwrtResult */ + if (!TAS(&(XLogCtl->info_lck))) { + if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr)) + XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr; + update_needed = false; /* Did the shared-request update */ + LogwrtResult = XLogCtl->LogwrtResult; S_UNLOCK(&(XLogCtl->info_lck)); - END_CRIT_SECTION(); - return; - } - if (XLByteLT(XLogCtl->LgwrRqst.Flush, record)) - XLogCtl->LgwrRqst.Flush = record; - if (XLByteLT(WriteRqst, XLogCtl->LgwrRqst.Write)) - { - WriteRqst = XLogCtl->LgwrRqst.Write; - usebuf = NULL; - } - S_UNLOCK(&(XLogCtl->info_lck)); - } - /* if something was added to log cache then try to flush this too */ - if (!TAS(&(XLogCtl->insert_lck))) - { - XLogCtlInsert *Insert = &XLogCtl->Insert; - uint32 freespace = - ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; - if (freespace < SizeOfXLogRecord) /* buffer is full */ - { - usebuf = NULL; - LgwrRqst.Write = WriteRqst = XLogCtl->xlblocks[Insert->curridx]; - } - else - { - usebuf = buffer; - memcpy(usebuf, Insert->currpage, BLCKSZ - freespace); - memset(usebuf + BLCKSZ - freespace, 0, freespace); - WriteRqst = XLogCtl->xlblocks[Insert->curridx]; - WriteRqst.xrecoff = WriteRqst.xrecoff - BLCKSZ + - Insert->currpos - ((char *) Insert->currpage); - } - S_UNLOCK(&(XLogCtl->insert_lck)); - force_lgwr = true; - } - if (force_lgwr || WriteRqst.xlogid > record.xlogid || - (WriteRqst.xlogid == record.xlogid && - WriteRqst.xrecoff >= record.xrecoff + BLCKSZ)) - { - if (!TAS(&(XLogCtl->lgwr_lck))) - { - LgwrResult = XLogCtl->Write.LgwrResult; - if (XLByteLE(record, LgwrResult.Flush)) + if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { - S_UNLOCK(&(XLogCtl->lgwr_lck)); - END_CRIT_SECTION(); - return; + /* OK, someone wrote it already */ + Insert->LogwrtResult = LogwrtResult; + break; } - if (XLByteLT(LgwrResult.Write, WriteRqst)) + } + + /* + * LogwrtResult lock is busy or we know the page is still dirty. + * Try to acquire logwrt lock and write full blocks. + */ + if (!TAS(&(XLogCtl->logwrt_lck))) + { + LogwrtResult = Write->LogwrtResult; + if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { - LgwrRqst.Flush = LgwrRqst.Write = WriteRqst; - XLogWrite(usebuf); - S_UNLOCK(&(XLogCtl->lgwr_lck)); - if (XLByteLT(LgwrResult.Flush, record)) - elog(STOP, "XLogFlush: request is not satisfied"); - END_CRIT_SECTION(); - return; + S_UNLOCK(&(XLogCtl->logwrt_lck)); + /* OK, someone wrote it already */ + Insert->LogwrtResult = LogwrtResult; + break; } + /* + * Have to write buffers while holding insert lock. + * This is not good, so only write as much as we absolutely + * must. + */ + WriteRqst.Write = OldPageRqstPtr; + WriteRqst.Flush.xlogid = 0; + WriteRqst.Flush.xrecoff = 0; + XLogWrite(WriteRqst); + S_UNLOCK(&(XLogCtl->logwrt_lck)); + Insert->LogwrtResult = LogwrtResult; break; } + S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT); } - S_LOCK_SLEEP(&(XLogCtl->lgwr_lck), spins++, XLOG_LOCK_TIMEOUT); - } - - if (logFile >= 0 && (LgwrResult.Write.xlogid != logId || - (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg)) - { - if (close(logFile) != 0) - elog(STOP, "close(logfile %u seg %u) failed: %m", - logId, logSeg); - logFile = -1; - } - - if (logFile < 0) - { - logId = LgwrResult.Write.xlogid; - logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; - logOff = 0; - logFile = XLogFileOpen(logId, logSeg, false); } - if (pg_fdatasync(logFile) != 0) - elog(STOP, "fsync(logfile %u seg %u) failed: %m", - logId, logSeg); - LgwrResult.Flush = LgwrResult.Write; - - S_LOCK(&(XLogCtl->info_lck)); - XLogCtl->LgwrResult = LgwrResult; - if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write)) - XLogCtl->LgwrRqst.Write = LgwrResult.Write; - S_UNLOCK(&(XLogCtl->info_lck)); - - XLogCtl->Write.LgwrResult = LgwrResult; - - S_UNLOCK(&(XLogCtl->lgwr_lck)); - - END_CRIT_SECTION(); - return; - -} - -/* - * We use this routine when Insert->curridx block is full and the next XLOG - * buffer looks as unwritten to OS' cache. insert_lck is assumed here. - */ -static void -GetFreeXLBuffer() -{ - XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogCtlWrite *Write = &XLogCtl->Write; - uint16 curridx = NextBufIdx(Insert->curridx); - unsigned spins = 0; - - /* Use Insert->LgwrResult copy if it's more fresh */ - if (XLByteLT(LgwrResult.Write, Insert->LgwrResult.Write)) + /* + * Now the next buffer slot is free and we can set it up to be the + * next output page. + */ + if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize) { - LgwrResult = Insert->LgwrResult; - if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) - { - InitXLBuffer(curridx); - return; - } + /* crossing a logid boundary */ + XLogCtl->xlblocks[nextidx].xlogid = + XLogCtl->xlblocks[Insert->curridx].xlogid + 1; + XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ; } - - LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx]; - for (;;) + else { - if (!TAS(&(XLogCtl->info_lck))) - { - LgwrResult = XLogCtl->LgwrResult; - /* LgwrRqst.Write GE XLogCtl->LgwrRqst.Write */ - XLogCtl->LgwrRqst.Write = LgwrRqst.Write; - S_UNLOCK(&(XLogCtl->info_lck)); - if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) - { - Insert->LgwrResult = LgwrResult; - InitXLBuffer(curridx); - return; - } - } - - /* - * LgwrResult lock is busy or un-updated. Try to acquire lgwr lock - * and write full blocks. - */ - if (!TAS(&(XLogCtl->lgwr_lck))) - { - LgwrResult = Write->LgwrResult; - if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) - { - S_UNLOCK(&(XLogCtl->lgwr_lck)); - Insert->LgwrResult = LgwrResult; - InitXLBuffer(curridx); - return; - } - - /* - * Have to write buffers while holding insert lock - not - * good... - */ - XLogWrite(NULL); - S_UNLOCK(&(XLogCtl->lgwr_lck)); - Insert->LgwrResult = LgwrResult; - InitXLBuffer(curridx); - return; - } - S_LOCK_SLEEP(&(XLogCtl->lgwr_lck), spins++, XLOG_LOCK_TIMEOUT); + XLogCtl->xlblocks[nextidx].xlogid = + XLogCtl->xlblocks[Insert->curridx].xlogid; + XLogCtl->xlblocks[nextidx].xrecoff = + XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ; } + Insert->curridx = nextidx; + Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ); + Insert->currpos = ((char*) Insert->currpage) + SizeOfXLogPHD; + /* + * Be sure to re-zero the buffer so that bytes beyond what we've written + * will look like zeroes and not valid XLOG records... + */ + MemSet((char*) Insert->currpage, 0, BLCKSZ); + Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC; + /* Insert->currpage->xlp_info = 0; */ /* done by memset */ + + return update_needed; } +/* + * Write and/or fsync the log at least as far as WriteRqst indicates. + * + * Must be called with logwrt_lck held. + */ static void -XLogWrite(char *buffer) +XLogWrite(XLogwrtRqst WriteRqst) { XLogCtlWrite *Write = &XLogCtl->Write; char *from; - uint32 wcnt = 0; + bool ispartialpage; bool usexistent; - for (; XLByteLT(LgwrResult.Write, LgwrRqst.Write);) + /* Update local LogwrtResult (caller probably did this already, but...) */ + LogwrtResult = Write->LogwrtResult; + + while (XLByteLT(LogwrtResult.Write, WriteRqst.Write)) { - LgwrResult.Write = XLogCtl->xlblocks[Write->curridx]; - if (LgwrResult.Write.xlogid != logId || - (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg) + /* Advance LogwrtResult.Write to end of current buffer page */ + LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx]; + ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write); + + if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) { - if (wcnt > 0) - { - if (pg_fdatasync(logFile) != 0) - elog(STOP, "fsync(logfile %u seg %u) failed: %m", - logId, logSeg); - if (LgwrResult.Write.xlogid != logId) - LgwrResult.Flush.xrecoff = XLogFileSize; - else - LgwrResult.Flush.xrecoff = LgwrResult.Write.xrecoff - BLCKSZ; - LgwrResult.Flush.xlogid = logId; - if (!TAS(&(XLogCtl->info_lck))) - { - XLogCtl->LgwrResult.Flush = LgwrResult.Flush; - XLogCtl->LgwrResult.Write = LgwrResult.Flush; - if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Flush)) - XLogCtl->LgwrRqst.Write = LgwrResult.Flush; - if (XLByteLT(XLogCtl->LgwrRqst.Flush, LgwrResult.Flush)) - XLogCtl->LgwrRqst.Flush = LgwrResult.Flush; - S_UNLOCK(&(XLogCtl->info_lck)); - } - } - if (logFile >= 0) + /* + * Switch to new logfile segment. + */ + if (openLogFile >= 0) { - if (close(logFile) != 0) + if (close(openLogFile) != 0) elog(STOP, "close(logfile %u seg %u) failed: %m", - logId, logSeg); - logFile = -1; + openLogId, openLogSeg); + openLogFile = -1; } - logId = LgwrResult.Write.xlogid; - logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; - logOff = 0; + XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); + + /* create/use new log file; need lock in case creating */ SpinAcquire(ControlFileLockId); - /* create/use new log file */ usexistent = true; - logFile = XLogFileInit(logId, logSeg, &usexistent); - ControlFile->logId = logId; - ControlFile->logSeg = logSeg + 1; - ControlFile->time = time(NULL); - UpdateControlFile(); + openLogFile = XLogFileInit(openLogId, openLogSeg, &usexistent); + openLogOff = 0; + /* update pg_control, unless someone else already did */ + if (ControlFile->logId != openLogId || + ControlFile->logSeg != openLogSeg + 1) + { + ControlFile->logId = openLogId; + ControlFile->logSeg = openLogSeg + 1; + ControlFile->time = time(NULL); + UpdateControlFile(); + } SpinRelease(ControlFileLockId); - if (!usexistent) /* there was no file */ + + if (!usexistent) /* there was no precreated file */ elog(LOG, "XLogWrite: new log file created - " - "try to increase WAL_FILES"); + "consider increasing WAL_FILES"); + + /* + * Signal postmaster to start a checkpoint if it's been too + * long since the last one. (We look at local copy of RedoRecPtr + * which might be a little out of date, but should be close enough + * for this purpose.) + */ + if (IsUnderPostmaster && + (openLogId != RedoRecPtr.xlogid || + openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) + + (uint32) CheckPointSegments)) + { + if (XLOG_DEBUG) + fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n"); + kill(getppid(), SIGUSR1); + } } - if (logFile < 0) + if (openLogFile < 0) { - logId = LgwrResult.Write.xlogid; - logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize; - logOff = 0; - logFile = XLogFileOpen(logId, logSeg, false); + XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); + openLogFile = XLogFileOpen(openLogId, openLogSeg, false); + openLogOff = 0; } - if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize) + /* Need to seek in the file? */ + if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize) { - logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize; - if (lseek(logFile, (off_t) logOff, SEEK_SET) < 0) + openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize; + if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0) elog(STOP, "lseek(logfile %u seg %u off %u) failed: %m", - logId, logSeg, logOff); + openLogId, openLogSeg, openLogOff); } - if (buffer != NULL && XLByteLT(LgwrRqst.Write, LgwrResult.Write)) - from = buffer; - else - from = XLogCtl->pages + Write->curridx * BLCKSZ; - - if (write(logFile, from, BLCKSZ) != BLCKSZ) + /* OK to write the page */ + from = XLogCtl->pages + Write->curridx * BLCKSZ; + if (write(openLogFile, from, BLCKSZ) != BLCKSZ) elog(STOP, "write(logfile %u seg %u off %u) failed: %m", - logId, logSeg, logOff); + openLogId, openLogSeg, openLogOff); + openLogOff += BLCKSZ; - wcnt++; - logOff += BLCKSZ; + /* + * If we just wrote the whole last page of a logfile segment, + * fsync the segment immediately. This avoids having to go back + * and re-open prior segments when an fsync request comes along later. + * Doing it here ensures that one and only one backend will perform + * this fsync. + */ + if (openLogOff >= XLogSegSize && !ispartialpage) + { + if (pg_fdatasync(openLogFile) != 0) + elog(STOP, "fsync(logfile %u seg %u) failed: %m", + openLogId, openLogSeg); + LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */ + } - if (from != buffer) - Write->curridx = NextBufIdx(Write->curridx); - else - LgwrResult.Write = LgwrRqst.Write; + if (ispartialpage) + { + /* Only asked to write a partial page */ + LogwrtResult.Write = WriteRqst.Write; + break; + } + Write->curridx = NextBufIdx(Write->curridx); } - if (wcnt == 0) - elog(STOP, "XLogWrite: nothing written"); - if (XLByteLT(LgwrResult.Flush, LgwrRqst.Flush) && - XLByteLE(LgwrRqst.Flush, LgwrResult.Write)) + /* + * If asked to flush, do so + */ + if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) && + XLByteLT(LogwrtResult.Flush, LogwrtResult.Write)) { - if (pg_fdatasync(logFile) != 0) + /* + * Could get here without iterating above loop, in which case + * we might have no open file or the wrong one. However, we do + * not need to fsync more than one file. + */ + if (openLogFile >= 0 && + !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) + { + if (close(openLogFile) != 0) + elog(STOP, "close(logfile %u seg %u) failed: %m", + openLogId, openLogSeg); + openLogFile = -1; + } + if (openLogFile < 0) + { + XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); + openLogFile = XLogFileOpen(openLogId, openLogSeg, false); + openLogOff = 0; + } + + if (pg_fdatasync(openLogFile) != 0) elog(STOP, "fsync(logfile %u seg %u) failed: %m", - logId, logSeg); - LgwrResult.Flush = LgwrResult.Write; + openLogId, openLogSeg); + LogwrtResult.Flush = LogwrtResult.Write; } + /* + * Update shared-memory status + * + * We make sure that the shared 'request' values do not fall behind + * the 'result' values. This is not absolutely essential, but it saves + * some code in a couple of places. + */ S_LOCK(&(XLogCtl->info_lck)); - XLogCtl->LgwrResult = LgwrResult; - if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write)) - XLogCtl->LgwrRqst.Write = LgwrResult.Write; + XLogCtl->LogwrtResult = LogwrtResult; + if (XLByteLT(XLogCtl->LogwrtRqst.Write, LogwrtResult.Write)) + XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; + if (XLByteLT(XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush)) + XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; S_UNLOCK(&(XLogCtl->info_lck)); - Write->LgwrResult = LgwrResult; + Write->LogwrtResult = LogwrtResult; +} + +/* + * Ensure that all XLOG data through the given position is flushed to disk. + * + * NOTE: this differs from XLogWrite mainly in that the logwrt_lck is not + * already held, and we try to avoid acquiring it if possible. + */ +void +XLogFlush(XLogRecPtr record) +{ + XLogRecPtr WriteRqstPtr; + XLogwrtRqst WriteRqst; + unsigned spins = 0; + + if (XLOG_DEBUG) + { + fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n", + (IsBootstrapProcessingMode()) ? "(bootstrap)" : "", + (InRedo) ? "(redo)" : "", + record.xlogid, record.xrecoff, + LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, + LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff); + fflush(stderr); + } + + /* Disabled during REDO */ + if (InRedo) + return; + + /* Quick exit if already known flushed */ + if (XLByteLE(record, LogwrtResult.Flush)) + return; + + START_CRIT_SECTION(); + + /* + * Since fsync is usually a horribly expensive operation, we try to + * piggyback as much data as we can on each fsync: if we see any more + * data entered into the xlog buffer, we'll write and fsync that too, + * so that the final value of LogwrtResult.Flush is as large as possible. + * This gives us some chance of avoiding another fsync immediately after. + */ + + /* initialize to given target; may increase below */ + WriteRqstPtr = record; + + for (;;) + { + /* try to read LogwrtResult and update local state */ + if (!TAS(&(XLogCtl->info_lck))) + { + if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write)) + WriteRqstPtr = XLogCtl->LogwrtRqst.Write; + LogwrtResult = XLogCtl->LogwrtResult; + S_UNLOCK(&(XLogCtl->info_lck)); + if (XLByteLE(record, LogwrtResult.Flush)) + { + /* Done already */ + break; + } + } + /* if something was added to log cache then try to flush this too */ + if (!TAS(&(XLogCtl->insert_lck))) + { + XLogCtlInsert *Insert = &XLogCtl->Insert; + uint32 freespace = INSERT_FREESPACE(Insert); + + if (freespace < SizeOfXLogRecord) /* buffer is full */ + { + WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; + } + else + { + WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; + WriteRqstPtr.xrecoff -= freespace; + } + S_UNLOCK(&(XLogCtl->insert_lck)); + } + /* now try to get the logwrt lock */ + if (!TAS(&(XLogCtl->logwrt_lck))) + { + LogwrtResult = XLogCtl->Write.LogwrtResult; + if (XLByteLE(record, LogwrtResult.Flush)) + { + /* Done already */ + S_UNLOCK(&(XLogCtl->logwrt_lck)); + break; + } + WriteRqst.Write = WriteRqstPtr; + WriteRqst.Flush = record; + XLogWrite(WriteRqst); + S_UNLOCK(&(XLogCtl->logwrt_lck)); + if (XLByteLT(LogwrtResult.Flush, record)) + elog(STOP, "XLogFlush: request is not satisfied"); + break; + } + S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT); + } + + END_CRIT_SECTION(); } +/* + * Create a new XLOG file segment, or open a pre-existing one. + * + * Returns FD of opened file. + */ static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent) { @@ -962,7 +1183,7 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent) XLogFileName(path, log, seg); /* - * Try to use existent file (checkpoint maker creates it sometimes). + * Try to use existent file (checkpoint maker may have created it already) */ if (*usexistent) { @@ -971,10 +1192,11 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent) { if (errno != ENOENT) elog(STOP, "InitOpen(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); } else return(fd); + /* Set flag to tell caller there was no existent file */ *usexistent = false; } @@ -982,10 +1204,11 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent) unlink(tpath); unlink(path); - fd = BasicOpenFile(tpath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR); + fd = BasicOpenFile(tpath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, + S_IRUSR | S_IWUSR); if (fd < 0) elog(STOP, "InitCreate(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); /* * Zero-fill the file. We have to do this the hard way to ensure that @@ -1000,13 +1223,21 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent) for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer)) { if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer)) + { + int save_errno = errno; + + /* If we fail to make the file, delete it to release disk space */ + unlink(tpath); + errno = save_errno; + elog(STOP, "ZeroFill(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); + } } if (pg_fsync(fd) != 0) elog(STOP, "fsync(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); close(fd); @@ -1018,22 +1249,25 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent) #ifndef __BEOS__ if (link(tpath, path) < 0) elog(STOP, "InitRelink(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); unlink(tpath); #else if (rename(tpath, path) < 0) elog(STOP, "InitRelink(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); #endif fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) elog(STOP, "InitReopen(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); return (fd); } +/* + * Open a pre-existing logfile segment. + */ static int XLogFileOpen(uint32 log, uint32 seg, bool econt) { @@ -1048,51 +1282,94 @@ XLogFileOpen(uint32 log, uint32 seg, bool econt) if (econt && errno == ENOENT) { elog(LOG, "open(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); return (fd); } elog(STOP, "open(logfile %u seg %u) failed: %m", - logId, logSeg); + log, seg); } return (fd); } /* - * (Re)move offline log files older or equal to passwd one + * Preallocate log files beyond the specified log endpoint, according to + * the XLOGfile user parameter. + */ +static void +PreallocXlogFiles(XLogRecPtr endptr) +{ + uint32 _logId; + uint32 _logSeg; + int lf; + bool usexistent; + struct timeval delay; + int i; + + XLByteToPrevSeg(endptr, _logId, _logSeg); + if (XLOGfiles > 0) + { + for (i = 1; i <= XLOGfiles; i++) + { + usexistent = true; + NextLogSeg(_logId, _logSeg); + SpinAcquire(ControlFileLockId); + lf = XLogFileInit(_logId, _logSeg, &usexistent); + close(lf); + SpinRelease(ControlFileLockId); + /* + * Give up ControlFileLockId for 1/50 sec to let other + * backends switch to new log file in XLogWrite() + */ + delay.tv_sec = 0; + delay.tv_usec = 20000; + (void) select(0, NULL, NULL, NULL, &delay); + } + } + else if ((endptr.xrecoff - 1) % XLogSegSize >= + (uint32) (0.75 * XLogSegSize)) + { + usexistent = true; + NextLogSeg(_logId, _logSeg); + SpinAcquire(ControlFileLockId); + lf = XLogFileInit(_logId, _logSeg, &usexistent); + close(lf); + SpinRelease(ControlFileLockId); + } +} + +/* + * Remove or move offline all log files older or equal to passed log/seg# */ static void -MoveOfflineLogs(char *archdir, uint32 _logId, uint32 _logSeg) +MoveOfflineLogs(uint32 log, uint32 seg) { DIR *xldir; struct dirent *xlde; char lastoff[32]; char path[MAXPGPATH]; - Assert(archdir[0] == 0); /* ! implemented yet */ + Assert(XLOG_archive_dir[0] == 0); /* ! implemented yet */ xldir = opendir(XLogDir); if (xldir == NULL) elog(STOP, "MoveOfflineLogs: cannot open xlog dir: %m"); - sprintf(lastoff, "%08X%08X", _logId, _logSeg); + sprintf(lastoff, "%08X%08X", log, seg); errno = 0; while ((xlde = readdir(xldir)) != NULL) { - if (strlen(xlde->d_name) != 16 || - strspn(xlde->d_name, "0123456789ABCDEF") != 16) - continue; - if (strcmp(xlde->d_name, lastoff) > 0) + if (strlen(xlde->d_name) == 16 && + strspn(xlde->d_name, "0123456789ABCDEF") == 16 && + strcmp(xlde->d_name, lastoff) <= 0) { - errno = 0; - continue; + elog(LOG, "MoveOfflineLogs: %s %s", (XLOG_archive_dir[0]) ? + "archive" : "remove", xlde->d_name); + sprintf(path, "%s%c%s", XLogDir, SEP_CHAR, xlde->d_name); + if (XLOG_archive_dir[0] == 0) + unlink(path); } - elog(LOG, "MoveOfflineLogs: %s %s", (archdir[0]) ? - "archive" : "remove", xlde->d_name); - sprintf(path, "%s%c%s", XLogDir, SEP_CHAR, xlde->d_name); - if (archdir[0] == 0) - unlink(path); errno = 0; } if (errno) @@ -1100,6 +1377,11 @@ MoveOfflineLogs(char *archdir, uint32 _logId, uint32 _logSeg) closedir(xldir); } +/* + * Restore the backup blocks present in an XLOG record, if any. + * + * We assume all of the record has been read into memory at *record. + */ static void RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) { @@ -1110,9 +1392,10 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) char *blk; int i; - for (i = 0, blk = (char*)XLogRecGetData(record) + record->xl_len; i < 2; i++) + blk = (char*)XLogRecGetData(record) + record->xl_len; + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { - if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i)))) + if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) continue; memcpy((char*)&bkpb, blk, sizeof(BkpBlock)); @@ -1137,6 +1420,13 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) } } +/* + * CRC-check an XLOG record. We do not believe the contents of an XLOG + * record (other than to the minimal extent of computing the amount of + * data to read in) until we've checked the CRCs. + * + * We assume all of the record has been read into memory at *record. + */ static bool RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) { @@ -1146,83 +1436,93 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) uint32 len = record->xl_len; char *blk; - for (i = 0; i < 2; i++) - { - if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i)))) - continue; - - if (len <= (sizeof(BkpBlock) + BLCKSZ)) - { - elog(emode, "ReadRecord: record at %u/%u is too short to keep bkp block", - recptr.xlogid, recptr.xrecoff); - return(false); - } - len -= sizeof(BkpBlock); - len -= BLCKSZ; - } - - /* CRC of rmgr data */ + /* Check CRC of rmgr data and record header */ INIT_CRC64(crc); - COMP_CRC64(crc, ((char*)XLogRecGetData(record)), len); - COMP_CRC64(crc, ((char*)record + offsetof(XLogRecord, xl_prev)), - (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev))); + COMP_CRC64(crc, XLogRecGetData(record), len); + COMP_CRC64(crc, (char*) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); FIN_CRC64(crc); - if (record->xl_crc.crc1 != crc.crc1 || record->xl_crc.crc2 != crc.crc2) + if (!EQ_CRC64(record->xl_crc, crc)) { elog(emode, "ReadRecord: bad rmgr data CRC in record at %u/%u", - recptr.xlogid, recptr.xrecoff); + recptr.xlogid, recptr.xrecoff); return(false); } - if (record->xl_len == len) - return(true); - - for (i = 0, blk = (char*)XLogRecGetData(record) + len; i < 2; i++) + /* Check CRCs of backup blocks, if any */ + blk = (char*)XLogRecGetData(record) + len; + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { - if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i)))) + if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) continue; INIT_CRC64(crc); - COMP_CRC64(crc, (blk + sizeof(BkpBlock)), BLCKSZ); - COMP_CRC64(crc, (blk + offsetof(BkpBlock, node)), - (sizeof(BkpBlock) - offsetof(BkpBlock, node))); + COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ); + COMP_CRC64(crc, blk + sizeof(crc64), + sizeof(BkpBlock) - sizeof(crc64)); FIN_CRC64(crc); - memcpy((char*)&cbuf, blk, sizeof(crc64)); + memcpy((char*)&cbuf, blk, sizeof(crc64)); /* don't assume alignment */ - if (cbuf.crc1 != crc.crc1 || cbuf.crc2 != crc.crc2) + if (!EQ_CRC64(cbuf, crc)) { elog(emode, "ReadRecord: bad bkp block %d CRC in record at %u/%u", - i + 1, recptr.xlogid, recptr.xrecoff); + i + 1, recptr.xlogid, recptr.xrecoff); return(false); } - blk += sizeof(BkpBlock); - blk += BLCKSZ; + blk += sizeof(BkpBlock) + BLCKSZ; } - record->xl_len = len; /* !!! */ - return(true); } +/* + * Attempt to read an XLOG record. + * + * If RecPtr is not NULL, try to read a record at that position. Otherwise + * try to read a record just after the last one previously read. + * + * If no valid record is available, returns NULL, or fails if emode is STOP. + * (emode must be either STOP or LOG.) + * + * buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long. It is needed + * to reassemble a record that crosses block boundaries. Note that on + * successful return, the returned record pointer always points at buffer. + */ static XLogRecord * -ReadRecord(XLogRecPtr *RecPtr, char *buffer) +ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer) { XLogRecord *record; XLogRecPtr tmpRecPtr = EndRecPtr; - uint32 len; - bool nextmode = (RecPtr == NULL); - int emode = (nextmode) ? LOG : STOP; - bool noBlck = false; + uint32 len, + total_len; + uint32 targetPageOff; + unsigned i; + + if (readBuf == NULL) + { + /* + * First time through, permanently allocate readBuf. We do it + * this way, rather than just making a static array, for two + * reasons: (1) no need to waste the storage in most instantiations + * of the backend; (2) a static char array isn't guaranteed to + * have any particular alignment, whereas malloc() will provide + * MAXALIGN'd storage. + */ + readBuf = (char *) malloc(BLCKSZ); + Assert(readBuf != NULL); + } - if (nextmode) + if (RecPtr == NULL) { RecPtr = &tmpRecPtr; + /* fast case if next record is on same page */ if (nextRecord != NULL) { record = nextRecord; goto got_record; } + /* align old recptr to next page */ if (tmpRecPtr.xrecoff % BLCKSZ != 0) tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ); if (tmpRecPtr.xrecoff >= XLogFileSize) @@ -1236,31 +1536,36 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer) elog(STOP, "ReadRecord: invalid record offset at (%u, %u)", RecPtr->xlogid, RecPtr->xrecoff); - if (readFile >= 0 && (RecPtr->xlogid != readId || - RecPtr->xrecoff / XLogSegSize != readSeg)) + if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg)) { close(readFile); readFile = -1; } - readId = RecPtr->xlogid; - readSeg = RecPtr->xrecoff / XLogSegSize; + XLByteToSeg(*RecPtr, readId, readSeg); if (readFile < 0) { - noBlck = true; - readFile = XLogFileOpen(readId, readSeg, nextmode); + readFile = XLogFileOpen(readId, readSeg, (emode == LOG)); if (readFile < 0) goto next_record_is_invalid; + readOff = (uint32) (-1); /* force read to occur below */ } - if (noBlck || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ) + targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / BLCKSZ) * BLCKSZ; + if (readOff != targetPageOff) { - readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ; - if (lseek(readFile, (off_t) (readOff * BLCKSZ), SEEK_SET) < 0) - elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m", + readOff = targetPageOff; + if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) + { + elog(emode, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m", readId, readSeg, readOff); + goto next_record_is_invalid; + } if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %m", + { + elog(emode, "ReadRecord: read(logfile %u seg %u off %u) failed: %m", readId, readSeg, readOff); + goto next_record_is_invalid; + } if (((XLogPageHeader) readBuf)->xlp_magic != XLOG_PAGE_MAGIC) { elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u", @@ -1269,64 +1574,83 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer) goto next_record_is_invalid; } } - if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) && + if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD) { - elog(emode, "ReadRecord: subrecord is requested by (%u, %u)", + elog(emode, "ReadRecord: contrecord is requested by (%u, %u)", RecPtr->xlogid, RecPtr->xrecoff); goto next_record_is_invalid; } record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ); got_record:; + /* + * Currently, xl_len == 0 must be bad data, but that might not be + * true forever. See note in XLogInsert. + */ if (record->xl_len == 0) { elog(emode, "ReadRecord: record with zero len at (%u, %u)", - RecPtr->xlogid, RecPtr->xrecoff); + RecPtr->xlogid, RecPtr->xrecoff); goto next_record_is_invalid; } - if (record->xl_len > _INTL_MAXLOGRECSZ) + /* + * Compute total length of record including any appended backup blocks. + */ + total_len = SizeOfXLogRecord + record->xl_len; + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) + continue; + total_len += sizeof(BkpBlock) + BLCKSZ; + } + /* + * Make sure it will fit in buffer (currently, it is mechanically + * impossible for this test to fail, but it seems like a good idea + * anyway). + */ + if (total_len > _INTL_MAXLOGRECSZ) { elog(emode, "ReadRecord: too long record len %u at (%u, %u)", - record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); + total_len, RecPtr->xlogid, RecPtr->xrecoff); goto next_record_is_invalid; } if (record->xl_rmid > RM_MAX_ID) { - elog(emode, "ReadRecord: invalid resource managed id %u at (%u, %u)", + elog(emode, "ReadRecord: invalid resource manager id %u at (%u, %u)", record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff); goto next_record_is_invalid; } nextRecord = NULL; - len = BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord; - if (record->xl_len > len) + len = BLCKSZ - RecPtr->xrecoff % BLCKSZ; + if (total_len > len) { - XLogSubRecord *subrecord; + /* Need to reassemble record */ + XLogContRecord *contrecord; uint32 gotlen = len; - memcpy(buffer, record, len + SizeOfXLogRecord); + memcpy(buffer, record, len); record = (XLogRecord *) buffer; - buffer += len + SizeOfXLogRecord; + buffer += len; for (;;) { - readOff++; - if (readOff == XLogSegSize / BLCKSZ) + readOff += BLCKSZ; + if (readOff >= XLogSegSize) { - readSeg++; - if (readSeg == XLogLastSeg) - { - readSeg = 0; - readId++; - } close(readFile); - readOff = 0; - readFile = XLogFileOpen(readId, readSeg, nextmode); + readFile = -1; + NextLogSeg(readId, readSeg); + readFile = XLogFileOpen(readId, readSeg, (emode == LOG)); if (readFile < 0) goto next_record_is_invalid; + readOff = 0; } if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %m", + { + elog(emode, "ReadRecord: read(logfile %u seg %u off %u) failed: %m", readId, readSeg, readOff); + goto next_record_is_invalid; + } if (((XLogPageHeader) readBuf)->xlp_magic != XLOG_PAGE_MAGIC) { elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u", @@ -1334,164 +1658,65 @@ got_record:; readId, readSeg, readOff); goto next_record_is_invalid; } - if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD)) + if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)) { - elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u", + elog(emode, "ReadRecord: there is no ContRecord flag in logfile %u seg %u off %u", readId, readSeg, readOff); goto next_record_is_invalid; } - subrecord = (XLogSubRecord *) ((char *) readBuf + SizeOfXLogPHD); - if (subrecord->xl_len == 0 || - record->xl_len < (subrecord->xl_len + gotlen)) + contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD); + if (contrecord->xl_rem_len == 0 || + total_len != (contrecord->xl_rem_len + gotlen)) { - elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u", - subrecord->xl_len, readId, readSeg, readOff); + elog(emode, "ReadRecord: invalid cont-record len %u in logfile %u seg %u off %u", + contrecord->xl_rem_len, readId, readSeg, readOff); goto next_record_is_invalid; } - len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord; - - if (subrecord->xl_len > len) - { - memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, len); - gotlen += len; - buffer += len; - continue; - } - if (record->xl_len != (subrecord->xl_len + gotlen)) + len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord; + if (contrecord->xl_rem_len > len) { - elog(emode, "ReadRecord: invalid len %u of constracted record in logfile %u seg %u off %u", - subrecord->xl_len + gotlen, readId, readSeg, readOff); - goto next_record_is_invalid; - } - memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, subrecord->xl_len); - break; - } - if (!RecordIsValid(record, *RecPtr, emode)) - goto next_record_is_invalid; - if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(subrecord->xl_len) + - SizeOfXLogPHD + SizeOfXLogSubRecord) - { - nextRecord = (XLogRecord *) ((char *) subrecord + - MAXALIGN(subrecord->xl_len) + SizeOfXLogSubRecord); - } - EndRecPtr.xlogid = readId; - EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + - SizeOfXLogPHD + SizeOfXLogSubRecord + - MAXALIGN(subrecord->xl_len); - ReadRecPtr = *RecPtr; - return (record); - } - if (!RecordIsValid(record, *RecPtr, emode)) - goto next_record_is_invalid; - if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(record->xl_len) + - RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord) - nextRecord = (XLogRecord *) ((char *) record + - MAXALIGN(record->xl_len) + SizeOfXLogRecord); - EndRecPtr.xlogid = RecPtr->xlogid; - EndRecPtr.xrecoff = RecPtr->xrecoff + - MAXALIGN(record->xl_len) + SizeOfXLogRecord; - ReadRecPtr = *RecPtr; - - return (record); - -next_record_is_invalid:; - close(readFile); - readFile = -1; - nextRecord = NULL; - memset(buffer, 0, SizeOfXLogRecord); - record = (XLogRecord *) buffer; - - /* - * If we assumed that next record began on the same page where - * previous one ended - zero end of page. - */ - if (XLByteEQ(tmpRecPtr, EndRecPtr)) - { - Assert(EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) && - BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord); - readId = EndRecPtr.xlogid; - readSeg = EndRecPtr.xrecoff / XLogSegSize; - readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ; - elog(LOG, "Formatting logfile %u seg %u block %u at offset %u", - readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ); - readFile = XLogFileOpen(readId, readSeg, false); - if (lseek(readFile, (off_t) (readOff * BLCKSZ), SEEK_SET) < 0) - elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m", - readId, readSeg, readOff); - if (read(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %m", - readId, readSeg, readOff); - memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0, - BLCKSZ - EndRecPtr.xrecoff % BLCKSZ); - if (lseek(readFile, (off_t) (readOff * BLCKSZ), SEEK_SET) < 0) - elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m", - readId, readSeg, readOff); - if (write(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %m", - readId, readSeg, readOff); - readOff++; - } - else - { - Assert(EndRecPtr.xrecoff % BLCKSZ == 0 || - BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord); - readId = tmpRecPtr.xlogid; - readSeg = tmpRecPtr.xrecoff / XLogSegSize; - readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ; - Assert(readOff > 0); - } - if (readOff > 0) - { - if (!XLByteEQ(tmpRecPtr, EndRecPtr)) - elog(LOG, "Formatting logfile %u seg %u block %u at offset 0", - readId, readSeg, readOff); - readOff *= BLCKSZ; - memset(readBuf, 0, BLCKSZ); - readFile = XLogFileOpen(readId, readSeg, false); - if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) - elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m", - readId, readSeg, readOff); - while (readOff < XLogSegSize) - { - if (write(readFile, readBuf, BLCKSZ) != BLCKSZ) - elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %m", - readId, readSeg, readOff); - readOff += BLCKSZ; - } - } - if (readFile >= 0) - { - if (pg_fsync(readFile) < 0) - elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %m", - readId, readSeg); - close(readFile); - readFile = -1; - } - - readId = EndRecPtr.xlogid; - readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1; - elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1); - if (ControlFile->logId != readId || ControlFile->logSeg != readSeg) - { - elog(LOG, "Set logId/logSeg in control file"); - ControlFile->logId = readId; - ControlFile->logSeg = readSeg; - ControlFile->time = time(NULL); - UpdateControlFile(); - } - if (readSeg == XLogLastSeg) - { - readSeg = 0; - readId++; + memcpy(buffer, (char *)contrecord + SizeOfXLogContRecord, len); + gotlen += len; + buffer += len; + continue; + } + memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, + contrecord->xl_rem_len); + break; + } + if (!RecordIsValid(record, *RecPtr, emode)) + goto next_record_is_invalid; + if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD + + SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len)) + { + nextRecord = (XLogRecord *) ((char *) contrecord + + SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len)); + } + EndRecPtr.xlogid = readId; + EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff + + SizeOfXLogPHD + SizeOfXLogContRecord + + MAXALIGN(contrecord->xl_rem_len); + ReadRecPtr = *RecPtr; + return record; } - { - char path[MAXPGPATH]; - XLogFileName(path, readId, readSeg); - unlink(path); - } + /* Record does not cross a page boundary */ + if (!RecordIsValid(record, *RecPtr, emode)) + goto next_record_is_invalid; + if (BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % BLCKSZ + + MAXALIGN(total_len)) + nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len)); + EndRecPtr.xlogid = RecPtr->xlogid; + EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); + ReadRecPtr = *RecPtr; + memcpy(buffer, record, total_len); + return (XLogRecord *) buffer; - return (record); +next_record_is_invalid:; + close(readFile); + readFile = -1; + nextRecord = NULL; + return NULL; } /* @@ -1521,17 +1746,18 @@ static void WriteControlFile(void) { int fd; - char buffer[BLCKSZ]; + char buffer[BLCKSZ]; /* need not be aligned */ #ifdef USE_LOCALE char *localeptr; #endif /* - * Initialize compatibility-check fields + * Initialize version and compatibility-check fields */ + ControlFile->pg_control_version = PG_CONTROL_VERSION; + ControlFile->catalog_version_no = CATALOG_VERSION_NO; ControlFile->blcksz = BLCKSZ; ControlFile->relseg_size = RELSEG_SIZE; - ControlFile->catalog_version_no = CATALOG_VERSION_NO; #ifdef USE_LOCALE localeptr = setlocale(LC_COLLATE, NULL); if (!localeptr) @@ -1558,6 +1784,13 @@ WriteControlFile(void) strcpy(ControlFile->lc_ctype, "C"); #endif + /* Contents are protected with a CRC */ + INIT_CRC64(ControlFile->crc); + COMP_CRC64(ControlFile->crc, + (char*) ControlFile + sizeof(crc64), + sizeof(ControlFileData) - sizeof(crc64)); + FIN_CRC64(ControlFile->crc); + /* * We write out BLCKSZ bytes into pg_control, zero-padding the * excess over sizeof(ControlFileData). This reduces the odds @@ -1568,12 +1801,6 @@ WriteControlFile(void) if (sizeof(ControlFileData) > BLCKSZ) elog(STOP, "sizeof(ControlFileData) is too large ... fix xlog.c"); - INIT_CRC64(ControlFile->crc); - COMP_CRC64(ControlFile->crc, - ((char*)ControlFile + offsetof(ControlFileData, logId)), - (sizeof(ControlFileData) - offsetof(ControlFileData, logId))); - FIN_CRC64(ControlFile->crc); - memset(buffer, 0, BLCKSZ); memcpy(buffer, ControlFile, sizeof(ControlFileData)); @@ -1609,13 +1836,24 @@ ReadControlFile(void) close(fd); + /* + * Check for expected pg_control format version. If this is wrong, + * the CRC check will likely fail because we'll be checking the wrong + * number of bytes. Complaining about wrong version will probably be + * more enlightening than complaining about wrong CRC. + */ + if (ControlFile->pg_control_version != PG_CONTROL_VERSION) + elog(STOP, "database was initialized with PG_CONTROL_VERSION %d,\n\tbut the backend was compiled with PG_CONTROL_VERSION %d.\n\tlooks like you need to initdb.", + ControlFile->pg_control_version, PG_CONTROL_VERSION); + + /* Now check the CRC. */ INIT_CRC64(crc); COMP_CRC64(crc, - ((char*)ControlFile + offsetof(ControlFileData, logId)), - (sizeof(ControlFileData) - offsetof(ControlFileData, logId))); + (char*) ControlFile + sizeof(crc64), + sizeof(ControlFileData) - sizeof(crc64)); FIN_CRC64(crc); - if (crc.crc1 != ControlFile->crc.crc1 || crc.crc2 != ControlFile->crc.crc2) + if (!EQ_CRC64(crc, ControlFile->crc)) elog(STOP, "Invalid CRC in control file"); /* @@ -1629,15 +1867,15 @@ ReadControlFile(void) * for themselves. (These locale settings are considered critical * compatibility items because they can affect sort order of indexes.) */ + if (ControlFile->catalog_version_no != CATALOG_VERSION_NO) + elog(STOP, "database was initialized with CATALOG_VERSION_NO %d,\n\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n\tlooks like you need to initdb.", + ControlFile->catalog_version_no, CATALOG_VERSION_NO); if (ControlFile->blcksz != BLCKSZ) elog(STOP, "database was initialized with BLCKSZ %d,\n\tbut the backend was compiled with BLCKSZ %d.\n\tlooks like you need to initdb.", ControlFile->blcksz, BLCKSZ); if (ControlFile->relseg_size != RELSEG_SIZE) elog(STOP, "database was initialized with RELSEG_SIZE %d,\n\tbut the backend was compiled with RELSEG_SIZE %d.\n\tlooks like you need to initdb.", ControlFile->relseg_size, RELSEG_SIZE); - if (ControlFile->catalog_version_no != CATALOG_VERSION_NO) - elog(STOP, "database was initialized with CATALOG_VERSION_NO %d,\n\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n\tlooks like you need to initdb.", - ControlFile->catalog_version_no, CATALOG_VERSION_NO); #ifdef USE_LOCALE if (setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL) elog(STOP, "database was initialized with LC_COLLATE '%s',\n\twhich is not recognized by setlocale().\n\tlooks like you need to initdb.", @@ -1660,8 +1898,8 @@ UpdateControlFile(void) INIT_CRC64(ControlFile->crc); COMP_CRC64(ControlFile->crc, - ((char*)ControlFile + offsetof(ControlFileData, logId)), - (sizeof(ControlFileData) - offsetof(ControlFileData, logId))); + (char*) ControlFile + sizeof(crc64), + sizeof(ControlFileData) - sizeof(crc64)); FIN_CRC64(ControlFile->crc); fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); @@ -1678,7 +1916,7 @@ UpdateControlFile(void) } /* - * Management of shared memory for XLOG + * Initialization of shared memory for XLOG */ int @@ -1687,9 +1925,9 @@ XLOGShmemSize(void) if (XLOGbuffers < MinXLOGbuffers) XLOGbuffers = MinXLOGbuffers; - return (sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + - sizeof(XLogRecPtr) * XLOGbuffers + - sizeof(ControlFileData)); + return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) + + BLCKSZ * XLOGbuffers + + MAXALIGN(sizeof(ControlFileData)); } void @@ -1702,13 +1940,46 @@ XLOGShmemInit(void) XLOGbuffers = MinXLOGbuffers; XLogCtl = (XLogCtlData *) - ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers + - sizeof(XLogRecPtr) * XLOGbuffers, &found); + ShmemInitStruct("XLOG Ctl", + MAXALIGN(sizeof(XLogCtlData) + + sizeof(XLogRecPtr) * XLOGbuffers) + + BLCKSZ * XLOGbuffers, + &found); Assert(!found); ControlFile = (ControlFileData *) ShmemInitStruct("Control File", sizeof(ControlFileData), &found); Assert(!found); + memset(XLogCtl, 0, sizeof(XLogCtlData)); + /* + * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be + * a multiple of the alignment for same, so no extra alignment padding + * is needed here. + */ + XLogCtl->xlblocks = (XLogRecPtr *) + (((char *) XLogCtl) + sizeof(XLogCtlData)); + memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); + /* + * Here, on the other hand, we must MAXALIGN to ensure the page buffers + * have worst-case alignment. + */ + XLogCtl->pages = + ((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) + + sizeof(XLogRecPtr) * XLOGbuffers); + memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers); + + /* + * Do basic initialization of XLogCtl shared data. + * (StartupXLOG will fill in additional info.) + */ + XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers; + XLogCtl->XLogCacheBlck = XLOGbuffers - 1; + XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); + S_INIT_LOCK(&(XLogCtl->insert_lck)); + S_INIT_LOCK(&(XLogCtl->info_lck)); + S_INIT_LOCK(&(XLogCtl->logwrt_lck)); + S_INIT_LOCK(&(XLogCtl->chkp_lck)); + /* * If we are not in bootstrap mode, pg_control should already exist. * Read and validate it immediately (see comments in ReadControlFile() @@ -1719,27 +1990,33 @@ XLOGShmemInit(void) } /* - * This func must be called ONCE on system install + * This func must be called ONCE on system install. It creates pg_control + * and the initial XLOG segment. */ void -BootStrapXLOG() +BootStrapXLOG(void) { CheckPoint checkPoint; - char buffer[BLCKSZ]; - bool usexistent = false; - XLogPageHeader page = (XLogPageHeader) buffer; + char *buffer; + XLogPageHeader page; XLogRecord *record; + bool usexistent = false; crc64 crc; + /* Use malloc() to ensure buffer is MAXALIGNED */ + buffer = (char *) malloc(BLCKSZ); + page = (XLogPageHeader) buffer; + checkPoint.redo.xlogid = 0; checkPoint.redo.xrecoff = SizeOfXLogPHD; checkPoint.undo = checkPoint.redo; + checkPoint.ThisStartUpID = 0; checkPoint.nextXid = FirstTransactionId; checkPoint.nextOid = BootstrapObjectIdData; - checkPoint.ThisStartUpID = 0; - checkPoint.Shutdown = true; + checkPoint.time = time(NULL); ShmemVariableCache->nextXid = checkPoint.nextXid; + ShmemVariableCache->xidCount = 0; ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; @@ -1752,34 +2029,36 @@ BootStrapXLOG() record->xl_xact_prev = record->xl_prev; record->xl_xid = InvalidTransactionId; record->xl_len = sizeof(checkPoint); - record->xl_info = 0; + record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; record->xl_rmid = RM_XLOG_ID; - memcpy((char *) record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint)); + memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint)); INIT_CRC64(crc); - COMP_CRC64(crc, ((char*)&checkPoint), sizeof(checkPoint)); - COMP_CRC64(crc, ((char*)record + offsetof(XLogRecord, xl_prev)), - (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev))); + COMP_CRC64(crc, &checkPoint, sizeof(checkPoint)); + COMP_CRC64(crc, (char*) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); FIN_CRC64(crc); record->xl_crc = crc; - logFile = XLogFileInit(0, 0, &usexistent); + openLogFile = XLogFileInit(0, 0, &usexistent); - if (write(logFile, buffer, BLCKSZ) != BLCKSZ) + if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ) elog(STOP, "BootStrapXLOG failed to write logfile: %m"); - if (pg_fsync(logFile) != 0) + if (pg_fsync(openLogFile) != 0) elog(STOP, "BootStrapXLOG failed to fsync logfile: %m"); - close(logFile); - logFile = -1; + close(openLogFile); + openLogFile = -1; memset(ControlFile, 0, sizeof(ControlFileData)); + /* Initialize pg_control status fields */ + ControlFile->state = DB_SHUTDOWNED; + ControlFile->time = checkPoint.time; ControlFile->logId = 0; ControlFile->logSeg = 1; ControlFile->checkPoint = checkPoint.redo; - ControlFile->time = time(NULL); - ControlFile->state = DB_SHUTDOWNED; + ControlFile->checkPointCopy = checkPoint; /* some additional ControlFile fields are set in WriteControlFile() */ WriteControlFile(); @@ -1788,47 +2067,35 @@ BootStrapXLOG() static char * str_time(time_t tnow) { - static char buf[20]; + static char buf[32]; strftime(buf, sizeof(buf), - "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d %H:%M:%S %Z", localtime(&tnow)); return buf; } /* - * This func must be called ONCE on system startup + * This must be called ONCE during postmaster or standalone-backend startup */ void -StartupXLOG() +StartupXLOG(void) { XLogCtlInsert *Insert; CheckPoint checkPoint; + bool wasShutdown; XLogRecPtr RecPtr, - LastRec; + LastRec, + checkPointLoc, + EndOfLog; XLogRecord *record; - char buffer[_INTL_MAXLOGRECSZ + SizeOfXLogRecord]; + char *buffer; - elog(LOG, "starting up"); - CritSectionCount++; + /* Use malloc() to ensure record buffer is MAXALIGNED */ + buffer = (char *) malloc(_INTL_MAXLOGRECSZ); - XLogCtl->xlblocks = (XLogRecPtr *) (((char *) XLogCtl) + sizeof(XLogCtlData)); - XLogCtl->pages = ((char *) XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers); - XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers; - XLogCtl->XLogCacheBlck = XLOGbuffers - 1; - memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); - XLogCtl->LgwrRqst = LgwrRqst; - XLogCtl->LgwrResult = LgwrResult; - XLogCtl->Insert.LgwrResult = LgwrResult; - XLogCtl->Insert.curridx = 0; - XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); - XLogCtl->Write.LgwrResult = LgwrResult; - XLogCtl->Write.curridx = 0; - S_INIT_LOCK(&(XLogCtl->insert_lck)); - S_INIT_LOCK(&(XLogCtl->info_lck)); - S_INIT_LOCK(&(XLogCtl->lgwr_lck)); - S_INIT_LOCK(&(XLogCtl->chkp_lck)); + CritSectionCount++; /* * Read control file and check XLOG status looks valid. @@ -1860,22 +2127,42 @@ StartupXLOG() elog(LOG, "database system was interrupted at %s", str_time(ControlFile->time)); - LastRec = RecPtr = ControlFile->checkPoint; - if (!XRecOffIsValid(RecPtr.xrecoff)) - elog(STOP, "Invalid checkPoint in control file"); - elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff); - - record = ReadRecord(&RecPtr, buffer); - if (record->xl_rmid != RM_XLOG_ID) - elog(STOP, "Invalid RMID in checkPoint record"); - if (record->xl_len != sizeof(checkPoint)) - elog(STOP, "Invalid length of checkPoint record"); - checkPoint = *((CheckPoint *) ((char *) record + SizeOfXLogRecord)); + /* + * Get the last valid checkpoint record. If the latest one according + * to pg_control is broken, try the next-to-last one. + */ + record = ReadCheckpointRecord(ControlFile->checkPoint, + "primary", buffer); + if (record != NULL) + { + checkPointLoc = ControlFile->checkPoint; + elog(LOG, "CheckPoint record at (%u, %u)", + checkPointLoc.xlogid, checkPointLoc.xrecoff); + } + else + { + record = ReadCheckpointRecord(ControlFile->prevCheckPoint, + "secondary", buffer); + if (record != NULL) + { + checkPointLoc = ControlFile->prevCheckPoint; + elog(LOG, "Using previous CheckPoint record at (%u, %u)", + checkPointLoc.xlogid, checkPointLoc.xrecoff); + InRecovery = true; /* force recovery even if SHUTDOWNED */ + } + else + { + elog(STOP, "Unable to locate a valid CheckPoint record"); + } + } + LastRec = RecPtr = checkPointLoc; + memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint)); + wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN); elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u); Shutdown %s", checkPoint.redo.xlogid, checkPoint.redo.xrecoff, checkPoint.undo.xlogid, checkPoint.undo.xrecoff, - (checkPoint.Shutdown) ? "TRUE" : "FALSE"); + wasShutdown ? "TRUE" : "FALSE"); elog(LOG, "NextTransactionId: %u; NextOid: %u", checkPoint.nextXid, checkPoint.nextOid); if (checkPoint.nextXid < FirstTransactionId || @@ -1883,6 +2170,7 @@ StartupXLOG() elog(STOP, "Invalid NextTransactionId/NextOid"); ShmemVariableCache->nextXid = checkPoint.nextXid; + ShmemVariableCache->xidCount = 0; ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; @@ -1898,10 +2186,8 @@ StartupXLOG() if (XLByteLT(checkPoint.undo, RecPtr) || XLByteLT(checkPoint.redo, RecPtr)) { - if (checkPoint.Shutdown) + if (wasShutdown) elog(STOP, "Invalid Redo/Undo record in shutdown checkpoint"); - if (ControlFile->state == DB_SHUTDOWNED) - elog(STOP, "Invalid Redo/Undo record in shut down state"); InRecovery = true; } else if (ControlFile->state != DB_SHUTDOWNED) @@ -1923,11 +2209,11 @@ StartupXLOG() /* Is REDO required ? */ if (XLByteLT(checkPoint.redo, RecPtr)) - record = ReadRecord(&(checkPoint.redo), buffer); + record = ReadRecord(&(checkPoint.redo), STOP, buffer); else /* read past CheckPoint record */ - record = ReadRecord(NULL, buffer); + record = ReadRecord(NULL, LOG, buffer); - if (record->xl_len != 0) + if (record != NULL) { InRedo = true; elog(LOG, "redo starts at (%u, %u)", @@ -1935,7 +2221,11 @@ StartupXLOG() do { if (record->xl_xid >= ShmemVariableCache->nextXid) + { + /* This probably shouldn't happen... */ ShmemVariableCache->nextXid = record->xl_xid + 1; + ShmemVariableCache->xidCount = 0; + } if (XLOG_DEBUG) { char buf[8192]; @@ -1947,16 +2237,15 @@ StartupXLOG() strcat(buf, " - "); RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, XLogRecGetData(record)); - strcat(buf, "\n"); - write(2, buf, strlen(buf)); + fprintf(stderr, "%s\n", buf); } - if (record->xl_info & (XLR_BKP_BLOCK_1|XLR_BKP_BLOCK_2)) + if (record->xl_info & XLR_BKP_BLOCK_MASK) RestoreBkpBlocks(record, EndRecPtr); RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); - record = ReadRecord(NULL, buffer); - } while (record->xl_len != 0); + record = ReadRecord(NULL, LOG, buffer); + } while (record != NULL); elog(LOG, "redo done at (%u, %u)", ReadRecPtr.xlogid, ReadRecPtr.xrecoff); LastRec = ReadRecPtr; @@ -1966,29 +2255,40 @@ StartupXLOG() elog(LOG, "redo is not required"); } - /* Init xlog buffer cache */ - record = ReadRecord(&LastRec, buffer); - logId = EndRecPtr.xlogid; - logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize; - logOff = 0; - logFile = XLogFileOpen(logId, logSeg, false); - XLogCtl->xlblocks[0].xlogid = logId; + /* + * Init xlog buffer cache using the block containing the last valid + * record from the previous incarnation. + */ + record = ReadRecord(&LastRec, STOP, buffer); + EndOfLog = EndRecPtr; + XLByteToPrevSeg(EndOfLog, openLogId, openLogSeg); + openLogFile = XLogFileOpen(openLogId, openLogSeg, false); + openLogOff = 0; + ControlFile->logId = openLogId; + ControlFile->logSeg = openLogSeg + 1; + XLogCtl->xlblocks[0].xlogid = openLogId; XLogCtl->xlblocks[0].xrecoff = - ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ; + ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ; Insert = &XLogCtl->Insert; - memcpy((char *) (Insert->currpage), readBuf, BLCKSZ); - Insert->currpos = ((char *) Insert->currpage) + - (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff); + /* Tricky point here: readBuf contains the *last* block that the LastRec + * record spans, not the one it starts in, which is what we want. + */ + Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize); + memcpy((char *) Insert->currpage, readBuf, BLCKSZ); + Insert->currpos = (char *) Insert->currpage + + (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff); + /* Make sure rest of page is zero */ + memset(Insert->currpos, 0, INSERT_FREESPACE(Insert)); Insert->PrevRecord = LastRec; - LgwrRqst.Write = LgwrRqst.Flush = - LgwrResult.Write = LgwrResult.Flush = EndRecPtr; + LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; - XLogCtl->Write.LgwrResult = LgwrResult; - Insert->LgwrResult = LgwrResult; + XLogCtl->Write.LogwrtResult = LogwrtResult; + Insert->LogwrtResult = LogwrtResult; + XLogCtl->LogwrtResult = LogwrtResult; - XLogCtl->LgwrRqst = LgwrRqst; - XLogCtl->LgwrResult = LgwrResult; + XLogCtl->LogwrtRqst.Write = EndOfLog; + XLogCtl->LogwrtRqst.Flush = EndOfLog; #ifdef NOT_USED /* UNDO */ @@ -2001,7 +2301,7 @@ StartupXLOG() RecPtr.xlogid, RecPtr.xrecoff); do { - record = ReadRecord(&RecPtr, buffer); + record = ReadRecord(&RecPtr, STOP, buffer); if (TransactionIdIsValid(record->xl_xid) && !TransactionIdDidCommit(record->xl_xid)) RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record); @@ -2017,25 +2317,21 @@ StartupXLOG() if (InRecovery) { + /* + * In case we had to use the secondary checkpoint, make sure that + * it will still be shown as the secondary checkpoint after this + * CreateCheckPoint operation; we don't want the broken primary + * checkpoint to become prevCheckPoint... + */ + ControlFile->checkPoint = checkPointLoc; CreateCheckPoint(true); XLogCloseRelationCache(); } - if (XLOGfiles > 0) /* pre-allocate log files */ - { - uint32 _logId = logId, - _logSeg = logSeg; - int lf, i; - bool usexistent; - - for (i = 1; i <= XLOGfiles; i++) - { - NextLogSeg(_logId, _logSeg); - usexistent = false; - lf = XLogFileInit(_logId, _logSeg, &usexistent); - close(lf); - } - } + /* + * Preallocate additional log files, if wanted. + */ + PreallocXlogFiles(EndOfLog); InRecovery = false; @@ -2049,11 +2345,63 @@ StartupXLOG() elog(LOG, "database system is in production state"); CritSectionCount--; - return; + /* Shut down readFile facility, free space */ + if (readFile >= 0) + { + close(readFile); + readFile = -1; + } + if (readBuf) + { + free(readBuf); + readBuf = NULL; + } + + free(buffer); +} + +/* Subroutine to try to fetch and validate a prior checkpoint record */ +static XLogRecord * +ReadCheckpointRecord(XLogRecPtr RecPtr, + const char *whichChkpt, + char *buffer) +{ + XLogRecord *record; + + if (!XRecOffIsValid(RecPtr.xrecoff)) + { + elog(LOG, "Invalid %s checkPoint link in control file", whichChkpt); + return NULL; + } + + record = ReadRecord(&RecPtr, LOG, buffer); + + if (record == NULL) + { + elog(LOG, "Invalid %s checkPoint record", whichChkpt); + return NULL; + } + if (record->xl_rmid != RM_XLOG_ID) + { + elog(LOG, "Invalid RMID in %s checkPoint record", whichChkpt); + return NULL; + } + if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN && + record->xl_info != XLOG_CHECKPOINT_ONLINE) + { + elog(LOG, "Invalid xl_info in %s checkPoint record", whichChkpt); + return NULL; + } + if (record->xl_len != sizeof(CheckPoint)) + { + elog(LOG, "Invalid length of %s checkPoint record", whichChkpt); + return NULL; + } + return record; } /* - * Postmaster uses it to set ThisStartUpID & RedoRecPtr from + * Postmaster uses this to initialize ThisStartUpID & RedoRecPtr from * XLogCtlData located in shmem after successful startup. */ void @@ -2064,9 +2412,17 @@ SetThisStartUpID(void) } /* - * CheckPoint-er called by postmaster creates copy of RedoRecPtr - * for postmaster in shmem. Postmaster uses GetRedoRecPtr after - * that to update its own copy of RedoRecPtr. + * CheckPoint process called by postmaster saves copy of new RedoRecPtr + * in shmem (using SetRedoRecPtr). When checkpointer completes, postmaster + * calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that + * subsequently-spawned backends will start out with a reasonably up-to-date + * local RedoRecPtr. Since these operations are not protected by any spinlock + * and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these + * routines at other times! + * + * Note: once spawned, a backend must update its local RedoRecPtr from + * XLogCtl->Insert.RedoRecPtr while holding the insert spinlock. This is + * done in XLogInsert(). */ void SetRedoRecPtr(void) @@ -2081,13 +2437,16 @@ GetRedoRecPtr(void) } /* - * This func must be called ONCE on system shutdown + * This must be called ONCE during postmaster or standalone-backend shutdown */ void -ShutdownXLOG() +ShutdownXLOG(void) { elog(LOG, "shutting down"); + /* suppress in-transaction check in CreateCheckPoint */ + MyLastRecPtr.xrecoff = 0; + CritSectionCount++; CreateDummyCaches(); CreateCheckPoint(true); @@ -2096,6 +2455,9 @@ ShutdownXLOG() elog(LOG, "database system is shut down"); } +/* + * Perform a checkpoint --- either during shutdown, or on-the-fly + */ void CreateCheckPoint(bool shutdown) { @@ -2104,10 +2466,8 @@ CreateCheckPoint(bool shutdown) XLogCtlInsert *Insert = &XLogCtl->Insert; XLogRecData rdata; uint32 freespace; - uint16 curridx; uint32 _logId; uint32 _logSeg; - char archdir[MAXPGPATH]; unsigned spins = 0; if (MyLastRecPtr.xrecoff != 0) @@ -2122,154 +2482,198 @@ CreateCheckPoint(bool shutdown) CHECKPOINT_LOCK_TIMEOUT, 1000000); } - memset(&checkPoint, 0, sizeof(checkPoint)); if (shutdown) { ControlFile->state = DB_SHUTDOWNING; ControlFile->time = time(NULL); UpdateControlFile(); } + + memset(&checkPoint, 0, sizeof(checkPoint)); checkPoint.ThisStartUpID = ThisStartUpID; - checkPoint.Shutdown = shutdown; + checkPoint.time = time(NULL); - /* Get REDO record ptr */ S_LOCK(&(XLogCtl->insert_lck)); - freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; + + /* + * If this isn't a shutdown, and we have not inserted any XLOG records + * since the start of the last checkpoint, skip the checkpoint. The + * idea here is to avoid inserting duplicate checkpoints when the system + * is idle. That wastes log space, and more importantly it exposes us to + * possible loss of both current and previous checkpoint records if the + * machine crashes just as we're writing the update. (Perhaps it'd make + * even more sense to checkpoint only when the previous checkpoint record + * is in a different xlog page?) + * + * We have to make two tests to determine that nothing has happened since + * the start of the last checkpoint: current insertion point must match + * the end of the last checkpoint record, and its redo pointer must point + * to itself. + */ + if (!shutdown) + { + XLogRecPtr curInsert; + + INSERT_RECPTR(curInsert, Insert, Insert->curridx); + if (curInsert.xlogid == ControlFile->checkPoint.xlogid && + curInsert.xrecoff == ControlFile->checkPoint.xrecoff + + MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) && + ControlFile->checkPoint.xlogid == + ControlFile->checkPointCopy.redo.xlogid && + ControlFile->checkPoint.xrecoff == + ControlFile->checkPointCopy.redo.xrecoff) + { + S_UNLOCK(&(XLogCtl->insert_lck)); + S_UNLOCK(&(XLogCtl->chkp_lck)); + END_CRIT_SECTION(); + return; + } + } + + /* + * Compute new REDO record ptr = location of next XLOG record. + * + * NB: this is NOT necessarily where the checkpoint record itself will + * be, since other backends may insert more XLOG records while we're + * off doing the buffer flush work. Those XLOG records are logically + * after the checkpoint, even though physically before it. Got that? + */ + freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { - curridx = NextBufIdx(Insert->curridx); - if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) - InitXLBuffer(curridx); - else - GetFreeXLBuffer(); + (void) AdvanceXLInsertBuffer(); + /* OK to ignore update return flag, since we will do flush anyway */ freespace = BLCKSZ - SizeOfXLogPHD; } - else - curridx = Insert->curridx; - checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid; - checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + - Insert->currpos - ((char *) Insert->currpage); + INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx); + /* + * Here we update the shared RedoRecPtr for future XLogInsert calls; + * this must be done while holding the insert lock. + */ RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; + /* + * Get UNDO record ptr - this is oldest of PROC->logRec values. + * We do this while holding insert lock to ensure that we won't miss + * any about-to-commit transactions (UNDO must include all xacts that + * have commits after REDO point). + */ + checkPoint.undo = GetUndoRecPtr(); + + if (shutdown && checkPoint.undo.xrecoff != 0) + elog(STOP, "Active transaction while data base is shutting down"); + + /* + * Now we can release insert lock, allowing other xacts to proceed + * even while we are flushing disk buffers. + */ S_UNLOCK(&(XLogCtl->insert_lck)); SpinAcquire(XidGenLockId); checkPoint.nextXid = ShmemVariableCache->nextXid; + if (!shutdown) + checkPoint.nextXid += ShmemVariableCache->xidCount; SpinRelease(XidGenLockId); + SpinAcquire(OidGenLockId); checkPoint.nextOid = ShmemVariableCache->nextOid; if (!shutdown) checkPoint.nextOid += ShmemVariableCache->oidCount; - SpinRelease(OidGenLockId); + /* + * Having constructed the checkpoint record, ensure all shmem disk buffers + * are flushed to disk. + */ FlushBufferPool(); - /* Get UNDO record ptr - should use oldest of PROC->logRec */ - checkPoint.undo = GetUndoRecPtr(); - - if (shutdown && checkPoint.undo.xrecoff != 0) - elog(STOP, "Active transaction while data base is shutting down"); - + /* + * Now insert the checkpoint record into XLOG. + */ rdata.buffer = InvalidBuffer; rdata.data = (char *)(&checkPoint); rdata.len = sizeof(checkPoint); rdata.next = NULL; - recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKPOINT, &rdata); + recptr = XLogInsert(RM_XLOG_ID, + shutdown ? XLOG_CHECKPOINT_SHUTDOWN : + XLOG_CHECKPOINT_ONLINE, + &rdata); + + XLogFlush(recptr); - if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr)) + /* + * We now have ProcLastRecPtr = start of actual checkpoint record, + * recptr = end of actual checkpoint record. + */ + if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr)) elog(STOP, "XLog concurrent activity while data base is shutting down"); - XLogFlush(recptr); + /* + * Remember location of prior checkpoint's earliest info. + * Oldest item is redo or undo, whichever is older; but watch out + * for case that undo = 0. + */ + if (ControlFile->checkPointCopy.undo.xrecoff != 0 && + XLByteLT(ControlFile->checkPointCopy.undo, + ControlFile->checkPointCopy.redo)) + XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg); + else + XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg); + /* + * Update the control file. + */ SpinAcquire(ControlFileLockId); if (shutdown) - { - /* probably should delete extra log files */ ControlFile->state = DB_SHUTDOWNED; - } - else /* create new log file(s) */ - { - int lf; - bool usexistent = true; - - _logId = recptr.xlogid; - _logSeg = (recptr.xrecoff - 1) / XLogSegSize; - if (XLOGfiles > 0) - { - struct timeval delay; - int i; - - for (i = 1; i <= XLOGfiles; i++) - { - usexistent = true; - NextLogSeg(_logId, _logSeg); - lf = XLogFileInit(_logId, _logSeg, &usexistent); - close(lf); - /* - * Give up ControlFileLockId for 1/50 sec to let other - * backends switch to new log file in XLogWrite() - */ - SpinRelease(ControlFileLockId); - delay.tv_sec = 0; - delay.tv_usec = 20000; - (void) select(0, NULL, NULL, NULL, &delay); - SpinAcquire(ControlFileLockId); - } - } - else if ((recptr.xrecoff - 1) % XLogSegSize >= - (uint32) (0.75 * XLogSegSize)) - { - NextLogSeg(_logId, _logSeg); - lf = XLogFileInit(_logId, _logSeg, &usexistent); - close(lf); - } - } - - ControlFile->checkPoint = MyLastRecPtr; - strcpy(archdir, ControlFile->archdir); + ControlFile->prevCheckPoint = ControlFile->checkPoint; + ControlFile->checkPoint = ProcLastRecPtr; + ControlFile->checkPointCopy = checkPoint; ControlFile->time = time(NULL); UpdateControlFile(); SpinRelease(ControlFileLockId); /* - * Delete offline log files. Get oldest online - * log file from redo or undo record, whatever - * is older. + * Delete offline log files (those no longer needed even for previous + * checkpoint). */ - if (checkPoint.undo.xrecoff != 0 && - XLByteLT(checkPoint.undo, checkPoint.redo)) - { - _logId = checkPoint.undo.xlogid; - _logSeg = checkPoint.undo.xrecoff / XLogSegSize; - } - else - { - _logId = checkPoint.redo.xlogid; - _logSeg = checkPoint.redo.xrecoff / XLogSegSize; - } if (_logId || _logSeg) { - if (_logSeg) - _logSeg--; - else - { - _logId--; - _logSeg = 0; - } - MoveOfflineLogs(archdir, _logId, _logSeg); + PrevLogSeg(_logId, _logSeg); + MoveOfflineLogs(_logId, _logSeg); } + /* + * Make more log segments if needed. (Do this after deleting offline + * log segments, to avoid having peak disk space usage higher than + * necessary.) + */ + if (!shutdown) + PreallocXlogFiles(recptr); + S_UNLOCK(&(XLogCtl->chkp_lck)); - MyLastRecPtr.xrecoff = 0; /* to avoid commit record */ END_CRIT_SECTION(); - - return; } -void XLogPutNextOid(Oid nextOid); +/* + * Write a NEXTXID log record + */ +void +XLogPutNextXid(TransactionId nextXid) +{ + XLogRecData rdata; + + rdata.buffer = InvalidBuffer; + rdata.data = (char *)(&nextXid); + rdata.len = sizeof(TransactionId); + rdata.next = NULL; + (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTXID, &rdata); +} +/* + * Write a NEXTOID log record + */ void XLogPutNextOid(Oid nextOid) { @@ -2282,18 +2686,63 @@ XLogPutNextOid(Oid nextOid) (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata); } +/* + * XLOG resource manager's routines + */ void xlog_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; - if (info == XLOG_NEXTOID) + if (info == XLOG_NEXTXID) + { + TransactionId nextXid; + + memcpy(&nextXid, XLogRecGetData(record), sizeof(TransactionId)); + if (ShmemVariableCache->nextXid < nextXid) + { + ShmemVariableCache->nextXid = nextXid; + ShmemVariableCache->xidCount = 0; + } + } + else if (info == XLOG_NEXTOID) { Oid nextOid; memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid)); if (ShmemVariableCache->nextOid < nextOid) + { ShmemVariableCache->nextOid = nextOid; + ShmemVariableCache->oidCount = 0; + } + } + else if (info == XLOG_CHECKPOINT_SHUTDOWN) + { + CheckPoint checkPoint; + + memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint)); + /* In a SHUTDOWN checkpoint, believe the counters exactly */ + ShmemVariableCache->nextXid = checkPoint.nextXid; + ShmemVariableCache->xidCount = 0; + ShmemVariableCache->nextOid = checkPoint.nextOid; + ShmemVariableCache->oidCount = 0; + } + else if (info == XLOG_CHECKPOINT_ONLINE) + { + CheckPoint checkPoint; + + memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint)); + /* In an ONLINE checkpoint, treat the counters like NEXTXID/NEXTOID */ + if (ShmemVariableCache->nextXid < checkPoint.nextXid) + { + ShmemVariableCache->nextXid = checkPoint.nextXid; + ShmemVariableCache->xidCount = 0; + } + if (ShmemVariableCache->nextOid < checkPoint.nextOid) + { + ShmemVariableCache->nextOid = checkPoint.nextOid; + ShmemVariableCache->oidCount = 0; + } } } @@ -2307,7 +2756,8 @@ xlog_desc(char *buf, uint8 xl_info, char* rec) { uint8 info = xl_info & ~XLR_INFO_MASK; - if (info == XLOG_CHECKPOINT) + if (info == XLOG_CHECKPOINT_SHUTDOWN || + info == XLOG_CHECKPOINT_ONLINE) { CheckPoint *checkpoint = (CheckPoint*) rec; sprintf(buf + strlen(buf), "checkpoint: redo %u/%u; undo %u/%u; " @@ -2316,7 +2766,14 @@ xlog_desc(char *buf, uint8 xl_info, char* rec) checkpoint->undo.xlogid, checkpoint->undo.xrecoff, checkpoint->ThisStartUpID, checkpoint->nextXid, checkpoint->nextOid, - (checkpoint->Shutdown) ? "shutdown" : "online"); + (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online"); + } + else if (info == XLOG_NEXTXID) + { + TransactionId nextXid; + + memcpy(&nextXid, rec, sizeof(TransactionId)); + sprintf(buf + strlen(buf), "nextXid: %u", nextXid); } else if (info == XLOG_NEXTOID) { @@ -2340,7 +2797,7 @@ xlog_outrec(char *buf, XLogRecord *record) record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff, record->xl_xid); - for (i = 0, bkpb = 0; i < 2; i++) + for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++) { if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i)))) continue; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 8735db1ae1..8b80c326ca 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -6,10 +6,12 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * + * $Header: /cvsroot/pgsql/src/backend/access/transam/xlogutils.c,v 1.14 2001/03/13 01:17:05 tgl Exp $ + * *------------------------------------------------------------------------- */ - #include "postgres.h" + #include "access/xlog.h" #include "access/transam.h" #include "access/xact.h" diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 836a6c87ef..95f73c356a 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.104 2001/01/24 19:42:51 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.105 2001/03/13 01:17:05 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -18,13 +18,12 @@ #include #include #include - -#define BOOTSTRAP_INCLUDE /* mask out stuff in tcop/tcopprot.h */ - #ifdef HAVE_GETOPT_H #include #endif +#define BOOTSTRAP_INCLUDE /* mask out stuff in tcop/tcopprot.h */ + #include "access/genam.h" #include "access/heapam.h" #include "access/xlog.h" @@ -147,8 +146,6 @@ static MemoryContext nogc = NULL; /* special no-gc mem context */ extern int optind; extern char *optarg; -extern void SetRedoRecPtr(void); - /* * At bootstrap time, we first declare all the indices to be built, and * then build them. The IndexList structure stores enough information @@ -294,8 +291,16 @@ BootstrapMain(int argc, char *argv[]) else if (argc - optind == 1) dbName = argv[optind]; - SetProcessingMode(BootstrapProcessing); - IgnoreSystemIndexes(true); + if (dbName == NULL) + { + dbName = getenv("USER"); + if (dbName == NULL) + { + fputs("bootstrap backend: failed, no db name specified\n", stderr); + fputs(" and no USER enviroment variable\n", stderr); + proc_exit(1); + } + } if (!IsUnderPostmaster) { @@ -312,29 +317,52 @@ BootstrapMain(int argc, char *argv[]) } Assert(DataDir); - if (dbName == NULL) + if (IsUnderPostmaster) { - dbName = getenv("USER"); - if (dbName == NULL) - { - fputs("bootstrap backend: failed, no db name specified\n", stderr); - fputs(" and no USER enviroment variable\n", stderr); - proc_exit(1); - } + /* + * Properly accept or ignore signals the postmaster might send us + */ + pqsignal(SIGHUP, SIG_IGN); + pqsignal(SIGINT, SIG_IGN); /* ignore query-cancel */ + pqsignal(SIGTERM, die); + pqsignal(SIGQUIT, quickdie); + pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR2, SIG_IGN); + /* + * Reset some signals that are accepted by postmaster but not here + */ + pqsignal(SIGCHLD, SIG_IGN); + pqsignal(SIGTTIN, SIG_DFL); + pqsignal(SIGTTOU, SIG_DFL); + pqsignal(SIGCONT, SIG_DFL); + pqsignal(SIGWINCH, SIG_DFL); + /* + * Unblock signals (they were blocked when the postmaster forked us) + */ + PG_SETMASK(&UnBlockSig); } - - XLOGPathInit(); - - BaseInit(); - - if (!IsUnderPostmaster) + else { + /* Set up appropriately for interactive use */ pqsignal(SIGHUP, die); pqsignal(SIGINT, die); pqsignal(SIGTERM, die); pqsignal(SIGQUIT, die); + + /* + * Create lockfile for data directory. + */ + if (! CreateDataDirLockFile(DataDir, false)) + proc_exit(1); } + SetProcessingMode(BootstrapProcessing); + IgnoreSystemIndexes(true); + + XLOGPathInit(); + + BaseInit(); + /* * XLOG operations */ diff --git a/src/backend/port/beos/shm.c b/src/backend/port/beos/shm.c index ee5b67fc9c..e213ecb664 100644 --- a/src/backend/port/beos/shm.c +++ b/src/backend/port/beos/shm.c @@ -57,12 +57,25 @@ int* shmat(int memId,int m1,int m2) } } -/* Control a shared mem area : Used only to delete it */ -int shmctl(int shmid,int flag, struct shmid_ds* dummy) +/* Control a shared mem area */ +int shmctl(int shmid, int flag, struct shmid_ds* dummy) { - /* Delete the area */ - delete_area(shmid); - return 0; + if (flag == IPC_RMID) + { + /* Delete the area */ + delete_area(shmid); + return 0; + } + if (flag == IPC_STAT) + { + /* Is there a way to check existence of an area given its ID? + * For now, punt and assume it does not exist. + */ + errno = EINVAL; + return -1; + } + errno = EINVAL; + return -1; } /* Get an area based on the IPC key */ diff --git a/src/backend/port/qnx4/shm.c b/src/backend/port/qnx4/shm.c index 4cf0486544..9958f79938 100644 --- a/src/backend/port/qnx4/shm.c +++ b/src/backend/port/qnx4/shm.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/port/qnx4/Attic/shm.c,v 1.2 2000/04/12 17:15:30 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/port/qnx4/Attic/shm.c,v 1.3 2001/03/13 01:17:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -173,20 +173,25 @@ shmctl(int shmid, int cmd, struct shmid_ds * buf) struct shm_info info; char name[NAME_MAX + 1]; - /* IPC_RMID supported only */ - if (cmd != IPC_RMID) + if (cmd == IPC_RMID) { - errno = EINVAL; - return -1; + if (shm_getinfo(shmid, &info) == -1) + { + errno = EACCES; + return -1; + } + return shm_unlink(itoa(info.key, name, 16)); } - - if (shm_getinfo(shmid, &info) == -1) + if (cmd == IPC_STAT) { - errno = EACCES; + /* Can we support IPC_STAT? We only need shm_nattch ... + * For now, punt and assume the shm seg does not exist. + */ + errno = EINVAL; return -1; } - - return shm_unlink(itoa(info.key, name, 16)); + errno = EINVAL; + return -1; } int diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index e807d3d069..6a0d119865 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -6,12 +6,29 @@ * to the Postmaster and the postmaster uses the info in the * message to setup a backend process. * + * The postmaster also manages system-wide operations such as + * startup, shutdown, and periodic checkpoints. The postmaster + * itself doesn't do those operations, mind you --- it just forks + * off a subprocess to do them at the right times. It also takes + * care of resetting the system if a backend crashes. + * + * The postmaster process creates the shared memory and semaphore + * pools during startup, but as a rule does not touch them itself. + * In particular, it is not a member of the PROC array of backends + * and so it cannot participate in lock-manager operations. Keeping + * the postmaster away from shared memory operations makes it simpler + * and more reliable. The postmaster is almost always able to recover + * from crashes of individual backends by resetting shared memory; + * if it did much with shared memory then it would be prone to crashing + * along with the backends. + * + * * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.208 2001/02/20 01:34:40 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.209 2001/03/13 01:17:05 tgl Exp $ * * NOTES * @@ -21,16 +38,15 @@ * lock manager. * * Synchronization: - * The Postmaster shares memory with the backends and will have to lock - * the shared memory it accesses. The Postmaster should never block - * on messages from clients. + * The Postmaster shares memory with the backends but should avoid + * touching shared memory, so as not to become stuck if a crashing + * backend screws up locks or shared memory. Likewise, the Postmaster + * should never block on messages from frontend clients. * * Garbage Collection: * The Postmaster cleans up after backends if they have an emergency * exit and/or core dump. * - * Communication: - * *------------------------------------------------------------------------- */ @@ -194,8 +210,6 @@ extern char *optarg; extern int optind, opterr; -extern void GetRedoRecPtr(void); - /* * postmaster.c - function prototypes */ @@ -207,6 +221,7 @@ static void reset_shared(unsigned short port); static void SIGHUP_handler(SIGNAL_ARGS); static void pmdie(SIGNAL_ARGS); static void reaper(SIGNAL_ARGS); +static void schedule_checkpoint(SIGNAL_ARGS); static void dumpstatus(SIGNAL_ARGS); static void CleanupProc(int pid, int exitstatus); static int DoBackend(Port *port); @@ -491,7 +506,7 @@ PostmasterMain(int argc, char *argv[]) /* * In the event that some backend dumps core, send - * SIGSTOP, rather than SIGUSR1, to all its peers. This + * SIGSTOP, rather than SIGQUIT, to all its peers. This * lets the wily post_hacker collect core dumps from * everyone. */ @@ -643,11 +658,11 @@ PostmasterMain(int argc, char *argv[]) pqsignal(SIGHUP, SIGHUP_handler); /* reread config file and have children do same */ pqsignal(SIGINT, pmdie); /* send SIGTERM and ShutdownDataBase */ - pqsignal(SIGQUIT, pmdie); /* send SIGUSR1 and die */ + pqsignal(SIGQUIT, pmdie); /* send SIGQUIT and die */ pqsignal(SIGTERM, pmdie); /* wait for children and ShutdownDataBase */ pqsignal(SIGALRM, SIG_IGN); /* ignored */ pqsignal(SIGPIPE, SIG_IGN); /* ignored */ - pqsignal(SIGUSR1, pmdie); /* currently ignored, but see note in pmdie */ + pqsignal(SIGUSR1, schedule_checkpoint); /* start a background checkpoint */ pqsignal(SIGUSR2, pmdie); /* send SIGUSR2, don't die */ pqsignal(SIGCHLD, reaper); /* handle child termination */ pqsignal(SIGTTIN, SIG_IGN); /* ignored */ @@ -773,25 +788,22 @@ ServerLoop(void) struct timeval *timeout = NULL; struct timeval timeout_tv; - if (CheckPointPID == 0 && checkpointed && !FatalError) + if (CheckPointPID == 0 && checkpointed && + Shutdown == NoShutdown && !FatalError) { time_t now = time(NULL); if (CheckPointTimeout + checkpointed > now) { + /* Not time for checkpoint yet, so set a timeout for select */ timeout_tv.tv_sec = CheckPointTimeout + checkpointed - now; timeout_tv.tv_usec = 0; timeout = &timeout_tv; } else { + /* Time to make the checkpoint... */ CheckPointPID = CheckPointDataBase(); - /* - * Since this code is executed periodically, it's a fine - * place to do other actions that should happen every now - * and then on no particular schedule. Such as... - */ - TouchSocketLockFile(); } } @@ -1256,7 +1268,7 @@ ConnCreate(int serverFd) { fprintf(stderr, "%s: ConnCreate: malloc failed\n", progname); - SignalChildren(SIGUSR1); + SignalChildren(SIGQUIT); ExitPostmaster(1); } @@ -1374,7 +1386,7 @@ SIGHUP_handler(SIGNAL_ARGS) /* - * pmdie -- signal handler for cleaning up after a kill signal. + * pmdie -- signal handler for processing various postmaster signals. */ static void pmdie(SIGNAL_ARGS) @@ -1388,18 +1400,6 @@ pmdie(SIGNAL_ARGS) switch (postgres_signal_arg) { - case SIGUSR1: - /* - * Currently the postmaster ignores SIGUSR1 (maybe it should - * do something useful instead?) But we must have some handler - * installed for SIGUSR1, not just set it to SIG_IGN. Else, a - * freshly spawned backend would likewise have it set to SIG_IGN, - * which would mean the backend would ignore any attempt to kill - * it before it had gotten as far as setting up its own handler. - */ - errno = save_errno; - return; - case SIGUSR2: /* @@ -1419,7 +1419,7 @@ pmdie(SIGNAL_ARGS) /* * Smart Shutdown: * - * let children to end their work and ShutdownDataBase. + * Wait for children to end their work and ShutdownDataBase. */ if (Shutdown >= SmartShutdown) { @@ -1458,7 +1458,7 @@ pmdie(SIGNAL_ARGS) * Fast Shutdown: * * abort all children with SIGTERM (rollback active transactions - * and exit) and ShutdownDataBase. + * and exit) and ShutdownDataBase when they are gone. */ if (Shutdown >= FastShutdown) { @@ -1509,7 +1509,7 @@ pmdie(SIGNAL_ARGS) /* * Immediate Shutdown: * - * abort all children with SIGUSR1 and exit without attempt to + * abort all children with SIGQUIT and exit without attempt to * properly shutdown data base system. */ tnow = time(NULL); @@ -1517,10 +1517,10 @@ pmdie(SIGNAL_ARGS) fflush(stderr); if (ShutdownPID > 0) kill(ShutdownPID, SIGQUIT); - else if (StartupPID > 0) + if (StartupPID > 0) kill(StartupPID, SIGQUIT); - else if (DLGetHead(BackendList)) - SignalChildren(SIGUSR1); + if (DLGetHead(BackendList)) + SignalChildren(SIGQUIT); break; } @@ -1593,11 +1593,13 @@ reaper(SIGNAL_ARGS) } /* - * Startup succeeded - remember its ID - * and RedoRecPtr + * Startup succeeded - remember its ID and RedoRecPtr */ SetThisStartUpID(); + /* + * Arrange for first checkpoint to occur after standard delay. + */ CheckPointPID = 0; checkpointed = time(NULL); @@ -1697,6 +1699,7 @@ CleanupProc(int pid, if (!FatalError) { checkpointed = time(NULL); + /* Update RedoRecPtr for future child backends */ GetRedoRecPtr(); } } @@ -1731,7 +1734,7 @@ CleanupProc(int pid, * This backend is still alive. Unless we did so already, * tell it to commit hara-kiri. * - * SIGUSR1 is the special signal that says exit without proc_exit + * SIGQUIT is the special signal that says exit without proc_exit * and let the user know what's going on. But if SendStop is set * (-s on command line), then we send SIGSTOP instead, so that we * can get core dumps from all backends by hand. @@ -1741,9 +1744,9 @@ CleanupProc(int pid, if (DebugLvl) fprintf(stderr, "%s: CleanupProc: sending %s to process %d\n", progname, - (SendStop ? "SIGSTOP" : "SIGUSR1"), + (SendStop ? "SIGSTOP" : "SIGQUIT"), bp->pid); - kill(bp->pid, (SendStop ? SIGSTOP : SIGUSR1)); + kill(bp->pid, (SendStop ? SIGSTOP : SIGQUIT)); } } else @@ -1772,7 +1775,7 @@ CleanupProc(int pid, } /* - * Send a signal to all chidren processes. + * Send a signal to all backend children. */ static void SignalChildren(int signal) @@ -2108,6 +2111,24 @@ ExitPostmaster(int status) proc_exit(status); } +/* Request to schedule a checkpoint (no-op if one is currently running) */ +static void +schedule_checkpoint(SIGNAL_ARGS) +{ + int save_errno = errno; + + PG_SETMASK(&BlockSig); + + /* Ignore request if checkpointing is currently disabled */ + if (CheckPointPID == 0 && checkpointed && + Shutdown == NoShutdown && !FatalError) + { + CheckPointPID = CheckPointDataBase(); + } + + errno = save_errno; +} + static void dumpstatus(SIGNAL_ARGS) { @@ -2116,12 +2137,13 @@ dumpstatus(SIGNAL_ARGS) PG_SETMASK(&BlockSig); + fprintf(stderr, "%s: dumpstatus:\n", progname); + curr = DLGetHead(PortList); while (curr) { Port *port = DLE_VAL(curr); - fprintf(stderr, "%s: dumpstatus:\n", progname); fprintf(stderr, "\tsock %d\n", port->sock); curr = DLGetSucc(curr); } @@ -2240,6 +2262,9 @@ InitSSL(void) #endif +/* + * Fire off a subprocess for startup/shutdown/checkpoint. + */ static pid_t SSDataBase(int xlop) { @@ -2273,7 +2298,7 @@ SSDataBase(int xlop) /* Close the postmaster's sockets */ ClosePostmasterPorts(NULL); - + /* Set up command-line arguments for subprocess */ av[ac++] = "postgres"; av[ac++] = "-d"; @@ -2293,14 +2318,6 @@ SSDataBase(int xlop) optind = 1; - pqsignal(SIGQUIT, SIG_DFL); -#ifdef HAVE_SIGPROCMASK - sigdelset(&BlockSig, SIGQUIT); -#else - BlockSig &= ~(sigmask(SIGQUIT)); -#endif - PG_SETMASK(&BlockSig); - BootstrapMain(ac, av); ExitPostmaster(0); } @@ -2320,19 +2337,31 @@ SSDataBase(int xlop) ExitPostmaster(1); } - if (xlop != BS_XLOG_CHECKPOINT) - return(pid); - - if (!(bn = (Backend *) calloc(1, sizeof(Backend)))) + /* + * The startup and shutdown processes are not considered normal backends, + * but the checkpoint process is. Checkpoint must be added to the list + * of backends. + */ + if (xlop == BS_XLOG_CHECKPOINT) { - fprintf(stderr, "%s: CheckPointDataBase: malloc failed\n", - progname); - ExitPostmaster(1); - } + if (!(bn = (Backend *) calloc(1, sizeof(Backend)))) + { + fprintf(stderr, "%s: CheckPointDataBase: malloc failed\n", + progname); + ExitPostmaster(1); + } - bn->pid = pid; - bn->cancel_key = 0; - DLAddHead(BackendList, DLNewElem(bn)); + bn->pid = pid; + bn->cancel_key = PostmasterRandom(); + DLAddHead(BackendList, DLNewElem(bn)); + + /* + * Since this code is executed periodically, it's a fine + * place to do other actions that should happen every now + * and then on no particular schedule. Such as... + */ + TouchSocketLockFile(); + } return (pid); } diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c index c529547fcf..eb8d488bdd 100644 --- a/src/backend/storage/ipc/ipc.c +++ b/src/backend/storage/ipc/ipc.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/ipc/ipc.c,v 1.62 2001/01/24 19:43:07 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/ipc/ipc.c,v 1.63 2001/03/13 01:17:06 tgl Exp $ * * NOTES * @@ -626,6 +626,9 @@ InternalIpcMemoryCreate(IpcMemoryKey memKey, uint32 size, int permission) /* Register on-exit routine to detach new segment before deleting */ on_shmem_exit(IpcMemoryDetach, PointerGetDatum(memAddress)); + /* Record key and ID in lockfile for data directory. */ + RecordSharedMemoryInLockFile(memKey, shmid); + return memAddress; } @@ -660,6 +663,41 @@ IpcMemoryDelete(int status, Datum shmId) */ } +/****************************************************************************/ +/* SharedMemoryIsInUse(shmKey, shmId) Is a shared memory segment in use? */ +/****************************************************************************/ +bool +SharedMemoryIsInUse(IpcMemoryKey shmKey, IpcMemoryId shmId) +{ + struct shmid_ds shmStat; + + /* + * We detect whether a shared memory segment is in use by seeing whether + * it (a) exists and (b) has any processes are attached to it. + * + * If we are unable to perform the stat operation for a reason other than + * nonexistence of the segment (most likely, because it doesn't belong to + * our userid), assume it is in use. + */ + if (shmctl(shmId, IPC_STAT, &shmStat) < 0) + { + /* + * EINVAL actually has multiple possible causes documented in the + * shmctl man page, but we assume it must mean the segment no longer + * exists. + */ + if (errno == EINVAL) + return false; + /* Else assume segment is in use */ + return true; + } + /* If it has attached processes, it's in use */ + if (shmStat.shm_nattch != 0) + return true; + return false; +} + + /* ---------------------------------------------------------------- * private memory support * diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index fa0dbc1312..326a05a348 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.209 2001/03/09 06:36:32 inoue Exp $ + * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.210 2001/03/13 01:17:06 tgl Exp $ * * NOTES * this is the "main" module of the postgres backend and @@ -128,7 +128,6 @@ static void start_xact_command(void); static void finish_xact_command(void); static void SigHupHandler(SIGNAL_ARGS); static void FloatExceptionHandler(SIGNAL_ARGS); -static void quickdie(SIGNAL_ARGS); /* * Flag to mark SIGHUP. Whenever the main loop comes around it @@ -895,12 +894,12 @@ finish_xact_command(void) */ /* - * quickdie() occurs when signalled SIGUSR1 by the postmaster. + * quickdie() occurs when signalled SIGQUIT by the postmaster. * * Some backend has bought the farm, * so we need to stop what we're doing and exit. */ -static void +void quickdie(SIGNAL_ARGS) { PG_SETMASK(&BlockSig); @@ -917,7 +916,7 @@ quickdie(SIGNAL_ARGS) * Just nail the windows shut and get out of town. * * Note we do exit(1) not exit(0). This is to force the postmaster - * into a system reset cycle if some idiot DBA sends a manual SIGUSR1 + * into a system reset cycle if some idiot DBA sends a manual SIGQUIT * to a random backend. This is necessary precisely because we don't * clean up our shared memory state. */ @@ -987,8 +986,8 @@ QueryCancelHandler(SIGNAL_ARGS) InterruptHoldoffCount++; if (LockWaitCancel()) { - InterruptHoldoffCount--; DisableNotifyInterrupt(); + InterruptHoldoffCount--; ProcessInterrupts(); } else @@ -1205,9 +1204,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha case 'D': /* PGDATA directory */ if (secure) - { potential_DataDir = optarg; - } break; case 'd': /* debug level */ @@ -1243,13 +1240,10 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha case 'F': /* -------------------- * turn off fsync - * - * 7.0 buffer manager can support different backends running - * with different fsync settings, so this no longer needs - * to be "if (secure)". * -------------------- */ - enableFsync = false; + if (secure) + enableFsync = false; break; case 'f': @@ -1504,13 +1498,18 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha * Note that postmaster blocked all signals before forking child process, * so there is no race condition whereby we might receive a signal before * we have set up the handler. + * + * Also note: it's best not to use any signals that are SIG_IGNored in + * the postmaster. If such a signal arrives before we are able to change + * the handler to non-SIG_IGN, it'll get dropped. If necessary, make a + * dummy handler in the postmaster to reserve the signal. */ pqsignal(SIGHUP, SigHupHandler); /* set flag to read config file */ pqsignal(SIGINT, QueryCancelHandler); /* cancel current query */ pqsignal(SIGTERM, die); /* cancel current query and exit */ - pqsignal(SIGQUIT, die); /* could reassign this sig for another use */ - pqsignal(SIGALRM, HandleDeadLock); + pqsignal(SIGQUIT, quickdie); /* hard crash time */ + pqsignal(SIGALRM, HandleDeadLock); /* check for deadlock after timeout */ /* * Ignore failure to write to frontend. Note: if frontend closes @@ -1519,7 +1518,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha * midst of output during who-knows-what operation... */ pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, quickdie); + pqsignal(SIGUSR1, SIG_IGN); /* this signal available for use */ pqsignal(SIGUSR2, Async_NotifyHandler); /* flush also sinval cache */ pqsignal(SIGFPE, FloatExceptionHandler); pqsignal(SIGCHLD, SIG_IGN); /* ignored (may get this in system() calls) */ @@ -1534,14 +1533,14 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha pqinitmask(); - /* We allow SIGUSR1 (quickdie) at all times */ + /* We allow SIGQUIT (quickdie) at all times */ #ifdef HAVE_SIGPROCMASK - sigdelset(&BlockSig, SIGUSR1); + sigdelset(&BlockSig, SIGQUIT); #else - BlockSig &= ~(sigmask(SIGUSR1)); + BlockSig &= ~(sigmask(SIGQUIT)); #endif - PG_SETMASK(&BlockSig); /* block everything except SIGUSR1 */ + PG_SETMASK(&BlockSig); /* block everything except SIGQUIT */ if (IsUnderPostmaster) @@ -1693,7 +1692,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha if (!IsUnderPostmaster) { puts("\nPOSTGRES backend interactive interface "); - puts("$Revision: 1.209 $ $Date: 2001/03/09 06:36:32 $\n"); + puts("$Revision: 1.210 $ $Date: 2001/03/13 01:17:06 $\n"); } /* diff --git a/src/backend/utils/hash/Makefile b/src/backend/utils/hash/Makefile index 38db1cda30..a3749e37d2 100644 --- a/src/backend/utils/hash/Makefile +++ b/src/backend/utils/hash/Makefile @@ -4,7 +4,7 @@ # Makefile for utils/hash # # IDENTIFICATION -# $Header: /cvsroot/pgsql/src/backend/utils/hash/Makefile,v 1.9 2000/08/31 16:10:51 petere Exp $ +# $Header: /cvsroot/pgsql/src/backend/utils/hash/Makefile,v 1.10 2001/03/13 01:17:06 tgl Exp $ # #------------------------------------------------------------------------- @@ -12,7 +12,7 @@ subdir = src/backend/utils/hash top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = dynahash.o hashfn.o +OBJS = dynahash.o hashfn.o pg_crc.o all: SUBSYS.o diff --git a/src/backend/utils/hash/pg_crc.c b/src/backend/utils/hash/pg_crc.c new file mode 100644 index 0000000000..96413f3b8b --- /dev/null +++ b/src/backend/utils/hash/pg_crc.c @@ -0,0 +1,417 @@ +/*------------------------------------------------------------------------- + * + * pg_crc.c + * PostgreSQL 64-bit CRC support + * + * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/utils/hash/pg_crc.c,v 1.1 2001/03/13 01:17:06 tgl Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "utils/pg_crc.h" + + +#ifdef INT64_IS_BUSTED + +const uint32 crc_table0[256] = { + 0x00000000, 0xA9EA3693, + 0x53D46D26, 0xFA3E5BB5, + 0x0E42ECDF, 0xA7A8DA4C, + 0x5D9681F9, 0xF47CB76A, + 0x1C85D9BE, 0xB56FEF2D, + 0x4F51B498, 0xE6BB820B, + 0x12C73561, 0xBB2D03F2, + 0x41135847, 0xE8F96ED4, + 0x90E185EF, 0x390BB37C, + 0xC335E8C9, 0x6ADFDE5A, + 0x9EA36930, 0x37495FA3, + 0xCD770416, 0x649D3285, + 0x8C645C51, 0x258E6AC2, + 0xDFB03177, 0x765A07E4, + 0x8226B08E, 0x2BCC861D, + 0xD1F2DDA8, 0x7818EB3B, + 0x21C30BDE, 0x88293D4D, + 0x721766F8, 0xDBFD506B, + 0x2F81E701, 0x866BD192, + 0x7C558A27, 0xD5BFBCB4, + 0x3D46D260, 0x94ACE4F3, + 0x6E92BF46, 0xC77889D5, + 0x33043EBF, 0x9AEE082C, + 0x60D05399, 0xC93A650A, + 0xB1228E31, 0x18C8B8A2, + 0xE2F6E317, 0x4B1CD584, + 0xBF6062EE, 0x168A547D, + 0xECB40FC8, 0x455E395B, + 0xADA7578F, 0x044D611C, + 0xFE733AA9, 0x57990C3A, + 0xA3E5BB50, 0x0A0F8DC3, + 0xF031D676, 0x59DBE0E5, + 0xEA6C212F, 0x438617BC, + 0xB9B84C09, 0x10527A9A, + 0xE42ECDF0, 0x4DC4FB63, + 0xB7FAA0D6, 0x1E109645, + 0xF6E9F891, 0x5F03CE02, + 0xA53D95B7, 0x0CD7A324, + 0xF8AB144E, 0x514122DD, + 0xAB7F7968, 0x02954FFB, + 0x7A8DA4C0, 0xD3679253, + 0x2959C9E6, 0x80B3FF75, + 0x74CF481F, 0xDD257E8C, + 0x271B2539, 0x8EF113AA, + 0x66087D7E, 0xCFE24BED, + 0x35DC1058, 0x9C3626CB, + 0x684A91A1, 0xC1A0A732, + 0x3B9EFC87, 0x9274CA14, + 0xCBAF2AF1, 0x62451C62, + 0x987B47D7, 0x31917144, + 0xC5EDC62E, 0x6C07F0BD, + 0x9639AB08, 0x3FD39D9B, + 0xD72AF34F, 0x7EC0C5DC, + 0x84FE9E69, 0x2D14A8FA, + 0xD9681F90, 0x70822903, + 0x8ABC72B6, 0x23564425, + 0x5B4EAF1E, 0xF2A4998D, + 0x089AC238, 0xA170F4AB, + 0x550C43C1, 0xFCE67552, + 0x06D82EE7, 0xAF321874, + 0x47CB76A0, 0xEE214033, + 0x141F1B86, 0xBDF52D15, + 0x49899A7F, 0xE063ACEC, + 0x1A5DF759, 0xB3B7C1CA, + 0x7D3274CD, 0xD4D8425E, + 0x2EE619EB, 0x870C2F78, + 0x73709812, 0xDA9AAE81, + 0x20A4F534, 0x894EC3A7, + 0x61B7AD73, 0xC85D9BE0, + 0x3263C055, 0x9B89F6C6, + 0x6FF541AC, 0xC61F773F, + 0x3C212C8A, 0x95CB1A19, + 0xEDD3F122, 0x4439C7B1, + 0xBE079C04, 0x17EDAA97, + 0xE3911DFD, 0x4A7B2B6E, + 0xB04570DB, 0x19AF4648, + 0xF156289C, 0x58BC1E0F, + 0xA28245BA, 0x0B687329, + 0xFF14C443, 0x56FEF2D0, + 0xACC0A965, 0x052A9FF6, + 0x5CF17F13, 0xF51B4980, + 0x0F251235, 0xA6CF24A6, + 0x52B393CC, 0xFB59A55F, + 0x0167FEEA, 0xA88DC879, + 0x4074A6AD, 0xE99E903E, + 0x13A0CB8B, 0xBA4AFD18, + 0x4E364A72, 0xE7DC7CE1, + 0x1DE22754, 0xB40811C7, + 0xCC10FAFC, 0x65FACC6F, + 0x9FC497DA, 0x362EA149, + 0xC2521623, 0x6BB820B0, + 0x91867B05, 0x386C4D96, + 0xD0952342, 0x797F15D1, + 0x83414E64, 0x2AAB78F7, + 0xDED7CF9D, 0x773DF90E, + 0x8D03A2BB, 0x24E99428, + 0x975E55E2, 0x3EB46371, + 0xC48A38C4, 0x6D600E57, + 0x991CB93D, 0x30F68FAE, + 0xCAC8D41B, 0x6322E288, + 0x8BDB8C5C, 0x2231BACF, + 0xD80FE17A, 0x71E5D7E9, + 0x85996083, 0x2C735610, + 0xD64D0DA5, 0x7FA73B36, + 0x07BFD00D, 0xAE55E69E, + 0x546BBD2B, 0xFD818BB8, + 0x09FD3CD2, 0xA0170A41, + 0x5A2951F4, 0xF3C36767, + 0x1B3A09B3, 0xB2D03F20, + 0x48EE6495, 0xE1045206, + 0x1578E56C, 0xBC92D3FF, + 0x46AC884A, 0xEF46BED9, + 0xB69D5E3C, 0x1F7768AF, + 0xE549331A, 0x4CA30589, + 0xB8DFB2E3, 0x11358470, + 0xEB0BDFC5, 0x42E1E956, + 0xAA188782, 0x03F2B111, + 0xF9CCEAA4, 0x5026DC37, + 0xA45A6B5D, 0x0DB05DCE, + 0xF78E067B, 0x5E6430E8, + 0x267CDBD3, 0x8F96ED40, + 0x75A8B6F5, 0xDC428066, + 0x283E370C, 0x81D4019F, + 0x7BEA5A2A, 0xD2006CB9, + 0x3AF9026D, 0x931334FE, + 0x692D6F4B, 0xC0C759D8, + 0x34BBEEB2, 0x9D51D821, + 0x676F8394, 0xCE85B507 +}; + +const uint32 crc_table1[256] = { + 0x00000000, 0x42F0E1EB, + 0x85E1C3D7, 0xC711223C, + 0x49336645, 0x0BC387AE, + 0xCCD2A592, 0x8E224479, + 0x9266CC8A, 0xD0962D61, + 0x17870F5D, 0x5577EEB6, + 0xDB55AACF, 0x99A54B24, + 0x5EB46918, 0x1C4488F3, + 0x663D78FF, 0x24CD9914, + 0xE3DCBB28, 0xA12C5AC3, + 0x2F0E1EBA, 0x6DFEFF51, + 0xAAEFDD6D, 0xE81F3C86, + 0xF45BB475, 0xB6AB559E, + 0x71BA77A2, 0x334A9649, + 0xBD68D230, 0xFF9833DB, + 0x388911E7, 0x7A79F00C, + 0xCC7AF1FF, 0x8E8A1014, + 0x499B3228, 0x0B6BD3C3, + 0x854997BA, 0xC7B97651, + 0x00A8546D, 0x4258B586, + 0x5E1C3D75, 0x1CECDC9E, + 0xDBFDFEA2, 0x990D1F49, + 0x172F5B30, 0x55DFBADB, + 0x92CE98E7, 0xD03E790C, + 0xAA478900, 0xE8B768EB, + 0x2FA64AD7, 0x6D56AB3C, + 0xE374EF45, 0xA1840EAE, + 0x66952C92, 0x2465CD79, + 0x3821458A, 0x7AD1A461, + 0xBDC0865D, 0xFF3067B6, + 0x711223CF, 0x33E2C224, + 0xF4F3E018, 0xB60301F3, + 0xDA050215, 0x98F5E3FE, + 0x5FE4C1C2, 0x1D142029, + 0x93366450, 0xD1C685BB, + 0x16D7A787, 0x5427466C, + 0x4863CE9F, 0x0A932F74, + 0xCD820D48, 0x8F72ECA3, + 0x0150A8DA, 0x43A04931, + 0x84B16B0D, 0xC6418AE6, + 0xBC387AEA, 0xFEC89B01, + 0x39D9B93D, 0x7B2958D6, + 0xF50B1CAF, 0xB7FBFD44, + 0x70EADF78, 0x321A3E93, + 0x2E5EB660, 0x6CAE578B, + 0xABBF75B7, 0xE94F945C, + 0x676DD025, 0x259D31CE, + 0xE28C13F2, 0xA07CF219, + 0x167FF3EA, 0x548F1201, + 0x939E303D, 0xD16ED1D6, + 0x5F4C95AF, 0x1DBC7444, + 0xDAAD5678, 0x985DB793, + 0x84193F60, 0xC6E9DE8B, + 0x01F8FCB7, 0x43081D5C, + 0xCD2A5925, 0x8FDAB8CE, + 0x48CB9AF2, 0x0A3B7B19, + 0x70428B15, 0x32B26AFE, + 0xF5A348C2, 0xB753A929, + 0x3971ED50, 0x7B810CBB, + 0xBC902E87, 0xFE60CF6C, + 0xE224479F, 0xA0D4A674, + 0x67C58448, 0x253565A3, + 0xAB1721DA, 0xE9E7C031, + 0x2EF6E20D, 0x6C0603E6, + 0xF6FAE5C0, 0xB40A042B, + 0x731B2617, 0x31EBC7FC, + 0xBFC98385, 0xFD39626E, + 0x3A284052, 0x78D8A1B9, + 0x649C294A, 0x266CC8A1, + 0xE17DEA9D, 0xA38D0B76, + 0x2DAF4F0F, 0x6F5FAEE4, + 0xA84E8CD8, 0xEABE6D33, + 0x90C79D3F, 0xD2377CD4, + 0x15265EE8, 0x57D6BF03, + 0xD9F4FB7A, 0x9B041A91, + 0x5C1538AD, 0x1EE5D946, + 0x02A151B5, 0x4051B05E, + 0x87409262, 0xC5B07389, + 0x4B9237F0, 0x0962D61B, + 0xCE73F427, 0x8C8315CC, + 0x3A80143F, 0x7870F5D4, + 0xBF61D7E8, 0xFD913603, + 0x73B3727A, 0x31439391, + 0xF652B1AD, 0xB4A25046, + 0xA8E6D8B5, 0xEA16395E, + 0x2D071B62, 0x6FF7FA89, + 0xE1D5BEF0, 0xA3255F1B, + 0x64347D27, 0x26C49CCC, + 0x5CBD6CC0, 0x1E4D8D2B, + 0xD95CAF17, 0x9BAC4EFC, + 0x158E0A85, 0x577EEB6E, + 0x906FC952, 0xD29F28B9, + 0xCEDBA04A, 0x8C2B41A1, + 0x4B3A639D, 0x09CA8276, + 0x87E8C60F, 0xC51827E4, + 0x020905D8, 0x40F9E433, + 0x2CFFE7D5, 0x6E0F063E, + 0xA91E2402, 0xEBEEC5E9, + 0x65CC8190, 0x273C607B, + 0xE02D4247, 0xA2DDA3AC, + 0xBE992B5F, 0xFC69CAB4, + 0x3B78E888, 0x79880963, + 0xF7AA4D1A, 0xB55AACF1, + 0x724B8ECD, 0x30BB6F26, + 0x4AC29F2A, 0x08327EC1, + 0xCF235CFD, 0x8DD3BD16, + 0x03F1F96F, 0x41011884, + 0x86103AB8, 0xC4E0DB53, + 0xD8A453A0, 0x9A54B24B, + 0x5D459077, 0x1FB5719C, + 0x919735E5, 0xD367D40E, + 0x1476F632, 0x568617D9, + 0xE085162A, 0xA275F7C1, + 0x6564D5FD, 0x27943416, + 0xA9B6706F, 0xEB469184, + 0x2C57B3B8, 0x6EA75253, + 0x72E3DAA0, 0x30133B4B, + 0xF7021977, 0xB5F2F89C, + 0x3BD0BCE5, 0x79205D0E, + 0xBE317F32, 0xFCC19ED9, + 0x86B86ED5, 0xC4488F3E, + 0x0359AD02, 0x41A94CE9, + 0xCF8B0890, 0x8D7BE97B, + 0x4A6ACB47, 0x089A2AAC, + 0x14DEA25F, 0x562E43B4, + 0x913F6188, 0xD3CF8063, + 0x5DEDC41A, 0x1F1D25F1, + 0xD80C07CD, 0x9AFCE626 +}; + +#else /* int64 works */ + +const uint64 crc_table[256] = { + 0x0000000000000000, 0x42F0E1EBA9EA3693, + 0x85E1C3D753D46D26, 0xC711223CFA3E5BB5, + 0x493366450E42ECDF, 0x0BC387AEA7A8DA4C, + 0xCCD2A5925D9681F9, 0x8E224479F47CB76A, + 0x9266CC8A1C85D9BE, 0xD0962D61B56FEF2D, + 0x17870F5D4F51B498, 0x5577EEB6E6BB820B, + 0xDB55AACF12C73561, 0x99A54B24BB2D03F2, + 0x5EB4691841135847, 0x1C4488F3E8F96ED4, + 0x663D78FF90E185EF, 0x24CD9914390BB37C, + 0xE3DCBB28C335E8C9, 0xA12C5AC36ADFDE5A, + 0x2F0E1EBA9EA36930, 0x6DFEFF5137495FA3, + 0xAAEFDD6DCD770416, 0xE81F3C86649D3285, + 0xF45BB4758C645C51, 0xB6AB559E258E6AC2, + 0x71BA77A2DFB03177, 0x334A9649765A07E4, + 0xBD68D2308226B08E, 0xFF9833DB2BCC861D, + 0x388911E7D1F2DDA8, 0x7A79F00C7818EB3B, + 0xCC7AF1FF21C30BDE, 0x8E8A101488293D4D, + 0x499B3228721766F8, 0x0B6BD3C3DBFD506B, + 0x854997BA2F81E701, 0xC7B97651866BD192, + 0x00A8546D7C558A27, 0x4258B586D5BFBCB4, + 0x5E1C3D753D46D260, 0x1CECDC9E94ACE4F3, + 0xDBFDFEA26E92BF46, 0x990D1F49C77889D5, + 0x172F5B3033043EBF, 0x55DFBADB9AEE082C, + 0x92CE98E760D05399, 0xD03E790CC93A650A, + 0xAA478900B1228E31, 0xE8B768EB18C8B8A2, + 0x2FA64AD7E2F6E317, 0x6D56AB3C4B1CD584, + 0xE374EF45BF6062EE, 0xA1840EAE168A547D, + 0x66952C92ECB40FC8, 0x2465CD79455E395B, + 0x3821458AADA7578F, 0x7AD1A461044D611C, + 0xBDC0865DFE733AA9, 0xFF3067B657990C3A, + 0x711223CFA3E5BB50, 0x33E2C2240A0F8DC3, + 0xF4F3E018F031D676, 0xB60301F359DBE0E5, + 0xDA050215EA6C212F, 0x98F5E3FE438617BC, + 0x5FE4C1C2B9B84C09, 0x1D14202910527A9A, + 0x93366450E42ECDF0, 0xD1C685BB4DC4FB63, + 0x16D7A787B7FAA0D6, 0x5427466C1E109645, + 0x4863CE9FF6E9F891, 0x0A932F745F03CE02, + 0xCD820D48A53D95B7, 0x8F72ECA30CD7A324, + 0x0150A8DAF8AB144E, 0x43A04931514122DD, + 0x84B16B0DAB7F7968, 0xC6418AE602954FFB, + 0xBC387AEA7A8DA4C0, 0xFEC89B01D3679253, + 0x39D9B93D2959C9E6, 0x7B2958D680B3FF75, + 0xF50B1CAF74CF481F, 0xB7FBFD44DD257E8C, + 0x70EADF78271B2539, 0x321A3E938EF113AA, + 0x2E5EB66066087D7E, 0x6CAE578BCFE24BED, + 0xABBF75B735DC1058, 0xE94F945C9C3626CB, + 0x676DD025684A91A1, 0x259D31CEC1A0A732, + 0xE28C13F23B9EFC87, 0xA07CF2199274CA14, + 0x167FF3EACBAF2AF1, 0x548F120162451C62, + 0x939E303D987B47D7, 0xD16ED1D631917144, + 0x5F4C95AFC5EDC62E, 0x1DBC74446C07F0BD, + 0xDAAD56789639AB08, 0x985DB7933FD39D9B, + 0x84193F60D72AF34F, 0xC6E9DE8B7EC0C5DC, + 0x01F8FCB784FE9E69, 0x43081D5C2D14A8FA, + 0xCD2A5925D9681F90, 0x8FDAB8CE70822903, + 0x48CB9AF28ABC72B6, 0x0A3B7B1923564425, + 0x70428B155B4EAF1E, 0x32B26AFEF2A4998D, + 0xF5A348C2089AC238, 0xB753A929A170F4AB, + 0x3971ED50550C43C1, 0x7B810CBBFCE67552, + 0xBC902E8706D82EE7, 0xFE60CF6CAF321874, + 0xE224479F47CB76A0, 0xA0D4A674EE214033, + 0x67C58448141F1B86, 0x253565A3BDF52D15, + 0xAB1721DA49899A7F, 0xE9E7C031E063ACEC, + 0x2EF6E20D1A5DF759, 0x6C0603E6B3B7C1CA, + 0xF6FAE5C07D3274CD, 0xB40A042BD4D8425E, + 0x731B26172EE619EB, 0x31EBC7FC870C2F78, + 0xBFC9838573709812, 0xFD39626EDA9AAE81, + 0x3A28405220A4F534, 0x78D8A1B9894EC3A7, + 0x649C294A61B7AD73, 0x266CC8A1C85D9BE0, + 0xE17DEA9D3263C055, 0xA38D0B769B89F6C6, + 0x2DAF4F0F6FF541AC, 0x6F5FAEE4C61F773F, + 0xA84E8CD83C212C8A, 0xEABE6D3395CB1A19, + 0x90C79D3FEDD3F122, 0xD2377CD44439C7B1, + 0x15265EE8BE079C04, 0x57D6BF0317EDAA97, + 0xD9F4FB7AE3911DFD, 0x9B041A914A7B2B6E, + 0x5C1538ADB04570DB, 0x1EE5D94619AF4648, + 0x02A151B5F156289C, 0x4051B05E58BC1E0F, + 0x87409262A28245BA, 0xC5B073890B687329, + 0x4B9237F0FF14C443, 0x0962D61B56FEF2D0, + 0xCE73F427ACC0A965, 0x8C8315CC052A9FF6, + 0x3A80143F5CF17F13, 0x7870F5D4F51B4980, + 0xBF61D7E80F251235, 0xFD913603A6CF24A6, + 0x73B3727A52B393CC, 0x31439391FB59A55F, + 0xF652B1AD0167FEEA, 0xB4A25046A88DC879, + 0xA8E6D8B54074A6AD, 0xEA16395EE99E903E, + 0x2D071B6213A0CB8B, 0x6FF7FA89BA4AFD18, + 0xE1D5BEF04E364A72, 0xA3255F1BE7DC7CE1, + 0x64347D271DE22754, 0x26C49CCCB40811C7, + 0x5CBD6CC0CC10FAFC, 0x1E4D8D2B65FACC6F, + 0xD95CAF179FC497DA, 0x9BAC4EFC362EA149, + 0x158E0A85C2521623, 0x577EEB6E6BB820B0, + 0x906FC95291867B05, 0xD29F28B9386C4D96, + 0xCEDBA04AD0952342, 0x8C2B41A1797F15D1, + 0x4B3A639D83414E64, 0x09CA82762AAB78F7, + 0x87E8C60FDED7CF9D, 0xC51827E4773DF90E, + 0x020905D88D03A2BB, 0x40F9E43324E99428, + 0x2CFFE7D5975E55E2, 0x6E0F063E3EB46371, + 0xA91E2402C48A38C4, 0xEBEEC5E96D600E57, + 0x65CC8190991CB93D, 0x273C607B30F68FAE, + 0xE02D4247CAC8D41B, 0xA2DDA3AC6322E288, + 0xBE992B5F8BDB8C5C, 0xFC69CAB42231BACF, + 0x3B78E888D80FE17A, 0x7988096371E5D7E9, + 0xF7AA4D1A85996083, 0xB55AACF12C735610, + 0x724B8ECDD64D0DA5, 0x30BB6F267FA73B36, + 0x4AC29F2A07BFD00D, 0x08327EC1AE55E69E, + 0xCF235CFD546BBD2B, 0x8DD3BD16FD818BB8, + 0x03F1F96F09FD3CD2, 0x41011884A0170A41, + 0x86103AB85A2951F4, 0xC4E0DB53F3C36767, + 0xD8A453A01B3A09B3, 0x9A54B24BB2D03F20, + 0x5D45907748EE6495, 0x1FB5719CE1045206, + 0x919735E51578E56C, 0xD367D40EBC92D3FF, + 0x1476F63246AC884A, 0x568617D9EF46BED9, + 0xE085162AB69D5E3C, 0xA275F7C11F7768AF, + 0x6564D5FDE549331A, 0x279434164CA30589, + 0xA9B6706FB8DFB2E3, 0xEB46918411358470, + 0x2C57B3B8EB0BDFC5, 0x6EA7525342E1E956, + 0x72E3DAA0AA188782, 0x30133B4B03F2B111, + 0xF7021977F9CCEAA4, 0xB5F2F89C5026DC37, + 0x3BD0BCE5A45A6B5D, 0x79205D0E0DB05DCE, + 0xBE317F32F78E067B, 0xFCC19ED95E6430E8, + 0x86B86ED5267CDBD3, 0xC4488F3E8F96ED40, + 0x0359AD0275A8B6F5, 0x41A94CE9DC428066, + 0xCF8B0890283E370C, 0x8D7BE97B81D4019F, + 0x4A6ACB477BEA5A2A, 0x089A2AACD2006CB9, + 0x14DEA25F3AF9026D, 0x562E43B4931334FE, + 0x913F6188692D6F4B, 0xD3CF8063C0C759D8, + 0x5DEDC41A34BBEEB2, 0x1F1D25F19D51D821, + 0xD80C07CD676F8394, 0x9AFCE626CE85B507 +}; + +#endif /* INT64_IS_BUSTED */ diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 7e072a88b2..43331badc7 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/init/globals.c,v 1.53 2001/01/24 19:43:16 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/init/globals.c,v 1.54 2001/03/13 01:17:06 tgl Exp $ * * NOTES * Globals used all over the place should be declared here and not @@ -16,14 +16,14 @@ * *------------------------------------------------------------------------- */ +#include "postgres.h" + #include #include #include #include #include -#include "postgres.h" - #include "catalog/catname.h" #include "catalog/indexing.h" #include "libpq/pqcomm.h" @@ -119,49 +119,3 @@ char *SharedSystemRelationNames[] = { VariableRelationName, 0 }; - -uint32 crc_table[] = { -0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, -0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, -0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2, -0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, -0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9, -0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, -0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c, -0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, -0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423, -0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, -0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106, -0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, -0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d, -0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, -0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950, -0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, -0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7, -0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, -0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa, -0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, -0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81, -0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, -0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84, -0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, -0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb, -0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, -0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e, -0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, -0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55, -0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, -0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28, -0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, -0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f, -0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, -0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242, -0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, -0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69, -0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, -0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc, -0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, -0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693, -0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, -0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d -}; diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 0ecfa27244..c6192aa0c6 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/init/miscinit.c,v 1.61 2001/01/27 00:05:31 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/init/miscinit.c,v 1.62 2001/03/13 01:17:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -41,7 +41,8 @@ unsigned char RecodeBackTable[128]; ProcessingMode Mode = InitProcessing; -/* Note: we rely on this to initialize as zeroes */ +/* Note: we rely on these to initialize as zeroes */ +static char directoryLockFile[MAXPGPATH]; static char socketLockFile[MAXPGPATH]; @@ -458,6 +459,9 @@ GetUserName(Oid userid) * The path is also just for informational purposes (so that a socket lockfile * can be more easily traced to the associated postmaster). * + * A data-directory lockfile can optionally contain a third line, containing + * the key and ID for the shared memory block used by this postmaster. + * * On successful lockfile creation, a proc_exit callback to remove the * lockfile is automatically created. *------------------------------------------------------------------------- @@ -476,16 +480,18 @@ UnlinkLockFile(int status, Datum filename) /* * Create a lockfile, if possible * - * Call CreateLockFile with the name of the lockfile to be created. If - * successful, it returns zero. On detecting a collision, it returns - * the PID or negated PID of the lockfile owner --- the caller is responsible - * for producing an appropriate error message. + * Call CreateLockFile with the name of the lockfile to be created. + * Returns true if successful, false if not (with a message on stderr). + * + * amPostmaster is used to determine how to encode the output PID. + * isDDLock and refName are used to determine what error message to produce. */ -static int -CreateLockFile(const char *filename, bool amPostmaster) +static bool +CreateLockFile(const char *filename, bool amPostmaster, + bool isDDLock, const char *refName) { int fd; - char buffer[MAXPGPATH + 32]; + char buffer[MAXPGPATH + 100]; int len; int encoded_pid; pid_t other_pid; @@ -539,11 +545,61 @@ CreateLockFile(const char *filename, bool amPostmaster) { if (kill(other_pid, 0) == 0 || errno != ESRCH) - return encoded_pid; /* lockfile belongs to a live process */ + { + /* lockfile belongs to a live process */ + fprintf(stderr, "Lock file \"%s\" already exists.\n", + filename); + if (isDDLock) + fprintf(stderr, + "Is another %s (pid %d) running in \"%s\"?\n", + (encoded_pid < 0 ? "postgres" : "postmaster"), + other_pid, refName); + else + fprintf(stderr, + "Is another %s (pid %d) using \"%s\"?\n", + (encoded_pid < 0 ? "postgres" : "postmaster"), + other_pid, refName); + return false; + } + } + + /* + * No, the creating process did not exist. However, it could be that + * the postmaster crashed (or more likely was kill -9'd by a clueless + * admin) but has left orphan backends behind. Check for this by + * looking to see if there is an associated shmem segment that is + * still in use. + */ + if (isDDLock) + { + char *ptr; + unsigned long shmKey, + shmId; + + ptr = strchr(buffer, '\n'); + if (ptr != NULL && + (ptr = strchr(ptr+1, '\n')) != NULL) + { + ptr++; + if (sscanf(ptr, "%lu %lu", &shmKey, &shmId) == 2) + { + if (SharedMemoryIsInUse((IpcMemoryKey) shmKey, + (IpcMemoryId) shmId)) + { + fprintf(stderr, + "Found a pre-existing shared memory block (ID %d) still in use.\n" + "If you're sure there are no old backends still running,\n" + "remove the shared memory block with ipcrm(1), or just\n" + "delete \"%s\".\n", + (int) shmId, filename); + return false; + } + } + } } /* - * No, the process did not exist. Unlink the file and try again to + * Looks like nobody's home. Unlink the file and try again to * create it. Need a loop because of possible race condition against * other would-be creators. */ @@ -576,28 +632,19 @@ CreateLockFile(const char *filename, bool amPostmaster) */ on_proc_exit(UnlinkLockFile, PointerGetDatum(strdup(filename))); - return 0; /* Success! */ + return true; /* Success! */ } bool CreateDataDirLockFile(const char *datadir, bool amPostmaster) { char lockfile[MAXPGPATH]; - int encoded_pid; snprintf(lockfile, sizeof(lockfile), "%s/postmaster.pid", datadir); - encoded_pid = CreateLockFile(lockfile, amPostmaster); - if (encoded_pid != 0) - { - fprintf(stderr, "Lock file \"%s\" already exists.\n", lockfile); - if (encoded_pid < 0) - fprintf(stderr, "Is another postgres (pid %d) running in \"%s\"?\n", - -encoded_pid, datadir); - else - fprintf(stderr, "Is another postmaster (pid %d) running in \"%s\"?\n", - encoded_pid, datadir); + if (! CreateLockFile(lockfile, amPostmaster, true, datadir)) return false; - } + /* Save name of lockfile for RecordSharedMemoryInLockFile */ + strcpy(directoryLockFile, lockfile); return true; } @@ -605,21 +652,10 @@ bool CreateSocketLockFile(const char *socketfile, bool amPostmaster) { char lockfile[MAXPGPATH]; - int encoded_pid; snprintf(lockfile, sizeof(lockfile), "%s.lock", socketfile); - encoded_pid = CreateLockFile(lockfile, amPostmaster); - if (encoded_pid != 0) - { - fprintf(stderr, "Lock file \"%s\" already exists.\n", lockfile); - if (encoded_pid < 0) - fprintf(stderr, "Is another postgres (pid %d) using \"%s\"?\n", - -encoded_pid, socketfile); - else - fprintf(stderr, "Is another postmaster (pid %d) using \"%s\"?\n", - encoded_pid, socketfile); + if (! CreateLockFile(lockfile, amPostmaster, false, socketfile)) return false; - } /* Save name of lockfile for TouchSocketLockFile */ strcpy(socketLockFile, lockfile); return true; @@ -650,6 +686,78 @@ TouchSocketLockFile(void) } } +/* + * Append information about a shared memory segment to the data directory + * lock file (if we have created one). + * + * This may be called multiple times in the life of a postmaster, if we + * delete and recreate shmem due to backend crash. Therefore, be prepared + * to overwrite existing information. (As of 7.1, a postmaster only creates + * one shm seg anyway; but for the purposes here, if we did have more than + * one then any one of them would do anyway.) + */ +void +RecordSharedMemoryInLockFile(IpcMemoryKey shmKey, IpcMemoryId shmId) +{ + int fd; + int len; + char *ptr; + char buffer[BLCKSZ]; + + /* + * Do nothing if we did not create a lockfile (probably because we + * are running standalone). + */ + if (directoryLockFile[0] == '\0') + return; + + fd = open(directoryLockFile, O_RDWR | PG_BINARY, 0); + if (fd < 0) + { + elog(DEBUG, "Failed to rewrite %s: %m", directoryLockFile); + return; + } + len = read(fd, buffer, sizeof(buffer) - 100); + if (len <= 0) + { + elog(DEBUG, "Failed to read %s: %m", directoryLockFile); + close(fd); + return; + } + buffer[len] = '\0'; + /* + * Skip over first two lines (PID and path). + */ + ptr = strchr(buffer, '\n'); + if (ptr == NULL || + (ptr = strchr(ptr+1, '\n')) == NULL) + { + elog(DEBUG, "Bogus data in %s", directoryLockFile); + close(fd); + return; + } + ptr++; + /* + * Append shm key and ID. Format to try to keep it the same length + * always (trailing junk won't hurt, but might confuse humans). + */ + sprintf(ptr, "%9lu %9lu\n", + (unsigned long) shmKey, (unsigned long) shmId); + /* + * And rewrite the data. Since we write in a single kernel call, + * this update should appear atomic to onlookers. + */ + len = strlen(buffer); + if (lseek(fd, (off_t) 0, SEEK_SET) != 0 || + (int) write(fd, buffer, len) != len) + { + elog(DEBUG, "Failed to write %s: %m", directoryLockFile); + close(fd); + return; + } + close(fd); +} + /*------------------------------------------------------------------------- * Version checking support diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index b2853917ac..7e6769d3c8 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -4,7 +4,7 @@ * Support for grand unified configuration scheme, including SET * command, configuration file, and command line options. * - * $Header: /cvsroot/pgsql/src/backend/utils/misc/guc.c,v 1.31 2001/02/26 00:50:07 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/misc/guc.c,v 1.32 2001/03/13 01:17:06 tgl Exp $ * * Copyright 2000 by PostgreSQL Global Development Group * Written by Peter Eisentraut . @@ -36,6 +36,7 @@ /* XXX should be in a header file */ extern bool Log_connections; +extern int CheckPointSegments; extern int CheckPointTimeout; extern int XLOGbuffers; extern int XLOGfiles; @@ -279,13 +280,16 @@ ConfigureNamesInt[] = {"unix_socket_permissions", PGC_POSTMASTER, &Unix_socket_permissions, 0777, 0000, 0777}, - {"checkpoint_timeout", PGC_POSTMASTER, &CheckPointTimeout, + {"checkpoint_segments", PGC_SIGHUP, &CheckPointSegments, + 3, 1, INT_MAX}, + + {"checkpoint_timeout", PGC_SIGHUP, &CheckPointTimeout, 300, 30, 3600}, {"wal_buffers", PGC_POSTMASTER, &XLOGbuffers, 8, 4, INT_MAX}, - {"wal_files", PGC_POSTMASTER, &XLOGfiles, + {"wal_files", PGC_SIGHUP, &XLOGfiles, 0, 0, 64}, {"wal_debug", PGC_SUSET, &XLOG_DEBUG, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index f599d97cff..88d1fe9437 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -110,6 +110,7 @@ #wal_debug = 0 # range 0-16 #commit_delay = 0 # range 0-100000 #commit_siblings = 5 # range 1-1000 +#checkpoint_segments = 3 # in logfile segments (16MB each), min 1 #checkpoint_timeout = 300 # in seconds, range 30-3600 diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 620f6e5910..460de69988 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: transam.h,v 1.29 2001/01/24 19:43:19 momjian Exp $ + * $Id: transam.h,v 1.30 2001/03/13 01:17:06 tgl Exp $ * * NOTES * Transaction System Version 101 now support proper oid @@ -79,7 +79,7 @@ typedef unsigned char XidStatus;/* (2 bits) */ * their numbering at 512. * * The first 4 bytes of this relation store the version - * number of the transction system. + * number of the transaction system. * ---------------- */ typedef struct LogRelationContentsData @@ -100,13 +100,16 @@ typedef LogRelationContentsData *LogRelationContents; * is updated in place whenever the variables change. * * The first 4 bytes of this relation store the version - * number of the transction system. + * number of the transaction system. * * Currently, the relation has only one page and the next * available xid, the last committed xid and the next * available oid are stored there. + * + * XXX As of 7.1, pg_variable isn't used anymore; this is dead code. * ---------------- */ +#ifdef NOT_USED typedef struct VariableRelationContentsData { XLogRecPtr LSN; @@ -117,6 +120,7 @@ typedef struct VariableRelationContentsData } VariableRelationContentsData; typedef VariableRelationContentsData *VariableRelationContents; +#endif /* NOT_USED */ /* * VariableCache is placed in shmem and used by @@ -124,8 +128,9 @@ typedef VariableRelationContentsData *VariableRelationContents; */ typedef struct VariableCacheData { - TransactionId nextXid; - Oid nextOid; + TransactionId nextXid; /* next XID to assign */ + uint32 xidCount; /* XIDs available before must do XLOG work */ + Oid nextOid; /* and similarly for OIDs */ uint32 oidCount; } VariableCacheData; @@ -184,7 +189,8 @@ extern int RecoveryCheckingEnableState; extern bool AMI_OVERRIDE; /* in varsup.c */ -extern int OidGenLockId; +extern SPINLOCK OidGenLockId; +extern SPINLOCK XidGenLockId; extern VariableCache ShmemVariableCache; #endif /* TRAMSAM_H */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index c17bf32cc5..dd079496ed 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -3,7 +3,10 @@ * * PostgreSQL transaction log manager * - * $Header: /cvsroot/pgsql/src/include/access/xlog.h,v 1.18 2001/02/26 00:50:07 tgl Exp $ + * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * $Id: xlog.h,v 1.19 2001/03/13 01:17:06 tgl Exp $ */ #ifndef XLOG_H #define XLOG_H @@ -11,97 +14,126 @@ #include "access/rmgr.h" #include "access/transam.h" #include "access/xlogdefs.h" -#include "access/xlogutils.h" +#include "utils/pg_crc.h" -typedef struct crc64 -{ - uint32 crc1; - uint32 crc2; -} crc64; +/* + * Header for each record in XLOG + * + * NOTE: xl_len counts only the rmgr data, not the XLogRecord header, + * and also not any backup blocks appended to the record (which are signaled + * by xl_info flag bits). The total space needed for an XLOG record is + * really: + * + * SizeOfXLogRecord + xl_len + n_backup_blocks * (sizeof(BkpBlock) + BLCKSZ) + */ typedef struct XLogRecord { - crc64 xl_crc; + crc64 xl_crc; /* CRC for this record */ XLogRecPtr xl_prev; /* ptr to previous record in log */ XLogRecPtr xl_xact_prev; /* ptr to previous record of this xact */ TransactionId xl_xid; /* xact id */ - uint16 xl_len; /* total len of record *data* */ - uint8 xl_info; - RmgrId xl_rmid; /* resource manager inserted this record */ + uint16 xl_len; /* total len of rmgr data */ + uint8 xl_info; /* flag bits, see below */ + RmgrId xl_rmid; /* resource manager for this record */ /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */ } XLogRecord; -#define SizeOfXLogRecord DOUBLEALIGN(sizeof(XLogRecord)) -#define MAXLOGRECSZ (2 * BLCKSZ) +#define SizeOfXLogRecord MAXALIGN(sizeof(XLogRecord)) +#define MAXLOGRECSZ 65535 /* the most that'll fit in xl_len */ -#define XLogRecGetData(record) \ - ((char*)record + SizeOfXLogRecord) +#define XLogRecGetData(record) ((char*) (record) + SizeOfXLogRecord) /* - * When there is no space on current page we continue - * on the next page with subrecord. + * XLOG uses only low 4 bits of xl_info. High 4 bits may be used by rmgr. */ -typedef struct XLogSubRecord -{ - uint16 xl_len; /* len of data left */ - - /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */ - -} XLogSubRecord; - -#define SizeOfXLogSubRecord DOUBLEALIGN(sizeof(XLogSubRecord)) +#define XLR_INFO_MASK 0x0F /* - * XLOG uses only low 4 bits of xl_info. - * High 4 bits may be used by rmgr... - * - * We support backup of 2 blocks per record only. - * If we backed up some of these blocks then we use - * flags below to signal rmgr about this on recovery. + * We support backup of up to 2 disk blocks per XLOG record (could support + * more if we cared to dedicate more xl_info bits for this purpose; currently + * do not need more than 2 anyway). If we backed up any disk blocks then we + * use flag bits in xl_info to signal it. */ -#define XLR_SET_BKP_BLOCK(iblk) (0x08 >> iblk) +#define XLR_BKP_BLOCK_MASK 0x0C /* all info bits used for bkp blocks */ +#define XLR_MAX_BKP_BLOCKS 2 +#define XLR_SET_BKP_BLOCK(iblk) (0x08 >> (iblk)) #define XLR_BKP_BLOCK_1 XLR_SET_BKP_BLOCK(0) /* 0x08 */ #define XLR_BKP_BLOCK_2 XLR_SET_BKP_BLOCK(1) /* 0x04 */ -#define XLR_INFO_MASK 0x0F /* * Sometimes we log records which are out of transaction control. - * Rmgr may use flag below for this purpose. + * Rmgr may "or" XLOG_NO_TRAN into info passed to XLogInsert to indicate this. */ #define XLOG_NO_TRAN XLR_INFO_MASK -#define XLOG_PAGE_MAGIC 0x17345168 +/* + * Header info for a backup block appended to an XLOG record. + * + * Note that the backup block has its own CRC, and is not covered by + * the CRC of the XLOG record proper. Also note that we don't attempt + * to align either the BkpBlock struct or the block's data. + */ +typedef struct BkpBlock +{ + crc64 crc; + RelFileNode node; + BlockNumber block; +} BkpBlock; -typedef struct XLogPageHeaderData +/* + * When there is not enough space on current page for whole record, we + * continue on the next page with continuation record. (However, the + * XLogRecord header will never be split across pages; if there's less than + * SizeOfXLogRecord space left at the end of a page, we just waste it.) + * + * Note that xl_rem_len includes backup-block data, unlike xl_len in the + * initial header. + */ +typedef struct XLogContRecord { - uint32 xlp_magic; - uint16 xlp_info; -} XLogPageHeaderData; + uint32 xl_rem_len; /* total len of remaining data for record */ -#define SizeOfXLogPHD DOUBLEALIGN(sizeof(XLogPageHeaderData)) + /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */ -typedef XLogPageHeaderData *XLogPageHeader; +} XLogContRecord; -/* When record crosses page boundary */ -#define XLP_FIRST_IS_SUBRECORD 0x0001 +#define SizeOfXLogContRecord MAXALIGN(sizeof(XLogContRecord)) -#define XLByteLT(left, right) \ - (right.xlogid > left.xlogid || \ - (right.xlogid == left.xlogid && right.xrecoff > left.xrecoff)) +/* + * Each page of XLOG file has a header like this: + */ +#define XLOG_PAGE_MAGIC 0x17345169 /* can be used as WAL version indicator */ -#define XLByteLE(left, right) \ - (right.xlogid > left.xlogid || \ - (right.xlogid == left.xlogid && right.xrecoff >= left.xrecoff)) +typedef struct XLogPageHeaderData +{ + uint32 xlp_magic; /* magic value for correctness checks */ + uint16 xlp_info; /* flag bits, see below */ +} XLogPageHeaderData; -#define XLByteEQ(left, right) \ - (right.xlogid == left.xlogid && right.xrecoff == left.xrecoff) +#define SizeOfXLogPHD MAXALIGN(sizeof(XLogPageHeaderData)) -extern StartUpID ThisStartUpID; /* current SUI */ -extern bool InRecovery; -extern XLogRecPtr MyLastRecPtr; +typedef XLogPageHeaderData *XLogPageHeader; + +/* When record crosses page boundary, set this flag in new page's header */ +#define XLP_FIRST_IS_CONTRECORD 0x0001 + +/* + * We break each logical log file (xlogid value) into 16Mb segments. + * One possible segment at the end of each log file is wasted, to ensure + * that we don't have problems representing last-byte-position-plus-1. + */ +#define XLogSegSize ((uint32) (16*1024*1024)) +#define XLogSegsPerFile (((uint32) 0xffffffff) / XLogSegSize) +#define XLogFileSize (XLogSegsPerFile * XLogSegSize) +/* + * Method table for resource managers. + * + * RmgrTable[] is indexed by RmgrId values (see rmgr.h). + */ typedef struct RmgrData { char *rm_name; @@ -112,12 +144,19 @@ typedef struct RmgrData extern RmgrData RmgrTable[]; -/* - * List of these structs is used to pass data to XLOG. - * If buffer is valid then XLOG will check if buffer must - * be backup-ed. For backup-ed buffer data will not be - * inserted into record (and XLOG sets - * XLR_BKP_BLOCK_X bit in xl_info). +/*-------------------- + * List of these structs is used to pass data to XLogInsert(). + * + * If buffer is valid then XLOG will check if buffer must be backed up + * (ie, whether this is first change of that page since last checkpoint). + * If so, the whole page contents are attached to the XLOG record, and XLOG + * sets XLR_BKP_BLOCK_X bit in xl_info. Note that the buffer must be pinned + * and locked while this is going on, so that it won't change under us. + * NB: when this happens, we do not bother to insert the associated data into + * the XLOG record, since we assume it's present in the buffer. Therefore, + * rmgr redo routines MUST pay attention to XLR_BKP_BLOCK_X to know what + * is actually stored in the XLOG record. + *-------------------- */ typedef struct XLogRecData { @@ -127,11 +166,13 @@ typedef struct XLogRecData struct XLogRecData *next; } XLogRecData; +extern StartUpID ThisStartUpID; /* current SUI */ +extern bool InRecovery; +extern XLogRecPtr MyLastRecPtr; + extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); extern void XLogFlush(XLogRecPtr RecPtr); -extern void CreateCheckPoint(bool shutdown); - extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record); extern void xlog_undo(XLogRecPtr lsn, XLogRecord *record); extern void xlog_desc(char *buf, uint8 xl_info, char* rec); @@ -145,6 +186,10 @@ extern void StartupXLOG(void); extern void ShutdownXLOG(void); extern void CreateCheckPoint(bool shutdown); extern void SetThisStartUpID(void); +extern void XLogPutNextXid(TransactionId nextXid); +extern void XLogPutNextOid(Oid nextOid); +extern void SetRedoRecPtr(void); +extern void GetRedoRecPtr(void); /* in storage/ipc/sinval.c, but don't want to declare in sinval.h because * we'd have to include xlog.h into that ... diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index ce1b3ef8cf..bc7f9e1a36 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -1,20 +1,55 @@ /* - * * xlogdefs.h * * Postgres transaction log manager record pointer and - * system stratup number definitions + * system startup number definitions + * + * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California * + * $Id: xlogdefs.h,v 1.2 2001/03/13 01:17:06 tgl Exp $ */ #ifndef XLOG_DEFS_H #define XLOG_DEFS_H +/* + * Pointer to a location in the XLOG. These pointers are 64 bits wide, + * because we don't want them ever to overflow. + * + * NOTE: xrecoff == 0 is used to indicate an invalid pointer. This is OK + * because we use page headers in the XLOG, so no XLOG record can start + * right at the beginning of a file. + * + * NOTE: the "log file number" is somewhat misnamed, since the actual files + * making up the XLOG are much smaller than 4Gb. Each actual file is an + * XLogSegSize-byte "segment" of a logical log file having the indicated + * xlogid. The log file number and segment number together identify a + * physical XLOG file. Segment number and offset within the physical file + * are computed from xrecoff div and mod XLogSegSize. + */ typedef struct XLogRecPtr { uint32 xlogid; /* log file #, 0 based */ - uint32 xrecoff; /* offset of record in log file */ + uint32 xrecoff; /* byte offset of location in log file */ } XLogRecPtr; +/* + * Macros for comparing XLogRecPtrs + * + * Beware of passing expressions with side-effects to these macros, + * since the arguments may be evaluated multiple times. + */ +#define XLByteLT(a, b) \ + ((a).xlogid < (b).xlogid || \ + ((a).xlogid == (b).xlogid && (a).xrecoff < (b).xrecoff)) + +#define XLByteLE(a, b) \ + ((a).xlogid < (b).xlogid || \ + ((a).xlogid == (b).xlogid && (a).xrecoff <= (b).xrecoff)) + +#define XLByteEQ(a, b) \ + ((a).xlogid == (b).xlogid && (a).xrecoff == (b).xrecoff) + /* * StartUpID (SUI) - system startups counter. It's to allow removing * pg_log after shutdown, in future. diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 016381f0d2..1f1fff7c07 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -1,3 +1,13 @@ +/* + * xlogutils.h + * + * PostgreSQL transaction log manager utility routines + * + * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * $Id: xlogutils.h,v 1.6 2001/03/13 01:17:06 tgl Exp $ + */ #ifndef XLOG_UTILS_H #define XLOG_UTILS_H diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h new file mode 100644 index 0000000000..97d0e13e78 --- /dev/null +++ b/src/include/catalog/pg_control.h @@ -0,0 +1,115 @@ +/*------------------------------------------------------------------------- + * + * pg_control.h + * The system control file "pg_control" is not a heap relation. + * However, we define it here so that the format is documented. + * + * + * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * $Id: pg_control.h,v 1.1 2001/03/13 01:17:06 tgl Exp $ + * + *------------------------------------------------------------------------- + */ +#ifndef PG_CONTROL_H +#define PG_CONTROL_H + +#include + +#include "access/xlogdefs.h" +#include "utils/pg_crc.h" + + +/* Version identifier for this pg_control format */ +#define PG_CONTROL_VERSION 71 + +/* + * Body of CheckPoint XLOG records. This is declared here because we keep + * a copy of the latest one in pg_control for possible disaster recovery. + */ +typedef struct CheckPoint +{ + XLogRecPtr redo; /* next RecPtr available when we */ + /* began to create CheckPoint */ + /* (i.e. REDO start point) */ + XLogRecPtr undo; /* first record of oldest in-progress */ + /* transaction when we started */ + /* (i.e. UNDO end point) */ + StartUpID ThisStartUpID; /* current SUI */ + TransactionId nextXid; /* next free XID */ + Oid nextOid; /* next free OID */ + time_t time; /* time stamp of checkpoint */ +} CheckPoint; + +/* XLOG info values for XLOG rmgr */ +#define XLOG_CHECKPOINT_SHUTDOWN 0x00 +#define XLOG_CHECKPOINT_ONLINE 0x10 +#define XLOG_NEXTXID 0x20 +#define XLOG_NEXTOID 0x30 + + +/* System status indicator */ +typedef enum DBState +{ + DB_STARTUP = 0, + DB_SHUTDOWNED, + DB_SHUTDOWNING, + DB_IN_RECOVERY, + DB_IN_PRODUCTION +} DBState; + +#define LOCALE_NAME_BUFLEN 128 + +/* + * Contents of pg_control. + * + * NOTE: try to keep this under 512 bytes so that it will fit on one physical + * sector of typical disk drives. This reduces the odds of corruption due to + * power failure midway through a write. Currently it fits comfortably, + * but we could probably reduce LOCALE_NAME_BUFLEN if things get tight. + */ + +typedef struct ControlFileData +{ + crc64 crc; /* CRC for remainder of struct */ + + /* + * Version identifier information. Keep these fields at the front, + * especially pg_control_version; they won't be real useful if they + * move around. + * + * pg_control_version identifies the format of pg_control itself. + * catalog_version_no identifies the format of the system catalogs. + * + * There are additional version identifiers in individual files; + * for example, WAL logs contain per-page magic numbers that can serve + * as version cues for the WAL log. + */ + uint32 pg_control_version; /* PG_CONTROL_VERSION */ + uint32 catalog_version_no; /* see catversion.h */ + + /* + * System status data + */ + DBState state; /* see enum above */ + time_t time; /* time stamp of last pg_control update */ + uint32 logId; /* current log file id */ + uint32 logSeg; /* current log file segment, + 1 */ + XLogRecPtr checkPoint; /* last check point record ptr */ + XLogRecPtr prevCheckPoint; /* previous check point record ptr */ + + CheckPoint checkPointCopy; /* copy of last check point record */ + + /* + * This data is used to make sure that configuration of this database + * is compatible with the backend executable. + */ + uint32 blcksz; /* block size for this DB */ + uint32 relseg_size; /* blocks per segment of large relation */ + /* active locales --- "C" if compiled without USE_LOCALE: */ + char lc_collate[LOCALE_NAME_BUFLEN]; + char lc_ctype[LOCALE_NAME_BUFLEN]; +} ControlFileData; + +#endif /* PG_CONTROL_H */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 46d7ab534e..851174f396 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -12,7 +12,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: miscadmin.h,v 1.81 2001/02/10 02:31:28 tgl Exp $ + * $Id: miscadmin.h,v 1.82 2001/03/13 01:17:06 tgl Exp $ * * NOTES * some of the information in this file should be moved to @@ -283,6 +283,8 @@ extern ProcessingMode Mode; extern bool CreateDataDirLockFile(const char *datadir, bool amPostmaster); extern bool CreateSocketLockFile(const char *socketfile, bool amPostmaster); extern void TouchSocketLockFile(void); +extern void RecordSharedMemoryInLockFile(IpcMemoryKey shmKey, + IpcMemoryId shmId); extern void ValidatePgVersion(const char *path); diff --git a/src/include/storage/ipc.h b/src/include/storage/ipc.h index 3173121850..f1e1e6096c 100644 --- a/src/include/storage/ipc.h +++ b/src/include/storage/ipc.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: ipc.h,v 1.47 2001/02/10 02:31:28 tgl Exp $ + * $Id: ipc.h,v 1.48 2001/03/13 01:17:06 tgl Exp $ * * Some files that would normally need to include only sys/ipc.h must * instead include this file because on Ultrix, sys/ipc.h is not designed @@ -105,6 +105,8 @@ extern int IpcSemaphoreGetValue(IpcSemaphoreId semId, int sem); extern PGShmemHeader *IpcMemoryCreate(uint32 size, bool makePrivate, int permission); +extern bool SharedMemoryIsInUse(IpcMemoryKey shmKey, IpcMemoryId shmId); + /* ipci.c */ extern void CreateSharedMemoryAndSemaphores(bool makePrivate, int maxBackends); diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 52d719c6c5..94af6e7113 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: tcopprot.h,v 1.38 2001/01/24 19:43:28 momjian Exp $ + * $Id: tcopprot.h,v 1.39 2001/03/13 01:17:06 tgl Exp $ * * OLD COMMENTS * This file was created so that other c files could get the two @@ -41,6 +41,7 @@ extern void pg_exec_query_string(char *query_string, #endif /* BOOTSTRAP_INCLUDE */ extern void die(SIGNAL_ARGS); +extern void quickdie(SIGNAL_ARGS); extern int PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const char *username); extern void ResetUsage(void); diff --git a/src/include/utils/pg_crc.h b/src/include/utils/pg_crc.h new file mode 100644 index 0000000000..18e012bbcd --- /dev/null +++ b/src/include/utils/pg_crc.h @@ -0,0 +1,115 @@ +/* + * pg_crc.h + * + * PostgreSQL 64-bit CRC support + * + * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * $Id: pg_crc.h,v 1.1 2001/03/13 01:17:06 tgl Exp $ + */ +#ifndef PG_CRC_H +#define PG_CRC_H + +/* + * If we have a 64-bit integer type, then a 64-bit CRC looks just like the + * usual sort of implementation. (See Ross Williams' excellent introduction + * A PAINLESS GUIDE TO CRC ERROR DETECTION ALGORITHMS, available from + * ftp://ftp.rocksoft.com/papers/crc_v3.txt or several other net sites.) + * If we have no working 64-bit type, then fake it with two 32-bit registers. + * + * The present implementation is a normal (not "reflected", in Williams' + * terms) 64-bit CRC, using initial all-ones register contents and a final + * bit inversion. The chosen polynomial is borrowed from the DLT1 spec + * (ECMA-182, available from http://www.ecma.ch/ecma1/STAND/ECMA-182.HTM): + * + * x^64 + x^62 + x^57 + x^55 + x^54 + x^53 + x^52 + x^47 + x^46 + x^45 + + * x^40 + x^39 + x^38 + x^37 + x^35 + x^33 + x^32 + x^31 + x^29 + x^27 + + * x^24 + x^23 + x^22 + x^21 + x^19 + x^17 + x^13 + x^12 + x^10 + x^9 + + * x^7 + x^4 + x + 1 + */ + +#ifdef INT64_IS_BUSTED + +/* + * crc0 represents the LSBs of the 64-bit value, crc1 the MSBs. Note that + * with crc0 placed first, the output of 32-bit and 64-bit implementations + * will be bit-compatible only on little-endian architectures. If it were + * important to make the two possible implementations bit-compatible on + * all machines, we could do a configure test to decide how to order the + * two fields, but it seems not worth the trouble. + */ +typedef struct crc64 +{ + uint32 crc0; + uint32 crc1; +} crc64; + +/* Initialize a CRC accumulator */ +#define INIT_CRC64(crc) ((crc).crc0 = 0xffffffff, (crc).crc1 = 0xffffffff) + +/* Finish a CRC calculation */ +#define FIN_CRC64(crc) ((crc).crc0 ^= 0xffffffff, (crc).crc1 ^= 0xffffffff) + +/* Accumulate some (more) bytes into a CRC */ +#define COMP_CRC64(crc, data, len) \ +do { \ + uint32 __crc0 = (crc).crc0; \ + uint32 __crc1 = (crc).crc1; \ + unsigned char *__data = (unsigned char *) (data); \ + uint32 __len = (len); \ +\ + while (__len-- > 0) \ + { \ + int __tab_index = ((int) (__crc1 >> 24) ^ *__data++) & 0xFF; \ + __crc1 = crc_table1[__tab_index] ^ ((__crc1 << 8) | (__crc0 >> 24)); \ + __crc0 = crc_table0[__tab_index] ^ (__crc0 << 8); \ + } \ + (crc).crc0 = __crc0; \ + (crc).crc1 = __crc1; \ +} while (0) + +/* Check for equality of two CRCs */ +#define EQ_CRC64(c1,c2) ((c1).crc0 == (c2).crc0 && (c1).crc1 == (c2).crc1) + +/* Constant table for CRC calculation */ +extern const uint32 crc_table0[]; +extern const uint32 crc_table1[]; + +#else /* int64 works */ + +typedef struct crc64 +{ + uint64 crc0; +} crc64; + +/* Initialize a CRC accumulator */ +#define INIT_CRC64(crc) ((crc).crc0 = (uint64) 0xffffffffffffffff) + +/* Finish a CRC calculation */ +#define FIN_CRC64(crc) ((crc).crc0 ^= (uint64) 0xffffffffffffffff) + +/* Accumulate some (more) bytes into a CRC */ +#define COMP_CRC64(crc, data, len) \ +do { \ + uint64 __crc0 = (crc).crc0; \ + unsigned char *__data = (unsigned char *) (data); \ + uint32 __len = (len); \ +\ + while (__len-- > 0) \ + { \ + int __tab_index = ((int) (__crc0 >> 56) ^ *__data++) & 0xFF; \ + __crc0 = crc_table[__tab_index] ^ (__crc0 << 8); \ + } \ + (crc).crc0 = __crc0; \ +} while (0) + +/* Check for equality of two CRCs */ +#define EQ_CRC64(c1,c2) ((c1).crc0 == (c2).crc0) + +/* Constant table for CRC calculation */ +extern const uint64 crc_table[]; + +#endif /* INT64_IS_BUSTED */ + +#endif /* PG_CRC_H */ -- 2.40.0