static TwoPhaseStateData *TwoPhaseState;
/*
- * Global transaction entry currently locked by us, if any.
+ * Global transaction entry currently locked by us, if any. Note that any
+ * access to the entry pointed to by this variable must be protected by
+ * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
+ * (since it's just local memory).
*/
static GlobalTransaction MyLockedGxact = NULL;
* resources held by the transaction yet. In those cases, the in-memory
* state can be wrong, but it's too late to back out.
*/
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
if (!MyLockedGxact->valid)
- {
RemoveGXact(MyLockedGxact);
- }
else
- {
- LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
-
MyLockedGxact->locking_backend = InvalidBackendId;
+ LWLockRelease(TwoPhaseStateLock);
- LWLockRelease(TwoPhaseStateLock);
- }
MyLockedGxact = NULL;
}
PGXACT *pgxact;
int i;
+ Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
+
Assert(gxact != NULL);
proc = &ProcGlobal->allProcs[gxact->pgprocno];
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
/*
* MarkAsPrepared
* Mark the GXACT as fully valid, and enter it into the global ProcArray.
+ *
+ * lock_held indicates whether caller already holds TwoPhaseStateLock.
*/
static void
-MarkAsPrepared(GlobalTransaction gxact)
+MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
{
/* Lock here may be overkill, but I'm not convinced of that ... */
- LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ if (!lock_held)
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
Assert(!gxact->valid);
gxact->valid = true;
- LWLockRelease(TwoPhaseStateLock);
+ if (!lock_held)
+ LWLockRelease(TwoPhaseStateLock);
/*
* Put it into the global ProcArray so TransactionIdIsInProgress considers
{
int i;
- LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
gxact->next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact;
- LWLockRelease(TwoPhaseStateLock);
-
return;
}
}
- LWLockRelease(TwoPhaseStateLock);
-
elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
}
* the xact crashed. Instead we have a window where the same XID appears
* twice in ProcArray, which is OK.
*/
- MarkAsPrepared(gxact);
+ MarkAsPrepared(gxact, false);
/*
* Now we can mark ourselves as out of the commit critical section: a
if (gxact->ondisk)
RemoveTwoPhaseFile(xid, true);
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
RemoveGXact(gxact);
+ LWLockRelease(TwoPhaseStateLock);
MyLockedGxact = NULL;
pfree(buf);
struct dirent *clde;
cldir = AllocateDir(TWOPHASE_DIR);
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
{
if (strlen(clde->d_name) == 8 &&
PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
}
}
+ LWLockRelease(TwoPhaseStateLock);
FreeDir(cldir);
}
int allocsize = 0;
int i;
- LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
{
int i;
- LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
* Scan the shared memory entries of TwoPhaseState and reload the state for
* each prepared transaction (reacquire locks, etc).
*
- * This is run during database startup.
+ * This is run at the end of recovery, but before we allow backends to write
+ * WAL.
*
* At the end of recovery the way we take snapshots will change. We now need
* to mark all running transactions with their full SubTransSetParent() info
{
int i;
- /*
- * Don't need a lock in the recovery phase.
- */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
* Recreate its GXACT and dummy PGPROC. But, check whether it was
* added in redo and already has a shmem entry for it.
*/
- LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
MarkAsPreparingGuts(gxact, xid, gid,
hdr->prepared_at,
hdr->owner, hdr->database);
/* recovered, so reset the flag for entries generated by redo */
gxact->inredo = false;
- LWLockRelease(TwoPhaseStateLock);
-
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
- MarkAsPrepared(gxact);
+ MarkAsPrepared(gxact, true);
+
+ LWLockRelease(TwoPhaseStateLock);
/*
- * Recover other state (notably locks) using resource managers
+ * Recover other state (notably locks) using resource managers.
*/
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
PostPrepare_Twophase();
pfree(buf);
+
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
}
+
+ LWLockRelease(TwoPhaseStateLock);
}
/*
TwoPhaseFileHeader *hdr;
int i;
+ Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
+
if (!fromdisk)
Assert(prepare_start_lsn != InvalidXLogRecPtr);
else
{
ereport(WARNING,
- (errmsg("removing stale two-phase state from"
- " shared memory for \"%u\"", xid)));
+ (errmsg("removing stale two-phase state from shared memory for \"%u\"",
+ xid)));
PrepareRedoRemove(xid, true);
}
return NULL;
const char *gid;
GlobalTransaction gxact;
+ Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
Assert(RecoveryInProgress());
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
* that it got added in the redo phase
*/
- LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
/* Get a free gxact from the freelist */
if (TwoPhaseState->freeGXacts == NULL)
ereport(ERROR,
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
- LWLockRelease(TwoPhaseStateLock);
-
- elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid);
+ elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
}
/*
* PrepareRedoRemove
*
- * Remove the corresponding gxact entry from TwoPhaseState. Also
- * remove the 2PC file if a prepared transaction was saved via
- * an earlier checkpoint.
+ * Remove the corresponding gxact entry from TwoPhaseState. Also remove
+ * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
+ *
+ * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
+ * is updated.
*/
void
PrepareRedoRemove(TransactionId xid, bool giveWarning)
int i;
bool found = false;
+ Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
Assert(RecoveryInProgress());
- LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
gxact = TwoPhaseState->prepXacts[i];
break;
}
}
- LWLockRelease(TwoPhaseStateLock);
/*
* Just leave if there is nothing, this is expected during WAL replay.
/*
* And now we can clean up any files we may have left.
*/
- elog(DEBUG2, "Removing 2PC data from shared memory %u", xid);
+ elog(DEBUG2, "removing 2PC data for transaction %u", xid);
if (gxact->ondisk)
RemoveTwoPhaseFile(xid, giveWarning);
RemoveGXact(gxact);
int i;
TimestampTz commit_time;
+ Assert(TransactionIdIsValid(xid));
+
max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
/*
int i;
TransactionId max_xid;
+ Assert(TransactionIdIsValid(xid));
+
/*
* Make sure nextXid is beyond any XID mentioned in the record.
*
/* Backup blocks are not used in xact records */
Assert(!XLogRecHasAnyBlockRefs(record));
- if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
+ if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
xl_xact_parsed_commit parsed;
- ParseCommitRecord(XLogRecGetInfo(record), xlrec,
- &parsed);
+ ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
+ xact_redo_commit(&parsed, XLogRecGetXid(record),
+ record->EndRecPtr, XLogRecGetOrigin(record));
+ }
+ else if (info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
- if (info == XLOG_XACT_COMMIT)
- {
- Assert(!TransactionIdIsValid(parsed.twophase_xid));
- xact_redo_commit(&parsed, XLogRecGetXid(record),
- record->EndRecPtr, XLogRecGetOrigin(record));
- }
- else
- {
- Assert(TransactionIdIsValid(parsed.twophase_xid));
- xact_redo_commit(&parsed, parsed.twophase_xid,
- record->EndRecPtr, XLogRecGetOrigin(record));
+ ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
+ xact_redo_commit(&parsed, parsed.twophase_xid,
+ record->EndRecPtr, XLogRecGetOrigin(record));
- /* Delete TwoPhaseState gxact entry and/or 2PC file. */
- PrepareRedoRemove(parsed.twophase_xid, false);
- }
+ /* Delete TwoPhaseState gxact entry and/or 2PC file. */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ PrepareRedoRemove(parsed.twophase_xid, false);
+ LWLockRelease(TwoPhaseStateLock);
}
- else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
+ else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
xl_xact_parsed_abort parsed;
- ParseAbortRecord(XLogRecGetInfo(record), xlrec,
- &parsed);
+ ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
+ xact_redo_abort(&parsed, XLogRecGetXid(record));
+ }
+ else if (info == XLOG_XACT_ABORT_PREPARED)
+ {
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
- if (info == XLOG_XACT_ABORT)
- {
- Assert(!TransactionIdIsValid(parsed.twophase_xid));
- xact_redo_abort(&parsed, XLogRecGetXid(record));
- }
- else
- {
- Assert(TransactionIdIsValid(parsed.twophase_xid));
- xact_redo_abort(&parsed, parsed.twophase_xid);
+ ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
+ xact_redo_abort(&parsed, parsed.twophase_xid);
- /* Delete TwoPhaseState gxact entry and/or 2PC file. */
- PrepareRedoRemove(parsed.twophase_xid, false);
- }
+ /* Delete TwoPhaseState gxact entry and/or 2PC file. */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ PrepareRedoRemove(parsed.twophase_xid, false);
+ LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_PREPARE)
{
* Store xid and start/end pointers of the WAL record in TwoPhaseState
* gxact entry.
*/
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
PrepareRedoAdd(XLogRecGetData(record),
record->ReadRecPtr,
record->EndRecPtr);
+ LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{