]> granicus.if.org Git - postgresql/commitdiff
Speedup 2PC recovery by skipping two phase state files in normal path
authorSimon Riggs <simon@2ndQuadrant.com>
Tue, 4 Apr 2017 19:56:56 +0000 (15:56 -0400)
committerSimon Riggs <simon@2ndQuadrant.com>
Tue, 4 Apr 2017 19:56:56 +0000 (15:56 -0400)
2PC state info held in shmem at PREPARE, then cleaned at COMMIT PREPARED/ABORT PREPARED,
avoiding writing/fsyncing any state information to disk in the normal path, greatly enhancing replay speed.
Prepared transactions that live past one checkpoint redo horizon will be written to disk as now.
Similar conceptually to 978b2f65aa1262eb4ecbf8b3785cb1b9cf4db78e and building upon
the infrastructure created by that commit.

Authors, in equal measure: Stas Kelvich, Nikhil Sontakke and Michael Paquier
Discussion: https://postgr.es/m/CAMGcDxf8Bn9ZPBBJZba9wiyQq-Qk5uqq=VjoMnRnW5s+fKST3w@mail.gmail.com

src/backend/access/transam/twophase.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/include/access/twophase.h

index 83169cccc301179a601b33d7ae0f87145ddd2450..d0e2bbf2916bcedaf5a81cd5cbb8e5c8dc4323ba 100644 (file)
  *               fsynced
  *             * If COMMIT happens after checkpoint then backend reads state data from
  *               files
- *             * In case of crash replay will move data from xlog to files, if that
- *               hasn't happened before. XXX TODO - move to shmem in replay also
+ *
+ *             During replay and replication, TwoPhaseState also holds information
+ *             about active prepared transactions that haven't been moved to disk yet.
+ *
+ *             Replay of twophase records happens by the following rules:
+ *
+ *             * At the beginning of recovery, pg_twophase is scanned once, filling
+ *               TwoPhaseState with entries marked with gxact->inredo and
+ *               gxact->ondisk.  Two-phase file data older than the XID horizon of
+ *               the redo position are discarded.
+ *             * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
+ *               gxact->inredo is set to true for such entries.
+ *             * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
+ *               that have gxact->inredo set and are behind the redo_horizon. We
+ *               save them to disk and then switch gxact->ondisk to true.
+ *             * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
+ *               If gxact->ondisk is true, the corresponding entry from the disk
+ *               is additionally deleted.
+ *             * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
+ *               and PrescanPreparedTransactions() have been modified to go through
+ *               gxact->inredo entries that have not made it to disk.
  *
  *-------------------------------------------------------------------------
  */
@@ -147,11 +166,13 @@ typedef struct GlobalTransactionData
         */
        XLogRecPtr      prepare_start_lsn;              /* XLOG offset of prepare record start */
        XLogRecPtr      prepare_end_lsn;        /* XLOG offset of prepare record end */
+       TransactionId   xid;                    /* The GXACT id */
 
        Oid                     owner;                  /* ID of user that executed the xact */
        BackendId       locking_backend;        /* backend currently working on the xact */
        bool            valid;                  /* TRUE if PGPROC entry is in proc array */
        bool            ondisk;                 /* TRUE if prepare state file is on disk */
+       bool            inredo;                 /* TRUE if entry was added via xlog_redo */
        char            gid[GIDSIZE];   /* The GID assigned to the prepared xact */
 }      GlobalTransactionData;
 
@@ -198,6 +219,15 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
 static void RemoveGXact(GlobalTransaction gxact);
 
 static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
