]> granicus.if.org Git - postgresql/commitdiff
Use a more granular approach to follow update chains
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 27 Nov 2013 20:50:33 +0000 (17:50 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 28 Nov 2013 14:54:25 +0000 (11:54 -0300)
Instead of simply checking the KEYS_UPDATED bit, we need to check
whether each lock held on the future version of the tuple conflicts with
the lock we're trying to acquire.

Per bug report #8434 by Tomonari Katsumata

src/backend/access/heap/heapam.c

index 493df9f6ccdb104e69950a68ae55080ce7c1751e..68c14925ed53cd550dfb7752052413bb7da1db77 100644 (file)
@@ -4777,6 +4777,83 @@ l5:
        *result_xmax = new_xmax;
 }
 
+/*
+ * Subroutine for heap_lock_updated_tuple_rec.
+ *
+ * Given an hypothetical multixact status held by the transaction identified
+ * with the given xid, does the current transaction need to wait, fail, or can
+ * it continue if it wanted to acquire a lock of the given mode?  "needwait"
+ * is set to true if waiting is necessary; if it can continue, then
+ * HeapTupleMayBeUpdated is returned.  In case of a conflict, a different
+ * HeapTupleSatisfiesUpdate return code is returned.
+ *
+ * The held status is said to be hypothetical because it might correspond to a
+ * lock held by a single Xid, i.e. not a real MultiXactId; we express it this
+ * way for simplicity of API.
+ */
+static HTSU_Result
+test_lockmode_for_conflict(MultiXactStatus status, TransactionId xid,
+                                                  LockTupleMode mode, bool *needwait)
+{
+       MultiXactStatus wantedstatus;
+
+       *needwait = false;
+       wantedstatus = get_mxact_status_for_lock(mode, false);
+
+       /*
+        * Note: we *must* check TransactionIdIsInProgress before
+        * TransactionIdDidAbort/Commit; see comment at top of tqual.c for an
+        * explanation.
+        */
+       if (TransactionIdIsCurrentTransactionId(xid))
+       {
+               /*
+                * Updated by our own transaction? Just return failure.  This shouldn't
+                * normally happen.
+                */
+               return HeapTupleSelfUpdated;
+       }
+       else if (TransactionIdIsInProgress(xid))
+       {
+               /*
+                * If the locking transaction is running, what we do depends on whether
+                * the lock modes conflict: if they do, then we must wait for it to
+                * finish; otherwise we can fall through to lock this tuple version
+                * without waiting.
+                */
+               if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
+                                                               LOCKMODE_from_mxstatus(wantedstatus)))
+               {
+                       *needwait = true;
+               }
+
+               /*
+                * If we set needwait above, then this value doesn't matter; otherwise,
+                * this value signals to caller that it's okay to proceed.
+                */
+               return HeapTupleMayBeUpdated;
+       }
+       else if (TransactionIdDidAbort(xid))
+               return HeapTupleMayBeUpdated;
+       else if (TransactionIdDidCommit(xid))
+       {
+               /*
+                * If the updating transaction committed, what we do depends on whether
+                * the lock modes conflict: if they do, then we must report error to
+                * caller.  But if they don't, we can fall through to lock it.
+                */
+               if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
+                                                               LOCKMODE_from_mxstatus(wantedstatus)))
+                       /* bummer */
+                       return HeapTupleUpdated;
+
+               return HeapTupleMayBeUpdated;
+       }
+
+       /* Not in progress, not aborted, not committed -- must have crashed */
+       return HeapTupleMayBeUpdated;
+}
+
 
 /*
  * Recursive part of heap_lock_updated_tuple
@@ -4794,7 +4871,8 @@ heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
        Buffer          buf;
        uint16          new_infomask,
                                new_infomask2,
-                               old_infomask;
+                               old_infomask,
+                               old_infomask2;
        TransactionId xmax,
                                new_xmax;
        TransactionId priorXmax = InvalidTransactionId;
@@ -4836,45 +4914,107 @@ l4:
                }
 
                old_infomask = mytup.t_data->t_infomask;
+               old_infomask2 = mytup.t_data->t_infomask2;
                xmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
 
                /*
-                * If this tuple is updated and the key has been modified (or
-                * deleted), what we do depends on the status of the updating
-                * transaction: if it's live, we sleep until it finishes; if it has
-                * committed, we have to fail (i.e. return HeapTupleUpdated); if it
-                * aborted, we ignore it. For updates that didn't touch the key, we
-                * can just plough ahead.
+                * If this tuple version has been updated or locked by some concurrent
+                * transaction(s), what we do depends on whether our lock mode
+                * conflicts with what those other transactions hold, and also on the
+                * status of them.
                 */
