]> granicus.if.org Git - postgresql/commitdiff
Repair problems with VACUUM destroying t_ctid chains too soon, and with
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 20 Aug 2005 00:40:32 +0000 (00:40 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 20 Aug 2005 00:40:32 +0000 (00:40 +0000)
insufficient paranoia in code that follows t_ctid links.  (We must do both
because even with VACUUM doing it properly, the intermediate state with
a dangling t_ctid link is visible concurrently during lazy VACUUM, and
could be seen afterwards if either type of VACUUM crashes partway through.)
Also try to improve documentation about what's going on.  Patch is a bit
bulky because passing the XMAX information around required changing the
APIs of some low-level heapam.c routines, but it's not conceptually very
complicated.  Per trouble report from Teodor and subsequent analysis.
This needs to be back-patched, but I'll do that after 8.1 beta is out.

src/backend/access/heap/heapam.c
src/backend/commands/async.c
src/backend/commands/trigger.c
src/backend/commands/vacuum.c
src/backend/executor/execMain.c
src/backend/utils/time/tqual.c
src/include/access/heapam.h
src/include/access/htup.h
src/include/executor/executor.h
src/include/utils/tqual.h

index 6e70ab25c106445374102ccd2188461abe667953..46c7c4da73f7aaf2107e1ddb9be2ef2dd99d288a 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.197 2005/08/12 01:35:54 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.198 2005/08/20 00:39:51 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -22,7 +22,7 @@
  *             heap_rescan             - restart a relation scan
  *             heap_endscan    - end relation scan
  *             heap_getnext    - retrieve next tuple in scan
- *             heap_fetch              - retrieve tuple with tid
+ *             heap_fetch              - retrieve tuple with given tid
  *             heap_insert             - insert tuple into a relation
  *             heap_delete             - delete a tuple from a relation
  *             heap_update             - replace a tuple in a relation with another tuple
@@ -152,7 +152,7 @@ heapgettup(Relation relation,
                tid = NULL;
        }
 
-       tuple->t_tableOid = relation->rd_id;
+       tuple->t_tableOid = RelationGetRelid(relation);
 
        /*
         * return null immediately if relation is empty
@@ -800,10 +800,13 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
  * keep_buf = false, the pin is released and *userbuf is set to InvalidBuffer.
  *
  * It is somewhat inconsistent that we ereport() on invalid block number but
- * return false on invalid item number.  This is historical.  The only
- * justification I can see is that the caller can relatively easily check the
- * block number for validity, but cannot check the item number without reading
- * the page himself.
+ * return false on invalid item number.  There are a couple of reasons though.
+ * One is that the caller can relatively easily check the block number for
+ * validity, but cannot check the item number without reading the page
+ * himself.  Another is that when we are following a t_ctid link, we can be
+ * reasonably confident that the page number is valid (since VACUUM shouldn't
+ * truncate off the destination page without having killed the referencing
+ * tuple first), but the item number might well not be good.
  */
 bool
 heap_fetch(Relation relation,
@@ -906,7 +909,7 @@ heap_release_fetch(Relation relation,
        tuple->t_datamcxt = NULL;
        tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
        tuple->t_len = ItemIdGetLength(lp);
-       tuple->t_tableOid = relation->rd_id;
+       tuple->t_tableOid = RelationGetRelid(relation);
 
        /*
         * check time qualification of tuple, then release lock
@@ -950,83 +953,129 @@ heap_release_fetch(Relation relation,
 
 /*
  *     heap_get_latest_tid -  get the latest tid of a specified tuple
+ *
+ * Actually, this gets the latest version that is visible according to
+ * the passed snapshot.  You can pass SnapshotDirty to get the very latest,
+ * possibly uncommitted version.
+ *
+ * *tid is both an input and an output parameter: it is updated to
+ * show the latest version of the row.  Note that it will not be changed
+ * if no version of the row passes the snapshot test.
  */
-ItemPointer
+void
 heap_get_latest_tid(Relation relation,
                                        Snapshot snapshot,
                                        ItemPointer tid)
 {
-       ItemId          lp = NULL;
-       Buffer          buffer;
-       PageHeader      dp;
-       OffsetNumber offnum;
-       HeapTupleData tp;
-       HeapTupleHeader t_data;
+       BlockNumber     blk;
        ItemPointerData ctid;
-       bool            invalidBlock,
-                               linkend,
-                               valid;
+       TransactionId priorXmax;
 
-       /*
-        * get the buffer from the relation descriptor Note that this does a
-        * buffer pin.
-        */
-       buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
-       LockBuffer(buffer, BUFFER_LOCK_SHARE);
+       /* this is to avoid Assert failures on bad input */
+       if (!ItemPointerIsValid(tid))
+               return;
 
        /*
-        * get the item line pointer corresponding to the requested tid
+        * Since this can be called with user-supplied TID, don't trust the
+        * input too much.  (RelationGetNumberOfBlocks is an expensive check,
+        * so we don't check t_ctid links again this way.  Note that it would
+        * not do to call it just once and save the result, either.)
         */
-       dp = (PageHeader) BufferGetPage(buffer);
-       offnum = ItemPointerGetOffsetNumber(tid);
-       invalidBlock = true;
-       if (!PageIsNew(dp))
-       {
-               lp = PageGetItemId(dp, offnum);
-               if (ItemIdIsUsed(lp))
-                       invalidBlock = false;
-       }
-       if (invalidBlock)
-       {
-               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-               ReleaseBuffer(buffer);
-               return NULL;
-       }
+       blk = ItemPointerGetBlockNumber(tid);
+       if (blk >= RelationGetNumberOfBlocks(relation))
+               elog(ERROR, "block number %u is out of range for relation \"%s\"",
+                        blk, RelationGetRelationName(relation));
 
        /*
-        * more sanity checks
+        * Loop to chase down t_ctid links.  At top of loop, ctid is the
+        * tuple we need to examine, and *tid is the TID we will return if
+        * ctid turns out to be bogus.
+        *
+        * Note that we will loop until we reach the end of the t_ctid chain.
+        * Depending on the snapshot passed, there might be at most one visible
+        * version of the row, but we don't try to optimize for that.
         */
+       ctid = *tid;
+       priorXmax = InvalidTransactionId;       /* cannot check first XMIN */
+       for (;;)
+       {
+               Buffer          buffer;
+               PageHeader      dp;
+               OffsetNumber offnum;
+               ItemId          lp;
+               HeapTupleData tp;
+               bool            valid;
 
-       tp.t_datamcxt = NULL;
-       t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
-       tp.t_len = ItemIdGetLength(lp);
-       tp.t_self = *tid;
-       ctid = tp.t_data->t_ctid;
+               /*
+                * Read, pin, and lock the page.
+                */
+               buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
+               LockBuffer(buffer, BUFFER_LOCK_SHARE);
+               dp = (PageHeader) BufferGetPage(buffer);
 
-       /*
-        * check time qualification of tid
-        */
+               /*
+                * Check for bogus item number.  This is not treated as an error
+                * condition because it can happen while following a t_ctid link.
+                * We just assume that the prior tid is OK and return it unchanged.
+                */
+               offnum = ItemPointerGetOffsetNumber(&ctid);
+               if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       break;
+               }
+               lp = PageGetItemId(dp, offnum);
+               if (!ItemIdIsUsed(lp))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       break;
+               }
 
-       HeapTupleSatisfies(&tp, relation, buffer, dp,
-                                          snapshot, 0, NULL, valid);
+               /* OK to access the tuple */
+               tp.t_self = ctid;
+               tp.t_datamcxt = NULL;
+               tp.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
+               tp.t_len = ItemIdGetLength(lp);
 
-       linkend = true;
-       if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
-               !ItemPointerEquals(tid, &ctid))
-               linkend = false;
+               /*
+                * After following a t_ctid link, we might arrive at an unrelated
+                * tuple.  Check for XMIN match.
+                */
+               if (TransactionIdIsValid(priorXmax) &&
+                       !TransactionIdEquals(priorXmax, HeapTupleHeaderGetXmin(tp.t_data)))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       break;
+               }
 
-       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-       ReleaseBuffer(buffer);
+               /*
+                * Check time qualification of tuple; if visible, set it as the new
+                * result candidate.
+                */
+               HeapTupleSatisfies(&tp, relation, buffer, dp,
+                                                  snapshot, 0, NULL, valid);
+               if (valid)
+                       *tid = ctid;
 
-       if (!valid)
-       {
-               if (linkend)
-                       return NULL;
-               heap_get_latest_tid(relation, snapshot, &ctid);
-               *tid = ctid;
-       }
+               /*
+                * If there's a valid t_ctid link, follow it, else we're done.
+                */
+               if ((tp.t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED)) ||
+                       ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       break;
+               }
 