+static char *ProcessTwoPhaseBuffer(TransactionId xid,
+                                                       XLogRecPtr      prepare_start_lsn,
+                                                       bool fromdisk, bool overwriteOK, bool setParent,
+                                                       TransactionId *result, TransactionId *maxsubxid);
+static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
+                               const char *gid, TimestampTz prepared_at, Oid owner,
+                               Oid databaseid);
+static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
+static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
 
 /*
  * Initialization of shared memory
@@ -342,18 +372,12 @@ PostPrepare_Twophase(void)
 /*
  * MarkAsPreparing
  *             Reserve the GID for the given transaction.
- *
- * Internally, this creates a gxact struct and puts it into the active array.
- * NOTE: this is also used when reloading a gxact after a crash; so avoid
- * assuming that we can use very much backend context.
  */
 GlobalTransaction
 MarkAsPreparing(TransactionId xid, const char *gid,
                                TimestampTz prepared_at, Oid owner, Oid databaseid)
 {
        GlobalTransaction gxact;
-       PGPROC     *proc;
-       PGXACT     *pgxact;
        int                     i;
 
        if (strlen(gid) >= GIDSIZE)
@@ -401,6 +425,37 @@ MarkAsPreparing(TransactionId xid, const char *gid,
        gxact = TwoPhaseState->freeGXacts;
        TwoPhaseState->freeGXacts = gxact->next;
 
+       MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
+
+       gxact->ondisk = false;
+
+       /* And insert it into the active array */
+       Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
+       TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+
+       LWLockRelease(TwoPhaseStateLock);
+
+       return gxact;
+}
+
+/*
+ * MarkAsPreparingGuts
+ *
+ * This uses a gxact struct and puts it into the active array.
+ * NOTE: this is also used when reloading a gxact after a crash; so avoid
+ * assuming that we can use very much backend context.
+ *
+ * Note: This function should be called with appropriate locks held.
+ */
+static void
+MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
+                                       TimestampTz prepared_at, Oid owner, Oid databaseid)
+{
+       PGPROC     *proc;
+       PGXACT     *pgxact;
+       int                     i;
+
+       Assert(gxact != NULL);
        proc = &ProcGlobal->allProcs[gxact->pgprocno];
        pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
@@ -431,28 +486,18 @@ MarkAsPreparing(TransactionId xid, const char *gid,
        pgxact->nxids = 0;
 
        gxact->prepared_at = prepared_at;
-       /* initialize LSN to InvalidXLogRecPtr */
-       gxact->prepare_start_lsn = InvalidXLogRecPtr;
-       gxact->prepare_end_lsn = InvalidXLogRecPtr;
+       gxact->xid = xid;
        gxact->owner = owner;
        gxact->locking_backend = MyBackendId;
        gxact->valid = false;
-       gxact->ondisk = false;
+       gxact->inredo = false;
        strcpy(gxact->gid, gid);
 
-       /* And insert it into the active array */
-       Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
-       TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
-
        /*
         * Remember that we have this GlobalTransaction entry locked for us. If we
         * abort after this, we must release it.
         */
        MyLockedGxact = gxact;
-
-       LWLockRelease(TwoPhaseStateLock);
-
-       return gxact;
 }
 
 /*
@@ -1244,9 +1289,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
  * twophase files and ReadTwoPhaseFile should be used instead.
  *
- * Note clearly that this function accesses WAL during normal operation, similarly
- * to the way WALSender or Logical Decoding would do. It does not run during
- * crash recovery or standby processing.
+ * Note clearly that this function can access WAL during normal operation,
+ * similarly to the way WALSender or Logical Decoding would do.
+ *
  */
 static void
 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
@@ -1255,8 +1300,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
        XLogReaderState *xlogreader;
        char       *errormsg;
 
-       Assert(!RecoveryInProgress());
-
        xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
        if (!xlogreader)
                ereport(ERROR,
@@ -1501,7 +1544,7 @@ ProcessRecords(char *bufptr, TransactionId xid,
  * If giveWarning is false, do not complain about file-not-present;
  * this is an expected case during WAL replay.
  */
-void
+static void
 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
 {
        char            path[MAXPGPATH];
@@ -1521,7 +1564,7 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
  *
  * Note: content and len don't include CRC.
  */
-void
+static void
 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 {
        char            path[MAXPGPATH];
@@ -1587,9 +1630,11 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 /*
  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
  *
- * We must fsync the state file of any GXACT that is valid and has a PREPARE
- * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or
- * has a later LSN, this checkpoint is not responsible for fsyncing it.)
+ * We must fsync the state file of any GXACT that is valid or has been
+ * generated during redo and has a PREPARE LSN <= the checkpoint's redo
+ * horizon.  (If the gxact isn't valid yet, has not been generated in
+ * redo, or has a later LSN, this checkpoint is not responsible for
+ * fsyncing it.)
  *
  * This is deliberately run as late as possible in the checkpoint sequence,
  * because GXACTs ordinarily have short lifespans, and so it is quite
@@ -1631,10 +1676,10 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
        LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
+               /* Note that we are using gxact not pgxact so this works in recovery also */
                GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
-               PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 
-               if (gxact->valid &&
+               if ((gxact->valid || gxact->inredo) &&
                        !gxact->ondisk &&
                        gxact->prepare_end_lsn <= redo_horizon)
                {
@@ -1642,8 +1687,10 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
                        int                     len;
 
                        XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
-                       RecreateTwoPhaseFile(pgxact->xid, buf, len);
+                       RecreateTwoPhaseFile(gxact->xid, buf, len);
                        gxact->ondisk = true;
+                       gxact->prepare_start_lsn = InvalidXLogRecPtr;
+                       gxact->prepare_end_lsn = InvalidXLogRecPtr;
                        pfree(buf);
                        serialized_xacts++;
                }
@@ -1670,13 +1717,50 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
                                                           serialized_xacts)));
 }
 
+/*
+ * restoreTwoPhaseData
+ *
+ * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
+ * This is called once at the beginning of recovery, saving any extra
+ * lookups in the future.  Two-phase files that are newer than the
+ * minimum XID horizon are discarded on the way.
+ */
+void
+restoreTwoPhaseData(void)
+{
+       DIR                        *cldir;
+       struct dirent  *clde;
+
+       cldir = AllocateDir(TWOPHASE_DIR);
+       while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
+       {
+               if (strlen(clde->d_name) == 8 &&
+                       strspn(clde->d_name, "0123456789ABCDEF") == 8)
+               {
+                       TransactionId xid;
+                       char       *buf;
+
+                       xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+
+                       buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
+                                                                               true, false, false,
+                                                                               NULL, NULL);
+                       if (buf == NULL)
+                               continue;
+
+                       PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
+               }
+       }
+       FreeDir(cldir);
+}
+
 /*
  * PrescanPreparedTransactions
  *
- * Scan the pg_twophase directory and determine the range of valid XIDs
- * present.  This is run during database startup, after we have completed
- * reading WAL.  ShmemVariableCache->nextXid has been set to one more than
- * the highest XID for which evidence exists in WAL.
+ * Scan the shared memory entries of TwoPhaseState and determine the range
+ * of valid XIDs present.  This is run during database startup, after we
+ * have completed reading WAL.  ShmemVariableCache->nextXid has been set to
+ * one more than the highest XID for which evidence exists in WAL.
  *
  * We throw away any prepared xacts with main XID beyond nextXid --- if any
  * are present, it suggests that the DBA has done a PITR recovery to an
@@ -1702,120 +1786,52 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 {
        TransactionId origNextXid = ShmemVariableCache->nextXid;
        TransactionId result = origNextXid;
-       DIR                *cldir;
-       struct dirent *clde;
+       TransactionId maxsubxid = origNextXid;
        TransactionId *xids = NULL;
        int                     nxids = 0;
        int                     allocsize = 0;
+       int                     i;
 
-       cldir = AllocateDir(TWOPHASE_DIR);
-       while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
+       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+       for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
-               if (strlen(clde->d_name) == 8 &&
-                       strspn(clde->d_name, "0123456789ABCDEF") == 8)
-               {
-                       TransactionId xid;
-                       char       *buf;
-                       TwoPhaseFileHeader *hdr;
-                       TransactionId *subxids;
-                       int                     i;
-
-                       xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
-
-                       /* Reject XID if too new */
-                       if (TransactionIdFollowsOrEquals(xid, origNextXid))
-                       {
-                               ereport(WARNING,
-                                               (errmsg("removing future two-phase state file \"%s\"",
-                                                               clde->d_name)));
-                               RemoveTwoPhaseFile(xid, true);
-                               continue;
-                       }
+               TransactionId xid;
+               char       *buf;
+               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
-                       /*
-                        * Note: we can't check if already processed because clog
-                        * subsystem isn't up yet.
-                        */
+               Assert(gxact->inredo);
 
-                       /* Read and validate file */
-                       buf = ReadTwoPhaseFile(xid, true);
-                       if (buf == NULL)
-                       {
-                               ereport(WARNING,
-                                         (errmsg("removing corrupt two-phase state file \"%s\"",
-                                                         clde->d_name)));
-                               RemoveTwoPhaseFile(xid, true);
-                               continue;
-                       }
+               xid = gxact->xid;
 
-                       /* Deconstruct header */
-                       hdr = (TwoPhaseFileHeader *) buf;
-                       if (!TransactionIdEquals(hdr->xid, xid))
-                       {
-                               ereport(WARNING,
-                                         (errmsg("removing corrupt two-phase state file \"%s\"",
-                                                         clde->d_name)));
-                               RemoveTwoPhaseFile(xid, true);
-                               pfree(buf);
-                               continue;
-                       }
+               buf = ProcessTwoPhaseBuffer(xid,
+                               gxact->prepare_start_lsn,
+                               gxact->ondisk, false, false,
+                               &result, &maxsubxid);
 
-                       /*
-                        * OK, we think this file is valid.  Incorporate xid into the
-                        * running-minimum result.
-                        */
-                       if (TransactionIdPrecedes(xid, result))
-                               result = xid;
+               if (buf == NULL)
+                       continue;
 
-                       /*
-                        * Examine subtransaction XIDs ... they should all follow main
-                        * XID, and they may force us to advance nextXid.
-                        *
-                        * We don't expect anyone else to modify nextXid, hence we don't
-                        * need to hold a lock while examining it.  We still acquire the
-                        * lock to modify it, though.
-                        */
-                       subxids = (TransactionId *) (buf +
-                                                               MAXALIGN(sizeof(TwoPhaseFileHeader)) +
-                                                               MAXALIGN(hdr->gidlen));
-                       for (i = 0; i < hdr->nsubxacts; i++)
+               if (xids_p)
+               {
+                       if (nxids == allocsize)
                        {
-                               TransactionId subxid = subxids[i];
-
-                               Assert(TransactionIdFollows(subxid, xid));
-                               if (TransactionIdFollowsOrEquals(subxid,
-                                                                                                ShmemVariableCache->nextXid))
+                               if (nxids == 0)
                                {
-                                       LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
-                                       ShmemVariableCache->nextXid = subxid;
-                                       TransactionIdAdvance(ShmemVariableCache->nextXid);
-                                       LWLockRelease(XidGenLock);
+                                       allocsize = 10;
+                                       xids = palloc(allocsize * sizeof(TransactionId));
                                }
-                       }
-
-
-                       if (xids_p)
-                       {
-                               if (nxids == allocsize)
+                               else
                                {
-                                       if (nxids == 0)
-                                       {
-                                               allocsize = 10;
-                                               xids = palloc(allocsize * sizeof(TransactionId));
-                                       }
-                                       else
-                                       {
-                                               allocsize = allocsize * 2;
-                                               xids = repalloc(xids, allocsize * sizeof(TransactionId));
-                                       }
+                                       allocsize = allocsize * 2;
+                                       xids = repalloc(xids, allocsize * sizeof(TransactionId));
                                }
-                               xids[nxids++] = xid;
                        }
-
-                       pfree(buf);
+                       xids[nxids++] = xid;
                }
+
+               pfree(buf);
        }
