]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/twophase.c
pgindent run for 9.4
[postgresql] / src / backend / access / transam / twophase.c
index d2fecb1ecb9171fc2c6dd2887d846c8a16779bb0..70ca6ab67d1ef8a25f7f8bf9515a116239aa8fb8 100644 (file)
@@ -3,7 +3,7 @@
  * twophase.c
  *             Two-phase commit support functions.
  *
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
  *             GIDs and aborts the transaction if there already is a global
  *             transaction in prepared state with the same GID.
  *
- *             A global transaction (gxact) also has a dummy PGPROC that is entered
- *             into the ProcArray array; this is what keeps the XID considered
- *             running by TransactionIdIsInProgress.  It is also convenient as a
- *             PGPROC to hook the gxact's locks to.
+ *             A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
+ *             what keeps the XID considered running by TransactionIdIsInProgress.
+ *             It is also convenient as a PGPROC to hook the gxact's locks to.
  *
  *             In order to survive crashes and shutdowns, all prepared
  *             transactions must be stored in permanent storage. This includes
 #include <time.h>
 #include <unistd.h>
 
-#include "access/htup.h"
+#include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
@@ -59,6 +59,7 @@
 #include "replication/syncrep.h"
 #include "storage/fd.h"
 #include "storage/predicate.h"
+#include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
@@ -79,11 +80,6 @@ int                  max_prepared_xacts = 0;
  * This struct describes one global transaction that is in prepared state
  * or attempting to become prepared.
  *
- * The first component of the struct is a dummy PGPROC that is inserted
- * into the global ProcArray so that the transaction appears to still be
- * running and holding locks.  It must be first because we cast pointers
- * to PGPROC and pointers to GlobalTransactionData back and forth.
- *
  * The lifecycle of a global transaction is:
  *
  * 1. After checking that the requested GID is not in use, set up an
@@ -91,7 +87,7 @@ int                   max_prepared_xacts = 0;
  * with locking_xid = my own XID and valid = false.
  *
  * 2. After successfully completing prepare, set valid = true and enter the
- * contained PGPROC into the global ProcArray.
+ * referenced PGPROC into the global ProcArray.
  *
  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
  * is valid and its locking_xid is no longer active, then store my current
