]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/twophase.c
pgindent run for 9.4
[postgresql] / src / backend / access / transam / twophase.c
index faafc7e5c18c7d717affac2f1ee1e59d9e488063..70ca6ab67d1ef8a25f7f8bf9515a116239aa8fb8 100644 (file)
@@ -3,11 +3,11 @@
  * twophase.c
  *             Two-phase commit support functions.
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.60 2010/04/13 14:17:46 heikki Exp $
+ *             src/backend/access/transam/twophase.c
  *
  * NOTES
  *             Each global transaction is associated with a global transaction
  *             GIDs and aborts the transaction if there already is a global
  *             transaction in prepared state with the same GID.
  *
- *             A global transaction (gxact) also has a dummy PGPROC that is entered
- *             into the ProcArray array; this is what keeps the XID considered
- *             running by TransactionIdIsInProgress.  It is also convenient as a
- *             PGPROC to hook the gxact's locks to.
+ *             A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
+ *             what keeps the XID considered running by TransactionIdIsInProgress.
+ *             It is also convenient as a PGPROC to hook the gxact's locks to.
  *
  *             In order to survive crashes and shutdowns, all prepared
  *             transactions must be stored in permanent storage. This includes
 #include <time.h>
 #include <unistd.h>
 
-#include "access/htup.h"
+#include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
+#include "replication/walsender.h"
+#include "replication/syncrep.h"
 #include "storage/fd.h"
+#include "storage/predicate.h"
+#include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
+#include "utils/timestamp.h"
 
 
 /*
@@ -75,11 +80,6 @@ int                  max_prepared_xacts = 0;
  * This struct describes one global transaction that is in prepared state
  * or attempting to become prepared.
  *
- * The first component of the struct is a dummy PGPROC that is inserted
- * into the global ProcArray so that the transaction appears to still be
- * running and holding locks.  It must be first because we cast pointers
- * to PGPROC and pointers to GlobalTransactionData back and forth.
- *
  * The lifecycle of a global transaction is:
  *
  * 1. After checking that the requested GID is not in use, set up an
@@ -87,7 +87,7 @@ int                   max_prepared_xacts = 0;
  * with locking_xid = my own XID and valid = false.
  *
  * 2. After successfully completing prepare, set valid = true and enter the
- * contained PGPROC into the global ProcArray.
+ * referenced PGPROC into the global ProcArray.
  *
  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
  * is valid and its locking_xid is no longer active, then store my current
@@ -109,7 +109,8 @@ int                 max_prepared_xacts = 0;
 
 typedef struct GlobalTransactionData
 {
-       PGPROC          proc;                   /* dummy proc */
+       GlobalTransaction next;         /* list link for free list */
+       int                     pgprocno;               /* ID of associated dummy PGPROC */
        BackendId       dummyBackendId; /* similar to backend id for backends */
        TimestampTz prepared_at;        /* time of preparation */
        XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
@@ -117,7 +118,7 @@ typedef struct GlobalTransactionData
        TransactionId locking_xid;      /* top-level XID of backend working on xact */
        bool            valid;                  /* TRUE if fully prepared */
        char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
-} GlobalTransactionData;
+}      GlobalTransactionData;
 
 /*
  * Two Phase Commit shared state.  Access to this struct is protected
@@ -203,9 +204,13 @@ TwoPhaseShmemInit(void)
                                          sizeof(GlobalTransaction) * max_prepared_xacts));
                for (i = 0; i < max_prepared_xacts; i++)
                {
-                       gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+                       /* insert into linked list */
+                       gxacts[i].next = TwoPhaseState->freeGXacts;
                        TwoPhaseState->freeGXacts = &gxacts[i];
 