-       FreeDir(cldir);
+       LWLockRelease(TwoPhaseStateLock);
 
        if (xids_p)
        {
@@ -1823,14 +1839,25 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
                *nxids_p = nxids;
        }
 
+       /* update nextXid if needed */
+       if (TransactionIdFollowsOrEquals(maxsubxid, ShmemVariableCache->nextXid))
+       {
+               LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+               ShmemVariableCache->nextXid = maxsubxid;
+               TransactionIdAdvance(ShmemVariableCache->nextXid);
+               LWLockRelease(XidGenLock);
+       }
+
        return result;
 }
 
 /*
  * StandbyRecoverPreparedTransactions
  *
- * Scan the pg_twophase directory and setup all the required information to
- * allow standby queries to treat prepared transactions as still active.
+ * Scan the shared memory entries of TwoPhaseState and setup all the required
+ * information to allow standby queries to treat prepared transactions as still
+ * active.
+ *
  * This is never called at the end of recovery - we use
  * RecoverPreparedTransactions() at that point.
  *
@@ -1841,202 +1868,292 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 void
 StandbyRecoverPreparedTransactions(bool overwriteOK)
 {
-       DIR                *cldir;
-       struct dirent *clde;
+       int                     i;
 
-       cldir = AllocateDir(TWOPHASE_DIR);
-       while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
+       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+       for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
-               if (strlen(clde->d_name) == 8 &&
-                       strspn(clde->d_name, "0123456789ABCDEF") == 8)
-               {
-                       TransactionId xid;
-                       char       *buf;
-                       TwoPhaseFileHeader *hdr;
-                       TransactionId *subxids;
-                       int                     i;
-
-                       xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
-
-                       /* Already processed? */
-                       if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
-                       {
-                               ereport(WARNING,
-                                               (errmsg("removing stale two-phase state file \"%s\"",
-                                                               clde->d_name)));
-                               RemoveTwoPhaseFile(xid, true);
-                               continue;
-                       }
-
-                       /* Read and validate file */
-                       buf = ReadTwoPhaseFile(xid, true);
-                       if (buf == NULL)
-                       {
-                               ereport(WARNING,
-                                         (errmsg("removing corrupt two-phase state file \"%s\"",
-                                                         clde->d_name)));
-                               RemoveTwoPhaseFile(xid, true);
-                               continue;
-                       }
+               TransactionId xid;
+               char       *buf;
+               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
-                       /* Deconstruct header */
-                       hdr = (TwoPhaseFileHeader *) buf;
-                       if (!TransactionIdEquals(hdr->xid, xid))
-                       {
-                               ereport(WARNING,
-                                         (errmsg("removing corrupt two-phase state file \"%s\"",
-                                                         clde->d_name)));
-                               RemoveTwoPhaseFile(xid, true);
-                               pfree(buf);
-                               continue;
-                       }
+               Assert(gxact->inredo);
 
