]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/twophase.c
pgindent run for 9.4
[postgresql] / src / backend / access / transam / twophase.c
index 30bc21b5244971a417f8543ec0643db176183f73..70ca6ab67d1ef8a25f7f8bf9515a116239aa8fb8 100644 (file)
@@ -3,11 +3,11 @@
  * twophase.c
  *             Two-phase commit support functions.
  *
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.3 2005/06/18 19:33:41 tgl Exp $
+ *             src/backend/access/transam/twophase.c
  *
  * NOTES
  *             Each global transaction is associated with a global transaction
  *             GIDs and aborts the transaction if there already is a global
  *             transaction in prepared state with the same GID.
  *
- *             A global transaction (gxact) also has a dummy PGPROC that is entered
- *             into the ProcArray array; this is what keeps the XID considered
- *             running by TransactionIdIsInProgress.  It is also convenient as a
- *             PGPROC to hook the gxact's locks to.
+ *             A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
+ *             what keeps the XID considered running by TransactionIdIsInProgress.
+ *             It is also convenient as a PGPROC to hook the gxact's locks to.
  *
  *             In order to survive crashes and shutdowns, all prepared
  *             transactions must be stored in permanent storage. This includes
  */
 #include "postgres.h"
 
-#include <sys/types.h>
-#include <sys/stat.h>
 #include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <time.h>
 #include <unistd.h>
 
-#include "access/heapam.h"
+#include "access/htup_details.h"
 #include "access/subtrans.h"
+#include "access/transam.h"
 #include "access/twophase.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
 #include "catalog/pg_type.h"
+#include "catalog/storage.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "pg_trace.h"
+#include "pgstat.h"
+#include "replication/walsender.h"
+#include "replication/syncrep.h"
 #include "storage/fd.h"
+#include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/sinvaladt.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
-#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/timestamp.h"
 
 
 /*
 #define TWOPHASE_DIR "pg_twophase"
 
 /* GUC variable, can't be changed after startup */
-int max_prepared_xacts = 50;
+int                    max_prepared_xacts = 0;
 
 /*
  * This struct describes one global transaction that is in prepared state
  * or attempting to become prepared.
  *
- * The first component of the struct is a dummy PGPROC that is inserted
- * into the global ProcArray so that the transaction appears to still be
- * running and holding locks.  It must be first because we cast pointers
- * to PGPROC and pointers to GlobalTransactionData back and forth.
- *
  * The lifecycle of a global transaction is:
  *
  * 1. After checking that the requested GID is not in use, set up an
@@ -81,7 +87,7 @@ int max_prepared_xacts = 50;
  * with locking_xid = my own XID and valid = false.
  *
  * 2. After successfully completing prepare, set valid = true and enter the
- * contained PGPROC into the global ProcArray.
+ * referenced PGPROC into the global ProcArray.
  *
  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
  * is valid and its locking_xid is no longer active, then store my current
@@ -96,20 +102,23 @@ int max_prepared_xacts = 50;
  * entry will remain in prepXacts until recycled.  We can detect recyclable
  * entries by checking for valid = false and locking_xid no longer active.
  *
- * typedef struct GlobalTransactionData *GlobalTransaction appears in 
+ * typedef struct GlobalTransactionData *GlobalTransaction appears in
  * twophase.h
  */
 #define GIDSIZE 200
 
 typedef struct GlobalTransactionData
 {
-       PGPROC          proc;                   /* dummy proc */
-       TimestampTz     prepared_at;    /* time of preparation */
-       AclId           owner;                  /* ID of user that executed the xact */
+       GlobalTransaction next;         /* list link for free list */
+       int                     pgprocno;               /* ID of associated dummy PGPROC */
+       BackendId       dummyBackendId; /* similar to backend id for backends */
+       TimestampTz prepared_at;        /* time of preparation */
+       XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
+       Oid                     owner;                  /* ID of user that executed the xact */
        TransactionId locking_xid;      /* top-level XID of backend working on xact */
        bool            valid;                  /* TRUE if fully prepared */
-       char gid[GIDSIZE];                      /* The GID assigned to the prepared xact */
-} GlobalTransactionData;
+       char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
+}      GlobalTransactionData;
 
 /*
  * Two Phase Commit shared state.  Access to this struct is protected
@@ -118,51 +127,61 @@ typedef struct GlobalTransactionData
 typedef struct TwoPhaseStateData
 {
        /* Head of linked list of free GlobalTransactionData structs */
-       SHMEM_OFFSET freeGXacts;
+       GlobalTransaction freeGXacts;
 
        /* Number of valid prepXacts entries. */
-       int             numPrepXacts;
+       int                     numPrepXacts;
 
        /*
         * There are max_prepared_xacts items in this array, but C wants a
         * fixed-size array.
         */
-       GlobalTransaction       prepXacts[1]; /* VARIABLE LENGTH ARRAY */
+       GlobalTransaction prepXacts[1];         /* VARIABLE LENGTH ARRAY */
 } TwoPhaseStateData;                   /* VARIABLE LENGTH STRUCT */
 
 static TwoPhaseStateData *TwoPhaseState;
 
 
 static void RecordTransactionCommitPrepared(TransactionId xid,
-                                                                                       int nchildren,
-                                                                                       TransactionId *children,
-                                                                                       int nrels,
-                                                                                       RelFileNode *rels);
+                                                               int nchildren,
+                                                               TransactionId *children,
+                                                               int nrels,
+                                                               RelFileNode *rels,
+                                                               int ninvalmsgs,
+                                                               SharedInvalidationMessage *invalmsgs,
+                                                               bool initfileinval);
 static void RecordTransactionAbortPrepared(TransactionId xid,
-                                                                                       int nchildren,
-                                                                                       TransactionId *children,
-                                                                                       int nrels,
-                                                                                       RelFileNode *rels);
+                                                          int nchildren,
+                                                          TransactionId *children,
+                                                          int nrels,
+                                                          RelFileNode *rels);
 static void ProcessRecords(char *bufptr, TransactionId xid,
-                                                  const TwoPhaseCallback callbacks[]);
+                          const TwoPhaseCallback callbacks[]);
 
 
 /*
  * Initialization of shared memory
  */
-int
+Size
 TwoPhaseShmemSize(void)
 {
+       Size            size;
+
        /* Need the fixed struct, the array of pointers, and the GTD structs */
-       return MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) + 
-                                       sizeof(GlobalTransaction) * max_prepared_xacts) +
-               sizeof(GlobalTransactionData) * max_prepared_xacts;
+       size = offsetof(TwoPhaseStateData, prepXacts);
+       size = add_size(size, mul_size(max_prepared_xacts,
+                                                                  sizeof(GlobalTransaction)));
+       size = MAXALIGN(size);
+       size = add_size(size, mul_size(max_prepared_xacts,
+                                                                  sizeof(GlobalTransactionData)));
+
+       return size;
 }
 
 void
 TwoPhaseShmemInit(void)
 {
-       bool found;
+       bool            found;
 
        TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
                                                                        TwoPhaseShmemSize(),
@@ -173,7 +192,7 @@ TwoPhaseShmemInit(void)
                int                     i;
 
                Assert(!found);
-               TwoPhaseState->freeGXacts = INVALID_OFFSET;
+               TwoPhaseState->freeGXacts = NULL;
                TwoPhaseState->numPrepXacts = 0;
 
                /*
@@ -181,12 +200,30 @@ TwoPhaseShmemInit(void)
                 */
                gxacts = (GlobalTransaction)
                        ((char *) TwoPhaseState +
-                        MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) + 
+                        MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
                                          sizeof(GlobalTransaction) * max_prepared_xacts));
                for (i = 0; i < max_prepared_xacts; i++)
                {
-                       gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;
-                       TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);
+                       /* insert into linked list */
+                       gxacts[i].next = TwoPhaseState->freeGXacts;
+                       TwoPhaseState->freeGXacts = &gxacts[i];
+
+                       /* associate it with a PGPROC assigned by InitProcGlobal */
+                       gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
+
+                       /*
+                        * Assign a unique ID for each dummy proc, so that the range of
+                        * dummy backend IDs immediately follows the range of normal
+                        * backend IDs. We don't dare to assign a real backend ID to dummy
+                        * procs, because prepared transactions don't take part in cache
+                        * invalidation like a real backend ID would imply, but having a
+                        * unique ID for them is nevertheless handy. This arrangement
+                        * allows you to allocate an array of size (MaxBackends +
+                        * max_prepared_xacts + 1), and have a slot for every backend and
+                        * prepared transaction. Currently multixact.c uses that
+                        * technique.
+                        */
+                       gxacts[i].dummyBackendId = MaxBackends + 1 + i;
                }
        }
        else
