]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/twophase.c
pgindent run for 9.4
[postgresql] / src / backend / access / transam / twophase.c
index ae2b8dcd6956fcdf2e67271edc1fc05a42e82556..70ca6ab67d1ef8a25f7f8bf9515a116239aa8fb8 100644 (file)
@@ -3,11 +3,11 @@
  * twophase.c
  *             Two-phase commit support functions.
  *
- * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.36 2007/09/21 16:32:19 tgl Exp $
+ *             src/backend/access/transam/twophase.c
  *
  * NOTES
  *             Each global transaction is associated with a global transaction
  *             GIDs and aborts the transaction if there already is a global
  *             transaction in prepared state with the same GID.
  *
- *             A global transaction (gxact) also has a dummy PGPROC that is entered
- *             into the ProcArray array; this is what keeps the XID considered
- *             running by TransactionIdIsInProgress.  It is also convenient as a
- *             PGPROC to hook the gxact's locks to.
+ *             A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
+ *             what keeps the XID considered running by TransactionIdIsInProgress.
+ *             It is also convenient as a PGPROC to hook the gxact's locks to.
  *
  *             In order to survive crashes and shutdowns, all prepared
  *             transactions must be stored in permanent storage. This includes
 #include <time.h>
 #include <unistd.h>
 
-#include "access/heapam.h"
+#include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
 #include "catalog/pg_type.h"
+#include "catalog/storage.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "pg_trace.h"
 #include "pgstat.h"
+#include "replication/walsender.h"
+#include "replication/syncrep.h"
 #include "storage/fd.h"
+#include "storage/predicate.h"
+#include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/sinvaladt.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/timestamp.h"
 
 
 /*
 #define TWOPHASE_DIR "pg_twophase"
 
 /* GUC variable, can't be changed after startup */
-int                    max_prepared_xacts = 5;
+int                    max_prepared_xacts = 0;
 
 /*
  * This struct describes one global transaction that is in prepared state
  * or attempting to become prepared.
  *
- * The first component of the struct is a dummy PGPROC that is inserted
- * into the global ProcArray so that the transaction appears to still be
- * running and holding locks.  It must be first because we cast pointers
- * to PGPROC and pointers to GlobalTransactionData back and forth.
- *
  * The lifecycle of a global transaction is:
  *
  * 1. After checking that the requested GID is not in use, set up an
@@ -82,7 +87,7 @@ int                   max_prepared_xacts = 5;
  * with locking_xid = my own XID and valid = false.
  *
  * 2. After successfully completing prepare, set valid = true and enter the
- * contained PGPROC into the global ProcArray.
+ * referenced PGPROC into the global ProcArray.
  *
  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
  * is valid and its locking_xid is no longer active, then store my current
@@ -104,14 +109,16 @@ int                       max_prepared_xacts = 5;
 
 typedef struct GlobalTransactionData
 {
-       PGPROC          proc;                   /* dummy proc */
+       GlobalTransaction next;         /* list link for free list */
+       int                     pgprocno;               /* ID of associated dummy PGPROC */
+       BackendId       dummyBackendId; /* similar to backend id for backends */
        TimestampTz prepared_at;        /* time of preparation */
        XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
        Oid                     owner;                  /* ID of user that executed the xact */
        TransactionId locking_xid;      /* top-level XID of backend working on xact */
        bool            valid;                  /* TRUE if fully prepared */
        char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
-} GlobalTransactionData;
+}      GlobalTransactionData;
 
 /*
  * Two Phase Commit shared state.  Access to this struct is protected
@@ -120,7 +127,7 @@ typedef struct GlobalTransactionData
 typedef struct TwoPhaseStateData
 {
        /* Head of linked list of free GlobalTransactionData structs */
-       SHMEM_OFFSET freeGXacts;
+       GlobalTransaction freeGXacts;
 
        /* Number of valid prepXacts entries. */
        int                     numPrepXacts;
@@ -139,7 +146,10 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
                                                                int nchildren,
                                                                TransactionId *children,
                                                                int nrels,
-                                                               RelFileNode *rels);
+                                                               RelFileNode *rels,
+                                                               int ninvalmsgs,
+                                                               SharedInvalidationMessage *invalmsgs,
+                                                               bool initfileinval);
 static void RecordTransactionAbortPrepared(TransactionId xid,
                                                           int nchildren,
                                                           TransactionId *children,
@@ -182,7 +192,7 @@ TwoPhaseShmemInit(void)
                int                     i;
 
                Assert(!found);
-               TwoPhaseState->freeGXacts = INVALID_OFFSET;
+               TwoPhaseState->freeGXacts = NULL;
                TwoPhaseState->numPrepXacts = 0;
 
                /*
@@ -194,8 +204,26 @@ TwoPhaseShmemInit(void)
                                          sizeof(GlobalTransaction) * max_prepared_xacts));
                for (i = 0; i < max_prepared_xacts; i++)
                {
-                       gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;
-                       TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);
+                       /* insert into linked list */
+                       gxacts[i].next = TwoPhaseState->freeGXacts;
+                       TwoPhaseState->freeGXacts = &gxacts[i];
+
+                       /* associate it with a PGPROC assigned by InitProcGlobal */
+                       gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
+
+                       /*
+                        * Assign a unique ID for each dummy proc, so that the range of
+                        * dummy backend IDs immediately follows the range of normal
+                        * backend IDs. We don't dare to assign a real backend ID to dummy
+                        * procs, because prepared transactions don't take part in cache
+                        * invalidation like a real backend ID would imply, but having a
+                        * unique ID for them is nevertheless handy. This arrangement
+                        * allows you to allocate an array of size (MaxBackends +
+                        * max_prepared_xacts + 1), and have a slot for every backend and
+                        * prepared transaction. Currently multixact.c uses that
+                        * technique.
+                        */
+                       gxacts[i].dummyBackendId = MaxBackends + 1 + i;
                }
        }
        else
