]> granicus.if.org Git - postgresql/commitdiff
Prevent very-low-probability PANIC during PREPARE TRANSACTION.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 14 Jan 2013 03:19:47 +0000 (22:19 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 14 Jan 2013 03:20:22 +0000 (22:20 -0500)
The code in PostPrepare_Locks supposed that it could reassign locks to
the prepared transaction's dummy PGPROC by deleting the PROCLOCK table
entries and immediately creating new ones.  This was safe when that code
was written, but since we invented partitioning of the shared lock table,
it's not safe --- another process could steal away the PROCLOCK entry in
the short interval when it's on the freelist.  Then, if we were otherwise
out of shared memory, PostPrepare_Locks would have to PANIC, since it's
too late to back out of the PREPARE at that point.

Fix by inventing a dynahash.c function to atomically update a hashtable
entry's key.  (This might possibly have other uses in future.)

This is an ancient bug that in principle we ought to back-patch, but the
odds of someone hitting it in the field seem really tiny, because (a) the
risk window is small, and (b) nobody runs servers with maxed-out lock
tables for long, because they'll be getting non-PANIC out-of-memory errors
anyway.  So fixing it in HEAD seems sufficient, at least until the new
code has gotten some testing.

src/backend/storage/lmgr/lock.c
src/backend/utils/hash/dynahash.c
src/include/utils/hsearch.h

index 5f2239f4bf044568ce993b300e135b25499a0816..f2cf5c6b8eb91a9c114eee1a51098eec893d856e 100644 (file)
@@ -3028,7 +3028,6 @@ PostPrepare_Locks(TransactionId xid)
        LOCK       *lock;
        PROCLOCK   *proclock;
        PROCLOCKTAG proclocktag;
-       bool            found;
        int                     partition;
 
        /* This is a critical section: any error means big trouble */
@@ -3114,10 +3113,8 @@ PostPrepare_Locks(TransactionId xid)
                while (proclock)
                {
                        PROCLOCK   *nextplock;
-                       LOCKMASK        holdMask;
-                       PROCLOCK   *newproclock;
 
-                       /* Get link first, since we may unlink/delete this proclock */
+                       /* Get link first, since we may unlink/relink this proclock */
                        nextplock = (PROCLOCK *)
                                SHMQueueNext(procLocks, &proclock->procLink,
                                                         offsetof(PROCLOCK, procLink));
@@ -3145,64 +3142,42 @@ PostPrepare_Locks(TransactionId xid)
                        if (proclock->releaseMask != proclock->holdMask)
                                elog(PANIC, "we seem to have dropped a bit somewhere");
 
-                       holdMask = proclock->holdMask;
-
                        /*
                         * We cannot simply modify proclock->tag.myProc to reassign
                         * ownership of the lock, because that's part of the hash key and
-                        * the proclock would then be in the wrong hash chain.  So, unlink
-                        * and delete the old proclock; create a new one with the right
-                        * contents; and link it into place.  We do it in this order to be
-                        * certain we won't run out of shared memory (the way dynahash.c
-                        * works, the deleted object is certain to be available for
-                        * reallocation).
+                        * the proclock would then be in the wrong hash chain.  Instead
+                        * use hash_update_hash_key.  (We used to create a new hash entry,
+                        * but that risks out-of-memory failure if other processes are
+                        * busy making proclocks too.)  We must unlink the proclock from
+                        * our procLink chain and put it into the new proc's chain, too.
+                        *
+                        * Note: the updated proclock hash key will still belong to the
+                        * same hash partition, cf proclock_hash().  So the partition
+                        * lock we already hold is sufficient for this.
                         */
-                       SHMQueueDelete(&proclock->lockLink);
                        SHMQueueDelete(&proclock->procLink);
-                       if (!hash_search(LockMethodProcLockHash,
-                                                        (void *) &(proclock->tag),
-                                                        HASH_REMOVE, NULL))
-                               elog(PANIC, "proclock table corrupted");
 
                        /*
-                        * Create the hash key for the new proclock table.
+                        * Create the new hash key for the proclock.
                         */
                        proclocktag.myLock = lock;
                        proclocktag.myProc = newproc;
 
-                       newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
-                                                                                                  (void *) &proclocktag,
-                                                                                                  HASH_ENTER_NULL, &found);
-                       if (!newproclock)
-                               ereport(PANIC,  /* should not happen */
-                                               (errcode(ERRCODE_OUT_OF_MEMORY),
-                                                errmsg("out of shared memory"),
-                                                errdetail("Not enough memory for reassigning the prepared transaction's locks.")));
-
                        /*
-                        * If new, initialize the new entry
+                        * Update the proclock.  We should not find any existing entry
+                        * for the same hash key, since there can be only one entry for
+                        * any given lock with my own proc.
                         */
-                       if (!found)
-                       {
-                               newproclock->holdMask = 0;
-                               newproclock->releaseMask = 0;
-                               /* Add new proclock to appropriate lists */
-                               SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
-                               SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
-                                                                        &newproclock->procLink);
-                               PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
-                       }
-                       else
-                       {
-                               PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
-                               Assert((newproclock->holdMask & ~lock->grantMask) == 0);
-                       }
+                       if (!hash_update_hash_key(LockMethodProcLockHash,
+                                                                         (void *) proclock,
+                                                                         (void *) &proclocktag))
+                               elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
 