@@ -196,7 +233,7 @@ TwoPhaseShmemInit(void)
 
 /*
  * MarkAsPreparing
- *             Reserve the GID for the given transaction.
+ *             Reserve the GID for the given transaction.
  *
  * Internally, this creates a gxact struct and puts it into the active array.
  * NOTE: this is also used when reloading a gxact after a crash; so avoid
@@ -204,24 +241,32 @@ TwoPhaseShmemInit(void)
  */
 GlobalTransaction
 MarkAsPreparing(TransactionId xid, const char *gid,
-                               TimestampTz prepared_at, AclId owner, Oid databaseid)
+                               TimestampTz prepared_at, Oid owner, Oid databaseid)
 {
-       GlobalTransaction       gxact;
-       int i;
+       GlobalTransaction gxact;
+       PGPROC     *proc;
+       PGXACT     *pgxact;
+       int                     i;
 
        if (strlen(gid) >= GIDSIZE)
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                errmsg("global transaction identifier \"%s\" is too long",
+                                errmsg("transaction identifier \"%s\" is too long",
                                                gid)));
 
+       /* fail immediately if feature is disabled */
+       if (max_prepared_xacts == 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("prepared transactions are disabled"),
+                         errhint("Set max_prepared_transactions to a nonzero value.")));
+
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
        /*
-        * First, find and recycle any gxacts that failed during prepare.
-        * We do this partly to ensure we don't mistakenly say their GIDs
-        * are still reserved, and partly so we don't fail on out-of-slots
-        * unnecessarily.
+        * First, find and recycle any gxacts that failed during prepare. We do
+        * this partly to ensure we don't mistakenly say their GIDs are still
+        * reserved, and partly so we don't fail on out-of-slots unnecessarily.
         */
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
@@ -232,8 +277,8 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                        TwoPhaseState->numPrepXacts--;
                        TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
                        /* and put it back in the freelist */
-                       gxact->proc.links.next = TwoPhaseState->freeGXacts;
-                       TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+                       gxact->next = TwoPhaseState->freeGXacts;
+                       TwoPhaseState->freeGXacts = gxact;
                        /* Back up index count too, so we don't miss scanning one */
                        i--;
                }
@@ -247,40 +292,53 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                {
                        ereport(ERROR,
                                        (errcode(ERRCODE_DUPLICATE_OBJECT),
-                                        errmsg("global transaction identifier \"%s\" is already in use",
+                                        errmsg("transaction identifier \"%s\" is already in use",
                                                        gid)));
                }
        }
 
        /* Get a free gxact from the freelist */
-       if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
+       if (TwoPhaseState->freeGXacts == NULL)
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
                                 errmsg("maximum number of prepared transactions reached"),
                                 errhint("Increase max_prepared_transactions (currently %d).",
                                                 max_prepared_xacts)));
-       gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
-       TwoPhaseState->freeGXacts = gxact->proc.links.next;
-
-       /* Initialize it */
-       MemSet(&gxact->proc, 0, sizeof(PGPROC));
-       SHMQueueElemInit(&(gxact->proc.links));
-       gxact->proc.waitStatus = STATUS_OK;
-       gxact->proc.xid = xid;
-       gxact->proc.xmin = InvalidTransactionId;
-       gxact->proc.pid = 0;
-       gxact->proc.databaseId = databaseid;
-       gxact->proc.lwWaiting = false;
-       gxact->proc.lwExclusive = false;
-       gxact->proc.lwWaitLink = NULL;
-       gxact->proc.waitLock = NULL;
-       gxact->proc.waitProcLock = NULL;
-       SHMQueueInit(&(gxact->proc.procLocks));
+       gxact = TwoPhaseState->freeGXacts;
+       TwoPhaseState->freeGXacts = gxact->next;
+
+       proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+       /* Initialize the PGPROC entry */
+       MemSet(proc, 0, sizeof(PGPROC));
+       proc->pgprocno = gxact->pgprocno;
+       SHMQueueElemInit(&(proc->links));
+       proc->waitStatus = STATUS_OK;
+       /* We set up the gxact's VXID as InvalidBackendId/XID */
+       proc->lxid = (LocalTransactionId) xid;
+       pgxact->xid = xid;
+       pgxact->xmin = InvalidTransactionId;
+       pgxact->delayChkpt = false;
+       pgxact->vacuumFlags = 0;
+       proc->pid = 0;
+       proc->backendId = InvalidBackendId;
+       proc->databaseId = databaseid;
+       proc->roleId = owner;
+       proc->lwWaiting = false;
+       proc->lwWaitMode = 0;
+       proc->lwWaitLink = NULL;
+       proc->waitLock = NULL;
+       proc->waitProcLock = NULL;
+       for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+               SHMQueueInit(&(proc->myProcLocks[i]));
        /* subxid data must be filled later by GXactLoadSubxactData */
-       gxact->proc.subxids.overflowed = false;
-       gxact->proc.subxids.nxids = 0;
+       pgxact->overflowed = false;
+       pgxact->nxids = 0;
 
        gxact->prepared_at = prepared_at;
+       /* initialize LSN to 0 (start of WAL) */
+       gxact->prepare_lsn = 0;
        gxact->owner = owner;
        gxact->locking_xid = xid;
        gxact->valid = false;
@@ -306,17 +364,20 @@ static void
 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
                                         TransactionId *children)
 {
+       PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
        /* We need no extra lock since the GXACT isn't valid yet */
        if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
        {
-               gxact->proc.subxids.overflowed = true;
+               pgxact->overflowed = true;
                nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
        }
        if (nsubxacts > 0)
        {
-               memcpy(gxact->proc.subxids.xids, children,
+               memcpy(proc->subxids.xids, children,
                           nsubxacts * sizeof(TransactionId));
-               gxact->proc.subxids.nxids = nsubxacts;
+               pgxact->nxids = nsubxacts;
        }
 }
 
@@ -324,7 +385,7 @@ GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
  * MarkAsPrepared
  *             Mark the GXACT as fully valid, and enter it into the global ProcArray.
  */
-void
+static void
 MarkAsPrepared(GlobalTransaction gxact)
 {
        /* Lock here may be overkill, but I'm not convinced of that ... */
@@ -334,10 +395,10 @@ MarkAsPrepared(GlobalTransaction gxact)
        LWLockRelease(TwoPhaseStateLock);
 
        /*
-        * Put it into the global ProcArray so TransactionIdInProgress considers
+        * Put it into the global ProcArray so TransactionIdIsInProgress considers
         * the XID as still running.
         */
-       ProcArrayAdd(&gxact->proc);
+       ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
 }
 
 /*
@@ -345,15 +406,16 @@ MarkAsPrepared(GlobalTransaction gxact)
  *             Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
  */
 static GlobalTransaction
-LockGXact(const char *gid, AclId user)
+LockGXact(const char *gid, Oid user)
 {
-       int i;
+       int                     i;
 
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
-               GlobalTransaction       gxact = TwoPhaseState->prepXacts[i];
+               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
                /* Ignore not-yet-valid GIDs */
                if (!gxact->valid)
@@ -367,17 +429,29 @@ LockGXact(const char *gid, AclId user)
                        if (TransactionIdIsActive(gxact->locking_xid))
                                ereport(ERROR,
                                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                                errmsg("prepared transaction with gid \"%s\" is busy",
-                                                               gid)));
+                               errmsg("prepared transaction with identifier \"%s\" is busy",
+                                          gid)));
                        gxact->locking_xid = InvalidTransactionId;
                }
 
                if (user != gxact->owner && !superuser_arg(user))
                        ereport(ERROR,
                                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                        errmsg("permission denied to finish prepared transaction"),
+                                 errmsg("permission denied to finish prepared transaction"),
                                         errhint("Must be superuser or the user that prepared the transaction.")));
 
+               /*
+                * Note: it probably would be possible to allow committing from
+                * another database; but at the moment NOTIFY is known not to work and
+                * there may be some other issues as well.  Hence disallow until
+                * someone gets motivated to make it work.
+                */
+               if (MyDatabaseId != proc->databaseId)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                 errmsg("prepared transaction belongs to another database"),
+                                        errhint("Connect to the database where the transaction was prepared to finish it.")));
+
                /* OK for me to lock it */
                gxact->locking_xid = GetTopTransactionId();
 