@@ -216,6 +244,8 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                                TimestampTz prepared_at, Oid owner, Oid databaseid)
 {
        GlobalTransaction gxact;
+       PGPROC     *proc;
+       PGXACT     *pgxact;
        int                     i;
 
        if (strlen(gid) >= GIDSIZE)
@@ -224,6 +254,13 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                                 errmsg("transaction identifier \"%s\" is too long",
                                                gid)));
 
+       /* fail immediately if feature is disabled */
+       if (max_prepared_xacts == 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("prepared transactions are disabled"),
+                         errhint("Set max_prepared_transactions to a nonzero value.")));
+
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 
        /*
@@ -240,8 +277,8 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                        TwoPhaseState->numPrepXacts--;
                        TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
                        /* and put it back in the freelist */
-                       gxact->proc.links.next = TwoPhaseState->freeGXacts;
-                       TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+                       gxact->next = TwoPhaseState->freeGXacts;
+                       TwoPhaseState->freeGXacts = gxact;
                        /* Back up index count too, so we don't miss scanning one */
                        i--;
                }
@@ -261,45 +298,47 @@ MarkAsPreparing(TransactionId xid, const char *gid,
        }
 
        /* Get a free gxact from the freelist */
-       if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
+       if (TwoPhaseState->freeGXacts == NULL)
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
                                 errmsg("maximum number of prepared transactions reached"),
                                 errhint("Increase max_prepared_transactions (currently %d).",
                                                 max_prepared_xacts)));
-       gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
-       TwoPhaseState->freeGXacts = gxact->proc.links.next;
+       gxact = TwoPhaseState->freeGXacts;
+       TwoPhaseState->freeGXacts = gxact->next;
+
+       proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
-       /* Initialize it */
-       MemSet(&gxact->proc, 0, sizeof(PGPROC));
-       SHMQueueElemInit(&(gxact->proc.links));
-       gxact->proc.waitStatus = STATUS_OK;
+       /* Initialize the PGPROC entry */
+       MemSet(proc, 0, sizeof(PGPROC));
+       proc->pgprocno = gxact->pgprocno;
+       SHMQueueElemInit(&(proc->links));
+       proc->waitStatus = STATUS_OK;
        /* We set up the gxact's VXID as InvalidBackendId/XID */
-       gxact->proc.lxid = (LocalTransactionId) xid;
-       gxact->proc.xid = xid;
-       gxact->proc.xmin = InvalidTransactionId;
-       gxact->proc.pid = 0;
-       gxact->proc.backendId = InvalidBackendId;
-       gxact->proc.databaseId = databaseid;
-       gxact->proc.roleId = owner;
-       gxact->proc.inCommit = false;
-       gxact->proc.inVacuum = false;
-       gxact->proc.isAutovacuum = false;
-       gxact->proc.lwWaiting = false;
-       gxact->proc.lwExclusive = false;
-       gxact->proc.lwWaitLink = NULL;
-       gxact->proc.waitLock = NULL;
-       gxact->proc.waitProcLock = NULL;
+       proc->lxid = (LocalTransactionId) xid;
+       pgxact->xid = xid;
+       pgxact->xmin = InvalidTransactionId;
+       pgxact->delayChkpt = false;
+       pgxact->vacuumFlags = 0;
+       proc->pid = 0;
+       proc->backendId = InvalidBackendId;
+       proc->databaseId = databaseid;
+       proc->roleId = owner;
+       proc->lwWaiting = false;
+       proc->lwWaitMode = 0;
+       proc->lwWaitLink = NULL;
+       proc->waitLock = NULL;
+       proc->waitProcLock = NULL;
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-               SHMQueueInit(&(gxact->proc.myProcLocks[i]));
+               SHMQueueInit(&(proc->myProcLocks[i]));
        /* subxid data must be filled later by GXactLoadSubxactData */
-       gxact->proc.subxids.overflowed = false;
-       gxact->proc.subxids.nxids = 0;
+       pgxact->overflowed = false;
+       pgxact->nxids = 0;
 
        gxact->prepared_at = prepared_at;
        /* initialize LSN to 0 (start of WAL) */
-       gxact->prepare_lsn.xlogid = 0;
-       gxact->prepare_lsn.xrecoff = 0;
+       gxact->prepare_lsn = 0;
        gxact->owner = owner;
        gxact->locking_xid = xid;
        gxact->valid = false;
@@ -325,17 +364,20 @@ static void
 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
                                         TransactionId *children)
 {
+       PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
        /* We need no extra lock since the GXACT isn't valid yet */
        if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
        {
-               gxact->proc.subxids.overflowed = true;
+               pgxact->overflowed = true;
                nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
        }
        if (nsubxacts > 0)
        {
-               memcpy(gxact->proc.subxids.xids, children,
+               memcpy(proc->subxids.xids, children,
                           nsubxacts * sizeof(TransactionId));
-               gxact->proc.subxids.nxids = nsubxacts;
+               pgxact->nxids = nsubxacts;
        }
 }
 
@@ -356,7 +398,7 @@ MarkAsPrepared(GlobalTransaction gxact)
         * Put it into the global ProcArray so TransactionIdIsInProgress considers
         * the XID as still running.
         */
-       ProcArrayAdd(&gxact->proc);
+       ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
 }
 
 /*
@@ -373,6 +415,7 @@ LockGXact(const char *gid, Oid user)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
                /* Ignore not-yet-valid GIDs */
                if (!gxact->valid)
@@ -398,15 +441,15 @@ LockGXact(const char *gid, Oid user)
                                         errhint("Must be superuser or the user that prepared the transaction.")));
 
                /*
-                * Note: it probably would be possible to allow committing from another
-                * database; but at the moment NOTIFY is known not to work and there
-                * may be some other issues as well.  Hence disallow until someone
-                * gets motivated to make it work.
+                * Note: it probably would be possible to allow committing from
+                * another database; but at the moment NOTIFY is known not to work and
+                * there may be some other issues as well.  Hence disallow until
+                * someone gets motivated to make it work.
                 */