-                       /*
-                        * Examine subtransaction XIDs ... they should all follow main
-                        * XID.
-                        */
-                       subxids = (TransactionId *) (buf +
-                                                               MAXALIGN(sizeof(TwoPhaseFileHeader)) +
-                                                               MAXALIGN(hdr->gidlen));
-                       for (i = 0; i < hdr->nsubxacts; i++)
-                       {
-                               TransactionId subxid = subxids[i];
-
-                               Assert(TransactionIdFollows(subxid, xid));
-                               SubTransSetParent(xid, subxid, overwriteOK);
-                       }
+               xid = gxact->xid;
 
+               buf = ProcessTwoPhaseBuffer(xid,
+                               gxact->prepare_start_lsn,
+                               gxact->ondisk, overwriteOK, true,
+                               NULL, NULL);
+               if (buf != NULL)
                        pfree(buf);
-               }
        }
-       FreeDir(cldir);
+       LWLockRelease(TwoPhaseStateLock);
 }
 
 /*
  * RecoverPreparedTransactions
  *
- * Scan the pg_twophase directory and reload shared-memory state for each
- * prepared transaction (reacquire locks, etc).  This is run during database
- * startup.
+ * Scan the shared memory entries of TwoPhaseState and reload the state for
+ * each prepared transaction (reacquire locks, etc).
+ *
+ * This is run during database startup.
  */
 void
 RecoverPreparedTransactions(void)
 {
-       char            dir[MAXPGPATH];
-       DIR                *cldir;
-       struct dirent *clde;
-       bool            overwriteOK = false;
-
-       snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
+       int                     i;
 
-       cldir = AllocateDir(dir);
-       while ((clde = ReadDir(cldir, dir)) != NULL)
+       /*
+        * Don't need a lock in the recovery phase.
+        */
+       for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
-               if (strlen(clde->d_name) == 8 &&
-                       strspn(clde->d_name, "0123456789ABCDEF") == 8)
-               {
-                       TransactionId xid;
-                       char       *buf;
-                       char       *bufptr;
-                       TwoPhaseFileHeader *hdr;
-                       TransactionId *subxids;
-                       GlobalTransaction gxact;
-                       const char *gid;
-                       int                     i;
+               TransactionId xid;
+               char       *buf;
+               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+               char       *bufptr;
+               TwoPhaseFileHeader *hdr;
+               TransactionId *subxids;
+               const char *gid;
+               bool            overwriteOK = false;
+               int                     i;
 
-                       xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+               xid = gxact->xid;
 
-                       /* Already processed? */
-                       if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
-                       {
-                               ereport(WARNING,
-                                               (errmsg("removing stale two-phase state file \"%s\"",
-                                                               clde->d_name)));
-                               RemoveTwoPhaseFile(xid, true);
-                               continue;
-                       }
+               buf = ProcessTwoPhaseBuffer(xid,
+                               gxact->prepare_start_lsn,
+                               gxact->ondisk, false, false,
+                               NULL, NULL);
+               if (buf == NULL)
+                       continue;
 
-                       /* Read and validate file */
-                       buf = ReadTwoPhaseFile(xid, true);
-                       if (buf == NULL)
-                       {
-                               ereport(WARNING,
-                                         (errmsg("removing corrupt two-phase state file \"%s\"",
-                                                         clde->d_name)));
-                               RemoveTwoPhaseFile(xid, true);
-                               continue;
-                       }
+               ereport(LOG,
+                               (errmsg("recovering prepared transaction %u from shared memory", xid)));
+
+               hdr = (TwoPhaseFileHeader *) buf;
+               Assert(TransactionIdEquals(hdr->xid, xid));
+               bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+               gid = (const char *) bufptr;
+               bufptr += MAXALIGN(hdr->gidlen);
+               subxids = (TransactionId *) bufptr;
+               bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+               bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+               bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+               bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 
-                       ereport(LOG,
-                                       (errmsg("recovering prepared transaction %u", xid)));
-
-                       /* Deconstruct header */
-                       hdr = (TwoPhaseFileHeader *) buf;
-                       Assert(TransactionIdEquals(hdr->xid, xid));
-                       bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
-                       gid = (const char *) bufptr;
-                       bufptr += MAXALIGN(hdr->gidlen);
-                       subxids = (TransactionId *) bufptr;
-                       bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
-                       bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
-                       bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
-                       bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+               /*
+                * It's possible that SubTransSetParent has been set before, if
+                * the prepared transaction generated xid assignment records. Test
+                * here must match one used in AssignTransactionId().
+                */
+               if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+                                                        XLogLogicalInfoActive()))
+                       overwriteOK = true;
 
