* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.2 2005/06/18 05:21:09 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.9 2005/07/31 17:19:17 tgl Exp $
*
* NOTES
* Each global transaction is associated with a global transaction
*/
#include "postgres.h"
-#include <sys/types.h>
-#include <sys/stat.h>
#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <time.h>
#include <unistd.h>
#include "access/heapam.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
-#include "pgstat.h"
/*
typedef struct GlobalTransactionData
{
PGPROC proc; /* dummy proc */
- AclId owner; /* ID of user that executed the xact */
+ TimestampTz prepared_at; /* time of preparation */
+ XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
+ Oid owner; /* ID of user that executed the xact */
TransactionId locking_xid; /* top-level XID of backend working on xact */
bool valid; /* TRUE if fully prepared */
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
* assuming that we can use very much backend context.
*/
GlobalTransaction
-MarkAsPreparing(TransactionId xid, Oid databaseid, char *gid, AclId owner)
+MarkAsPreparing(TransactionId xid, const char *gid,
+ TimestampTz prepared_at, Oid owner, Oid databaseid)
{
GlobalTransaction gxact;
int i;
gxact->proc.xmin = InvalidTransactionId;
gxact->proc.pid = 0;
gxact->proc.databaseId = databaseid;
+ gxact->proc.roleId = owner;
gxact->proc.lwWaiting = false;
gxact->proc.lwExclusive = false;
gxact->proc.lwWaitLink = NULL;
gxact->proc.subxids.overflowed = false;
gxact->proc.subxids.nxids = 0;
+ gxact->prepared_at = prepared_at;
+ /* initialize LSN to 0 (start of WAL) */
+ gxact->prepare_lsn.xlogid = 0;
+ gxact->prepare_lsn.xrecoff = 0;
gxact->owner = owner;
gxact->locking_xid = xid;
gxact->valid = false;
* MarkAsPrepared
* Mark the GXACT as fully valid, and enter it into the global ProcArray.
*/
-void
+static void
MarkAsPrepared(GlobalTransaction gxact)
{
/* Lock here may be overkill, but I'm not convinced of that ... */
* Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
*/
static GlobalTransaction
-LockGXact(char *gid, AclId user)
+LockGXact(const char *gid, Oid user)
{
int i;
elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
}
+/*
+ * TransactionIdIsPrepared
+ *		Report whether the given XID belongs to a transaction that has been
+ *		prepared for two-phase commit.
+ *
+ * Only gxacts whose "valid" flag is set count as prepared; whether some
+ * backend currently has the gxact locked is deliberately not examined.
+ *
+ * The shared prepared-xact array is scanned under TwoPhaseStateLock held
+ * in shared mode, and the lock is released before returning.
+ *
+ * Kept static because nothing outside this file needs it at present.
+ */
+static bool
+TransactionIdIsPrepared(TransactionId xid)
+{
+	bool		found = false;
+	int			slot = 0;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+	/* Walk the array until we hit a valid entry carrying this XID */
+	while (!found && slot < TwoPhaseState->numPrepXacts)
+	{
+		GlobalTransaction candidate = TwoPhaseState->prepXacts[slot++];
+
+		found = (candidate->valid && candidate->proc.xid == xid);
+	}
+
+	LWLockRelease(TwoPhaseStateLock);
+
+	return found;
+}
+
/*
* Returns an array of all prepared transactions for the user-level
* function pg_prepared_xact.
/* build tupdesc for result tuples */
/* this had better match pg_prepared_xacts view in system_views.sql */
- tupdesc = CreateTemplateTupleDesc(4, false);
+ tupdesc = CreateTemplateTupleDesc(5, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
XIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
TEXTOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 3, "ownerid",
- INT4OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 4, "dbid",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
+ TIMESTAMPTZOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
OIDOID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
while (status->array != NULL && status->currIdx < status->ngxacts)
{
GlobalTransaction gxact = &status->array[status->currIdx++];
- Datum values[4];
- bool nulls[4];
+ Datum values[5];
+ bool nulls[5];
HeapTuple tuple;
Datum result;
values[0] = TransactionIdGetDatum(gxact->proc.xid);
values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
- values[2] = Int32GetDatum(gxact->owner);
- values[3] = ObjectIdGetDatum(gxact->proc.databaseId);
+ values[2] = TimestampTzGetDatum(gxact->prepared_at);
+ values[3] = ObjectIdGetDatum(gxact->owner);
+ values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
/************************************************************************/
#define TwoPhaseFilePath(path, xid) \
- snprintf(path, MAXPGPATH, "%s/%s/%08X", DataDir, TWOPHASE_DIR, xid)
+ snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
/*
* 2PC state file format:
/*
* Header for a 2PC state file
*/
-#define TWOPHASE_MAGIC 0x57F94530 /* format identifier */
+#define TWOPHASE_MAGIC 0x57F94531 /* format identifier */
typedef struct TwoPhaseFileHeader
{
uint32 total_len; /* actual file length */
TransactionId xid; /* original transaction XID */
Oid database; /* OID of database it was in */
- AclId owner; /* user running the transaction */
+ TimestampTz prepared_at; /* time of preparation */
+ Oid owner; /* user running the transaction */
int32 nsubxacts; /* number of following subxact XIDs */
int32 ncommitrels; /* number of delete-on-commit rels */
int32 nabortrels; /* number of delete-on-abort rels */
hdr.magic = TWOPHASE_MAGIC;
hdr.total_len = 0; /* EndPrepare will fill this in */
hdr.xid = xid;
- hdr.database = MyDatabaseId;
- hdr.owner = GetUserId();
+ hdr.database = gxact->proc.databaseId;
+ hdr.prepared_at = gxact->prepared_at;
+ hdr.owner = gxact->owner;
hdr.nsubxacts = xactGetCommittedChildren(&children);
hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
TwoPhaseFileHeader *hdr;
char path[MAXPGPATH];
XLogRecData *record;
- XLogRecPtr recptr;
pg_crc32 statefile_crc;
pg_crc32 bogus_crc;
int fd;
FIN_CRC32(statefile_crc);
/*
- * Write a deliberately bogus CRC to the state file, and flush it to disk.
- * This is to minimize the odds of failure within the critical section
- * below --- in particular, running out of disk space.
- *
- * On most filesystems, write() rather than fsync() detects out-of-space,
- * so the fsync might be considered optional. Using it means there
- * are three fsyncs not two associated with preparing a transaction; is
- * the risk of an error from fsync high enough to justify that?
+ * Write a deliberately bogus CRC to the state file; this is just
+ * paranoia to catch the case where four more bytes will run us out of
+ * disk space.
*/
bogus_crc = ~ statefile_crc;
errmsg("could not write twophase state file: %m")));
}
- if (pg_fsync(fd) != 0)
- {
- close(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync twophase state file: %m")));
- }
-
/* Back up to prepare for rewriting the CRC */
if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
{
* The state file isn't valid yet, because we haven't written the correct
* CRC yet. Before we do that, insert entry in WAL and flush it to disk.
*
- * Between the time we have written the WAL entry and the time we
- * flush the correct state file CRC to disk, we have an inconsistency:
- * the xact is prepared according to WAL but not according to our on-disk
- * state. We use a critical section to force a PANIC if we are unable to
- * complete the flush --- then, WAL replay should repair the
- * inconsistency.
+ * Between the time we have written the WAL entry and the time we write
+ * out the correct state file CRC, we have an inconsistency: the xact is
+ * prepared according to WAL but not according to our on-disk state.
+ * We use a critical section to force a PANIC if we are unable to complete
+ * the write --- then, WAL replay should repair the inconsistency. The
+ * odds of a PANIC actually occurring should be very tiny given that we
+ * were able to write the bogus CRC above.
*
* We have to lock out checkpoint start here, too; otherwise a checkpoint
* starting immediately after the WAL record is inserted could complete
- * before we've finished flushing, meaning that the WAL record would not
- * get replayed if a crash follows.
+ * without fsync'ing our state file. (This is essentially the same kind
+ * of race condition as the COMMIT-to-clog-write case that
+ * RecordTransactionCommit uses CheckpointStartLock for; see notes there.)
+ *
+ * We save the PREPARE record's location in the gxact for later use by
+ * CheckPointTwoPhase.
*/
START_CRIT_SECTION();
LWLockAcquire(CheckpointStartLock, LW_SHARED);
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head);
- XLogFlush(recptr);
+ gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
+ records.head);
+ XLogFlush(gxact->prepare_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
- /* write correct CRC, flush, and close file */
+ /* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
{
close(fd);
errmsg("could not write twophase state file: %m")));
}
- if (pg_fsync(fd) != 0)
- {
- close(fd);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync twophase state file: %m")));
- }
-
if (close(fd) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close twophase state file: %m")));
+ /*
+ * Mark the prepared transaction as valid. As soon as xact.c marks
+ * MyProc as not running our XID (which it will do immediately after
+ * this function returns), others can commit/rollback the xact.
+ *
+ * NB: a side effect of this is to make a dummy ProcArray entry for the
+ * prepared XID. This must happen before we clear the XID from MyProc,
+ * else there is a window where the XID is not running according to
+ * TransactionIdInProgress, and onlookers would be entitled to assume
+ * the xact crashed. Instead we have a window where the same XID
+ * appears twice in ProcArray, which is OK.
+ */
+ MarkAsPrepared(gxact);
+
+ /*
+ * Now we can release the checkpoint start lock: a checkpoint starting
+ * after this will certainly see the gxact as a candidate for fsyncing.
+ */
LWLockRelease(CheckpointStartLock);
END_CRIT_SECTION();
* FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
*/
void
-FinishPreparedTransaction(char *gid, bool isCommit)
+FinishPreparedTransaction(const char *gid, bool isCommit)
{
GlobalTransaction gxact;
TransactionId xid;
* In case we fail while running the callbacks, mark the gxact invalid
* so no one else will try to commit/rollback, and so it can be recycled
* properly later. It is still locked by our XID so it won't go away yet.
+ *
+ * (We assume it's safe to do this without taking TwoPhaseStateLock.)
*/
gxact->valid = false;
errmsg("could not write twophase state file: %m")));
}
- /* Sync and close the file */
+ /*
+ * We must fsync the file because the end-of-replay checkpoint will
+ * not do so, there being no GXACT in shared memory yet to tell it to.
+ */
if (pg_fsync(fd) != 0)
{
close(fd);
errmsg("could not close twophase state file: %m")));
}
+/*
+ * CheckPointTwoPhase -- handle 2PC component of checkpointing.
+ *
+ * We must fsync the state file of any GXACT that is valid and has a PREPARE
+ * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or
+ * has a later LSN, this checkpoint is not responsible for fsyncing it.)
+ *
+ * This is deliberately run as late as possible in the checkpoint sequence,
+ * because GXACTs ordinarily have short lifespans, and so it is quite
+ * possible that GXACTs that were valid at checkpoint start will no longer
+ * exist if we wait a little bit.
+ *
+ * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
+ * each time.  This is considered unusual enough that we don't bother to
+ * expend any extra code to avoid the redundant fsyncs.  (They should be
+ * reasonably cheap anyway, since they won't cause I/O.)
+ *
+ * redo_horizon: gxacts whose prepare_lsn is <= this location are fsynced.
+ *
+ * Any failure to open, fsync or close a state file is reported with
+ * ereport(ERROR), except that a missing file (ENOENT) is ignored when the
+ * XID is no longer reported as prepared.
+ */
+void
+CheckPointTwoPhase(XLogRecPtr redo_horizon)
+{
+	TransactionId *xids;
+	int			nxids;
+	char		path[MAXPGPATH];
+	int			i;
+
+	/*
+	 * We don't want to hold the TwoPhaseStateLock while doing I/O,
+	 * so we grab it just long enough to make a list of the XIDs that
+	 * require fsyncing, and then do the I/O afterwards.
+	 *
+	 * This approach creates a race condition: someone else could delete
+	 * a GXACT between the time we release TwoPhaseStateLock and the time
+	 * we try to open its state file.  We handle this by special-casing
+	 * ENOENT failures: if we see that, we verify that the GXACT is no
+	 * longer valid, and if so ignore the failure.
+	 */
+	if (max_prepared_xacts <= 0)
+		return;					/* nothing to do */
+	xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
+	nxids = 0;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		if (gxact->valid &&
+			XLByteLE(gxact->prepare_lsn, redo_horizon))
+			xids[nxids++] = gxact->proc.xid;
+	}
+
+	LWLockRelease(TwoPhaseStateLock);
+
+	for (i = 0; i < nxids; i++)
+	{
+		TransactionId xid = xids[i];
+		int			fd;
+
+		TwoPhaseFilePath(path, xid);
+
+		fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
+		if (fd < 0)
+		{
+			if (errno == ENOENT)
+			{
+				/* OK if gxact is no longer valid */
+				if (!TransactionIdIsPrepared(xid))
+					continue;
+				/* Restore errno in case it was changed */
+				errno = ENOENT;
+			}
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open twophase state file \"%s\": %m",
+							path)));
+		}
+
+		if (pg_fsync(fd) != 0)
+		{
+			int			save_errno = errno;
+
+			/*
+			 * close() is allowed to overwrite errno; put the fsync failure
+			 * code back so that %m below reports the real cause.
+			 */
+			close(fd);
+			errno = save_errno;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not fsync twophase state file \"%s\": %m",
+							path)));
+		}
+
+		if (close(fd) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close twophase state file \"%s\": %m",
+							path)));
+	}
+
+	pfree(xids);
+}
+
/*
* PrescanPreparedTransactions
*
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
TransactionId result = origNextXid;
- char dir[MAXPGPATH];
DIR *cldir;
struct dirent *clde;
- snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
-
- cldir = AllocateDir(dir);
- if (cldir == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open directory \"%s\": %m", dir)));
-
- errno = 0;
- while ((clde = readdir(cldir)) != NULL)
+ cldir = AllocateDir(TWOPHASE_DIR);
+ while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
{
if (strlen(clde->d_name) == 8 &&
strspn(clde->d_name, "0123456789ABCDEF") == 8)
(errmsg("removing future twophase state file \"%s\"",
clde->d_name)));
RemoveTwoPhaseFile(xid, true);
- errno = 0;
continue;
}
(errmsg("removing corrupt twophase state file \"%s\"",
clde->d_name)));
RemoveTwoPhaseFile(xid, true);
- errno = 0;
continue;
}
clde->d_name)));
RemoveTwoPhaseFile(xid, true);
pfree(buf);
- errno = 0;
continue;
}
pfree(buf);
}
- errno = 0;
}
-#ifdef WIN32
-
- /*
- * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
- * not in released version
- */
- if (GetLastError() == ERROR_NO_MORE_FILES)
- errno = 0;
-#endif
- if (errno)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read directory \"%s\": %m", dir)));
-
FreeDir(cldir);
return result;
DIR *cldir;
struct dirent *clde;
- snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+ snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
cldir = AllocateDir(dir);
- if (cldir == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open directory \"%s\": %m", dir)));
-
- errno = 0;
- while ((clde = readdir(cldir)) != NULL)
+ while ((clde = ReadDir(cldir, dir)) != NULL)
{
if (strlen(clde->d_name) == 8 &&
strspn(clde->d_name, "0123456789ABCDEF") == 8)
(errmsg("removing stale twophase state file \"%s\"",
clde->d_name)));
RemoveTwoPhaseFile(xid, true);
- errno = 0;
continue;
}
(errmsg("removing corrupt twophase state file \"%s\"",
clde->d_name)));
RemoveTwoPhaseFile(xid, true);
- errno = 0;
continue;
}
/*
* Reconstruct subtrans state for the transaction --- needed
- * because pg_subtrans is not preserved over a restart
+ * because pg_subtrans is not preserved over a restart. Note
+ * that we are linking all the subtransactions directly to the
+ * top-level XID; there may originally have been a more complex
+ * hierarchy, but there's no need to restore that exactly.
*/
for (i = 0; i < hdr->nsubxacts; i++)
SubTransSetParent(subxids[i], xid);
/*
* Recreate its GXACT and dummy PGPROC
+ *
+ * Note: since we don't have the PREPARE record's WAL location
+ * at hand, we leave prepare_lsn zeroed. This means the GXACT
+ * will be fsync'd on every future checkpoint. We assume this
+ * situation is infrequent enough that the performance cost is
+ * negligible (especially since we know the state file has
+ * already been fsynced).
*/
- gxact = MarkAsPreparing(xid, hdr->database, hdr->gid, hdr->owner);
+ gxact = MarkAsPreparing(xid, hdr->gid,
+ hdr->prepared_at,
+ hdr->owner, hdr->database);
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
MarkAsPrepared(gxact);
pfree(buf);
}
- errno = 0;
}
-#ifdef WIN32
-
- /*
- * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
- * not in released version
- */
- if (GetLastError() == ERROR_NO_MORE_FILES)
- errno = 0;
-#endif
- if (errno)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read directory \"%s\": %m", dir)));
-
FreeDir(cldir);
}