+                       /* associate it with a PGPROC assigned by InitProcGlobal */
+                       gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
+
                        /*
                         * Assign a unique ID for each dummy proc, so that the range of
                         * dummy backend IDs immediately follows the range of normal
@@ -239,6 +244,8 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                                TimestampTz prepared_at, Oid owner, Oid databaseid)
 {
        GlobalTransaction gxact;
+       PGPROC     *proc;
+       PGXACT     *pgxact;
        int                     i;
 
        if (strlen(gid) >= GIDSIZE)
@@ -270,7 +277,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                        TwoPhaseState->numPrepXacts--;
                        TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
                        /* and put it back in the freelist */
-                       gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+                       gxact->next = TwoPhaseState->freeGXacts;
                        TwoPhaseState->freeGXacts = gxact;
                        /* Back up index count too, so we don't miss scanning one */
                        i--;
@@ -298,37 +305,40 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                                 errhint("Increase max_prepared_transactions (currently %d).",
                                                 max_prepared_xacts)));
        gxact = TwoPhaseState->freeGXacts;
-       TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->proc.links.next;
+       TwoPhaseState->freeGXacts = gxact->next;
+
+       proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
-       /* Initialize it */
-       MemSet(&gxact->proc, 0, sizeof(PGPROC));
-       SHMQueueElemInit(&(gxact->proc.links));
-       gxact->proc.waitStatus = STATUS_OK;
+       /* Initialize the PGPROC entry */
+       MemSet(proc, 0, sizeof(PGPROC));
+       proc->pgprocno = gxact->pgprocno;
+       SHMQueueElemInit(&(proc->links));
+       proc->waitStatus = STATUS_OK;
        /* We set up the gxact's VXID as InvalidBackendId/XID */
-       gxact->proc.lxid = (LocalTransactionId) xid;
-       gxact->proc.xid = xid;
-       gxact->proc.xmin = InvalidTransactionId;
-       gxact->proc.pid = 0;
-       gxact->proc.backendId = InvalidBackendId;
-       gxact->proc.databaseId = databaseid;
-       gxact->proc.roleId = owner;
-       gxact->proc.inCommit = false;
-       gxact->proc.vacuumFlags = 0;
-       gxact->proc.lwWaiting = false;
-       gxact->proc.lwExclusive = false;
-       gxact->proc.lwWaitLink = NULL;
-       gxact->proc.waitLock = NULL;
-       gxact->proc.waitProcLock = NULL;
+       proc->lxid = (LocalTransactionId) xid;
+       pgxact->xid = xid;
+       pgxact->xmin = InvalidTransactionId;
+       pgxact->delayChkpt = false;
+       pgxact->vacuumFlags = 0;
+       proc->pid = 0;
+       proc->backendId = InvalidBackendId;
+       proc->databaseId = databaseid;
+       proc->roleId = owner;
+       proc->lwWaiting = false;
+       proc->lwWaitMode = 0;
+       proc->lwWaitLink = NULL;
+       proc->waitLock = NULL;
+       proc->waitProcLock = NULL;
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-               SHMQueueInit(&(gxact->proc.myProcLocks[i]));
+               SHMQueueInit(&(proc->myProcLocks[i]));
        /* subxid data must be filled later by GXactLoadSubxactData */
-       gxact->proc.subxids.overflowed = false;
-       gxact->proc.subxids.nxids = 0;
+       pgxact->overflowed = false;
+       pgxact->nxids = 0;
 
        gxact->prepared_at = prepared_at;
        /* initialize LSN to 0 (start of WAL) */
-       gxact->prepare_lsn.xlogid = 0;
-       gxact->prepare_lsn.xrecoff = 0;
+       gxact->prepare_lsn = 0;
        gxact->owner = owner;
        gxact->locking_xid = xid;
        gxact->valid = false;
