vacuum.c refactoring
author Bruce Momjian <bruce@momjian.us>
Tue, 8 Jun 2004 13:59:36 +0000 (13:59 +0000)
committer Bruce Momjian <bruce@momjian.us>
Tue, 8 Jun 2004 13:59:36 +0000 (13:59 +0000)
   . rename variables
     . cur_buffer -> dst_buffer
     . ToPage -> dst_page
     . cur_page -> dst_vacpage
   . move variable declarations into the blocks where the variables are used
   . use Asserts instead of elog(ERROR, ...) for can't-happen conditions
   . extract functionality from repair_frag() into new routines
     . move_chain_tuple()
     . move_plain_tuple()
     . update_hint_bits()
   . create type ExecContext (usage sketch below)
   . add comments

Manfred Koizar

src/backend/commands/vacuum.c

index 68164a9a80c0ceb2bc6de30be9698604692784a4..80a021487f9d108e157f61a786b2cee272de5312 100644 (file)
@@ -13,7 +13,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.280 2004/06/05 19:48:07 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.281 2004/06/08 13:59:36 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -99,6 +99,64 @@ typedef struct VRelStats
        VTupleLink      vtlinks;
 } VRelStats;
 
+/*----------------------------------------------------------------------
+ * ExecContext:
+ *
+ * As these variables always appear together, we put them into one struct
+ * and pull initialization and cleanup into separate routines.
+ * ExecContext is used by repair_frag() and move_xxx_tuple().  More
+ * accurately:  It is *used* only in move_xxx_tuple(), but because this
+ * routine is called many times, we initialize the struct just once in
+ * repair_frag() and pass it on to move_xxx_tuple().
+ */
+typedef struct ExecContextData
+{
+       ResultRelInfo *resultRelInfo;
+       EState     *estate;
+       TupleTable      tupleTable;
+       TupleTableSlot *slot;
+} ExecContextData;
+typedef ExecContextData *ExecContext;
+
+static void
+ExecContext_Init(ExecContext ec, Relation rel)
+{
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+
+       /*
+        * We need a ResultRelInfo and an EState so we can use the regular
+        * executor's index-entry-making machinery.
+        */
+       ec->estate = CreateExecutorState();
+
+       ec->resultRelInfo = makeNode(ResultRelInfo);
+       ec->resultRelInfo->ri_RangeTableIndex = 1;              /* dummy */
+       ec->resultRelInfo->ri_RelationDesc = rel;
+       ec->resultRelInfo->ri_TrigDesc = NULL;  /* we don't fire triggers */
+
+       ExecOpenIndices(ec->resultRelInfo);
+
+       ec->estate->es_result_relations = ec->resultRelInfo;
+       ec->estate->es_num_result_relations = 1;
+       ec->estate->es_result_relation_info = ec->resultRelInfo;
+
+       /* Set up a dummy tuple table too */
+       ec->tupleTable = ExecCreateTupleTable(1);
+       ec->slot = ExecAllocTableSlot(ec->tupleTable);
+       ExecSetSlotDescriptor(ec->slot, tupdesc, false);
+}
+
+static void
+ExecContext_Finish(ExecContext ec)
+{
+       ExecDropTupleTable(ec->tupleTable, true);
+       ExecCloseIndices(ec->resultRelInfo);
+       FreeExecutorState(ec->estate);
+}
+/*
+ * End of ExecContext Implementation
+ *----------------------------------------------------------------------
+ */
 
 static MemoryContext vac_context = NULL;
 
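For orientation, here is the intended ExecContext life cycle: initialize the
context once in repair_frag(), let move_chain_tuple()/move_plain_tuple() drive
the executor's index-entry-making machinery through it for every moved tuple,
then tear it down once.  The wrapper function below is hypothetical and would
only compile in the context of vacuum.c; every individual call in it is taken
from this patch:

	/*
	 * Hypothetical wrapper, for illustration only.  The ExecContext is
	 * set up once, used for each tuple that has been moved, and torn
	 * down once; the calls themselves appear elsewhere in this diff.
	 */
	static void
	example_exec_context_usage(Relation onerel, HeapTuple newtup)
	{
		ExecContextData ec;

		ExecContext_Init(&ec, onerel);

		/* per moved tuple: create index entries for its new location */
		if (ec.resultRelInfo->ri_NumIndices > 0)
		{
			ExecStoreTuple(newtup, ec.slot, InvalidBuffer, false);
			ExecInsertIndexTuples(ec.slot, &(newtup->t_self),
								  ec.estate, true);
		}

		ExecContext_Finish(&ec);
	}
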
@@ -122,6 +180,17 @@ static void scan_heap(VRelStats *vacrelstats, Relation onerel,
 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel);
+static void move_chain_tuple(Relation rel,
+                                        Buffer old_buf, Page old_page, HeapTuple old_tup,
+                                        Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                                        ExecContext ec, ItemPointer ctid, bool cleanVpd);
+static void move_plain_tuple(Relation rel,
+                                        Buffer old_buf, Page old_page, HeapTuple old_tup,
+                                        Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                                        ExecContext ec);
+static void update_hint_bits(Relation rel, VacPageList fraged_pages,
+                                       int num_fraged_pages, BlockNumber last_move_dest_block,
+                                       int num_moved);
 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacpagelist);
 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
@@ -675,7 +744,7 @@ vac_update_dbstats(Oid dbid,
 static void
 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
 {
-       TransactionId myXID;
+       TransactionId myXID = GetCurrentTransactionId();
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
@@ -683,7 +752,6 @@ vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
        bool            vacuumAlreadyWrapped = false;
        bool            frozenAlreadyWrapped = false;
 
-       myXID = GetCurrentTransactionId();
 
        relation = heap_openr(DatabaseRelationName, AccessShareLock);
 
@@ -1059,17 +1127,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 {
        BlockNumber nblocks,
                                blkno;
-       ItemId          itemid;
-       Buffer          buf;
        HeapTupleData tuple;
-       OffsetNumber offnum,
-                               maxoff;
-       bool            pgchanged,
-                               tupgone,
-                               notup;
        char       *relname;
-       VacPage         vacpage,
-                               vacpagecopy;
+       VacPage         vacpage;
        BlockNumber empty_pages,
                                empty_end_pages;
        double          num_tuples,
@@ -1080,7 +1140,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                                usable_free_space;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
-       int                     i;
        bool            do_shrinking = true;
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
        int                     num_vtlinks = 0;
@@ -1113,6 +1172,11 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                                        tempPage = NULL;
                bool            do_reap,
                                        do_frag;
+               Buffer          buf;
+               OffsetNumber offnum,
+                                       maxoff;
+               bool            pgchanged,
+                                       notup;
 
                vacuum_delay_point();
 
@@ -1125,6 +1189,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
                if (PageIsNew(page))
                {
+                       VacPage vacpagecopy;
+
                        ereport(WARNING,
                        (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
                                        relname, blkno)));
@@ -1142,6 +1208,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
                if (PageIsEmpty(page))
                {
+                       VacPage vacpagecopy;
+
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_space += vacpage->free;
                        empty_pages++;
@@ -1161,8 +1229,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                         offnum = OffsetNumberNext(offnum))
                {
                        uint16          sv_infomask;
-
-                       itemid = PageGetItemId(page, offnum);
+                       ItemId          itemid = PageGetItemId(page, offnum);
+                       bool            tupgone = false;
 
                        /*
                         * Collect un-used items too - it's possible to have indexes
@@ -1180,7 +1248,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                        tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);
 
-                       tupgone = false;
                        sv_infomask = tuple.t_data->t_infomask;
 
                        switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
@@ -1269,7 +1336,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                                        do_shrinking = false;
                                        break;
                                default:
-                                       elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+                                       /* unexpected HeapTupleSatisfiesVacuum result */
+                                       Assert(false);
                                        break;
                        }
 
@@ -1344,7 +1412,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
                if (do_reap || do_frag)
                {
-                       vacpagecopy = copy_vac_page(vacpage);
+                       VacPage vacpagecopy = copy_vac_page(vacpage);
                        if (do_reap)
                                vpage_insert(vacuum_pages, vacpagecopy);
                        if (do_frag)
@@ -1390,6 +1458,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
         */
        if (do_shrinking)
        {
+               int                     i;
+
                Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
                fraged_pages->num_pages -= empty_end_pages;
                usable_free_space = 0;
@@ -1453,76 +1523,29 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel)
 {
-       TransactionId myXID;
-       CommandId       myCID;
-       Buffer          buf,
-                               cur_buffer;
+       TransactionId myXID = GetCurrentTransactionId();
+       Buffer          dst_buffer = InvalidBuffer;
        BlockNumber nblocks,
                                blkno;
        BlockNumber last_move_dest_block = 0,
                                last_vacuum_block;
-       Page            page,
-                               ToPage = NULL;
-       OffsetNumber offnum,
-                               maxoff,
-                               newoff,
-                               max_offset;
-       ItemId          itemid,
-                               newitemid;
-       HeapTupleData tuple,
-                               newtup;
-       TupleDesc       tupdesc;
-       ResultRelInfo *resultRelInfo;
-       EState     *estate;
-       TupleTable      tupleTable;
-       TupleTableSlot *slot;
+       Page            dst_page = NULL;
+       ExecContextData ec;
        VacPageListData Nvacpagelist;
-       VacPage         cur_page = NULL,
+       VacPage         dst_vacpage = NULL,
                                last_vacuum_page,
                                vacpage,
                           *curpage;
-       int                     cur_item = 0;
        int                     i;
-       Size            tuple_len;
-       int                     num_moved,
+       int                     num_moved = 0,
                                num_fraged_pages,
                                vacuumed_pages;
-       int                     checked_moved,
-                               num_tuples,
-                               keep_tuples = 0;
-       bool            isempty,
-                               dowrite,
-                               chain_tuple_moved;
+       int                     keep_tuples = 0;
        VacRUsage       ru0;
 
        vac_init_rusage(&ru0);
 
-       myXID = GetCurrentTransactionId();
-       myCID = GetCurrentCommandId();
-
-       tupdesc = RelationGetDescr(onerel);
-
-       /*
-        * We need a ResultRelInfo and an EState so we can use the regular
-        * executor's index-entry-making machinery.
-        */
-       estate = CreateExecutorState();
-
-       resultRelInfo = makeNode(ResultRelInfo);
-       resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
-       resultRelInfo->ri_RelationDesc = onerel;
-       resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
-
-       ExecOpenIndices(resultRelInfo);
-
-       estate->es_result_relations = resultRelInfo;
-       estate->es_num_result_relations = 1;
-       estate->es_result_relation_info = resultRelInfo;
-
-       /* Set up a dummy tuple table too */
-       tupleTable = ExecCreateTupleTable(1);
-       slot = ExecAllocTableSlot(tupleTable);
-       ExecSetSlotDescriptor(slot, tupdesc, false);
+       ExecContext_Init(&ec, onerel);
 
        Nvacpagelist.num_pages = 0;
        num_fraged_pages = fraged_pages->num_pages;
@@ -1539,8 +1562,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                last_vacuum_page = NULL;
                last_vacuum_block = InvalidBlockNumber;
        }
-       cur_buffer = InvalidBuffer;
-       num_moved = 0;
 
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
        vacpage->offsets_used = vacpage->offsets_free = 0;
@@ -1560,6 +1581,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                 blkno > last_move_dest_block;
                 blkno--)
        {
+               Buffer                  buf;
+               Page                    page;
+               OffsetNumber    offnum,
+                                               maxoff;
+               bool                    isempty,
+                                               dowrite,
+                                               chain_tuple_moved;
+
                vacuum_delay_point();
 
                /*
@@ -1635,7 +1664,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
-                       itemid = PageGetItemId(page, offnum);
+                       Size                    tuple_len;
+                       HeapTupleData   tuple;
+                       ItemId                  itemid = PageGetItemId(page, offnum);
 
                        if (!ItemIdIsUsed(itemid))
                                continue;
@@ -1645,45 +1676,71 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        tuple_len = tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);
 
+                       /*
+                        * VACUUM FULL has an exclusive lock on the relation.  So
+                        * normally no other transaction can have pending INSERTs or
+                        * DELETEs in this relation.  A tuple is either
+                        *   (a) a tuple in a system catalog, inserted or deleted by
+                        *       a not yet committed transaction or
+                        *   (b) dead (XMIN_INVALID or XMAX_COMMITTED) or
+                        *   (c) inserted by a committed xact (XMIN_COMMITTED) or
+                        *   (d) moved by the currently running VACUUM.
+                        * In case (a) we wouldn't be in repair_frag() at all.
+                        * In case (b) we cannot be here, because scan_heap() has
+                        * already marked the item as unused (see the continue above).
+                        * Case (c) is what normally is to be expected.
+                        * Case (d) is only possible, if a whole tuple chain has been
+                        * moved while processing this or a higher numbered block.
+                        */
                        if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                        {
-                               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                                       elog(ERROR, "HEAP_MOVED_IN was not expected");
+                               /*
+                                * There cannot be another concurrently running VACUUM.  If
+                                * the tuple had been moved in by a previous VACUUM, the
+                                * visibility check would have set XMIN_COMMITTED.  If the
+                                * tuple had been moved in by the currently running VACUUM,
+                                * the loop would have been terminated.  We had
+                                * elog(ERROR, ...) here, but as we are testing for a
+                                * can't-happen condition, Assert() seems more appropriate.
+                                */
+                               Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN));
 
                                /*
                                 * If this (chain) tuple is moved by me already then I
                                 * have to check is it in vacpage or not - i.e. is it
                                 * moved while cleaning this page or some previous one.
                                 */
-                               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-                               {
-                                       if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
-                                               elog(ERROR, "invalid XVAC in tuple header");
-                                       if (keep_tuples == 0)
-                                               continue;
-                                       if (chain_tuple_moved)          /* some chains was moved
-                                                                                                * while */
-                                       {                       /* cleaning this page */
-                                               Assert(vacpage->offsets_free > 0);
-                                               for (i = 0; i < vacpage->offsets_free; i++)
-                                               {
-                                                       if (vacpage->offsets[i] == offnum)
-                                                               break;
-                                               }
-                                               if (i >= vacpage->offsets_free) /* not found */
-                                               {
-                                                       vacpage->offsets[vacpage->offsets_free++] = offnum;
-                                                       keep_tuples--;
-                                               }
+                               Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF);
+                               /*
+                                * MOVED_OFF by another VACUUM would have caused the
+                                * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
+                                */
+                               Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID);
+
+                               /* Can't we Assert(keep_tuples > 0) here? */
+                               if (keep_tuples == 0)
+                                       continue;
+                               if (chain_tuple_moved)          /* some chains were moved
+                                                                                        * while */
+                               {                       /* cleaning this page */
+                                       Assert(vacpage->offsets_free > 0);
+                                       for (i = 0; i < vacpage->offsets_free; i++)
+                                       {
+                                               if (vacpage->offsets[i] == offnum)
+                                                       break;
                                        }
-                                       else
+                                       if (i >= vacpage->offsets_free) /* not found */
                                        {
                                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                                keep_tuples--;
                                        }
-                                       continue;
                                }
-                               elog(ERROR, "HEAP_MOVED_OFF was expected");
+                               else
+                               {
+                                       vacpage->offsets[vacpage->offsets_free++] = offnum;
+                                       keep_tuples--;
+                               }
+                               continue;
                        }
 
                        /*
@@ -1716,8 +1773,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                Buffer          Cbuf = buf;
                                bool            freeCbuf = false;
                                bool            chain_move_failed = false;
-                               Page            Cpage;
-                               ItemId          Citemid;
                                ItemPointerData Ctid;
                                HeapTupleData tp = tuple;
                                Size            tlen = tuple_len;
@@ -1728,10 +1783,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                int                     to_item = 0;
                                int                     ti;
 
-                               if (cur_buffer != InvalidBuffer)
+                               if (dst_buffer != InvalidBuffer)
                                {
-                                       WriteBuffer(cur_buffer);
-                                       cur_buffer = InvalidBuffer;
+                                       WriteBuffer(dst_buffer);
+                                       dst_buffer = InvalidBuffer;
                                }
 
                                /* Quick exit if we have no vtlinks to search in */
@@ -1754,6 +1809,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                           !(ItemPointerEquals(&(tp.t_self),
                                                                                   &(tp.t_data->t_ctid))))
                                {
+                                       Page            Cpage;
+                                       ItemId          Citemid;
+                                       ItemPointerData Ctid;
+
                                        Ctid = tp.t_data->t_ctid;
                                        if (freeCbuf)
                                                ReleaseBuffer(Cbuf);
@@ -1929,12 +1988,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                }
 
                                /*
-                                * Okay, move the whle tuple chain
+                                * Okay, move the whole tuple chain
                                 */
                                ItemPointerSetInvalid(&Ctid);
                                for (ti = 0; ti < num_vtmove; ti++)
                                {
                                        VacPage         destvacpage = vtmove[ti].vacpage;
+                                       Page            Cpage;
+                                       ItemId          Citemid;
 
                                        /* Get page to move from */
                                        tuple.t_self = vtmove[ti].tid;
@@ -1942,13 +2003,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                         ItemPointerGetBlockNumber(&(tuple.t_self)));
 
                                        /* Get page to move to */
-                                       cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
+                                       dst_buffer = ReadBuffer(onerel, destvacpage->blkno);
 
-                                       LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
-                                       if (cur_buffer != Cbuf)
+                                       LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
+                                       if (dst_buffer != Cbuf)
                                                LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
 
-                                       ToPage = BufferGetPage(cur_buffer);
+                                       dst_page = BufferGetPage(dst_buffer);
                                        Cpage = BufferGetPage(Cbuf);
 
                                        Citemid = PageGetItemId(Cpage,
@@ -1961,120 +2022,14 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                         * make a copy of the source tuple, and then mark the
                                         * source tuple MOVED_OFF.
                                         */
-                                       heap_copytuple_with_tuple(&tuple, &newtup);
-
-                                       /*
-                                        * register invalidation of source tuple in catcaches.
-                                        */
-                                       CacheInvalidateHeapTuple(onerel, &tuple);
-
-                                       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
-                                       START_CRIT_SECTION();
-
-                                       tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                                                                                 HEAP_XMIN_INVALID |
-                                                                                                 HEAP_MOVED_IN);
-                                       tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
-                                       HeapTupleHeaderSetXvac(tuple.t_data, myXID);
-
-                                       /*
-                                        * If this page was not used before - clean it.
-                                        *
-                                        * NOTE: a nasty bug used to lurk here.  It is possible
-                                        * for the source and destination pages to be the same
-                                        * (since this tuple-chain member can be on a page
-                                        * lower than the one we're currently processing in
-                                        * the outer loop).  If that's true, then after
-                                        * vacuum_page() the source tuple will have been
-                                        * moved, and tuple.t_data will be pointing at
-                                        * garbage.  Therefore we must do everything that uses
-                                        * tuple.t_data BEFORE this step!!
-                                        *
-                                        * This path is different from the other callers of
-                                        * vacuum_page, because we have already incremented
-                                        * the vacpage's offsets_used field to account for the
-                                        * tuple(s) we expect to move onto the page. Therefore
-                                        * vacuum_page's check for offsets_used == 0 is wrong.
-                                        * But since that's a good debugging check for all
-                                        * other callers, we work around it here rather than
-                                        * remove it.
-                                        */
-                                       if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
-                                       {
-                                               int                     sv_offsets_used = destvacpage->offsets_used;
-
-                                               destvacpage->offsets_used = 0;
-                                               vacuum_page(onerel, cur_buffer, destvacpage);
-                                               destvacpage->offsets_used = sv_offsets_used;
-                                       }
-
-                                       /*
-                                        * Update the state of the copied tuple, and store it
-                                        * on the destination page.
-                                        */
-                                       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                                                                                  HEAP_XMIN_INVALID |
-                                                                                                  HEAP_MOVED_OFF);
-                                       newtup.t_data->t_infomask |= HEAP_MOVED_IN;
-                                       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
-                                       newoff = PageAddItem(ToPage,
-                                                                                (Item) newtup.t_data,
-                                                                                tuple_len,
-                                                                                InvalidOffsetNumber,
-                                                                                LP_USED);
-                                       if (newoff == InvalidOffsetNumber)
-                                       {
-                                               elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
-                                                 (unsigned long) tuple_len, destvacpage->blkno);
-                                       }
-                                       newitemid = PageGetItemId(ToPage, newoff);
-                                       pfree(newtup.t_data);
-                                       newtup.t_datamcxt = NULL;
-                                       newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
-                                       ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
-
-                                       /* XLOG stuff */
-                                       if (!onerel->rd_istemp)
-                                       {
-                                               XLogRecPtr      recptr =
-                                               log_heap_move(onerel, Cbuf, tuple.t_self,
-                                                                         cur_buffer, &newtup);
-
-                                               if (Cbuf != cur_buffer)
-                                               {
-                                                       PageSetLSN(Cpage, recptr);
-                                                       PageSetSUI(Cpage, ThisStartUpID);
-                                               }
-                                               PageSetLSN(ToPage, recptr);
-                                               PageSetSUI(ToPage, ThisStartUpID);
-                                       }
-                                       else
-                                       {
-                                               /*
-                                                * No XLOG record, but still need to flag that XID
-                                                * exists on disk
-                                                */
-                                               MyXactMadeTempRelUpdate = true;
-                                       }
-
-                                       END_CRIT_SECTION();
+                                       move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
+                                                                        dst_buffer, dst_page, destvacpage,
+                                                                        &ec, &Ctid, vtmove[ti].cleanVpd);
 
+                                       num_moved++;
                                        if (destvacpage->blkno > last_move_dest_block)
                                                last_move_dest_block = destvacpage->blkno;
 
-                                       /*
-                                        * Set new tuple's t_ctid pointing to itself for last
-                                        * tuple in chain, and to next tuple in chain
-                                        * otherwise.
-                                        */
-                                       if (!ItemPointerIsValid(&Ctid))
-                                               newtup.t_data->t_ctid = newtup.t_self;
-                                       else
-                                               newtup.t_data->t_ctid = Ctid;
-                                       Ctid = newtup.t_self;
-
-                                       num_moved++;
-
                                        /*
                                         * Remember that we moved tuple from the current page
                                         * (corresponding index tuple will be cleaned).
@@ -2085,23 +2040,11 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        else
                                                keep_tuples++;
 
-                                       LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
-                                       if (cur_buffer != Cbuf)
-                                               LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
-
-                                       /* Create index entries for the moved tuple */
-                                       if (resultRelInfo->ri_NumIndices > 0)
-                                       {
-                                               ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
-                                               ExecInsertIndexTuples(slot, &(newtup.t_self),
-                                                                                         estate, true);
-                                       }
-
-                                       WriteBuffer(cur_buffer);
+                                       WriteBuffer(dst_buffer);
                                        WriteBuffer(Cbuf);
                                }                               /* end of move-the-tuple-chain loop */
 
-                               cur_buffer = InvalidBuffer;
+                               dst_buffer = InvalidBuffer;
                                pfree(vtmove);
                                chain_tuple_moved = true;
 
@@ -2110,13 +2053,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        }                                       /* end of is-tuple-in-chain test */
 
                        /* try to find new page for this tuple */
-                       if (cur_buffer == InvalidBuffer ||
-                               !enough_space(cur_page, tuple_len))
+                       if (dst_buffer == InvalidBuffer ||
+                               !enough_space(dst_vacpage, tuple_len))
                        {
-                               if (cur_buffer != InvalidBuffer)
+                               if (dst_buffer != InvalidBuffer)
                                {
-                                       WriteBuffer(cur_buffer);
-                                       cur_buffer = InvalidBuffer;
+                                       WriteBuffer(dst_buffer);
+                                       dst_buffer = InvalidBuffer;
                                }
                                for (i = 0; i < num_fraged_pages; i++)
                                {
@@ -2125,110 +2068,32 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                }
                                if (i == num_fraged_pages)
                                        break;          /* can't move item anywhere */
-                               cur_item = i;
-                               cur_page = fraged_pages->pagedesc[cur_item];
-                               cur_buffer = ReadBuffer(onerel, cur_page->blkno);
-                               LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
-                               ToPage = BufferGetPage(cur_buffer);
+                               dst_vacpage = fraged_pages->pagedesc[i];
+                               dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
+                               LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
+                               dst_page = BufferGetPage(dst_buffer);
                                /* if this page was not used before - clean it */
-                               if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
-                                       vacuum_page(onerel, cur_buffer, cur_page);
+                               if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
+                                       vacuum_page(onerel, dst_buffer, dst_vacpage);
                        }
                        else
-                               LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
+                               LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
 
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
-                       /* copy tuple */
-                       heap_copytuple_with_tuple(&tuple, &newtup);
+                       move_plain_tuple(onerel, buf, page, &tuple,
+                                                        dst_buffer, dst_page, dst_vacpage, &ec);
 
-                       /*
-                        * register invalidation of source tuple in catcaches.
-                        *
-                        * (Note: we do not need to register the copied tuple, because we
-                        * are not changing the tuple contents and so there cannot be
-                        * any need to flush negative catcache entries.)
-                        */
-                       CacheInvalidateHeapTuple(onerel, &tuple);
-
-                       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
-                       START_CRIT_SECTION();
 
-                       /*
-                        * Mark new tuple as MOVED_IN by me.
-                        */
-                       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                                                                  HEAP_XMIN_INVALID |
-                                                                                  HEAP_MOVED_OFF);
-                       newtup.t_data->t_infomask |= HEAP_MOVED_IN;
-                       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
-
-                       /* add tuple to the page */
-                       newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
-                                                                InvalidOffsetNumber, LP_USED);
-                       if (newoff == InvalidOffsetNumber)
-                       {
-                               elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
-                                        (unsigned long) tuple_len,
-                                        cur_page->blkno, (unsigned long) cur_page->free,
-                                        cur_page->offsets_used, cur_page->offsets_free);
-                       }
-                       newitemid = PageGetItemId(ToPage, newoff);
-                       pfree(newtup.t_data);
-                       newtup.t_datamcxt = NULL;
-                       newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
-                       ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
-                       newtup.t_self = newtup.t_data->t_ctid;
+                       num_moved++;
+                       if (dst_vacpage->blkno > last_move_dest_block)
+                               last_move_dest_block = dst_vacpage->blkno;
 
                        /*
-                        * Mark old tuple as MOVED_OFF by me.
+                        * Remember that we moved tuple from the current page
+                        * (corresponding index tuple will be cleaned).
                         */
-                       tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
-                                                                                 HEAP_XMIN_INVALID |
-                                                                                 HEAP_MOVED_IN);
-                       tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
-                       HeapTupleHeaderSetXvac(tuple.t_data, myXID);
-
-                       /* XLOG stuff */
-                       if (!onerel->rd_istemp)
-                       {
-                               XLogRecPtr      recptr =
-                               log_heap_move(onerel, buf, tuple.t_self,
-                                                         cur_buffer, &newtup);
-
-                               PageSetLSN(page, recptr);
-                               PageSetSUI(page, ThisStartUpID);
-                               PageSetLSN(ToPage, recptr);
-                               PageSetSUI(ToPage, ThisStartUpID);
-                       }
-                       else
-                       {
-                               /*
-                                * No XLOG record, but still need to flag that XID exists
-                                * on disk
-                                */
-                               MyXactMadeTempRelUpdate = true;
-                       }
-
-                       END_CRIT_SECTION();
-
-                       cur_page->offsets_used++;
-                       num_moved++;
-                       cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
-                       if (cur_page->blkno > last_move_dest_block)
-                               last_move_dest_block = cur_page->blkno;
-
                        vacpage->offsets[vacpage->offsets_free++] = offnum;
-
-                       LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
-                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-
-                       /* insert index' tuples if needed */
-                       if (resultRelInfo->ri_NumIndices > 0)
-                       {
-                               ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
-                               ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
-                       }
                }                                               /* walk along page */
 
                /*
@@ -2249,36 +2114,31 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                 off <= maxoff;
                                 off = OffsetNumberNext(off))
                        {
-                               itemid = PageGetItemId(page, off);
+                               ItemId  itemid = PageGetItemId(page, off);
+                               HeapTupleHeader htup;
+
                                if (!ItemIdIsUsed(itemid))
                                        continue;
-                               tuple.t_datamcxt = NULL;
-                               tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
-                               if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
+                               htup = (HeapTupleHeader) PageGetItem(page, itemid);
+                               if (htup->t_infomask & HEAP_XMIN_COMMITTED)
                                        continue;
-                               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                                       elog(ERROR, "HEAP_MOVED_IN was not expected");
-                               if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
+                               /*
+                               ** See comments in the walk-along-page loop above for why
+                               ** we have Asserts here instead of if (...) elog(ERROR).
+                               */
+                               Assert(!(htup->t_infomask & HEAP_MOVED_IN));
+                               Assert(htup->t_infomask & HEAP_MOVED_OFF);
+                               Assert(HeapTupleHeaderGetXvac(htup) == myXID);
+                               if (chain_tuple_moved)
                                {
-                                       if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
-                                               elog(ERROR, "invalid XVAC in tuple header");
-                                       /* some chains was moved while */
-                                       if (chain_tuple_moved)
-                                       {                       /* cleaning this page */
-                                               Assert(vacpage->offsets_free > 0);
-                                               for (i = 0; i < vacpage->offsets_free; i++)
-                                               {
-                                                       if (vacpage->offsets[i] == off)
-                                                               break;
-                                               }
-                                               if (i >= vacpage->offsets_free) /* not found */
-                                               {
-                                                       vacpage->offsets[vacpage->offsets_free++] = off;
-                                                       Assert(keep_tuples > 0);
-                                                       keep_tuples--;
-                                               }
+                                       /* some chains were moved while cleaning this page */
+                                       Assert(vacpage->offsets_free > 0);
+                                       for (i = 0; i < vacpage->offsets_free; i++)
+                                       {
+                                               if (vacpage->offsets[i] == off)
+                                                       break;
                                        }
-                                       else
+                                       if (i >= vacpage->offsets_free) /* not found */
                                        {
                                                vacpage->offsets[vacpage->offsets_free++] = off;
                                                Assert(keep_tuples > 0);
@@ -2286,7 +2146,11 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        }
                                }
                                else
-                                       elog(ERROR, "HEAP_MOVED_OFF was expected");
+                               {
+                                       vacpage->offsets[vacpage->offsets_free++] = off;
+                                       Assert(keep_tuples > 0);
+                                       keep_tuples--;
+                               }
                        }
                }
 
@@ -2312,10 +2176,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
        blkno++;                                        /* new number of blocks */
 
-       if (cur_buffer != InvalidBuffer)
+       if (dst_buffer != InvalidBuffer)
        {
                Assert(num_moved > 0);
-               WriteBuffer(cur_buffer);
+               WriteBuffer(dst_buffer);
        }
 
        if (num_moved > 0)
@@ -2348,6 +2212,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                Assert((*curpage)->blkno < blkno);
                if ((*curpage)->offsets_used == 0)
                {
+                       Buffer          buf;
+                       Page            page;
+
                        /* this page was not used as a move target, so must clean it */
                        buf = ReadBuffer(onerel, (*curpage)->blkno);
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
@@ -2363,62 +2230,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
         * Now scan all the pages that we moved tuples onto and update tuple
         * status bits.  This is not really necessary, but will save time for
         * future transactions examining these tuples.
-        *
-        * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
-        * pages that were move source pages but not move dest pages.  One
-        * also wonders whether it wouldn't be better to skip this step and
-        * let the tuple status updates happen someplace that's not holding an
-        * exclusive lock on the relation.
         */
-       checked_moved = 0;
-       for (i = 0, curpage = fraged_pages->pagedesc;
-                i < num_fraged_pages;
-                i++, curpage++)
-       {
-               vacuum_delay_point();
-
-               Assert((*curpage)->blkno < blkno);
-               if ((*curpage)->blkno > last_move_dest_block)
-                       break;                          /* no need to scan any further */
-               if ((*curpage)->offsets_used == 0)
-                       continue;                       /* this page was never used as a move dest */
-               buf = ReadBuffer(onerel, (*curpage)->blkno);
-               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-               page = BufferGetPage(buf);
-               num_tuples = 0;
-               max_offset = PageGetMaxOffsetNumber(page);
-               for (newoff = FirstOffsetNumber;
-                        newoff <= max_offset;
-                        newoff = OffsetNumberNext(newoff))
-               {
-                       itemid = PageGetItemId(page, newoff);
-                       if (!ItemIdIsUsed(itemid))
-                               continue;
-                       tuple.t_datamcxt = NULL;
-                       tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
-                       if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
-                       {
-                               if (!(tuple.t_data->t_infomask & HEAP_MOVED))
-                                       elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
-                               if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
-                                       elog(ERROR, "invalid XVAC in tuple header");
-                               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                               {
-                                       tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
-                                       tuple.t_data->t_infomask &= ~HEAP_MOVED;
-                                       num_tuples++;
-                               }
-                               else
-                                       tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
-                       }
-               }
-               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-               WriteBuffer(buf);
-               Assert((*curpage)->offsets_used == num_tuples);
-               checked_moved += num_tuples;
-       }
-       Assert(num_moved == checked_moved);
-
+       update_hint_bits(onerel, fraged_pages, num_fraged_pages,
+                                        last_move_dest_block, num_moved);
+  
        /*
         * It'd be cleaner to make this report at the bottom of this routine,
         * but then the rusage would double-count the second pass of index
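
The body of the extracted update_hint_bits() lies outside the hunks shown on
this page.  A reconstruction from the inline code removed above, assuming the
extraction kept the logic verbatim (treat this as a sketch, not as the
committed text):

	static void
	update_hint_bits(Relation rel, VacPageList fraged_pages,
					 int num_fraged_pages,
					 BlockNumber last_move_dest_block, int num_moved)
	{
		TransactionId myXID = GetCurrentTransactionId();
		int			checked_moved = 0;
		int			i;
		VacPage    *curpage;

		for (i = 0, curpage = fraged_pages->pagedesc;
			 i < num_fraged_pages;
			 i++, curpage++)
		{
			Buffer		buf;
			Page		page;
			OffsetNumber max_offset,
						off;
			int			num_tuples = 0;

			vacuum_delay_point();

			if ((*curpage)->blkno > last_move_dest_block)
				break;			/* no need to scan any further */
			if ((*curpage)->offsets_used == 0)
				continue;		/* this page was never used as a move dest */
			buf = ReadBuffer(rel, (*curpage)->blkno);
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
			page = BufferGetPage(buf);
			max_offset = PageGetMaxOffsetNumber(page);
			for (off = FirstOffsetNumber;
				 off <= max_offset;
				 off = OffsetNumberNext(off))
			{
				ItemId		itemid = PageGetItemId(page, off);
				HeapTupleHeader htup;

				if (!ItemIdIsUsed(itemid))
					continue;
				htup = (HeapTupleHeader) PageGetItem(page, itemid);
				if (htup->t_infomask & HEAP_XMIN_COMMITTED)
					continue;
				Assert(htup->t_infomask & HEAP_MOVED);
				Assert(HeapTupleHeaderGetXvac(htup) == myXID);
				if (htup->t_infomask & HEAP_MOVED_IN)
				{
					/* tuple was moved here: mark it good and count it */
					htup->t_infomask |= HEAP_XMIN_COMMITTED;
					htup->t_infomask &= ~HEAP_MOVED;
					num_tuples++;
				}
				else
					htup->t_infomask |= HEAP_XMIN_INVALID;
			}
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			WriteBuffer(buf);
			Assert((*curpage)->offsets_used == num_tuples);
			checked_moved += num_tuples;
		}
		Assert(num_moved == checked_moved);
	}
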
@@ -2455,6 +2270,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                *vpleft = *vpright;
                                *vpright = vpsave;
                        }
+                       /*
+                        * keep_tuples is the number of tuples that have been moved
+                        * off a page during chain moves but not been scanned over
+                        * subsequently.  The tuple ids of these tuples are not
+                        * recorded as free offsets for any VacPage, so they will not
+                        * be cleared from the indexes.
+                        */
                        Assert(keep_tuples >= 0);
                        for (i = 0; i < nindexes; i++)
                                vacuum_index(&Nvacpagelist, Irel[i],
@@ -2465,36 +2287,41 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                if (vacpage->blkno == (blkno - 1) &&
                        vacpage->offsets_free > 0)
                {
-                       OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
-                       int                     uncnt;
+                       Buffer                  buf;
+                       Page                    page;
+                       OffsetNumber    unused[BLCKSZ / sizeof(OffsetNumber)];
+                       OffsetNumber    offnum,
+                                                       maxoff;
+                       int                             uncnt;
+                       int                             num_tuples = 0;
 
                        buf = ReadBuffer(onerel, vacpage->blkno);
                        LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
                        page = BufferGetPage(buf);
-                       num_tuples = 0;
                        maxoff = PageGetMaxOffsetNumber(page);
                        for (offnum = FirstOffsetNumber;
                                 offnum <= maxoff;
                                 offnum = OffsetNumberNext(offnum))
                        {
-                               itemid = PageGetItemId(page, offnum);
+                               ItemId  itemid = PageGetItemId(page, offnum);
+                               HeapTupleHeader htup;
+
                                if (!ItemIdIsUsed(itemid))
                                        continue;
-                               tuple.t_datamcxt = NULL;
-                               tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+                               htup = (HeapTupleHeader) PageGetItem(page, itemid);
+                               if (htup->t_infomask & HEAP_XMIN_COMMITTED)
+                                       continue;
 
-                               if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
-                               {
-                                       if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-                                       {
-                                               if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
-                                                       elog(ERROR, "invalid XVAC in tuple header");
-                                               itemid->lp_flags &= ~LP_USED;
-                                               num_tuples++;
-                                       }
-                                       else
-                                               elog(ERROR, "HEAP_MOVED_OFF was expected");
-                               }
+                               /*
+                               ** See comments in the walk-along-page loop above for why
+                               ** we have Asserts here instead of if (...) elog(ERROR).
+                               */
+                               Assert(!(htup->t_infomask & HEAP_MOVED_IN));
+                               Assert(htup->t_infomask & HEAP_MOVED_OFF);
+                               Assert(HeapTupleHeaderGetXvac(htup) == myXID);
+
+                               itemid->lp_flags &= ~LP_USED;
+                               num_tuples++;
 
                        }
                        Assert(vacpage->offsets_free == num_tuples);
@@ -2554,11 +2381,337 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        if (vacrelstats->vtlinks != NULL)
                pfree(vacrelstats->vtlinks);
 
-       ExecDropTupleTable(tupleTable, true);
+       ExecContext_Finish(&ec);
+}
+
+/*
+ *     move_chain_tuple() -- move one tuple that is part of a tuple chain
+ *
+ *             This routine moves old_tup from old_page to dst_page.
+ *             old_page and dst_page might be the same page.
+ *             On entry, old_buf and dst_buf are locked exclusively; both locks (or
+ *             the single lock, if this is an intra-page move) are released before
+ *             exit.
+ *
+ *             Yes, a routine with ten parameters is ugly, but it's still better
+ *             than having these 120 lines of code in repair_frag() which is
+ *             already too long and almost unreadable.
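+ *
+ *             A call from repair_frag() looks roughly like this (a sketch only;
+ *             the variable names are illustrative, not the actual ones):
+ *
+ *                     move_chain_tuple(rel, src_buf, src_page, &tuple,
+ *                                      dst_buf, dst_page, dst_vacpage,
+ *                                      &ec, &chain_ctid, clean_dst);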
+ */
+static void
+move_chain_tuple(Relation rel,
+                                Buffer old_buf, Page old_page, HeapTuple old_tup,
+                                Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                                ExecContext ec, ItemPointer ctid, bool cleanVpd)
+{
+       TransactionId myXID = GetCurrentTransactionId();
+       HeapTupleData   newtup;
+       OffsetNumber    newoff;
+       ItemId                  newitemid;
+       Size                    tuple_len = old_tup->t_len;
+
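+       /* copy the tuple into palloc'd memory that we can scribble on */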
+       heap_copytuple_with_tuple(old_tup, &newtup);
+
+       /*
+        * register invalidation of source tuple in catcaches.
+        */
+       CacheInvalidateHeapTuple(rel, old_tup);
+
+       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
+       START_CRIT_SECTION();
+
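+       /*
+        * Mark the source tuple MOVED_OFF by me.
+        */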
+       old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                 HEAP_XMIN_INVALID |
+                                                                 HEAP_MOVED_IN);
+       old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
+       HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
+
+       /*
+        * If this page was not used before, clean it.
+        *
+        * NOTE: a nasty bug used to lurk here.  It is possible
+        * for the source and destination pages to be the same
+        * (since this tuple-chain member can be on a page
+        * lower than the one we're currently processing in
+        * the outer loop).  If that's true, then after
+        * vacuum_page() the source tuple will have been
+        * moved, and old_tup->t_data will be pointing at
+        * garbage.  Therefore we must do everything that uses
+        * old_tup->t_data BEFORE this step!!
+        *
+        * This path is different from the other callers of
+        * vacuum_page, because we have already incremented
+        * the vacpage's offsets_used field to account for the
+        * tuple(s) we expect to move onto the page. Therefore
+        * vacuum_page's check for offsets_used == 0 is wrong.
+        * But since that's a good debugging check for all
+        * other callers, we work around it here rather than
+        * remove it.
+        */
+       if (!PageIsEmpty(dst_page) && cleanVpd)
+       {
+               int             sv_offsets_used = dst_vacpage->offsets_used;
+
+               dst_vacpage->offsets_used = 0;
+               vacuum_page(rel, dst_buf, dst_vacpage);
+               dst_vacpage->offsets_used = sv_offsets_used;
+       }
+
+       /*
+        * Update the state of the copied tuple, and store it
+        * on the destination page.
+        */
+       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                  HEAP_XMIN_INVALID |
+                                                                  HEAP_MOVED_OFF);
+       newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
+       newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
+                                                InvalidOffsetNumber, LP_USED);
+       if (newoff == InvalidOffsetNumber)
+       {
+               elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
+                 (unsigned long) tuple_len, dst_vacpage->blkno);
+       }
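+       /* switch newtup to point at the copy now stored on dst_page */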
+       newitemid = PageGetItemId(dst_page, newoff);
+       pfree(newtup.t_data);
+       newtup.t_datamcxt = NULL;
+       newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
+       ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
+
+       /* XLOG stuff */
+       if (!rel->rd_istemp)
+       {
+               XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
+                                                                                  dst_buf, &newtup);
 
-       ExecCloseIndices(resultRelInfo);
+               if (old_buf != dst_buf)
+               {
+                       PageSetLSN(old_page, recptr);
+                       PageSetSUI(old_page, ThisStartUpID);
+               }
+               PageSetLSN(dst_page, recptr);
+               PageSetSUI(dst_page, ThisStartUpID);
+       }
+       else
+       {
+               /*
+                * No XLOG record, but still need to flag that XID
+                * exists on disk
+                */
+               MyXactMadeTempRelUpdate = true;
+       }
+
+       END_CRIT_SECTION();
+
+       /*
+        * Set the new tuple's t_ctid to point to itself if it is the last
+        * tuple in the chain, and to the next tuple in the chain otherwise.
+        */
+       /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
+       if (!ItemPointerIsValid(ctid))
+               newtup.t_data->t_ctid = newtup.t_self;
+       else
+               newtup.t_data->t_ctid = *ctid;
+       *ctid = newtup.t_self;
 
-       FreeExecutorState(estate);
+       LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
+       if (dst_buf != old_buf)
+               LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
+
+       /* Create index entries for the moved tuple */
+       if (ec->resultRelInfo->ri_NumIndices > 0)
+       {
+               ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
+               ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
+       }
+}
+
+/*
+ *     move_plain_tuple() -- move one tuple that is not part of a chain
+ *
+ *             This routine moves old_tup from old_page to dst_page.
+ *             On entry old_buf and dst_buf are locked exclusively; both locks are
+ *             released before exit.
+ *
+ *             Yes, a routine with eight parameters is ugly, but it's still better
+ *             than having these 90 lines of code in repair_frag(), which is already
+ *             too long and almost unreadable.
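+ *
+ *             A call from repair_frag() looks roughly like this (a sketch; the
+ *             variable names are illustrative):
+ *
+ *                     move_plain_tuple(rel, src_buf, src_page, &tuple,
+ *                                      dst_buf, dst_page, dst_vacpage, &ec);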
+ */
+static void
+move_plain_tuple(Relation rel,
+                                Buffer old_buf, Page old_page, HeapTuple old_tup,
+                                Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
+                                ExecContext ec)
+{
+       TransactionId myXID = GetCurrentTransactionId();
+       HeapTupleData   newtup;
+       OffsetNumber    newoff;
+       ItemId                  newitemid;
+       Size                    tuple_len = old_tup->t_len;
+
+       /* copy tuple */
+       heap_copytuple_with_tuple(old_tup, &newtup);
+
+       /*
+        * register invalidation of source tuple in catcaches.
+        *
+        * (Note: we do not need to register the copied tuple, because we
+        * are not changing the tuple contents and so there cannot be
+        * any need to flush negative catcache entries.)
+        */
+       CacheInvalidateHeapTuple(rel, old_tup);
+
+       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
+       START_CRIT_SECTION();
+
+       /*
+        * Mark new tuple as MOVED_IN by me.
+        */
+       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                  HEAP_XMIN_INVALID |
+                                                                  HEAP_MOVED_OFF);
+       newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
+
+       /* add tuple to the page */
+       newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
+                                                InvalidOffsetNumber, LP_USED);
+       if (newoff == InvalidOffsetNumber)
+       {
+               elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nused %u, noff %u)",
+                        (unsigned long) tuple_len,
+                        dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
+                        dst_vacpage->offsets_used, dst_vacpage->offsets_free);
+       }
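+       /* repoint newtup at the copy that PageAddItem placed on the page */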
+       newitemid = PageGetItemId(dst_page, newoff);
+       pfree(newtup.t_data);
+       newtup.t_datamcxt = NULL;
+       newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
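+       /* a plain tuple's t_ctid points at itself; record the new TID */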
+       ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
+       newtup.t_self = newtup.t_data->t_ctid;
+
+       /*
+        * Mark old tuple as MOVED_OFF by me.
+        */
+       old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                 HEAP_XMIN_INVALID |
+                                                                 HEAP_MOVED_IN);
+       old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
+       HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
+
+       /* XLOG stuff */
+       if (!rel->rd_istemp)
+       {
+               XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
+                                                                                  dst_buf, &newtup);
+
+               PageSetLSN(old_page, recptr);
+               PageSetSUI(old_page, ThisStartUpID);
+               PageSetLSN(dst_page, recptr);
+               PageSetSUI(dst_page, ThisStartUpID);
+       }
+       else
+       {
+               /*
+                * No XLOG record, but still need to flag that XID exists
+                * on disk
+                */
+               MyXactMadeTempRelUpdate = true;
+       }
+
+       END_CRIT_SECTION();
+
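+       /* update our notion of the destination page's free space */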
+       dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
+                                               ((PageHeader) dst_page)->pd_lower;
+       LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
+       LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
+
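+       /* count the tuple we just placed on the destination page */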
+       dst_vacpage->offsets_used++;
+
+       /* insert index entries for the moved tuple, if needed */
+       if (ec->resultRelInfo->ri_NumIndices > 0)
+       {
+               ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
+               ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
+       }
+}
+
+/*
+ *     update_hint_bits() -- update hint bits in destination pages
+ *
+ * Scan all the pages that we moved tuples onto and update tuple
+ * status bits.  This is not really necessary, but will save time for
+ * future transactions examining these tuples.
+ *
+ * XXX NOTICE that this code fails to clear the HEAP_MOVED_OFF bits on
+ * tuples in pages that were move source pages but not move dest pages.
+ * One also wonders whether it wouldn't be better to skip this step and
+ * let the tuple status updates happen someplace that's not holding an
+ * exclusive lock on the relation.
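+ *
+ * In short: a moved-in tuple gets HEAP_XMIN_COMMITTED and its HEAP_MOVED
+ * bits cleared; a moved-off copy found here is dead and is hinted with
+ * HEAP_XMIN_INVALID.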
+ */
+static void
+update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
+                                BlockNumber last_move_dest_block, int num_moved)
+{
+       int                     checked_moved = 0;
+       int                     i;
+       VacPage    *curpage;
+
+       for (i = 0, curpage = fraged_pages->pagedesc;
+                i < num_fraged_pages;
+                i++, curpage++)
+       {
+               Buffer                  buf;
+               Page                    page;
+               OffsetNumber    max_offset;
+               OffsetNumber    off;
+               int                             num_tuples = 0;
+
+               vacuum_delay_point();
+
+               if ((*curpage)->blkno > last_move_dest_block)
+                       break;                          /* no need to scan any further */
+               if ((*curpage)->offsets_used == 0)
+                       continue;                       /* this page was never used as a move dest */
+               buf = ReadBuffer(rel, (*curpage)->blkno);
+               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+               page = BufferGetPage(buf);
+               max_offset = PageGetMaxOffsetNumber(page);
+               for (off = FirstOffsetNumber;
+                        off <= max_offset;
+                        off = OffsetNumberNext(off))
+               {
+                       ItemId  itemid = PageGetItemId(page, off);
+                       HeapTupleHeader htup;
+
+                       if (!ItemIdIsUsed(itemid))
+                               continue;
+                       htup = (HeapTupleHeader) PageGetItem(page, itemid);
+                       if (htup->t_infomask & HEAP_XMIN_COMMITTED)
+                               continue;
+                       /*
+                        * See comments in the walk-along-page loop above for why we
+                        * have Asserts here instead of if (...) elog(ERROR).  The
+                        * difference here is that we may see MOVED_IN.
+                        */
+                       Assert(htup->t_infomask & HEAP_MOVED);
+                       Assert(HeapTupleHeaderGetXvac(htup) == GetCurrentTransactionId());
+                       if (htup->t_infomask & HEAP_MOVED_IN)
+                       {
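+                               /* surviving copy: hint the original xmin as committed */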
+                               htup->t_infomask |= HEAP_XMIN_COMMITTED;
+                               htup->t_infomask &= ~HEAP_MOVED;
+                               num_tuples++;
+                       }
+                       else
+                               htup->t_infomask |= HEAP_XMIN_INVALID;
+               }
+               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+               WriteBuffer(buf);
+               Assert((*curpage)->offsets_used == num_tuples);
+               checked_moved += num_tuples;
+       }
+       Assert(num_moved == checked_moved);
 }
 
 /*