Repair some REINDEX problems per recent discussions.

diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 8ec2bd07fdd3ded8fd0604b93e82c6f92c7f82fe..e626848f12b6bf237d19fdbeef06b4cf35696c2b 100644
@@ -1,67 +1,51 @@
 /*-------------------------------------------------------------------------
  *
  * vacuum.c
- *       the postgres vacuum cleaner
+ *       The postgres vacuum cleaner.
  *
- * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * This file includes the "full" version of VACUUM, as well as control code
+ * used by all three of full VACUUM, lazy VACUUM, and ANALYZE. See
+ * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
+ *
+ *
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.193 2001/05/18 21:24:18 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.260 2003/09/24 18:54:01 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include <fcntl.h>
 #include <unistd.h>
-#include <time.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/file.h>
-#include <sys/stat.h>
-
-#ifndef HAVE_GETRUSAGE
-#include "rusagestub.h"
-#else
-#include <sys/resource.h>
-#endif
 
+#include "access/clog.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/catname.h"
-#include "catalog/index.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_index.h"
 #include "commands/vacuum.h"
+#include "executor/executor.h"
 #include "miscadmin.h"
-#include "nodes/execnodes.h"
+#include "storage/freespace.h"
 #include "storage/sinval.h"
 #include "storage/smgr.h"
-#include "tcop/tcopprot.h"
+#include "tcop/pquery.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/relcache.h"
 #include "utils/syscache.h"
-#include "utils/temprel.h"
-
-extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
-                          char *unused, int unlen);
-extern XLogRecPtr log_heap_move(Relation reln,
-                         Buffer oldbuf, ItemPointerData from,
-                         Buffer newbuf, HeapTuple newtup);
-
+#include "pgstat.h"
 
-typedef struct VRelListData
-{
-       Oid                     vrl_relid;
-       struct VRelListData *vrl_next;
-} VRelListData;
-
-typedef VRelListData *VRelList;
 
 typedef struct VacPageData
 {
@@ -69,14 +53,14 @@ typedef struct VacPageData
        Size            free;                   /* FreeSpace on this Page */
        uint16          offsets_used;   /* Number of OffNums used by vacuum */
        uint16          offsets_free;   /* Number of OffNums free or to be free */
-       OffsetNumber offsets[1];        /* Array of its OffNums */
+       OffsetNumber offsets[1];        /* Array of free OffNums */
 } VacPageData;
 
 typedef VacPageData *VacPage;
 
 typedef struct VacPageListData
 {
-       int                     empty_end_pages;/* Number of "empty" end-pages */
+       BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
        int                     num_pages;              /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
@@ -104,9 +88,8 @@ typedef VTupleMoveData *VTupleMove;
 
 typedef struct VRelStats
 {
-       Oid                     relid;
-       long            num_pages;
-       long            num_tuples;
+       BlockNumber rel_pages;
+       double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
        bool            hasindex;
@@ -114,51 +97,56 @@ typedef struct VRelStats
        VTupleLink      vtlinks;
 } VRelStats;
 
-typedef struct VacRUsage
-{
-       struct timeval  tv;
-       struct rusage   ru;
-} VacRUsage;
 
 static MemoryContext vac_context = NULL;
 
-static int     MESSAGE_LEVEL;          /* message level */
+static int     elevel = -1;
 
-static TransactionId XmaxRecent;
+static TransactionId OldestXmin;
+static TransactionId FreezeLimit;
 
 
 /* non-export function prototypes */
-static void vacuum_init(void);
-static void vacuum_shutdown(void);
-static VRelList getrels(Name VacRelP, const char *stmttype);
-static void vacuum_rel(Oid relid);
+static List *getrels(const RangeVar *vacrel, const char *stmttype);
+static void vac_update_dbstats(Oid dbid,
+                                  TransactionId vacuumXID,
+                                  TransactionId frozenXID);
+static void vac_truncate_clog(TransactionId vacuumXID,
+                                 TransactionId frozenXID);
+static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
+static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
-                                         VacPageList vacuum_pages, VacPageList fraged_pages);
+                 VacPageList vacuum_pages, VacPageList fraged_pages);
 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
-                                               VacPageList vacuum_pages, VacPageList fraged_pages,
-                                               int nindices, Relation *Irel);
+                       VacPageList vacuum_pages, VacPageList fraged_pages,
+                       int nindexes, Relation *Irel);
 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
-                                               VacPageList vacpagelist);
+                       VacPageList vacpagelist);
 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
-                                                long num_tuples, int keep_tuples);
-static void scan_index(Relation indrel, long num_tuples);
-static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
-static void reap_page(VacPageList vacpagelist, VacPage vacpage);
+                        double num_tuples, int keep_tuples);
+static void scan_index(Relation indrel, double num_tuples);
+static bool tid_reaped(ItemPointer itemptr, void *state);
+static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
+static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
+                          BlockNumber rel_pages);
+static VacPage copy_vac_page(VacPage vacpage);
 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
-static void get_indices(Relation relation, int *nindices, Relation **Irel);
-static void close_indices(int nindices, Relation *Irel);
-static IndexInfo **get_index_desc(Relation onerel, int nindices,
-                          Relation *Irel);
 static void *vac_bsearch(const void *key, const void *base,
-                                                size_t nelem, size_t size,
-                                                int (*compar) (const void *, const void *));
+                       size_t nelem, size_t size,
+                       int (*compar) (const void *, const void *));
 static int     vac_cmp_blk(const void *left, const void *right);
 static int     vac_cmp_offno(const void *left, const void *right);
 static int     vac_cmp_vtlinks(const void *left, const void *right);
 static bool enough_space(VacPage vacpage, Size len);
-static void init_rusage(VacRUsage *ru0);
-static char *show_rusage(VacRUsage *ru0);
+
+
+/****************************************************************************
+ *                                                                                                                                                     *
+ *                     Code common to all flavors of VACUUM and ANALYZE                                *
+ *                                                                                                                                                     *
+ ****************************************************************************
+ */
 
 
 /*
@@ -168,10 +156,17 @@ void
 vacuum(VacuumStmt *vacstmt)
 {
        const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
-       NameData        VacRel;
-       Name            VacRelName;
-       VRelList        vrl,
-                               cur;
+       MemoryContext anl_context = NULL;
+       TransactionId initialOldestXmin = InvalidTransactionId;
+       TransactionId initialFreezeLimit = InvalidTransactionId;
+       bool            all_rels;
+       List       *vrl,
+                          *cur;
+
+       if (vacstmt->verbose)
+               elevel = INFO;
+       else
+               elevel = DEBUG2;
 
        /*
         * We cannot run VACUUM inside a user transaction block; if we were
@@ -182,108 +177,183 @@ vacuum(VacuumStmt *vacstmt)
         * user's transaction too, which would certainly not be the desired
         * behavior.
         */
-       if (IsTransactionBlock())
-               elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
+       if (vacstmt->vacuum)
+               PreventTransactionChain((void *) vacstmt, stmttype);
 
-       if (vacstmt->verbose)
-               MESSAGE_LEVEL = NOTICE;
-       else
-               MESSAGE_LEVEL = DEBUG;
+       /*
+        * Send info about dead objects to the statistics collector
+        */
+       if (vacstmt->vacuum)
+               pgstat_vacuum_tabstat();
 
        /*
         * Create special memory context for cross-transaction storage.
         *
-        * Since it is a child of QueryContext, it will go away eventually even
+        * Since it is a child of PortalContext, it will go away eventually even
         * if we suffer an error; there's no need for special abort cleanup
         * logic.
         */
-       vac_context = AllocSetContextCreate(QueryContext,
+       vac_context = AllocSetContextCreate(PortalContext,
                                                                                "Vacuum",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);
 
-       /* Convert vacrel, which is just a string, to a Name */
-       if (vacstmt->vacrel)
-       {
-               namestrcpy(&VacRel, vacstmt->vacrel);
-               VacRelName = &VacRel;
-       }
-       else
-               VacRelName = NULL;
+       /*
+        * If we are running only ANALYZE, we don't need per-table
+        * transactions, but we still need a memory context with table
+        * lifetime.
+        */
+       if (vacstmt->analyze && !vacstmt->vacuum)
+               anl_context = AllocSetContextCreate(PortalContext,
+                                                                                       "Analyze",
+                                                                                       ALLOCSET_DEFAULT_MINSIZE,
+                                                                                       ALLOCSET_DEFAULT_INITSIZE,
+                                                                                       ALLOCSET_DEFAULT_MAXSIZE);
+
+       /* Assume we are processing everything unless one table is mentioned */
+       all_rels = (vacstmt->relation == NULL);
 
        /* Build list of relations to process (note this lives in vac_context) */
-       vrl = getrels(VacRelName, stmttype);
+       vrl = getrels(vacstmt->relation, stmttype);
 
        /*
-        * Start up the vacuum cleaner.
+        * Formerly, there was code here to prevent more than one VACUUM from
+        * executing concurrently in the same database.  However, there's no
+        * good reason to prevent that, and manually removing lockfiles after
+        * a vacuum crash was a pain for dbadmins.      So, forget about
+        * lockfiles, and just rely on the locks we grab on each target table
+        * to ensure that there aren't two VACUUMs running on the same table
+        * at the same time.
         */
-       vacuum_init();
 
        /*
-        * Process each selected relation.  We are careful to process
-        * each relation in a separate transaction in order to avoid holding
-        * too many locks at one time.
+        * The strangeness with committing and starting transactions here is
+        * due to wanting to run each table's VACUUM as a separate
+        * transaction, so that we don't hold locks unnecessarily long.  Also,
+        * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
+        * transaction from the VACUUM to further reduce locking.
+        *
+        * vacuum_rel expects to be entered with no transaction active; it will
+        * start and commit its own transaction.  But we are called by an SQL
+        * command, and so we are executing inside a transaction already.  We
+        * commit the transaction started in PostgresMain() here, and start
+        * another one before exiting to match the commit waiting for us back
+        * in PostgresMain().
+        *
+        * In the case of an ANALYZE statement (no vacuum, just analyze) it's
+        * okay to run the whole thing in the outer transaction, and so we
+        * skip transaction start/stop operations.
         */
-       for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
+       if (vacstmt->vacuum)
        {
+               if (all_rels)
+               {
+                       /*
+                        * It's a database-wide VACUUM.
+                        *
+                        * Compute the initially applicable OldestXmin and FreezeLimit
+                        * XIDs, so that we can record these values at the end of the
+                        * VACUUM. Note that individual tables may well be processed
+                        * with newer values, but we can guarantee that no
+                        * (non-shared) relations are processed with older ones.
+                        *
+                        * It is okay to record non-shared values in pg_database, even
+                        * though we may vacuum shared relations with older cutoffs,
+                        * because only the minimum of the values present in
+                        * pg_database matters.  We can be sure that shared relations
+                        * have at some time been vacuumed with cutoffs no worse than
+                        * the global minimum; for, if there is a backend in some
+                        * other DB with xmin = OLDXMIN that's determining the cutoff
+                        * with which we vacuum shared relations, it is not possible
+                        * for that database to have a cutoff newer than OLDXMIN
+                        * recorded in pg_database.
+                        */
+                       vacuum_set_xid_limits(vacstmt, false,
+                                                                 &initialOldestXmin,
+                                                                 &initialFreezeLimit);
+               }
+
+               /* matches the StartTransaction in PostgresMain() */
+               CommitTransactionCommand();
+       }
+
+       /*
+        * Loop to process each selected relation.
+        */
+       foreach(cur, vrl)
+       {
+               Oid                     relid = lfirsto(cur);
+
                if (vacstmt->vacuum)
-                       vacuum_rel(cur->vrl_relid);
-               /* analyze separately so locking is minimized */
+               {
+                       if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
+                               all_rels = false;               /* forget about updating dbstats */
+               }
                if (vacstmt->analyze)
-                       analyze_rel(cur->vrl_relid, vacstmt);
-       }
+               {
+                       MemoryContext old_context = NULL;
 
-       /* clean up */
-       vacuum_shutdown();
-}
+                       /*
+                        * If we vacuumed, use a new transaction for analyze. Otherwise,
+                        * we can use the outer transaction, but we still need to call
+                        * analyze_rel in a memory context that will be cleaned up on
+                        * return (else we leak memory while processing multiple
+                        * tables).
+                        */
+                       if (vacstmt->vacuum)
+                       {
+                               StartTransactionCommand();
+                               SetQuerySnapshot();             /* might be needed for functions
+                                                                                * in indexes */
+                       }
+                       else
+                               old_context = MemoryContextSwitchTo(anl_context);
 
-/*
- *     vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
- *
- *             Formerly, there was code here to prevent more than one VACUUM from
- *             executing concurrently in the same database.  However, there's no
- *             good reason to prevent that, and manually removing lockfiles after
- *             a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
- *             and just rely on the exclusive lock we grab on each target table
- *             to ensure that there aren't two VACUUMs running on the same table
- *             at the same time.
- *
- *             The strangeness with committing and starting transactions in the
- *             init and shutdown routines is due to the fact that the vacuum cleaner
- *             is invoked via an SQL command, and so is already executing inside
- *             a transaction.  We need to leave ourselves in a predictable state
- *             on entry and exit to the vacuum cleaner.  We commit the transaction
- *             started in PostgresMain() inside vacuum_init(), and start one in
- *             vacuum_shutdown() to match the commit waiting for us back in
- *             PostgresMain().
- */
-static void
-vacuum_init(void)
-{
-       /* matches the StartTransaction in PostgresMain() */
-       CommitTransactionCommand();
-}
+                       analyze_rel(relid, vacstmt);
 
-static void
-vacuum_shutdown(void)
-{
-       /* on entry, we are not in a transaction */
+                       if (vacstmt->vacuum)
+                               CommitTransactionCommand();
+                       else
+                       {
+                               MemoryContextSwitchTo(old_context);
+                               MemoryContextResetAndDeleteChildren(anl_context);
+                       }
+               }
+       }
 
        /*
-        * Flush the init file that relcache.c uses to save startup time. The
-        * next backend startup will rebuild the init file with up-to-date
-        * information from pg_class.  This lets the optimizer see the stats
-        * that we've collected for certain critical system indexes.  See
-        * relcache.c for more details.
-        *
-        * Ignore any failure to unlink the file, since it might not be there if
-        * no backend has been started since the last vacuum...
+        * Finish up processing.
         */
-       unlink(RELCACHE_INIT_FILENAME);
+       if (vacstmt->vacuum)
+       {
+               /* here, we are not in a transaction */
 
-       /* matches the CommitTransaction in PostgresMain() */
-       StartTransactionCommand();
+               /*
+                * This matches the CommitTransaction waiting for us in
+                * PostgresMain().
+                */
+               StartTransactionCommand();
+
+               /*
+                * If it was a database-wide VACUUM, print FSM usage statistics
+                * (we don't make you be superuser to see these).
+                */
+               if (vacstmt->relation == NULL)
+                       PrintFreeSpaceMapStatistics(elevel);
+
+               /*
+                * If we completed a database-wide VACUUM without skipping any
+                * relations, update the database's pg_database row with info
+                * about the transaction IDs used, and try to truncate pg_clog.
+                */
+               if (all_rels)
+               {
+                       vac_update_dbstats(MyDatabaseId,
+                                                          initialOldestXmin, initialFreezeLimit);
+                       vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
+               }
+       }
 
        /*
         * Clean up working storage --- note we must do this after
@@ -292,102 +362,359 @@ vacuum_shutdown(void)
         */
        MemoryContextDelete(vac_context);
        vac_context = NULL;
+
+       if (anl_context)
+               MemoryContextDelete(anl_context);
 }
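
The transaction bracketing above is easier to see with the ANALYZE branches stripped away. A condensed, hypothetical sketch of the plain-VACUUM control flow (names as in the function above; not actual source):

	/* condensed vacuum() flow for the VACUUM-only case */
	CommitTransactionCommand();		/* ends the xact PostgresMain() started */

	foreach(cur, vrl)
		vacuum_rel(lfirsto(cur), vacstmt, RELKIND_RELATION);	/* runs its own xact */

	StartTransactionCommand();		/* matches the commit waiting in PostgresMain() */
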
 
 /*
- * Build a list of VRelListData nodes for each relation to be processed
+ * Build a list of Oids for each relation to be processed
+ *
+ * The list is built in vac_context so that it will survive across our
+ * per-relation transactions.
  */
-static VRelList
-getrels(Name VacRelP, const char *stmttype)
+static List *
+getrels(const RangeVar *vacrel, const char *stmttype)
 {
-       Relation        rel;
-       TupleDesc       tupdesc;
-       HeapScanDesc scan;
-       HeapTuple       tuple;
-       VRelList        vrl,
-                               cur;
-       Datum           d;
-       char       *rname;
-       char            rkind;
-       bool            n;
-       ScanKeyData key;
-
-       if (VacRelP)
+       List       *vrl = NIL;
+       MemoryContext oldcontext;
+
+       if (vacrel)
        {
+               /* Process specific relation */
+               Oid                     relid;
 
-               /*
-                * we could use the cache here, but it is clearer to use scankeys
-                * for both vacuum cases, bjm 2000/01/19
-                */
-               char       *nontemp_relname;
+               relid = RangeVarGetRelid(vacrel, false);
 
-               /* We must re-map temp table names bjm 2000-04-06 */
-               nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP));
-               if (nontemp_relname == NULL)
-                       nontemp_relname = NameStr(*VacRelP);
+               /* Make a relation list entry for this guy */
+               oldcontext = MemoryContextSwitchTo(vac_context);
+               vrl = lappendo(vrl, relid);
+               MemoryContextSwitchTo(oldcontext);
+       }
+       else
+       {
+               /* Process all plain relations listed in pg_class */
+               Relation        pgclass;
+               HeapScanDesc scan;
+               HeapTuple       tuple;
+               ScanKeyData key;
 
-               ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
-                                                          F_NAMEEQ,
-                                                          PointerGetDatum(nontemp_relname));
+               ScanKeyEntryInitialize(&key, 0x0,
+                                                          Anum_pg_class_relkind,
+                                                          F_CHAREQ,
+                                                          CharGetDatum(RELKIND_RELATION));
+
+               pgclass = heap_openr(RelationRelationName, AccessShareLock);
+
+               scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
+
+               while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+               {
+                       /* Make a relation list entry for this guy */
+                       oldcontext = MemoryContextSwitchTo(vac_context);
+                       vrl = lappendo(vrl, HeapTupleGetOid(tuple));
+                       MemoryContextSwitchTo(oldcontext);
+               }
+
+               heap_endscan(scan);
+               heap_close(pgclass, AccessShareLock);
+       }
+
+       return vrl;
+}
+
+/*
+ * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
+ */
+void
+vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
+                                         TransactionId *oldestXmin,
+                                         TransactionId *freezeLimit)
+{
+       TransactionId limit;
+
+       *oldestXmin = GetOldestXmin(sharedRel);
+
+       Assert(TransactionIdIsNormal(*oldestXmin));
+
+       if (vacstmt->freeze)
+       {
+               /* FREEZE option: use oldest Xmin as freeze cutoff too */
+               limit = *oldestXmin;
        }
        else
        {
-               /* find all relations listed in pg_class */
-               ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
-                                                          F_CHAREQ, CharGetDatum('r'));
+               /*
+                * Normal case: freeze cutoff is well in the past, to wit, about
+                * halfway to the wrap horizon
+                */
+               limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
        }
 
-       vrl = cur = (VRelList) NULL;
+       /*
+        * Be careful not to generate a "permanent" XID
+        */
+       if (!TransactionIdIsNormal(limit))
+               limit = FirstNormalTransactionId;
+
+       /*
+        * Ensure sane relationship of limits
+        */
+       if (TransactionIdFollows(limit, *oldestXmin))
+       {
+               ereport(WARNING,
+                               (errmsg("oldest Xmin is far in the past"),
+                                errhint("Close open transactions soon to avoid wraparound problems.")));
+               limit = *oldestXmin;
+       }
+
+       *freezeLimit = limit;
+}
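
The cutoff computation leans on unsigned 32-bit wraparound plus circular XID comparison. Below is a standalone, illustrative C program — the constant and the signed-difference comparison mirror PostgreSQL's transam.h/transam.c semantics for normal XIDs; it is a demo, not part of vacuum.c:

#include <stdio.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define MaxTransactionId	((TransactionId) 0xFFFFFFFF)

/* circular comparison via signed 32-bit difference, as in transam.c */
static int
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
	return (int32_t) (id1 - id2) < 0;
}

int
main(void)
{
	TransactionId current = 1000000;	/* a "young" current XID */
	TransactionId oldestXmin = 900000;

	/* "about halfway to the wrap horizon": wraps past zero here */
	TransactionId limit = current - (MaxTransactionId >> 2);

	printf("limit = %u\n", limit);		/* 3222225473: numerically large... */
	printf("precedes oldestXmin? %d\n",	/* ...but circularly ~1 billion old: 1 */
		   TransactionIdPrecedes(limit, oldestXmin));
	return 0;
}
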
+
+
+/*
+ *     vac_update_relstats() -- update statistics for one relation
+ *
+ *             Update the whole-relation statistics that are kept in its pg_class
+ *             row.  There are additional stats that will be updated if we are
+ *             doing ANALYZE, but we always update these stats.  This routine works
+ *             for both index and heap relation entries in pg_class.
+ *
+ *             We violate no-overwrite semantics here by storing new values for the
+ *             statistics columns directly into the pg_class tuple that's already on
+ *             the page.  The reason for this is that if we updated these tuples in
+ *             the usual way, vacuuming pg_class itself wouldn't work very well ---
+ *             by the time we got done with a vacuum cycle, most of the tuples in
+ *             pg_class would've been obsoleted.  Of course, this only works for
+ *             fixed-size never-null columns, but these are.
+ *
+ *             This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
+ *             ANALYZE.
+ */
+void
+vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
+                                       bool hasindex)
+{
+       Relation        rd;
+       HeapTupleData rtup;
+       HeapTuple       ctup;
+       Form_pg_class pgcform;
+       Buffer          buffer;
+
+       /*
+        * update number of tuples and number of pages in pg_class
+        */
+       rd = heap_openr(RelationRelationName, RowExclusiveLock);
+
+       ctup = SearchSysCache(RELOID,
+                                                 ObjectIdGetDatum(relid),
+                                                 0, 0, 0);
+       if (!HeapTupleIsValid(ctup))
+               elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+                        relid);
+
+       /* get the buffer cache tuple */
+       rtup.t_self = ctup->t_self;
+       ReleaseSysCache(ctup);
+       if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
+               elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+                        relid);
+
+       /* overwrite the existing statistics in the tuple */
+       pgcform = (Form_pg_class) GETSTRUCT(&rtup);
+       pgcform->relpages = (int32) num_pages;
+       pgcform->reltuples = num_tuples;
+       pgcform->relhasindex = hasindex;
+
+       /*
+        * If we have discovered that there are no indexes, then there's no
+        * primary key either.  This could be done more thoroughly...
+        */
+       if (!hasindex)
+               pgcform->relhaspkey = false;
+
+       /*
+        * Invalidate the tuple in the catcaches; this also arranges to flush
+        * the relation's relcache entry.  (If we fail to commit for some
+        * reason, no flush will occur, but no great harm is done since there
+        * are no noncritical state updates here.)
+        */
+       CacheInvalidateHeapTuple(rd, &rtup);
+
+       /* Write the buffer */
+       WriteBuffer(buffer);
+
+       heap_close(rd, RowExclusiveLock);
+}
+
+
+/*
+ *     vac_update_dbstats() -- update statistics for one database
+ *
+ *             Update the whole-database statistics that are kept in its pg_database
+ *             row.
+ *
+ *             We violate no-overwrite semantics here by storing new values for the
+ *             statistics columns directly into the tuple that's already on the page.
+ *             As with vac_update_relstats, this avoids leaving dead tuples behind
+ *             after a VACUUM, which is good since GetRawDatabaseInfo
+ *             can get confused by finding dead tuples in pg_database.
+ *
+ *             This routine is shared by full and lazy VACUUM.  Note that it is only
+ *             applied after a database-wide VACUUM operation.
+ */
+static void
+vac_update_dbstats(Oid dbid,
+                                  TransactionId vacuumXID,
+                                  TransactionId frozenXID)
+{
+       Relation        relation;
+       ScanKeyData entry[1];
+       HeapScanDesc scan;
+       HeapTuple       tuple;
+       Form_pg_database dbform;
+
+       relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
+
+       /* Must use a heap scan, since there's no syscache for pg_database */
+       ScanKeyEntryInitialize(&entry[0], 0x0,
+                                                  ObjectIdAttributeNumber, F_OIDEQ,
+                                                  ObjectIdGetDatum(dbid));
+
+       scan = heap_beginscan(relation, SnapshotNow, 1, entry);
+
+       tuple = heap_getnext(scan, ForwardScanDirection);
+
+       if (!HeapTupleIsValid(tuple))
+               elog(ERROR, "could not find tuple for database %u", dbid);
 
-       rel = heap_openr(RelationRelationName, AccessShareLock);
-       tupdesc = RelationGetDescr(rel);
+       dbform = (Form_pg_database) GETSTRUCT(tuple);
 
-       scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);
+       /* overwrite the existing statistics in the tuple */
+       dbform->datvacuumxid = vacuumXID;
+       dbform->datfrozenxid = frozenXID;
+
+       /* invalidate the tuple in the cache and write the buffer */
+       CacheInvalidateHeapTuple(relation, tuple);
+       WriteNoReleaseBuffer(scan->rs_cbuf);
+
+       heap_endscan(scan);
+
+       heap_close(relation, RowExclusiveLock);
+}
+
+
+/*
+ *     vac_truncate_clog() -- attempt to truncate the commit log
+ *
+ *             Scan pg_database to determine the system-wide oldest datvacuumxid,
+ *             and use it to truncate the transaction commit log (pg_clog).
+ *             Also generate a warning if the system-wide oldest datfrozenxid
+ *             seems to be in danger of wrapping around.
+ *
+ *             The passed XIDs are simply the ones I just wrote into my pg_database
+ *             entry.  They're used to initialize the "min" calculations.
+ *
+ *             This routine is shared by full and lazy VACUUM.  Note that it is only
+ *             applied after a database-wide VACUUM operation.
+ */
+static void
+vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
+{
+       TransactionId myXID;
+       Relation        relation;
+       HeapScanDesc scan;
+       HeapTuple       tuple;
+       int32           age;
+       bool            vacuumAlreadyWrapped = false;
+       bool            frozenAlreadyWrapped = false;
+
+       myXID = GetCurrentTransactionId();
+
+       relation = heap_openr(DatabaseRelationName, AccessShareLock);
 
-       while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
+       scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
+
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
-               rname = (char *) DatumGetName(d);
+               Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
 
-               d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
-               rkind = DatumGetChar(d);
+               /* Ignore non-connectable databases (eg, template0) */
+               /* It's assumed that these have been frozen correctly */
+               if (!dbform->datallowconn)
+                       continue;
 
-               if (rkind != RELKIND_RELATION)
+               if (TransactionIdIsNormal(dbform->datvacuumxid))
                {
-                       elog(NOTICE, "%s: can not process indexes, views or special system tables",
-                                stmttype);
-                       continue;
+                       if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
+                               vacuumAlreadyWrapped = true;
+                       else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
+                               vacuumXID = dbform->datvacuumxid;
                }
-
-               /* Make a relation list entry for this guy */
-               if (vrl == (VRelList) NULL)
-                       vrl = cur = (VRelList)
-                               MemoryContextAlloc(vac_context, sizeof(VRelListData));
-               else
+               if (TransactionIdIsNormal(dbform->datfrozenxid))
                {
-                       cur->vrl_next = (VRelList)
-                               MemoryContextAlloc(vac_context, sizeof(VRelListData));
-                       cur = cur->vrl_next;
+                       if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
+                               frozenAlreadyWrapped = true;
+                       else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
+                               frozenXID = dbform->datfrozenxid;
                }
-
-               cur->vrl_relid = tuple->t_data->t_oid;
-               cur->vrl_next = (VRelList) NULL;
        }
 
        heap_endscan(scan);
-       heap_close(rel, AccessShareLock);
 
-       if (vrl == NULL)
-               elog(NOTICE, "%s: table not found", stmttype);
+       heap_close(relation, AccessShareLock);
 
-       return vrl;
+       /*
+        * Do not truncate CLOG if we seem to have suffered wraparound
+        * already; the computed minimum XID might be bogus.
+        */
+       if (vacuumAlreadyWrapped)
+       {
+               ereport(WARNING,
+                               (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
+                                errdetail("You may have already suffered transaction-wraparound data loss.")));
+               return;
+       }
+
+       /* Truncate CLOG to the oldest vacuumxid */
+       TruncateCLOG(vacuumXID);
+
+       /* Give warning about impending wraparound problems */
+       if (frozenAlreadyWrapped)
+       {
+               ereport(WARNING,
+                               (errmsg("some databases have not been vacuumed in over 1 billion transactions"),
+                                errhint("Better vacuum them soon, or you may have a wraparound failure.")));
+       }
+       else
+       {
+               age = (int32) (myXID - frozenXID);
+               if (age > (int32) ((MaxTransactionId >> 3) * 3))
+                       ereport(WARNING,
+                                       (errmsg("some databases have not been vacuumed in %d transactions",
+                                                       age),
+                                        errhint("Better vacuum them within %d transactions, "
+                                                        "or you may have a wraparound failure.",
+                                                        (int32) (MaxTransactionId >> 1) - age)));
+       }
 }
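
The warning thresholds above are simple fractions of the 32-bit XID space. A small standalone sketch that prints them (pure arithmetic; MaxTransactionId written out literally, and the example age is an assumed value):

#include <stdio.h>
#include <stdint.h>

int
main(void)
{
	uint32_t	max_xid = 0xFFFFFFFFU;	/* MaxTransactionId */
	int32_t		age = 1700000000;		/* example database age, in xacts */

	/* the warning above fires once age exceeds this (~1.6 billion) */
	printf("warn threshold  = %u\n", (max_xid >> 3) * 3);

	/* half the XID space, where wraparound data loss would occur */
	printf("wrap horizon    = %u\n", max_xid >> 1);

	/* headroom the errhint reports for this example age */
	printf("xacts remaining = %d\n", (int32_t) (max_xid >> 1) - age);
	return 0;
}
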
 
+
+/****************************************************************************
+ *                                                                                                                                                     *
+ *                     Code common to both flavors of VACUUM                                                   *
+ *                                                                                                                                                     *
+ ****************************************************************************
+ */
+
+
 /*
  *     vacuum_rel() -- vacuum one heap relation
  *
- *             This routine vacuums a single heap, cleans out its indices, and
- *             updates its num_pages and num_tuples statistics.
+ *             Returns TRUE if we actually processed the relation (or can ignore it
+ *             for some reason), FALSE if we failed to process it due to permissions
+ *             or other reasons.  (A FALSE result really means that some data
+ *             may have been left unvacuumed, so we can't update XID stats.)
  *
  *             Doing one heap at a time incurs extra overhead, since we need to
  *             check that the heap exists again just before we vacuum it.      The
@@ -397,28 +724,23 @@ getrels(Name VacRelP, const char *stmttype)
  *
  *             At entry and exit, we are not inside a transaction.
  */
-static void
-vacuum_rel(Oid relid)
+static bool
+vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
 {
+       LOCKMODE        lmode;
        Relation        onerel;
        LockRelId       onerelid;
-       VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
-                                                                                * clean indices */
-       VacPageListData fraged_pages;           /* List of pages with space enough
-                                                                                * for re-using */
-       Relation   *Irel;
-       int32           nindices,
-                               i;
-       VRelStats  *vacrelstats;
-       bool            reindex = false;
        Oid                     toast_relid;
+       bool            result;
 
        /* Begin a transaction for vacuuming this relation */
        StartTransactionCommand();
+       SetQuerySnapshot();                     /* might be needed for functions in
+                                                                * indexes */
 
        /*
         * Check for user-requested abort.      Note we want this to be inside a
-        * transaction, so xact.c doesn't issue useless NOTICE.
+        * transaction, so xact.c doesn't issue useless WARNING.
         */
        CHECK_FOR_INTERRUPTS();
 
@@ -431,39 +753,83 @@ vacuum_rel(Oid relid)
                                                          0, 0, 0))
        {
                CommitTransactionCommand();
-               return;
+               return true;                    /* okay 'cause no data there */
        }
 
        /*
-        * Open the class, get an exclusive lock on it, and check permissions.
+        * Determine the type of lock we want --- hard exclusive lock for a
+        * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
+        * vacuum.      Either way, we can be sure that no other backend is
+        * vacuuming the same table.
+        */
+       lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+
+       /*
+        * Open the class, get an appropriate lock on it, and check
+        * permissions.
+        *
+        * We allow the user to vacuum a table if he is superuser, the table
+        * owner, or the database owner (but in the latter case, only if it's
+        * not a shared relation).      pg_class_ownercheck includes the superuser
+        * case.
         *
-        * Note we choose to treat permissions failure as a NOTICE and keep
+        * Note we choose to treat permissions failure as a WARNING and keep
         * trying to vacuum the rest of the DB --- is this appropriate?
         */
-       onerel = heap_open(relid, AccessExclusiveLock);
+       onerel = relation_open(relid, lmode);
 
-       if (!pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
-                                          RELNAME))
+       if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
+                 (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
        {
-               elog(NOTICE, "Skipping \"%s\" --- only table owner can VACUUM it",
-                        RelationGetRelationName(onerel));
-               heap_close(onerel, AccessExclusiveLock);
+               ereport(WARNING,
+                               (errmsg("skipping \"%s\" --- only table or database owner can VACUUM it",
+                                               RelationGetRelationName(onerel))));
+               relation_close(onerel, lmode);
                CommitTransactionCommand();
-               return;
+               return false;
        }
 
        /*
-        * Get a session-level exclusive lock too.      This will protect our
-        * exclusive access to the relation across multiple transactions, so
-        * that we can vacuum the relation's TOAST table (if any) secure in
-        * the knowledge that no one is diddling the parent relation.
+        * Check that it's a plain table; we used to do this in getrels() but
+        * seems safer to check after we've locked the relation.
+        */
+       if (onerel->rd_rel->relkind != expected_relkind)
+       {
+               ereport(WARNING,
+                               (errmsg("skipping \"%s\" --- cannot VACUUM indexes, views or special system tables",
+                                               RelationGetRelationName(onerel))));
+               relation_close(onerel, lmode);
+               CommitTransactionCommand();
+               return false;
+       }
+
+       /*
+        * Silently ignore tables that are temp tables of other backends ---
+        * trying to vacuum these will lead to great unhappiness, since their
+        * contents are probably not up-to-date on disk.  (We don't throw a
+        * warning here; it would just lead to chatter during a database-wide
+        * VACUUM.)
+        */
+       if (isOtherTempNamespace(RelationGetNamespace(onerel)))
+       {
+               relation_close(onerel, lmode);
+               CommitTransactionCommand();
+               return true;                    /* assume no long-lived data in temp
+                                                                * tables */
+       }
+
+       /*
+        * Get a session-level lock too. This will protect our access to the
+        * relation across multiple transactions, so that we can vacuum the
+        * relation's TOAST table (if any) secure in the knowledge that no one
+        * is deleting the parent relation.
         *
         * NOTE: this cannot block, even if someone else is waiting for access,
         * because the lock manager knows that both lock requests are from the
         * same process.
         */
        onerelid = onerel->rd_lockInfo.lockRelId;
-       LockRelationForSession(&onerelid, AccessExclusiveLock);
+       LockRelationForSession(&onerelid, lmode);
 
        /*
         * Remember the relation's TOAST relation for later
@@ -471,65 +837,108 @@ vacuum_rel(Oid relid)
        toast_relid = onerel->rd_rel->reltoastrelid;
 
        /*
-        * Set up statistics-gathering machinery.
+        * Do the actual work --- either FULL or "lazy" vacuum
         */
-       vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
-       vacrelstats->relid = relid;
-       vacrelstats->num_pages = 0;
-       vacrelstats->num_tuples = 0;
-       vacrelstats->hasindex = false;
+       if (vacstmt->full)
+               full_vacuum_rel(onerel, vacstmt);
+       else
+               lazy_vacuum_rel(onerel, vacstmt);
 
-       GetXmaxRecent(&XmaxRecent);
+       result = true;                          /* did the vacuum */
 
-       /* scan it */
-       reindex = false;
-       vacuum_pages.num_pages = fraged_pages.num_pages = 0;
-       scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
-       if (IsIgnoringSystemIndexes() &&
-               IsSystemRelationName(RelationGetRelationName(onerel)))
-               reindex = true;
-
-       /* Now open indices */
-       nindices = 0;
-       Irel = (Relation *) NULL;
-       get_indices(onerel, &nindices, &Irel);
-       if (!Irel)
-               reindex = false;
-       else if (!RelationGetForm(onerel)->relhasindex)
-               reindex = true;
-       if (nindices > 0)
-               vacrelstats->hasindex = true;
-       else
-               vacrelstats->hasindex = false;
+       /* all done with this class, but hold lock until commit */
+       relation_close(onerel, NoLock);
+
+       /*
+        * Complete the transaction and free all temporary memory used.
+        */
+       CommitTransactionCommand();
 
-#ifdef NOT_USED
        /*
-        * reindex in VACUUM is dangerous under WAL. ifdef out until it
-        * becomes safe.
+        * If the relation has a secondary toast rel, vacuum that too while we
+        * still hold the session lock on the master table.  Note however that
+        * "analyze" will not get done on the toast table.      This is good,
+        * because the toaster always uses hardcoded index access and
+        * statistics are totally unimportant for toast relations.
         */
-       if (reindex)
+       if (toast_relid != InvalidOid)
        {
-               for (i = 0; i < nindices; i++)
-                       index_close(Irel[i]);
-               Irel = (Relation *) NULL;
-               activate_indexes_of_a_table(relid, false);
+               if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
+                       result = false;         /* failed to vacuum the TOAST table? */
        }
-#endif  /* NOT_USED */
+
+       /*
+        * Now release the session-level lock on the master table.
+        */
+       UnlockRelationForSession(&onerelid, lmode);
+
+       return result;
+}
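
The locking in vacuum_rel is two-level: the lock taken by relation_open is transaction-scoped and is released at each per-table commit, while the session-level lock persists so the TOAST table can still be vacuumed safely afterwards. A condensed, hypothetical sketch of just that pattern (error paths and the actual vacuum work omitted):

	LOCKMODE	lmode = vacstmt->full ? AccessExclusiveLock
									  : ShareUpdateExclusiveLock;
	Relation	onerel = relation_open(relid, lmode);	/* xact-level lock */
	LockRelId	onerelid = onerel->rd_lockInfo.lockRelId;

	LockRelationForSession(&onerelid, lmode);	/* outlives the commit below */
	/* ... full_vacuum_rel() or lazy_vacuum_rel() ... */
	relation_close(onerel, NoLock);			/* keep xact lock until commit */
	CommitTransactionCommand();				/* xact-level lock released here */
	vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);	/* still protected */
	UnlockRelationForSession(&onerelid, lmode);
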
+
+
+/****************************************************************************
+ *                                                                                                                                                     *
+ *                     Code for VACUUM FULL (only)                                                                             *
+ *                                                                                                                                                     *
+ ****************************************************************************
+ */
+
+
+/*
+ *     full_vacuum_rel() -- perform FULL VACUUM for one heap relation
+ *
+ *             This routine vacuums a single heap, cleans out its indexes, and
+ *             updates its num_pages and num_tuples statistics.
+ *
+ *             At entry, we have already established a transaction and opened
+ *             and locked the relation.
+ */
+static void
+full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
+{
+       VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
+                                                                                * clean indexes */
+       VacPageListData fraged_pages;           /* List of pages with space enough
+                                                                                * for re-using */
+       Relation   *Irel;
+       int                     nindexes,
+                               i;
+       VRelStats  *vacrelstats;
+
+       vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
+                                                 &OldestXmin, &FreezeLimit);
+
+       /*
+        * Set up statistics-gathering machinery.
+        */
+       vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
+       vacrelstats->rel_pages = 0;
+       vacrelstats->rel_tuples = 0;
+       vacrelstats->hasindex = false;
+
+       /* scan the heap */
+       vacuum_pages.num_pages = fraged_pages.num_pages = 0;
+       scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
+
+       /* Now open all indexes of the relation */
+       vac_open_indexes(onerel, &nindexes, &Irel);
+       if (nindexes > 0)
+               vacrelstats->hasindex = true;
 
        /* Clean/scan index relation(s) */
        if (Irel != (Relation *) NULL)
        {
                if (vacuum_pages.num_pages > 0)
                {
-                       for (i = 0; i < nindices; i++)
+                       for (i = 0; i < nindexes; i++)
                                vacuum_index(&vacuum_pages, Irel[i],
-                                                        vacrelstats->num_tuples, 0);
+                                                        vacrelstats->rel_tuples, 0);
                }
                else
                {
-                       /* just scan indices to update statistic */
-                       for (i = 0; i < nindices; i++)
-                               scan_index(Irel[i], vacrelstats->num_tuples);
+                       /* just scan indexes to update statistic */
+                       for (i = 0; i < nindexes; i++)
+                               scan_index(Irel[i], vacrelstats->rel_tuples);
                }
        }
 
@@ -537,12 +946,12 @@ vacuum_rel(Oid relid)
        {
                /* Try to shrink heap */
                repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
-                                       nindices, Irel);
+                                       nindexes, Irel);
+               vac_close_indexes(nindexes, Irel);
        }
        else
        {
-               if (Irel != (Relation *) NULL)
-                       close_indices(nindices, Irel);
+               vac_close_indexes(nindexes, Irel);
                if (vacuum_pages.num_pages > 0)
                {
                        /* Clean pages from vacuum_pages list */
@@ -550,60 +959,35 @@ vacuum_rel(Oid relid)
                }
                else
                {
-
                        /*
                         * Flush dirty pages out to disk.  We must do this even if we
                         * didn't do anything else, because we want to ensure that all
                         * tuples have correct on-row commit status on disk (see
                         * bufmgr.c's comments for FlushRelationBuffers()).
                         */
-                       i = FlushRelationBuffers(onerel, vacrelstats->num_pages);
+                       i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
                        if (i < 0)
-                               elog(ERROR, "VACUUM (vacuum_rel): FlushRelationBuffers returned %d",
-                                        i);
+                               elog(ERROR, "FlushRelationBuffers returned %d", i);
                }
        }
-#ifdef NOT_USED
-       if (reindex)
-               activate_indexes_of_a_table(relid, true);
-#endif  /* NOT_USED */
 
-       /* all done with this class, but hold lock until commit */
-       heap_close(onerel, NoLock);
+       /* update shared free space map with final free space info */
+       vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
 
        /* update statistics in pg_class */
-       vac_update_relstats(vacrelstats->relid, vacrelstats->num_pages,
-                                               vacrelstats->num_tuples, vacrelstats->hasindex);
-
-       /*
-        * Complete the transaction and free all temporary memory used.
-        */
-       CommitTransactionCommand();
-
-       /*
-        * If the relation has a secondary toast one, vacuum that too while we
-        * still hold the session lock on the master table. We don't need to
-        * propagate "analyze" to it, because the toaster always uses
-        * hardcoded index access and statistics are totally unimportant for
-        * toast relations
-        */
-       if (toast_relid != InvalidOid)
-               vacuum_rel(toast_relid);
-
-       /*
-        * Now release the session-level lock on the master table.
-        */
-       UnlockRelationForSession(&onerelid, AccessExclusiveLock);
+       vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
+                                               vacrelstats->rel_tuples, vacrelstats->hasindex);
 }
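
In outline, full_vacuum_rel proceeds in fixed phases; a condensed sketch of the call ordering above (a simplification of the branching around repair_frag):

	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
	vac_open_indexes(onerel, &nindexes, &Irel);
	/* vacuum_index() or scan_index() for each of the nindexes indexes */
	repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
				nindexes, Irel);	/* or vacuum_heap()/FlushRelationBuffers() */
	vac_close_indexes(nindexes, Irel);
	vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
	vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
						vacrelstats->rel_tuples, vacrelstats->hasindex);
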
 
+
 /*
  *     scan_heap() -- scan an open heap relation
  *
- *             This routine sets commit times, constructs vacuum_pages list of
- *             empty/uninitialized pages and pages with dead tuples and
- *             ~LP_USED line pointers, constructs fraged_pages list of pages
- *             appropriate for purposes of shrinking and maintains statistics
- *             on the number of live tuples in a heap.
+ *             This routine sets commit status bits, constructs vacuum_pages (list
+ *             of pages we need to compact free space on and/or clean indexes of
+ *             deleted tuples), constructs fraged_pages (list of pages with free
+ *             space that tuples could be moved into), and calculates statistics
+ *             on the number of live tuples in the heap.
  */
 static void
 scan_heap(VRelStats *vacrelstats, Relation onerel,
@@ -614,68 +998,80 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
        ItemId          itemid;
        Buffer          buf;
        HeapTupleData tuple;
-       Page            page,
-                               tempPage = NULL;
        OffsetNumber offnum,
                                maxoff;
        bool            pgchanged,
                                tupgone,
-                               dobufrel,
                                notup;
        char       *relname;
        VacPage         vacpage,
-                               vp;
-       long            num_tuples;
-       uint32          tups_vacuumed,
-                               nkeep,
-                               nunused,
-                               ncrash,
-                               empty_pages,
-                               new_pages,
-                               changed_pages,
+                               vacpagecopy;
+       BlockNumber empty_pages,
                                empty_end_pages;
-       Size            free_size,
-                               usable_free_size;
+       double          num_tuples,
+                               tups_vacuumed,
+                               nkeep,
+                               nunused;
+       double          free_space,
+                               usable_free_space;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
-       int32           i;
+       int                     i;
        bool            do_shrinking = true;
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
        int                     num_vtlinks = 0;
        int                     free_vtlinks = 100;
        VacRUsage       ru0;
 
-       init_rusage(&ru0);
+       vac_init_rusage(&ru0);
 
        relname = RelationGetRelationName(onerel);
-       elog(MESSAGE_LEVEL, "--Relation %s--", relname);
+       ereport(elevel,
+                       (errmsg("vacuuming \"%s.%s\"",
+                                       get_namespace_name(RelationGetNamespace(onerel)),
+                                       relname)));
 
-       tups_vacuumed = num_tuples = nkeep = nunused = ncrash = empty_pages =
-               new_pages = changed_pages = empty_end_pages = 0;
-       free_size = usable_free_size = 0;
+       empty_pages = empty_end_pages = 0;
+       num_tuples = tups_vacuumed = nkeep = nunused = 0;
+       free_space = 0;
 
        nblocks = RelationGetNumberOfBlocks(onerel);
 
+       /*
+        * We initially create each VacPage item in a maximal-sized workspace,
+        * then copy the workspace into a just-large-enough copy.
+        */
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
-       vacpage->offsets_used = 0;
 
        for (blkno = 0; blkno < nblocks; blkno++)
        {
+               Page            page,
+                                       tempPage = NULL;
+               bool            do_reap,
+                                       do_frag;
+
+               CHECK_FOR_INTERRUPTS();
+
                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);
+
                vacpage->blkno = blkno;
+               vacpage->offsets_used = 0;
                vacpage->offsets_free = 0;
 
                if (PageIsNew(page))
                {
-                       elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
-                                relname, blkno);
+                       ereport(WARNING,
+                       (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
+                                       relname, blkno)));
                        PageInit(page, BufferGetPageSize(buf), 0);
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
-                       free_size += (vacpage->free - sizeof(ItemIdData));
-                       new_pages++;
+                       free_space += vacpage->free;
+                       empty_pages++;
                        empty_end_pages++;
-                       reap_page(vacuum_pages, vacpage);
+                       vacpagecopy = copy_vac_page(vacpage);
+                       vpage_insert(vacuum_pages, vacpagecopy);
+                       vpage_insert(fraged_pages, vacpagecopy);
                        WriteBuffer(buf);
                        continue;
                }
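
/*
 * For illustration: the copy_vac_page() calls above presumably produce a
 * right-sized copy of the maximal-sized workspace, along these lines (a
 * sketch, not necessarily the exact helper code elsewhere in this file):
 */
static VacPage
copy_vac_page(VacPage vacpage)
{
    VacPage     newvacpage;

    /* allocate a copy sized for the offsets actually recorded */
    newvacpage = (VacPage) palloc(sizeof(VacPageData) +
                                  vacpage->offsets_free * sizeof(OffsetNumber));

    /* copy the scalar fields and the free-offset array */
    newvacpage->blkno = vacpage->blkno;
    newvacpage->free = vacpage->free;
    newvacpage->offsets_used = vacpage->offsets_used;
    newvacpage->offsets_free = vacpage->offsets_free;
    if (vacpage->offsets_free > 0)
        memcpy(newvacpage->offsets, vacpage->offsets,
               vacpage->offsets_free * sizeof(OffsetNumber));

    return newvacpage;
}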
@@ -683,10 +1079,12 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                if (PageIsEmpty(page))
                {
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
-                       free_size += (vacpage->free - sizeof(ItemIdData));
+                       free_space += vacpage->free;
                        empty_pages++;
                        empty_end_pages++;
-                       reap_page(vacuum_pages, vacpage);
+                       vacpagecopy = copy_vac_page(vacpage);
+                       vpage_insert(vacuum_pages, vacpagecopy);
+                       vpage_insert(fraged_pages, vacpagecopy);
                        ReleaseBuffer(buf);
                        continue;
                }
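
/*
 * vpage_insert(), used above to put the same VacPage copy on both lists,
 * can be assumed to append to a dynamically grown array of VacPage
 * pointers --- a sketch under that assumption (the allocated-size field
 * name is illustrative):
 */
static void
vpage_insert(VacPageList vacpagelist, VacPage vpnew)
{
#define PG_NPAGEDESC 1024

    /* allocate or grow the pagedesc array as needed */
    if (vacpagelist->num_pages == 0)
    {
        vacpagelist->pagedesc = (VacPage *)
            palloc(PG_NPAGEDESC * sizeof(VacPage));
        vacpagelist->num_allocated_pages = PG_NPAGEDESC;
    }
    else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
    {
        vacpagelist->num_allocated_pages *= 2;
        vacpagelist->pagedesc = (VacPage *)
            repalloc(vacpagelist->pagedesc,
                     vacpagelist->num_allocated_pages * sizeof(VacPage));
    }

    vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
    (vacpagelist->num_pages)++;
}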
@@ -698,16 +1096,18 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
+                       uint16          sv_infomask;
+
                        itemid = PageGetItemId(page, offnum);
 
                        /*
-                        * Collect un-used items too - it's possible to have indices
+                        * Collect un-used items too - it's possible to have indexes
                         * pointing here after a crash.
                         */
                        if (!ItemIdIsUsed(itemid))
                        {
                                vacpage->offsets[vacpage->offsets_free++] = offnum;
-                               nunused++;
+                               nunused += 1;
                                continue;
                        }
 
@@ -715,146 +1115,47 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                        tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                        tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
                        tupgone = false;
+                       sv_infomask = tuple.t_data->t_infomask;
 
-                       if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+                       switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
                        {
-                               if (tuple.t_data->t_infomask & HEAP_XMIN_INVALID)
-                                       tupgone = true;
-                               else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-                               {
-                                       if (TransactionIdDidCommit((TransactionId)
-                                                                                          tuple.t_data->t_cmin))
-                                       {
-                                               tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
-                                               pgchanged = true;
-                                               tupgone = true;
-                                       }
-                                       else
-                                       {
-                                               tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
-                                               pgchanged = true;
-                                       }
-                               }
-                               else if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                               {
-                                       if (!TransactionIdDidCommit((TransactionId)
-                                                                                               tuple.t_data->t_cmin))
-                                       {
-                                               tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
-                                               pgchanged = true;
-                                               tupgone = true;
-                                       }
-                                       else
-                                       {
-                                               tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
-                                               pgchanged = true;
-                                       }
-                               }
-                               else
-                               {
-                                       if (TransactionIdDidAbort(tuple.t_data->t_xmin))
-                                               tupgone = true;
-                                       else if (TransactionIdDidCommit(tuple.t_data->t_xmin))
-                                       {
-                                               tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
-                                               pgchanged = true;
-                                       }
-                                       else if (!TransactionIdIsInProgress(tuple.t_data->t_xmin))
-                                       {
-                                               /*
-                                                * Not Aborted, Not Committed, Not in Progress -
-                                                * so it's from crashed process. - vadim 11/26/96
-                                                */
-                                               ncrash++;
-                                               tupgone = true;
-                                       }
-                                       else
-                                       {
-                                               elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
-                                                  relname, blkno, offnum, tuple.t_data->t_xmin);
-                                               do_shrinking = false;
-                                       }
-                               }
-                       }
+                               case HEAPTUPLE_DEAD:
+                                       tupgone = true;         /* we can delete the tuple */
+                                       break;
+                               case HEAPTUPLE_LIVE:
 
-                       /*
-                        * here we are concerned about tuples with xmin committed and
-                        * xmax unknown or committed
-                        */
-                       if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED &&
-                               !(tuple.t_data->t_infomask & HEAP_XMAX_INVALID))
-                       {
-                               if (tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED)
-                               {
-                                       if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
-                                       {
-                                               tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
-                                               tuple.t_data->t_infomask &=
-                                                       ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
-                                               pgchanged = true;
-                                       }
-                                       else
-                                               tupgone = true;
-                               }
-                               else if (TransactionIdDidAbort(tuple.t_data->t_xmax))
-                               {
-                                       tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
-                                       pgchanged = true;
-                               }
-                               else if (TransactionIdDidCommit(tuple.t_data->t_xmax))
-                               {
-                                       if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
-                                       {
-                                               tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
-                                               tuple.t_data->t_infomask &=
-                                                       ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
-                                               pgchanged = true;
-                                       }
-                                       else
-                                               tupgone = true;
-                               }
-                               else if (!TransactionIdIsInProgress(tuple.t_data->t_xmax))
-                               {
                                        /*
-                                        * Not Aborted, Not Committed, Not in Progress - so it
-                                        * from crashed process. - vadim 06/02/97
+                                        * Tuple is good.  Consider whether to replace its
+                                        * xmin value with FrozenTransactionId.
                                         */
-                                       tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
-                                       tuple.t_data->t_infomask &=
-                                               ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
-                                       pgchanged = true;
-                               }
-                               else
-                               {
-                                       elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
-                                                relname, blkno, offnum, tuple.t_data->t_xmax);
-                                       do_shrinking = false;
-                               }
-
-                               /*
-                                * If tuple is recently deleted then we must not remove it
-                                * from relation.
-                                */
-                               if (tupgone &&
-                                       (tuple.t_data->t_infomask & HEAP_XMIN_INVALID) == 0 &&
-                                       tuple.t_data->t_xmax >= XmaxRecent)
-                               {
-                                       tupgone = false;
-                                       nkeep++;
-                                       if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED))
+                                       if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
+                                               TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+                                                                                         FreezeLimit))
                                        {
-                                               tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+                                               HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
+                                               /* infomask should be okay already */
+                                               Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
                                                pgchanged = true;
                                        }
+                                       break;
+                               case HEAPTUPLE_RECENTLY_DEAD:
+
+                                       /*
+                                        * If tuple is recently deleted then we must not
+                                        * remove it from relation.
+                                        */
+                                       nkeep += 1;
 
                                        /*
                                         * If we are shrinking and this tuple has been updated,
                                         * then remember it so we can construct update-chain
                                         * dependencies.
                                         */
-                                       if (do_shrinking && !(ItemPointerEquals(&(tuple.t_self),
-                                                                                          &(tuple.t_data->t_ctid))))
+                                       if (do_shrinking &&
+                                               !(ItemPointerEquals(&(tuple.t_self),
+                                                                                       &(tuple.t_data->t_ctid))))
                                        {
                                                if (free_vtlinks == 0)
                                                {
@@ -868,17 +1169,49 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                                                free_vtlinks--;
                                                num_vtlinks++;
                                        }
-                               }
+                                       break;
+                               case HEAPTUPLE_INSERT_IN_PROGRESS:
+
+                                       /*
+                                        * This should not happen, since we hold exclusive
+                                        * lock on the relation; shouldn't we raise an error?
+                                        * (Actually, it can happen in system catalogs, since
+                                        * we tend to release write lock before commit there.)
+                                        */
+                                       ereport(NOTICE,
+                                                       (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
+                                                                       relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
+                                       do_shrinking = false;
+                                       break;
+                               case HEAPTUPLE_DELETE_IN_PROGRESS:
+
+                                       /*
+                                        * This should not happen, since we hold exclusive
+                                        * lock on the relation; shouldn't we raise an error?
+                                        * (Actually, it can happen in system catalogs, since
+                                        * we tend to release write lock before commit there.)
+                                        */
+                                       ereport(NOTICE,
+                                                       (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
+                                                                       relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
+                                       do_shrinking = false;
+                                       break;
+                               default:
+                                       elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+                                       break;
                        }
 
+                       /* check for hint-bit update by HeapTupleSatisfiesVacuum */
+                       if (sv_infomask != tuple.t_data->t_infomask)
+                               pgchanged = true;
+
                        /*
                         * Other checks...
                         */
-                       if (!OidIsValid(tuple.t_data->t_oid))
-                       {
-                               elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
-                                        relname, blkno, offnum, tupgone);
-                       }
+                       if (onerel->rd_rel->relhasoids &&
+                               !OidIsValid(HeapTupleGetOid(&tuple)))
+                               elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
+                                        relname, blkno, offnum);
 
                        if (tupgone)
                        {
@@ -898,64 +1231,86 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
                                        pageSize = PageGetPageSize(page);
                                        tempPage = (Page) palloc(pageSize);
-                                       memmove(tempPage, page, pageSize);
+                                       memcpy(tempPage, page, pageSize);
                                }
 
                                /* mark it unused on the temp page */
-                               lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
+                               lpp = PageGetItemId(tempPage, offnum);
                                lpp->lp_flags &= ~LP_USED;
 
                                vacpage->offsets[vacpage->offsets_free++] = offnum;
-                               tups_vacuumed++;
+                               tups_vacuumed += 1;
                        }
                        else
                        {
-                               num_tuples++;
+                               num_tuples += 1;
                                notup = false;
                                if (tuple.t_len < min_tlen)
                                        min_tlen = tuple.t_len;
                                if (tuple.t_len > max_tlen)
                                        max_tlen = tuple.t_len;
                        }
-               }
-
-               if (pgchanged)
-               {
-                       WriteBuffer(buf);
-                       dobufrel = false;
-                       changed_pages++;
-               }
-               else
-                       dobufrel = true;
+               }                                               /* scan along page */
 
                if (tempPage != (Page) NULL)
-               {                                               /* Some tuples are gone */
+               {
+                       /* Some tuples are removable; figure free space after removal */
                        PageRepairFragmentation(tempPage, NULL);
                        vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
-                       free_size += vacpage->free;
-                       reap_page(vacuum_pages, vacpage);
                        pfree(tempPage);
-                       tempPage = (Page) NULL;
+                       do_reap = true;
                }
-               else if (vacpage->offsets_free > 0)
-               {                                               /* there are only ~LP_USED line pointers */
+               else
+               {
+                       /* Just use current available space */
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
-                       free_size += vacpage->free;
-                       reap_page(vacuum_pages, vacpage);
+                       /* Need to reap the page if it has ~LP_USED line pointers */
+                       do_reap = (vacpage->offsets_free > 0);
                }
-               if (dobufrel)
-                       ReleaseBuffer(buf);
+
+               free_space += vacpage->free;
+
+               /*
+                * Add the page to fraged_pages if it has a useful amount of free
+                * space.  "Useful" means enough for a minimal-sized tuple. But we
+                * don't know that accurately near the start of the relation, so
+                * add pages unconditionally if they have >= BLCKSZ/10 free space.
+                */
+               do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
+
+               if (do_reap || do_frag)
+               {
+                       vacpagecopy = copy_vac_page(vacpage);
+                       if (do_reap)
+                               vpage_insert(vacuum_pages, vacpagecopy);
+                       if (do_frag)
+                               vpage_insert(fraged_pages, vacpagecopy);
+               }
+
+               /*
+                * Include the page in empty_end_pages if it will be empty after
+                * vacuuming; this is to keep us from using it as a move
+                * destination.
+                */
                if (notup)
+               {
+                       empty_pages++;
                        empty_end_pages++;
+               }
                else
                        empty_end_pages = 0;
+
+               if (pgchanged)
+                       WriteBuffer(buf);
+               else
+                       ReleaseBuffer(buf);
        }
 
        pfree(vacpage);
 
        /* save stats in the rel list for use later */
-       vacrelstats->num_tuples = num_tuples;
-       vacrelstats->num_pages = nblocks;
+       vacrelstats->rel_tuples = num_tuples;
+       vacrelstats->rel_pages = nblocks;
        if (num_tuples == 0)
                min_tlen = max_tlen = 0;
        vacrelstats->min_tlen = min_tlen;
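
/*
 * The "useful amount of free space" test above matches what the
 * enough_space() checks in repair_frag (below) appear to require: room
 * for a MAXALIGN'd tuple of the given length, plus a new line pointer if
 * no free one can be recycled.  A sketch under that reading:
 */
static bool
enough_space(VacPage vacpage, Size len)
{
    len = MAXALIGN(len);

    if (len > vacpage->free)
        return false;

    /* if a free line pointer can be recycled, the data space suffices */
    if (vacpage->offsets_used < vacpage->offsets_free)
        return true;

    /* otherwise we must also carve out a new line pointer */
    if (len + sizeof(ItemIdData) <= vacpage->free)
        return true;

    return false;
}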
@@ -965,29 +1320,26 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
        fraged_pages->empty_end_pages = empty_end_pages;
 
        /*
-        * Try to make fraged_pages keeping in mind that we can't use free
-        * space of "empty" end-pages and last page if it reaped.
+        * Clear the fraged_pages list if we found we couldn't shrink. Else,
+        * remove any "empty" end-pages from the list, and compute usable free
+        * space = free space in remaining pages.
         */
-       if (do_shrinking && vacuum_pages->num_pages - empty_end_pages > 0)
+       if (do_shrinking)
        {
-               int                     nusf;           /* blocks useful for re-using */
-
-               nusf = vacuum_pages->num_pages - empty_end_pages;
-               if ((vacuum_pages->pagedesc[nusf - 1])->blkno == nblocks - empty_end_pages - 1)
-                       nusf--;
-
-               for (i = 0; i < nusf; i++)
-               {
-                       vp = vacuum_pages->pagedesc[i];
-                       if (enough_space(vp, min_tlen))
-                       {
-                               vpage_insert(fraged_pages, vp);
-                               usable_free_size += vp->free;
-                       }
-               }
+               Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
+               fraged_pages->num_pages -= empty_end_pages;
+               usable_free_space = 0;
+               for (i = 0; i < fraged_pages->num_pages; i++)
+                       usable_free_space += fraged_pages->pagedesc[i]->free;
+       }
+       else
+       {
+               fraged_pages->num_pages = 0;
+               usable_free_space = 0;
        }
 
-       if (usable_free_size > 0 && num_vtlinks > 0)
+       /* don't bother to save vtlinks if we will not call repair_frag */
+       if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
        {
                qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
                          vac_cmp_vtlinks);
@@ -1001,17 +1353,24 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                pfree(vtlinks);
        }
 
-       elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
-Tup %lu: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %lu, MaxLen %lu; \
-Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s",
-                nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
-                new_pages, num_tuples, tups_vacuumed,
-                nkeep, vacrelstats->num_vtlinks, ncrash,
-                nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
-                (unsigned long) free_size, (unsigned long) usable_free_size,
-                empty_end_pages, fraged_pages->num_pages,
-                show_rusage(&ru0));
-
+       ereport(elevel,
+                       (errmsg("\"%s\": found %.0f removable, %.0f nonremovable tuples in %u pages",
+                                       RelationGetRelationName(onerel),
+                                       tups_vacuumed, num_tuples, nblocks),
+                        errdetail("%.0f dead tuples cannot be removed yet.\n"
+                               "Nonremovable tuples range from %lu to %lu bytes long.\n"
+                                          "There were %.0f unused item pointers.\n"
+                "Total free space (including removable tuples) is %.0f bytes.\n"
+                                          "%u pages are or will become empty, including %u at the end of the table.\n"
+                                          "%u pages containing %.0f free bytes are potential move destinations.\n"
+                                          "%s",
+                                          nkeep,
+                                          (unsigned long) min_tlen, (unsigned long) max_tlen,
+                                          nunused,
+                                          free_space,
+                                          empty_pages, empty_end_pages,
+                                          fraged_pages->num_pages, usable_free_space,
+                                          vac_show_rusage(&ru0))));
 }
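
/*
 * The qsort above and the vac_bsearch lookups in repair_frag need a total
 * order on vtlinks.  A sketch of vac_cmp_vtlinks, assuming it simply
 * orders entries by the new_tid item pointer (block number first, then
 * offset):
 */
static int
vac_cmp_vtlinks(const void *left, const void *right)
{
    ItemPointer ltid = &((VTupleLink) left)->new_tid;
    ItemPointer rtid = &((VTupleLink) right)->new_tid;
    BlockNumber lblk = ItemPointerGetBlockNumber(ltid);
    BlockNumber rblk = ItemPointerGetBlockNumber(rtid);

    if (lblk != rblk)
        return (lblk < rblk) ? -1 : 1;
    if (ItemPointerGetOffsetNumber(ltid) != ItemPointerGetOffsetNumber(rtid))
        return (ItemPointerGetOffsetNumber(ltid) <
                ItemPointerGetOffsetNumber(rtid)) ? -1 : 1;
    return 0;
}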
 
 
@@ -1019,8 +1378,8 @@ Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s",
  *     repair_frag() -- try to repair relation's fragmentation
  *
  *             This routine marks dead tuples as unused and tries to re-use dead space
- *             by moving tuples (and inserting indices if needed). It constructs
- *             Nvacpagelist list of free-ed pages (moved tuples) and clean indices
+ *             by moving tuples (and inserting indexes if needed). It constructs
+ *             Nvacpagelist, a list of freed pages (moved tuples), and cleans indexes
  *             for them after committing the current transaction (in hack manner ---
  *             without losing locks or freeing memory!).  It truncates the relation
  *             if some end-blocks have become empty.
@@ -1028,14 +1387,16 @@ Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s",
 static void
 repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
-                       int nindices, Relation *Irel)
+                       int nindexes, Relation *Irel)
 {
        TransactionId myXID;
        CommandId       myCID;
        Buffer          buf,
                                cur_buffer;
-       int                     nblocks,
+       BlockNumber nblocks,
                                blkno;
+       BlockNumber last_move_dest_block = 0,
+                               last_vacuum_block;
        Page            page,
                                ToPage = NULL;
        OffsetNumber offnum,
@@ -1047,19 +1408,17 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        HeapTupleData tuple,
                                newtup;
        TupleDesc       tupdesc;
-       IndexInfo **indexInfo = NULL;
-       Datum           idatum[INDEX_MAX_KEYS];
-       char            inulls[INDEX_MAX_KEYS];
-       InsertIndexResult iresult;
+       ResultRelInfo *resultRelInfo;
+       EState     *estate;
+       TupleTable      tupleTable;
+       TupleTableSlot *slot;
        VacPageListData Nvacpagelist;
        VacPage         cur_page = NULL,
                                last_vacuum_page,
                                vacpage,
                           *curpage;
        int                     cur_item = 0;
-       int                     last_move_dest_block = -1,
-                               last_vacuum_block,
-                               i = 0;
+       int                     i;
        Size            tuple_len;
        int                     num_moved,
                                num_fraged_pages,
@@ -1072,22 +1431,50 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                chain_tuple_moved;
        VacRUsage       ru0;
 
-       init_rusage(&ru0);
+       vac_init_rusage(&ru0);
 
        myXID = GetCurrentTransactionId();
        myCID = GetCurrentCommandId();
 
        tupdesc = RelationGetDescr(onerel);
 
-       if (Irel != (Relation *) NULL)          /* preparation for index' inserts */
-               indexInfo = get_index_desc(onerel, nindices, Irel);
+       /*
+        * We need a ResultRelInfo and an EState so we can use the regular
+        * executor's index-entry-making machinery.
+        */
+       estate = CreateExecutorState();
+
+       resultRelInfo = makeNode(ResultRelInfo);
+       resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
+       resultRelInfo->ri_RelationDesc = onerel;
+       resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
+
+       ExecOpenIndices(resultRelInfo);
+
+       estate->es_result_relations = resultRelInfo;
+       estate->es_num_result_relations = 1;
+       estate->es_result_relation_info = resultRelInfo;
+
+       /* Set up a dummy tuple table too */
+       tupleTable = ExecCreateTupleTable(1);
+       slot = ExecAllocTableSlot(tupleTable);
+       ExecSetSlotDescriptor(slot, tupdesc, false);
 
        Nvacpagelist.num_pages = 0;
        num_fraged_pages = fraged_pages->num_pages;
-       Assert(vacuum_pages->num_pages > vacuum_pages->empty_end_pages);
+       Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
        vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
-       last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
-       last_vacuum_block = last_vacuum_page->blkno;
+       if (vacuumed_pages > 0)
+       {
+               /* get last reaped page from vacuum_pages */
+               last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
+               last_vacuum_block = last_vacuum_page->blkno;
+       }
+       else
+       {
+               last_vacuum_page = NULL;
+               last_vacuum_block = InvalidBlockNumber;
+       }
        cur_buffer = InvalidBuffer;
        num_moved = 0;
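
        /*
         * Given the executor setup above, making the index entries for a
         * tuple moved to a new location presumably reduces to the usual
         * sequence (a sketch --- the actual call site is in the
         * tuple-moving code further down, outside these hunks):
         *
         *      ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
         *      ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
         */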
 
@@ -1097,19 +1484,41 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        /*
         * Scan pages backwards from the last nonempty page, trying to move
         * tuples down to lower pages.  Quit when we reach a page that we have
-        * moved any tuples onto.  Note that if a page is still in the
-        * fraged_pages list (list of candidate move-target pages) when we
-        * reach it, we will remove it from the list.  This ensures we never
-        * move a tuple up to a higher page number.
+        * moved any tuples onto, or the first page if we haven't moved
+        * anything, or when we find a page we cannot completely empty (this
+        * last condition is handled by "break" statements within the loop).
         *
         * NB: this code depends on the vacuum_pages and fraged_pages lists being
-        * in order, and on fraged_pages being a subset of vacuum_pages.
+        * in order by blkno.
         */
-       nblocks = vacrelstats->num_pages;
+       nblocks = vacrelstats->rel_pages;
        for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
                 blkno > last_move_dest_block;
                 blkno--)
        {
+               CHECK_FOR_INTERRUPTS();
+
+               /*
+                * Forget fraged_pages pages at or after this one; they're no
+                * longer useful as move targets, since we only want to move down.
+                * Note that since we stop the outer loop at last_move_dest_block,
+                * pages removed here cannot have had anything moved onto them
+                * already.
+                *
+                * Also note that we don't change the stored fraged_pages list, only
+                * our local variable num_fraged_pages; so the forgotten pages are
+                * still available to be loaded into the free space map later.
+                */
+               while (num_fraged_pages > 0 &&
+                       fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
+               {
+                       Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
+                       --num_fraged_pages;
+               }
+
+               /*
+                * Process this page of relation.
+                */
                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);
 
@@ -1118,10 +1527,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                isempty = PageIsEmpty(page);
 
                dowrite = false;
-               if (blkno == last_vacuum_block) /* it's reaped page */
+
+               /* Is the page in the vacuum_pages list? */
+               if (blkno == last_vacuum_block)
                {
-                       if (last_vacuum_page->offsets_free > 0)         /* there are dead tuples */
-                       {                                       /* on this page - clean */
+                       if (last_vacuum_page->offsets_free > 0)
+                       {
+                               /* there are dead tuples on this page - clean them */
                                Assert(!isempty);
                                LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
                                vacuum_page(onerel, buf, last_vacuum_page);
@@ -1140,14 +1552,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        else
                        {
                                last_vacuum_page = NULL;
-                               last_vacuum_block = -1;
-                       }
-                       if (num_fraged_pages > 0 &&
-                               fraged_pages->pagedesc[num_fraged_pages - 1]->blkno ==
-                               (BlockNumber) blkno)
-                       {
-                               /* page is in fraged_pages too; remove it */
-                               --num_fraged_pages;
+                               last_vacuum_block = InvalidBlockNumber;
                        }
                        if (isempty)
                        {
@@ -1178,8 +1583,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
                        if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                        {
-                               if ((TransactionId) tuple.t_data->t_cmin != myXID)
-                                       elog(ERROR, "Invalid XID in t_cmin");
                                if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
                                        elog(ERROR, "HEAP_MOVED_IN was not expected");
 
@@ -1190,6 +1593,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                 */
                                if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                                {
+                                       if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+                                               elog(ERROR, "invalid XVAC in tuple header");
                                        if (keep_tuples == 0)
                                                continue;
                                        if (chain_tuple_moved)          /* some chains were moved
@@ -1221,41 +1626,69 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                         * If this tuple is in a chain of tuples created by updates
                         * from "recent" transactions, then we have to move the whole
                         * chain of tuples to other places.
+                        *
+                        * NOTE: this test is not 100% accurate: it is possible for a
+                        * tuple to be an updated one with recent xmin, and yet not
+                        * have a corresponding tuple in the vtlinks list.      Presumably
+                        * there was once a parent tuple with xmax matching the xmin,
+                        * but it's possible that that tuple has been removed --- for
+                        * example, if it had xmin = xmax then
+                        * HeapTupleSatisfiesVacuum would deem it removable as soon as
+                        * the xmin xact completes.
+                        *
+                        * To be on the safe side, we abandon the repair_frag process if
+                        * we cannot find the parent tuple in vtlinks.  This may be
+                        * overly conservative; AFAICS it would be safe to move the
+                        * chain.
                         */
-                       if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
-                                tuple.t_data->t_xmin >= XmaxRecent) ||
-                               (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
-                                !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid)))))
+                       if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
+                        !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+                                                                       OldestXmin)) ||
+                               (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                                                                          HEAP_MARKED_FOR_UPDATE)) &&
+                                !(ItemPointerEquals(&(tuple.t_self),
+                                                                        &(tuple.t_data->t_ctid)))))
                        {
                                Buffer          Cbuf = buf;
+                               bool            freeCbuf = false;
+                               bool            chain_move_failed = false;
                                Page            Cpage;
                                ItemId          Citemid;
                                ItemPointerData Ctid;
                                HeapTupleData tp = tuple;
                                Size            tlen = tuple_len;
-                               VTupleMove      vtmove = (VTupleMove)
-                               palloc(100 * sizeof(VTupleMoveData));
-                               int                     num_vtmove = 0;
-                               int                     free_vtmove = 100;
+                               VTupleMove      vtmove;
+                               int                     num_vtmove;
+                               int                     free_vtmove;
                                VacPage         to_vacpage = NULL;
                                int                     to_item = 0;
-                               bool            freeCbuf = false;
                                int                     ti;
 
-                               if (vacrelstats->vtlinks == NULL)
-                                       elog(ERROR, "No one parent tuple was found");
                                if (cur_buffer != InvalidBuffer)
                                {
                                        WriteBuffer(cur_buffer);
                                        cur_buffer = InvalidBuffer;
                                }
 
+                               /* Quick exit if we have no vtlinks to search in */
+                               if (vacrelstats->vtlinks == NULL)
+                               {
+                                       elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
+                                       break;          /* out of walk-along-page loop */
+                               }
+
+                               vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
+                               num_vtmove = 0;
+                               free_vtmove = 100;
+
                                /*
                                 * If this tuple is at the beginning or middle of the
                                 * chain, then we have to move to the end of the chain.
                                 */
-                               while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
-                               !(ItemPointerEquals(&(tp.t_self), &(tp.t_data->t_ctid))))
+                               while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                                                                         HEAP_MARKED_FOR_UPDATE)) &&
+                                          !(ItemPointerEquals(&(tp.t_self),
+                                                                                  &(tp.t_data->t_ctid))))
                                {
                                        Ctid = tp.t_data->t_ctid;
                                        if (freeCbuf)
@@ -1268,79 +1701,73 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                                          ItemPointerGetOffsetNumber(&Ctid));
                                        if (!ItemIdIsUsed(Citemid))
                                        {
-
                                                /*
                                                 * This means that in the middle of the chain there
-                                                * was a tuple updated by an older (than XmaxRecent)
+                                                * was a tuple updated by an older (than OldestXmin)
                                                 * xaction, and that tuple has already been deleted
                                                 * by me.  Actually, the upper part of the chain
                                                 * should be removed; it seems that this ought to be
                                                 * handled in scan_heap(), but it's not implemented
                                                 * at the moment, so we just stop shrinking here.
                                                 */
-                                               ReleaseBuffer(Cbuf);
-                                               pfree(vtmove);
-                                               vtmove = NULL;
-                                               elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
-                                               break;
+                                               elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
+                                               chain_move_failed = true;
+                                               break;  /* out of loop to move to chain end */
                                        }
                                        tp.t_datamcxt = NULL;
                                        tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
                                        tp.t_self = Ctid;
                                        tlen = tp.t_len = ItemIdGetLength(Citemid);
                                }
-                               if (vtmove == NULL)
-                                       break;
-                               /* first, can chain be moved ? */
+                               if (chain_move_failed)
+                               {
+                                       if (freeCbuf)
+                                               ReleaseBuffer(Cbuf);
+                                       pfree(vtmove);
+                                       break;          /* out of walk-along-page loop */
+                               }
+
+                               /*
+                                * Check if all items in chain can be moved
+                                */
                                for (;;)
                                {
+                                       Buffer          Pbuf;
+                                       Page            Ppage;
+                                       ItemId          Pitemid;
+                                       HeapTupleData Ptp;
+                                       VTupleLinkData vtld,
+                                                          *vtlp;
+
                                        if (to_vacpage == NULL ||
                                                !enough_space(to_vacpage, tlen))
                                        {
-
-                                               /*
-                                                * if to_vacpage no longer has enough free space
-                                                * to be useful, remove it from fraged_pages list
-                                                */
-                                               if (to_vacpage != NULL &&
-                                               !enough_space(to_vacpage, vacrelstats->min_tlen))
-                                               {
-                                                       Assert(num_fraged_pages > to_item);
-                                                       memmove(fraged_pages->pagedesc + to_item,
-                                                                       fraged_pages->pagedesc + to_item + 1,
-                                                                       sizeof(VacPage) * (num_fraged_pages - to_item - 1));
-                                                       num_fraged_pages--;
-                                               }
                                                for (i = 0; i < num_fraged_pages; i++)
                                                {
                                                        if (enough_space(fraged_pages->pagedesc[i], tlen))
                                                                break;
                                                }
 
-                                               /* can't move item anywhere */
                                                if (i == num_fraged_pages)
                                                {
-                                                       for (i = 0; i < num_vtmove; i++)
-                                                       {
-                                                               Assert(vtmove[i].vacpage->offsets_used > 0);
-                                                               (vtmove[i].vacpage->offsets_used)--;
-                                                       }
-                                                       num_vtmove = 0;
-                                                       break;
+                                                       /* can't move item anywhere */
+                                                       chain_move_failed = true;
+                                                       break;          /* out of check-all-items loop */
                                                }
                                                to_item = i;
                                                to_vacpage = fraged_pages->pagedesc[to_item];
                                        }
                                        to_vacpage->free -= MAXALIGN(tlen);
                                        if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
-                                               to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
+                                               to_vacpage->free -= sizeof(ItemIdData);
                                        (to_vacpage->offsets_used)++;
                                        if (free_vtmove == 0)
                                        {
                                                free_vtmove = 1000;
-                                               vtmove = (VTupleMove) repalloc(vtmove,
-                                                                                        (free_vtmove + num_vtmove) *
-                                                                                                sizeof(VTupleMoveData));
+                                               vtmove = (VTupleMove)
+                                                       repalloc(vtmove,
+                                                                        (free_vtmove + num_vtmove) *
+                                                                        sizeof(VTupleMoveData));
                                        }
                                        vtmove[num_vtmove].tid = tp.t_self;
                                        vtmove[num_vtmove].vacpage = to_vacpage;
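
/*
 * vac_bsearch, used just below to find the parent of a tuple in the
 * sorted vtlinks array, can be read as a standard binary search that
 * returns NULL when no element matches (the bsearch(3) contract).  A
 * sketch under that assumption:
 */
static void *
vac_bsearch(const void *key, const void *base,
            size_t nelem, size_t size,
            int (*compar) (const void *, const void *))
{
    size_t      lo = 0,
                hi = nelem;

    while (lo < hi)
    {
        size_t      mid = lo + (hi - lo) / 2;
        const char *elem = (const char *) base + mid * size;
        int         res = compar(key, elem);

        if (res == 0)
            return (void *) elem;
        if (res > 0)
            lo = mid + 1;
        else
            hi = mid;
    }
    return NULL;
}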
@@ -1351,111 +1778,95 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        free_vtmove--;
                                        num_vtmove++;
 
-                                       /* All done ? */
+                                       /* At beginning of chain? */
                                        if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
-                                               tp.t_data->t_xmin < XmaxRecent)
+                                               TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
+                                                                                         OldestXmin))
                                                break;
 
-                                       /* Well, try to find tuple with old row version */
-                                       for (;;)
+                                       /* No, move to tuple with prior row version */
+                                       vtld.new_tid = tp.t_self;
+                                       vtlp = (VTupleLink)
+                                               vac_bsearch((void *) &vtld,
+                                                                       (void *) (vacrelstats->vtlinks),
+                                                                       vacrelstats->num_vtlinks,
+                                                                       sizeof(VTupleLinkData),
+                                                                       vac_cmp_vtlinks);
+                                       if (vtlp == NULL)
                                        {
-                                               Buffer          Pbuf;
-                                               Page            Ppage;
-                                               ItemId          Pitemid;
-                                               HeapTupleData Ptp;
-                                               VTupleLinkData vtld,
-                                                                  *vtlp;
-
-                                               vtld.new_tid = tp.t_self;
-                                               vtlp = (VTupleLink)
-                                                       vac_bsearch((void *) &vtld,
-                                                                               (void *) (vacrelstats->vtlinks),
-                                                                               vacrelstats->num_vtlinks,
-                                                                               sizeof(VTupleLinkData),
-                                                                               vac_cmp_vtlinks);
-                                               if (vtlp == NULL)
-                                                       elog(ERROR, "Parent tuple was not found");
-                                               tp.t_self = vtlp->this_tid;
-                                               Pbuf = ReadBuffer(onerel,
+                                               /* see discussion above */
+                                               elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
+                                               chain_move_failed = true;
+                                               break;  /* out of check-all-items loop */
+                                       }
+                                       tp.t_self = vtlp->this_tid;
+                                       Pbuf = ReadBuffer(onerel,
                                                                ItemPointerGetBlockNumber(&(tp.t_self)));
-                                               Ppage = BufferGetPage(Pbuf);
-                                               Pitemid = PageGetItemId(Ppage,
+                                       Ppage = BufferGetPage(Pbuf);
+                                       Pitemid = PageGetItemId(Ppage,
                                                           ItemPointerGetOffsetNumber(&(tp.t_self)));
-                                               if (!ItemIdIsUsed(Pitemid))
-                                                       elog(ERROR, "Parent itemid marked as unused");
-                                               Ptp.t_datamcxt = NULL;
-                                               Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
-                                               Assert(ItemPointerEquals(&(vtld.new_tid),
-                                                                                                &(Ptp.t_data->t_ctid)));
+                                       /* this can't happen since we saw tuple earlier: */
+                                       if (!ItemIdIsUsed(Pitemid))
+                                               elog(ERROR, "parent itemid marked as unused");
+                                       Ptp.t_datamcxt = NULL;
+                                       Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
 
-                                               /*
-                                                * Read above about cases when
-                                                * !ItemIdIsUsed(Citemid) (child item is
-                                                * removed)... Due to the fact that at the moment
-                                                * we don't remove unuseful part of update-chain,
-                                                * it's possible to get too old parent row here.
-                                                * Like as in the case which caused this problem,
-                                                * we stop shrinking here. I could try to find
-                                                * real parent row but want not to do it because
-                                                * of real solution will be implemented anyway,
-                                                * latter, and we are too close to 6.5 release. -
-                                                * vadim 06/11/99
-                                                */
-                                               if (Ptp.t_data->t_xmax != tp.t_data->t_xmin)
-                                               {
-                                                       if (freeCbuf)
-                                                               ReleaseBuffer(Cbuf);
-                                                       freeCbuf = false;
-                                                       ReleaseBuffer(Pbuf);
-                                                       for (i = 0; i < num_vtmove; i++)
-                                                       {
-                                                               Assert(vtmove[i].vacpage->offsets_used > 0);
-                                                               (vtmove[i].vacpage->offsets_used)--;
-                                                       }
-                                                       num_vtmove = 0;
-                                                       elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
-                                                       break;
-                                               }
-#ifdef NOT_USED                                        /* I'm not sure that this will wotk
-                                                                * properly... */
+                                       /* ctid should not have changed since we saved it */
+                                       Assert(ItemPointerEquals(&(vtld.new_tid),
+                                                                                        &(Ptp.t_data->t_ctid)));
 
-                                               /*
-                                                * If this tuple is updated version of row and it
-                                                * was created by the same transaction then no one
-                                                * is interested in this tuple - mark it as
-                                                * removed.
-                                                */
-                                               if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
-                                                       Ptp.t_data->t_xmin == Ptp.t_data->t_xmax)
-                                               {
-                                                       TransactionIdStore(myXID,
-                                                               (TransactionId *) &(Ptp.t_data->t_cmin));
-                                                       Ptp.t_data->t_infomask &=
-                                                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
-                                                       Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
-                                                       WriteBuffer(Pbuf);
-                                                       continue;
-                                               }
-#endif
-                                               tp.t_datamcxt = Ptp.t_datamcxt;
-                                               tp.t_data = Ptp.t_data;
-                                               tlen = tp.t_len = ItemIdGetLength(Pitemid);
-                                               if (freeCbuf)
-                                                       ReleaseBuffer(Cbuf);
-                                               Cbuf = Pbuf;
-                                               freeCbuf = true;
-                                               break;
+                                       /*
+                                        * Read above about cases when !ItemIdIsUsed(Citemid)
+                                        * (the child item was removed)... Because we do not
+                                        * currently remove the useless part of an update
+                                        * chain, it is possible to find a too-old parent row
+                                        * here.  As in the case that caused this problem, we
+                                        * stop shrinking here.  I could try to find the real
+                                        * parent row, but it is not worth doing since a real
+                                        * solution will be implemented later anyway, and we
+                                        * are too close to the 6.5 release. - vadim 06/11/99
+                                        */
+                                       if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
+                                                                        HeapTupleHeaderGetXmin(tp.t_data))))
+                                       {
+                                               ReleaseBuffer(Pbuf);
+                                               elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
+                                               chain_move_failed = true;
+                                               break;  /* out of check-all-items loop */
                                        }
-                                       if (num_vtmove == 0)
-                                               break;
-                               }
+                                       tp.t_datamcxt = Ptp.t_datamcxt;
+                                       tp.t_data = Ptp.t_data;
+                                       tlen = tp.t_len = ItemIdGetLength(Pitemid);
+                                       if (freeCbuf)
+                                               ReleaseBuffer(Cbuf);
+                                       Cbuf = Pbuf;
+                                       freeCbuf = true;
+                               }                               /* end of check-all-items loop */
+
                                if (freeCbuf)
                                        ReleaseBuffer(Cbuf);
-                               if (num_vtmove == 0)    /* chain can't be moved */
+                               freeCbuf = false;
+
+                               if (chain_move_failed)
                                {
+                                       /*
+                                        * Undo changes to offsets_used state.  We don't
+                                        * bother cleaning up the amount-free state, since
+                                        * we're not going to do any further tuple motion.
+                                        */
+                                       for (i = 0; i < num_vtmove; i++)
+                                       {
+                                               Assert(vtmove[i].vacpage->offsets_used > 0);
+                                               (vtmove[i].vacpage->offsets_used)--;
+                                       }
                                        pfree(vtmove);
-                                       break;
+                                       break;          /* out of walk-along-page loop */
                                }
+
+                               /*
+                                * Okay, move the whole tuple chain
+                                */
                                ItemPointerSetInvalid(&Ctid);
                                for (ti = 0; ti < num_vtmove; ti++)
                                {
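
The chain_move_failed path above relies on a reserve-then-undo protocol: each planned move bumps offsets_used on its destination page, and a failed chain walk must release every reservation. A minimal, self-contained sketch of that bookkeeping (not part of the patch; VacPageStub and VTupleMoveStub are simplified stand-ins for this file's VacPage and VTupleMoveData):

/* Hedged sketch of the reserve-then-undo bookkeeping; stand-in types only. */
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct
{
    int offsets_used;           /* slots reserved on this page so far */
} VacPageStub;

typedef struct
{
    VacPageStub *vacpage;       /* destination page for one planned move */
} VTupleMoveStub;

/* On failure, release every reservation taken while planning the chain. */
static void
undo_chain_reservations(VTupleMoveStub *vtmove, int num_vtmove)
{
    int i;

    for (i = 0; i < num_vtmove; i++)
    {
        assert(vtmove[i].vacpage->offsets_used > 0);
        vtmove[i].vacpage->offsets_used--;
    }
}

int
main(void)
{
    VacPageStub page = {2};
    VTupleMoveStub moves[2] = {{&page}, {&page}};
    bool chain_move_failed = true;      /* pretend the chain walk hit a dead end */

    if (chain_move_failed)
        undo_chain_reservations(moves, 2);
    printf("offsets_used = %d\n", page.offsets_used);   /* prints 0 */
    return 0;
}
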
@@ -1488,15 +1899,19 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                         */
                                        heap_copytuple_with_tuple(&tuple, &newtup);
 
-                                       RelationInvalidateHeapTuple(onerel, &tuple);
+                                       /*
+                                        * register invalidation of source tuple in catcaches.
+                                        */
+                                       CacheInvalidateHeapTuple(onerel, &tuple);
 
-                                       /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
+                                       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
                                        START_CRIT_SECTION();
 
-                                       TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
-                                       tuple.t_data->t_infomask &=
-                                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
+                                       tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                                                 HEAP_XMIN_INVALID |
+                                                                                                 HEAP_MOVED_IN);
                                        tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
+                                       HeapTupleHeaderSetXvac(tuple.t_data, myXID);
 
                                        /*
                                         * If this page was not used before - clean it.
@@ -1533,15 +1948,19 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                         * Update the state of the copied tuple, and store it
                                         * on the destination page.
                                         */
-                                       TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
-                                       newtup.t_data->t_infomask &=
-                                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
+                                       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                                                  HEAP_XMIN_INVALID |
+                                                                                                  HEAP_MOVED_OFF);
                                        newtup.t_data->t_infomask |= HEAP_MOVED_IN;
-                                       newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
-                                                                                InvalidOffsetNumber, LP_USED);
+                                       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
+                                       newoff = PageAddItem(ToPage,
+                                                                                (Item) newtup.t_data,
+                                                                                tuple_len,
+                                                                                InvalidOffsetNumber,
+                                                                                LP_USED);
                                        if (newoff == InvalidOffsetNumber)
                                        {
-                                               elog(STOP, "moving chain: failed to add item with len = %lu to page %u",
+                                               elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
                                                  (unsigned long) tuple_len, destvacpage->blkno);
                                        }
                                        newitemid = PageGetItemId(ToPage, newoff);
@@ -1550,6 +1969,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
                                        ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
 
+                                       /* XLOG stuff */
+                                       if (!onerel->rd_istemp)
                                        {
                                                XLogRecPtr      recptr =
                                                log_heap_move(onerel, Cbuf, tuple.t_self,
@@ -1563,9 +1984,18 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                PageSetLSN(ToPage, recptr);
                                                PageSetSUI(ToPage, ThisStartUpID);
                                        }
+                                       else
+                                       {
+                                               /*
+                                                * No XLOG record, but still need to flag that XID
+                                                * exists on disk
+                                                */
+                                               MyXactMadeTempRelUpdate = true;
+                                       }
+
                                        END_CRIT_SECTION();
 
-                                       if (((int) destvacpage->blkno) > last_move_dest_block)
+                                       if (destvacpage->blkno > last_move_dest_block)
                                                last_move_dest_block = destvacpage->blkno;
 
                                        /*
@@ -1595,43 +2025,25 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        if (cur_buffer != Cbuf)
                                                LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
 
-                                       if (Irel != (Relation *) NULL)
+                                       /* Create index entries for the moved tuple */
+                                       if (resultRelInfo->ri_NumIndices > 0)
                                        {
-
-                                               /*
-                                                * XXX using CurrentMemoryContext here means
-                                                * intra-vacuum memory leak for functional
-                                                * indexes. Should fix someday.
-                                                *
-                                                * XXX This code fails to handle partial indexes!
-                                                * Probably should change it to use
-                                                * ExecOpenIndices.
-                                                */
-                                               for (i = 0; i < nindices; i++)
-                                               {
-                                                       FormIndexDatum(indexInfo[i],
-                                                                                  &newtup,
-                                                                                  tupdesc,
-                                                                                  CurrentMemoryContext,
-                                                                                  idatum,
-                                                                                  inulls);
-                                                       iresult = index_insert(Irel[i],
-                                                                                                  idatum,
-                                                                                                  inulls,
-                                                                                                  &newtup.t_self,
-                                                                                                  onerel);
-                                                       if (iresult)
-                                                               pfree(iresult);
-                                               }
+                                               ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
+                                               ExecInsertIndexTuples(slot, &(newtup.t_self),
+                                                                                         estate, true);
                                        }
+
                                        WriteBuffer(cur_buffer);
                                        WriteBuffer(Cbuf);
-                               }
+                               }                               /* end of move-the-tuple-chain loop */
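
Each move above follows the same critical-section shape: either emit a WAL record and stamp the destination page's LSN, or, for a temp relation, merely record that this transaction's XID reached disk. A hedged sketch of that pattern, with FakeLSN and emit_wal_record() as stand-ins for XLogRecPtr and log_heap_move():

/* Hedged sketch of the "WAL it, or flag the temp-rel update" pattern. */
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t FakeLSN;               /* stand-in for XLogRecPtr */
typedef struct { FakeLSN lsn; } PageStub;

static bool my_xact_made_temp_rel_update = false;   /* stand-in global */

static FakeLSN
emit_wal_record(void)                   /* stand-in for log_heap_move() */
{
    static FakeLSN next = 1;
    return next++;
}

static void
log_page_change(PageStub *page, bool rel_is_temp)
{
    /* caller is assumed to be inside a critical section already */
    if (!rel_is_temp)
        page->lsn = emit_wal_record();  /* real code also sets the page SUI */
    else
        my_xact_made_temp_rel_update = true;    /* no WAL, but XID is on disk */
}

int
main(void)
{
    PageStub p = {0};

    log_page_change(&p, false);         /* WAL-logged relation */
    log_page_change(&p, true);          /* temp relation */
    return my_xact_made_temp_rel_update ? 0 : 1;
}
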
+
                                cur_buffer = InvalidBuffer;
                                pfree(vtmove);
                                chain_tuple_moved = true;
+
+                               /* advance to next tuple in walk-along-page loop */
                                continue;
-                       }
+                       }                                       /* end of is-tuple-in-chain test */
 
                        /* try to find new page for this tuple */
                        if (cur_buffer == InvalidBuffer ||
@@ -1641,19 +2053,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                {
                                        WriteBuffer(cur_buffer);
                                        cur_buffer = InvalidBuffer;
-
-                                       /*
-                                        * If previous target page is now too full to add *any*
-                                        * tuple to it, remove it from fraged_pages.
-                                        */
-                                       if (!enough_space(cur_page, vacrelstats->min_tlen))
-                                       {
-                                               Assert(num_fraged_pages > cur_item);
-                                               memmove(fraged_pages->pagedesc + cur_item,
-                                                               fraged_pages->pagedesc + cur_item + 1,
-                                                               sizeof(VacPage) * (num_fraged_pages - cur_item - 1));
-                                               num_fraged_pages--;
-                                       }
                                }
                                for (i = 0; i < num_fraged_pages; i++)
                                {
@@ -1679,28 +2078,35 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        /* copy tuple */
                        heap_copytuple_with_tuple(&tuple, &newtup);
 
-                       RelationInvalidateHeapTuple(onerel, &tuple);
+                       /*
+                        * register invalidation of source tuple in catcaches.
+                        *
+                        * (Note: we do not need to register the copied tuple, because we
+                        * are not changing the tuple contents and so there cannot be
+                        * any need to flush negative catcache entries.)
+                        */
+                       CacheInvalidateHeapTuple(onerel, &tuple);
 
-                       /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
+                       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
                        START_CRIT_SECTION();
 
                        /*
-                        * Mark new tuple as moved_in by vacuum and store vacuum XID
-                        * in t_cmin !!!
+                        * Mark new tuple as MOVED_IN by me.
                         */
-                       TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
-                       newtup.t_data->t_infomask &=
-                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
+                       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                                  HEAP_XMIN_INVALID |
+                                                                                  HEAP_MOVED_OFF);
                        newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+                       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
 
                        /* add tuple to the page */
                        newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
                                                                 InvalidOffsetNumber, LP_USED);
                        if (newoff == InvalidOffsetNumber)
                        {
-                               elog(STOP, "\
-failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
-                                        (unsigned long) tuple_len, cur_page->blkno, (unsigned long) cur_page->free,
+                               elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, offsets used %u, offsets free %u)",
+                                        (unsigned long) tuple_len,
+                                        cur_page->blkno, (unsigned long) cur_page->free,
                                         cur_page->offsets_used, cur_page->offsets_free);
                        }
                        newitemid = PageGetItemId(ToPage, newoff);
@@ -1711,14 +2117,16 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
                        newtup.t_self = newtup.t_data->t_ctid;
 
                        /*
-                        * Mark old tuple as moved_off by vacuum and store vacuum XID
-                        * in t_cmin !!!
+                        * Mark old tuple as MOVED_OFF by me.
                         */
-                       TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
-                       tuple.t_data->t_infomask &=
-                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
+                       tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                                 HEAP_XMIN_INVALID |
+                                                                                 HEAP_MOVED_IN);
                        tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
+                       HeapTupleHeaderSetXvac(tuple.t_data, myXID);
 
+                       /* XLOG stuff */
+                       if (!onerel->rd_istemp)
                        {
                                XLogRecPtr      recptr =
                                log_heap_move(onerel, buf, tuple.t_self,
@@ -1729,12 +2137,21 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
                                PageSetLSN(ToPage, recptr);
                                PageSetSUI(ToPage, ThisStartUpID);
                        }
+                       else
+                       {
+                               /*
+                                * No XLOG record, but still need to flag that XID exists
+                                * on disk
+                                */
+                               MyXactMadeTempRelUpdate = true;
+                       }
+
                        END_CRIT_SECTION();
 
                        cur_page->offsets_used++;
                        num_moved++;
                        cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
-                       if (((int) cur_page->blkno) > last_move_dest_block)
+                       if (cur_page->blkno > last_move_dest_block)
                                last_move_dest_block = cur_page->blkno;
 
                        vacpage->offsets[vacpage->offsets_free++] = offnum;
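
The cur_page->free update just above recomputes a page's free space as pd_upper - pd_lower: the gap between the line-pointer array growing up and the tuple data growing down. A self-contained sketch of that arithmetic, assuming a simplified stand-in for PageHeaderData:

/* Hedged sketch: free space on a slotted page is the hole in the middle. */
#include <stddef.h>
#include <stdio.h>

typedef struct
{
    size_t pd_lower;    /* end of the line-pointer array (grows upward) */
    size_t pd_upper;    /* start of the tuple space (grows downward) */
} PageHeaderStub;

static size_t
page_free_space(const PageHeaderStub *ph)
{
    return ph->pd_upper - ph->pd_lower;
}

int
main(void)
{
    PageHeaderStub ph = {64, 8192 - 1024};

    printf("free = %zu bytes\n", page_free_space(&ph));
    return 0;
}
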
@@ -1743,40 +2160,27 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
                        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
                        /* insert index' tuples if needed */
-                       if (Irel != (Relation *) NULL)
+                       if (resultRelInfo->ri_NumIndices > 0)
                        {
-
-                               /*
-                                * XXX using CurrentMemoryContext here means intra-vacuum
-                                * memory leak for functional indexes. Should fix someday.
-                                *
-                                * XXX This code fails to handle partial indexes! Probably
-                                * should change it to use ExecOpenIndices.
-                                */
-                               for (i = 0; i < nindices; i++)
-                               {
-                                       FormIndexDatum(indexInfo[i],
-                                                                  &newtup,
-                                                                  tupdesc,
-                                                                  CurrentMemoryContext,
-                                                                  idatum,
-                                                                  inulls);
-                                       iresult = index_insert(Irel[i],
-                                                                                  idatum,
-                                                                                  inulls,
-                                                                                  &newtup.t_self,
-                                                                                  onerel);
-                                       if (iresult)
-                                               pfree(iresult);
-                               }
+                               ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
+                               ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
                        }
-
                }                                               /* walk along page */
 
+               /*
+                * If we broke out of the walk-along-page loop early (ie, still
+                * have offnum <= maxoff), then we failed to move some tuple off
+                * this page.  No point in shrinking any more, so clean up and
+                * exit the per-page loop.
+                */
                if (offnum < maxoff && keep_tuples > 0)
                {
                        OffsetNumber off;
 
+                       /*
+                        * Fix vacpage state for any unvisited tuples remaining on
+                        * page
+                        */
                        for (off = OffsetNumberNext(offnum);
                                 off <= maxoff;
                                 off = OffsetNumberNext(off))
@@ -1788,12 +2192,12 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
                                tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                                if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
                                        continue;
-                               if ((TransactionId) tuple.t_data->t_cmin != myXID)
-                                       elog(ERROR, "Invalid XID in t_cmin (4)");
                                if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                                       elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
+                                       elog(ERROR, "HEAP_MOVED_IN was not expected");
                                if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                                {
+                                       if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+                                               elog(ERROR, "invalid XVAC in tuple header");
                                        /* some chains were moved while */
                                        if (chain_tuple_moved)
                                        {                       /* cleaning this page */
@@ -1817,6 +2221,8 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
                                                keep_tuples--;
                                        }
                                }
+                               else
+                                       elog(ERROR, "HEAP_MOVED_OFF was expected");
                        }
                }
 
@@ -1827,7 +2233,7 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
                                qsort((char *) (vacpage->offsets), vacpage->offsets_free,
                                          sizeof(OffsetNumber), vac_cmp_offno);
                        }
-                       reap_page(&Nvacpagelist, vacpage);
+                       vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
                        WriteBuffer(buf);
                }
                else if (dowrite)
@@ -1836,7 +2242,7 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
                        ReleaseBuffer(buf);
 
                if (offnum <= maxoff)
-                       break;                          /* some item(s) left */
+                       break;                          /* had to quit early, see above note */
 
        }                                                       /* walk along relation */
 
@@ -1850,13 +2256,12 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
 
        if (num_moved > 0)
        {
-
                /*
                 * We have to commit our tuple movements before we truncate the
                 * relation.  Ideally we should do Commit/StartTransactionCommand
                 * here, relying on the session-level table lock to protect our
                 * exclusive access to the relation.  However, that would require
-                * a lot of extra code to close and re-open the relation, indices,
+                * a lot of extra code to close and re-open the relation, indexes,
                 * etc.  For now, a quick hack: record status of current
                 * transaction as committed, and continue.
                 */
@@ -1864,62 +2269,102 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
        }
 
        /*
-        * Clean uncleaned reaped pages from vacuum_pages list list and set
-        * xmin committed for inserted tuples
+        * We are not going to move any more tuples across pages, but we still
+        * need to apply vacuum_page to compact free space in the remaining
+        * pages in vacuum_pages list.  Note that some of these pages may also
+        * pages in the vacuum_pages list.  Note that some of these pages may also
+        * them; if so, we already did vacuum_page and needn't do it again.
         */
-       checked_moved = 0;
-       for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++)
+       for (i = 0, curpage = vacuum_pages->pagedesc;
+                i < vacuumed_pages;
+                i++, curpage++)
        {
-               Assert((*curpage)->blkno < (BlockNumber) blkno);
-               buf = ReadBuffer(onerel, (*curpage)->blkno);
-               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-               page = BufferGetPage(buf);
-               if ((*curpage)->offsets_used == 0)              /* this page was not used */
+               CHECK_FOR_INTERRUPTS();
+               Assert((*curpage)->blkno < blkno);
+               if ((*curpage)->offsets_used == 0)
                {
+                       /* this page was not used as a move target, so must clean it */
+                       buf = ReadBuffer(onerel, (*curpage)->blkno);
+                       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+                       page = BufferGetPage(buf);
                        if (!PageIsEmpty(page))
                                vacuum_page(onerel, buf, *curpage);
+                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                       WriteBuffer(buf);
                }
-               else
-/* this page was used */
+       }
+
+       /*
+        * Now scan all the pages that we moved tuples onto and update tuple
+        * status bits.  This is not really necessary, but will save time for
+        * future transactions examining these tuples.
+        *
+        * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
+        * pages that were move source pages but not move dest pages.  One
+        * also wonders whether it wouldn't be better to skip this step and
+        * let the tuple status updates happen someplace that's not holding an
+        * exclusive lock on the relation.
+        */
+       checked_moved = 0;
+       for (i = 0, curpage = fraged_pages->pagedesc;
+                i < num_fraged_pages;
+                i++, curpage++)
+       {
+               CHECK_FOR_INTERRUPTS();
+               Assert((*curpage)->blkno < blkno);
+               if ((*curpage)->blkno > last_move_dest_block)
+                       break;                          /* no need to scan any further */
+               if ((*curpage)->offsets_used == 0)
+                       continue;                       /* this page was never used as a move dest */
+               buf = ReadBuffer(onerel, (*curpage)->blkno);
+               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+               page = BufferGetPage(buf);
+               num_tuples = 0;
+               max_offset = PageGetMaxOffsetNumber(page);
+               for (newoff = FirstOffsetNumber;
+                        newoff <= max_offset;
+                        newoff = OffsetNumberNext(newoff))
                {
-                       num_tuples = 0;
-                       max_offset = PageGetMaxOffsetNumber(page);
-                       for (newoff = FirstOffsetNumber;
-                                newoff <= max_offset;
-                                newoff = OffsetNumberNext(newoff))
+                       itemid = PageGetItemId(page, newoff);
+                       if (!ItemIdIsUsed(itemid))
+                               continue;
+                       tuple.t_datamcxt = NULL;
+                       tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+                       if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                        {
-                               itemid = PageGetItemId(page, newoff);
-                               if (!ItemIdIsUsed(itemid))
-                                       continue;
-                               tuple.t_datamcxt = NULL;
-                               tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
-                               if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+                               if (!(tuple.t_data->t_infomask & HEAP_MOVED))
+                                       elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+                               if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+                                       elog(ERROR, "invalid XVAC in tuple header");
+                               if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
                                {
-                                       if ((TransactionId) tuple.t_data->t_cmin != myXID)
-                                               elog(ERROR, "Invalid XID in t_cmin (2)");
-                                       if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                                       {
-                                               tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
-                                               num_tuples++;
-                                       }
-                                       else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-                                               tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
-                                       else
-                                               elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+                                       tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
+                                       tuple.t_data->t_infomask &= ~HEAP_MOVED;
+                                       num_tuples++;
                                }
+                               else
+                                       tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
                        }
-                       Assert((*curpage)->offsets_used == num_tuples);
-                       checked_moved += num_tuples;
                }
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                WriteBuffer(buf);
+               Assert((*curpage)->offsets_used == num_tuples);
+               checked_moved += num_tuples;
        }
        Assert(num_moved == checked_moved);
 
-       elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. %s",
-                RelationGetRelationName(onerel),
-                nblocks, blkno, num_moved,
-                show_rusage(&ru0));
+       /*
+        * It'd be cleaner to make this report at the bottom of this routine,
+        * but then the rusage would double-count the second pass of index
+        * vacuuming.  So do it here and ignore the relatively small amount of
+        * processing that occurs below.
+        */
+       ereport(elevel,
+                       (errmsg("\"%s\": moved %u tuples, truncated %u to %u pages",
+                                       RelationGetRelationName(onerel),
+                                       num_moved, nblocks, blkno),
+                        errdetail("%s",
+                                          vac_show_rusage(&ru0))));
 
        /*
         * Reflect the motion of system tuples to catalog cache here.
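
The report above attaches vac_show_rusage(&ru0) output; those helpers live elsewhere in this file, but the snapshot-and-diff idea behind them can be sketched with plain POSIX calls (a simplified stand-in, not the actual vac_init_rusage/vac_show_rusage code):

/* Hedged sketch in the spirit of vac_init_rusage()/vac_show_rusage(). */
#include <stdio.h>
#include <sys/resource.h>
#include <sys/time.h>

typedef struct
{
    struct timeval tv;          /* wall clock at snapshot time */
    struct rusage ru;           /* CPU usage at snapshot time */
} RUsageSnap;

static void
snap_init(RUsageSnap *s)
{
    gettimeofday(&s->tv, NULL);
    getrusage(RUSAGE_SELF, &s->ru);
}

static void
snap_show(const RUsageSnap *s)
{
    RUsageSnap now;
    long sec, usec;

    snap_init(&now);
    sec = now.ru.ru_utime.tv_sec - s->ru.ru_utime.tv_sec;
    usec = now.ru.ru_utime.tv_usec - s->ru.ru_utime.tv_usec;
    if (usec < 0)               /* normalize, as the real helper must */
    {
        sec--;
        usec += 1000000;
    }
    fprintf(stderr, "CPU %ld.%02ld sec user, %ld sec elapsed\n",
            sec, usec / 10000, (long) (now.tv.tv_sec - s->tv.tv_sec));
}

int
main(void)
{
    RUsageSnap ru0;

    snap_init(&ru0);
    /* ... the work being measured goes here ... */
    snap_show(&ru0);
    return 0;
}
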
@@ -1928,7 +2373,7 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
 
        if (Nvacpagelist.num_pages > 0)
        {
-               /* vacuum indices again if needed */
+               /* vacuum indexes again if needed */
                if (Irel != (Relation *) NULL)
                {
                        VacPage    *vpleft,
@@ -1945,17 +2390,16 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
                                *vpright = vpsave;
                        }
                        Assert(keep_tuples >= 0);
-                       for (i = 0; i < nindices; i++)
+                       for (i = 0; i < nindexes; i++)
                                vacuum_index(&Nvacpagelist, Irel[i],
-                                                        vacrelstats->num_tuples, keep_tuples);
+                                                        vacrelstats->rel_tuples, keep_tuples);
                }
 
                /* clean moved tuples from last page in Nvacpagelist list */
-               if (vacpage->blkno == (BlockNumber) (blkno - 1) &&
+               if (vacpage->blkno == (blkno - 1) &&
                        vacpage->offsets_free > 0)
                {
-                       OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
-                       OffsetNumber *unused = unbuf;
+                       OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
                        int                     uncnt;
 
                        buf = ReadBuffer(onerel, vacpage->blkno);
@@ -1975,30 +2419,44 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
 
                                if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                                {
-                                       if ((TransactionId) tuple.t_data->t_cmin != myXID)
-                                               elog(ERROR, "Invalid XID in t_cmin (3)");
                                        if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                                        {
+                                               if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+                                                       elog(ERROR, "invalid XVAC in tuple header");
                                                itemid->lp_flags &= ~LP_USED;
                                                num_tuples++;
                                        }
                                        else
-                                               elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
+                                               elog(ERROR, "HEAP_MOVED_OFF was expected");
                                }
 
                        }
                        Assert(vacpage->offsets_free == num_tuples);
+
                        START_CRIT_SECTION();
+
                        uncnt = PageRepairFragmentation(page, unused);
+
+                       /* XLOG stuff */
+                       if (!onerel->rd_istemp)
                        {
                                XLogRecPtr      recptr;
 
-                               recptr = log_heap_clean(onerel, buf, (char *) unused,
-                                                 (char *) (&(unused[uncnt])) - (char *) unused);
+                               recptr = log_heap_clean(onerel, buf, unused, uncnt);
                                PageSetLSN(page, recptr);
                                PageSetSUI(page, ThisStartUpID);
                        }
+                       else
+                       {
+                               /*
+                                * No XLOG record, but still need to flag that XID exists
+                                * on disk
+                                */
+                               MyXactMadeTempRelUpdate = true;
+                       }
+
                        END_CRIT_SECTION();
+
                        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                        WriteBuffer(buf);
                }
@@ -2018,26 +2476,27 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
         */
        i = FlushRelationBuffers(onerel, blkno);
        if (i < 0)
-               elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
-                        i);
+               elog(ERROR, "FlushRelationBuffers returned %d", i);
 
        /* truncate relation, if needed */
        if (blkno < nblocks)
        {
                blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
-               Assert(blkno >= 0);
-               vacrelstats->num_pages = blkno; /* set new number of blocks */
-       }
-
-       if (Irel != (Relation *) NULL)          /* pfree index' allocations */
-       {
-               close_indices(nindices, Irel);
-               pfree(indexInfo);
+               onerel->rd_nblocks = blkno;             /* update relcache immediately */
+               onerel->rd_targblock = InvalidBlockNumber;
+               vacrelstats->rel_pages = blkno; /* set new number of blocks */
        }
 
+       /* clean up */
        pfree(vacpage);
        if (vacrelstats->vtlinks != NULL)
                pfree(vacrelstats->vtlinks);
+
+       ExecDropTupleTable(tupleTable, true);
+
+       ExecCloseIndices(resultRelInfo);
+
+       FreeExecutorState(estate);
 }
 
 /*
@@ -2051,7 +2510,8 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 {
        Buffer          buf;
        VacPage    *vacpage;
-       long            nblocks;
+       BlockNumber relblocks;
+       int                     nblocks;
        int                     i;
 
        nblocks = vacuum_pages->num_pages;
@@ -2059,6 +2519,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 
        for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
        {
+               CHECK_FOR_INTERRUPTS();
                if ((*vacpage)->offsets_free > 0)
                {
                        buf = ReadBuffer(onerel, (*vacpage)->blkno);
@@ -2075,23 +2536,24 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
         * tuples have correct on-row commit status on disk (see bufmgr.c's
         * comments for FlushRelationBuffers()).
         */
-       Assert(vacrelstats->num_pages >= vacuum_pages->empty_end_pages);
-       nblocks = vacrelstats->num_pages - vacuum_pages->empty_end_pages;
+       Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
+       relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
 
-       i = FlushRelationBuffers(onerel, nblocks);
+       i = FlushRelationBuffers(onerel, relblocks);
        if (i < 0)
-               elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
-                        i);
+               elog(ERROR, "FlushRelationBuffers returned %d", i);
 
        /* truncate relation if there are some empty end-pages */
        if (vacuum_pages->empty_end_pages > 0)
        {
-               elog(MESSAGE_LEVEL, "Rel %s: Pages: %lu --> %lu.",
-                        RelationGetRelationName(onerel),
-                        vacrelstats->num_pages, nblocks);
-               nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
-               Assert(nblocks >= 0);
-               vacrelstats->num_pages = nblocks;               /* set new number of
+               ereport(elevel,
+                               (errmsg("\"%s\": truncated %u to %u pages",
+                                               RelationGetRelationName(onerel),
+                                               vacrelstats->rel_pages, relblocks)));
+               relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
+               onerel->rd_nblocks = relblocks; /* update relcache immediately */
+               onerel->rd_targblock = InvalidBlockNumber;
+               vacrelstats->rel_pages = relblocks;             /* set new number of
                                                                                                 * blocks */
        }
 }
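
The truncation arithmetic at the tail of vacuum_heap() is easy to miss among the buffer management: keep rel_pages minus empty_end_pages blocks, then shrink the file. A hedged sketch of just that step, with smgr_truncate_stub standing in for smgrtruncate():

/* Hedged sketch of the end-page truncation arithmetic in vacuum_heap(). */
#include <assert.h>
#include <stdio.h>

typedef unsigned int BlockNo;

static BlockNo
smgr_truncate_stub(BlockNo new_len)
{
    return new_len;             /* the real call shrinks the file on disk */
}

int
main(void)
{
    BlockNo rel_pages = 100;        /* current length, per the stats */
    BlockNo empty_end_pages = 7;    /* trailing pages found fully empty */

    assert(rel_pages >= empty_end_pages);
    if (empty_end_pages > 0)
    {
        BlockNo keep = rel_pages - empty_end_pages;

        printf("truncating %u to %u pages\n", rel_pages, keep);
        rel_pages = smgr_truncate_stub(keep);
    }
    printf("relation is now %u pages\n", rel_pages);
    return 0;
}
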
@@ -2103,8 +2565,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 static void
 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
 {
-       OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
-       OffsetNumber *unused = unbuf;
+       OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
        int                     uncnt;
        Page            page = BufferGetPage(buffer);
        ItemId          itemid;
@@ -2114,64 +2575,95 @@ vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
        Assert(vacpage->offsets_used == 0);
 
        START_CRIT_SECTION();
+
        for (i = 0; i < vacpage->offsets_free; i++)
        {
-               itemid = &(((PageHeader) page)->pd_linp[vacpage->offsets[i] - 1]);
+               itemid = PageGetItemId(page, vacpage->offsets[i]);
                itemid->lp_flags &= ~LP_USED;
        }
+
        uncnt = PageRepairFragmentation(page, unused);
+
+       /* XLOG stuff */
+       if (!onerel->rd_istemp)
        {
                XLogRecPtr      recptr;
 
-               recptr = log_heap_clean(onerel, buffer, (char *) unused,
-                                                 (char *) (&(unused[uncnt])) - (char *) unused);
+               recptr = log_heap_clean(onerel, buffer, unused, uncnt);
                PageSetLSN(page, recptr);
                PageSetSUI(page, ThisStartUpID);
        }
+       else
+       {
+               /* No XLOG record, but still need to flag that XID exists on disk */
+               MyXactMadeTempRelUpdate = true;
+       }
+
        END_CRIT_SECTION();
 }
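
vacuum_page() clears LP_USED on each reaped line pointer and then lets PageRepairFragmentation() compact the page. A simplified, self-contained sketch of that clear-then-compact step (the stub only counts freed slots; the real routine also moves tuple data around):

/* Hedged sketch of vacuum_page()'s clear-then-compact step; the item
 * array and flag bit are stand-ins for ItemIdData/LP_USED. */
#include <stdio.h>

#define LP_USED_STUB 0x01

typedef struct { unsigned flags; } ItemStub;

/* stand-in for PageRepairFragmentation(): count newly unused slots */
static int
repair_fragmentation_stub(ItemStub *items, int nitems)
{
    int unused = 0, i;

    for (i = 0; i < nitems; i++)
        if (!(items[i].flags & LP_USED_STUB))
            unused++;
    return unused;
}

int
main(void)
{
    ItemStub items[4] = {{1}, {1}, {1}, {1}};
    int reaped_offsets[] = {2, 4};  /* 1-based, as OffsetNumbers are */
    int i;

    for (i = 0; i < 2; i++)
        items[reaped_offsets[i] - 1].flags &= ~LP_USED_STUB;
    printf("%d slots freed\n", repair_fragmentation_stub(items, 4));
    return 0;
}
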
 
 /*
- *     _scan_index() -- scan one index relation to update statistic.
+ *     scan_index() -- scan one index relation to update statistics.
  *
+ * We use this when we have no deletions to do.
  */
 static void
-scan_index(Relation indrel, long num_tuples)
+scan_index(Relation indrel, double num_tuples)
 {
-       RetrieveIndexResult res;
-       IndexScanDesc iscan;
-       long            nitups;
-       int                     nipages;
+       IndexBulkDeleteResult *stats;
+       IndexVacuumCleanupInfo vcinfo;
        VacRUsage       ru0;
 
-       init_rusage(&ru0);
+       vac_init_rusage(&ru0);
 
-       /* walk through the entire index */
-       iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
-       nitups = 0;
+       /*
+        * Even though we're not planning to delete anything, we use the
+        * ambulkdelete call, because (a) the scan happens within the index AM
+        * for more speed, and (b) it may want to pass private statistics to
+        * the amvacuumcleanup call.
+        */
+       stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
 
-       while ((res = index_getnext(iscan, ForwardScanDirection))
-                  != (RetrieveIndexResult) NULL)
-       {
-               nitups++;
-               pfree(res);
-       }
+       /* Do post-VACUUM cleanup, even though we deleted nothing */
+       vcinfo.vacuum_full = true;
+       vcinfo.message_level = elevel;
 
-       index_endscan(iscan);
+       stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
 
-       /* now update statistics in pg_class */
-       nipages = RelationGetNumberOfBlocks(indrel);
-       vac_update_relstats(RelationGetRelid(indrel), nipages, nitups, false);
+       if (!stats)
+               return;
 
-       elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %lu. %s",
-                RelationGetRelationName(indrel), nipages, nitups,
-                show_rusage(&ru0));
+       /* now update statistics in pg_class */
+       vac_update_relstats(RelationGetRelid(indrel),
+                                               stats->num_pages, stats->num_index_tuples,
+                                               false);
+
+       ereport(elevel,
+                       (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
+                                       RelationGetRelationName(indrel),
+                                       stats->num_index_tuples,
+                                       stats->num_pages),
+                        errdetail("%u index pages have been deleted, %u are currently reusable.\n"
+                                          "%s",
+                                          stats->pages_deleted, stats->pages_free,
+                                          vac_show_rusage(&ru0))));
 
-       if (nitups != num_tuples)
-               elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%lu) IS NOT THE SAME AS HEAP' (%lu).\
-\n\tRecreate the index.",
-                        RelationGetRelationName(indrel), nitups, num_tuples);
+       /*
+                        * Check for a tuple count mismatch.  If the index is partial, it is
+                        * OK for it to have fewer tuples than the heap; otherwise something
+                        * is wrong.
+        */
+       if (stats->num_index_tuples != num_tuples)
+       {
+               if (stats->num_index_tuples > num_tuples ||
+                       !vac_is_partial_index(indrel))
+                       ereport(WARNING,
+                                       (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
+                                                       RelationGetRelationName(indrel),
+                                                       stats->num_index_tuples, num_tuples),
+                                        errhint("Rebuild the index with REINDEX.")));
+       }
 
+       pfree(stats);
 }
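
scan_index() illustrates the callback protocol the new code is built on: ambulkdelete walks the index and asks the callback about every heap TID, so a callback that always answers "not dead" turns the call into a pure statistics scan. A hedged sketch of that protocol with stand-in types:

/* Hedged sketch of the IndexBulkDeleteCallback pattern; stand-in types. */
#include <stdbool.h>
#include <stdio.h>

typedef struct { unsigned blk, off; } TidStub;
typedef bool (*BulkDeleteCb)(const TidStub *tid, void *state);

static int
bulk_delete_stub(const TidStub *tids, int ntids, BulkDeleteCb cb, void *state)
{
    int removed = 0, i;

    for (i = 0; i < ntids; i++)
        if (cb(&tids[i], state))
            removed++;          /* a real AM would delete the index entry */
    return removed;
}

static bool
dummy_cb(const TidStub *tid, void *state)
{
    (void) tid;
    (void) state;
    return false;               /* never delete: stats-only scan */
}

int
main(void)
{
    TidStub tids[3] = {{1, 1}, {1, 2}, {2, 1}};

    printf("removed %d\n", bulk_delete_stub(tids, 3, dummy_cb, NULL));
    return 0;
}
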
 
 /*
@@ -2179,89 +2671,81 @@ scan_index(Relation indrel, long num_tuples)
  *
  *             Vpl is the VacPageList of the heap we're currently vacuuming.
  *             It's locked. Indrel is an index relation on the vacuumed heap.
- *             We don't set locks on the index relation here, since the indexed
- *             access methods support locking at different granularities.
- *             We let them handle it.
+ *
+ *             We don't bother to set locks on the index relation here, since
+ *             the parent table is exclusive-locked already.
  *
  *             Finally, we arrange to update the index relation's statistics in
  *             pg_class.
  */
 static void
 vacuum_index(VacPageList vacpagelist, Relation indrel,
-                        long num_tuples, int keep_tuples)
+                        double num_tuples, int keep_tuples)
 {
-       RetrieveIndexResult res;
-       IndexScanDesc iscan;
-       ItemPointer heapptr;
-       int                     tups_vacuumed;
-       long            num_index_tuples;
-       int                     num_pages;
-       VacPage         vp;
+       IndexBulkDeleteResult *stats;
+       IndexVacuumCleanupInfo vcinfo;
        VacRUsage       ru0;
 
-       init_rusage(&ru0);
+       vac_init_rusage(&ru0);
 
-       /* walk through the entire index */
-       iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
-       tups_vacuumed = 0;
-       num_index_tuples = 0;
+       /* Do bulk deletion */
+       stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
 
-       while ((res = index_getnext(iscan, ForwardScanDirection))
-                  != (RetrieveIndexResult) NULL)
-       {
-               heapptr = &res->heap_iptr;
-
-               if ((vp = tid_reaped(heapptr, vacpagelist)) != (VacPage) NULL)
-               {
-#ifdef NOT_USED
-                       elog(DEBUG, "<%x,%x> -> <%x,%x>",
-                                ItemPointerGetBlockNumber(&(res->index_iptr)),
-                                ItemPointerGetOffsetNumber(&(res->index_iptr)),
-                                ItemPointerGetBlockNumber(&(res->heap_iptr)),
-                                ItemPointerGetOffsetNumber(&(res->heap_iptr)));
-#endif
-                       if (vp->offsets_free == 0)
-                       {
-                               elog(NOTICE, "Index %s: pointer to EmptyPage (blk %u off %u) - fixing",
-                                        RelationGetRelationName(indrel),
-                                        vp->blkno, ItemPointerGetOffsetNumber(heapptr));
-                       }
-                       ++tups_vacuumed;
-                       index_delete(indrel, &res->index_iptr);
-               }
-               else
-                       num_index_tuples++;
+       /* Do post-VACUUM cleanup */
+       vcinfo.vacuum_full = true;
+       vcinfo.message_level = elevel;
 
-               pfree(res);
-       }
+       stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
 
-       index_endscan(iscan);
+       if (!stats)
+               return;
 
        /* now update statistics in pg_class */
-       num_pages = RelationGetNumberOfBlocks(indrel);
        vac_update_relstats(RelationGetRelid(indrel),
-                                               num_pages, num_index_tuples, false);
-
-       elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %lu: Deleted %u. %s",
-                RelationGetRelationName(indrel), num_pages,
-                num_index_tuples - keep_tuples, tups_vacuumed,
-                show_rusage(&ru0));
+                                               stats->num_pages, stats->num_index_tuples,
+                                               false);
+
+       ereport(elevel,
+                       (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
+                                       RelationGetRelationName(indrel),
+                                       stats->num_index_tuples,
+                                       stats->num_pages),
+                        errdetail("%.0f index tuples were removed.\n"
+                "%u index pages have been deleted, %u are currently reusable.\n"
+                                          "%s",
+                                          stats->tuples_removed,
+                                          stats->pages_deleted, stats->pages_free,
+                                          vac_show_rusage(&ru0))));
 
-       if (num_index_tuples != num_tuples + keep_tuples)
-               elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%lu) IS NOT THE SAME AS HEAP' (%lu).\
-\n\tRecreate the index.",
-                 RelationGetRelationName(indrel), num_index_tuples, num_tuples);
+       /*
+        * Check for a tuple count mismatch.  If the index is partial, it is
+        * OK for it to have fewer tuples than the heap; otherwise something
+        * is wrong.
+        */
+       if (stats->num_index_tuples != num_tuples + keep_tuples)
+       {
+               if (stats->num_index_tuples > num_tuples + keep_tuples ||
+                       !vac_is_partial_index(indrel))
+                       ereport(WARNING,
+                                       (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
+                                                       RelationGetRelationName(indrel),
+                                         stats->num_index_tuples, num_tuples + keep_tuples),
+                                        errhint("Rebuild the index with REINDEX.")));
+       }
 
+       pfree(stats);
 }
 
 /*
  *     tid_reaped() -- is a particular tid reaped?
  *
+ *             This has the right signature to be an IndexBulkDeleteCallback.
+ *
  *             vacpagelist->VacPage_array is sorted in right order.
  */
-static VacPage
-tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
+static bool
+tid_reaped(ItemPointer itemptr, void *state)
 {
+       VacPageList vacpagelist = (VacPageList) state;
        OffsetNumber ioffno;
        OffsetNumber *voff;
        VacPage         vp,
@@ -2278,8 +2762,8 @@ tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
                                                                  sizeof(VacPage),
                                                                  vac_cmp_blk);
 
-       if (vpp == (VacPage *) NULL)
-               return (VacPage) NULL;
+       if (vpp == NULL)
+               return false;
 
        /* ok - we are on a partially or fully reaped page */
        vp = *vpp;
@@ -2287,7 +2771,7 @@ tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
        if (vp->offsets_free == 0)
        {
                /* this is EmptyPage, so claim all tuples on it are reaped!!! */
-               return vp;
+               return true;
        }
 
        voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
@@ -2296,98 +2780,107 @@ tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
                                                                                sizeof(OffsetNumber),
                                                                                vac_cmp_offno);
 
-       if (voff == (OffsetNumber *) NULL)
-               return (VacPage) NULL;
+       if (voff == NULL)
+               return false;
 
        /* tid is reaped */
-       return vp;
+       return true;
 }
 
 /*
- *     vac_update_relstats() -- update statistics for one relation
- *
- *             Update the whole-relation statistics that are kept in its pg_class
- *             row.  There are additional stats that will be updated if we are
- *             doing VACUUM ANALYZE, but we always update these stats.
- *
- *             This routine works for both index and heap relation entries in
- *             pg_class.  We violate no-overwrite semantics here by storing new
- *             values for the statistics columns directly into the pg_class
- *             tuple that's already on the page.  The reason for this is that if
- *             we updated these tuples in the usual way, vacuuming pg_class itself
- *             wouldn't work very well --- by the time we got done with a vacuum
- *             cycle, most of the tuples in pg_class would've been obsoleted.
- *             Of course, this only works for fixed-size never-null columns, but
- *             these are.
+ * Dummy version for scan_index.
  */
-void
-vac_update_relstats(Oid relid, long num_pages, double num_tuples,
-                                       bool hasindex)
+static bool
+dummy_tid_reaped(ItemPointer itemptr, void *state)
 {
-       Relation        rd;
-       HeapTupleData rtup;
-       HeapTuple       ctup;
-       Form_pg_class pgcform;
-       Buffer          buffer;
+       return false;
+}
+
+/*
+ * Update the shared Free Space Map with the info we now have about
+ * free space in the relation, discarding any old info the map may have.
+ */
+static void
+vac_update_fsm(Relation onerel, VacPageList fraged_pages,
+                          BlockNumber rel_pages)
+{
+       int                     nPages = fraged_pages->num_pages;
+       VacPage    *pagedesc = fraged_pages->pagedesc;
+       Size            threshold;
+       PageFreeSpaceInfo *pageSpaces;
+       int                     outPages;
+       int                     i;
 
        /*
-        * update number of tuples and number of pages in pg_class
+        * We only report pages with free space at least equal to the average
+        * request size --- this avoids cluttering FSM with uselessly-small
+        * bits of space.  Although FSM would discard pages with little free
+        * space anyway, it's important to do this prefiltering because (a) it
+        * reduces the time spent holding the FSM lock in
+        * RecordRelationFreeSpace, and (b) FSM uses the number of pages
+        * reported as a statistic for guiding space management.  If we didn't
+        * threshold our reports the same way vacuumlazy.c does, we'd be
+        * skewing that statistic.
         */
-       rd = heap_openr(RelationRelationName, RowExclusiveLock);
+       threshold = GetAvgFSMRequestSize(&onerel->rd_node);
 
-       ctup = SearchSysCache(RELOID,
-                                                 ObjectIdGetDatum(relid),
-                                                 0, 0, 0);
-       if (!HeapTupleIsValid(ctup))
-               elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
-                        relid);
+       /* +1 to avoid palloc(0) */
+       pageSpaces = (PageFreeSpaceInfo *)
+               palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
+       outPages = 0;
 
-       /* get the buffer cache tuple */
-       rtup.t_self = ctup->t_self;
-       ReleaseSysCache(ctup);
-       heap_fetch(rd, SnapshotNow, &rtup, &buffer);
+       for (i = 0; i < nPages; i++)
+       {
+               /*
+                * fraged_pages may contain entries for pages that we later
+                * decided to truncate from the relation; don't enter them into
+                * the free space map!
+                */
+               if (pagedesc[i]->blkno >= rel_pages)
+                       break;
 
-       /* overwrite the existing statistics in the tuple */
-       pgcform = (Form_pg_class) GETSTRUCT(&rtup);
-       pgcform->reltuples = num_tuples;
-       pgcform->relpages = num_pages;
-       pgcform->relhasindex = hasindex;
+               if (pagedesc[i]->free >= threshold)
+               {
+                       pageSpaces[outPages].blkno = pagedesc[i]->blkno;
+                       pageSpaces[outPages].avail = pagedesc[i]->free;
+                       outPages++;
+               }
+       }
 
-       /* invalidate the tuple in the cache and write the buffer */
-       RelationInvalidateHeapTuple(rd, &rtup);
-       WriteBuffer(buffer);
+       RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
 
-       heap_close(rd, RowExclusiveLock);
+       pfree(pageSpaces);
 }
 
-/*
- *     reap_page() -- save a page on the array of reaped pages.
- *
- *             As a side effect of the way that the vacuuming loop for a given
- *             relation works, higher pages come after lower pages in the array
- *             (and highest tid on a page is last).
- */
-static void
-reap_page(VacPageList vacpagelist, VacPage vacpage)
+/* Copy a VacPage structure */
+static VacPage
+copy_vac_page(VacPage vacpage)
 {
        VacPage         newvacpage;
 
        /* allocate a VacPageData entry */
-       newvacpage = (VacPage) palloc(sizeof(VacPageData) + vacpage->offsets_free * sizeof(OffsetNumber));
+       newvacpage = (VacPage) palloc(sizeof(VacPageData) +
+                                                  vacpage->offsets_free * sizeof(OffsetNumber));
 
        /* fill it in */
        if (vacpage->offsets_free > 0)
-               memmove(newvacpage->offsets, vacpage->offsets, vacpage->offsets_free * sizeof(OffsetNumber));
+               memcpy(newvacpage->offsets, vacpage->offsets,
+                          vacpage->offsets_free * sizeof(OffsetNumber));
        newvacpage->blkno = vacpage->blkno;
        newvacpage->free = vacpage->free;
        newvacpage->offsets_used = vacpage->offsets_used;
        newvacpage->offsets_free = vacpage->offsets_free;
 
-       /* insert this page into vacpagelist list */
-       vpage_insert(vacpagelist, newvacpage);
-
+       return newvacpage;
 }
 
+/*
+ * Add a VacPage pointer to a VacPageList.
+ *
+ *             As a side effect of the way that scan_heap works,
+ *             higher pages come after lower pages in the array
+ *             (and highest tid on a page is last).
+ */
 static void
 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
 {
@@ -2411,7 +2904,7 @@ vpage_insert(VacPageList vacpagelist, VacPage vpnew)
 /*
  * vac_bsearch: just like standard C library routine bsearch(),
  * except that we first test to see whether the target key is outside
- * the range of the table entries.  This case is handled relatively slowly
+ * the range of the table entries.     This case is handled relatively slowly
  * by the normal binary search algorithm (ie, no faster than any other key)
  * but it occurs often enough in VACUUM to be worth optimizing.
  */
@@ -2500,8 +2993,8 @@ vac_cmp_vtlinks(const void *left, const void *right)
 }
 
 
-static void
-get_indices(Relation relation, int *nindices, Relation **Irel)
+void
+vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
 {
        List       *indexoidlist,
                           *indexoidscan;
@@ -2509,17 +3002,17 @@ get_indices(Relation relation, int *nindices, Relation **Irel)
 
        indexoidlist = RelationGetIndexList(relation);
 
-       *nindices = length(indexoidlist);
+       *nindexes = length(indexoidlist);
 
-       if (*nindices > 0)
-               *Irel = (Relation *) palloc(*nindices * sizeof(Relation));
+       if (*nindexes > 0)
+               *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
        else
                *Irel = NULL;
 
        i = 0;
        foreach(indexoidscan, indexoidlist)
        {
-               Oid                     indexoid = lfirsti(indexoidscan);
+               Oid                     indexoid = lfirsto(indexoidscan);
 
                (*Irel)[i] = index_open(indexoid);
                i++;
@@ -2529,75 +3022,64 @@ get_indices(Relation relation, int *nindices, Relation **Irel)
 }
 
 
-static void
-close_indices(int nindices, Relation *Irel)
+void
+vac_close_indexes(int nindexes, Relation *Irel)
 {
-
        if (Irel == (Relation *) NULL)
                return;
 
-       while (nindices--)
-               index_close(Irel[nindices]);
+       while (nindexes--)
+               index_close(Irel[nindexes]);
        pfree(Irel);
-
 }
 
 
 /*
- * Obtain IndexInfo data for each index on the rel
+ * Is an index partial (ie, could it contain fewer tuples than the heap?)
  */
-static IndexInfo **
-get_index_desc(Relation onerel, int nindices, Relation *Irel)
+bool
+vac_is_partial_index(Relation indrel)
 {
-       IndexInfo **indexInfo;
-       int                     i;
-       HeapTuple       cachetuple;
-
-       indexInfo = (IndexInfo **) palloc(nindices * sizeof(IndexInfo *));
+       /*
+        * If the index's AM doesn't support nulls, it's partial for our
+        * purposes
+        */
+       if (!indrel->rd_am->amindexnulls)
+               return true;
 
-       for (i = 0; i < nindices; i++)
-       {
-               cachetuple = SearchSysCache(INDEXRELID,
-                                                        ObjectIdGetDatum(RelationGetRelid(Irel[i])),
-                                                                       0, 0, 0);
-               if (!HeapTupleIsValid(cachetuple))
-                       elog(ERROR, "get_index_desc: index %u not found",
-                                RelationGetRelid(Irel[i]));
-               indexInfo[i] = BuildIndexInfo(cachetuple);
-               ReleaseSysCache(cachetuple);
-       }
+       /* Otherwise, look to see if there's a partial-index predicate */
+       if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
+               return true;
 
-       return indexInfo;
+       return false;
 }
 
 
 static bool
 enough_space(VacPage vacpage, Size len)
 {
-
        len = MAXALIGN(len);
 
        if (len > vacpage->free)
                return false;
 
-       if (vacpage->offsets_used < vacpage->offsets_free)      /* there are free
-                                                                                                                * itemid(s) */
-               return true;                    /* and len <= free_space */
+       /* if there are free itemid(s) and len <= free_space... */
+       if (vacpage->offsets_used < vacpage->offsets_free)
+               return true;
 
-       /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
-       if (len + MAXALIGN(sizeof(ItemIdData)) <= vacpage->free)
+       /* noff_used >= noff_free and so we'll have to allocate new itemid */
+       if (len + sizeof(ItemIdData) <= vacpage->free)
                return true;
 
        return false;
-
 }
 
 
 /*
  * Initialize usage snapshot.
  */
-static void
-init_rusage(VacRUsage *ru0)
+void
+vac_init_rusage(VacRUsage *ru0)
 {
        struct timezone tz;
 
@@ -2611,13 +3093,13 @@ init_rusage(VacRUsage *ru0)
  * tacky, but no one ever claimed that the Postgres backend is
  * threadable...
  */
-static char *
-show_rusage(VacRUsage *ru0)
+const char *
+vac_show_rusage(VacRUsage *ru0)
 {
        static char result[100];
        VacRUsage       ru1;
 
-       init_rusage(&ru1);
+       vac_init_rusage(&ru1);
 
        if (ru1.tv.tv_usec < ru0->tv.tv_usec)
        {
@@ -2638,9 +3120,9 @@ show_rusage(VacRUsage *ru0)
        snprintf(result, sizeof(result),
                         "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
                         (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
-                        (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
+         (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
                         (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
-                        (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
+         (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
                         (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
                         (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);