-               if (MyDatabaseId != gxact->proc.databaseId)
+               if (MyDatabaseId != proc->databaseId)
                        ereport(ERROR,
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                        errmsg("prepared transaction belongs to another database"),
+                                 errmsg("prepared transaction belongs to another database"),
                                         errhint("Connect to the database where the transaction was prepared to finish it.")));
 
                /* OK for me to lock it */
@@ -450,8 +493,8 @@ RemoveGXact(GlobalTransaction gxact)
                        TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
 
                        /* and put it back in the freelist */
-                       gxact->proc.links.next = TwoPhaseState->freeGXacts;
-                       TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+                       gxact->next = TwoPhaseState->freeGXacts;
+                       TwoPhaseState->freeGXacts = gxact;
 
                        LWLockRelease(TwoPhaseStateLock);
 
@@ -485,8 +528,9 @@ TransactionIdIsPrepared(TransactionId xid)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
-               if (gxact->valid && gxact->proc.xid == xid)
+               if (gxact->valid && pgxact->xid == xid)
                {
                        result = true;
                        break;
@@ -609,6 +653,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
        while (status->array != NULL && status->currIdx < status->ngxacts)
        {
                GlobalTransaction gxact = &status->array[status->currIdx++];
+               PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
                Datum           values[5];
                bool            nulls[5];
                HeapTuple       tuple;
@@ -623,11 +669,11 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
                MemSet(values, 0, sizeof(values));
                MemSet(nulls, 0, sizeof(nulls));
 
-               values[0] = TransactionIdGetDatum(gxact->proc.xid);
-               values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
+               values[0] = TransactionIdGetDatum(pgxact->xid);
+               values[1] = CStringGetTextDatum(gxact->gid);
                values[2] = TimestampTzGetDatum(gxact->prepared_at);
                values[3] = ObjectIdGetDatum(gxact->owner);
-               values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
+               values[4] = ObjectIdGetDatum(proc->databaseId);
 
                tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
                result = HeapTupleGetDatum(tuple);
@@ -638,34 +684,36 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
 }
 
 /*
- * TwoPhaseGetDummyProc
- *             Get the PGPROC that represents a prepared transaction specified by XID
+ * TwoPhaseGetGXact
+ *             Get the GlobalTransaction struct for a prepared transaction
+ *             specified by XID
  */
-PGPROC *
-TwoPhaseGetDummyProc(TransactionId xid)
+static GlobalTransaction
+TwoPhaseGetGXact(TransactionId xid)
 {
-       PGPROC     *result = NULL;
+       GlobalTransaction result = NULL;
        int                     i;
 
        static TransactionId cached_xid = InvalidTransactionId;
-       static PGPROC *cached_proc = NULL;
+       static GlobalTransaction cached_gxact = NULL;
 
        /*
         * During a recovery, COMMIT PREPARED, or ROLLBACK PREPARED, we'll be called
         * repeatedly for the same XID.  We can save work with a simple cache.
         */
        if (xid == cached_xid)
-               return cached_proc;
+               return cached_gxact;
 
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
-               if (gxact->proc.xid == xid)
+               if (pgxact->xid == xid)
                {
-                       result = &gxact->proc;
+                       result = gxact;
                        break;
                }
        }
@@ -673,14 +721,42 @@ TwoPhaseGetDummyProc(TransactionId xid)
        LWLockRelease(TwoPhaseStateLock);
 
        if (result == NULL)                     /* should not happen */
-               elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
+               elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
 
        cached_xid = xid;
-       cached_proc = result;
+       cached_gxact = result;
 
        return result;
 }
 
+/*
+ * TwoPhaseGetDummyBackendId
+ *             Get the dummy backend ID for prepared transaction specified by XID
+ *
+ * Dummy backend IDs are similar to real backend IDs of real backends.
+ * They start at MaxBackends + 1, and are unique across all currently active
+ * real backends and prepared transactions.
+ */
+BackendId
+TwoPhaseGetDummyBackendId(TransactionId xid)
+{
+       GlobalTransaction gxact = TwoPhaseGetGXact(xid);
+
+       return gxact->dummyBackendId;
+}
+
+/*
+ * TwoPhaseGetDummyProc
+ *             Get the PGPROC that represents a prepared transaction specified by XID
+ */
+PGPROC *
+TwoPhaseGetDummyProc(TransactionId xid)
+{
+       GlobalTransaction gxact = TwoPhaseGetGXact(xid);
+
+       return &ProcGlobal->allProcs[gxact->pgprocno];
+}
+
 /************************************************************************/
 /* State file support                                                                                                  */
 /************************************************************************/
@@ -695,10 +771,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
  *     2. TransactionId[] (subtransactions)
  *     3. RelFileNode[] (files to be deleted at commit)
  *     4. RelFileNode[] (files to be deleted at abort)
- *     5. TwoPhaseRecordOnDisk
- *     6. ...
- *     7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
- *     8. CRC32
+ *     5. SharedInvalidationMessage[] (inval messages to be sent at commit)
+ *     6. TwoPhaseRecordOnDisk
+ *     7. ...
+ *     8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
+ *     9. CRC32
  *
  * Each segment except the final CRC32 is MAXALIGN'd.
  */
@@ -706,7 +783,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
 /*
  * Header for a 2PC state file
  */
-#define TWOPHASE_MAGIC 0x57F94531              /* format identifier */
+#define TWOPHASE_MAGIC 0x57F94532              /* format identifier */
 
 typedef struct TwoPhaseFileHeader
 {
@@ -719,6 +796,8 @@ typedef struct TwoPhaseFileHeader
        int32           nsubxacts;              /* number of following subxact XIDs */
        int32           ncommitrels;    /* number of delete-on-commit rels */
        int32           nabortrels;             /* number of delete-on-abort rels */
+       int32           ninvalmsgs;             /* number of cache invalidation messages */
+       bool            initfileinval;  /* does relcache init file need invalidation? */
        char            gid[GIDSIZE];   /* GID for transaction */
 } TwoPhaseFileHeader;
 