-       return tid;
+               ctid = tp.t_data->t_ctid;
+               priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buffer);
+       }                               /* end of loop */
 }
 
 /*
@@ -1083,7 +1132,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        HeapTupleHeaderSetCmin(tup->t_data, cid);
        HeapTupleHeaderSetXmax(tup->t_data, 0);         /* zero out Datum fields */
        HeapTupleHeaderSetCmax(tup->t_data, 0);         /* for cleanliness */
-       tup->t_tableOid = relation->rd_id;
+       tup->t_tableOid = RelationGetRelid(relation);
 
        /*
         * If the new tuple is too big for storage or contains already toasted
@@ -1197,29 +1246,34 @@ simple_heap_insert(Relation relation, HeapTuple tup)
 }
 
 /*
- *     heap_delete             - delete a tuple
+ *     heap_delete - delete a tuple
  *
  * NB: do not call this directly unless you are prepared to deal with
  * concurrent-update conditions.  Use simple_heap_delete instead.
  *
- *     relation - table to be modified
+ *     relation - table to be modified (caller must hold suitable lock)
  *     tid - TID of tuple to be deleted
  *     ctid - output parameter, used only for failure case (see below)
- *     cid - delete command ID to use in verifying tuple visibility
+ *     update_xmax - output parameter, used only for failure case (see below)
+ *     cid - delete command ID (used for visibility test, and stored into
+ *             cmax if successful)
  *     crosscheck - if not InvalidSnapshot, also check tuple against this
  *     wait - true if should wait for any conflicting update to commit/abort
  *
  * Normal, successful return value is HeapTupleMayBeUpdated, which
  * actually means we did delete it.  Failure return codes are
  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
- * (the last only possible if wait == false).  On a failure return,
- * *ctid is set to the ctid link of the target tuple (possibly a later
- * version of the row).
+ * (the last only possible if wait == false).
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as tid, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
  */
 HTSU_Result
 heap_delete(Relation relation, ItemPointer tid,
-                       ItemPointer ctid, CommandId cid,
-                       Snapshot crosscheck, bool wait)
+                       ItemPointer ctid, TransactionId *update_xmax,
+                       CommandId cid, Snapshot crosscheck, bool wait)
 {
        HTSU_Result     result;
        TransactionId xid = GetCurrentTransactionId();
@@ -1236,11 +1290,11 @@ heap_delete(Relation relation, ItemPointer tid,
 
        dp = (PageHeader) BufferGetPage(buffer);
        lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
+
        tp.t_datamcxt = NULL;
-       tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
+       tp.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
        tp.t_len = ItemIdGetLength(lp);
        tp.t_self = *tid;
-       tp.t_tableOid = relation->rd_id;
 
 l1:
        result = HeapTupleSatisfiesUpdate(tp.t_data, cid, buffer);
@@ -1360,7 +1414,9 @@ l1:
                Assert(result == HeapTupleSelfUpdated ||
                           result == HeapTupleUpdated ||
                           result == HeapTupleBeingUpdated);
+               Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
                *ctid = tp.t_data->t_ctid;
+               *update_xmax = HeapTupleHeaderGetXmax(tp.t_data);
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                ReleaseBuffer(buffer);
                if (have_tuple_lock)
@@ -1457,11 +1513,12 @@ l1:
 void
 simple_heap_delete(Relation relation, ItemPointer tid)
 {
-       ItemPointerData ctid;
        HTSU_Result             result;
+       ItemPointerData update_ctid;
+       TransactionId update_xmax;
 
        result = heap_delete(relation, tid,
-                                                &ctid,
+                                                &update_ctid, &update_xmax,
                                                 GetCurrentCommandId(), InvalidSnapshot,
                                                 true /* wait for commit */ );
        switch (result)
@@ -1491,27 +1548,33 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  * NB: do not call this directly unless you are prepared to deal with
  * concurrent-update conditions.  Use simple_heap_update instead.
  *
- *     relation - table to be modified
+ *     relation - table to be modified (caller must hold suitable lock)
  *     otid - TID of old tuple to be replaced
  *     newtup - newly constructed tuple data to store
  *     ctid - output parameter, used only for failure case (see below)
- *     cid - update command ID to use in verifying old tuple visibility
+ *     update_xmax - output parameter, used only for failure case (see below)
+ *     cid - update command ID (used for visibility test, and stored into
+ *             cmax/cmin if successful)
  *     crosscheck - if not InvalidSnapshot, also check old tuple against this
  *     wait - true if should wait for any conflicting update to commit/abort
  *
  * Normal, successful return value is HeapTupleMayBeUpdated, which
  * actually means we *did* update it.  Failure return codes are
  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
- * (the last only possible if wait == false).  On a failure return,
- * *ctid is set to the ctid link of the old tuple (possibly a later
- * version of the row).
+ * (the last only possible if wait == false).
+ *
  * On success, newtup->t_self is set to the TID where the new tuple
  * was inserted.
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as otid, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
  */
 HTSU_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
-                       ItemPointer ctid, CommandId cid,
-                       Snapshot crosscheck, bool wait)
+                       ItemPointer ctid, TransactionId *update_xmax,
+                       CommandId cid, Snapshot crosscheck, bool wait)
 {
        HTSU_Result     result;
        TransactionId xid = GetCurrentTransactionId();
@@ -1664,7 +1727,9 @@ l2:
                Assert(result == HeapTupleSelfUpdated ||
                           result == HeapTupleUpdated ||
                           result == HeapTupleBeingUpdated);
+               Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
                *ctid = oldtup.t_data->t_ctid;
+               *update_xmax = HeapTupleHeaderGetXmax(oldtup.t_data);
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                ReleaseBuffer(buffer);
                if (have_tuple_lock)
@@ -1878,11 +1943,12 @@ l2:
 void
 simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 {
-       ItemPointerData ctid;
        HTSU_Result             result;
+       ItemPointerData update_ctid;
+       TransactionId update_xmax;
 
        result = heap_update(relation, otid, tup,
-                                                &ctid,
+                                                &update_ctid, &update_xmax,
                                                 GetCurrentCommandId(), InvalidSnapshot,
                                                 true /* wait for commit */ );
        switch (result)
@@ -1907,7 +1973,34 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 }
 
 /*
- *     heap_lock_tuple         - lock a tuple in shared or exclusive mode
+ *     heap_lock_tuple - lock a tuple in shared or exclusive mode
+ *
+ * Note that this acquires a buffer pin, which the caller must release.
+ *
+ * Input parameters:
+ *     relation: relation containing tuple (caller must hold suitable lock)
+ *     tuple->t_self: TID of tuple to lock (rest of struct need not be valid)
+ *     cid: current command ID (used for visibility test, and stored into
+ *             tuple's cmax if lock is successful)
+ *     mode: indicates if shared or exclusive tuple lock is desired
+ *     nowait: if true, ereport rather than blocking if lock not available
+ *
+ * Output parameters:
+ *     *tuple: all fields filled in
+ *     *buffer: set to buffer holding tuple (pinned but not locked at exit)
+ *     *ctid: set to tuple's t_ctid, but only in failure cases
+ *     *update_xmax: set to tuple's xmax, but only in failure cases
+ *
+ * Function result may be:
+ *     HeapTupleMayBeUpdated: lock was successfully acquired
+ *     HeapTupleSelfUpdated: lock failed because tuple updated by self
+ *     HeapTupleUpdated: lock failed because tuple updated by other xact
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as t_self, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
+ *
  *
  * NOTES: because the shared-memory lock table is of finite size, but users
  * could reasonably want to lock large numbers of tuples, we do not rely on
@@ -1943,7 +2036,8 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
  */
 HTSU_Result
 heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
-                                CommandId cid, LockTupleMode mode, bool nowait)
+                               ItemPointer ctid, TransactionId *update_xmax,
+                               CommandId cid, LockTupleMode mode, bool nowait)
 {
        HTSU_Result     result;
        ItemPointer tid = &(tuple->t_self);
@@ -1961,9 +2055,12 @@ heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
 
        dp = (PageHeader) BufferGetPage(*buffer);
        lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
+       Assert(ItemIdIsUsed(lp));
+
        tuple->t_datamcxt = NULL;
        tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
        tuple->t_len = ItemIdGetLength(lp);
+       tuple->t_tableOid = RelationGetRelid(relation);
 
 l3:
        result = HeapTupleSatisfiesUpdate(tuple->t_data, cid, *buffer);
@@ -2112,14 +2209,13 @@ l3:
 
        if (result != HeapTupleMayBeUpdated)
        {
-               ItemPointerData newctid = tuple->t_data->t_ctid;
-
                Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
+               Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
+               *ctid = tuple->t_data->t_ctid;
+               *update_xmax = HeapTupleHeaderGetXmax(tuple->t_data);
                LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
                if (have_tuple_lock)
                        UnlockTuple(relation, tid, tuple_lock_type);
-               /* can't overwrite t_self (== *tid) until after above Unlock */
-               tuple->t_self = newctid;
                return result;
        }
 
index 142b02dfaf88d7cf9eabe16e022c59379ab0b5f5..e2c6203891d48fcb9951bc9142a286affa1fe2a7 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.123 2005/06/17 22:32:43 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.124 2005/08/20 00:39:53 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -550,8 +550,9 @@ AtCommit_Notify(void)
                        }
                        else if (listener->notification == 0)
                        {
-                               ItemPointerData ctid;
                                HTSU_Result             result;
+                               ItemPointerData update_ctid;
+                               TransactionId update_xmax;
 
                                rTuple = heap_modifytuple(lTuple, tdesc,
                                                                                  value, nulls, repl);
@@ -573,7 +574,7 @@ AtCommit_Notify(void)
                                 * heap_update calls.
                                 */
                                result = heap_update(lRel, &lTuple->t_self, rTuple,
-                                                                        &ctid,
+                                                                        &update_ctid, &update_xmax,
                                                                         GetCurrentCommandId(), InvalidSnapshot,
                                                                         false /* no wait for commit */ );
                                switch (result)
@@ -585,7 +586,6 @@ AtCommit_Notify(void)
 
                                        case HeapTupleMayBeUpdated:
                                                /* done successfully */
-
 #ifdef NOT_USED                                        /* currently there are no indexes */
                                                CatalogUpdateIndexes(lRel, rTuple);
 #endif
index 4ea973ae7fa4422e7dbb1eb55757b3ab6ce39717..562f676f4b8b4f91919f794681544cf5a0a68bd1 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/trigger.c,v 1.191 2005/08/12 01:35:57 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/trigger.c,v 1.192 2005/08/20 00:39:54 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1592,14 +1592,18 @@ GetTupleForTrigger(EState *estate, ResultRelInfo *relinfo,
        if (newSlot != NULL)
        {
                HTSU_Result     test;
+               ItemPointerData update_ctid;
+               TransactionId update_xmax;
+
+               *newSlot = NULL;
 
                /*
                 * lock tuple for update
                 */
-               *newSlot = NULL;
-               tuple.t_self = *tid;
 ltrmark:;
-               test = heap_lock_tuple(relation, &tuple, &buffer, cid,
+               tuple.t_self = *tid;
+               test = heap_lock_tuple(relation, &tuple, &buffer,
+                                                          &update_ctid, &update_xmax, cid,
                                                           LockTupleExclusive, false);
                switch (test)
                {
@@ -1617,15 +1621,18 @@ ltrmark:;
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                         errmsg("could not serialize access due to concurrent update")));
-                               else if (!(ItemPointerEquals(&(tuple.t_self), tid)))
+                               else if (!ItemPointerEquals(&update_ctid, &tuple.t_self))
                                {
-                                       TupleTableSlot *epqslot = EvalPlanQual(estate,
-                                                                                        relinfo->ri_RangeTableIndex,
-                                                                                                               &(tuple.t_self));
-
-                                       if (!(TupIsNull(epqslot)))
+                                       /* it was updated, so look at the updated version */
+                                       TupleTableSlot *epqslot;
+
+                                       epqslot = EvalPlanQual(estate,
+                                                                                  relinfo->ri_RangeTableIndex,
+                                                                                  &update_ctid,
+                                                                                  update_xmax);
+                                       if (!TupIsNull(epqslot))
                                        {
-                                               *tid = tuple.t_self;
+                                               *tid = update_ctid;
                                                *newSlot = epqslot;
                                                goto ltrmark;
                                        }
@@ -1639,7 +1646,7 @@ ltrmark:;
 
                        default:
                                ReleaseBuffer(buffer);
-                               elog(ERROR, "invalid heap_lock_tuple status: %d", test);
+                               elog(ERROR, "unrecognized heap_lock_tuple status: %u", test);
                                return NULL;    /* keep compiler quiet */
                }
        }
@@ -1659,6 +1666,7 @@ ltrmark:;
                tuple.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
                tuple.t_len = ItemIdGetLength(lp);
                tuple.t_self = *tid;
+               tuple.t_tableOid = RelationGetRelid(relation);
        }
 
        result = heap_copytuple(&tuple);
index 9db91209448624a17730b2e79f3be5d25ac0ac0c..ef199c5f0734c56b3376d147b6315e008321c638 100644 (file)
@@ -13,7 +13,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.312 2005/07/29 19:30:03 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.313 2005/08/20 00:39:54 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "pgstat.h"
 
 
+/*
+ * VacPage structures keep track of each page on which we find useful
+ * amounts of free space.
+ */
 typedef struct VacPageData
 {
        BlockNumber blkno;                      /* BlockNumber of this Page */
@@ -73,30 +77,54 @@ typedef struct VacPageListData
 
 typedef VacPageListData *VacPageList;
 
+/*
+ * The "vtlinks" array keeps information about each recently-updated tuple
+ * ("recent" meaning its XMAX is too new to let us recycle the tuple).
+ * We store the tuple's own TID as well as its t_ctid (its link to the next
+ * newer tuple version).  Searching in this array allows us to follow update
+ * chains backwards from newer to older tuples.  When we move a member of an
+ * update chain, we must move *all* the live members of the chain, so that we
+ * can maintain their t_ctid link relationships (we must not just overwrite
+ * t_ctid in an existing tuple).
+ *
+ * Note: because t_ctid links can be stale (this would only occur if a prior
+ * VACUUM crashed partway through), it is possible that new_tid points to an
+ * empty slot or unrelated tuple.  We have to check the linkage as we follow
+ * it, just as is done in EvalPlanQual.
+ */
 typedef struct VTupleLinkData
 {
-       ItemPointerData new_tid;
-       ItemPointerData this_tid;
+       ItemPointerData new_tid;        /* t_ctid of an updated tuple */
+       ItemPointerData this_tid;       /* t_self of the tuple */
 } VTupleLinkData;
 
 typedef VTupleLinkData *VTupleLink;
 
+/*
+ * We use an array of VTupleMoveData to plan a chain tuple move fully
+ * before we do it.
+ */
 typedef struct VTupleMoveData
 {
        ItemPointerData tid;            /* tuple ID */
-       VacPage         vacpage;                /* where to move */
-       bool            cleanVpd;               /* clean vacpage before using */
+       VacPage         vacpage;                /* where to move it to */
+       bool            cleanVpd;               /* clean vacpage before using? */
 } VTupleMoveData;
 
 typedef VTupleMoveData *VTupleMove;
 
+/*
+ * VRelStats contains the data acquired by scan_heap for use later
+ */
 typedef struct VRelStats
 {
+       /* miscellaneous statistics */
        BlockNumber rel_pages;
        double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
        bool            hasindex;
+       /* vtlinks array for tuple chain following - sorted by new_tid */
        int                     num_vtlinks;
        VTupleLink      vtlinks;
 } VRelStats;
@@ -117,6 +145,7 @@ typedef struct ExecContextData
        EState     *estate;
        TupleTableSlot *slot;
 } ExecContextData;
+
 typedef ExecContextData *ExecContext;
 
 static void
@@ -1802,18 +1831,25 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        }
 
                        /*
-                        * If this tuple is in the chain of tuples created in updates
-                        * by "recent" transactions then we have to move all chain of
-                        * tuples to another places.
+                        * If this tuple is in a chain of tuples created in updates
+                        * by "recent" transactions then we have to move the whole chain
+                        * of tuples to other places, so that we can write new t_ctid
+                        * links that preserve the chain relationship.
+                        *
+                        * This test is complicated.  Read it as "if tuple is a recently
+                        * created updated version, OR if it is an obsoleted version".
+                        * (In the second half of the test, we needn't make any check
+                        * on XMAX --- it must be recently obsoleted, else scan_heap
+                        * would have deemed it removable.)
                         *
                         * NOTE: this test is not 100% accurate: it is possible for a
                         * tuple to be an updated one with recent xmin, and yet not
-                        * have a corresponding tuple in the vtlinks list.      Presumably
+                        * match any new_tid entry in the vtlinks list.  Presumably
                         * there was once a parent tuple with xmax matching the xmin,
                         * but it's possible that that tuple has been removed --- for
-                        * example, if it had xmin = xmax then
-                        * HeapTupleSatisfiesVacuum would deem it removable as soon as
-                        * the xmin xact completes.
+                        * example, if it had xmin = xmax and wasn't itself an updated
+                        * version, then HeapTupleSatisfiesVacuum would deem it removable
+                        * as soon as the xmin xact completes.
                         *
                         * To be on the safe side, we abandon the repair_frag process if
                         * we cannot find the parent tuple in vtlinks.  This may be
@@ -1854,72 +1890,85 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        break;          /* out of walk-along-page loop */
                                }
 