-                       /*
-                        * It's possible that SubTransSetParent has been set before, if
-                        * the prepared transaction generated xid assignment records. Test
-                        * here must match one used in AssignTransactionId().
-                        */
-                       if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
-                                                                XLogLogicalInfoActive()))
-                               overwriteOK = true;
+               /*
+                * Reconstruct subtrans state for the transaction --- needed
+                * because pg_subtrans is not preserved over a restart.  Note that
+                * we are linking all the subtransactions directly to the
+                * top-level XID; there may originally have been a more complex
+                * hierarchy, but there's no need to restore that exactly.
+                */
+               for (i = 0; i < hdr->nsubxacts; i++)
+                       SubTransSetParent(subxids[i], xid, overwriteOK);
 
-                       /*
-                        * Reconstruct subtrans state for the transaction --- needed
-                        * because pg_subtrans is not preserved over a restart.  Note that
-                        * we are linking all the subtransactions directly to the
-                        * top-level XID; there may originally have been a more complex
-                        * hierarchy, but there's no need to restore that exactly.
-                        */
-                       for (i = 0; i < hdr->nsubxacts; i++)
-                               SubTransSetParent(subxids[i], xid, overwriteOK);
+               /*
+                * Recreate its GXACT and dummy PGPROC. But, check whether
+                * it was added in redo and already has a shmem entry for
+                * it.
+                */
+               LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+               MarkAsPreparingGuts(gxact, xid, gid,
+                                                       hdr->prepared_at,
+                                                       hdr->owner, hdr->database);
 