@@ -789,11 +868,14 @@ save_state_data(const void *data, uint32 len)
 void
 StartPrepare(GlobalTransaction gxact)
 {
-       TransactionId xid = gxact->proc.xid;
+       PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       TransactionId xid = pgxact->xid;
        TwoPhaseFileHeader hdr;
        TransactionId *children;
        RelFileNode *commitrels;
        RelFileNode *abortrels;
+       SharedInvalidationMessage *invalmsgs;
 
        /* Initialize linked list */
        records.head = palloc0(sizeof(XLogRecData));
@@ -812,23 +894,27 @@ StartPrepare(GlobalTransaction gxact)
        hdr.magic = TWOPHASE_MAGIC;
        hdr.total_len = 0;                      /* EndPrepare will fill this in */
        hdr.xid = xid;
-       hdr.database = gxact->proc.databaseId;
+       hdr.database = proc->databaseId;
        hdr.prepared_at = gxact->prepared_at;
        hdr.owner = gxact->owner;
        hdr.nsubxacts = xactGetCommittedChildren(&children);
-       hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
-       hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
+       hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
+       hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
+       hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
+                                                                                                                 &hdr.initfileinval);
        StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
 
        save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
 
-       /* Add the additional info about subxacts and deletable files */
+       /*
+        * Add the additional info about subxacts, deletable files and cache
+        * invalidation messages.
+        */
        if (hdr.nsubxacts > 0)
        {
                save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
                /* While we have the child-xact data, stuff it in the gxact too */
                GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
-               pfree(children);
        }
        if (hdr.ncommitrels > 0)
        {
@@ -840,6 +926,12 @@ StartPrepare(GlobalTransaction gxact)
                save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
                pfree(abortrels);
        }
+       if (hdr.ninvalmsgs > 0)
+       {
+               save_state_data(invalmsgs,
+                                               hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
+               pfree(invalmsgs);
+       }
 }
 
 /*
@@ -850,7 +942,8 @@ StartPrepare(GlobalTransaction gxact)
 void
 EndPrepare(GlobalTransaction gxact)
 {
-       TransactionId xid = gxact->proc.xid;
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       TransactionId xid = pgxact->xid;
        TwoPhaseFileHeader *hdr;
        char            path[MAXPGPATH];
        XLogRecData *record;
@@ -867,19 +960,23 @@ EndPrepare(GlobalTransaction gxact)
        Assert(hdr->magic == TWOPHASE_MAGIC);
        hdr->total_len = records.total_len + sizeof(pg_crc32);
 
+       /*
+        * If the file size exceeds MaxAllocSize, we won't be able to read it in
+        * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
+        */
+       if (hdr->total_len > MaxAllocSize)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                                errmsg("two-phase state file maximum length exceeded")));
+
        /*
         * Create the 2PC state file.
-        *
-        * Note: because we use BasicOpenFile(), we are responsible for ensuring
-        * the FD gets closed in any error exit path.  Once we get into the
-        * critical section, though, it doesn't matter since any failure causes
-        * PANIC anyway.
         */
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path,
-                                          O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
+       fd = OpenTransientFile(path,
+                                                  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
@@ -894,7 +991,7 @@ EndPrepare(GlobalTransaction gxact)
                COMP_CRC32(statefile_crc, record->data, record->len);
                if ((write(fd, record->data, record->len)) != record->len)
                {
-                       close(fd);
+                       CloseTransientFile(fd);
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not write two-phase state file: %m")));
@@ -911,7 +1008,7 @@ EndPrepare(GlobalTransaction gxact)
 
        if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
@@ -920,7 +1017,7 @@ EndPrepare(GlobalTransaction gxact)
        /* Back up to prepare for rewriting the CRC */
        if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not seek in two-phase state file: %m")));
@@ -934,22 +1031,22 @@ EndPrepare(GlobalTransaction gxact)
         * out the correct state file CRC, we have an inconsistency: the xact is
         * prepared according to WAL but not according to our on-disk state. We
         * use a critical section to force a PANIC if we are unable to complete
-        * the write --- then, WAL replay should repair the inconsistency.      The
+        * the write --- then, WAL replay should repair the inconsistency.  The
         * odds of a PANIC actually occurring should be very tiny given that we
         * were able to write the bogus CRC above.
         *
-        * We have to set inCommit here, too; otherwise a checkpoint
-        * starting immediately after the WAL record is inserted could complete
-        * without fsync'ing our state file.  (This is essentially the same kind
-        * of race condition as the COMMIT-to-clog-write case that
-        * RecordTransactionCommit uses inCommit for; see notes there.)
+        * We have to set delayChkpt here, too; otherwise a checkpoint starting
+        * immediately after the WAL record is inserted could complete without
+        * fsync'ing our state file.  (This is essentially the same kind of race
+        * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
+        * uses delayChkpt for; see notes there.)
         *
         * We save the PREPARE record's location in the gxact for later use by
         * CheckPointTwoPhase.
         */
        START_CRIT_SECTION();
 
-       MyProc->inCommit = true;
+       MyPgXact->delayChkpt = true;
 
        gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
                                                                        records.head);
@@ -960,24 +1057,24 @@ EndPrepare(GlobalTransaction gxact)
        /* write correct CRC and close file */
        if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
        }
 
-       if (close(fd) != 0)
+       if (CloseTransientFile(fd) != 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not close two-phase state file: %m")));
 
        /*
-        * Mark the prepared transaction as valid.      As soon as xact.c marks MyProc
-        * as not running our XID (which it will do immediately after this
-        * function returns), others can commit/rollback the xact.
+        * Mark the prepared transaction as valid.  As soon as xact.c marks
+        * MyPgXact as not running our XID (which it will do immediately after
+        * this function returns), others can commit/rollback the xact.
         *
         * NB: a side effect of this is to make a dummy ProcArray entry for the
-        * prepared XID.  This must happen before we clear the XID from MyProc,
+        * prepared XID.  This must happen before we clear the XID from MyPgXact,
         * else there is a window where the XID is not running according to
         * TransactionIdIsInProgress, and onlookers would be entitled to assume
         * the xact crashed.  Instead we have a window where the same XID appears
@@ -986,14 +1083,22 @@ EndPrepare(GlobalTransaction gxact)
        MarkAsPrepared(gxact);
 
        /*
-        * Now we can mark ourselves as out of the commit critical section:
-        * checkpoint starting after this will certainly see the gxact as a
+        * Now we can mark ourselves as out of the commit critical section: a
+        * checkpoint starting after this will certainly see the gxact as a
         * candidate for fsyncing.
         */
-       MyProc->inCommit = false;
+       MyPgXact->delayChkpt = false;
 
        END_CRIT_SECTION();
 
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked the prepare, but still show as
+        * running in the procarray (twice!) and continue to hold locks.
+        */
+       SyncRepWaitForLSN(gxact->prepare_lsn);
+
        records.tail = records.head = NULL;
 }
 