@@ -390,8 +464,8 @@ LockGXact(const char *gid, AclId user)
 
        ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_OBJECT),
-                        errmsg("prepared transaction with gid \"%s\" does not exist",
-                                       gid)));
+                errmsg("prepared transaction with identifier \"%s\" does not exist",
+                               gid)));
 
        /* NOTREACHED */
        return NULL;
@@ -406,7 +480,7 @@ LockGXact(const char *gid, AclId user)
 static void
 RemoveGXact(GlobalTransaction gxact)
 {
-       int i;
+       int                     i;
 
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
@@ -419,8 +493,8 @@ RemoveGXact(GlobalTransaction gxact)
                        TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
 
                        /* and put it back in the freelist */
-                       gxact->proc.links.next = TwoPhaseState->freeGXacts;
-                       TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+                       gxact->next = TwoPhaseState->freeGXacts;
+                       TwoPhaseState->freeGXacts = gxact;
 
                        LWLockRelease(TwoPhaseStateLock);
 
@@ -433,6 +507,41 @@ RemoveGXact(GlobalTransaction gxact)
        elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
 }
 
+/*
+ * TransactionIdIsPrepared
+ *             True iff transaction associated with the identifier is prepared
+ *             for two-phase commit
+ *
+ * Note: only gxacts marked "valid" are considered; but notice we do not
+ * check the locking status.
+ *
+ * This is not currently exported, because it is only needed internally.
+ */
+static bool
+TransactionIdIsPrepared(TransactionId xid)
+{
+       bool            result = false;
+       int                     i;
+
+       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+       for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+       {
+               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+               if (gxact->valid && pgxact->xid == xid)
+               {
+                       result = true;
+                       break;
+               }
+       }
+
+       LWLockRelease(TwoPhaseStateLock);
+
+       return result;
+}
+
 /*
  * Returns an array of all prepared transactions for the user-level
  * function pg_prepared_xact.
@@ -449,8 +558,8 @@ static int
 GetPreparedTransactionList(GlobalTransaction *gxacts)
 {
        GlobalTransaction array;
-       int             num;
-       int             i;
+       int                     num;
+       int                     i;
 
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
@@ -479,13 +588,13 @@ GetPreparedTransactionList(GlobalTransaction *gxacts)
 typedef struct
 {
        GlobalTransaction array;
-       int             ngxacts;
-       int             currIdx;
+       int                     ngxacts;
+       int                     currIdx;
 } Working_State;
 
 /*
  * pg_prepared_xact
- *             Produce a view with one row per prepared transaction.
+ *             Produce a view with one row per prepared transaction.
  *
  * This function is here so we don't have to export the
  * GlobalTransactionData struct definition.
@@ -505,8 +614,7 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
                funcctx = SRF_FIRSTCALL_INIT();
 
                /*
-                * Switch to memory context appropriate for multiple function
-                * calls
+                * Switch to memory context appropriate for multiple function calls
                 */
                oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
@@ -520,15 +628,15 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
                TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
                                                   TIMESTAMPTZOID, -1, 0);
                TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
-                                                  INT4OID, -1, 0);
+                                                  OIDOID, -1, 0);
                TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
                                                   OIDOID, -1, 0);
 
                funcctx->tuple_desc = BlessTupleDesc(tupdesc);
 
                /*
-                * Collect all the 2PC status information that we will format and
-                * send out as a result set.
+                * Collect all the 2PC status information that we will format and send
+                * out as a result set.
                 */
                status = (Working_State *) palloc(sizeof(Working_State));
                funcctx->user_fctx = (void *) status;
@@ -545,6 +653,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
        while (status->array != NULL && status->currIdx < status->ngxacts)
        {
                GlobalTransaction gxact = &status->array[status->currIdx++];
+               PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
                Datum           values[5];
                bool            nulls[5];
                HeapTuple       tuple;
@@ -559,11 +669,11 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
                MemSet(values, 0, sizeof(values));
                MemSet(nulls, 0, sizeof(nulls));
 
-               values[0] = TransactionIdGetDatum(gxact->proc.xid);
-               values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
+               values[0] = TransactionIdGetDatum(pgxact->xid);
+               values[1] = CStringGetTextDatum(gxact->gid);
                values[2] = TimestampTzGetDatum(gxact->prepared_at);
-               values[3] = Int32GetDatum(gxact->owner);
-               values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
+               values[3] = ObjectIdGetDatum(gxact->owner);
+               values[4] = ObjectIdGetDatum(proc->databaseId);
 
                tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
                result = HeapTupleGetDatum(tuple);
@@ -574,34 +684,36 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
 }
 
 /*
- * TwoPhaseGetDummyProc
- *             Get the PGPROC that represents a prepared transaction specified by XID
+ * TwoPhaseGetGXact
+ *             Get the GlobalTransaction struct for a prepared transaction
+ *             specified by XID
  */
-PGPROC *
-TwoPhaseGetDummyProc(TransactionId xid)
+static GlobalTransaction
+TwoPhaseGetGXact(TransactionId xid)
 {
-       PGPROC     *result = NULL;
+       GlobalTransaction result = NULL;
        int                     i;
 
        static TransactionId cached_xid = InvalidTransactionId;
-       static PGPROC *cached_proc = NULL;
+       static GlobalTransaction cached_gxact = NULL;
 
        /*
         * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
         * repeatedly for the same XID.  We can save work with a simple cache.
         */
        if (xid == cached_xid)
-               return cached_proc;
+               return cached_gxact;
 
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
-               GlobalTransaction       gxact = TwoPhaseState->prepXacts[i];
+               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
-               if (gxact->proc.xid == xid)
+               if (pgxact->xid == xid)
                {
-                       result = &gxact->proc;
+                       result = gxact;
                        break;
                }
        }
@@ -609,32 +721,61 @@ TwoPhaseGetDummyProc(TransactionId xid)
        LWLockRelease(TwoPhaseStateLock);
 
        if (result == NULL)                     /* should not happen */
-               elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
+               elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
 
        cached_xid = xid;
-       cached_proc = result;
+       cached_gxact = result;
 
        return result;
 }
 
+/*
+ * TwoPhaseGetDummyProc
+ *             Get the dummy backend ID for prepared transaction specified by XID
+ *
+ * Dummy backend IDs are similar to real backend IDs of real backends.
+ * They start at MaxBackends + 1, and are unique across all currently active
+ * real backends and prepared transactions.
+ */
+BackendId
+TwoPhaseGetDummyBackendId(TransactionId xid)
+{
+       GlobalTransaction gxact = TwoPhaseGetGXact(xid);
+
+       return gxact->dummyBackendId;
+}
+
+/*
+ * TwoPhaseGetDummyProc
+ *             Get the PGPROC that represents a prepared transaction specified by XID
+ */
+PGPROC *
+TwoPhaseGetDummyProc(TransactionId xid)
+{
+       GlobalTransaction gxact = TwoPhaseGetGXact(xid);
+
+       return &ProcGlobal->allProcs[gxact->pgprocno];
+}
+
 /************************************************************************/
-/* State file support                                                   */
+/* State file support                                                                                                  */
 /************************************************************************/
 
 #define TwoPhaseFilePath(path, xid) \
-       snprintf(path, MAXPGPATH, "%s/%s/%08X", DataDir, TWOPHASE_DIR, xid)
+       snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
 
 /*
  * 2PC state file format:
  *
- *  1. TwoPhaseFileHeader
- *  2. TransactionId[] (subtransactions)
+ *     1. TwoPhaseFileHeader
+ *     2. TransactionId[] (subtransactions)
  *     3. RelFileNode[] (files to be deleted at commit)
  *     4. RelFileNode[] (files to be deleted at abort)
- *  5. TwoPhaseRecordOnDisk
- *  6. ...
- *  7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
- *  8. CRC32
+ *     5. SharedInvalidationMessage[] (inval messages to be sent at commit)
+ *     6. TwoPhaseRecordOnDisk
+ *     7. ...
+ *     8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
+ *     9. CRC32
  *
  * Each segment except the final CRC32 is MAXALIGN'd.
  */
@@ -642,20 +783,22 @@ TwoPhaseGetDummyProc(TransactionId xid)
 /*
  * Header for a 2PC state file
  */
