* fsynced
* * If COMMIT happens after checkpoint then backend reads state data from
* files
- * * In case of crash replay will move data from xlog to files, if that
- * hasn't happened before. XXX TODO - move to shmem in replay also
+ *
+ * During replay and replication, TwoPhaseState also holds information
+ * about active prepared transactions that haven't been moved to disk yet.
+ *
+ * Replay of twophase records happens by the following rules:
+ *
+ * * At the beginning of recovery, pg_twophase is scanned once, filling
+ * TwoPhaseState with entries marked with gxact->inredo and
+ * gxact->ondisk. Two-phase file data older than the XID horizon of
+ * the redo position are discarded.
+ * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
+ * gxact->inredo is set to true for such entries.
+ * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
+ * that have gxact->inredo set and are behind the redo_horizon. We
+ * save them to disk and then switch gxact->ondisk to true.
+ * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
+ * If gxact->ondisk is true, the corresponding entry from the disk
+ * is additionally deleted.
+ * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
+ * and PrescanPreparedTransactions() have been modified to go through
+ * gxact->inredo entries that have not made it to disk.
*
*-------------------------------------------------------------------------
*/
*/
XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
+ TransactionId xid; /* The GXACT id */
Oid owner; /* ID of user that executed the xact */
BackendId locking_backend; /* backend currently working on the xact */
bool valid; /* TRUE if PGPROC entry is in proc array */
bool ondisk; /* TRUE if prepare state file is on disk */
+ bool inredo; /* TRUE if entry was added via xlog_redo */
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
} GlobalTransactionData;
static void RemoveGXact(GlobalTransaction gxact);
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
+static char *ProcessTwoPhaseBuffer(TransactionId xid,
+ XLogRecPtr prepare_start_lsn,
+ bool fromdisk, bool overwriteOK, bool setParent,
+ TransactionId *result, TransactionId *maxsubxid);
+static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
+ const char *gid, TimestampTz prepared_at, Oid owner,
+ Oid databaseid);
+static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
+static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
/*
* Initialization of shared memory
/*
* MarkAsPreparing
* Reserve the GID for the given transaction.
- *
- * Internally, this creates a gxact struct and puts it into the active array.
- * NOTE: this is also used when reloading a gxact after a crash; so avoid
- * assuming that we can use very much backend context.
*/
GlobalTransaction
MarkAsPreparing(TransactionId xid, const char *gid,
TimestampTz prepared_at, Oid owner, Oid databaseid)
{
GlobalTransaction gxact;
- PGPROC *proc;
- PGXACT *pgxact;
int i;
if (strlen(gid) >= GIDSIZE)
gxact = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact->next;
+ MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
+
+ gxact->ondisk = false;
+
+ /* And insert it into the active array */
+ Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
+ TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ return gxact;
+}
+
+/*
+ * MarkAsPreparingGuts
+ *
+ * This uses a gxact struct and puts it into the active array.
+ * NOTE: this is also used when reloading a gxact after a crash; so avoid
+ * assuming that we can use very much backend context.
+ *
+ * Note: This function should be called with appropriate locks held.
+ */
+static void
+MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
+ TimestampTz prepared_at, Oid owner, Oid databaseid)
+{
+ PGPROC *proc;
+ PGXACT *pgxact;
+ int i;
+
+ Assert(gxact != NULL);
proc = &ProcGlobal->allProcs[gxact->pgprocno];
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
pgxact->nxids = 0;
gxact->prepared_at = prepared_at;
- /* initialize LSN to InvalidXLogRecPtr */
- gxact->prepare_start_lsn = InvalidXLogRecPtr;
- gxact->prepare_end_lsn = InvalidXLogRecPtr;
+ gxact->xid = xid;
gxact->owner = owner;
gxact->locking_backend = MyBackendId;
gxact->valid = false;
- gxact->ondisk = false;
+ gxact->inredo = false;
strcpy(gxact->gid, gid);
- /* And insert it into the active array */
- Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
- TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
-
/*
* Remember that we have this GlobalTransaction entry locked for us. If we
* abort after this, we must release it.
*/
MyLockedGxact = gxact;
-
- LWLockRelease(TwoPhaseStateLock);
-
- return gxact;
}
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
* twophase files and ReadTwoPhaseFile should be used instead.
*
- * Note clearly that this function accesses WAL during normal operation, similarly
- * to the way WALSender or Logical Decoding would do. It does not run during
- * crash recovery or standby processing.
+ * Note clearly that this function can access WAL during normal operation,
+ * similarly to the way WALSender or Logical Decoding would do.
+ *
*/
static void
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogReaderState *xlogreader;
char *errormsg;
- Assert(!RecoveryInProgress());
-
xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
if (!xlogreader)
ereport(ERROR,
* If giveWarning is false, do not complain about file-not-present;
* this is an expected case during WAL replay.
*/
-void
+static void
RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
{
char path[MAXPGPATH];
*
* Note: content and len don't include CRC.
*/
-void
+static void
RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
{
char path[MAXPGPATH];
/*
* CheckPointTwoPhase -- handle 2PC component of checkpointing.
*
- * We must fsync the state file of any GXACT that is valid and has a PREPARE
- * LSN <= the checkpoint's redo horizon. (If the gxact isn't valid yet or
- * has a later LSN, this checkpoint is not responsible for fsyncing it.)
+ * We must fsync the state file of any GXACT that is valid or has been
+ * generated during redo and has a PREPARE LSN <= the checkpoint's redo
+ * horizon. (If the gxact isn't valid yet, has not been generated in
+ * redo, or has a later LSN, this checkpoint is not responsible for
+ * fsyncing it.)
*
* This is deliberately run as late as possible in the checkpoint sequence,
* because GXACTs ordinarily have short lifespans, and so it is quite
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
+ /* Note that we are using gxact not pgxact so this works in recovery also */
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- if (gxact->valid &&
+ if ((gxact->valid || gxact->inredo) &&
!gxact->ondisk &&
gxact->prepare_end_lsn <= redo_horizon)
{
int len;
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
- RecreateTwoPhaseFile(pgxact->xid, buf, len);
+ RecreateTwoPhaseFile(gxact->xid, buf, len);
gxact->ondisk = true;
+ gxact->prepare_start_lsn = InvalidXLogRecPtr;
+ gxact->prepare_end_lsn = InvalidXLogRecPtr;
pfree(buf);
serialized_xacts++;
}
serialized_xacts)));
}
+/*
+ * restoreTwoPhaseData
+ *
+ * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
+ * This is called once at the beginning of recovery, saving any extra
+ * lookups in the future. Two-phase files that are newer than the
+ * minimum XID horizon are discarded on the way.
+ */
+void
+restoreTwoPhaseData(void)
+{
+ DIR *cldir;
+ struct dirent *clde;
+
+ cldir = AllocateDir(TWOPHASE_DIR);
+ while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
+ {
+ if (strlen(clde->d_name) == 8 &&
+ strspn(clde->d_name, "0123456789ABCDEF") == 8)
+ {
+ TransactionId xid;
+ char *buf;
+
+ xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+
+ buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
+ true, false, false,
+ NULL, NULL);
+ if (buf == NULL)
+ continue;
+
+ PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
+ }
+ }
+ FreeDir(cldir);
+}
+
/*
* PrescanPreparedTransactions
*
- * Scan the pg_twophase directory and determine the range of valid XIDs
- * present. This is run during database startup, after we have completed
- * reading WAL. ShmemVariableCache->nextXid has been set to one more than
- * the highest XID for which evidence exists in WAL.
+ * Scan the shared memory entries of TwoPhaseState and determine the range
+ * of valid XIDs present. This is run during database startup, after we
+ * have completed reading WAL. ShmemVariableCache->nextXid has been set to
+ * one more than the highest XID for which evidence exists in WAL.
*
* We throw away any prepared xacts with main XID beyond nextXid --- if any
* are present, it suggests that the DBA has done a PITR recovery to an
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
TransactionId result = origNextXid;
- DIR *cldir;
- struct dirent *clde;
+ TransactionId maxsubxid = origNextXid;
TransactionId *xids = NULL;
int nxids = 0;
int allocsize = 0;
+ int i;
- cldir = AllocateDir(TWOPHASE_DIR);
- while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
- if (strlen(clde->d_name) == 8 &&
- strspn(clde->d_name, "0123456789ABCDEF") == 8)
- {
- TransactionId xid;
- char *buf;
- TwoPhaseFileHeader *hdr;
- TransactionId *subxids;
- int i;
-
- xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
-
- /* Reject XID if too new */
- if (TransactionIdFollowsOrEquals(xid, origNextXid))
- {
- ereport(WARNING,
- (errmsg("removing future two-phase state file \"%s\"",
- clde->d_name)));
- RemoveTwoPhaseFile(xid, true);
- continue;
- }
+ TransactionId xid;
+ char *buf;
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- /*
- * Note: we can't check if already processed because clog
- * subsystem isn't up yet.
- */
+ Assert(gxact->inredo);
- /* Read and validate file */
- buf = ReadTwoPhaseFile(xid, true);
- if (buf == NULL)
- {
- ereport(WARNING,
- (errmsg("removing corrupt two-phase state file \"%s\"",
- clde->d_name)));
- RemoveTwoPhaseFile(xid, true);
- continue;
- }
+ xid = gxact->xid;
- /* Deconstruct header */
- hdr = (TwoPhaseFileHeader *) buf;
- if (!TransactionIdEquals(hdr->xid, xid))
- {
- ereport(WARNING,
- (errmsg("removing corrupt two-phase state file \"%s\"",
- clde->d_name)));
- RemoveTwoPhaseFile(xid, true);
- pfree(buf);
- continue;
- }
+ buf = ProcessTwoPhaseBuffer(xid,
+ gxact->prepare_start_lsn,
+ gxact->ondisk, false, false,
+ &result, &maxsubxid);
- /*
- * OK, we think this file is valid. Incorporate xid into the
- * running-minimum result.
- */
- if (TransactionIdPrecedes(xid, result))
- result = xid;
+ if (buf == NULL)
+ continue;
- /*
- * Examine subtransaction XIDs ... they should all follow main
- * XID, and they may force us to advance nextXid.
- *
- * We don't expect anyone else to modify nextXid, hence we don't
- * need to hold a lock while examining it. We still acquire the
- * lock to modify it, though.
- */
- subxids = (TransactionId *) (buf +
- MAXALIGN(sizeof(TwoPhaseFileHeader)) +
- MAXALIGN(hdr->gidlen));
- for (i = 0; i < hdr->nsubxacts; i++)
+ if (xids_p)
+ {
+ if (nxids == allocsize)
{
- TransactionId subxid = subxids[i];
-
- Assert(TransactionIdFollows(subxid, xid));
- if (TransactionIdFollowsOrEquals(subxid,
- ShmemVariableCache->nextXid))
+ if (nxids == 0)
{
- LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
- ShmemVariableCache->nextXid = subxid;
- TransactionIdAdvance(ShmemVariableCache->nextXid);
- LWLockRelease(XidGenLock);
+ allocsize = 10;
+ xids = palloc(allocsize * sizeof(TransactionId));
}
- }
-
-
- if (xids_p)
- {
- if (nxids == allocsize)
+ else
{
- if (nxids == 0)
- {
- allocsize = 10;
- xids = palloc(allocsize * sizeof(TransactionId));
- }
- else
- {
- allocsize = allocsize * 2;
- xids = repalloc(xids, allocsize * sizeof(TransactionId));
- }
+ allocsize = allocsize * 2;
+ xids = repalloc(xids, allocsize * sizeof(TransactionId));
}
- xids[nxids++] = xid;
}
-
- pfree(buf);
+ xids[nxids++] = xid;
}
+
+ pfree(buf);
}
- FreeDir(cldir);
+ LWLockRelease(TwoPhaseStateLock);
if (xids_p)
{
*nxids_p = nxids;
}
+ /* update nextXid if needed */
+ if (TransactionIdFollowsOrEquals(maxsubxid, ShmemVariableCache->nextXid))
+ {
+ LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+ ShmemVariableCache->nextXid = maxsubxid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ LWLockRelease(XidGenLock);
+ }
+
return result;
}
/*
* StandbyRecoverPreparedTransactions
*
- * Scan the pg_twophase directory and setup all the required information to
- * allow standby queries to treat prepared transactions as still active.
+ * Scan the shared memory entries of TwoPhaseState and setup all the required
+ * information to allow standby queries to treat prepared transactions as still
+ * active.
+ *
* This is never called at the end of recovery - we use
* RecoverPreparedTransactions() at that point.
*
void
StandbyRecoverPreparedTransactions(bool overwriteOK)
{
- DIR *cldir;
- struct dirent *clde;
+ int i;
- cldir = AllocateDir(TWOPHASE_DIR);
- while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
- if (strlen(clde->d_name) == 8 &&
- strspn(clde->d_name, "0123456789ABCDEF") == 8)
- {
- TransactionId xid;
- char *buf;
- TwoPhaseFileHeader *hdr;
- TransactionId *subxids;
- int i;
-
- xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
-
- /* Already processed? */
- if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
- {
- ereport(WARNING,
- (errmsg("removing stale two-phase state file \"%s\"",
- clde->d_name)));
- RemoveTwoPhaseFile(xid, true);
- continue;
- }
-
- /* Read and validate file */
- buf = ReadTwoPhaseFile(xid, true);
- if (buf == NULL)
- {
- ereport(WARNING,
- (errmsg("removing corrupt two-phase state file \"%s\"",
- clde->d_name)));
- RemoveTwoPhaseFile(xid, true);
- continue;
- }
+ TransactionId xid;
+ char *buf;
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- /* Deconstruct header */
- hdr = (TwoPhaseFileHeader *) buf;
- if (!TransactionIdEquals(hdr->xid, xid))
- {
- ereport(WARNING,
- (errmsg("removing corrupt two-phase state file \"%s\"",
- clde->d_name)));
- RemoveTwoPhaseFile(xid, true);
- pfree(buf);
- continue;
- }
+ Assert(gxact->inredo);
- /*
- * Examine subtransaction XIDs ... they should all follow main
- * XID.
- */
- subxids = (TransactionId *) (buf +
- MAXALIGN(sizeof(TwoPhaseFileHeader)) +
- MAXALIGN(hdr->gidlen));
- for (i = 0; i < hdr->nsubxacts; i++)
- {
- TransactionId subxid = subxids[i];
-
- Assert(TransactionIdFollows(subxid, xid));
- SubTransSetParent(xid, subxid, overwriteOK);
- }
+ xid = gxact->xid;
+ buf = ProcessTwoPhaseBuffer(xid,
+ gxact->prepare_start_lsn,
+ gxact->ondisk, overwriteOK, true,
+ NULL, NULL);
+ if (buf != NULL)
pfree(buf);
- }
}
- FreeDir(cldir);
+ LWLockRelease(TwoPhaseStateLock);
}
/*
* RecoverPreparedTransactions
*
- * Scan the pg_twophase directory and reload shared-memory state for each
- * prepared transaction (reacquire locks, etc). This is run during database
- * startup.
+ * Scan the shared memory entries of TwoPhaseState and reload the state for
+ * each prepared transaction (reacquire locks, etc).
+ *
+ * This is run during database startup.
*/
void
RecoverPreparedTransactions(void)
{
- char dir[MAXPGPATH];
- DIR *cldir;
- struct dirent *clde;
- bool overwriteOK = false;
-
- snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
+ int i;
- cldir = AllocateDir(dir);
- while ((clde = ReadDir(cldir, dir)) != NULL)
+ /*
+ * Don't need a lock in the recovery phase.
+ */
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
- if (strlen(clde->d_name) == 8 &&
- strspn(clde->d_name, "0123456789ABCDEF") == 8)
- {
- TransactionId xid;
- char *buf;
- char *bufptr;
- TwoPhaseFileHeader *hdr;
- TransactionId *subxids;
- GlobalTransaction gxact;
- const char *gid;
- int i;
+ TransactionId xid;
+ char *buf;
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ char *bufptr;
+ TwoPhaseFileHeader *hdr;
+ TransactionId *subxids;
+ const char *gid;
+ bool overwriteOK = false;
+ int i;
- xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+ xid = gxact->xid;
- /* Already processed? */
- if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
- {
- ereport(WARNING,
- (errmsg("removing stale two-phase state file \"%s\"",
- clde->d_name)));
- RemoveTwoPhaseFile(xid, true);
- continue;
- }
+ buf = ProcessTwoPhaseBuffer(xid,
+ gxact->prepare_start_lsn,
+ gxact->ondisk, false, false,
+ NULL, NULL);
+ if (buf == NULL)
+ continue;
- /* Read and validate file */
- buf = ReadTwoPhaseFile(xid, true);
- if (buf == NULL)
- {
- ereport(WARNING,
- (errmsg("removing corrupt two-phase state file \"%s\"",
- clde->d_name)));
- RemoveTwoPhaseFile(xid, true);
- continue;
- }
+ ereport(LOG,
+ (errmsg("recovering prepared transaction %u from shared memory", xid)));
+
+ hdr = (TwoPhaseFileHeader *) buf;
+ Assert(TransactionIdEquals(hdr->xid, xid));
+ bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+ gid = (const char *) bufptr;
+ bufptr += MAXALIGN(hdr->gidlen);
+ subxids = (TransactionId *) bufptr;
+ bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+ bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+ bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
- ereport(LOG,
- (errmsg("recovering prepared transaction %u", xid)));
-
- /* Deconstruct header */
- hdr = (TwoPhaseFileHeader *) buf;
- Assert(TransactionIdEquals(hdr->xid, xid));
- bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
- gid = (const char *) bufptr;
- bufptr += MAXALIGN(hdr->gidlen);
- subxids = (TransactionId *) bufptr;
- bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
- bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
- bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
- bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+ /*
+ * It's possible that SubTransSetParent has been set before, if
+ * the prepared transaction generated xid assignment records. Test
+ * here must match one used in AssignTransactionId().
+ */
+ if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+ XLogLogicalInfoActive()))
+ overwriteOK = true;
- /*
- * It's possible that SubTransSetParent has been set before, if
- * the prepared transaction generated xid assignment records. Test
- * here must match one used in AssignTransactionId().
- */
- if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
- XLogLogicalInfoActive()))
- overwriteOK = true;
+ /*
+ * Reconstruct subtrans state for the transaction --- needed
+ * because pg_subtrans is not preserved over a restart. Note that
+ * we are linking all the subtransactions directly to the
+ * top-level XID; there may originally have been a more complex
+ * hierarchy, but there's no need to restore that exactly.
+ */
+ for (i = 0; i < hdr->nsubxacts; i++)
+ SubTransSetParent(subxids[i], xid, overwriteOK);
- /*
- * Reconstruct subtrans state for the transaction --- needed
- * because pg_subtrans is not preserved over a restart. Note that
- * we are linking all the subtransactions directly to the
- * top-level XID; there may originally have been a more complex
- * hierarchy, but there's no need to restore that exactly.
- */
- for (i = 0; i < hdr->nsubxacts; i++)
- SubTransSetParent(subxids[i], xid, overwriteOK);
+ /*
+ * Recreate its GXACT and dummy PGPROC. But, check whether
+ * it was added in redo and already has a shmem entry for
+ * it.
+ */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ MarkAsPreparingGuts(gxact, xid, gid,
+ hdr->prepared_at,
+ hdr->owner, hdr->database);
- /*
- * Recreate its GXACT and dummy PGPROC
- */
- gxact = MarkAsPreparing(xid, gid,
- hdr->prepared_at,
- hdr->owner, hdr->database);
- gxact->ondisk = true;
- GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
- MarkAsPrepared(gxact);
+ /* recovered, so reset the flag for entries generated by redo */
+ gxact->inredo = false;
- /*
- * Recover other state (notably locks) using resource managers
- */
- ProcessRecords(bufptr, xid, twophase_recover_callbacks);
+ LWLockRelease(TwoPhaseStateLock);
- /*
- * Release locks held by the standby process after we process each
- * prepared transaction. As a result, we don't need too many
- * additional locks at any one time.
- */
- if (InHotStandby)
- StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+ GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
+ MarkAsPrepared(gxact);
- /*
- * We're done with recovering this transaction. Clear
- * MyLockedGxact, like we do in PrepareTransaction() during normal
- * operation.
- */
- PostPrepare_Twophase();
+ /*
+ * Recover other state (notably locks) using resource managers
+ */
+ ProcessRecords(bufptr, xid, twophase_recover_callbacks);
- pfree(buf);
+ /*
+ * Release locks held by the standby process after we process each
+ * prepared transaction. As a result, we don't need too many
+ * additional locks at any one time.
+ */
+ if (InHotStandby)
+ StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+
+ /*
+ * We're done with recovering this transaction. Clear
+ * MyLockedGxact, like we do in PrepareTransaction() during normal
+ * operation.
+ */
+ PostPrepare_Twophase();
+
+ pfree(buf);
+ }
+}
+
+/*
+ * ProcessTwoPhaseBuffer
+ *
+ * Given a transaction id, read it either from disk or read it directly
+ * via shmem xlog record pointer using the provided "prepare_start_lsn".
+ *
+ * If setParent is true, then use the overwriteOK parameter to set up
+ * subtransaction parent linkages.
+ *
+ * If result and maxsubxid are not NULL, fill them up with smallest
+ * running transaction id (lesser than ShmemVariableCache->nextXid)
+ * and largest subtransaction id for this transaction respectively.
+ */
+static char *
+ProcessTwoPhaseBuffer(TransactionId xid,
+ XLogRecPtr prepare_start_lsn,
+ bool fromdisk, bool overwriteOK,
+ bool setParent, TransactionId *result,
+ TransactionId *maxsubxid)
+{
+ TransactionId origNextXid = ShmemVariableCache->nextXid;
+ TransactionId res;
+ TransactionId maxsub;
+ TransactionId *subxids;
+ char *buf;
+ TwoPhaseFileHeader *hdr;
+ int i;
+
+ if (!fromdisk)
+ Assert(prepare_start_lsn != InvalidXLogRecPtr);
+
+ if (result)
+ res = *result;
+ if (maxsubxid)
+ maxsub = *maxsubxid;
+
+ /* Already processed? */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ {
+ if (fromdisk)
+ {
+ ereport(WARNING,
+ (errmsg("removing stale two-phase state file for \"%u\"",
+ xid)));
+ RemoveTwoPhaseFile(xid, true);
}
+ else
+ {
+ ereport(WARNING,
+ (errmsg("removing stale two-phase state from"
+ " shared memory for \"%u\"", xid)));
+ PrepareRedoRemove(xid, true);
+ }
+ return NULL;
}
- FreeDir(cldir);
+
+ /* Reject XID if too new */
+ if (TransactionIdFollowsOrEquals(xid, origNextXid))
+ {
+ if (fromdisk)
+ {
+ ereport(WARNING,
+ (errmsg("removing future two-phase state file for \"%u\"",
+ xid)));
+ RemoveTwoPhaseFile(xid, true);
+ }
+ else
+ {
+ ereport(WARNING,
+ (errmsg("removing future two-phase state from memory for \"%u\"",
+ xid)));
+ PrepareRedoRemove(xid, true);
+ }
+ return NULL;
+ }
+
+ if (fromdisk)
+ {
+ /* Read and validate file */
+ buf = ReadTwoPhaseFile(xid, true);
+ if (buf == NULL)
+ {
+ ereport(WARNING,
+ (errmsg("removing corrupt two-phase state file for \"%u\"",
+ xid)));
+ RemoveTwoPhaseFile(xid, true);
+ return NULL;
+ }
+ }
+ else
+ {
+ /* Read xlog data */
+ XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
+ }
+
+ /* Deconstruct header */
+ hdr = (TwoPhaseFileHeader *) buf;
+ if (!TransactionIdEquals(hdr->xid, xid))
+ {
+ if (fromdisk)
+ {
+ ereport(WARNING,
+ (errmsg("removing corrupt two-phase state file for \"%u\"",
+ xid)));
+ RemoveTwoPhaseFile(xid, true);
+ }
+ else
+ {
+ ereport(WARNING,
+ (errmsg("removing corrupt two-phase state from memory for \"%u\"",
+ xid)));
+ PrepareRedoRemove(xid, true);
+ }
+ pfree(buf);
+ return NULL;
+ }
+
+ /*
+ * OK, we think this buffer is valid. Incorporate xid into the
+ * running-minimum result.
+ */
+ if (TransactionIdPrecedes(xid, res))
+ res = xid;
+
+ /*
+ * Examine subtransaction XIDs ... they should all follow main
+ * XID, and they may force us to advance nextXid.
+ */
+ subxids = (TransactionId *) (buf +
+ MAXALIGN(sizeof(TwoPhaseFileHeader)) +
+ MAXALIGN(hdr->gidlen));
+ for (i = 0; i < hdr->nsubxacts; i++)
+ {
+ TransactionId subxid = subxids[i];
+
+ Assert(TransactionIdFollows(subxid, xid));
+ if (TransactionIdFollowsOrEquals(subxid, maxsub))
+ maxsub = subxid;
+ if (setParent)
+ SubTransSetParent(xid, subxid, overwriteOK);
+ }
+
+ if (result)
+ *result = res;
+ if (maxsubxid)
+ *maxsubxid = maxsub;
+
+ return buf;
}
+
/*
* RecordTransactionCommitPrepared
*
*/
SyncRepWaitForLSN(recptr, false);
}
+
+/*
+ * PrepareRedoAdd
+ *
+ * Store pointers to the start/end of the WAL record along with the xid in
+ * a gxact entry in shared memory TwoPhaseState structure. If caller
+ * specifies InvalidXLogRecPtr as WAL position to fetch the two-phase
+ * data, the entry is marked as located on disk.
+ */
+void
+PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+{
+ TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
+ char *bufptr;
+ const char *gid;
+ GlobalTransaction gxact;
+
+ Assert(RecoveryInProgress());
+
+ bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+ gid = (const char *) bufptr;
+
+ /*
+ * Reserve the GID for the given transaction in the redo code path.
+ *
+ * This creates a gxact struct and puts it into the active array.
+ *
+ * In redo, this struct is mainly used to track PREPARE/COMMIT entries
+ * in shared memory. Hence, we only fill up the bare minimum contents here.
+ * The gxact also gets marked with gxact->inredo set to true to indicate
+ * that it got added in the redo phase
+ */
+
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ /* Get a free gxact from the freelist */
+ if (TwoPhaseState->freeGXacts == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("maximum number of prepared transactions reached"),
+ errhint("Increase max_prepared_transactions (currently %d).",
+ max_prepared_xacts)));
+ gxact = TwoPhaseState->freeGXacts;
+ TwoPhaseState->freeGXacts = gxact->next;
+
+ gxact->prepared_at = hdr->prepared_at;
+ gxact->prepare_start_lsn = start_lsn;
+ gxact->prepare_end_lsn = end_lsn;
+ gxact->xid = hdr->xid;
+ gxact->owner = hdr->owner;
+ gxact->locking_backend = InvalidBackendId;
+ gxact->valid = false;
+ gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
+ gxact->inredo = true; /* yes, added in redo */
+ strcpy(gxact->gid, gid);
+
+ /* And insert it into the active array */
+ Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
+ TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid);
+}
+
+/*
+ * PrepareRedoRemove
+ *
+ * Remove the corresponding gxact entry from TwoPhaseState. Also
+ * remove the 2PC file if a prepared transaction was saved via
+ * an earlier checkpoint.
+ */
+void
+PrepareRedoRemove(TransactionId xid, bool giveWarning)
+{
+ GlobalTransaction gxact = NULL;
+ int i;
+
+ Assert(RecoveryInProgress());
+
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ gxact = TwoPhaseState->prepXacts[i];
+
+ if (gxact->xid == xid)
+ {
+ Assert(gxact->inredo);
+ break;
+ }
+ }
+ LWLockRelease(TwoPhaseStateLock);
+
+ /*
+ * Just leave if there is nothing, this is expected during WAL replay.
+ */
+ if (gxact == NULL)
+ return;
+
+ /*
+ * And now we can clean up any files we may have left.
+ */
+ elog(DEBUG2, "Removing 2PC data from shared memory %u", xid);
+ if (gxact->ondisk)
+ RemoveTwoPhaseFile(xid, giveWarning);
+ RemoveGXact(gxact);
+
+ return;
+}