@@ -1022,7 +1127,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
  * contents of the file.  Otherwise return NULL.
  */
 static char *
-ReadTwoPhaseFile(TransactionId xid)
+ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 {
        char            path[MAXPGPATH];
        char       *buf;
@@ -1035,43 +1140,47 @@ ReadTwoPhaseFile(TransactionId xid)
 
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+       fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
        if (fd < 0)
        {
-               ereport(WARNING,
-                               (errcode_for_file_access(),
-                                errmsg("could not open two-phase state file \"%s\": %m",
-                                               path)));
+               if (give_warnings)
+                       ereport(WARNING,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not open two-phase state file \"%s\": %m",
+                                                       path)));
                return NULL;
        }
 
        /*
         * Check file length.  We can determine a lower bound pretty easily. We
-        * set an upper bound mainly to avoid palloc() failure on a corrupt file.
+        * set an upper bound to avoid palloc() failure on a corrupt file, though
+        * we can't guarantee that we won't get an out of memory error anyway,
+        * even on a valid file.
         */
        if (fstat(fd, &stat))
        {
-               close(fd);
-               ereport(WARNING,
-                               (errcode_for_file_access(),
-                                errmsg("could not stat two-phase state file \"%s\": %m",
-                                               path)));
+               CloseTransientFile(fd);
+               if (give_warnings)
+                       ereport(WARNING,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not stat two-phase state file \"%s\": %m",
+                                                       path)));
                return NULL;
        }
 
        if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
                                                MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
                                                sizeof(pg_crc32)) ||
-               stat.st_size > 10000000)
+               stat.st_size > MaxAllocSize)
        {
-               close(fd);
+               CloseTransientFile(fd);
                return NULL;
        }
 
        crc_offset = stat.st_size - sizeof(pg_crc32);
        if (crc_offset != MAXALIGN(crc_offset))
        {
-               close(fd);
+               CloseTransientFile(fd);
                return NULL;
        }
 
@@ -1082,16 +1191,17 @@ ReadTwoPhaseFile(TransactionId xid)
 
        if (read(fd, buf, stat.st_size) != stat.st_size)
        {
-               close(fd);
-               ereport(WARNING,
-                               (errcode_for_file_access(),
-                                errmsg("could not read two-phase state file \"%s\": %m",
-                                               path)));
+               CloseTransientFile(fd);
+               if (give_warnings)
+                       ereport(WARNING,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read two-phase state file \"%s\": %m",
+                                                       path)));
                pfree(buf);
                return NULL;
        }
 
-       close(fd);
+       CloseTransientFile(fd);
 
        hdr = (TwoPhaseFileHeader *) buf;
        if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
@@ -1115,6 +1225,33 @@ ReadTwoPhaseFile(TransactionId xid)
        return buf;
 }
 
+/*
+ * StandbyTransactionIdIsPrepared
+ *
+ * Used during recovery to confirm that xid is a prepared transaction:
+ * returns true only if its two-phase state file can be read and validated
+ * and the xid stored in the file header equals xid.  Returns false
+ * immediately when max_prepared_xacts is not positive.
+ */
+bool
+StandbyTransactionIdIsPrepared(TransactionId xid)
+{
+       bool            found = false;
+
+       Assert(TransactionIdIsValid(xid));
+
+       /* nothing to look up unless max_prepared_xacts is positive */
+       if (max_prepared_xacts > 0)
+       {
+               /* read and validate the state file, asking for no warnings */
+               char       *statebuf = ReadTwoPhaseFile(xid, false);
+
+               if (statebuf != NULL)
+               {
+                       TwoPhaseFileHeader *header = (TwoPhaseFileHeader *) statebuf;
+
+                       /* the xid stored in the header has to agree as well */
+                       found = TransactionIdEquals(header->xid, xid);
+                       pfree(statebuf);
+               }
+       }
+
+       return found;
+}
 
 /*
  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
@@ -1123,6 +1260,8 @@ void
 FinishPreparedTransaction(const char *gid, bool isCommit)
 {
        GlobalTransaction gxact;
+       PGPROC     *proc;
+       PGXACT     *pgxact;
        TransactionId xid;
        char       *buf;
        char       *bufptr;
@@ -1131,6 +1270,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        TransactionId *children;
        RelFileNode *commitrels;
        RelFileNode *abortrels;
+       RelFileNode *delrels;
+       int                     ndelrels;
+       SharedInvalidationMessage *invalmsgs;
        int                     i;
 
        /*
@@ -1138,12 +1280,14 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
         * try to commit the same GID at once.
         */
        gxact = LockGXact(gid, GetUserId());
-       xid = gxact->proc.xid;
+       proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       xid = pgxact->xid;
 
        /*
         * Read and validate the state file
         */