-#define TWOPHASE_MAGIC 0x57F94531              /* format identifier */
+#define TWOPHASE_MAGIC 0x57F94532              /* format identifier */
 
 typedef struct TwoPhaseFileHeader
 {
-       uint32                  magic;                          /* format identifier */
-       uint32                  total_len;                      /* actual file length */
-       TransactionId   xid;                            /* original transaction XID */
-       Oid                             database;                       /* OID of database it was in */
-       TimestampTz             prepared_at;            /* time of preparation */
-       AclId                   owner;                          /* user running the transaction */
-       int32                   nsubxacts;                      /* number of following subxact XIDs */
-       int32                   ncommitrels;            /* number of delete-on-commit rels */
-       int32                   nabortrels;                     /* number of delete-on-abort rels */
-       char                    gid[GIDSIZE];           /* GID for transaction */
+       uint32          magic;                  /* format identifier */
+       uint32          total_len;              /* actual file length */
+       TransactionId xid;                      /* original transaction XID */
+       Oid                     database;               /* OID of database it was in */
+       TimestampTz prepared_at;        /* time of preparation */
+       Oid                     owner;                  /* user running the transaction */
+       int32           nsubxacts;              /* number of following subxact XIDs */
+       int32           ncommitrels;    /* number of delete-on-commit rels */
+       int32           nabortrels;             /* number of delete-on-abort rels */
+       int32           ninvalmsgs;             /* number of cache invalidation messages */
+       bool            initfileinval;  /* does relcache init file need invalidation? */
+       char            gid[GIDSIZE];   /* GID for transaction */
 } TwoPhaseFileHeader;
 
 /*
@@ -666,9 +809,9 @@ typedef struct TwoPhaseFileHeader
  */
 typedef struct TwoPhaseRecordOnDisk
 {
-       uint32                  len;            /* length of rmgr data */
-       TwoPhaseRmgrId  rmid;           /* resource manager for this record */
-       uint16                  info;           /* flag bits for use by rmgr */
+       uint32          len;                    /* length of rmgr data */
+       TwoPhaseRmgrId rmid;            /* resource manager for this record */
+       uint16          info;                   /* flag bits for use by rmgr */
 } TwoPhaseRecordOnDisk;
 
 /*
@@ -681,9 +824,9 @@ static struct xllist
 {
        XLogRecData *head;                      /* first data block in the chain */
        XLogRecData *tail;                      /* last block in chain */
-       uint32 bytes_free;                      /* free bytes left in tail block */
-       uint32 total_len;                       /* total data bytes in chain */
-} records;
+       uint32          bytes_free;             /* free bytes left in tail block */
+       uint32          total_len;              /* total data bytes in chain */
+}      records;
 
 
 /*
@@ -697,7 +840,7 @@ static struct xllist
 static void
 save_state_data(const void *data, uint32 len)
 {
-       uint32  padlen = MAXALIGN(len);
+       uint32          padlen = MAXALIGN(len);
 
        if (padlen > records.bytes_free)
        {
@@ -725,11 +868,14 @@ save_state_data(const void *data, uint32 len)
 void
 StartPrepare(GlobalTransaction gxact)
 {
-       TransactionId   xid = gxact->proc.xid;
+       PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       TransactionId xid = pgxact->xid;
        TwoPhaseFileHeader hdr;
        TransactionId *children;
        RelFileNode *commitrels;
        RelFileNode *abortrels;
+       SharedInvalidationMessage *invalmsgs;
 
        /* Initialize linked list */
        records.head = palloc0(sizeof(XLogRecData));
@@ -748,23 +894,27 @@ StartPrepare(GlobalTransaction gxact)
        hdr.magic = TWOPHASE_MAGIC;
        hdr.total_len = 0;                      /* EndPrepare will fill this in */
        hdr.xid = xid;
-       hdr.database = gxact->proc.databaseId;
+       hdr.database = proc->databaseId;
        hdr.prepared_at = gxact->prepared_at;
        hdr.owner = gxact->owner;
        hdr.nsubxacts = xactGetCommittedChildren(&children);
        hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
        hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
+       hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
+                                                                                                                 &hdr.initfileinval);
        StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
 
        save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
 
-       /* Add the additional info about subxacts and deletable files */
+       /*
+        * Add the additional info about subxacts, deletable files and cache
+        * invalidation messages.
+        */
        if (hdr.nsubxacts > 0)
        {
                save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
                /* While we have the child-xact data, stuff it in the gxact too */
                GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
-               pfree(children);
        }
        if (hdr.ncommitrels > 0)
        {
@@ -776,6 +926,12 @@ StartPrepare(GlobalTransaction gxact)
                save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
                pfree(abortrels);
        }
+       if (hdr.ninvalmsgs > 0)
+       {
+               save_state_data(invalmsgs,
+                                               hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
+               pfree(invalmsgs);
+       }
 }
 
 /*
@@ -786,14 +942,14 @@ StartPrepare(GlobalTransaction gxact)
 void
 EndPrepare(GlobalTransaction gxact)
 {
-       TransactionId   xid = gxact->proc.xid;
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       TransactionId xid = pgxact->xid;
        TwoPhaseFileHeader *hdr;
-       char                    path[MAXPGPATH];
-       XLogRecData        *record;
-       XLogRecPtr              recptr;
-       pg_crc32                statefile_crc;
-       pg_crc32                bogus_crc;
-       int                             fd;
+       char            path[MAXPGPATH];
+       XLogRecData *record;
+       pg_crc32        statefile_crc;
+       pg_crc32        bogus_crc;
+       int                     fd;
 
        /* Add the end sentinel to the list of 2PC records */
        RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -804,23 +960,27 @@ EndPrepare(GlobalTransaction gxact)
        Assert(hdr->magic == TWOPHASE_MAGIC);
        hdr->total_len = records.total_len + sizeof(pg_crc32);
 
+       /*
+        * If the file size exceeds MaxAllocSize, we won't be able to read it in
+        * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
+        */
+       if (hdr->total_len > MaxAllocSize)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                                errmsg("two-phase state file maximum length exceeded")));
+
        /*
         * Create the 2PC state file.
-        *
-        * Note: because we use BasicOpenFile(), we are responsible for ensuring
-        * the FD gets closed in any error exit path.  Once we get into the
-        * critical section, though, it doesn't matter since any failure causes
-        * PANIC anyway.
         */
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path,
-                                          O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
+       fd = OpenTransientFile(path,
+                                                  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not create twophase state file \"%s\": %m",
+                                errmsg("could not create two-phase state file \"%s\": %m",
                                                path)));
 
        /* Write data to file, and calculate CRC as we pass over it */
@@ -831,103 +991,114 @@ EndPrepare(GlobalTransaction gxact)
                COMP_CRC32(statefile_crc, record->data, record->len);
                if ((write(fd, record->data, record->len)) != record->len)
                {
-                       close(fd);
+                       CloseTransientFile(fd);
                        ereport(ERROR,
                                        (errcode_for_file_access(),
-                                        errmsg("could not write twophase state file: %m")));
+                                        errmsg("could not write two-phase state file: %m")));
                }
        }
 
        FIN_CRC32(statefile_crc);
 
        /*
-        * Write a deliberately bogus CRC to the state file, and flush it to disk.
-        * This is to minimize the odds of failure within the critical section
-        * below --- in particular, running out of disk space.
-        *
-        * On most filesystems, write() rather than fsync() detects out-of-space,
-        * so the fsync might be considered optional.  Using it means there
-        * are three fsyncs not two associated with preparing a transaction; is
-        * the risk of an error from fsync high enough to justify that?
+        * Write a deliberately bogus CRC to the state file; this is just paranoia
+        * to catch the case where four more bytes will run us out of disk space.
         */
-       bogus_crc = ~ statefile_crc;
+       bogus_crc = ~statefile_crc;
 
        if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not write twophase state file: %m")));
-       }
-
-       if (pg_fsync(fd) != 0)
-       {
-               close(fd);
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not fsync twophase state file: %m")));
+                                errmsg("could not write two-phase state file: %m")));
        }
 
        /* Back up to prepare for rewriting the CRC */
        if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not seek twophase state file: %m")));