@@ -113,8 +109,8 @@ int                 max_prepared_xacts = 0;
 
 typedef struct GlobalTransactionData
 {
-       GlobalTransaction next;
-       int                     pgprocno;               /* dummy proc */
+       GlobalTransaction next;         /* list link for free list */
+       int                     pgprocno;               /* ID of associated dummy PGPROC */
        BackendId       dummyBackendId; /* similar to backend id for backends */
        TimestampTz prepared_at;        /* time of preparation */
        XLogRecPtr      prepare_lsn;    /* XLOG offset of prepare record */
@@ -208,10 +204,13 @@ TwoPhaseShmemInit(void)
                                          sizeof(GlobalTransaction) * max_prepared_xacts));
                for (i = 0; i < max_prepared_xacts; i++)
                {
-                       gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
+                       /* insert into linked list */
                        gxacts[i].next = TwoPhaseState->freeGXacts;
                        TwoPhaseState->freeGXacts = &gxacts[i];
 
+                       /* associate it with a PGPROC assigned by InitProcGlobal */
+                       gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
+
                        /*
                         * Assign a unique ID for each dummy proc, so that the range of
                         * dummy backend IDs immediately follows the range of normal
@@ -306,7 +305,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
                                 errhint("Increase max_prepared_transactions (currently %d).",
                                                 max_prepared_xacts)));
        gxact = TwoPhaseState->freeGXacts;
-       TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->next;
+       TwoPhaseState->freeGXacts = gxact->next;
 
        proc = &ProcGlobal->allProcs[gxact->pgprocno];
        pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
@@ -320,14 +319,14 @@ MarkAsPreparing(TransactionId xid, const char *gid,
        proc->lxid = (LocalTransactionId) xid;
        pgxact->xid = xid;
        pgxact->xmin = InvalidTransactionId;
-       pgxact->inCommit = false;
+       pgxact->delayChkpt = false;
        pgxact->vacuumFlags = 0;
        proc->pid = 0;
        proc->backendId = InvalidBackendId;
        proc->databaseId = databaseid;
        proc->roleId = owner;
        proc->lwWaiting = false;
-       proc->lwExclusive = false;
+       proc->lwWaitMode = 0;
        proc->lwWaitLink = NULL;
        proc->waitLock = NULL;
        proc->waitProcLock = NULL;
@@ -339,8 +338,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 
        gxact->prepared_at = prepared_at;
        /* initialize LSN to 0 (start of WAL) */
-       gxact->prepare_lsn.xlogid = 0;
-       gxact->prepare_lsn.xrecoff = 0;
+       gxact->prepare_lsn = 0;
        gxact->owner = owner;
        gxact->locking_xid = xid;
        gxact->valid = false;
@@ -366,8 +364,9 @@ static void
 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
                                         TransactionId *children)
 {
-       PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
-       PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
        /* We need no extra lock since the GXACT isn't valid yet */
        if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
        {
@@ -416,7 +415,7 @@ LockGXact(const char *gid, Oid user)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
-               PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+               PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
 
                /* Ignore not-yet-valid GIDs */
                if (!gxact->valid)
@@ -444,7 +443,7 @@ LockGXact(const char *gid, Oid user)
                /*
                 * Note: it probably would be possible to allow committing from
                 * another database; but at the moment NOTIFY is known not to work and
-                * there may be some other issues as well.      Hence disallow until
+                * there may be some other issues as well.  Hence disallow until
                 * someone gets motivated to make it work.
                 */
                if (MyDatabaseId != proc->databaseId)
@@ -529,7 +528,7 @@ TransactionIdIsPrepared(TransactionId xid)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
-               PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
                if (gxact->valid && pgxact->xid == xid)
                {
@@ -654,8 +653,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
        while (status->array != NULL && status->currIdx < status->ngxacts)
        {
                GlobalTransaction gxact = &status->array[status->currIdx++];
-               PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
-               PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+               PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
                Datum           values[5];
                bool            nulls[5];
                HeapTuple       tuple;
@@ -685,51 +684,36 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
 }
 
 /*
- * TwoPhaseGetDummyProc
- *             Get the dummy backend ID for prepared transaction specified by XID
- *
- * Dummy backend IDs are similar to real backend IDs of real backends.
- * They start at MaxBackends + 1, and are unique across all currently active
- * real backends and prepared transactions.
- */
-BackendId
-TwoPhaseGetDummyBackendId(TransactionId xid)
-{
-       PGPROC     *proc = TwoPhaseGetDummyProc(xid);
-
-       return ((GlobalTransaction) proc)->dummyBackendId;
-}
-
-/*
- * TwoPhaseGetDummyProc
- *             Get the PGPROC that represents a prepared transaction specified by XID
+ * TwoPhaseGetGXact
+ *             Get the GlobalTransaction struct for a prepared transaction
+ *             specified by XID
  */
-PGPROC *
-TwoPhaseGetDummyProc(TransactionId xid)
+static GlobalTransaction
+TwoPhaseGetGXact(TransactionId xid)
 {
-       PGPROC     *result = NULL;
+       GlobalTransaction result = NULL;
        int                     i;
 
        static TransactionId cached_xid = InvalidTransactionId;
-       static PGPROC *cached_proc = NULL;
+       static GlobalTransaction cached_gxact = NULL;
 
        /*
         * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
         * repeatedly for the same XID.  We can save work with a simple cache.
         */
        if (xid == cached_xid)
-               return cached_proc;
+               return cached_gxact;
 
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
-               PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
                if (pgxact->xid == xid)
                {
-                       result = &ProcGlobal->allProcs[gxact->pgprocno];
+                       result = gxact;
                        break;
                }
        }
@@ -737,14 +721,42 @@ TwoPhaseGetDummyProc(TransactionId xid)
        LWLockRelease(TwoPhaseStateLock);
 
        if (result == NULL)                     /* should not happen */
-               elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
+               elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
 
        cached_xid = xid;
-       cached_proc = result;
+       cached_gxact = result;
 
        return result;
 }
 
+/*
+ * TwoPhaseGetDummyProc
+ *             Get the dummy backend ID for prepared transaction specified by XID
+ *
+ * Dummy backend IDs are similar to real backend IDs of real backends.
+ * They start at MaxBackends + 1, and are unique across all currently active
+ * real backends and prepared transactions.
+ */
+BackendId
+TwoPhaseGetDummyBackendId(TransactionId xid)
+{
+       GlobalTransaction gxact = TwoPhaseGetGXact(xid);
+
+       return gxact->dummyBackendId;
+}
+
+/*
+ * TwoPhaseGetDummyProc
+ *             Get the PGPROC that represents a prepared transaction specified by XID
+ */
+PGPROC *
+TwoPhaseGetDummyProc(TransactionId xid)
+{
+       GlobalTransaction gxact = TwoPhaseGetGXact(xid);
+
+       return &ProcGlobal->allProcs[gxact->pgprocno];
+}
+
 /************************************************************************/
 /* State file support                                                                                                  */
 /************************************************************************/
@@ -856,8 +868,8 @@ save_state_data(const void *data, uint32 len)
 void
 StartPrepare(GlobalTransaction gxact)
 {
-       PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
-       PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+       PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
        TransactionId xid = pgxact->xid;
        TwoPhaseFileHeader hdr;
        TransactionId *children;
@@ -959,17 +971,12 @@ EndPrepare(GlobalTransaction gxact)
 
        /*
         * Create the 2PC state file.
-        *
-        * Note: because we use BasicOpenFile(), we are responsible for ensuring
-        * the FD gets closed in any error exit path.  Once we get into the
-        * critical section, though, it doesn't matter since any failure causes
-        * PANIC anyway.
         */
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path,
-                                          O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
+       fd = OpenTransientFile(path,
+                                                  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
@@ -984,7 +991,7 @@ EndPrepare(GlobalTransaction gxact)
                COMP_CRC32(statefile_crc, record->data, record->len);
                if ((write(fd, record->data, record->len)) != record->len)
                {
-                       close(fd);
+                       CloseTransientFile(fd);
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not write two-phase state file: %m")));
@@ -1001,7 +1008,7 @@ EndPrepare(GlobalTransaction gxact)
 
        if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
@@ -1010,7 +1017,7 @@ EndPrepare(GlobalTransaction gxact)
        /* Back up to prepare for rewriting the CRC */
        if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not seek in two-phase state file: %m")));
@@ -1024,22 +1031,22 @@ EndPrepare(GlobalTransaction gxact)
         * out the correct state file CRC, we have an inconsistency: the xact is
         * prepared according to WAL but not according to our on-disk state. We
         * use a critical section to force a PANIC if we are unable to complete
-        * the write --- then, WAL replay should repair the inconsistency.      The
+        * the write --- then, WAL replay should repair the inconsistency.  The
         * odds of a PANIC actually occurring should be very tiny given that we
         * were able to write the bogus CRC above.
         *
-        * We have to set inCommit here, too; otherwise a checkpoint starting
+        * We have to set delayChkpt here, too; otherwise a checkpoint starting
         * immediately after the WAL record is inserted could complete without
         * fsync'ing our state file.  (This is essentially the same kind of race
         * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
-        * uses inCommit for; see notes there.)
+        * uses delayChkpt for; see notes there.)
         *
         * We save the PREPARE record's location in the gxact for later use by
         * CheckPointTwoPhase.
         */
        START_CRIT_SECTION();
 
-       MyPgXact->inCommit = true;
+       MyPgXact->delayChkpt = true;
 
        gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
                                                                        records.head);
@@ -1047,34 +1054,27 @@ EndPrepare(GlobalTransaction gxact)
 
        /* If we crash now, we have prepared: WAL replay will fix things */
 
-       /*
-        * Wake up all walsenders to send WAL up to the PREPARE record immediately
-        * if replication is enabled
-        */
-       if (max_wal_senders > 0)
-               WalSndWakeup();
-
        /* write correct CRC and close file */
        if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
        }
 
-       if (close(fd) != 0)
+       if (CloseTransientFile(fd) != 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not close two-phase state file: %m")));
 
        /*
-        * Mark the prepared transaction as valid.      As soon as xact.c marks MyProc
-        * as not running our XID (which it will do immediately after this
-        * function returns), others can commit/rollback the xact.
+        * Mark the prepared transaction as valid.  As soon as xact.c marks
+        * MyPgXact as not running our XID (which it will do immediately after
+        * this function returns), others can commit/rollback the xact.
         *
         * NB: a side effect of this is to make a dummy ProcArray entry for the
-        * prepared XID.  This must happen before we clear the XID from MyProc,
+        * prepared XID.  This must happen before we clear the XID from MyPgXact,
         * else there is a window where the XID is not running according to
         * TransactionIdIsInProgress, and onlookers would be entitled to assume
         * the xact crashed.  Instead we have a window where the same XID appears
@@ -1087,7 +1087,7 @@ EndPrepare(GlobalTransaction gxact)
         * checkpoint starting after this will certainly see the gxact as a
         * candidate for fsyncing.
         */
-       MyPgXact->inCommit = false;
+       MyPgXact->delayChkpt = false;
 
        END_CRIT_SECTION();
 
@@ -1140,7 +1140,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+       fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
        if (fd < 0)
        {
                if (give_warnings)
@@ -1159,7 +1159,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
         */
        if (fstat(fd, &stat))
        {
-               close(fd);
+               CloseTransientFile(fd);
                if (give_warnings)
                        ereport(WARNING,
                                        (errcode_for_file_access(),
@@ -1173,14 +1173,14 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
                                                sizeof(pg_crc32)) ||
                stat.st_size > MaxAllocSize)
        {
-               close(fd);
+               CloseTransientFile(fd);
                return NULL;
        }
 
        crc_offset = stat.st_size - sizeof(pg_crc32);
        if (crc_offset != MAXALIGN(crc_offset))
        {
-               close(fd);
+               CloseTransientFile(fd);
                return NULL;
        }
 
@@ -1191,7 +1191,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 
        if (read(fd, buf, stat.st_size) != stat.st_size)
        {
-               close(fd);
+               CloseTransientFile(fd);
                if (give_warnings)
                        ereport(WARNING,
                                        (errcode_for_file_access(),
@@ -1201,7 +1201,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
                return NULL;
        }
 
-       close(fd);
+       CloseTransientFile(fd);
 
        hdr = (TwoPhaseFileHeader *) buf;
        if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
@@ -1336,7 +1336,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        /*
         * In case we fail while running the callbacks, mark the gxact invalid so
         * no one else will try to commit/rollback, and so it can be recycled
-        * properly later.      It is still locked by our XID so it won't go away yet.
+        * properly later.  It is still locked by our XID so it won't go away yet.
         *
         * (We assume it's safe to do this without taking TwoPhaseStateLock.)
         */
@@ -1362,13 +1362,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        for (i = 0; i < ndelrels; i++)
        {
                SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
-               ForkNumber      fork;
 
-               for (fork = 0; fork <= MAX_FORKNUM; fork++)
-               {
-                       if (smgrexists(srel, fork))
-                               smgrdounlink(srel, fork, false);
-               }
+               smgrdounlink(srel, false);
                smgrclose(srel);
        }
 
@@ -1470,9 +1465,9 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 
        TwoPhaseFilePath(path, xid);
 
-       fd = BasicOpenFile(path,
-                                          O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
+       fd = OpenTransientFile(path,
+                                                  O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
@@ -1482,14 +1477,14 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
        /* Write content and CRC */
        if (write(fd, content, len) != len)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
        }
        if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not write two-phase state file: %m")));
@@ -1501,13 +1496,13 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
         */
        if (pg_fsync(fd) != 0)
        {
-               close(fd);
+               CloseTransientFile(fd);
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not fsync two-phase state file: %m")));
        }
 
-       if (close(fd) != 0)
+       if (CloseTransientFile(fd) != 0)
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not close two-phase state file: %m")));
@@ -1545,7 +1540,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
         *
         * This approach creates a race condition: someone else could delete a
         * GXACT between the time we release TwoPhaseStateLock and the time we try
-        * to open its state file.      We handle this by special-casing ENOENT
+        * to open its state file.  We handle this by special-casing ENOENT
         * failures: if we see that, we verify that the GXACT is no longer valid,
         * and if so ignore the failure.
         */
@@ -1562,10 +1557,10 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
-               PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
                if (gxact->valid &&
-                       XLByteLE(gxact->prepare_lsn, redo_horizon))
+                       gxact->prepare_lsn <= redo_horizon)
                        xids[nxids++] = pgxact->xid;
        }
 
@@ -1578,7 +1573,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 
                TwoPhaseFilePath(path, xid);
 
-               fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);
+               fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
                if (fd < 0)
                {
                        if (errno == ENOENT)
@@ -1597,14 +1592,14 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 
                if (pg_fsync(fd) != 0)
                {
-                       close(fd);
+                       CloseTransientFile(fd);
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not fsync two-phase state file \"%s\": %m",
                                                        path)));
                }
 
-               if (close(fd) != 0)
+               if (CloseTransientFile(fd) != 0)
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not close two-phase state file \"%s\": %m",
@@ -1626,7 +1621,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  *
  * We throw away any prepared xacts with main XID beyond nextXid --- if any
  * are present, it suggests that the DBA has done a PITR recovery to an
- * earlier point in time without cleaning out pg_twophase.     We dare not
+ * earlier point in time without cleaning out pg_twophase.  We dare not
  * try to recover such prepared xacts since they likely depend on database
  * state that doesn't exist now.
  *
@@ -1716,6 +1711,10 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
                        /*
                         * Examine subtransaction XIDs ... they should all follow main
                         * XID, and they may force us to advance nextXid.
+                        *
+                        * We don't expect anyone else to modify nextXid, hence we don't
+                        * need to hold a lock while examining it.  We still acquire the
+                        * lock to modify it, though.
                         */
                        subxids = (TransactionId *)
                                (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
@@ -1727,8 +1726,10 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
                                if (TransactionIdFollowsOrEquals(subxid,
                                                                                                 ShmemVariableCache->nextXid))
                                {
+                                       LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
                                        ShmemVariableCache->nextXid = subxid;
                                        TransactionIdAdvance(ShmemVariableCache->nextXid);
+                                       LWLockRelease(XidGenLock);
                                }
                        }
 
@@ -1920,7 +1921,8 @@ RecoverPreparedTransactions(void)
                         * the prepared transaction generated xid assignment records. Test
                         * here must match one used in AssignTransactionId().
                         */
-                       if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
+                       if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+                                                                XLogLogicalInfoActive()))
                                overwriteOK = true;
 
                        /*
@@ -1972,7 +1974,7 @@ RecoverPreparedTransactions(void)
  *     RecordTransactionCommitPrepared
  *
  * This is basically the same as RecordTransactionCommit: in particular,
- * we must set the inCommit flag to avoid a race condition.
+ * we must set the delayChkpt flag to avoid a race condition.
  *
  * We know the transaction made at least one XLOG entry (its PREPARE),
  * so it is never possible to optimize out the commit record.
@@ -1995,7 +1997,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
        START_CRIT_SECTION();
 
        /* See notes in RecordTransactionCommit */
-       MyPgXact->inCommit = true;
+       MyPgXact->delayChkpt = true;
 
        /* Emit the XLOG commit record */
        xlrec.xid = xid;
@@ -2049,18 +2051,11 @@ RecordTransactionCommitPrepared(TransactionId xid,
        /* Flush XLOG to disk */
        XLogFlush(recptr);
 
-       /*
-        * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
-        * immediately if replication is enabled
-        */
-       if (max_wal_senders > 0)
-               WalSndWakeup();
-
        /* Mark the transaction committed in pg_clog */
        TransactionIdCommitTree(xid, nchildren, children);
 
        /* Checkpoint can proceed now */
-       MyPgXact->inCommit = false;
+       MyPgXact->delayChkpt = false;
 
        END_CRIT_SECTION();
 
@@ -2136,13 +2131,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
        /* Always flush, since we're about to remove the 2PC state file */
        XLogFlush(recptr);
 
-       /*
-        * Wake up all walsenders to send WAL up to the ABORT PREPARED record
-        * immediately if replication is enabled
-        */
-       if (max_wal_senders > 0)
-               WalSndWakeup();
-
        /*
         * Mark the transaction aborted in clog.  This is not absolutely necessary
         * but we may as well do it while we are here.