-                               vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
-                               num_vtmove = 0;
-                               free_vtmove = 100;
-
                                /*
                                 * If this tuple is in the begin/middle of the chain then
-                                * we have to move to the end of chain.
+                                * we have to move to the end of chain.  As with any
+                                * t_ctid chase, we have to verify that each new tuple
+                                * is really the descendant of the tuple we came from.
                                 */
                                while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
                                                                                                  HEAP_IS_LOCKED)) &&
                                           !(ItemPointerEquals(&(tp.t_self),
                                                                                   &(tp.t_data->t_ctid))))
                                {
-                                       Page            Cpage;
-                                       ItemId          Citemid;
-                                       ItemPointerData Ctid;
-
-                                       Ctid = tp.t_data->t_ctid;
-                                       if (freeCbuf)
-                                               ReleaseBuffer(Cbuf);
-                                       freeCbuf = true;
-                                       Cbuf = ReadBuffer(onerel,
-                                                                         ItemPointerGetBlockNumber(&Ctid));
-                                       Cpage = BufferGetPage(Cbuf);
-                                       Citemid = PageGetItemId(Cpage,
-                                                                         ItemPointerGetOffsetNumber(&Ctid));
-                                       if (!ItemIdIsUsed(Citemid))
+                                       ItemPointerData nextTid;
+                                       TransactionId priorXmax;
+                                       Buffer          nextBuf;
+                                       Page            nextPage;
+                                       OffsetNumber nextOffnum;
+                                       ItemId          nextItemid;
+                                       HeapTupleHeader nextTdata;
+
+                                       nextTid = tp.t_data->t_ctid;
+                                       priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
+                                       /* assume block# is OK (see heap_fetch comments) */
+                                       nextBuf = ReadBuffer(onerel,
+                                                                                ItemPointerGetBlockNumber(&nextTid));
+                                       nextPage = BufferGetPage(nextBuf);
+                                       /* If bogus or unused slot, assume tp is end of chain */
+                                       nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
+                                       if (nextOffnum < FirstOffsetNumber ||
+                                               nextOffnum > PageGetMaxOffsetNumber(nextPage))
                                        {
-                                               /*
-                                                * This means that in the middle of chain there
-                                                * was tuple updated by older (than OldestXmin)
-                                                * xaction and this tuple is already deleted by
-                                                * me. Actually, upper part of chain should be
-                                                * removed and seems that this should be handled
-                                                * in scan_heap(), but it's not implemented at the
-                                                * moment and so we just stop shrinking here.
-                                                */
-                                               elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
-                                               chain_move_failed = true;
-                                               break;  /* out of loop to move to chain end */
+                                               ReleaseBuffer(nextBuf);
+                                               break;
+                                       }
+                                       nextItemid = PageGetItemId(nextPage, nextOffnum);
+                                       if (!ItemIdIsUsed(nextItemid))
+                                       {
+                                               ReleaseBuffer(nextBuf);
+                                               break;
                                        }
+                                       /* if not matching XMIN, assume tp is end of chain */
+                                       nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
+                                                                                                                         nextItemid);
+                                       if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
+                                                                                        priorXmax))
+                                       {
+                                               ReleaseBuffer(nextBuf);
+                                               break;
+                                       }
+                                       /* OK, switch our attention to the next tuple in chain */
                                        tp.t_datamcxt = NULL;