+                                errmsg("could not seek in two-phase state file: %m")));
        }
 
        /*
         * The state file isn't valid yet, because we haven't written the correct
         * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
         *
-        * Between the time we have written the WAL entry and the time we
-        * flush the correct state file CRC to disk, we have an inconsistency:
-        * the xact is prepared according to WAL but not according to our on-disk
-        * state.  We use a critical section to force a PANIC if we are unable to
-        * complete the flush --- then, WAL replay should repair the
-        * inconsistency.
+        * Between the time we have written the WAL entry and the time we write
+        * out the correct state file CRC, we have an inconsistency: the xact is
+        * prepared according to WAL but not according to our on-disk state. We
+        * use a critical section to force a PANIC if we are unable to complete
+        * the write --- then, WAL replay should repair the inconsistency.  The
+        * odds of a PANIC actually occurring should be very tiny given that we
+        * were able to write the bogus CRC above.
         *
-        * We have to lock out checkpoint start here, too; otherwise a checkpoint
-        * starting immediately after the WAL record is inserted could complete
-        * before we've finished flushing, meaning that the WAL record would not
-        * get replayed if a crash follows.
+        * We have to set delayChkpt here, too; otherwise a checkpoint starting
+        * immediately after the WAL record is inserted could complete without
+        * fsync'ing our state file.  (This is essentially the same kind of race
+        * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
+        * uses delayChkpt for; see notes there.)
+        *
+        * We save the PREPARE record's location in the gxact for later use by
+        * CheckPointTwoPhase.
         */
        START_CRIT_SECTION();
 
-       LWLockAcquire(CheckpointStartLock, LW_SHARED);
+       MyPgXact->delayChkpt = true;
 
-       recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head);
-       XLogFlush(recptr);
+       gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
+                                                                       records.head);
+       XLogFlush(gxact->prepare_lsn);
 
        /* If we crash now, we have prepared: WAL replay will fix things */
 
-       /* write correct CRC, flush, and close file */
+       /* write correct CRC and close file */
        if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not write twophase state file: %m")));
+                                errmsg("could not write two-phase state file: %m")));
        }
 
-       if (pg_fsync(fd) != 0)
-       {
-               close(fd);
+       if (CloseTransientFile(fd) != 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not fsync twophase state file: %m")));
-       }
+                                errmsg("could not close two-phase state file: %m")));
 
-       if (close(fd) != 0)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not close twophase state file: %m")));
+       /*
+        * Mark the prepared transaction as valid.  As soon as xact.c marks
+        * MyPgXact as not running our XID (which it will do immediately after
+        * this function returns), others can commit/rollback the xact.
+        *
+        * NB: a side effect of this is to make a dummy ProcArray entry for the
+        * prepared XID.  This must happen before we clear the XID from MyPgXact,
+        * else there is a window where the XID is not running according to
+        * TransactionIdIsInProgress, and onlookers would be entitled to assume
+        * the xact crashed.  Instead we have a window where the same XID appears
+        * twice in ProcArray, which is OK.
+        */
+       MarkAsPrepared(gxact);
 
-       LWLockRelease(CheckpointStartLock);
+       /*
+        * Now we can mark ourselves as out of the commit critical section: a
+        * checkpoint starting after this will certainly see the gxact as a
+        * candidate for fsyncing.
+        */
+       MyPgXact->delayChkpt = false;
 
        END_CRIT_SECTION();
 
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked the prepare, but still show as
+        * running in the procarray (twice!) and continue to hold locks.
+        */
+       SyncRepWaitForLSN(gxact->prepare_lsn);
+
        records.tail = records.head = NULL;
 }
 
@@ -956,56 +1127,60 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
  * contents of the file.  Otherwise return NULL.
  */
 static char *
-ReadTwoPhaseFile(TransactionId xid)
+ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 {
        char            path[MAXPGPATH];
        char       *buf;
        TwoPhaseFileHeader *hdr;
        int                     fd;
-       struct stat     stat;
+       struct stat stat;
        uint32          crc_offset;
-       pg_crc32        calc_crc, file_crc;
+       pg_crc32        calc_crc,
+                               file_crc;
 
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+       fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
        if (fd < 0)
        {
-               ereport(WARNING,
-                               (errcode_for_file_access(),
-                                errmsg("could not open twophase state file \"%s\": %m",
-                                               path)));
+               if (give_warnings)
+                       ereport(WARNING,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not open two-phase state file \"%s\": %m",
+                                                       path)));
                return NULL;
        }
 
        /*
-        * Check file length.  We can determine a lower bound pretty easily.
-        * We set an upper bound mainly to avoid palloc() failure on a corrupt
-        * file.
+        * Check file length.  We can determine a lower bound pretty easily. We
+        * set an upper bound to avoid palloc() failure on a corrupt file, though
+        * we can't guarantee that we won't get an out of memory error anyway,
+        * even on a valid file.
         */
        if (fstat(fd, &stat))
        {
-               close(fd);
-               ereport(WARNING,
-                               (errcode_for_file_access(),
-                                errmsg("could not stat twophase state file \"%s\": %m",
-                                               path)));
+               CloseTransientFile(fd);
+               if (give_warnings)
+                       ereport(WARNING,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not stat two-phase state file \"%s\": %m",
+                                                       path)));
                return NULL;
        }
 
        if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
                                                MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
                                                sizeof(pg_crc32)) ||
-               stat.st_size > 10000000)
+               stat.st_size > MaxAllocSize)
        {
-               close(fd);
+               CloseTransientFile(fd);
                return NULL;
        }
 
        crc_offset = stat.st_size - sizeof(pg_crc32);
        if (crc_offset != MAXALIGN(crc_offset))
        {
-               close(fd);
+               CloseTransientFile(fd);
                return NULL;
        }
 
@@ -1016,16 +1191,17 @@ ReadTwoPhaseFile(TransactionId xid)
 
        if (read(fd, buf, stat.st_size) != stat.st_size)
        {
-               close(fd);
-               ereport(WARNING,
-                               (errcode_for_file_access(),
-                                errmsg("could not read twophase state file \"%s\": %m",
-                                               path)));
+               CloseTransientFile(fd);
+               if (give_warnings)
+                       ereport(WARNING,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read two-phase state file \"%s\": %m",
+                                                       path)));
                pfree(buf);
                return NULL;
        }
 
-       close(fd);
+       CloseTransientFile(fd);
 
        hdr = (TwoPhaseFileHeader *) buf;
        if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
@@ -1049,6 +1225,33 @@ ReadTwoPhaseFile(TransactionId xid)
        return buf;
 }
 
+/*
+ * Confirms an xid is prepared, during recovery
+ */
+bool
+StandbyTransactionIdIsPrepared(TransactionId xid)
+{
+       char       *buf;
+       TwoPhaseFileHeader *hdr;
+       bool            result;
+
+       Assert(TransactionIdIsValid(xid));
+
+       if (max_prepared_xacts <= 0)
+               return false;                   /* nothing to do */
+
+       /* Read and validate file */
+       buf = ReadTwoPhaseFile(xid, false);
+       if (buf == NULL)
+               return false;
+
+       /* Check header also */
+       hdr = (TwoPhaseFileHeader *) buf;
+       result = TransactionIdEquals(hdr->xid, xid);
+       pfree(buf);
+
+       return result;
+}
 
 /*
  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
@@ -1057,30 +1260,38 @@ void
 FinishPreparedTransaction(const char *gid, bool isCommit)
 {
        GlobalTransaction gxact;
+       PGPROC     *proc;
+       PGXACT     *pgxact;
        TransactionId xid;
-       char *buf;
-       char *bufptr;
+       char       *buf;
+       char       *bufptr;
        TwoPhaseFileHeader *hdr;
+       TransactionId latestXid;
        TransactionId *children;
        RelFileNode *commitrels;
        RelFileNode *abortrels;
-       int             i;
+       RelFileNode *delrels;
+       int                     ndelrels;
+       SharedInvalidationMessage *invalmsgs;
+       int                     i;
 
        /*
-        * Validate the GID, and lock the GXACT to ensure that two backends
-        * do not try to commit the same GID at once.
+        * Validate the GID, and lock the GXACT to ensure that two backends do not
+        * try to commit the same GID at once.
         */
        gxact = LockGXact(gid, GetUserId());
-       xid = gxact->proc.xid;
+       proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       xid = pgxact->xid;
 
        /*
         * Read and validate the state file
         */