-       buf = ReadTwoPhaseFile(xid);
+       buf = ReadTwoPhaseFile(xid, true);
        if (buf == NULL)
                ereport(ERROR,
                                (errcode(ERRCODE_DATA_CORRUPTED),
@@ -1162,6 +1306,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
        abortrels = (RelFileNode *) bufptr;
        bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+       invalmsgs = (SharedInvalidationMessage *) bufptr;
+       bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 
        /* compute latestXid among all children */
        latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
@@ -1177,18 +1323,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        if (isCommit)
                RecordTransactionCommitPrepared(xid,
                                                                                hdr->nsubxacts, children,
-                                                                               hdr->ncommitrels, commitrels);
+                                                                               hdr->ncommitrels, commitrels,
+                                                                               hdr->ninvalmsgs, invalmsgs,
+                                                                               hdr->initfileinval);
        else
                RecordTransactionAbortPrepared(xid,
                                                                           hdr->nsubxacts, children,
                                                                           hdr->nabortrels, abortrels);
 
-       ProcArrayRemove(&gxact->proc, latestXid);
+       ProcArrayRemove(proc, latestXid);
 
        /*
         * In case we fail while running the callbacks, mark the gxact invalid so
         * no one else will try to commit/rollback, and so it can be recycled
-        * properly later.      It is still locked by our XID so it won't go away yet.
+        * properly later.  It is still locked by our XID so it won't go away yet.
         *
         * (We assume it's safe to do this without taking TwoPhaseStateLock.)
         */
@@ -1203,21 +1351,42 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
         */
        if (isCommit)
        {
-               for (i = 0; i < hdr->ncommitrels; i++)
-                       smgrdounlink(smgropen(commitrels[i]), false, false);
+               delrels = commitrels;
+               ndelrels = hdr->ncommitrels;
        }
        else
        {
-               for (i = 0; i < hdr->nabortrels; i++)
-                       smgrdounlink(smgropen(abortrels[i]), false, false);
+               delrels = abortrels;
+               ndelrels = hdr->nabortrels;
+       }
+       for (i = 0; i < ndelrels; i++)
+       {
+               SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
+
+               smgrdounlink(srel, false);
+               smgrclose(srel);
        }
 
+       /*
+        * Handle cache invalidation messages.
+        *
+        * Relcache init file invalidation requires processing both before and
+        * after we send the SI messages. See AtEOXact_Inval()
+        */
+       if (hdr->initfileinval)
+               RelationCacheInitFilePreInvalidate();
+       SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
+       if (hdr->initfileinval)
+               RelationCacheInitFilePostInvalidate();
+
        /* And now do the callbacks */
        if (isCommit)
                ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
        else
                ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
 
+       PredicateLockTwoPhaseFinish(xid, isCommit);
+
        /* Count the prepared xact as committed or aborted */
        AtEOXact_PgStat(isCommit);
 
@@ -1273,8 +1442,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
                if (errno != ENOENT || giveWarning)
                        ereport(WARNING,
                                        (errcode_for_file_access(),
-                                        errmsg("could not remove two-phase state file \"%s\": %m",
-                                                       path)));
+                                  errmsg("could not remove two-phase state file \"%s\": %m",
+                                                 path)));
 }
 
 /*
@@ -1296,9 +1465,9 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path,
-                                          O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
+       fd = OpenTransientFile(path,
+                                                  O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
@@ -1308,14 +1477,14 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
        /* Write content and CRC */
        if (write(fd, content, len) != len)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
        }
        if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
@@ -1327,13 +1496,13 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
         */
        if (pg_fsync(fd) != 0)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not fsync two-phase state file: %m")));
        }
 
-       if (close(fd) != 0)
+       if (CloseTransientFile(fd) != 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not close two-phase state file: %m")));
@@ -1371,12 +1540,15 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
         *
         * This approach creates a race condition: someone else could delete a
         * GXACT between the time we release TwoPhaseStateLock and the time we try
-        * to open its state file.      We handle this by special-casing ENOENT
+        * to open its state file.  We handle this by special-casing ENOENT
         * failures: if we see that, we verify that the GXACT is no longer valid,
         * and if so ignore the failure.
         */
        if (max_prepared_xacts <= 0)
                return;                                 /* nothing to do */
+
+       TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
+
        xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
        nxids = 0;
 
@@ -1385,10 +1557,11 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
                if (gxact->valid &&
-                       XLByteLE(gxact->prepare_lsn, redo_horizon))
-                       xids[nxids++] = gxact->proc.xid;
+                       gxact->prepare_lsn <= redo_horizon)
+                       xids[nxids++] = pgxact->xid;
        }
 
        LWLockRelease(TwoPhaseStateLock);
@@ -1400,7 +1573,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 
                TwoPhaseFilePath(path, xid);
 
-               fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
+               fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
                if (fd < 0)
                {
                        if (errno == ENOENT)
@@ -1419,14 +1592,14 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 
                if (pg_fsync(fd) != 0)
                {
-                       close(fd);
+                       CloseTransientFile(fd);
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not fsync two-phase state file \"%s\": %m",
                                                        path)));
                }
 
-               if (close(fd) != 0)
+               if (CloseTransientFile(fd) != 0)
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not close two-phase state file \"%s\": %m",
@@ -1434,6 +1607,8 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
        }
 
        pfree(xids);
+
+       TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
 }
 
 /*
@@ -1446,7 +1621,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  *
  * We throw away any prepared xacts with main XID beyond nextXid --- if any
  * are present, it suggests that the DBA has done a PITR recovery to an
- * earlier point in time without cleaning out pg_twophase.     We dare not
+ * earlier point in time without cleaning out pg_twophase.  We dare not
  * try to recover such prepared xacts since they likely depend on database
  * state that doesn't exist now.
  *
@@ -1458,14 +1633,21 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  * Our other responsibility is to determine and return the oldest valid XID
  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
  * This is needed to synchronize pg_subtrans startup properly.
+ *
+ * If xids_p is not NULL, a pointer to a palloc'd array of all top-level
+ * xids is stored in *xids_p and the number of entries in the array is
+ * stored in *nxids_p (so nxids_p must then be non-NULL as well).
  */
 TransactionId