@@ -354,17 +364,20 @@ static void
 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
                                         TransactionId *children)
 {
+       PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
        /* We need no extra lock since the GXACT isn't valid yet */
        if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
        {
-               gxact->proc.subxids.overflowed = true;
+               pgxact->overflowed = true;
                nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
        }
        if (nsubxacts > 0)
        {
-               memcpy(gxact->proc.subxids.xids, children,
+               memcpy(proc->subxids.xids, children,
                           nsubxacts * sizeof(TransactionId));
-               gxact->proc.subxids.nxids = nsubxacts;
+               pgxact->nxids = nsubxacts;
        }
 }
 
@@ -385,7 +398,7 @@ MarkAsPrepared(GlobalTransaction gxact)
         * Put it into the global ProcArray so TransactionIdIsInProgress considers
         * the XID as still running.
         */
-       ProcArrayAdd(&gxact->proc);
+       ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
 }
 
 /*
@@ -402,6 +415,7 @@ LockGXact(const char *gid, Oid user)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
                /* Ignore not-yet-valid GIDs */
                if (!gxact->valid)
@@ -429,10 +443,10 @@ LockGXact(const char *gid, Oid user)
                /*
                 * Note: it probably would be possible to allow committing from
                 * another database; but at the moment NOTIFY is known not to work and
-                * there may be some other issues as well.      Hence disallow until
+                * there may be some other issues as well.  Hence disallow until
                 * someone gets motivated to make it work.
                 */