-       buf = ReadTwoPhaseFile(xid);
+       buf = ReadTwoPhaseFile(xid, true);
        if (buf == NULL)
                ereport(ERROR,
                                (errcode(ERRCODE_DATA_CORRUPTED),
-                                errmsg("twophase state file for transaction %u is corrupt",
+                                errmsg("two-phase state file for transaction %u is corrupt",
                                                xid)));
 
        /*
@@ -1095,50 +1306,78 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
        abortrels = (RelFileNode *) bufptr;
        bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+       invalmsgs = (SharedInvalidationMessage *) bufptr;
+       bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+
+       /* compute latestXid among all children */
+       latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
 
        /*
         * The order of operations here is critical: make the XLOG entry for
         * commit or abort, then mark the transaction committed or aborted in
-        * pg_clog, then remove its PGPROC from the global ProcArray (which
-        * means TransactionIdIsInProgress will stop saying the prepared xact
-        * is in progress), then run the post-commit or post-abort callbacks.
-        * The callbacks will release the locks the transaction held.
+        * pg_clog, then remove its PGPROC from the global ProcArray (which means
+        * TransactionIdIsInProgress will stop saying the prepared xact is in
+        * progress), then run the post-commit or post-abort callbacks. The
+        * callbacks will release the locks the transaction held.
         */
        if (isCommit)
                RecordTransactionCommitPrepared(xid,
                                                                                hdr->nsubxacts, children,
-                                                                               hdr->ncommitrels, commitrels);
+                                                                               hdr->ncommitrels, commitrels,
+                                                                               hdr->ninvalmsgs, invalmsgs,
+                                                                               hdr->initfileinval);
        else
                RecordTransactionAbortPrepared(xid,
                                                                           hdr->nsubxacts, children,
                                                                           hdr->nabortrels, abortrels);
 
-       ProcArrayRemove(&gxact->proc);
+       ProcArrayRemove(proc, latestXid);
 
        /*
-        * In case we fail while running the callbacks, mark the gxact invalid
-        * so no one else will try to commit/rollback, and so it can be recycled
+        * In case we fail while running the callbacks, mark the gxact invalid so
+        * no one else will try to commit/rollback, and so it can be recycled
         * properly later.  It is still locked by our XID so it won't go away yet.
+        *
+        * (We assume it's safe to do this without taking TwoPhaseStateLock.)
         */
        gxact->valid = false;
 
        /*
-        * We have to remove any files that were supposed to be dropped.
-        * For consistency with the regular xact.c code paths, must do this
-        * before releasing locks, so do it before running the callbacks.
+        * We have to remove any files that were supposed to be dropped. For
+        * consistency with the regular xact.c code paths, must do this before
+        * releasing locks, so do it before running the callbacks.
         *
         * NB: this code knows that we couldn't be dropping any temp rels ...
         */
        if (isCommit)
        {
-               for (i = 0; i < hdr->ncommitrels; i++)
-                       smgrdounlink(smgropen(commitrels[i]), false, false);
+               delrels = commitrels;
+               ndelrels = hdr->ncommitrels;
        }
        else
        {
-               for (i = 0; i < hdr->nabortrels; i++)
-                       smgrdounlink(smgropen(abortrels[i]), false, false);
+               delrels = abortrels;
+               ndelrels = hdr->nabortrels;
        }
+       for (i = 0; i < ndelrels; i++)
+       {
+               SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
+
+               smgrdounlink(srel, false);
+               smgrclose(srel);
+       }
+
+       /*
+        * Handle cache invalidation messages.
+        *
+        * Relcache init file invalidation requires processing both before and
+        * after we send the SI messages. See AtEOXact_Inval()
+        */
+       if (hdr->initfileinval)
+               RelationCacheInitFilePreInvalidate();
+       SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
+       if (hdr->initfileinval)
+               RelationCacheInitFilePostInvalidate();
 
        /* And now do the callbacks */
        if (isCommit)
@@ -1146,7 +1385,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        else
                ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
 
-       pgstat_count_xact_commit();
+       PredicateLockTwoPhaseFinish(xid, isCommit);
+
+       /* Count the prepared xact as committed or aborted */
+       AtEOXact_PgStat(isCommit);
 
        /*
         * And now we can clean up our mess.
@@ -1177,8 +1419,8 @@ ProcessRecords(char *bufptr, TransactionId xid,
                bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
 
                if (callbacks[record->rmid] != NULL)
-                       callbacks[record->rmid](xid, record->info,
-                                                                       (void *) bufptr, record->len);
+                       callbacks[record->rmid] (xid, record->info,
+                                                                        (void *) bufptr, record->len);
 
                bufptr += MAXALIGN(record->len);
        }
@@ -1193,15 +1435,15 @@ ProcessRecords(char *bufptr, TransactionId xid,
 void
 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
 {
-       char path[MAXPGPATH];
+       char            path[MAXPGPATH];
 
        TwoPhaseFilePath(path, xid);
        if (unlink(path))
                if (errno != ENOENT || giveWarning)
                        ereport(WARNING,
                                        (errcode_for_file_access(),
-                                        errmsg("could not remove two-phase state file \"%s\": %m",
-                                                       path)));
+                                  errmsg("could not remove two-phase state file \"%s\": %m",
+                                                 path)));
 }
 
 /*
@@ -1223,44 +1465,150 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path,
-                                          O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
+       fd = OpenTransientFile(path,
+                                                  O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not recreate twophase state file \"%s\": %m",
+                                errmsg("could not recreate two-phase state file \"%s\": %m",
                                                path)));
 
        /* Write content and CRC */
        if (write(fd, content, len) != len)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not write twophase state file: %m")));
+                                errmsg("could not write two-phase state file: %m")));
        }
        if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not write twophase state file: %m")));
+                                errmsg("could not write two-phase state file: %m")));
        }
 
-       /* Sync and close the file */
+       /*
+        * We must fsync the file because the end-of-replay checkpoint will not do
+        * so, there being no GXACT in shared memory yet to tell it to.
+        */
        if (pg_fsync(fd) != 0)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not fsync twophase state file: %m")));
+                                errmsg("could not fsync two-phase state file: %m")));
        }
 
-       if (close(fd) != 0)
+       if (CloseTransientFile(fd) != 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not close twophase state file: %m")));
+                                errmsg("could not close two-phase state file: %m")));
+}
+
+/*
+ * CheckPointTwoPhase -- handle 2PC component of checkpointing.
+ *
+ * We must fsync the state file of any GXACT that is valid and has a PREPARE
+ * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or
+ * has a later LSN, this checkpoint is not responsible for fsyncing it.)
+ *
+ * This is deliberately run as late as possible in the checkpoint sequence,
+ * because GXACTs ordinarily have short lifespans, and so it is quite
+ * possible that GXACTs that were valid at checkpoint start will no longer
+ * exist if we wait a little bit.
+ *
+ * If a GXACT remains valid across multiple checkpoints, it'll be fsynced
+ * each time.  This is considered unusual enough that we don't bother to
+ * expend any extra code to avoid the redundant fsyncs.  (They should be
+ * reasonably cheap anyway, since they won't cause I/O.)
+ */
+void
+CheckPointTwoPhase(XLogRecPtr redo_horizon)
+{
+       TransactionId *xids;
+       int                     nxids;
+       char            path[MAXPGPATH];
+       int                     i;
+
+       /*
+        * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
+        * it just long enough to make a list of the XIDs that require fsyncing,
+        * and then do the I/O afterwards.
+        *
+        * This approach creates a race condition: someone else could delete a
+        * GXACT between the time we release TwoPhaseStateLock and the time we try
+        * to open its state file.  We handle this by special-casing ENOENT
+        * failures: if we see that, we verify that the GXACT is no longer valid,
+        * and if so ignore the failure.
+        */
+       if (max_prepared_xacts <= 0)
+               return;                                 /* nothing to do */
+
+       TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
+
+       xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
+       nxids = 0;
+
+       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+       for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+       {
+               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+               if (gxact->valid &&
+                       gxact->prepare_lsn <= redo_horizon)
+                       xids[nxids++] = pgxact->xid;
+       }
+
+       LWLockRelease(TwoPhaseStateLock);
+
+       for (i = 0; i < nxids; i++)
+       {
+               TransactionId xid = xids[i];
+               int                     fd;
+
+               TwoPhaseFilePath(path, xid);
+
+               fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
+               if (fd < 0)
+               {
+                       if (errno == ENOENT)
+                       {
+                               /* OK if gxact is no longer valid */
+                               if (!TransactionIdIsPrepared(xid))
+                                       continue;
+                               /* Restore errno in case it was changed */
+                               errno = ENOENT;
+                       }
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not open two-phase state file \"%s\": %m",
+                                                       path)));
+               }
+
+               if (pg_fsync(fd) != 0)
+               {
+                       CloseTransientFile(fd);
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not fsync two-phase state file \"%s\": %m",
+                                                       path)));
+               }
+
+               if (CloseTransientFile(fd) != 0)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not close two-phase state file \"%s\": %m",
+                                                       path)));
+       }
+
+       pfree(xids);
+
+       TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
 }
 
 /*
@@ -1285,35 +1633,33 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
  * Our other responsibility is to determine and return the oldest valid XID
  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
  * This is needed to synchronize pg_subtrans startup properly.
+ *
+ * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
+ * top-level xids is stored in *xids_p. The number of entries in the array
+ * is returned in *nxids_p.
  */
 TransactionId