-PrescanPreparedTransactions(void)
+PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 {
        TransactionId origNextXid = ShmemVariableCache->nextXid;
        TransactionId result = origNextXid;
        DIR                *cldir;
        struct dirent *clde;
+       TransactionId *xids = NULL;
+       int                     nxids = 0;
+       int                     allocsize = 0;
 
        cldir = AllocateDir(TWOPHASE_DIR);
        while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
@@ -1497,12 +1679,12 @@ PrescanPreparedTransactions(void)
                         */
 
                        /* Read and validate file */
-                       buf = ReadTwoPhaseFile(xid);
+                       buf = ReadTwoPhaseFile(xid, true);
                        if (buf == NULL)
                        {
                                ereport(WARNING,
-                                               (errmsg("removing corrupt two-phase state file \"%s\"",
-                                                               clde->d_name)));
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
                                RemoveTwoPhaseFile(xid, true);
                                continue;
                        }
@@ -1512,8 +1694,8 @@ PrescanPreparedTransactions(void)
                        if (!TransactionIdEquals(hdr->xid, xid))
                        {
                                ereport(WARNING,
-                                               (errmsg("removing corrupt two-phase state file \"%s\"",
-                                                               clde->d_name)));
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
                                RemoveTwoPhaseFile(xid, true);
                                pfree(buf);
                                continue;
@@ -1529,6 +1711,10 @@ PrescanPreparedTransactions(void)
                        /*
                         * Examine subtransaction XIDs ... they should all follow main
                         * XID, and they may force us to advance nextXid.
+                        *
+                        * We don't expect anyone else to modify nextXid, hence we don't
+                        * need to hold a lock while examining it.  We still acquire the
+                        * lock to modify it, though.
                         */
                        subxids = (TransactionId *)
                                (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
@@ -1540,9 +1726,30 @@ PrescanPreparedTransactions(void)
                                if (TransactionIdFollowsOrEquals(subxid,
                                                                                                 ShmemVariableCache->nextXid))
                                {
+                                       LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
                                        ShmemVariableCache->nextXid = subxid;
                                        TransactionIdAdvance(ShmemVariableCache->nextXid);
+                                       LWLockRelease(XidGenLock);
+                               }
+                       }
+
+
+                       if (xids_p)
+                       {
+                               if (nxids == allocsize)
+                               {
+                                       if (nxids == 0)
+                                       {
+                                               allocsize = 10;
+                                               xids = palloc(allocsize * sizeof(TransactionId));
+                                       }
+                                       else
+                                       {
+                                               allocsize = allocsize * 2;
+                                               xids = repalloc(xids, allocsize * sizeof(TransactionId));
+                                       }
                                }
+                               xids[nxids++] = xid;
                        }
 
                        pfree(buf);
@@ -1550,9 +1757,98 @@ PrescanPreparedTransactions(void)
        }
        FreeDir(cldir);
 
+       if (xids_p)
+       {
+               *xids_p = xids;
+               *nxids_p = nxids;
+       }
+
        return result;
 }
 
+/*
+ * StandbyRecoverPreparedTransactions
+ *
+ * Scan the pg_twophase directory and setup all the required information to
+ * allow standby queries to treat prepared transactions as still active.
+ * This is never called at the end of recovery - we use
+ * RecoverPreparedTransactions() at that point.
+ *
+ * Currently we simply call SubTransSetParent() for any subxids of prepared
+ * transactions. If overwriteOK is true, it's OK if some XIDs have already
+ * been marked in pg_subtrans.
+ *
+ * State files whose xid has already committed or aborted, that cannot be
+ * read and validated, or whose header xid differs from the xid in the file
+ * name are removed with a WARNING and otherwise ignored.
+ */
+void
+StandbyRecoverPreparedTransactions(bool overwriteOK)
+{
+       DIR                *cldir;
+       struct dirent *clde;
+
+       cldir = AllocateDir(TWOPHASE_DIR);
+       while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
+       {
+               /* Only consider names made of exactly 8 upper-case hex digits */
+               if (strlen(clde->d_name) == 8 &&
+                       strspn(clde->d_name, "0123456789ABCDEF") == 8)
+               {
+                       TransactionId xid;
+                       char       *buf;
+                       TwoPhaseFileHeader *hdr;
+                       TransactionId *subxids;
+                       int                     i;
+
+                       /* The file name is the transaction's xid in hex */
+                       xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+
+                       /* Already processed? */
+                       if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+                       {
+                               ereport(WARNING,
+                                               (errmsg("removing stale two-phase state file \"%s\"",
+                                                               clde->d_name)));
+                               RemoveTwoPhaseFile(xid, true);
+                               continue;
+                       }
+
+                       /* Read and validate file */
+                       buf = ReadTwoPhaseFile(xid, true);
+                       if (buf == NULL)
+                       {
+                               ereport(WARNING,
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
+                               RemoveTwoPhaseFile(xid, true);
+                               continue;
+                       }
+
+                       /* Deconstruct header */
+                       hdr = (TwoPhaseFileHeader *) buf;
+                       if (!TransactionIdEquals(hdr->xid, xid))
+                       {
+                               ereport(WARNING,
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
+                               RemoveTwoPhaseFile(xid, true);
+                               pfree(buf);
+                               continue;
+                       }
+
+                       /*
+                        * Examine subtransaction XIDs ... they should all follow main
+                        * XID.  The subxid array starts right after the (aligned)
+                        * file header.
+                        */
+                       subxids = (TransactionId *)
+                               (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
+                       for (i = 0; i < hdr->nsubxacts; i++)
+                       {
+                               TransactionId subxid = subxids[i];
+
+                               Assert(TransactionIdFollows(subxid, xid));
+
+                               /*
+                                * The child xid goes first and its parent second, the
+                                * same order RecoverPreparedTransactions() uses.
+                                */
+                               SubTransSetParent(subxid, xid, overwriteOK);
+                       }
+
+                       /* Done with this state file's contents; don't leak them */
+                       pfree(buf);
+               }
+       }
+       FreeDir(cldir);
+}
+
 /*
  * RecoverPreparedTransactions
  *
@@ -1566,6 +1862,7 @@ RecoverPreparedTransactions(void)
        char            dir[MAXPGPATH];
        DIR                *cldir;
        struct dirent *clde;
+       bool            overwriteOK = false;
 
        snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
 
@@ -1596,12 +1893,12 @@ RecoverPreparedTransactions(void)
                        }
 
                        /* Read and validate file */
-                       buf = ReadTwoPhaseFile(xid);
+                       buf = ReadTwoPhaseFile(xid, true);
                        if (buf == NULL)
                        {
                                ereport(WARNING,
-                                               (errmsg("removing corrupt two-phase state file \"%s\"",
-                                                               clde->d_name)));
+                                         (errmsg("removing corrupt two-phase state file \"%s\"",
+                                                         clde->d_name)));
                                RemoveTwoPhaseFile(xid, true);
                                continue;
                        }
@@ -1617,6 +1914,16 @@ RecoverPreparedTransactions(void)
                        bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
                        bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
                        bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+                       bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+
+                       /*
+                        * It's possible that SubTransSetParent has been set before, if
+                        * the prepared transaction generated xid assignment records. Test
+                        * here must match one used in AssignTransactionId().
+                        */
+                       if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+                                                                XLogLogicalInfoActive()))
+                               overwriteOK = true;
 
                        /*
                         * Reconstruct subtrans state for the transaction --- needed
@@ -1626,7 +1933,7 @@ RecoverPreparedTransactions(void)
                         * hierarchy, but there's no need to restore that exactly.
                         */
                        for (i = 0; i < hdr->nsubxacts; i++)