-                       /*
-                        * Recreate its GXACT and dummy PGPROC
-                        */
-                       gxact = MarkAsPreparing(xid, gid,
-                                                                       hdr->prepared_at,
-                                                                       hdr->owner, hdr->database);
-                       gxact->ondisk = true;
-                       GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
-                       MarkAsPrepared(gxact);
+               /* recovered, so reset the flag for entries generated by redo */
+               gxact->inredo = false;
 
-                       /*
-                        * Recover other state (notably locks) using resource managers
-                        */
-                       ProcessRecords(bufptr, xid, twophase_recover_callbacks);
+               LWLockRelease(TwoPhaseStateLock);
 
-                       /*
-                        * Release locks held by the standby process after we process each
-                        * prepared transaction. As a result, we don't need too many
-                        * additional locks at any one time.
-                        */
-                       if (InHotStandby)
-                               StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+               GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
+               MarkAsPrepared(gxact);
 
-                       /*
-                        * We're done with recovering this transaction. Clear
-                        * MyLockedGxact, like we do in PrepareTransaction() during normal
-                        * operation.
-                        */
-                       PostPrepare_Twophase();
+               /*
+                * Recover other state (notably locks) using resource managers
+                */
+               ProcessRecords(bufptr, xid, twophase_recover_callbacks);
 