-               if (MyDatabaseId != gxact->proc.databaseId)
+               if (MyDatabaseId != proc->databaseId)
                        ereport(ERROR,
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                  errmsg("prepared transaction belongs to another database"),
@@ -479,7 +493,7 @@ RemoveGXact(GlobalTransaction gxact)
                        TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
 
                        /* and put it back in the freelist */
-                       gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+                       gxact->next = TwoPhaseState->freeGXacts;
                        TwoPhaseState->freeGXacts = gxact;
 
                        LWLockRelease(TwoPhaseStateLock);
@@ -514,8 +528,9 @@ TransactionIdIsPrepared(TransactionId xid)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
-               if (gxact->valid && gxact->proc.xid == xid)
+               if (gxact->valid && pgxact->xid == xid)
                {
                        result = true;
                        break;
@@ -638,6 +653,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
        while (status->array != NULL && status->currIdx < status->ngxacts)
        {
                GlobalTransaction gxact = &status->array[status->currIdx++];
+               PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
                Datum           values[5];
                bool            nulls[5];
                HeapTuple       tuple;
@@ -652,11 +669,11 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
                MemSet(values, 0, sizeof(values));
                MemSet(nulls, 0, sizeof(nulls));
 
-               values[0] = TransactionIdGetDatum(gxact->proc.xid);
+               values[0] = TransactionIdGetDatum(pgxact->xid);
                values[1] = CStringGetTextDatum(gxact->gid);
                values[2] = TimestampTzGetDatum(gxact->prepared_at);
                values[3] = ObjectIdGetDatum(gxact->owner);
-               values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
+               values[4] = ObjectIdGetDatum(proc->databaseId);
 
                tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
                result = HeapTupleGetDatum(tuple);
@@ -667,50 +684,36 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
 }
 
 /*
- * TwoPhaseGetDummyProc
- *             Get the dummy backend ID for prepared transaction specified by XID
- *
- * Dummy backend IDs are similar to real backend IDs of real backends.
- * They start at MaxBackends + 1, and are unique across all currently active
- * real backends and prepared transactions.
+ * TwoPhaseGetGXact
+ *             Get the GlobalTransaction struct for a prepared transaction
+ *             specified by XID
  */
-BackendId
-TwoPhaseGetDummyBackendId(TransactionId xid)
-{
-       PGPROC     *proc = TwoPhaseGetDummyProc(xid);
-
-       return ((GlobalTransaction) proc)->dummyBackendId;
-}
-
-/*
- * TwoPhaseGetDummyProc
- *             Get the PGPROC that represents a prepared transaction specified by XID
- */
-PGPROC *
-TwoPhaseGetDummyProc(TransactionId xid)
+static GlobalTransaction
+TwoPhaseGetGXact(TransactionId xid)
 {
-       PGPROC     *result = NULL;
+       GlobalTransaction result = NULL;
        int                     i;
 
        static TransactionId cached_xid = InvalidTransactionId;
-       static PGPROC *cached_proc = NULL;
+       static GlobalTransaction cached_gxact = NULL;
 
        /*
         * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
         * repeatedly for the same XID.  We can save work with a simple cache.
         */
        if (xid == cached_xid)
-               return cached_proc;
+               return cached_gxact;
 
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
-               if (gxact->proc.xid == xid)
+               if (pgxact->xid == xid)
                {
-                       result = &gxact->proc;
+                       result = gxact;
                        break;
                }
        }
@@ -718,14 +721,42 @@ TwoPhaseGetDummyProc(TransactionId xid)
        LWLockRelease(TwoPhaseStateLock);
 
        if (result == NULL)                     /* should not happen */
-               elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
+               elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
 
        cached_xid = xid;
-       cached_proc = result;
+       cached_gxact = result;
 
        return result;
 }
 
+/*
+ * TwoPhaseGetDummyProc
+ *             Get the dummy backend ID for prepared transaction specified by XID
+ *
+ * Dummy backend IDs are similar to real backend IDs of real backends.
+ * They start at MaxBackends + 1, and are unique across all currently active
+ * real backends and prepared transactions.
+ */
+BackendId
+TwoPhaseGetDummyBackendId(TransactionId xid)
+{
+       GlobalTransaction gxact = TwoPhaseGetGXact(xid);
+
+       return gxact->dummyBackendId;
+}
+
+/*
+ * TwoPhaseGetDummyProc
+ *             Get the PGPROC that represents a prepared transaction specified by XID
+ */
+PGPROC *
+TwoPhaseGetDummyProc(TransactionId xid)
+{
+       GlobalTransaction gxact = TwoPhaseGetGXact(xid);
+
+       return &ProcGlobal->allProcs[gxact->pgprocno];
+}
+
 /************************************************************************/
 /* State file support                                                                                                  */
 /************************************************************************/
@@ -837,7 +868,9 @@ save_state_data(const void *data, uint32 len)
 void
 StartPrepare(GlobalTransaction gxact)
 {
-       TransactionId xid = gxact->proc.xid;
+       PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       TransactionId xid = pgxact->xid;
        TwoPhaseFileHeader hdr;
        TransactionId *children;
        RelFileNode *commitrels;
@@ -861,12 +894,12 @@ StartPrepare(GlobalTransaction gxact)
        hdr.magic = TWOPHASE_MAGIC;
        hdr.total_len = 0;                      /* EndPrepare will fill this in */
        hdr.xid = xid;
-       hdr.database = gxact->proc.databaseId;
+       hdr.database = proc->databaseId;
        hdr.prepared_at = gxact->prepared_at;
        hdr.owner = gxact->owner;
        hdr.nsubxacts = xactGetCommittedChildren(&children);
-       hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
-       hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
+       hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
+       hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
        hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
                                                                                                                  &hdr.initfileinval);
        StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
@@ -909,7 +942,8 @@ StartPrepare(GlobalTransaction gxact)
 void
 EndPrepare(GlobalTransaction gxact)
 {
-       TransactionId xid = gxact->proc.xid;
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       TransactionId xid = pgxact->xid;
        TwoPhaseFileHeader *hdr;
        char            path[MAXPGPATH];
        XLogRecData *record;
@@ -937,17 +971,12 @@ EndPrepare(GlobalTransaction gxact)
 
        /*
         * Create the 2PC state file.
-        *
-        * Note: because we use BasicOpenFile(), we are responsible for ensuring
-        * the FD gets closed in any error exit path.  Once we get into the
-        * critical section, though, it doesn't matter since any failure causes
-        * PANIC anyway.
         */
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path,
-                                          O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
+       fd = OpenTransientFile(path,
+                                                  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
@@ -962,7 +991,7 @@ EndPrepare(GlobalTransaction gxact)
                COMP_CRC32(statefile_crc, record->data, record->len);
                if ((write(fd, record->data, record->len)) != record->len)
                {
-                       close(fd);
+                       CloseTransientFile(fd);
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not write two-phase state file: %m")));
@@ -979,7 +1008,7 @@ EndPrepare(GlobalTransaction gxact)
 
        if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
@@ -988,7 +1017,7 @@ EndPrepare(GlobalTransaction gxact)
        /* Back up to prepare for rewriting the CRC */
        if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not seek in two-phase state file: %m")));
@@ -1002,22 +1031,22 @@ EndPrepare(GlobalTransaction gxact)
         * out the correct state file CRC, we have an inconsistency: the xact is
         * prepared according to WAL but not according to our on-disk state. We
         * use a critical section to force a PANIC if we are unable to complete
-        * the write --- then, WAL replay should repair the inconsistency.      The
+        * the write --- then, WAL replay should repair the inconsistency.  The
         * odds of a PANIC actually occurring should be very tiny given that we
         * were able to write the bogus CRC above.
         *
-        * We have to set inCommit here, too; otherwise a checkpoint starting
+        * We have to set delayChkpt here, too; otherwise a checkpoint starting
         * immediately after the WAL record is inserted could complete without
         * fsync'ing our state file.  (This is essentially the same kind of race
         * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
-        * uses inCommit for; see notes there.)
+        * uses delayChkpt for; see notes there.)
         *
         * We save the PREPARE record's location in the gxact for later use by
         * CheckPointTwoPhase.
         */
        START_CRIT_SECTION();
 
-       MyProc->inCommit = true;
+       MyPgXact->delayChkpt = true;
 
        gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
                                                                        records.head);
@@ -1028,24 +1057,24 @@ EndPrepare(GlobalTransaction gxact)
        /* write correct CRC and close file */
        if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
        }
 