-                       /*
-                        * Pass over the identified lock ownership.
-                        */
-                       Assert((newproclock->holdMask & holdMask) == 0);
-                       newproclock->holdMask |= holdMask;
+                       /* Re-link into the new proc's proclock list */
+                       SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
+                                                                &proclock->procLink);
+
+                       PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
 
        next_item:
                        proclock = nextplock;
index 83d2c5d2666da02b3c700e01b22979f5ebbab7e9..1d473218a7761bc051354ddcfd0c09059d2e3507 100644 (file)
@@ -183,6 +183,12 @@ struct HTAB
  */
 #define ELEMENTKEY(helem)  (((char *)(helem)) + MAXALIGN(sizeof(HASHELEMENT)))
 
+/*
+ * Obtain element pointer given pointer to key
+ */
+#define ELEMENT_FROM_KEY(key)  \
+       ((HASHELEMENT *) (((char *) (key)) - MAXALIGN(sizeof(HASHELEMENT))))
+
 /*
  * Fast MOD arithmetic, assuming that y is a power of 2 !
  */
@@ -987,6 +993,144 @@ hash_search_with_hash_value(HTAB *hashp,
        return NULL;                            /* keep compiler quiet */
 }
 
+/*
+ * hash_update_hash_key -- change the hash key of an existing table entry
+ *
+ * This is equivalent to removing the entry, making a new entry, and copying
+ * over its data, except that the entry never goes to the table's freelist.
+ * Therefore this cannot suffer an out-of-memory failure, even if there are
+ * other processes operating in other partitions of the hashtable.
+ *
+ * Returns TRUE if successful, FALSE if the requested new hash key is already
+ * present.  Throws error if the specified entry pointer isn't actually a
+ * table member.
+ *
+ * NB: currently, there is no special case for old and new hash keys being
+ * identical, which means we'll report FALSE for that situation.  This is
+ * preferable for existing uses.
+ *
+ * NB: for a partitioned hashtable, caller must hold lock on both relevant
+ * partitions, if the new hash key would belong to a different partition.
+ */
+bool
+hash_update_hash_key(HTAB *hashp,
+                                        void *existingEntry,
+                                        const void *newKeyPtr)
+{
+       HASHELEMENT *existingElement = ELEMENT_FROM_KEY(existingEntry);
+       HASHHDR    *hctl = hashp->hctl;
+       uint32          newhashvalue;
+       Size            keysize;
+       uint32          bucket;
+       long            segment_num;
+       long            segment_ndx;
+       HASHSEGMENT segp;
+       HASHBUCKET      currBucket;
+       HASHBUCKET *prevBucketPtr;
+       HASHBUCKET *oldPrevPtr;
+       HashCompareFunc match;
+
+#if HASH_STATISTICS
+       hash_accesses++;
+       hctl->accesses++;
+#endif
+
+       /* disallow updates if frozen */
+       if (hashp->frozen)
+               elog(ERROR, "cannot update in frozen hashtable \"%s\"",
+                        hashp->tabname);
+
+       /*
+        * Lookup the existing element using its saved hash value.  We need to
+        * do this to be able to unlink it from its hash chain, but as a side
+        * benefit we can verify the validity of the passed existingEntry pointer.
+        */
+       bucket = calc_bucket(hctl, existingElement->hashvalue);
+
+       segment_num = bucket >> hashp->sshift;
+       segment_ndx = MOD(bucket, hashp->ssize);
+
+       segp = hashp->dir[segment_num];
+
+       if (segp == NULL)
+               hash_corrupted(hashp);
+
+       prevBucketPtr = &segp[segment_ndx];
+       currBucket = *prevBucketPtr;
+
+       while (currBucket != NULL)
+       {
+               if (currBucket == existingElement)
+                       break;
+               prevBucketPtr = &(currBucket->link);
+               currBucket = *prevBucketPtr;
+       }
+
+       if (currBucket == NULL)
+               elog(ERROR, "hash_update_hash_key argument is not in hashtable \"%s\"",
+                        hashp->tabname);
+
+       oldPrevPtr = prevBucketPtr;
+
+       /*
+        * Now perform the equivalent of a HASH_ENTER operation to locate the
+        * hash chain we want to put the entry into.
+        */
+       newhashvalue = hashp->hash(newKeyPtr, hashp->keysize);
+
+       bucket = calc_bucket(hctl, newhashvalue);
+
+       segment_num = bucket >> hashp->sshift;
+       segment_ndx = MOD(bucket, hashp->ssize);
+
+       segp = hashp->dir[segment_num];
+
+       if (segp == NULL)
+               hash_corrupted(hashp);
+
+       prevBucketPtr = &segp[segment_ndx];
+       currBucket = *prevBucketPtr;
+
+       /*
+        * Follow collision chain looking for matching key
+        */
+       match = hashp->match;           /* save one fetch in inner loop */
+       keysize = hashp->keysize;       /* ditto */
+
+       while (currBucket != NULL)
+       {
+               if (currBucket->hashvalue == newhashvalue &&
+                       match(ELEMENTKEY(currBucket), newKeyPtr, keysize) == 0)
+                       break;
+               prevBucketPtr = &(currBucket->link);
+               currBucket = *prevBucketPtr;
+#if HASH_STATISTICS
+               hash_collisions++;
+               hctl->collisions++;
+#endif
+       }
+
+       if (currBucket != NULL)
+               return false;                   /* collision with an existing entry */
+
+       currBucket = existingElement;
+
+       /* OK to remove record from old hash bucket's chain. */
+       *oldPrevPtr = currBucket->link;
+
+       /* link into new hashbucket chain */
+       *prevBucketPtr = currBucket;
+       currBucket->link = NULL;
+
+       /* copy new key into record */
+       currBucket->hashvalue = newhashvalue;
+       hashp->keycopy(ELEMENTKEY(currBucket), newKeyPtr, keysize);
+
+       /* rest of record is untouched */
+
+       return true;
+}
+
 /*
  * create a new entry if possible
  */
index c9ce2220e48de853f197ca90f3f75ff8e9b52020..6ffd6fcac6906c1df834af094d760ae367a045f3 100644 (file)
@@ -128,6 +128,8 @@ extern uint32 get_hash_value(HTAB *hashp, const void *keyPtr);
 extern void *hash_search_with_hash_value(HTAB *hashp, const void *keyPtr,
                                                        uint32 hashvalue, HASHACTION action,
                                                        bool *foundPtr);
+extern bool hash_update_hash_key(HTAB *hashp, void *existingEntry,
+                                        const void *newKeyPtr);
 extern long hash_get_num_entries(HTAB *hashp);
 extern void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp);
 extern void *hash_seq_search(HASH_SEQ_STATUS *status);