-                       pfree(buf);
+               /*
+                * Release locks held by the standby process after we process each
+                * prepared transaction. As a result, we don't need too many
+                * additional locks at any one time.
+                */
+               if (InHotStandby)
+                       StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+
+               /*
+                * We're done with recovering this transaction. Clear
+                * MyLockedGxact, like we do in PrepareTransaction() during normal
+                * operation.
+                */
+               PostPrepare_Twophase();
+
+               pfree(buf);
+       }
+}
+
+/*
+ * ProcessTwoPhaseBuffer
+ *
+ * Given a transaction id, read it either from disk or read it directly
+ * via shmem xlog record pointer using the provided "prepare_start_lsn".
+ *
+ * If setParent is true, then use the overwriteOK parameter to set up
+ * subtransaction parent linkages.
+ *
+ * If result and maxsubxid are not NULL, fill them up with smallest
+ * running transaction id (lesser than ShmemVariableCache->nextXid)
+ * and largest subtransaction id for this transaction respectively.
+ */
+static char *
+ProcessTwoPhaseBuffer(TransactionId xid,
+                                         XLogRecPtr prepare_start_lsn,
+                                         bool fromdisk, bool overwriteOK,
+                                         bool setParent, TransactionId *result,
+                                         TransactionId *maxsubxid)
+{
+       TransactionId origNextXid = ShmemVariableCache->nextXid;
+       TransactionId res;
+       TransactionId maxsub;
+       TransactionId *subxids;
+       char       *buf;
+       TwoPhaseFileHeader *hdr;
+       int                     i;
+
+       if (!fromdisk)
+               Assert(prepare_start_lsn != InvalidXLogRecPtr);
+
+       if (result)
+               res = *result;
+       if (maxsubxid)
+               maxsub = *maxsubxid;
+
+       /* Already processed? */
+       if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+       {
+               if (fromdisk)
+               {
+                       ereport(WARNING,
+                                       (errmsg("removing stale two-phase state file for \"%u\"",
+                                                       xid)));
+                       RemoveTwoPhaseFile(xid, true);
                }
+               else
+               {
+                       ereport(WARNING,
+                                       (errmsg("removing stale two-phase state from"
+                                                       " shared memory for \"%u\"", xid)));
+                       PrepareRedoRemove(xid, true);
+               }
+               return NULL;
        }
-       FreeDir(cldir);
+
+       /* Reject XID if too new */
+       if (TransactionIdFollowsOrEquals(xid, origNextXid))
+       {
+               if (fromdisk)
+               {
+                       ereport(WARNING,
+                                       (errmsg("removing future two-phase state file for \"%u\"",
+                                                       xid)));
+                       RemoveTwoPhaseFile(xid, true);
+               }
+               else
+               {
+                       ereport(WARNING,
+                                       (errmsg("removing future two-phase state from memory for \"%u\"",
+                                                       xid)));
+                       PrepareRedoRemove(xid, true);
+               }
+               return NULL;
+       }
+
+       if (fromdisk)
+       {
+               /* Read and validate file */
+               buf = ReadTwoPhaseFile(xid, true);
+               if (buf == NULL)
+               {
+                       ereport(WARNING,
+                                       (errmsg("removing corrupt two-phase state file for \"%u\"",
+                                                       xid)));
+                       RemoveTwoPhaseFile(xid, true);
+                       return NULL;
+               }
+       }
+       else
+       {
+               /* Read xlog data */
+               XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
+       }
+
+       /* Deconstruct header */
+       hdr = (TwoPhaseFileHeader *) buf;
+       if (!TransactionIdEquals(hdr->xid, xid))
+       {
+               if (fromdisk)
+               {
+                       ereport(WARNING,
+                                       (errmsg("removing corrupt two-phase state file for \"%u\"",
+                                                       xid)));
+                       RemoveTwoPhaseFile(xid, true);
+               }
+               else
+               {
+                       ereport(WARNING,
+                                       (errmsg("removing corrupt two-phase state from memory for \"%u\"",
+                                                       xid)));
+                       PrepareRedoRemove(xid, true);
+               }
+               pfree(buf);
+               return NULL;
+       }
+
+       /*
+        * OK, we think this buffer is valid.  Incorporate xid into the
+        * running-minimum result.
+        */
+       if (TransactionIdPrecedes(xid, res))
+               res = xid;
+
+       /*
+        * Examine subtransaction XIDs ... they should all follow main
+        * XID, and they may force us to advance nextXid.
+        */
+       subxids = (TransactionId *) (buf +
+                                                                MAXALIGN(sizeof(TwoPhaseFileHeader)) +
+                                                                MAXALIGN(hdr->gidlen));
+       for (i = 0; i < hdr->nsubxacts; i++)
+       {
+               TransactionId subxid = subxids[i];
+
+               Assert(TransactionIdFollows(subxid, xid));
+               if (TransactionIdFollowsOrEquals(subxid, maxsub))
+                       maxsub = subxid;
+               if (setParent)
+                       SubTransSetParent(xid, subxid, overwriteOK);
+       }
+
+       if (result)
+               *result = res;
+       if (maxsubxid)
+               *maxsubxid = maxsub;
+
+       return buf;
 }
 
+
 /*
  *     RecordTransactionCommitPrepared
  *
@@ -2187,3 +2304,111 @@ RecordTransactionAbortPrepared(TransactionId xid,
         */
        SyncRepWaitForLSN(recptr, false);
 }