-       if (close(fd) != 0)
+       if (CloseTransientFile(fd) != 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not close two-phase state file: %m")));
 
        /*
-        * Mark the prepared transaction as valid.      As soon as xact.c marks MyProc
-        * as not running our XID (which it will do immediately after this
-        * function returns), others can commit/rollback the xact.
+        * Mark the prepared transaction as valid.  As soon as xact.c marks
+        * MyPgXact as not running our XID (which it will do immediately after
+        * this function returns), others can commit/rollback the xact.
         *
         * NB: a side effect of this is to make a dummy ProcArray entry for the
-        * prepared XID.  This must happen before we clear the XID from MyProc,
+        * prepared XID.  This must happen before we clear the XID from MyPgXact,
         * else there is a window where the XID is not running according to
         * TransactionIdIsInProgress, and onlookers would be entitled to assume
         * the xact crashed.  Instead we have a window where the same XID appears
@@ -1058,10 +1087,18 @@ EndPrepare(GlobalTransaction gxact)
         * checkpoint starting after this will certainly see the gxact as a
         * candidate for fsyncing.
         */
-       MyProc->inCommit = false;
+       MyPgXact->delayChkpt = false;
 
        END_CRIT_SECTION();
 
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked the prepare, but still show as
+        * running in the procarray (twice!) and continue to hold locks.
+        */
+       SyncRepWaitForLSN(gxact->prepare_lsn);
+
        records.tail = records.head = NULL;
 }
 