-                                       tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
-                                       tp.t_self = Ctid;
-                                       tlen = tp.t_len = ItemIdGetLength(Citemid);
-                               }
-                               if (chain_move_failed)
-                               {
+                                       tp.t_data = nextTdata;
+                                       tp.t_self = nextTid;
+                                       tlen = tp.t_len = ItemIdGetLength(nextItemid);
                                        if (freeCbuf)
                                                ReleaseBuffer(Cbuf);
-                                       pfree(vtmove);
-                                       break;          /* out of walk-along-page loop */
+                                       Cbuf = nextBuf;
+                                       freeCbuf = true;
                                }
 
+                               /* Set up workspace for planning the chain move */
+                               vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
+                               num_vtmove = 0;
+                               free_vtmove = 100;
+
                                /*
-                                * Check if all items in chain can be moved
+                                * Now, walk backwards up the chain (towards older tuples)
+                                * and check if all items in chain can be moved.  We record
+                                * all the moves that need to be made in the vtmove array.
                                 */
                                for (;;)
                                {
                                        Buffer          Pbuf;
                                        Page            Ppage;
                                        ItemId          Pitemid;
-                                       HeapTupleData Ptp;
+                                       HeapTupleHeader PTdata;
                                        VTupleLinkData vtld,
                                                           *vtlp;
 
+                                       /* Identify a target page to move this tuple to */
                                        if (to_vacpage == NULL ||
                                                !enough_space(to_vacpage, tlen))
                                        {
@@ -1942,6 +1991,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
                                                to_vacpage->free -= sizeof(ItemIdData);
                                        (to_vacpage->offsets_used)++;
+
+                                       /* Add an entry to vtmove list */
                                        if (free_vtmove == 0)
                                        {
                                                free_vtmove = 1000;
@@ -1959,13 +2010,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        free_vtmove--;
                                        num_vtmove++;
 
-                                       /* At beginning of chain? */
+                                       /* Done if at beginning of chain */
                                        if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
                                                TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
                                                                                          OldestXmin))
-                                               break;
+                                               break;          /* out of check-all-items loop */
 
-                                       /* No, move to tuple with prior row version */
+                                       /* Move to tuple with prior row version */
                                        vtld.new_tid = tp.t_self;
                                        vtlp = (VTupleLink)
                                                vac_bsearch((void *) &vtld,
@@ -1989,18 +2040,17 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        /* this can't happen since we saw tuple earlier: */
                                        if (!ItemIdIsUsed(Pitemid))
                                                elog(ERROR, "parent itemid marked as unused");
-                                       Ptp.t_datamcxt = NULL;
-                                       Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
+                                       PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
 
                                        /* ctid should not have changed since we saved it */
                                        Assert(ItemPointerEquals(&(vtld.new_tid),
-                                                                                        &(Ptp.t_data->t_ctid)));
+                                                                                        &(PTdata->t_ctid)));
 
                                        /*
-                                        * Read above about cases when !ItemIdIsUsed(Citemid)
+                                        * Read above about cases when !ItemIdIsUsed(nextItemid)
                                         * (child item is removed)... Due to the fact that at
                                         * the moment we don't remove unuseful part of
-                                        * update-chain, it's possible to get too old parent
+                                        * update-chain, it's possible to get non-matching parent
                                         * row here. Like as in the case which caused this
                                         * problem, we stop shrinking here. I could try to
                                         * find real parent row but want not to do it because
@@ -2008,8 +2058,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                         * and we are too close to 6.5 release. - vadim
                                         * 06/11/99
                                         */