-               if (!(old_infomask & HEAP_XMAX_INVALID) &&
-                       (mytup.t_data->t_infomask2 & HEAP_KEYS_UPDATED))
+               if (!(old_infomask & HEAP_XMAX_INVALID))
                {
-                       TransactionId update_xid;
+                       TransactionId rawxmax;
+                       bool            needwait;
 
-                       /*
-                        * Note: we *must* check TransactionIdIsInProgress before
-                        * TransactionIdDidAbort/Commit; see comment at top of tqual.c for
-                        * an explanation.
-                        */
-                       update_xid = HeapTupleHeaderGetUpdateXid(mytup.t_data);
-                       if (TransactionIdIsCurrentTransactionId(update_xid))
+                       rawxmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
+                       if (old_infomask & HEAP_XMAX_IS_MULTI)
                        {
-                               UnlockReleaseBuffer(buf);
-                               return HeapTupleSelfUpdated;
-                       }
-                       else if (TransactionIdIsInProgress(update_xid))
-                       {
-                               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-                               /* No LockTupleTuplock here -- see heap_lock_updated_tuple */
-                               XactLockTableWait(update_xid);
-                               goto l4;
+                               int             nmembers;
+                               int             i;
+                               MultiXactMember *members;
+
+                               nmembers = GetMultiXactIdMembers(rawxmax, &members, false);
+                               for (i = 0; i < nmembers; i++)
+                               {
+                                       HTSU_Result     res;
+
+                                       res = test_lockmode_for_conflict(members[i].status,
+                                                                                                        members[i].xid,
+                                                                                                        mode, &needwait);
+
+                                       if (needwait)
+                                       {
+                                               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                                               XactLockTableWait(members[i].xid);
+                                               pfree(members);
+                                               goto l4;
+                                       }
+                                       if (res != HeapTupleMayBeUpdated)
+                                       {
+                                               UnlockReleaseBuffer(buf);
+                                               pfree(members);
+                                               return res;
+                                       }
+                               }
+                               if (members)
+                                       pfree(members);
                        }
-                       else if (TransactionIdDidAbort(update_xid))
-                               ;                               /* okay to proceed */
-                       else if (TransactionIdDidCommit(update_xid))
+                       else
                        {
-                               UnlockReleaseBuffer(buf);
-                               return HeapTupleUpdated;
+                               HTSU_Result     res;
+                               MultiXactStatus status;
+
+                               /*
+                                * For a non-multi Xmax, we first need to compute the
+                                * corresponding MultiXactStatus by using the infomask bits.
+                                */
+                               if (HEAP_XMAX_IS_LOCKED_ONLY(old_infomask))
+                               {
+                                       if (HEAP_XMAX_IS_KEYSHR_LOCKED(old_infomask))
+                                               status = MultiXactStatusForKeyShare;
+                                       else if (HEAP_XMAX_IS_SHR_LOCKED(old_infomask))
+                                               status = MultiXactStatusForShare;
+                                       else if (HEAP_XMAX_IS_EXCL_LOCKED(old_infomask))
+                                       {
+                                               if (old_infomask2 & HEAP_KEYS_UPDATED)
+                                                       status = MultiXactStatusForUpdate;
+                                               else
+                                                       status = MultiXactStatusForNoKeyUpdate;
+                                       }
+                                       else
+                                       {
+                                               /*
+                                                * LOCK_ONLY present alone (a pg_upgraded tuple
+                                                * marked as share-locked in the old cluster) shouldn't
+                                                * be seen in the middle of an update chain.
+                                                */
+                                               elog(ERROR, "invalid lock status in tuple");
+                                       }
+                               }
+                               else
+                               {
+                                       /* it's an update, but which kind? */
+                                       if (old_infomask2 & HEAP_KEYS_UPDATED)
+                                               status = MultiXactStatusUpdate;
+                                       else
+                                               status = MultiXactStatusNoKeyUpdate;
+                               }
+
+                               res = test_lockmode_for_conflict(status, rawxmax, mode,
+                                                                                                &needwait);
+                               if (needwait)
+                               {
+                                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                                       XactLockTableWait(rawxmax);
+                                       goto l4;
+                               }
+                               if (res != HeapTupleMayBeUpdated)
+                               {
+                                       UnlockReleaseBuffer(buf);
+                                       return res;
+                               }
                        }
                }