-PrescanPreparedTransactions(void)
+PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 {
        TransactionId origNextXid = ShmemVariableCache->nextXid;
        TransactionId result = origNextXid;
-       char    dir[MAXPGPATH];
-       DIR             *cldir;
+       DIR                *cldir;
        struct dirent *clde;
+       TransactionId *xids = NULL;
+       int                     nxids = 0;
+       int                     allocsize = 0;
 
-       snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
-
-       cldir = AllocateDir(dir);
-       if (cldir == NULL)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not open directory \"%s\": %m", dir)));
-
-       errno = 0;
-       while ((clde = readdir(cldir)) != NULL)
+       cldir = AllocateDir(TWOPHASE_DIR);
+       while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
        {
                if (strlen(clde->d_name) == 8 &&
                        strspn(clde->d_name, "0123456789ABCDEF") == 8)
                {
                        TransactionId xid;
-                       char *buf;
-                       TwoPhaseFileHeader      *hdr;
+                       char       *buf;
+                       TwoPhaseFileHeader *hdr;
                        TransactionId *subxids;
-                       int i;
+                       int                     i;
 
                        xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
 
@@ -1321,10 +1667,9 @@ PrescanPreparedTransactions(void)
                        if (TransactionIdFollowsOrEquals(xid, origNextXid))
                        {
                                ereport(WARNING,
-                                               (errmsg("removing future twophase state file \"%s\"",
+                                               (errmsg("removing future two-phase state file \"%s\"",
                                                                clde->d_name)));
                                RemoveTwoPhaseFile(xid, true);
-                               errno = 0;
                                continue;
                        }
 
@@ -1334,14 +1679,13 @@ PrescanPreparedTransactions(void)
                         */
 
                        /* Read and validate file */
-                       buf = ReadTwoPhaseFile(xid);
+                       buf = ReadTwoPhaseFile(xid, true);
                        if (buf == NULL)
                        {
                                ereport(WARNING,
-                                               (errmsg("removing corrupt twophase state file \"%s\"",
-                                                               clde->d_name)));
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
                                RemoveTwoPhaseFile(xid, true);
-                               errno = 0;
                                continue;
                        }
 
@@ -1350,11 +1694,10 @@ PrescanPreparedTransactions(void)
                        if (!TransactionIdEquals(hdr->xid, xid))
                        {
                                ereport(WARNING,
-                                               (errmsg("removing corrupt twophase state file \"%s\"",
-                                                               clde->d_name)));
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
                                RemoveTwoPhaseFile(xid, true);
                                pfree(buf);
-                               errno = 0;
                                continue;
                        }
 
@@ -1368,6 +1711,10 @@ PrescanPreparedTransactions(void)
                        /*
                         * Examine subtransaction XIDs ... they should all follow main
                         * XID, and they may force us to advance nextXid.
+                        *
+                        * We don't expect anyone else to modify nextXid, hence we don't
+                        * need to hold a lock while examining it.  We still acquire the
+                        * lock to modify it, though.
                         */
                        subxids = (TransactionId *)
                                (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
@@ -1379,34 +1726,129 @@ PrescanPreparedTransactions(void)
                                if (TransactionIdFollowsOrEquals(subxid,
                                                                                                 ShmemVariableCache->nextXid))
                                {
+                                       LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
                                        ShmemVariableCache->nextXid = subxid;
                                        TransactionIdAdvance(ShmemVariableCache->nextXid);
+                                       LWLockRelease(XidGenLock);
+                               }
+                       }
+
+
+                       if (xids_p)
+                       {
+                               if (nxids == allocsize)
+                               {
+                                       if (nxids == 0)
+                                       {
+                                               allocsize = 10;
+                                               xids = palloc(allocsize * sizeof(TransactionId));
+                                       }
+                                       else
+                                       {
+                                               allocsize = allocsize * 2;
+                                               xids = repalloc(xids, allocsize * sizeof(TransactionId));
+                                       }
                                }
+                               xids[nxids++] = xid;
                        }
 
                        pfree(buf);
                }
-               errno = 0;
        }
-#ifdef WIN32
-
-       /*
-        * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
-        * not in released version
-        */
-       if (GetLastError() == ERROR_NO_MORE_FILES)
-               errno = 0;
-#endif
-       if (errno)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not read directory \"%s\": %m", dir)));
-
        FreeDir(cldir);
 
+       if (xids_p)
+       {
+               *xids_p = xids;
+               *nxids_p = nxids;
+       }
+
        return result;
 }
 
+/*
+ * StandbyRecoverPreparedTransactions
+ *
+ * Scan the pg_twophase directory and setup all the required information to
+ * allow standby queries to treat prepared transactions as still active.
+ * This is never called at the end of recovery - we use
+ * RecoverPreparedTransactions() at that point.
+ *
+ * Currently we simply call SubTransSetParent() for any subxids of prepared
+ * transactions. If overwriteOK is true, it's OK if some XIDs have already
+ * been marked in pg_subtrans.
+ */
+void
+StandbyRecoverPreparedTransactions(bool overwriteOK)
+{
+       DIR                *cldir;
+       struct dirent *clde;
+
+       cldir = AllocateDir(TWOPHASE_DIR);
+       while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
+       {
+               if (strlen(clde->d_name) == 8 &&
+                       strspn(clde->d_name, "0123456789ABCDEF") == 8)
+               {
+                       TransactionId xid;
+                       char       *buf;
+                       TwoPhaseFileHeader *hdr;
+                       TransactionId *subxids;
+                       int                     i;
+
+                       xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+
+                       /* Already processed? */
+                       if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+                       {
+                               ereport(WARNING,
+                                               (errmsg("removing stale two-phase state file \"%s\"",
+                                                               clde->d_name)));
+                               RemoveTwoPhaseFile(xid, true);
+                               continue;
+                       }
+
+                       /* Read and validate file */
+                       buf = ReadTwoPhaseFile(xid, true);
+                       if (buf == NULL)
+                       {
+                               ereport(WARNING,
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
+                               RemoveTwoPhaseFile(xid, true);
+                               continue;
+                       }
+
+                       /* Deconstruct header */
+                       hdr = (TwoPhaseFileHeader *) buf;
+                       if (!TransactionIdEquals(hdr->xid, xid))
+                       {
+                               ereport(WARNING,
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
+                               RemoveTwoPhaseFile(xid, true);
+                               pfree(buf);
+                               continue;
+                       }
+
+                       /*
+                        * Examine subtransaction XIDs ... they should all follow main
+                        * XID.
+                        */
+                       subxids = (TransactionId *)
+                               (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
+                       for (i = 0; i < hdr->nsubxacts; i++)
+                       {
+                               TransactionId subxid = subxids[i];
+
+                               Assert(TransactionIdFollows(subxid, xid));
+                               SubTransSetParent(xid, subxid, overwriteOK);
+                       }
+               }
+       }
+       FreeDir(cldir);
+}
+
 /*
  * RecoverPreparedTransactions
  *
@@ -1417,31 +1859,26 @@ PrescanPreparedTransactions(void)
 void
 RecoverPreparedTransactions(void)
 {
-       char    dir[MAXPGPATH];
-       DIR             *cldir;
+       char            dir[MAXPGPATH];
+       DIR                *cldir;
        struct dirent *clde;
+       bool            overwriteOK = false;
 
-       snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+       snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
 
        cldir = AllocateDir(dir);
-       if (cldir == NULL)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not open directory \"%s\": %m", dir)));
-
-       errno = 0;
-       while ((clde = readdir(cldir)) != NULL)
+       while ((clde = ReadDir(cldir, dir)) != NULL)
        {
                if (strlen(clde->d_name) == 8 &&
                        strspn(clde->d_name, "0123456789ABCDEF") == 8)
                {
                        TransactionId xid;
-                       char *buf;
-                       char *bufptr;
-                       TwoPhaseFileHeader      *hdr;
+                       char       *buf;
+                       char       *bufptr;
+                       TwoPhaseFileHeader *hdr;
                        TransactionId *subxids;
-                       GlobalTransaction       gxact;
-                       int i;
+                       GlobalTransaction gxact;
+                       int                     i;
 
                        xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
 
@@ -1449,22 +1886,20 @@ RecoverPreparedTransactions(void)
                        if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
                        {
                                ereport(WARNING,
-                                               (errmsg("removing stale twophase state file \"%s\"",
+                                               (errmsg("removing stale two-phase state file \"%s\"",
                                                                clde->d_name)));
                                RemoveTwoPhaseFile(xid, true);
-                               errno = 0;
                                continue;
                        }
 
                        /* Read and validate file */
-                       buf = ReadTwoPhaseFile(xid);
+                       buf = ReadTwoPhaseFile(xid, true);
                        if (buf == NULL)
                        {
                                ereport(WARNING,
-                                               (errmsg("removing corrupt twophase state file \"%s\"",
-                                                               clde->d_name)));
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
                                RemoveTwoPhaseFile(xid, true);
-                               errno = 0;
                                continue;
                        }
 
@@ -1479,19 +1914,36 @@ RecoverPreparedTransactions(void)
                        bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
                        bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
                        bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+                       bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+
+                       /*
+                        * It's possible that SubTransSetParent has been set before, if
+                        * the prepared transaction generated xid assignment records. Test
+                        * here must match one used in AssignTransactionId().
+                        */
+                       if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+                                                                XLogLogicalInfoActive()))
+                               overwriteOK = true;
 
                        /*
                         * Reconstruct subtrans state for the transaction --- needed
-                        * because pg_subtrans is not preserved over a restart.  Note
-                        * that we are linking all the subtransactions directly to the
+                        * because pg_subtrans is not preserved over a restart.  Note that
+                        * we are linking all the subtransactions directly to the
                         * top-level XID; there may originally have been a more complex
                         * hierarchy, but there's no need to restore that exactly.
                         */
                        for (i = 0; i < hdr->nsubxacts; i++)
-                               SubTransSetParent(subxids[i], xid);
+                               SubTransSetParent(subxids[i], xid, overwriteOK);
 
                        /*
                         * Recreate its GXACT and dummy PGPROC
+                        *
+                        * Note: since we don't have the PREPARE record's WAL location at
+                        * hand, we leave prepare_lsn zeroes.  This means the GXACT will
+                        * be fsync'd on every future checkpoint.  We assume this
+                        * situation is infrequent enough that the performance cost is
+                        * negligible (especially since we know the state file has already
+                        * been fsynced).
                         */
                        gxact = MarkAsPreparing(xid, hdr->gid,
                                                                        hdr->prepared_at,
@@ -1504,24 +1956,17 @@ RecoverPreparedTransactions(void)
                         */
                        ProcessRecords(bufptr, xid, twophase_recover_callbacks);
 
+                       /*
+                        * Release locks held by the standby process after we process each
+                        * prepared transaction. As a result, we don't need too many
+                        * additional locks at any one time.
+                        */
+                       if (InHotStandby)
+                               StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+
                        pfree(buf);
                }
-               errno = 0;
        }