+
+/*
+ * PrepareRedoAdd
+ *
+ * Store pointers to the start/end of the WAL record along with the xid in
+ * a gxact entry in shared memory TwoPhaseState structure.  If caller
+ * specifies InvalidXLogRecPtr as WAL position to fetch the two-phase
+ * data, the entry is marked as located on disk.
+ */
+void
+PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+{
+       TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
+       char                      *bufptr;
+       const char                *gid;
+       GlobalTransaction gxact;
+
+       Assert(RecoveryInProgress());
+
+       bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+       gid = (const char *) bufptr;
+
+       /*
+        * Reserve the GID for the given transaction in the redo code path.
+        *
+        * This creates a gxact struct and puts it into the active array.
+        *
+        * In redo, this struct is mainly used to track PREPARE/COMMIT entries
+        * in shared memory. Hence, we only fill up the bare minimum contents here.
+        * The gxact also gets marked with gxact->inredo set to true to indicate
+        * that it got added in the redo phase
+        */
+
+       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+       /* Get a free gxact from the freelist */
+       if (TwoPhaseState->freeGXacts == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("maximum number of prepared transactions reached"),
+                                errhint("Increase max_prepared_transactions (currently %d).",
+                                                max_prepared_xacts)));
+       gxact = TwoPhaseState->freeGXacts;
+       TwoPhaseState->freeGXacts = gxact->next;
+
+       gxact->prepared_at = hdr->prepared_at;
+       gxact->prepare_start_lsn = start_lsn;
+       gxact->prepare_end_lsn = end_lsn;
+       gxact->xid = hdr->xid;
+       gxact->owner = hdr->owner;
+       gxact->locking_backend = InvalidBackendId;
+       gxact->valid = false;
+       gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
+       gxact->inredo = true; /* yes, added in redo */
+       strcpy(gxact->gid, gid);
+
+       /* And insert it into the active array */
+       Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
+       TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+
+       LWLockRelease(TwoPhaseStateLock);
+
+       elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid);
+}
+
+/*
+ * PrepareRedoRemove
+ *
+ * Remove the corresponding gxact entry from TwoPhaseState. Also
+ * remove the 2PC file if a prepared transaction was saved via
+ * an earlier checkpoint.
+ */
+void
+PrepareRedoRemove(TransactionId xid, bool giveWarning)
+{
+       GlobalTransaction gxact = NULL;
+       int                     i;
+
+       Assert(RecoveryInProgress());
+
+       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+       for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+       {
+               gxact = TwoPhaseState->prepXacts[i];
+
+               if (gxact->xid == xid)
+               {
+                       Assert(gxact->inredo);
+                       break;
+               }
+       }
+       LWLockRelease(TwoPhaseStateLock);
+
+       /*
+        * Just leave if there is nothing, this is expected during WAL replay.
+        */
+       if (gxact == NULL)
+               return;
+
+       /*
+        * And now we can clean up any files we may have left.
+        */
+       elog(DEBUG2, "Removing 2PC data from shared memory %u", xid);
+       if (gxact->ondisk)
+               RemoveTwoPhaseFile(xid, giveWarning);
+       RemoveGXact(gxact);
+
+       return;
+}
index c8751c697d4baa82d7e0b3f3eddbe3696ab631e6..6f614e4fad07c8c7b0ff0f6eab262dcbb7357b20 100644 (file)
@@ -5615,7 +5615,9 @@ xact_redo(XLogReaderState *record)
                        Assert(TransactionIdIsValid(parsed.twophase_xid));
                        xact_redo_commit(&parsed, parsed.twophase_xid,
                                                         record->EndRecPtr, XLogRecGetOrigin(record));
-                       RemoveTwoPhaseFile(parsed.twophase_xid, false);
+
+                       /* Delete TwoPhaseState gxact entry and/or 2PC file. */
+                       PrepareRedoRemove(parsed.twophase_xid, false);
                }
        }
        else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
@@ -5635,14 +5637,20 @@ xact_redo(XLogReaderState *record)
                {
                        Assert(TransactionIdIsValid(parsed.twophase_xid));
                        xact_redo_abort(&parsed, parsed.twophase_xid);
-                       RemoveTwoPhaseFile(parsed.twophase_xid, false);
+
+                       /* Delete TwoPhaseState gxact entry and/or 2PC file. */
+                       PrepareRedoRemove(parsed.twophase_xid, false);
                }
        }
        else if (info == XLOG_XACT_PREPARE)
        {
-               /* the record contents are exactly the 2PC file */
-               RecreateTwoPhaseFile(XLogRecGetXid(record),
-                                                 XLogRecGetData(record), XLogRecGetDataLen(record));
+               /*
+                * Store xid and start/end pointers of the WAL record in
+                * TwoPhaseState gxact entry.
+                */
+               PrepareRedoAdd(XLogRecGetData(record),
+                                          record->ReadRecPtr,
+                                          record->EndRecPtr);
        }
        else if (info == XLOG_XACT_ASSIGNMENT)
        {
index 5d58f0983cf8aec4e348faa6824df46ab861c7c3..287b3b13799dd5e9c9bddf24553b28c87950c146 100644 (file)
@@ -6696,6 +6696,16 @@ StartupXLOG(void)
         */
        restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI);
 
+       /*
+        * Before running in recovery, scan pg_twophase and fill in its status
+        * to be able to work on entries generated by redo.  Doing a scan before
+        * taking any recovery action has the merit to discard any 2PC files that
+        * are newer than the first record to replay, saving from any conflicts at
+        * replay.  This avoids as well any subsequent scans when doing recovery
+        * of the on-disk two-phase data.
+        */
+       restoreTwoPhaseData();
+
        lastFullPageWrites = checkPoint.fullPageWrites;
 
        RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
index b2b7848fad233ca6394add0f7c3ee5ad8d17b000..4d547c55539788baef0b54af7e68e43b60378b27 100644 (file)
@@ -49,11 +49,12 @@ extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
 extern void RecoverPreparedTransactions(void);
 
-extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
-extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
-
 extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
 
 extern void FinishPreparedTransaction(const char *gid, bool isCommit);
 
+extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
+                                                  XLogRecPtr end_lsn);
+extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
+extern void restoreTwoPhaseData(void);
 #endif   /* TWOPHASE_H */