-                                       if (Ptp.t_data->t_infomask & HEAP_XMAX_IS_MULTI ||
-                                               !(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
+                                       if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                                               !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
                                                                         HeapTupleHeaderGetXmin(tp.t_data))))
                                        {
                                                ReleaseBuffer(Pbuf);
@@ -2017,8 +2067,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                chain_move_failed = true;
                                                break;  /* out of check-all-items loop */
                                        }
-                                       tp.t_datamcxt = Ptp.t_datamcxt;
-                                       tp.t_data = Ptp.t_data;
+                                       tp.t_datamcxt = NULL;
+                                       tp.t_data = PTdata;
                                        tlen = tp.t_len = ItemIdGetLength(Pitemid);
                                        if (freeCbuf)
                                                ReleaseBuffer(Cbuf);
@@ -2047,7 +2097,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                }
 
                                /*
-                                * Okay, move the whole tuple chain
+                                * Okay, move the whole tuple chain in reverse order.
+                                *
+                                * Ctid tracks the new location of the previously-moved tuple.
                                 */
                                ItemPointerSetInvalid(&Ctid);
                                for (ti = 0; ti < num_vtmove; ti++)
@@ -2077,10 +2129,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
                                        tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
 
-                                       /*
-                                        * make a copy of the source tuple, and then mark the
-                                        * source tuple MOVED_OFF.
-                                        */
                                        move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
                                                                         dst_buffer, dst_page, destvacpage,
                                                                         &ec, &Ctid, vtmove[ti].cleanVpd);
@@ -2143,7 +2191,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        move_plain_tuple(onerel, buf, page, &tuple,
                                                         dst_buffer, dst_page, dst_vacpage, &ec);
 
-
                        num_moved++;
                        if (dst_vacpage->blkno > last_move_dest_block)
                                last_move_dest_block = dst_vacpage->blkno;
@@ -2474,6 +2521,9 @@ move_chain_tuple(Relation rel,
        ItemId          newitemid;
        Size            tuple_len = old_tup->t_len;
 
+       /*
+        * make a modifiable copy of the source tuple.
+        */
        heap_copytuple_with_tuple(old_tup, &newtup);
 
        /*
@@ -2484,6 +2534,9 @@ move_chain_tuple(Relation rel,
        /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
        START_CRIT_SECTION();
 
+       /*
+        * mark the source tuple MOVED_OFF.
+        */
        old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
                                                                         HEAP_XMIN_INVALID |
                                                                         HEAP_MOVED_IN);
@@ -2529,16 +2582,27 @@ move_chain_tuple(Relation rel,
        newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
                                                 InvalidOffsetNumber, LP_USED);
        if (newoff == InvalidOffsetNumber)
-       {
                elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
                         (unsigned long) tuple_len, dst_vacpage->blkno);
-       }
        newitemid = PageGetItemId(dst_page, newoff);
+       /* drop temporary copy, and point to the version on the dest page */
        pfree(newtup.t_data);
        newtup.t_datamcxt = NULL;
        newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
+
        ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
 
