]> granicus.if.org Git - postgresql/commitdiff
Avoid bogus TwoPhaseState locking sequences
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 14 Jun 2017 15:29:05 +0000 (11:29 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 14 Jun 2017 15:29:05 +0000 (11:29 -0400)
The optimized code in 728bd991c3c4 contains a few invalid locking
sequences.  To wit, the original code would try to acquire an lwlock
that it already holds.  Avoid this by moving lock acquisitions to
higher-level code, and install appropriate assertions in low-level that
the correct mode is held.

Authors: Michael Paquier, Álvaro Herrera
Reported-By: chuanting wang
Bug: #14680
Discussion: https://postgr.es/m/20170531033228.1487.10124@wrigleys.postgresql.org

src/backend/access/transam/twophase.c
src/backend/access/transam/xact.c

index c50f9c4bf6537d882cfe8ea877770eaa0200cde2..e9751aa2f6d05095ced8f300d88bd2254101f8f0 100644 (file)
@@ -195,7 +195,10 @@ typedef struct TwoPhaseStateData
 static TwoPhaseStateData *TwoPhaseState;
 
 /*
- * Global transaction entry currently locked by us, if any.
+ * Global transaction entry currently locked by us, if any.  Note that any
+ * access to the entry pointed to by this variable must be protected by
+ * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
+ * (since it's just local memory).
  */
 static GlobalTransaction MyLockedGxact = NULL;
 
@@ -338,18 +341,13 @@ AtAbort_Twophase(void)
         * resources held by the transaction yet.  In those cases, the in-memory
         * state can be wrong, but it's too late to back out.
         */
+       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        if (!MyLockedGxact->valid)
-       {
                RemoveGXact(MyLockedGxact);
-       }
        else
-       {
-               LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
-
                MyLockedGxact->locking_backend = InvalidBackendId;
+       LWLockRelease(TwoPhaseStateLock);
 
-               LWLockRelease(TwoPhaseStateLock);
-       }
        MyLockedGxact = NULL;
 }
 
@@ -454,6 +452,8 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
        PGXACT     *pgxact;
        int                     i;
 
+       Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
+
        Assert(gxact != NULL);
        proc = &ProcGlobal->allProcs[gxact->pgprocno];
        pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
@@ -530,15 +530,19 @@ GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
 /*
  * MarkAsPrepared
  *             Mark the GXACT as fully valid, and enter it into the global ProcArray.
+ *
+ * lock_held indicates whether caller already holds TwoPhaseStateLock.
  */
 static void