-                               SubTransSetParent(subxids[i], xid);
+                               SubTransSetParent(subxids[i], xid, overwriteOK);
 
                        /*
                         * Recreate its GXACT and dummy PGPROC
@@ -1649,6 +1956,14 @@ RecoverPreparedTransactions(void)
                         */
                        ProcessRecords(bufptr, xid, twophase_recover_callbacks);
 
+                       /*
+                        * Release locks held by the standby process after we process each
+                        * prepared transaction. As a result, we don't need too many
+                        * additional locks at any one time.
+                        */
+                       if (InHotStandby)
+                               StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+
                        pfree(buf);
                }
        }
@@ -1659,7 +1974,7 @@ RecoverPreparedTransactions(void)
  *     RecordTransactionCommitPrepared
  *
  * This is basically the same as RecordTransactionCommit: in particular,
- * we must set the inCommit flag to avoid a race condition.
+ * we must set the delayChkpt flag to avoid a race condition.
  *
  * We know the transaction made at least one XLOG entry (its PREPARE),
  * so it is never possible to optimize out the commit record.
@@ -1669,9 +1984,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
                                                                int nchildren,
                                                                TransactionId *children,
                                                                int nrels,
-                                                               RelFileNode *rels)
+                                                               RelFileNode *rels,
+                                                               int ninvalmsgs,
+                                                               SharedInvalidationMessage *invalmsgs,
+                                                               bool initfileinval)
 {
-       XLogRecData rdata[3];
+       XLogRecData rdata[4];
        int                     lastrdata = 0;
        xl_xact_commit_prepared xlrec;
        XLogRecPtr      recptr;
@@ -1679,13 +1997,17 @@ RecordTransactionCommitPrepared(TransactionId xid,
        START_CRIT_SECTION();
 
        /* See notes in RecordTransactionCommit */
-       MyProc->inCommit = true;
+       MyPgXact->delayChkpt = true;
 
        /* Emit the XLOG commit record */
        xlrec.xid = xid;
        xlrec.crec.xact_time = GetCurrentTimestamp();
+       xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
+       xlrec.crec.nmsgs = 0;
        xlrec.crec.nrels = nrels;
        xlrec.crec.nsubxacts = nchildren;
+       xlrec.crec.nmsgs = ninvalmsgs;
+
        rdata[0].data = (char *) (&xlrec);
        rdata[0].len = MinSizeOfXactCommitPrepared;
        rdata[0].buffer = InvalidBuffer;
@@ -1707,28 +2029,43 @@ RecordTransactionCommitPrepared(TransactionId xid,
                rdata[2].buffer = InvalidBuffer;
                lastrdata = 2;
        }
+       /* dump cache invalidation messages */
+       if (ninvalmsgs > 0)
+       {
+               rdata[lastrdata].next = &(rdata[3]);
+               rdata[3].data = (char *) invalmsgs;
+               rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
+               rdata[3].buffer = InvalidBuffer;
+               lastrdata = 3;
+       }
        rdata[lastrdata].next = NULL;
 
        recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
 
        /*
-        * We don't currently try to sleep before flush here ... nor is there
-        * any support for async commit of a prepared xact (the very idea is
-        * probably a contradiction)
+        * We don't currently try to sleep before flush here ... nor is there any
+        * support for async commit of a prepared xact (the very idea is probably
+        * a contradiction)
         */
 
        /* Flush XLOG to disk */
        XLogFlush(recptr);
 
        /* Mark the transaction committed in pg_clog */
-       TransactionIdCommit(xid);
-       /* to avoid race conditions, the parent must commit first */
-       TransactionIdCommitTree(nchildren, children);
+       TransactionIdCommitTree(xid, nchildren, children);
 
        /* Checkpoint can proceed now */
-       MyProc->inCommit = false;
+       MyPgXact->delayChkpt = false;
 
        END_CRIT_SECTION();
+
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked clog, but still show as running
+        * in the procarray and continue to hold locks.
+        */
+       SyncRepWaitForLSN(recptr);
 }
 
 /*
@@ -1798,8 +2135,15 @@ RecordTransactionAbortPrepared(TransactionId xid,
         * Mark the transaction aborted in clog.  This is not absolutely necessary
         * but we may as well do it while we are here.
         */
-       TransactionIdAbort(xid);
-       TransactionIdAbortTree(nchildren, children);
+       TransactionIdAbortTree(xid, nchildren, children);
 
        END_CRIT_SECTION();
+
+       /*
+        * Wait for synchronous replication, if required.
+        *
+        * Note that at this stage we have marked clog, but still show as running
+        * in the procarray and continue to hold locks.
+        */
+       SyncRepWaitForLSN(recptr);
 }