+       /*
+        * Set new tuple's t_ctid pointing to itself if last tuple in chain,
+        * and to next tuple in chain otherwise.  (Since we move the chain
+        * in reverse order, this is actually the previously processed tuple.)
+        */
+       if (!ItemPointerIsValid(ctid))
+               newtup.t_data->t_ctid = newtup.t_self;
+       else
+               newtup.t_data->t_ctid = *ctid;
+       *ctid = newtup.t_self;
+
        /* XLOG stuff */
        if (!rel->rd_istemp)
        {
@@ -2563,17 +2627,6 @@ move_chain_tuple(Relation rel,
 
        END_CRIT_SECTION();
 
-       /*
-        * Set new tuple's t_ctid pointing to itself for last tuple in chain,
-        * and to next tuple in chain otherwise.
-        */
-       /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
-       if (!ItemPointerIsValid(ctid))
-               newtup.t_data->t_ctid = newtup.t_self;
-       else
-               newtup.t_data->t_ctid = *ctid;
-       *ctid = newtup.t_self;
-
        LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
        if (dst_buf != old_buf)
                LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
@@ -2638,12 +2691,10 @@ move_plain_tuple(Relation rel,
        newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
                                                 InvalidOffsetNumber, LP_USED);
        if (newoff == InvalidOffsetNumber)
-       {
                elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
                         (unsigned long) tuple_len,
                         dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
                         dst_vacpage->offsets_used, dst_vacpage->offsets_free);
-       }
        newitemid = PageGetItemId(dst_page, newoff);
        pfree(newtup.t_data);
        newtup.t_datamcxt = NULL;
index eb485ad5476a7ba35579bfbaf033ef52b75b7cec..9f5c008fa9f72e3d92fee9750e9b4c437dee736e 100644 (file)
@@ -26,7 +26,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.253 2005/08/18 21:34:20 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.254 2005/08/20 00:39:55 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1165,8 +1165,10 @@ lnext:   ;
                                foreach(l, estate->es_rowMarks)
                                {
                                        execRowMark *erm = lfirst(l);
-                                       Buffer          buffer;
                                        HeapTupleData tuple;
+                                       Buffer          buffer;
+                                       ItemPointerData update_ctid;
+                                       TransactionId update_xmax;
                                        TupleTableSlot *newSlot;
                                        LockTupleMode   lockmode;
                                        HTSU_Result             test;
@@ -1183,15 +1185,17 @@ lnext:  ;
                                        if (isNull)
                                                elog(ERROR, "\"%s\" is NULL", erm->resname);
 
+                                       tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
+
                                        if (estate->es_forUpdate)
                                                lockmode = LockTupleExclusive;
                                        else
                                                lockmode = LockTupleShared;
 
-                                       tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
                                        test = heap_lock_tuple(erm->relation, &tuple, &buffer,
-                                                                                 estate->es_snapshot->curcid,
-                                                                                 lockmode, estate->es_rowNoWait);
+                                                                                  &update_ctid, &update_xmax,
+                                                                                  estate->es_snapshot->curcid,
+                                                                                  lockmode, estate->es_rowNoWait);
                                        ReleaseBuffer(buffer);
                                        switch (test)
                                        {
@@ -1207,11 +1211,15 @@ lnext:  ;
                                                                ereport(ERROR,
                                                                                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                                                 errmsg("could not serialize access due to concurrent update")));
-                                                       if (!(ItemPointerEquals(&(tuple.t_self),
-                                                                 (ItemPointer) DatumGetPointer(datum))))
+                                                       if (!ItemPointerEquals(&update_ctid,
+                                                                                                  &tuple.t_self))
                                                        {
-                                                               newSlot = EvalPlanQual(estate, erm->rti, &(tuple.t_self));
-                                                               if (!(TupIsNull(newSlot)))
+                                                               /* updated, so look at updated version */
+                                                               newSlot = EvalPlanQual(estate,
+                                                                                                          erm->rti,
+                                                                                                          &update_ctid,
+                                                                                                          update_xmax);
+                                                               if (!TupIsNull(newSlot))
                                                                {
                                                                        slot = newSlot;
                                                                        estate->es_useEvalPlan = true;
@@ -1454,8 +1462,9 @@ ExecDelete(TupleTableSlot *slot,
 {
        ResultRelInfo *resultRelInfo;
        Relation        resultRelationDesc;
-       ItemPointerData ctid;
        HTSU_Result     result;
+       ItemPointerData update_ctid;
+       TransactionId update_xmax;
 
        /*
         * get information on the (current) result relation
@@ -1486,7 +1495,7 @@ ExecDelete(TupleTableSlot *slot,
         */
 ldelete:;
        result = heap_delete(resultRelationDesc, tupleid,
-                                                &ctid,
+                                                &update_ctid, &update_xmax,
                                                 estate->es_snapshot->curcid,
                                                 estate->es_crosscheck_snapshot,
                                                 true /* wait for commit */ );
@@ -1504,14 +1513,17 @@ ldelete:;
                                ereport(ERROR,
                                                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                 errmsg("could not serialize access due to concurrent update")));
-                       else if (!(ItemPointerEquals(tupleid, &ctid)))
+                       else if (!ItemPointerEquals(tupleid, &update_ctid))
                        {
-                               TupleTableSlot *epqslot = EvalPlanQual(estate,
-                                                          resultRelInfo->ri_RangeTableIndex, &ctid);
+                               TupleTableSlot *epqslot;
 
+                               epqslot = EvalPlanQual(estate,
+                                                                          resultRelInfo->ri_RangeTableIndex,
+                                                                          &update_ctid,
+                                                                          update_xmax);
                                if (!TupIsNull(epqslot))
                                {
-                                       *tupleid = ctid;
+                                       *tupleid = update_ctid;
                                        goto ldelete;
                                }
                        }
@@ -1558,8 +1570,9 @@ ExecUpdate(TupleTableSlot *slot,
        HeapTuple       tuple;
        ResultRelInfo *resultRelInfo;
        Relation        resultRelationDesc;
-       ItemPointerData ctid;
        HTSU_Result     result;
+       ItemPointerData update_ctid;
+       TransactionId update_xmax;
 
        /*
         * abort the operation if not running transactions
@@ -1627,7 +1640,7 @@ lreplace:;
         * referential integrity updates in serializable transactions.
         */
        result = heap_update(resultRelationDesc, tupleid, tuple,
-                                                &ctid,
+                                                &update_ctid, &update_xmax,
                                                 estate->es_snapshot->curcid,
                                                 estate->es_crosscheck_snapshot,
                                                 true /* wait for commit */ );
@@ -1645,14 +1658,17 @@ lreplace:;
                                ereport(ERROR,
                                                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                 errmsg("could not serialize access due to concurrent update")));
-                       else if (!(ItemPointerEquals(tupleid, &ctid)))
+                       else if (!ItemPointerEquals(tupleid, &update_ctid))
                        {
-                               TupleTableSlot *epqslot = EvalPlanQual(estate,
-                                                          resultRelInfo->ri_RangeTableIndex, &ctid);
+                               TupleTableSlot *epqslot;
 
+                               epqslot = EvalPlanQual(estate,
+                                                                          resultRelInfo->ri_RangeTableIndex,
+                                                                          &update_ctid,
+                                                                          update_xmax);
                                if (!TupIsNull(epqslot))
                                {
-                                       *tupleid = ctid;
+                                       *tupleid = update_ctid;
                                        slot = ExecFilterJunk(estate->es_junkFilter, epqslot);
                                        tuple = ExecMaterializeSlot(slot);
                                        goto lreplace;
@@ -1791,9 +1807,21 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
  * under READ COMMITTED rules.
  *
  * See backend/executor/README for some info about how this works.
+ *
+ *     estate - executor state data
+ *     rti - rangetable index of table containing tuple
+ *     *tid - t_ctid from the outdated tuple (ie, next updated version)
+ *     priorXmax - t_xmax from the outdated tuple
+ *
+ * *tid is also an output parameter: it's modified to hold the TID of the
+ * latest version of the tuple (note this may be changed even on failure)
+ *
+ * Returns a slot containing the new candidate update/delete tuple, or
+ * NULL if we determine we shouldn't process the row.
  */
 TupleTableSlot *
-EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
+EvalPlanQual(EState *estate, Index rti,
+                        ItemPointer tid, TransactionId priorXmax)
 {
        evalPlanQual *epq;
        EState     *epqstate;
@@ -1837,11 +1865,24 @@ EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
        {
                Buffer          buffer;
 
-               if (heap_fetch(relation, SnapshotDirty, &tuple, &buffer, false, NULL))
+               if (heap_fetch(relation, SnapshotDirty, &tuple, &buffer, true, NULL))
                {
-                       TransactionId xwait = SnapshotDirty->xmax;
+                       /*
+                        * If xmin isn't what we're expecting, the slot must have been
+                        * recycled and reused for an unrelated tuple.  This implies
+                        * that the latest version of the row was deleted, so we need
+                        * do nothing.  (Should be safe to examine xmin without getting
+                        * buffer's content lock, since xmin never changes in an existing
+                        * tuple.)
+                        */
+                       if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data),
+                                                                        priorXmax))
+                       {
+                               ReleaseBuffer(buffer);
+                               return NULL;
+                       }
 
-                       /* xmin should not be dirty... */
+                       /* otherwise xmin should not be dirty... */
                        if (TransactionIdIsValid(SnapshotDirty->xmin))
                                elog(ERROR, "t_xmin is uncommitted in tuple to be updated");
 
@@ -1849,11 +1890,11 @@ EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
                         * If tuple is being updated by other transaction then we have
                         * to wait for its commit/abort.
                         */
-                       if (TransactionIdIsValid(xwait))
+                       if (TransactionIdIsValid(SnapshotDirty->xmax))
                        {
                                ReleaseBuffer(buffer);
-                               XactLockTableWait(xwait);
-                               continue;
+                               XactLockTableWait(SnapshotDirty->xmax);
+                               continue;               /* loop back to repeat heap_fetch */
                        }
 
                        /*
@@ -1865,22 +1906,50 @@ EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
                }
 
                /*
-                * Oops! Invalid tuple. Have to check is it updated or deleted.
-                * Note that it's possible to get invalid SnapshotDirty->tid if
-                * tuple updated by this transaction. Have we to check this ?
+                * If the referenced slot was actually empty, the latest version
+                * of the row must have been deleted, so we need do nothing.
                 */
-               if (ItemPointerIsValid(&(SnapshotDirty->tid)) &&
-                       !(ItemPointerEquals(&(tuple.t_self), &(SnapshotDirty->tid))))
+               if (tuple.t_data == NULL)
                {
-                       /* updated, so look at the updated copy */
-                       tuple.t_self = SnapshotDirty->tid;
-                       continue;
+                       ReleaseBuffer(buffer);
+                       return NULL;
                }
 
                /*
-                * Deleted or updated by this transaction; forget it.
+                * As above, if xmin isn't what we're expecting, do nothing.
                 */
-               return NULL;
+               if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data),
+                                                                priorXmax))
+               {
+                       ReleaseBuffer(buffer);
+                       return NULL;
+               }
+
+               /*
+                * If we get here, the tuple was found but failed SnapshotDirty.
+                * Assuming the xmin is either a committed xact or our own xact
+                * (as it certainly should be if we're trying to modify the tuple),
+                * this must mean that the row was updated or deleted by either
+                * a committed xact or our own xact.  If it was deleted, we can
+                * ignore it; if it was updated then chain up to the next version
+                * and repeat the whole test.
+                *
+                * As above, it should be safe to examine xmax and t_ctid without
+                * the buffer content lock, because they can't be changing.
+                */
+               if (ItemPointerEquals(&tuple.t_self, &tuple.t_data->t_ctid))
+               {
+                       /* deleted, so forget about it */
+                       ReleaseBuffer(buffer);
+                       return NULL;
+               }
+
+               /* updated, so look at the updated row */
+               tuple.t_self = tuple.t_data->t_ctid;
+               /* updated row should have xmin matching this xmax */
+               priorXmax = HeapTupleHeaderGetXmax(tuple.t_data);
+               ReleaseBuffer(buffer);
+               /* loop back to fetch next in chain */
        }
 
        /*
index 94633249641298025fb43fa5f14388e0d099bbab..f8dcf43b64d7ff28b6077919fb4b10533d9b826a 100644 (file)
@@ -32,7 +32,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/time/tqual.c,v 1.89 2005/05/19 21:35:47 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/time/tqual.c,v 1.90 2005/08/20 00:39:57 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -677,14 +677,15 @@ HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid,
  * However, we also include the effects of other xacts still in progress.
  *
  * Returns extra information in the global variable SnapshotDirty, namely
- * xids of concurrent xacts that affected the tuple.  Also, the tuple's
- * t_ctid (forward link) is returned if it's being updated.
+ * xids of concurrent xacts that affected the tuple.  SnapshotDirty->xmin
+ * is set to InvalidTransactionId if xmin is either committed good or
+ * committed dead; or to xmin if that transaction is still in progress.
+ * Similarly for SnapshotDirty->xmax.
  */
 bool
 HeapTupleSatisfiesDirty(HeapTupleHeader tuple, Buffer buffer)
 {
        SnapshotDirty->xmin = SnapshotDirty->xmax = InvalidTransactionId;
-       ItemPointerSetInvalid(&(SnapshotDirty->tid));
 
        if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
        {
@@ -781,7 +782,6 @@ HeapTupleSatisfiesDirty(HeapTupleHeader tuple, Buffer buffer)
        {
                if (tuple->t_infomask & HEAP_IS_LOCKED)
                        return true;
-               SnapshotDirty->tid = tuple->t_ctid;
                return false;                   /* updated by other */
        }
 
@@ -824,7 +824,6 @@ HeapTupleSatisfiesDirty(HeapTupleHeader tuple, Buffer buffer)
 
        tuple->t_infomask |= HEAP_XMAX_COMMITTED;
        SetBufferCommitInfoNeedsSave(buffer);
-       SnapshotDirty->tid = tuple->t_ctid;
        return false;                           /* updated by other */
 }
 
@@ -1224,10 +1223,13 @@ HeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin,
                                                        HeapTupleHeaderGetXmax(tuple)))
        {
                /*
-                * inserter also deleted it, so it was never visible to anyone
-                * else
+                * Inserter also deleted it, so it was never visible to anyone
+                * else.  However, we can only remove it early if it's not an
+                * updated tuple; else its parent tuple is linking to it via t_ctid,
+                * and this tuple mustn't go away before the parent does.
                 */
-               return HEAPTUPLE_DEAD;
+               if (!(tuple->t_infomask & HEAP_UPDATED))
+                       return HEAPTUPLE_DEAD;
        }
 
        if (!TransactionIdPrecedes(HeapTupleHeaderGetXmax(tuple), OldestXmin))
index 9c040b0dfd4f11db6cabf3a974cce30dfd548261..3221734a6f6f08c3a21b0894aa6e592c2c1a0689 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.103 2005/08/01 20:31:13 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.104 2005/08/20 00:39:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -152,19 +152,23 @@ extern bool heap_release_fetch(Relation relation, Snapshot snapshot,
                                   HeapTuple tuple, Buffer *userbuf, bool keep_buf,
                                   PgStat_Info *pgstat_info);
 
-extern ItemPointer heap_get_latest_tid(Relation relation, Snapshot snapshot,
+extern void heap_get_latest_tid(Relation relation, Snapshot snapshot,
                                        ItemPointer tid);
 extern void setLastTid(const ItemPointer tid);
 
 extern Oid     heap_insert(Relation relation, HeapTuple tup, CommandId cid,
                                                bool use_wal, bool use_fsm);
-extern HTSU_Result heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid,
-                       CommandId cid, Snapshot crosscheck, bool wait);
-extern HTSU_Result heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
-               ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait);
-extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tup,
-                                Buffer *userbuf, CommandId cid,
-                                LockTupleMode mode, bool nowait);
+extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
+                                                          ItemPointer ctid, TransactionId *update_xmax,
+                                                          CommandId cid, Snapshot crosscheck, bool wait);
+extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
+                                                          HeapTuple newtup,
+                                                          ItemPointer ctid, TransactionId *update_xmax,
+                                                          CommandId cid, Snapshot crosscheck, bool wait);
+extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
+                                                                  Buffer *buffer, ItemPointer ctid,
+                                                                  TransactionId *update_xmax, CommandId cid,
+                                                                  LockTupleMode mode, bool nowait);
 
 extern Oid     simple_heap_insert(Relation relation, HeapTuple tup);
 extern void simple_heap_delete(Relation relation, ItemPointer tid);