@@ -1103,7 +1140,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+       fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
        if (fd < 0)
        {
                if (give_warnings)
@@ -1122,7 +1159,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
         */
        if (fstat(fd, &stat))
        {
-               close(fd);
+               CloseTransientFile(fd);
                if (give_warnings)
                        ereport(WARNING,
                                        (errcode_for_file_access(),
@@ -1136,14 +1173,14 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
                                                sizeof(pg_crc32)) ||
                stat.st_size > MaxAllocSize)
        {
-               close(fd);
+               CloseTransientFile(fd);
                return NULL;
        }
 
        crc_offset = stat.st_size - sizeof(pg_crc32);
        if (crc_offset != MAXALIGN(crc_offset))
        {
-               close(fd);
+               CloseTransientFile(fd);
                return NULL;
        }
 
@@ -1154,7 +1191,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 
        if (read(fd, buf, stat.st_size) != stat.st_size)
        {
-               close(fd);
+               CloseTransientFile(fd);
                if (give_warnings)
                        ereport(WARNING,
                                        (errcode_for_file_access(),
@@ -1164,7 +1201,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
                return NULL;
        }
 
-       close(fd);
+       CloseTransientFile(fd);
 
        hdr = (TwoPhaseFileHeader *) buf;
        if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
@@ -1200,6 +1237,9 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
 
        Assert(TransactionIdIsValid(xid));
 
+       if (max_prepared_xacts <= 0)
+               return false;                   /* nothing to do */
+
        /* Read and validate file */
        buf = ReadTwoPhaseFile(xid, false);
        if (buf == NULL)
@@ -1220,6 +1260,8 @@ void
 FinishPreparedTransaction(const char *gid, bool isCommit)
 {
        GlobalTransaction gxact;
+       PGPROC     *proc;
+       PGXACT     *pgxact;
        TransactionId xid;
        char       *buf;
        char       *bufptr;
@@ -1238,7 +1280,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
         * try to commit the same GID at once.
         */
        gxact = LockGXact(gid, GetUserId());
-       xid = gxact->proc.xid;
+       proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       xid = pgxact->xid;
 
        /*
         * Read and validate the state file
@@ -1287,12 +1331,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
                                                                           hdr->nsubxacts, children,
                                                                           hdr->nabortrels, abortrels);
 
-       ProcArrayRemove(&gxact->proc, latestXid);
+       ProcArrayRemove(proc, latestXid);
 
        /*
         * In case we fail while running the callbacks, mark the gxact invalid so
         * no one else will try to commit/rollback, and so it can be recycled
-        * properly later.      It is still locked by our XID so it won't go away yet.
+        * properly later.  It is still locked by our XID so it won't go away yet.
         *
         * (We assume it's safe to do this without taking TwoPhaseStateLock.)
         */
@@ -1317,14 +1361,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        }
        for (i = 0; i < ndelrels; i++)
        {
-               SMgrRelation srel = smgropen(delrels[i]);
-               ForkNumber      fork;
+               SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
 
-               for (fork = 0; fork <= MAX_FORKNUM; fork++)
-               {
-                       if (smgrexists(srel, fork))
-                               smgrdounlink(srel, fork, false, false);
-               }
+               smgrdounlink(srel, false);
                smgrclose(srel);
        }
 
@@ -1335,10 +1374,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
         * after we send the SI messages. See AtEOXact_Inval()
         */
        if (hdr->initfileinval)
-               RelationCacheInitFileInvalidate(true);
+               RelationCacheInitFilePreInvalidate();
        SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
        if (hdr->initfileinval)
-               RelationCacheInitFileInvalidate(false);
+               RelationCacheInitFilePostInvalidate();
 
        /* And now do the callbacks */
        if (isCommit)
@@ -1346,6 +1385,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        else
                ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
 
+       PredicateLockTwoPhaseFinish(xid, isCommit);
+
        /* Count the prepared xact as committed or aborted */
        AtEOXact_PgStat(isCommit);
 
@@ -1424,9 +1465,9 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path,
-                                          O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
+       fd = OpenTransientFile(path,
+                                                  O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
@@ -1436,14 +1477,14 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
        /* Write content and CRC */
        if (write(fd, content, len) != len)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
        }
        if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
@@ -1455,13 +1496,13 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
         */
        if (pg_fsync(fd) != 0)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not fsync two-phase state file: %m")));
        }
 
-       if (close(fd) != 0)
+       if (CloseTransientFile(fd) != 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not close two-phase state file: %m")));
@@ -1499,7 +1540,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
         *
         * This approach creates a race condition: someone else could delete a
         * GXACT between the time we release TwoPhaseStateLock and the time we try
-        * to open its state file.      We handle this by special-casing ENOENT
+        * to open its state file.  We handle this by special-casing ENOENT
         * failures: if we see that, we verify that the GXACT is no longer valid,
         * and if so ignore the failure.
         */
@@ -1516,10 +1557,11 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
                if (gxact->valid &&
-                       XLByteLE(gxact->prepare_lsn, redo_horizon))
-                       xids[nxids++] = gxact->proc.xid;
+                       gxact->prepare_lsn <= redo_horizon)
+                       xids[nxids++] = pgxact->xid;
        }
 
        LWLockRelease(TwoPhaseStateLock);
@@ -1531,7 +1573,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 
                TwoPhaseFilePath(path, xid);
 
-               fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
+               fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
                if (fd < 0)
                {
                        if (errno == ENOENT)
@@ -1550,14 +1592,14 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 
                if (pg_fsync(fd) != 0)
                {
-                       close(fd);
+                       CloseTransientFile(fd);
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not fsync two-phase state file \"%s\": %m",
                                                        path)));
                }
 