-MarkAsPrepared(GlobalTransaction gxact)
+MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
 {
        /* Lock here may be overkill, but I'm not convinced of that ... */
-       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+       if (!lock_held)
+               LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        Assert(!gxact->valid);
        gxact->valid = true;
-       LWLockRelease(TwoPhaseStateLock);
+       if (!lock_held)
+               LWLockRelease(TwoPhaseStateLock);
 
        /*
         * Put it into the global ProcArray so TransactionIdIsInProgress considers
@@ -632,7 +636,7 @@ RemoveGXact(GlobalTransaction gxact)
 {
        int                     i;
 
-       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+       Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
@@ -646,14 +650,10 @@ RemoveGXact(GlobalTransaction gxact)
                        gxact->next = TwoPhaseState->freeGXacts;
                        TwoPhaseState->freeGXacts = gxact;
 
-                       LWLockRelease(TwoPhaseStateLock);
-
                        return;
                }
        }
 
-       LWLockRelease(TwoPhaseStateLock);
-
        elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
 }
 
@@ -1127,7 +1127,7 @@ EndPrepare(GlobalTransaction gxact)
         * the xact crashed.  Instead we have a window where the same XID appears
         * twice in ProcArray, which is OK.
         */
-       MarkAsPrepared(gxact);
+       MarkAsPrepared(gxact, false);
 
        /*
         * Now we can mark ourselves as out of the commit critical section: a
@@ -1506,7 +1506,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
        if (gxact->ondisk)
                RemoveTwoPhaseFile(xid, true);
 
+       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        RemoveGXact(gxact);
+       LWLockRelease(TwoPhaseStateLock);
        MyLockedGxact = NULL;
 
        pfree(buf);
@@ -1734,6 +1736,7 @@ restoreTwoPhaseData(void)
        struct dirent *clde;
 
        cldir = AllocateDir(TWOPHASE_DIR);
+       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
        {
                if (strlen(clde->d_name) == 8 &&
@@ -1752,6 +1755,7 @@ restoreTwoPhaseData(void)
                        PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
                }
        }
+       LWLockRelease(TwoPhaseStateLock);
        FreeDir(cldir);
 }
 
@@ -1792,7 +1796,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
        int                     allocsize = 0;
        int                     i;
 
-       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                TransactionId xid;
@@ -1867,7 +1871,7 @@ StandbyRecoverPreparedTransactions(void)
 {
        int                     i;
 
-       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                TransactionId xid;
@@ -1893,7 +1897,8 @@ StandbyRecoverPreparedTransactions(void)
  * Scan the shared memory entries of TwoPhaseState and reload the state for
  * each prepared transaction (reacquire locks, etc).
  *
- * This is run during database startup.
+ * This is run at the end of recovery, but before we allow backends to write
+ * WAL.
  *
  * At the end of recovery the way we take snapshots will change. We now need
  * to mark all running transactions with their full SubTransSetParent() info
@@ -1907,9 +1912,7 @@ RecoverPreparedTransactions(void)
 {
        int                     i;
 
-       /*
-        * Don't need a lock in the recovery phase.
-        */
+       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                TransactionId xid;
@@ -1955,7 +1958,6 @@ RecoverPreparedTransactions(void)
                 * Recreate its GXACT and dummy PGPROC. But, check whether it was
                 * added in redo and already has a shmem entry for it.
                 */
-               LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
                MarkAsPreparingGuts(gxact, xid, gid,
                                                        hdr->prepared_at,
                                                        hdr->owner, hdr->database);
@@ -1963,13 +1965,13 @@ RecoverPreparedTransactions(void)
                /* recovered, so reset the flag for entries generated by redo */
                gxact->inredo = false;
 
-               LWLockRelease(TwoPhaseStateLock);
-
                GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
-               MarkAsPrepared(gxact);
+               MarkAsPrepared(gxact, true);
+
+               LWLockRelease(TwoPhaseStateLock);
 
                /*
-                * Recover other state (notably locks) using resource managers
+                * Recover other state (notably locks) using resource managers.
                 */
                ProcessRecords(bufptr, xid, twophase_recover_callbacks);
 
@@ -1988,7 +1990,11 @@ RecoverPreparedTransactions(void)
                PostPrepare_Twophase();
 
                pfree(buf);
+
+               LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        }
+
+       LWLockRelease(TwoPhaseStateLock);
 }
 
 /*
@@ -2014,6 +2020,8 @@ ProcessTwoPhaseBuffer(TransactionId xid,
        TwoPhaseFileHeader *hdr;
        int                     i;
 
+       Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
+
        if (!fromdisk)
                Assert(prepare_start_lsn != InvalidXLogRecPtr);
 
@@ -2030,8 +2038,8 @@ ProcessTwoPhaseBuffer(TransactionId xid,
                else
                {
                        ereport(WARNING,
-                                       (errmsg("removing stale two-phase state from"
-                                                       " shared memory for \"%u\"", xid)));
+                                       (errmsg("removing stale two-phase state from shared memory for \"%u\"",
+                                                       xid)));
                        PrepareRedoRemove(xid, true);
                }
                return NULL;
@@ -2308,6 +2316,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
        const char *gid;
        GlobalTransaction gxact;
 
+       Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
        Assert(RecoveryInProgress());
 
        bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
@@ -2324,7 +2333,6 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
         * that it got added in the redo phase
         */
 
-       LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
        /* Get a free gxact from the freelist */
        if (TwoPhaseState->freeGXacts == NULL)
                ereport(ERROR,
@@ -2350,17 +2358,17 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
        Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
        TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
 
-       LWLockRelease(TwoPhaseStateLock);
-
-       elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid);
+       elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
 }
 
 /*
  * PrepareRedoRemove
  *
- * Remove the corresponding gxact entry from TwoPhaseState. Also
- * remove the 2PC file if a prepared transaction was saved via
- * an earlier checkpoint.
+ * Remove the corresponding gxact entry from TwoPhaseState. Also remove
+ * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
+ *
+ * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
+ * is updated.
  */
 void
 PrepareRedoRemove(TransactionId xid, bool giveWarning)
@@ -2369,9 +2377,9 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
        int                     i;
        bool            found = false;
 
+       Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
        Assert(RecoveryInProgress());
 
-       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
        for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
        {
                gxact = TwoPhaseState->prepXacts[i];
@@ -2383,7 +2391,6 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
                        break;
                }
        }
-       LWLockRelease(TwoPhaseStateLock);
 
        /*
         * Just leave if there is nothing, this is expected during WAL replay.
@@ -2394,7 +2401,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
        /*
         * And now we can clean up any files we may have left.
         */