index e394afd31392a1e96fab99e8f333545b57e0cf1e..abc4dce9b95dfa8756284c0a4d8cd25846d3c0f6 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/htup.h,v 1.75 2005/06/08 15:50:27 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/htup.h,v 1.76 2005/08/20 00:39:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
  * However, with the advent of subtransactions, a tuple may need both Xmax
  * and Cmin simultaneously, so this is no longer possible.
  *
+ * A word about t_ctid: whenever a new tuple is stored on disk, its t_ctid
+ * is initialized with its own TID (location).  If the tuple is ever updated,
+ * its t_ctid is changed to point to the replacement version of the tuple.
+ * Thus, a tuple is the latest version of its row iff XMAX is invalid or
+ * t_ctid points to itself (in which case, if XMAX is valid, the tuple is
+ * either locked or deleted).  One can follow the chain of t_ctid links
+ * to find the newest version of the row.  Beware however that VACUUM might
+ * erase the pointed-to (newer) tuple before erasing the pointing (older)
+ * tuple.  Hence, when following a t_ctid link, it is necessary to check
+ * to see if the referenced slot is empty or contains an unrelated tuple.
+ * Check that the referenced tuple has XMIN equal to the referencing tuple's
+ * XMAX to verify that it is actually the descendant version and not an
+ * unrelated tuple stored into a slot recently freed by VACUUM.  If either
+ * check fails, one may assume that there is no live descendant version.
+ *
  * Following the fixed header fields, the nulls bitmap is stored (beginning
  * at t_bits). The bitmap is *not* stored if t_infomask shows that there
  * are no nulls in the tuple.  If an OID field is present (as indicated by
@@ -334,18 +349,29 @@ do { \
 /*
  * HeapTupleData is an in-memory data structure that points to a tuple.
  *
- * This new HeapTuple for version >= 6.5 and this is why it was changed:
+ * There are several ways in which this data structure is used:
+ *
+ * * Pointer to a tuple in a disk buffer: t_data points directly into the
+ *      buffer (which the code had better be holding a pin on, but this is not
+ *      reflected in HeapTupleData itself).  t_datamcxt must be NULL.
+ *
+ * * Pointer to nothing: t_data and t_datamcxt are NULL.  This is used as
+ *      a failure indication in some functions.
+ *
+ * * Part of a palloc'd tuple: the HeapTupleData itself and the tuple
+ *      form a single palloc'd chunk.  t_data points to the memory location
+ *      immediately following the HeapTupleData struct (at offset HEAPTUPLESIZE),
+ *      and t_datamcxt is the containing context.  This is used as the output
+ *      format of heap_form_tuple and related routines.
  *
- * 1. t_len moved off on-disk tuple data - ItemIdData is used to get len;
- * 2. t_ctid above is not self tuple TID now - it may point to
- *       updated version of tuple (required by MVCC);
- * 3. someday someone let tuple to cross block boundaries -
- *       he have to add something below...
+ * * Separately allocated tuple: t_data points to a palloc'd chunk that
+ *      is not adjacent to the HeapTupleData, and t_datamcxt is the context
+ *      containing that chunk.
  *
- * Change for 7.0:
- *       Up to now t_data could be NULL, the memory location directly following
- *       HeapTupleData, or pointing into a buffer. Now, it could also point to
- *       a separate allocation that was done in the t_datamcxt memory context.
+ * t_len should always be valid, except in the pointer-to-nothing case.
+ * t_self and t_tableOid should be valid if the HeapTupleData points to
+ * a disk buffer, or if it represents a copy of a tuple on disk.  They
+ * should be explicitly set invalid in manufactured tuples.
  */
 typedef struct HeapTupleData
 {
index 2e42894788eebdeb2f6bb05311d0659cf9904229..6064ff2f4f23eb7e3b686569418146253114c462 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/executor/executor.h,v 1.118 2005/04/16 20:07:35 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/executor/executor.h,v 1.119 2005/08/20 00:40:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -98,7 +98,7 @@ extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
 extern void ExecConstraints(ResultRelInfo *resultRelInfo,
                                TupleTableSlot *slot, EState *estate);
 extern TupleTableSlot *EvalPlanQual(EState *estate, Index rti,
-                        ItemPointer tid);
+                                                                       ItemPointer tid, TransactionId priorXmax);
 
 /*
  * prototypes from functions in execProcnode.c
index f12ae2233fd29c8b93ad9086270919b25a73fbfd..fa530ed977cc1843485f8af5b3ceaaaa796bbc7d 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/tqual.h,v 1.57 2005/05/19 21:35:48 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/utils/tqual.h,v 1.58 2005/08/20 00:40:32 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -42,7 +42,6 @@ typedef struct SnapshotData
        TransactionId *xip;                     /* array of xact IDs in progress */
        /* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */
        CommandId       curcid;                 /* in my xact, CID < curcid are visible */
-       ItemPointerData tid;            /* required for Dirty snapshot -:( */
 } SnapshotData;
 
 typedef SnapshotData *Snapshot;