-               if (close(fd) != 0)
+               if (CloseTransientFile(fd) != 0)
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not close two-phase state file \"%s\": %m",
@@ -1579,7 +1621,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  *
  * We throw away any prepared xacts with main XID beyond nextXid --- if any
  * are present, it suggests that the DBA has done a PITR recovery to an
- * earlier point in time without cleaning out pg_twophase.     We dare not
+ * earlier point in time without cleaning out pg_twophase.  We dare not
  * try to recover such prepared xacts since they likely depend on database
  * state that doesn't exist now.
  *
@@ -1669,6 +1711,10 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
                        /*
                         * Examine subtransaction XIDs ... they should all follow main
                         * XID, and they may force us to advance nextXid.
+                        *
+                        * We don't expect anyone else to modify nextXid, hence we don't
+                        * need to hold a lock while examining it.  We still acquire the
+                        * lock to modify it, though.
                         */
                        subxids = (TransactionId *)
                                (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
@@ -1680,8 +1726,10 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
                                if (TransactionIdFollowsOrEquals(subxid,
                                                                                                 ShmemVariableCache->nextXid))
                                {
+                                       LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
                                        ShmemVariableCache->nextXid = subxid;
                                        TransactionIdAdvance(ShmemVariableCache->nextXid);
+                                       LWLockRelease(XidGenLock);
                                }
                        }
 
@@ -1873,7 +1921,8 @@ RecoverPreparedTransactions(void)
                         * the prepared transaction generated xid assignment records. Test
                         * here must match one used in AssignTransactionId().
                         */
-                       if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
+                       if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+                                                                XLogLogicalInfoActive()))
                                overwriteOK = true;
 
                        /*
@@ -1925,7 +1974,7 @@ RecoverPreparedTransactions(void)
  *     RecordTransactionCommitPrepared
  *
  * This is basically the same as RecordTransactionCommit: in particular,
- * we must set the inCommit flag to avoid a race condition.
+ * we must set the delayChkpt flag to avoid a race condition.
  *
  * We know the transaction made at least one XLOG entry (its PREPARE),
  * so it is never possible to optimize out the commit record.
@@ -1948,7 +1997,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
        START_CRIT_SECTION();
 
        /* See notes in RecordTransactionCommit */
-       MyProc->inCommit = true;
+       MyPgXact->delayChkpt = true;
 
        /* Emit the XLOG commit record */
        xlrec.xid = xid;
@@ -2006,9 +2055,17 @@ RecordTransactionCommitPrepared(TransactionId xid,
        TransactionIdCommitTree(xid, nchildren, children);
 
        /* Checkpoint can proceed now */
-       MyProc->inCommit = false;
+       MyPgXact->delayChkpt = false;
 
        END_CRIT_SECTION();
+
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked clog, but still show as running
+        * in the procarray and continue to hold locks.
+        */
+       SyncRepWaitForLSN(recptr);
 }
 
 /*
@@ -2081,4 +2138,12 @@ RecordTransactionAbortPrepared(TransactionId xid,
        TransactionIdAbortTree(xid, nchildren, children);
 
        END_CRIT_SECTION();
+
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked clog, but still show as running
+        * in the procarray and continue to hold locks.
+        */
+       SyncRepWaitForLSN(recptr);
 }