-       elog(DEBUG2, "Removing 2PC data from shared memory %u", xid);
+       elog(DEBUG2, "removing 2PC data for transaction %u", xid);
        if (gxact->ondisk)
                RemoveTwoPhaseFile(xid, giveWarning);
        RemoveGXact(gxact);
index 7e8c598f2adc191a34f2bb5424a3a480cc342888..e09696c37f6a9cafe2b204f0b05c95059243a872 100644 (file)
@@ -5351,6 +5351,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
        int                     i;
        TimestampTz commit_time;
 
+       Assert(TransactionIdIsValid(xid));
+
        max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
 
        /*
@@ -5518,6 +5520,8 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
        int                     i;
        TransactionId max_xid;
 
+       Assert(TransactionIdIsValid(xid));
+
        /*
         * Make sure nextXid is beyond any XID mentioned in the record.
         *
@@ -5598,51 +5602,49 @@ xact_redo(XLogReaderState *record)
        /* Backup blocks are not used in xact records */
        Assert(!XLogRecHasAnyBlockRefs(record));
 
-       if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
+       if (info == XLOG_XACT_COMMIT)
        {
                xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
                xl_xact_parsed_commit parsed;
 
-               ParseCommitRecord(XLogRecGetInfo(record), xlrec,
-                                                 &parsed);
+               ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
+               xact_redo_commit(&parsed, XLogRecGetXid(record),
+                                                record->EndRecPtr, XLogRecGetOrigin(record));
+       }
+       else if (info == XLOG_XACT_COMMIT_PREPARED)
+       {
+               xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+               xl_xact_parsed_commit parsed;
 
-               if (info == XLOG_XACT_COMMIT)
-               {
-                       Assert(!TransactionIdIsValid(parsed.twophase_xid));
-                       xact_redo_commit(&parsed, XLogRecGetXid(record),
-                                                        record->EndRecPtr, XLogRecGetOrigin(record));
-               }
-               else
-               {
-                       Assert(TransactionIdIsValid(parsed.twophase_xid));
-                       xact_redo_commit(&parsed, parsed.twophase_xid,
-                                                        record->EndRecPtr, XLogRecGetOrigin(record));
+               ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
+               xact_redo_commit(&parsed, parsed.twophase_xid,
+                                                record->EndRecPtr, XLogRecGetOrigin(record));
 
-                       /* Delete TwoPhaseState gxact entry and/or 2PC file. */
-                       PrepareRedoRemove(parsed.twophase_xid, false);
-               }
+               /* Delete TwoPhaseState gxact entry and/or 2PC file. */
+               LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+               PrepareRedoRemove(parsed.twophase_xid, false);
+               LWLockRelease(TwoPhaseStateLock);
        }
-       else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
+       else if (info == XLOG_XACT_ABORT)
        {
                xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
                xl_xact_parsed_abort parsed;
 
-               ParseAbortRecord(XLogRecGetInfo(record), xlrec,
-                                                &parsed);
+               ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
+               xact_redo_abort(&parsed, XLogRecGetXid(record));
+       }
+       else if (info == XLOG_XACT_ABORT_PREPARED)
+       {
+               xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+               xl_xact_parsed_abort parsed;
 
-               if (info == XLOG_XACT_ABORT)
-               {
-                       Assert(!TransactionIdIsValid(parsed.twophase_xid));
-                       xact_redo_abort(&parsed, XLogRecGetXid(record));
-               }
-               else
-               {
-                       Assert(TransactionIdIsValid(parsed.twophase_xid));
-                       xact_redo_abort(&parsed, parsed.twophase_xid);
+               ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
+               xact_redo_abort(&parsed, parsed.twophase_xid);
 
-                       /* Delete TwoPhaseState gxact entry and/or 2PC file. */
-                       PrepareRedoRemove(parsed.twophase_xid, false);
-               }
+               /* Delete TwoPhaseState gxact entry and/or 2PC file. */
+               LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+               PrepareRedoRemove(parsed.twophase_xid, false);
+               LWLockRelease(TwoPhaseStateLock);
        }
        else if (info == XLOG_XACT_PREPARE)
        {
@@ -5650,9 +5652,11 @@ xact_redo(XLogReaderState *record)
                 * Store xid and start/end pointers of the WAL record in TwoPhaseState
                 * gxact entry.
                 */
+               LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
                PrepareRedoAdd(XLogRecGetData(record),
                                           record->ReadRecPtr,
                                           record->EndRecPtr);
+               LWLockRelease(TwoPhaseStateLock);
        }
        else if (info == XLOG_XACT_ASSIGNMENT)
        {