-#ifdef WIN32
-
-       /*
-        * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
-        * not in released version
-        */
-       if (GetLastError() == ERROR_NO_MORE_FILES)
-               errno = 0;
-#endif
-       if (errno)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not read directory \"%s\": %m", dir)));
-
        FreeDir(cldir);
 }
 
@@ -1529,7 +1974,7 @@ RecoverPreparedTransactions(void)
  *     RecordTransactionCommitPrepared
  *
  * This is basically the same as RecordTransactionCommit: in particular,
- * we must take the CheckpointStartLock to avoid a race condition.
+ * we must set the delayChkpt flag to avoid a race condition.
  *
  * We know the transaction made at least one XLOG entry (its PREPARE),
  * so it is never possible to optimize out the commit record.
@@ -1539,9 +1984,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
                                                                int nchildren,
                                                                TransactionId *children,
                                                                int nrels,
-                                                               RelFileNode *rels)
+                                                               RelFileNode *rels,
+                                                               int ninvalmsgs,
+                                                               SharedInvalidationMessage *invalmsgs,
+                                                               bool initfileinval)
 {
-       XLogRecData rdata[3];
+       XLogRecData rdata[4];
        int                     lastrdata = 0;
        xl_xact_commit_prepared xlrec;
        XLogRecPtr      recptr;
@@ -1549,13 +1997,17 @@ RecordTransactionCommitPrepared(TransactionId xid,
        START_CRIT_SECTION();
 
        /* See notes in RecordTransactionCommit */
-       LWLockAcquire(CheckpointStartLock, LW_SHARED);
+       MyPgXact->delayChkpt = true;
 
        /* Emit the XLOG commit record */
        xlrec.xid = xid;
-       xlrec.crec.xtime = time(NULL);
+       xlrec.crec.xact_time = GetCurrentTimestamp();
+       xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
+       xlrec.crec.nmsgs = 0;
        xlrec.crec.nrels = nrels;
        xlrec.crec.nsubxacts = nchildren;
+       xlrec.crec.nmsgs = ninvalmsgs;
+
        rdata[0].data = (char *) (&xlrec);
        rdata[0].len = MinSizeOfXactCommitPrepared;
        rdata[0].buffer = InvalidBuffer;
@@ -1577,26 +2029,43 @@ RecordTransactionCommitPrepared(TransactionId xid,
                rdata[2].buffer = InvalidBuffer;
                lastrdata = 2;
        }
+       /* dump cache invalidation messages */
+       if (ninvalmsgs > 0)
+       {
+               rdata[lastrdata].next = &(rdata[3]);
+               rdata[3].data = (char *) invalmsgs;
+               rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
+               rdata[3].buffer = InvalidBuffer;
+               lastrdata = 3;
+       }
        rdata[lastrdata].next = NULL;
 
-       recptr = XLogInsert(RM_XACT_ID,
-                                               XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
-                                               rdata);
+       recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
 
-       /* we don't currently try to sleep before flush here ... */
+       /*
+        * We don't currently try to sleep before flush here ... nor is there any
+        * support for async commit of a prepared xact (the very idea is probably
+        * a contradiction)
+        */
 
        /* Flush XLOG to disk */
        XLogFlush(recptr);
 
        /* Mark the transaction committed in pg_clog */
-       TransactionIdCommit(xid);
-       /* to avoid race conditions, the parent must commit first */
-       TransactionIdCommitTree(nchildren, children);
+       TransactionIdCommitTree(xid, nchildren, children);
 
-       /* Checkpoint is allowed again */
-       LWLockRelease(CheckpointStartLock);
+       /* Checkpoint can proceed now */
+       MyPgXact->delayChkpt = false;
 
        END_CRIT_SECTION();
+
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked clog, but still show as running
+        * in the procarray and continue to hold locks.
+        */
+       SyncRepWaitForLSN(recptr);
 }
 
 /*
@@ -1631,7 +2100,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 
        /* Emit the XLOG abort record */
        xlrec.xid = xid;
-       xlrec.arec.xtime = time(NULL);
+       xlrec.arec.xact_time = GetCurrentTimestamp();
        xlrec.arec.nrels = nrels;
        xlrec.arec.nsubxacts = nchildren;
        rdata[0].data = (char *) (&xlrec);
@@ -1657,20 +2126,24 @@ RecordTransactionAbortPrepared(TransactionId xid,
        }
        rdata[lastrdata].next = NULL;
 
-       recptr = XLogInsert(RM_XACT_ID,
-                                               XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
-                                               rdata);
+       recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
 
        /* Always flush, since we're about to remove the 2PC state file */
        XLogFlush(recptr);
 
        /*
-        * Mark the transaction aborted in clog.  This is not absolutely
-        * necessary but we may as well do it while we are here.
+        * Mark the transaction aborted in clog.  This is not absolutely necessary
+        * but we may as well do it while we are here.
         */
-       TransactionIdAbort(xid);
-       TransactionIdAbortTree(nchildren, children);
+       TransactionIdAbortTree(xid, nchildren, children);
 
        END_CRIT_SECTION();
-}
 
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked clog, but still show as running
+        * in the procarray and continue to hold locks.
+        */
+       SyncRepWaitForLSN(recptr);
+}