[postgresql] / src / backend / commands / vacuum.c
index fe72ff96020f3d0e1c5c94e32d0007d3dab6aa34..e626848f12b6bf237d19fdbeef06b4cf35696c2b 100644
@@ -4,16 +4,16 @@
  *       The postgres vacuum cleaner.
  *
  * This file includes the "full" version of VACUUM, as well as control code
- * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
+ * used by all three of full VACUUM, lazy VACUUM, and ANALYZE. See
  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
  *
  *
- * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.208 2001/08/26 16:55:59 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.260 2003/09/24 18:54:01 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -27,6 +27,7 @@
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/catname.h"
+#include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_index.h"
 #include "commands/vacuum.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/relcache.h"
 #include "utils/syscache.h"
-#include "utils/temprel.h"
-
 #include "pgstat.h"
 
 
-typedef struct VRelListData
-{
-       Oid                     vrl_relid;
-       struct VRelListData *vrl_next;
-} VRelListData;
-
-typedef VRelListData *VRelList;
-
 typedef struct VacPageData
 {
        BlockNumber blkno;                      /* BlockNumber of this Page */
@@ -68,11 +60,11 @@ typedef VacPageData *VacPage;
 
 typedef struct VacPageListData
 {
-       BlockNumber     empty_end_pages;        /* Number of "empty" end-pages */
-       int                     num_pages;                      /* Number of pages in pagedesc */
+       BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
+       int                     num_pages;              /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
-       VacPage    *pagedesc;                   /* Descriptions of pages */
+       VacPage    *pagedesc;           /* Descriptions of pages */
 } VacPageListData;
 
 typedef VacPageListData *VacPageList;
@@ -96,7 +88,7 @@ typedef VTupleMoveData *VTupleMove;
 
 typedef struct VRelStats
 {
-       BlockNumber     rel_pages;
+       BlockNumber rel_pages;
        double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
@@ -108,46 +100,41 @@ typedef struct VRelStats
 
 static MemoryContext vac_context = NULL;
 
-static int     MESSAGE_LEVEL;          /* message level */
+static int     elevel = -1;
 
 static TransactionId OldestXmin;
 static TransactionId FreezeLimit;
 
-static TransactionId initialOldestXmin;
-static TransactionId initialFreezeLimit;
-
 
 /* non-export function prototypes */
-static void vacuum_init(VacuumStmt *vacstmt);
-static void vacuum_shutdown(VacuumStmt *vacstmt);
-static VRelList getrels(Name VacRelP, const char *stmttype);
+static List *getrels(const RangeVar *vacrel, const char *stmttype);
 static void vac_update_dbstats(Oid dbid,
-                                                          TransactionId vacuumXID,
-                                                          TransactionId frozenXID);
+                                  TransactionId vacuumXID,
+                                  TransactionId frozenXID);
 static void vac_truncate_clog(TransactionId vacuumXID,
-                                                         TransactionId frozenXID);
-static void vacuum_rel(Oid relid, VacuumStmt *vacstmt);
+                                 TransactionId frozenXID);
+static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
-                                         VacPageList vacuum_pages, VacPageList fraged_pages);
+                 VacPageList vacuum_pages, VacPageList fraged_pages);
 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
-                                               VacPageList vacuum_pages, VacPageList fraged_pages,
-                                               int nindexes, Relation *Irel);
+                       VacPageList vacuum_pages, VacPageList fraged_pages,
+                       int nindexes, Relation *Irel);
 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
-                                               VacPageList vacpagelist);
+                       VacPageList vacpagelist);
 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
-                                                double num_tuples, int keep_tuples);
+                        double num_tuples, int keep_tuples);
 static void scan_index(Relation indrel, double num_tuples);
 static bool tid_reaped(ItemPointer itemptr, void *state);
 static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
-                                                  BlockNumber rel_pages);
+                          BlockNumber rel_pages);
 static VacPage copy_vac_page(VacPage vacpage);
 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
 static void *vac_bsearch(const void *key, const void *base,
-                                                size_t nelem, size_t size,
-                                                int (*compar) (const void *, const void *));
+                       size_t nelem, size_t size,
+                       int (*compar) (const void *, const void *));
 static int     vac_cmp_blk(const void *left, const void *right);
 static int     vac_cmp_offno(const void *left, const void *right);
 static int     vac_cmp_vtlinks(const void *left, const void *right);
@@ -169,10 +156,17 @@ void
 vacuum(VacuumStmt *vacstmt)
 {
        const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
-       NameData        VacRel;
-       Name            VacRelName;
-       VRelList        vrl,
-                               cur;
+       MemoryContext anl_context = NULL;
+       TransactionId initialOldestXmin = InvalidTransactionId;
+       TransactionId initialFreezeLimit = InvalidTransactionId;
+       bool            all_rels;
+       List       *vrl,
+                          *cur;
+
+       if (vacstmt->verbose)
+               elevel = INFO;
+       else
+               elevel = DEBUG2;
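The elevel set here drives every progress report in the rest of the file: VERBOSE promotes the messages to INFO, so the client sees them, while the default DEBUG2 keeps them in the server log only. A minimal usage sketch, condensed from the scan_heap() hunk further down (names as they appear there):

	/* progress-reporting pattern used throughout this patch */
	ereport(elevel,
			(errmsg("vacuuming \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					relname)));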
 
        /*
         * We cannot run VACUUM inside a user transaction block; if we were
@@ -183,150 +177,183 @@ vacuum(VacuumStmt *vacstmt)
         * user's transaction too, which would certainly not be the desired
         * behavior.
         */
-       if (IsTransactionBlock())
-               elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
+       if (vacstmt->vacuum)
+               PreventTransactionChain((void *) vacstmt, stmttype);
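The removed inline IsTransactionBlock() test does not disappear; it moves behind PreventTransactionChain(). A conceptual sketch of what that call enforces (hedged: the real implementation lives in access/transam/xact.c and checks more cases than this):

	/* conceptual sketch only, not the xact.c source */
	if (IsTransactionBlock())
		ereport(ERROR,
				(errmsg("%s cannot run inside a transaction block", stmttype)));

VACUUM commits and restarts transactions internally, so running it inside a user BEGIN/END block would commit part of the user's transaction behind their back.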
 
        /*
         * Send info about dead objects to the statistics collector
         */
-       pgstat_vacuum_tabstat();
-
-       if (vacstmt->verbose)
-               MESSAGE_LEVEL = NOTICE;
-       else
-               MESSAGE_LEVEL = DEBUG;
+       if (vacstmt->vacuum)
+               pgstat_vacuum_tabstat();
 
        /*
         * Create special memory context for cross-transaction storage.
         *
-        * Since it is a child of QueryContext, it will go away eventually even
+        * Since it is a child of PortalContext, it will go away eventually even
         * if we suffer an error; there's no need for special abort cleanup
         * logic.
         */
-       vac_context = AllocSetContextCreate(QueryContext,
+       vac_context = AllocSetContextCreate(PortalContext,
                                                                                "Vacuum",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);
 
-       /* Convert vacrel, which is just a string, to a Name */
-       if (vacstmt->vacrel)
-       {
-               namestrcpy(&VacRel, vacstmt->vacrel);
-               VacRelName = &VacRel;
-       }
-       else
-               VacRelName = NULL;
+       /*
+        * If we are running only ANALYZE, we don't need per-table
+        * transactions, but we still need a memory context with table
+        * lifetime.
+        */
+       if (vacstmt->analyze && !vacstmt->vacuum)
+               anl_context = AllocSetContextCreate(PortalContext,
+                                                                                       "Analyze",
+                                                                                       ALLOCSET_DEFAULT_MINSIZE,
+                                                                                       ALLOCSET_DEFAULT_INITSIZE,
+                                                                                       ALLOCSET_DEFAULT_MAXSIZE);
+
+       /* Assume we are processing everything unless one table is mentioned */
+       all_rels = (vacstmt->relation == NULL);
 
        /* Build list of relations to process (note this lives in vac_context) */
-       vrl = getrels(VacRelName, stmttype);
+       vrl = getrels(vacstmt->relation, stmttype);
 
        /*
-        * Start up the vacuum cleaner.
+        * Formerly, there was code here to prevent more than one VACUUM from
+        * executing concurrently in the same database.  However, there's no
+        * good reason to prevent that, and manually removing lockfiles after
+        * a vacuum crash was a pain for dbadmins.      So, forget about
+        * lockfiles, and just rely on the locks we grab on each target table
+        * to ensure that there aren't two VACUUMs running on the same table
+        * at the same time.
         */
-       vacuum_init(vacstmt);
 
        /*
-        * Process each selected relation.  We are careful to process
-        * each relation in a separate transaction in order to avoid holding
-        * too many locks at one time.  Also, if we are doing VACUUM ANALYZE,
-        * the ANALYZE part runs as a separate transaction from the VACUUM
-        * to further reduce locking.
+        * The strangeness with committing and starting transactions here is
+        * due to wanting to run each table's VACUUM as a separate
+        * transaction, so that we don't hold locks unnecessarily long.  Also,
+        * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
+        * transaction from the VACUUM to further reduce locking.
+        *
+        * vacuum_rel expects to be entered with no transaction active; it will
+        * start and commit its own transaction.  But we are called by an SQL
+        * command, and so we are executing inside a transaction already.  We
+        * commit the transaction started in PostgresMain() here, and start
+        * another one before exiting to match the commit waiting for us back
+        * in PostgresMain().
+        *
+        * In the case of an ANALYZE statement (no vacuum, just analyze) it's
+        * okay to run the whole thing in the outer transaction, and so we
+        * skip transaction start/stop operations.
         */
-       for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
+       if (vacstmt->vacuum)
        {
-               if (vacstmt->vacuum)
-                       vacuum_rel(cur->vrl_relid, vacstmt);
-               if (vacstmt->analyze)
-                       analyze_rel(cur->vrl_relid, vacstmt);
-       }
+               if (all_rels)
+               {
+                       /*
+                        * It's a database-wide VACUUM.
+                        *
+                        * Compute the initially applicable OldestXmin and FreezeLimit
+                        * XIDs, so that we can record these values at the end of the
+                        * VACUUM. Note that individual tables may well be processed
+                        * with newer values, but we can guarantee that no
+                        * (non-shared) relations are processed with older ones.
+                        *
+                        * It is okay to record non-shared values in pg_database, even
+                        * though we may vacuum shared relations with older cutoffs,
+                        * because only the minimum of the values present in
+                        * pg_database matters.  We can be sure that shared relations
+                        * have at some time been vacuumed with cutoffs no worse than
+                        * the global minimum; for, if there is a backend in some
+                        * other DB with xmin = OLDXMIN that's determining the cutoff
+                        * with which we vacuum shared relations, it is not possible
+                        * for that database to have a cutoff newer than OLDXMIN
+                        * recorded in pg_database.
+                        */
+                       vacuum_set_xid_limits(vacstmt, false,
+                                                                 &initialOldestXmin,
+                                                                 &initialFreezeLimit);
+               }
 
-       /* clean up */
-       vacuum_shutdown(vacstmt);
-}
+               /* matches the StartTransaction in PostgresMain() */
+               CommitTransactionCommand();
+       }
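The commit/start sandwich described in the comment above, reduced to its skeleton (condensed from this function, not a drop-in snippet):

	/* end the transaction PostgresMain() started for us */
	CommitTransactionCommand();
	foreach(cur, vrl)
	{
		/*
		 * vacuum_rel() starts and commits its own transaction per table,
		 * so locks are held only as long as each table needs them.
		 */
	}
	/* matches the CommitTransaction waiting in PostgresMain() */
	StartTransactionCommand();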
 
-/*
- *     vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
- *
- *             Formerly, there was code here to prevent more than one VACUUM from
- *             executing concurrently in the same database.  However, there's no
- *             good reason to prevent that, and manually removing lockfiles after
- *             a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
- *             and just rely on the locks we grab on each target table
- *             to ensure that there aren't two VACUUMs running on the same table
- *             at the same time.
- *
- *             The strangeness with committing and starting transactions in the
- *             init and shutdown routines is due to the fact that the vacuum cleaner
- *             is invoked via an SQL command, and so is already executing inside
- *             a transaction.  We need to leave ourselves in a predictable state
- *             on entry and exit to the vacuum cleaner.  We commit the transaction
- *             started in PostgresMain() inside vacuum_init(), and start one in
- *             vacuum_shutdown() to match the commit waiting for us back in
- *             PostgresMain().
- */
-static void
-vacuum_init(VacuumStmt *vacstmt)
-{
-       if (vacstmt->vacuum && vacstmt->vacrel == NULL)
+       /*
+        * Loop to process each selected relation.
+        */
+       foreach(cur, vrl)
        {
-               /*
-                * Compute the initially applicable OldestXmin and FreezeLimit XIDs,
-                * so that we can record these values at the end of the VACUUM.
-                * Note that individual tables may well be processed with newer values,
-                * but we can guarantee that no (non-shared) relations are processed
-                * with older ones.
-                *
-                * It is okay to record non-shared values in pg_database, even though
-                * we may vacuum shared relations with older cutoffs, because only
-                * the minimum of the values present in pg_database matters.  We
-                * can be sure that shared relations have at some time been vacuumed
-                * with cutoffs no worse than the global minimum; for, if there is
-                * a backend in some other DB with xmin = OLDXMIN that's determining
-                * the cutoff with which we vacuum shared relations, it is not possible
-                * for that database to have a cutoff newer than OLDXMIN recorded in
-                * pg_database.
-                */
-               vacuum_set_xid_limits(vacstmt, false,
-                                                         &initialOldestXmin, &initialFreezeLimit);
-       }
+               Oid                     relid = lfirsto(cur);
 
-       /* matches the StartTransaction in PostgresMain() */
-       CommitTransactionCommand();
-}
+               if (vacstmt->vacuum)
+               {
+                       if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
+                               all_rels = false;               /* forget about updating dbstats */
+               }
+               if (vacstmt->analyze)
+               {
+                       MemoryContext old_context = NULL;
 
-static void
-vacuum_shutdown(VacuumStmt *vacstmt)
-{
-       /* on entry, we are not in a transaction */
+                       /*
+                        * If we vacuumed, use new transaction for analyze. Otherwise,
+                        * we can use the outer transaction, but we still need to call
+                        * analyze_rel in a memory context that will be cleaned up on
+                        * return (else we leak memory while processing multiple
+                        * tables).
+                        */
+                       if (vacstmt->vacuum)
+                       {
+                               StartTransactionCommand();
+                               SetQuerySnapshot();             /* might be needed for functions
+                                                                                * in indexes */
+                       }
+                       else
+                               old_context = MemoryContextSwitchTo(anl_context);
 
-       /* matches the CommitTransaction in PostgresMain() */
-       StartTransactionCommand();
+                       analyze_rel(relid, vacstmt);
 
-       /*
-        * If we did a database-wide VACUUM, update the database's pg_database
-        * row with info about the transaction IDs used, and try to truncate
-        * pg_clog.
-        */
-       if (vacstmt->vacuum && vacstmt->vacrel == NULL)
-       {
-               vac_update_dbstats(MyDatabaseId,
-                                                  initialOldestXmin, initialFreezeLimit);
-               vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
+                       if (vacstmt->vacuum)
+                               CommitTransactionCommand();
+                       else
+                       {
+                               MemoryContextSwitchTo(old_context);
+                               MemoryContextResetAndDeleteChildren(anl_context);
+                       }
+               }
        }
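For the ANALYZE-only path, the loop above substitutes a scratch memory context for a fresh transaction. The per-table pattern, condensed from the code above:

	MemoryContext old_context = MemoryContextSwitchTo(anl_context);

	analyze_rel(relid, vacstmt);	/* its allocations land in anl_context */
	MemoryContextSwitchTo(old_context);
	MemoryContextResetAndDeleteChildren(anl_context);	/* bulk-free; no leak
														 * across tables */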
 
        /*
-        * If we did a complete vacuum or analyze, then flush the init file that
-        * relcache.c uses to save startup time. The next backend startup will
-        * rebuild the init file with up-to-date information from pg_class.
-        * This lets the optimizer see the stats that we've collected for certain
-        * critical system indexes.  See relcache.c for more details.
-        *
-        * Ignore any failure to unlink the file, since it might not be there if
-        * no backend has been started since the last vacuum.
+        * Finish up processing.
         */
-       if (vacstmt->vacrel == NULL)
-               unlink(RELCACHE_INIT_FILENAME);
+       if (vacstmt->vacuum)
+       {
+               /* here, we are not in a transaction */
+
+               /*
+                * This matches the CommitTransaction waiting for us in
+                * PostgresMain().
+                */
+               StartTransactionCommand();
+
+               /*
+                * If it was a database-wide VACUUM, print FSM usage statistics
+                * (we don't make you be superuser to see these).
+                */
+               if (vacstmt->relation == NULL)
+                       PrintFreeSpaceMapStatistics(elevel);
+
+               /*
+                * If we completed a database-wide VACUUM without skipping any
+                * relations, update the database's pg_database row with info
+                * about the transaction IDs used, and try to truncate pg_clog.
+                */
+               if (all_rels)
+               {
+                       vac_update_dbstats(MyDatabaseId,
+                                                          initialOldestXmin, initialFreezeLimit);
+                       vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
+               }
+       }
 
        /*
         * Clean up working storage --- note we must do this after
@@ -335,96 +362,64 @@ vacuum_shutdown(VacuumStmt *vacstmt)
         */
        MemoryContextDelete(vac_context);
        vac_context = NULL;
+
+       if (anl_context)
+               MemoryContextDelete(anl_context);
 }
 
 /*
- * Build a list of VRelListData nodes for each relation to be processed
+ * Build a list of Oids for each relation to be processed
  *
  * The list is built in vac_context so that it will survive across our
  * per-relation transactions.
  */
-static VRelList
-getrels(Name VacRelP, const char *stmttype)
+static List *
+getrels(const RangeVar *vacrel, const char *stmttype)
 {
-       Relation        rel;
-       TupleDesc       tupdesc;
-       HeapScanDesc scan;
-       HeapTuple       tuple;
-       VRelList        vrl,
-                               cur;
-       Datum           d;
-       char       *rname;
-       char            rkind;
-       bool            n;
-       ScanKeyData key;
-
-       if (VacRelP)
+       List       *vrl = NIL;
+       MemoryContext oldcontext;
+
+       if (vacrel)
        {
-               /*
-                * we could use the cache here, but it is clearer to use scankeys
-                * for both vacuum cases, bjm 2000/01/19
-                */
-               char       *nontemp_relname;
+               /* Process specific relation */
+               Oid                     relid;
 
-               /* We must re-map temp table names bjm 2000-04-06 */
-               nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP));
-               if (nontemp_relname == NULL)
-                       nontemp_relname = NameStr(*VacRelP);
+               relid = RangeVarGetRelid(vacrel, false);
 
-               ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
-                                                          F_NAMEEQ,
-                                                          PointerGetDatum(nontemp_relname));
+               /* Make a relation list entry for this guy */
+               oldcontext = MemoryContextSwitchTo(vac_context);
+               vrl = lappendo(vrl, relid);
+               MemoryContextSwitchTo(oldcontext);
        }
        else
        {
-               /* find all plain relations listed in pg_class */
-               ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
-                                                          F_CHAREQ, CharGetDatum(RELKIND_RELATION));
-       }
+               /* Process all plain relations listed in pg_class */
+               Relation        pgclass;
+               HeapScanDesc scan;
+               HeapTuple       tuple;
+               ScanKeyData key;
 
-       vrl = cur = (VRelList) NULL;
+               ScanKeyEntryInitialize(&key, 0x0,
+                                                          Anum_pg_class_relkind,
+                                                          F_CHAREQ,
+                                                          CharGetDatum(RELKIND_RELATION));
 
-       rel = heap_openr(RelationRelationName, AccessShareLock);
-       tupdesc = RelationGetDescr(rel);
+               pgclass = heap_openr(RelationRelationName, AccessShareLock);
 
-       scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);
+               scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
 
-       while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
-       {
-               d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
-               rname = (char *) DatumGetName(d);
-
-               d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
-               rkind = DatumGetChar(d);
-
-               if (rkind != RELKIND_RELATION)
+               while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
                {
-                       elog(NOTICE, "%s: can not process indexes, views or special system tables",
-                                stmttype);
-                       continue;
+                       /* Make a relation list entry for this guy */
+                       oldcontext = MemoryContextSwitchTo(vac_context);
+                       vrl = lappendo(vrl, HeapTupleGetOid(tuple));
+                       MemoryContextSwitchTo(oldcontext);
                }
 
-               /* Make a relation list entry for this guy */
-               if (vrl == (VRelList) NULL)
-                       vrl = cur = (VRelList)
-                               MemoryContextAlloc(vac_context, sizeof(VRelListData));
-               else
-               {
-                       cur->vrl_next = (VRelList)
-                               MemoryContextAlloc(vac_context, sizeof(VRelListData));
-                       cur = cur->vrl_next;
-               }
-
-               cur->vrl_relid = tuple->t_data->t_oid;
-               cur->vrl_next = (VRelList) NULL;
+               heap_endscan(scan);
+               heap_close(pgclass, AccessShareLock);
        }
 
-       heap_endscan(scan);
-       heap_close(rel, AccessShareLock);
-
-       if (vrl == NULL)
-               elog(NOTICE, "%s: table not found", stmttype);
-
        return vrl;
 }
 
@@ -467,7 +462,9 @@ vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
         */
        if (TransactionIdFollows(limit, *oldestXmin))
        {
-               elog(NOTICE, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
+               ereport(WARNING,
+                               (errmsg("oldest Xmin is far in the past"),
+                                errhint("Close open transactions soon to avoid wraparound problems.")));
                limit = *oldestXmin;
        }
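This hunk also introduces the structured ereport() style used throughout the patch: the primary message stays terse while details and hints become separate fields. The general shape (all three macros appear elsewhere in this diff):

	ereport(WARNING,
			(errmsg("terse primary message"),
			 errdetail("Longer explanation of what happened."),
			 errhint("What the user might do about it.")));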
 
@@ -519,22 +516,32 @@ vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
        /* get the buffer cache tuple */
        rtup.t_self = ctup->t_self;
        ReleaseSysCache(ctup);
-       heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL);
+       if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
+               elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+                        relid);
 
        /* overwrite the existing statistics in the tuple */
        pgcform = (Form_pg_class) GETSTRUCT(&rtup);
        pgcform->relpages = (int32) num_pages;
        pgcform->reltuples = num_tuples;
        pgcform->relhasindex = hasindex;
+
        /*
-        * If we have discovered that there are no indexes, then there's
-        * no primary key either.  This could be done more thoroughly...
+        * If we have discovered that there are no indexes, then there's no
+        * primary key either.  This could be done more thoroughly...
         */
        if (!hasindex)
                pgcform->relhaspkey = false;
 
-       /* invalidate the tuple in the cache and write the buffer */
-       RelationInvalidateHeapTuple(rd, &rtup);
+       /*
+        * Invalidate the tuple in the catcaches; this also arranges to flush
+        * the relation's relcache entry.  (If we fail to commit for some
+        * reason, no flush will occur, but no great harm is done since there
+        * are no noncritical state updates here.)
+        */
+       CacheInvalidateHeapTuple(rd, &rtup);
+
+       /* Write the buffer */
        WriteBuffer(buffer);
 
        heap_close(rd, RowExclusiveLock);
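vac_update_relstats() is a non-transactional, overwrite-in-place update. The skeleton of the pattern, condensed from the function above:

	rtup.t_self = ctup->t_self;			/* TID taken from the syscache copy */
	if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
			 relid);
	pgcform = (Form_pg_class) GETSTRUCT(&rtup);
	pgcform->relpages = (int32) num_pages;	/* scribble directly on the buffer */
	CacheInvalidateHeapTuple(rd, &rtup);	/* flush catcache + relcache entry */
	WriteBuffer(buffer);					/* mark the buffer dirty */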
@@ -574,12 +581,12 @@ vac_update_dbstats(Oid dbid,
                                                   ObjectIdAttributeNumber, F_OIDEQ,
                                                   ObjectIdGetDatum(dbid));
 
-       scan = heap_beginscan(relation, 0, SnapshotNow, 1, entry);
+       scan = heap_beginscan(relation, SnapshotNow, 1, entry);
 
-       tuple = heap_getnext(scan, 0);
+       tuple = heap_getnext(scan, ForwardScanDirection);
 
        if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "database %u does not exist", dbid);
+               elog(ERROR, "could not find tuple for database %u", dbid);
 
        dbform = (Form_pg_database) GETSTRUCT(tuple);
 
@@ -588,7 +595,7 @@ vac_update_dbstats(Oid dbid,
        dbform->datfrozenxid = frozenXID;
 
        /* invalidate the tuple in the cache and write the buffer */
-       RelationInvalidateHeapTuple(relation, tuple);
+       CacheInvalidateHeapTuple(relation, tuple);
        WriteNoReleaseBuffer(scan->rs_cbuf);
 
        heap_endscan(scan);
@@ -606,7 +613,7 @@ vac_update_dbstats(Oid dbid,
  *             seems to be in danger of wrapping around.
  *
  *             The passed XIDs are simply the ones I just wrote into my pg_database
- *             entry.  They're used to initialize the "min" calculations.
+ *             entry.  They're used to initialize the "min" calculations.
  *
  *             This routine is shared by full and lazy VACUUM.  Note that it is only
  *             applied after a database-wide VACUUM operation.
@@ -614,16 +621,21 @@ vac_update_dbstats(Oid dbid,
 static void
 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
 {
+       TransactionId myXID;
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
        int32           age;
+       bool            vacuumAlreadyWrapped = false;
+       bool            frozenAlreadyWrapped = false;
+
+       myXID = GetCurrentTransactionId();
 
        relation = heap_openr(DatabaseRelationName, AccessShareLock);
 
-       scan = heap_beginscan(relation, 0, SnapshotNow, 0, NULL);
+       scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
 
-       while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
                Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
 
@@ -632,28 +644,59 @@ vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
                if (!dbform->datallowconn)
                        continue;
 
-               if (TransactionIdIsNormal(dbform->datvacuumxid) &&
-                       TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
-                       vacuumXID = dbform->datvacuumxid;
-               if (TransactionIdIsNormal(dbform->datfrozenxid) &&
-                       TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
-                       frozenXID = dbform->datfrozenxid;
+               if (TransactionIdIsNormal(dbform->datvacuumxid))
+               {
+                       if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
+                               vacuumAlreadyWrapped = true;
+                       else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
+                               vacuumXID = dbform->datvacuumxid;
+               }
+               if (TransactionIdIsNormal(dbform->datfrozenxid))
+               {
+                       if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
+                               frozenAlreadyWrapped = true;
+                       else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
+                               frozenXID = dbform->datfrozenxid;
+               }
        }
 
        heap_endscan(scan);
 
        heap_close(relation, AccessShareLock);
 
+       /*
+        * Do not truncate CLOG if we seem to have suffered wraparound
+        * already; the computed minimum XID might be bogus.
+        */
+       if (vacuumAlreadyWrapped)
+       {
+               ereport(WARNING,
+                               (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
+                                errdetail("You may have already suffered transaction-wraparound data loss.")));
+               return;
+       }
+
        /* Truncate CLOG to the oldest vacuumxid */
        TruncateCLOG(vacuumXID);
 
        /* Give warning about impending wraparound problems */
-       age = (int32) (GetCurrentTransactionId() - frozenXID);
-       if (age > (int32) ((MaxTransactionId >> 3) * 3))
-               elog(NOTICE, "Some databases have not been vacuumed in %d transactions."
-                        "\n\tBetter vacuum them within %d transactions,"
-                        "\n\tor you may have a wraparound failure.",
-                        age, (int32) (MaxTransactionId >> 1) - age);
+       if (frozenAlreadyWrapped)
+       {
+               ereport(WARNING,
+                               (errmsg("some databases have not been vacuumed in over 1 billion transactions"),
+                                errhint("Better vacuum them soon, or you may have a wraparound failure.")));
+       }
+       else
+       {
+               age = (int32) (myXID - frozenXID);
+               if (age > (int32) ((MaxTransactionId >> 3) * 3))
+                       ereport(WARNING,
+                                       (errmsg("some databases have not been vacuumed in %d transactions",
+                                                       age),
+                                        errhint("Better vacuum them within %d transactions, "
+                                                        "or you may have a wraparound failure.",
+                                                        (int32) (MaxTransactionId >> 1) - age)));
+       }
 }
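The wrapped-XID tests above lean on modular XID comparison. A sketch of the usual modulo-2^32 rule (assumed from the transam conventions, not copied from transam.c): on the XID circle, "precedes" means the signed 32-bit difference is negative, so a datvacuumxid more than 2^31 transactions behind myXID appears to lie in the future and trips the vacuumAlreadyWrapped flag.

	static bool
	xid_precedes_sketch(TransactionId id1, TransactionId id2)
	{
		int32		diff = (int32) (id1 - id2);		/* wraps modulo 2^32 */

		return diff < 0;			/* id1 is older than id2 */
	}

For the age warning, (MaxTransactionId >> 3) * 3 works out to 3/8 of the 32-bit XID space, roughly 1.6 billion transactions.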
 
 
@@ -668,6 +711,11 @@ vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
 /*
  *     vacuum_rel() -- vacuum one heap relation
  *
+ *             Returns TRUE if we actually processed the relation (or can ignore it
+ *             for some reason), FALSE if we failed to process it due to permissions
+ *             or other reasons.  (A FALSE result really means that some data
+ *             may have been left unvacuumed, so we can't update XID stats.)
+ *
  *             Doing one heap at a time incurs extra overhead, since we need to
  *             check that the heap exists again just before we vacuum it.      The
  *             reason that we do this is so that vacuuming can be spread across
@@ -676,20 +724,23 @@ vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
  *
  *             At entry and exit, we are not inside a transaction.
  */
-static void
-vacuum_rel(Oid relid, VacuumStmt *vacstmt)
+static bool
+vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
 {
        LOCKMODE        lmode;
        Relation        onerel;
        LockRelId       onerelid;
        Oid                     toast_relid;
+       bool            result;
 
        /* Begin a transaction for vacuuming this relation */
        StartTransactionCommand();
+       SetQuerySnapshot();                     /* might be needed for functions in
+                                                                * indexes */
 
        /*
         * Check for user-requested abort.      Note we want this to be inside a
-        * transaction, so xact.c doesn't issue useless NOTICE.
+        * transaction, so xact.c doesn't issue useless WARNING.
         */
        CHECK_FOR_INTERRUPTS();
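The TRUE/FALSE contract documented above is consumed back in vacuum(); condensed from the caller:

	if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
		all_rels = false;		/* some data may be left unvacuumed, so the
								 * database-wide XID stats must not advance */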
 
@@ -702,38 +753,69 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt)
                                                          0, 0, 0))
        {
                CommitTransactionCommand();
-               return;
+               return true;                    /* okay 'cause no data there */
        }
 
        /*
         * Determine the type of lock we want --- hard exclusive lock for a
         * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
-        * vacuum.  Either way, we can be sure that no other backend is vacuuming
-        * the same table.
+        * vacuum.      Either way, we can be sure that no other backend is
+        * vacuuming the same table.
         */
        lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
 
        /*
-        * Open the class, get an appropriate lock on it, and check permissions.
+        * Open the class, get an appropriate lock on it, and check
+        * permissions.
         *
         * We allow the user to vacuum a table if he is superuser, the table
         * owner, or the database owner (but in the latter case, only if it's
-        * not a shared relation).  pg_ownercheck includes the superuser case.
+        * not a shared relation).      pg_class_ownercheck includes the superuser
+        * case.
         *
-        * Note we choose to treat permissions failure as a NOTICE and keep
+        * Note we choose to treat permissions failure as a WARNING and keep
         * trying to vacuum the rest of the DB --- is this appropriate?
         */
-       onerel = heap_open(relid, lmode);
+       onerel = relation_open(relid, lmode);
 
-       if (! (pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
-                                                RELNAME) ||
-                  (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
+       if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
+                 (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
        {
-               elog(NOTICE, "Skipping \"%s\" --- only table or database owner can VACUUM it",
-                        RelationGetRelationName(onerel));
-               heap_close(onerel, lmode);
+               ereport(WARNING,
+                               (errmsg("skipping \"%s\" --- only table or database owner can VACUUM it",
+                                               RelationGetRelationName(onerel))));
+               relation_close(onerel, lmode);
                CommitTransactionCommand();
-               return;
+               return false;
+       }
+
+       /*
+        * Check that it's a plain table; we used to do this in getrels() but
+        * seems safer to check after we've locked the relation.
+        */
+       if (onerel->rd_rel->relkind != expected_relkind)
+       {
+               ereport(WARNING,
+                               (errmsg("skipping \"%s\" --- cannot VACUUM indexes, views or special system tables",
+                                               RelationGetRelationName(onerel))));
+               relation_close(onerel, lmode);
+               CommitTransactionCommand();
+               return false;
+       }
+
+       /*
+        * Silently ignore tables that are temp tables of other backends ---
+        * trying to vacuum these will lead to great unhappiness, since their
+        * contents are probably not up-to-date on disk.  (We don't throw a
+        * warning here; it would just lead to chatter during a database-wide
+        * VACUUM.)
+        */
+       if (isOtherTempNamespace(RelationGetNamespace(onerel)))
+       {
+               relation_close(onerel, lmode);
+               CommitTransactionCommand();
+               return true;                    /* assume no long-lived data in temp
+                                                                * tables */
        }
 
        /*
@@ -762,8 +844,10 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt)
        else
                lazy_vacuum_rel(onerel, vacstmt);
 
+       result = true;                          /* did the vacuum */
+
        /* all done with this class, but hold lock until commit */
-       heap_close(onerel, NoLock);
+       relation_close(onerel, NoLock);
 
        /*
         * Complete the transaction and free all temporary memory used.
@@ -773,17 +857,22 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt)
        /*
         * If the relation has a secondary toast rel, vacuum that too while we
         * still hold the session lock on the master table.  Note however that
-        * "analyze" will not get done on the toast table.  This is good,
-        * because the toaster always uses hardcoded index access and statistics
-        * are totally unimportant for toast relations.
+        * "analyze" will not get done on the toast table.      This is good,
+        * because the toaster always uses hardcoded index access and
+        * statistics are totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
-               vacuum_rel(toast_relid, vacstmt);
+       {
+               if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
+                       result = false;         /* failed to vacuum the TOAST table? */
+       }
 
        /*
         * Now release the session-level lock on the master table.
         */
        UnlockRelationForSession(&onerelid, lmode);
+
+       return result;
 }
 
 
@@ -815,11 +904,6 @@ full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
        int                     nindexes,
                                i;
        VRelStats  *vacrelstats;
-       bool            reindex = false;
-
-       if (IsIgnoringSystemIndexes() &&
-               IsSystemRelationName(RelationGetRelationName(onerel)))
-               reindex = true;
 
        vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
                                                  &OldestXmin, &FreezeLimit);
@@ -838,26 +922,9 @@ full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
 
        /* Now open all indexes of the relation */
        vac_open_indexes(onerel, &nindexes, &Irel);
-       if (!Irel)
-               reindex = false;
-       else if (!RelationGetForm(onerel)->relhasindex)
-               reindex = true;
        if (nindexes > 0)
                vacrelstats->hasindex = true;
 
-#ifdef NOT_USED
-       /*
-        * reindex in VACUUM is dangerous under WAL. ifdef out until it
-        * becomes safe.
-        */
-       if (reindex)
-       {
-               vac_close_indexes(nindexes, Irel);
-               Irel = (Relation *) NULL;
-               activate_indexes_of_a_table(RelationGetRelid(onerel), false);
-       }
-#endif  /* NOT_USED */
-
        /* Clean/scan index relation(s) */
        if (Irel != (Relation *) NULL)
        {
@@ -900,16 +967,10 @@ full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
                         */
                        i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
                        if (i < 0)
-                               elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
-                                        i);
+                               elog(ERROR, "FlushRelationBuffers returned %d", i);
                }
        }
 
-#ifdef NOT_USED
-       if (reindex)
-               activate_indexes_of_a_table(RelationGetRelid(onerel), true);
-#endif  /* NOT_USED */
-
        /* update shared free space map with final free space info */
        vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
 
@@ -945,16 +1006,14 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
        char       *relname;
        VacPage         vacpage,
                                vacpagecopy;
-       BlockNumber     empty_pages,
-                               new_pages,
-                               changed_pages,
+       BlockNumber empty_pages,
                                empty_end_pages;
        double          num_tuples,
                                tups_vacuumed,
                                nkeep,
                                nunused;
-       double          free_size,
-                               usable_free_size;
+       double          free_space,
+                               usable_free_space;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
        int                     i;
@@ -967,11 +1026,14 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
        vac_init_rusage(&ru0);
 
        relname = RelationGetRelationName(onerel);
-       elog(MESSAGE_LEVEL, "--Relation %s--", relname);
+       ereport(elevel,
+                       (errmsg("vacuuming \"%s.%s\"",
+                                       get_namespace_name(RelationGetNamespace(onerel)),
+                                       relname)));
 
-       empty_pages = new_pages = changed_pages = empty_end_pages = 0;
+       empty_pages = empty_end_pages = 0;
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
-       free_size = 0;
+       free_space = 0;
 
        nblocks = RelationGetNumberOfBlocks(onerel);
 
@@ -988,6 +1050,8 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                bool            do_reap,
                                        do_frag;
 
+               CHECK_FOR_INTERRUPTS();
+
                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);
 
@@ -997,12 +1061,13 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
 
                if (PageIsNew(page))
                {
-                       elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
-                                relname, blkno);
+                       ereport(WARNING,
+                       (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
+                                       relname, blkno)));
                        PageInit(page, BufferGetPageSize(buf), 0);
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
-                       free_size += (vacpage->free - sizeof(ItemIdData));
-                       new_pages++;
+                       free_space += vacpage->free;
+                       empty_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
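The repair above relies on the standard page layout (a sketch, per the usual bufpage.h conventions; free_bytes below is illustrative): an all-zeroes page has pd_upper == 0, which is what PageIsNew() detects, and PageInit() rebuilds the header so the span between pd_lower and pd_upper becomes usable free space.

	if (PageIsNew(page))
	{
		PageInit(page, BufferGetPageSize(buf), 0);	/* 0 = no special space */
		free_bytes = ((PageHeader) page)->pd_upper -
			((PageHeader) page)->pd_lower;
	}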
@@ -1014,7 +1079,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                if (PageIsEmpty(page))
                {
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
-                       free_size += (vacpage->free - sizeof(ItemIdData));
+                       free_space += vacpage->free;
                        empty_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
@@ -1057,29 +1122,32 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                        switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
                        {
                                case HEAPTUPLE_DEAD:
-                                       tupgone = true; /* we can delete the tuple */
+                                       tupgone = true;         /* we can delete the tuple */
                                        break;
                                case HEAPTUPLE_LIVE:
+
                                        /*
-                                        * Tuple is good.  Consider whether to replace its xmin
-                                        * value with FrozenTransactionId.
+                                        * Tuple is good.  Consider whether to replace its
+                                        * xmin value with FrozenTransactionId.
                                         */
-                                       if (TransactionIdIsNormal(tuple.t_data->t_xmin) &&
-                                               TransactionIdPrecedes(tuple.t_data->t_xmin,
+                                       if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
+                                               TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
                                                                                          FreezeLimit))
                                        {
-                                               tuple.t_data->t_xmin = FrozenTransactionId;
-                                               tuple.t_data->t_infomask &= ~HEAP_XMIN_INVALID;
-                                               tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
+                                               HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
+                                               /* infomask should be okay already */
+                                               Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
                                                pgchanged = true;
                                        }
                                        break;
                                case HEAPTUPLE_RECENTLY_DEAD:
+
                                        /*
-                                        * If tuple is recently deleted then we must not remove
-                                        * it from relation.
+                                        * If tuple is recently deleted then we must not
+                                        * remove it from relation.
                                         */
                                        nkeep += 1;
+
                                        /*
                                         * If we do shrinking and this tuple is updated one
                                         * then remember it to construct updated tuple
@@ -1103,25 +1171,33 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                                        }
                                        break;
                                case HEAPTUPLE_INSERT_IN_PROGRESS:
+
                                        /*
-                                        * This should not happen, since we hold exclusive lock
-                                        * on the relation; shouldn't we raise an error?
+                                        * This should not happen, since we hold exclusive
+                                        * lock on the relation; shouldn't we raise an error?
+                                        * (Actually, it can happen in system catalogs, since
+                                        * we tend to release write lock before commit there.)
                                         */
-                                       elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
-                                                relname, blkno, offnum, tuple.t_data->t_xmin);
+                                       ereport(NOTICE,
+                                                       (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
+                                                                       relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
                                        do_shrinking = false;
                                        break;
                                case HEAPTUPLE_DELETE_IN_PROGRESS:
+
                                        /*
-                                        * This should not happen, since we hold exclusive lock
-                                        * on the relation; shouldn't we raise an error?
+                                        * This should not happen, since we hold exclusive
+                                        * lock on the relation; shouldn't we raise an error?
+                                        * (Actually, it can happen in system catalogs, since
+                                        * we tend to release write lock before commit there.)
                                         */
-                                       elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
-                                                relname, blkno, offnum, tuple.t_data->t_xmax);
+                                       ereport(NOTICE,
+                                                       (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
+                                                                       relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
                                        do_shrinking = false;
                                        break;
                                default:
-                                       elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
+                                       elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
                                        break;
                        }
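The freeze step in the HEAPTUPLE_LIVE arm is worth isolating (condensed from the hunk above): xmins older than FreezeLimit are replaced by the permanent FrozenTransactionId, so the tuple stays visibly committed no matter how far the XID counter wraps.

	if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
		TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
							  FreezeLimit))
	{
		HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
		/* infomask should already say HEAP_XMIN_COMMITTED */
		Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
		pgchanged = true;			/* page must be written back */
	}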
 
@@ -1132,10 +1208,10 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                        /*
                         * Other checks...
                         */
-                       if (!OidIsValid(tuple.t_data->t_oid) &&
-                               onerel->rd_rel->relhasoids)
-                               elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
-                                        relname, blkno, offnum, (int) tupgone);
+                       if (onerel->rd_rel->relhasoids &&
+                               !OidIsValid(HeapTupleGetOid(&tuple)))
+                               elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
+                                        relname, blkno, offnum);
 
                        if (tupgone)
                        {
@@ -1174,7 +1250,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                                if (tuple.t_len > max_tlen)
                                        max_tlen = tuple.t_len;
                        }
-               } /* scan along page */
+               }                                               /* scan along page */
 
                if (tempPage != (Page) NULL)
                {
@@ -1192,14 +1268,15 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                        do_reap = (vacpage->offsets_free > 0);
                }
 
-               free_size += vacpage->free;
+               free_space += vacpage->free;
+
                /*
                 * Add the page to fraged_pages if it has a useful amount of free
-                * space.  "Useful" means enough for a minimal-sized tuple.
-                * But we don't know that accurately near the start of the relation,
-                * so add pages unconditionally if they have >= BLCKSZ/10 free space.
+                * space.  "Useful" means enough for a minimal-sized tuple. But we
+                * don't know that accurately near the start of the relation, so
+                * add pages unconditionally if they have >= BLCKSZ/10 free space.
                 */
-               do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ/10);
+               do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
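Worked numbers for the heuristic above, assuming the default 8 kB block size:

	/*
	 * BLCKSZ / 10 = 8192 / 10 = 819 bytes: early pages become candidate
	 * move destinations once they have ~819 bytes free, even before
	 * min_tlen has settled to an accurate value from the tuples seen.
	 */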
 
                if (do_reap || do_frag)
                {
@@ -1210,16 +1287,21 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                                vpage_insert(fraged_pages, vacpagecopy);
                }
 
+               /*
+                * Include the page in empty_end_pages if it will be empty after
+                * vacuuming; this is to keep us from using it as a move
+                * destination.
+                */
                if (notup)
+               {
+                       empty_pages++;
                        empty_end_pages++;
+               }
                else
                        empty_end_pages = 0;
 
                if (pgchanged)
-               {
                        WriteBuffer(buf);
-                       changed_pages++;
-               }
                else
                        ReleaseBuffer(buf);
        }
@@ -1238,25 +1320,26 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
        fraged_pages->empty_end_pages = empty_end_pages;
 
        /*
-        * Clear the fraged_pages list if we found we couldn't shrink.
-        * Else, remove any "empty" end-pages from the list, and compute
-        * usable free space = free space in remaining pages.
+        * Clear the fraged_pages list if we found we couldn't shrink. Else,
+        * remove any "empty" end-pages from the list, and compute usable free
+        * space = free space in remaining pages.
         */
        if (do_shrinking)
        {
                Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
                fraged_pages->num_pages -= empty_end_pages;
-               usable_free_size = 0;
+               usable_free_space = 0;
                for (i = 0; i < fraged_pages->num_pages; i++)
-                       usable_free_size += fraged_pages->pagedesc[i]->free;
+                       usable_free_space += fraged_pages->pagedesc[i]->free;
        }
        else
        {
                fraged_pages->num_pages = 0;
-               usable_free_size = 0;
+               usable_free_space = 0;
        }
 
-       if (usable_free_size > 0 && num_vtlinks > 0)
+       /* don't bother to save vtlinks if we will not call repair_frag */
+       if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
        {
                qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
                          vac_cmp_vtlinks);
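
vac_cmp_vtlinks itself is defined elsewhere in vacuum.c. A plausible sketch of such a comparator, ordering links by new_tid so that the later vac_bsearch can find a parent by its child's TID, might look like this (stub types, not the real header definitions):

    #include <stdlib.h>

    typedef struct { unsigned blkno; unsigned offnum; } TidStub;
    typedef struct { TidStub new_tid; TidStub this_tid; } VTupleLinkStub;

    /* order links by new_tid: block number first, then offset */
    static int
    cmp_vtlinks_sketch(const void *a, const void *b)
    {
        const TidStub *l = &((const VTupleLinkStub *) a)->new_tid;
        const TidStub *r = &((const VTupleLinkStub *) b)->new_tid;

        if (l->blkno != r->blkno)
            return (l->blkno < r->blkno) ? -1 : 1;
        if (l->offnum != r->offnum)
            return (l->offnum < r->offnum) ? -1 : 1;
        return 0;
    }
    /* usage: qsort(vtlinks, num_vtlinks, sizeof(vtlinks[0]), cmp_vtlinks_sketch); */
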
@@ -1270,17 +1353,24 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                pfree(vtlinks);
        }
 
-       elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
-Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
-Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
-                nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
-                new_pages, num_tuples, tups_vacuumed,
-                nkeep, vacrelstats->num_vtlinks,
-                nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
-                free_size, usable_free_size,
-                empty_end_pages, fraged_pages->num_pages,
-                vac_show_rusage(&ru0));
-
+       ereport(elevel,
+                       (errmsg("\"%s\": found %.0f removable, %.0f nonremovable tuples in %u pages",
+                                       RelationGetRelationName(onerel),
+                                       tups_vacuumed, num_tuples, nblocks),
+                        errdetail("%.0f dead tuples cannot be removed yet.\n"
+                               "Nonremovable tuples range from %lu to %lu bytes long.\n"
+                                          "There were %.0f unused item pointers.\n"
+                "Total free space (including removable tuples) is %.0f bytes.\n"
+                                          "%u pages are or will become empty, including %u at the end of the table.\n"
+                                          "%u pages containing %.0f free bytes are potential move destinations.\n"
+                                          "%s",
+                                          nkeep,
+                                          (unsigned long) min_tlen, (unsigned long) max_tlen,
+                                          nunused,
+                                          free_space,
+                                          empty_pages, empty_end_pages,
+                                          fraged_pages->num_pages, usable_free_space,
+                                          vac_show_rusage(&ru0))));
 }
 
 
@@ -1303,9 +1393,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        CommandId       myCID;
        Buffer          buf,
                                cur_buffer;
-       BlockNumber     nblocks,
+       BlockNumber nblocks,
                                blkno;
-       BlockNumber     last_move_dest_block = 0,
+       BlockNumber last_move_dest_block = 0,
                                last_vacuum_block;
        Page            page,
                                ToPage = NULL;
@@ -1352,14 +1442,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
         * We need a ResultRelInfo and an EState so we can use the regular
         * executor's index-entry-making machinery.
         */
+       estate = CreateExecutorState();
+
        resultRelInfo = makeNode(ResultRelInfo);
        resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
        resultRelInfo->ri_RelationDesc = onerel;
-       resultRelInfo->ri_TrigDesc = NULL;                      /* we don't fire triggers */
+       resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
 
        ExecOpenIndices(resultRelInfo);
 
-       estate = CreateExecutorState();
        estate->es_result_relations = resultRelInfo;
        estate->es_num_result_relations = 1;
        estate->es_result_relation_info = resultRelInfo;
@@ -1393,9 +1484,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        /*
         * Scan pages backwards from the last nonempty page, trying to move
         * tuples down to lower pages.  Quit when we reach a page that we have
-        * moved any tuples onto, or the first page if we haven't moved anything,
-        * or when we find a page we cannot completely empty (this last condition
-        * is handled by "break" statements within the loop).
+        * moved any tuples onto, or the first page if we haven't moved
+        * anything, or when we find a page we cannot completely empty (this
+        * last condition is handled by "break" statements within the loop).
         *
         * NB: this code depends on the vacuum_pages and fraged_pages lists being
         * in order by blkno.
@@ -1405,20 +1496,23 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                 blkno > last_move_dest_block;
                 blkno--)
        {
+               CHECK_FOR_INTERRUPTS();
+
                /*
-                * Forget fraged_pages pages at or after this one; they're no longer
-                * useful as move targets, since we only want to move down.  Note
-                * that since we stop the outer loop at last_move_dest_block, pages
-                * removed here cannot have had anything moved onto them already.
+                * Forget fraged_pages pages at or after this one; they're no
+                * longer useful as move targets, since we only want to move down.
+                * Note that since we stop the outer loop at last_move_dest_block,
+                * pages removed here cannot have had anything moved onto them
+                * already.
                 *
-                * Also note that we don't change the stored fraged_pages list,
-                * only our local variable num_fraged_pages; so the forgotten pages
-                * are still available to be loaded into the free space map later.
+                * Also note that we don't change the stored fraged_pages list, only
+                * our local variable num_fraged_pages; so the forgotten pages are
+                * still available to be loaded into the free space map later.
                 */
                while (num_fraged_pages > 0 &&
-                          fraged_pages->pagedesc[num_fraged_pages-1]->blkno >= blkno)
+                       fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
                {
-                       Assert(fraged_pages->pagedesc[num_fraged_pages-1]->offsets_used == 0);
+                       Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
                        --num_fraged_pages;
                }
 
@@ -1489,8 +1583,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
                        if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                        {
-                               if ((TransactionId) tuple.t_data->t_cmin != myXID)
-                                       elog(ERROR, "Invalid XID in t_cmin");
                                if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
                                        elog(ERROR, "HEAP_MOVED_IN was not expected");
 
@@ -1501,6 +1593,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                 */
                                if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                                {
+                                       if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+                                               elog(ERROR, "invalid XVAC in tuple header");
                                        if (keep_tuples == 0)
                                                continue;
                                        if (chain_tuple_moved)          /* some chains were moved
@@ -1532,41 +1626,67 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                         * If this tuple is in a chain of tuples created by updates
                         * from "recent" transactions, then we have to move the whole
                         * chain of tuples to other places.
+                        *
+                        * NOTE: this test is not 100% accurate: it is possible for a
+                        * tuple to be an updated one with recent xmin, and yet not
+                        * have a corresponding tuple in the vtlinks list.      Presumably
+                        * there was once a parent tuple with xmax matching the xmin,
+                        * but it's possible that that tuple has been removed --- for
+                        * example, if it had xmin = xmax then
+                        * HeapTupleSatisfiesVacuum would deem it removable as soon as
+                        * the xmin xact completes.
+                        *
+                        * To be on the safe side, we abandon the repair_frag process if
+                        * we cannot find the parent tuple in vtlinks.  This may be
+                        * overly conservative; AFAICS it would be safe to move the
+                        * chain.
                         */
-                       if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
-                                !TransactionIdPrecedes(tuple.t_data->t_xmin, OldestXmin)) ||
-                               (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
+                       if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
+                        !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+                                                                       OldestXmin)) ||
+                               (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                                                                          HEAP_MARKED_FOR_UPDATE)) &&
                                 !(ItemPointerEquals(&(tuple.t_self),
                                                                         &(tuple.t_data->t_ctid)))))
                        {
                                Buffer          Cbuf = buf;
+                               bool            freeCbuf = false;
+                               bool            chain_move_failed = false;
                                Page            Cpage;
                                ItemId          Citemid;
                                ItemPointerData Ctid;
                                HeapTupleData tp = tuple;
                                Size            tlen = tuple_len;
-                               VTupleMove      vtmove = (VTupleMove)
-                               palloc(100 * sizeof(VTupleMoveData));
-                               int                     num_vtmove = 0;
-                               int                     free_vtmove = 100;
+                               VTupleMove      vtmove;
+                               int                     num_vtmove;
+                               int                     free_vtmove;
                                VacPage         to_vacpage = NULL;
                                int                     to_item = 0;
-                               bool            freeCbuf = false;
                                int                     ti;
 
-                               if (vacrelstats->vtlinks == NULL)
-                                       elog(ERROR, "No one parent tuple was found");
                                if (cur_buffer != InvalidBuffer)
                                {
                                        WriteBuffer(cur_buffer);
                                        cur_buffer = InvalidBuffer;
                                }
 
+                               /* Quick exit if we have no vtlinks to search in */
+                               if (vacrelstats->vtlinks == NULL)
+                               {
+                                       elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
+                                       break;          /* out of walk-along-page loop */
+                               }
+
+                               vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
+                               num_vtmove = 0;
+                               free_vtmove = 100;
+
                                /*
                                 * If this tuple is at the beginning or middle of the
                                 * chain, then we have to move to the end of the chain.
                                 */
-                               while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
+                               while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                                                                         HEAP_MARKED_FOR_UPDATE)) &&
                                           !(ItemPointerEquals(&(tp.t_self),
                                                                                   &(tp.t_data->t_ctid))))
                                {
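
The chain-membership test being reformatted in this hunk combines two conditions. A hedged standalone sketch (the flag values and stub types are illustrative, not the real heap header definitions):

    #include <stdbool.h>

    #define HEAP_XMAX_INVALID       0x0800  /* illustrative values */
    #define HEAP_MARKED_FOR_UPDATE  0x1000
    #define HEAP_UPDATED            0x2000

    typedef struct { unsigned blkno; unsigned offnum; } TidStub;

    static bool
    in_recent_update_chain(unsigned infomask, bool xmin_is_recent,
                           TidStub self, TidStub ctid)
    {
        /* produced by an update whose inserting xact is still "recent" */
        if ((infomask & HEAP_UPDATED) && xmin_is_recent)
            return true;
        /* or it has a live forward link to a newer version elsewhere */
        if (!(infomask & (HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE)) &&
            (self.blkno != ctid.blkno || self.offnum != ctid.offnum))
            return true;
        return false;
    }
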
@@ -1581,7 +1701,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                                          ItemPointerGetOffsetNumber(&Ctid));
                                        if (!ItemIdIsUsed(Citemid))
                                        {
-
                                                /*
                                                 * This means that in the middle of the chain there
                                                 * was a tuple updated by an older (than OldestXmin)
@@ -1591,22 +1710,35 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                 * in scan_heap(), but it's not implemented at the
                                                 * moment and so we just stop shrinking here.
                                                 */
-                                               ReleaseBuffer(Cbuf);
-                                               pfree(vtmove);
-                                               vtmove = NULL;
-                                               elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
-                                               break;
+                                               elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
+                                               chain_move_failed = true;
+                                               break;  /* out of loop to move to chain end */
                                        }
                                        tp.t_datamcxt = NULL;
                                        tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
                                        tp.t_self = Ctid;
                                        tlen = tp.t_len = ItemIdGetLength(Citemid);
                                }
-                               if (vtmove == NULL)
-                                       break;
-                               /* first, can chain be moved ? */
+                               if (chain_move_failed)
+                               {
+                                       if (freeCbuf)
+                                               ReleaseBuffer(Cbuf);
+                                       pfree(vtmove);
+                                       break;          /* out of walk-along-page loop */
+                               }
+
+                               /*
+                                * Check if all items in chain can be moved
+                                */
                                for (;;)
                                {
+                                       Buffer          Pbuf;
+                                       Page            Ppage;
+                                       ItemId          Pitemid;
+                                       HeapTupleData Ptp;
+                                       VTupleLinkData vtld,
+                                                          *vtlp;
+
                                        if (to_vacpage == NULL ||
                                                !enough_space(to_vacpage, tlen))
                                        {
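
Walking to the end of the chain follows t_ctid links until a tuple points at itself, giving up on a broken link, which is the "child itemid marked as unused" case handled above. A toy model:

    #include <stdio.h>

    /*
     * next[i] is the index of the newer version of tuple i; next[i] == i
     * means end of chain, and next[i] == -1 models a pruned (unused)
     * child item, which aborts the walk as repair_frag now does.
     */
    static int
    walk_chain(const int *next, int start)
    {
        int     cur = start;

        while (next[cur] != cur)
        {
            if (next[cur] < 0)
                return -1;              /* broken chain: give up */
            cur = next[cur];
        }
        return cur;
    }

    int
    main(void)
    {
        int     next[] = {1, 2, 2};     /* 0 -> 1 -> 2, 2 is newest */

        printf("chain end: %d\n", walk_chain(next, 0));     /* prints 2 */
        return 0;
    }
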
@@ -1619,27 +1751,23 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                if (i == num_fraged_pages)
                                                {
                                                        /* can't move item anywhere */
-                                                       for (i = 0; i < num_vtmove; i++)
-                                                       {
-                                                               Assert(vtmove[i].vacpage->offsets_used > 0);
-                                                               (vtmove[i].vacpage->offsets_used)--;
-                                                       }
-                                                       num_vtmove = 0;
-                                                       break;
+                                                       chain_move_failed = true;
+                                                       break;          /* out of check-all-items loop */
                                                }
                                                to_item = i;
                                                to_vacpage = fraged_pages->pagedesc[to_item];
                                        }
                                        to_vacpage->free -= MAXALIGN(tlen);
                                        if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
-                                               to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
+                                               to_vacpage->free -= sizeof(ItemIdData);
                                        (to_vacpage->offsets_used)++;
                                        if (free_vtmove == 0)
                                        {
                                                free_vtmove = 1000;
-                                               vtmove = (VTupleMove) repalloc(vtmove,
-                                                                                        (free_vtmove + num_vtmove) *
-                                                                                                sizeof(VTupleMoveData));
+                                               vtmove = (VTupleMove)
+                                                       repalloc(vtmove,
+                                                                        (free_vtmove + num_vtmove) *
+                                                                        sizeof(VTupleMoveData));
                                        }
                                        vtmove[num_vtmove].tid = tp.t_self;
                                        vtmove[num_vtmove].vacpage = to_vacpage;
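
The space-reservation bookkeeping above charges the aligned tuple size against the destination page, plus a line pointer once the recycled ones run out; note the hunk also drops the MAXALIGN() around sizeof(ItemIdData). A sketch with stub types:

    #include <stddef.h>

    #define MAXIMUM_ALIGNOF 8
    #define MAXALIGN(l) (((size_t) (l) + (MAXIMUM_ALIGNOF - 1)) & \
                         ~((size_t) (MAXIMUM_ALIGNOF - 1)))

    typedef struct
    {
        size_t  free;                   /* bytes believed free on the page */
        int     offsets_used;           /* line pointers we plan to consume */
        int     offsets_free;           /* recycled line pointers available */
    } VacPageStub;

    /* Reserve room for one tuple on a move-destination page. */
    static void
    reserve_space(VacPageStub *vp, size_t tlen, size_t itemid_size)
    {
        vp->free -= MAXALIGN(tlen);
        if (vp->offsets_used >= vp->offsets_free)
            vp->free -= itemid_size;    /* needs a fresh line pointer */
        vp->offsets_used++;
    }
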
@@ -1650,113 +1778,95 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        free_vtmove--;
                                        num_vtmove++;
 
-                                       /* All done ? */
+                                       /* At beginning of chain? */
                                        if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
-                                               TransactionIdPrecedes(tp.t_data->t_xmin, OldestXmin))
+                                               TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
+                                                                                         OldestXmin))
                                                break;
 
-                                       /* Well, try to find tuple with old row version */
-                                       for (;;)
+                                       /* No, move to tuple with prior row version */
+                                       vtld.new_tid = tp.t_self;
+                                       vtlp = (VTupleLink)
+                                               vac_bsearch((void *) &vtld,
+                                                                       (void *) (vacrelstats->vtlinks),
+                                                                       vacrelstats->num_vtlinks,
+                                                                       sizeof(VTupleLinkData),
+                                                                       vac_cmp_vtlinks);
+                                       if (vtlp == NULL)
                                        {
-                                               Buffer          Pbuf;
-                                               Page            Ppage;
-                                               ItemId          Pitemid;
-                                               HeapTupleData Ptp;
-                                               VTupleLinkData vtld,
-                                                                  *vtlp;
-
-                                               vtld.new_tid = tp.t_self;
-                                               vtlp = (VTupleLink)
-                                                       vac_bsearch((void *) &vtld,
-                                                                               (void *) (vacrelstats->vtlinks),
-                                                                               vacrelstats->num_vtlinks,
-                                                                               sizeof(VTupleLinkData),
-                                                                               vac_cmp_vtlinks);
-                                               if (vtlp == NULL)
-                                                       elog(ERROR, "Parent tuple was not found");
-                                               tp.t_self = vtlp->this_tid;
-                                               Pbuf = ReadBuffer(onerel,
+                                               /* see discussion above */
+                                               elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
+                                               chain_move_failed = true;
+                                               break;  /* out of check-all-items loop */
+                                       }
+                                       tp.t_self = vtlp->this_tid;
+                                       Pbuf = ReadBuffer(onerel,
                                                                ItemPointerGetBlockNumber(&(tp.t_self)));
-                                               Ppage = BufferGetPage(Pbuf);
-                                               Pitemid = PageGetItemId(Ppage,
+                                       Ppage = BufferGetPage(Pbuf);
+                                       Pitemid = PageGetItemId(Ppage,
                                                           ItemPointerGetOffsetNumber(&(tp.t_self)));
-                                               if (!ItemIdIsUsed(Pitemid))
-                                                       elog(ERROR, "Parent itemid marked as unused");
-                                               Ptp.t_datamcxt = NULL;
-                                               Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
-                                               Assert(ItemPointerEquals(&(vtld.new_tid),
-                                                                                                &(Ptp.t_data->t_ctid)));
+                                       /* this can't happen since we saw tuple earlier: */
+                                       if (!ItemIdIsUsed(Pitemid))
+                                               elog(ERROR, "parent itemid marked as unused");
+                                       Ptp.t_datamcxt = NULL;
+                                       Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
 
-                                               /*
-                                                * Read above about cases when
-                                                * !ItemIdIsUsed(Citemid) (child item is
-                                                * removed)... Due to the fact that at the moment
-                                                * we don't remove unuseful part of update-chain,
-                                                * it's possible to get too old parent row here.
-                                                * Like as in the case which caused this problem,
-                                                * we stop shrinking here. I could try to find
-                                                * real parent row but want not to do it because
-                                                * of real solution will be implemented anyway,
-                                                * latter, and we are too close to 6.5 release. -
-                                                * vadim 06/11/99
-                                                */
-                                               if (!(TransactionIdEquals(Ptp.t_data->t_xmax,
-                                                                                                 tp.t_data->t_xmin)))
-                                               {
-                                                       if (freeCbuf)
-                                                               ReleaseBuffer(Cbuf);
-                                                       freeCbuf = false;
-                                                       ReleaseBuffer(Pbuf);
-                                                       for (i = 0; i < num_vtmove; i++)
-                                                       {
-                                                               Assert(vtmove[i].vacpage->offsets_used > 0);
-                                                               (vtmove[i].vacpage->offsets_used)--;
-                                                       }
-                                                       num_vtmove = 0;
-                                                       elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
-                                                       break;
-                                               }
-#ifdef NOT_USED                                        /* I'm not sure that this will work
-                                                                * properly... */
+                                       /* ctid should not have changed since we saved it */
+                                       Assert(ItemPointerEquals(&(vtld.new_tid),
+                                                                                        &(Ptp.t_data->t_ctid)));
 
-                                               /*
-                                                * If this tuple is updated version of row and it
-                                                * was created by the same transaction then no one
-                                                * is interested in this tuple - mark it as
-                                                * removed.
-                                                */
-                                               if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
-                                                       TransactionIdEquals(Ptp.t_data->t_xmin,
-                                                                                               Ptp.t_data->t_xmax))
-                                               {
-                                                       TransactionIdStore(myXID,
-                                                               (TransactionId *) &(Ptp.t_data->t_cmin));
-                                                       Ptp.t_data->t_infomask &=
-                                                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
-                                                       Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
-                                                       WriteBuffer(Pbuf);
-                                                       continue;
-                                               }
-#endif
-                                               tp.t_datamcxt = Ptp.t_datamcxt;
-                                               tp.t_data = Ptp.t_data;
-                                               tlen = tp.t_len = ItemIdGetLength(Pitemid);
-                                               if (freeCbuf)
-                                                       ReleaseBuffer(Cbuf);
-                                               Cbuf = Pbuf;
-                                               freeCbuf = true;
-                                               break;
+                                       /*
+                                        * See above for the cases where !ItemIdIsUsed(Citemid)
+                                        * (the child item was removed)...  Because we don't
+                                        * currently remove the useless part of an update
+                                        * chain, it's possible to find a too-old parent row
+                                        * here.  As in the case that caused this problem, we
+                                        * just stop shrinking.  I could try to find the real
+                                        * parent row, but I don't want to, because a real
+                                        * solution will be implemented anyway, later, and we
+                                        * are too close to the 6.5 release. - vadim 06/11/99
+                                        */
+                                       if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
+                                                                        HeapTupleHeaderGetXmin(tp.t_data))))
+                                       {
+                                               ReleaseBuffer(Pbuf);
+                                               elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
+                                               chain_move_failed = true;
+                                               break;  /* out of check-all-items loop */
                                        }
-                                       if (num_vtmove == 0)
-                                               break;
-                               }
+                                       tp.t_datamcxt = Ptp.t_datamcxt;
+                                       tp.t_data = Ptp.t_data;
+                                       tlen = tp.t_len = ItemIdGetLength(Pitemid);
+                                       if (freeCbuf)
+                                               ReleaseBuffer(Cbuf);
+                                       Cbuf = Pbuf;
+                                       freeCbuf = true;
+                               }                               /* end of check-all-items loop */
+
                                if (freeCbuf)
                                        ReleaseBuffer(Cbuf);
-                               if (num_vtmove == 0)    /* chain can't be moved */
+                               freeCbuf = false;
+
+                               if (chain_move_failed)
                                {
+                                       /*
+                                        * Undo changes to offsets_used state.  We don't
+                                        * bother cleaning up the amount-free state, since
+                                        * we're not going to do any further tuple motion.
+                                        */
+                                       for (i = 0; i < num_vtmove; i++)
+                                       {
+                                               Assert(vtmove[i].vacpage->offsets_used > 0);
+                                               (vtmove[i].vacpage->offsets_used)--;
+                                       }
                                        pfree(vtmove);
-                                       break;
+                                       break;          /* out of walk-along-page loop */
                                }
+
+                               /*
+                                * Okay, move the whole tuple chain
+                                */
                                ItemPointerSetInvalid(&Ctid);
                                for (ti = 0; ti < num_vtmove; ti++)
                                {
@@ -1789,15 +1899,19 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                         */
                                        heap_copytuple_with_tuple(&tuple, &newtup);
 
-                                       RelationInvalidateHeapTuple(onerel, &tuple);
+                                       /*
+                                        * register invalidation of source tuple in catcaches.
+                                        */
+                                       CacheInvalidateHeapTuple(onerel, &tuple);
 
-                                       /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
+                                       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
                                        START_CRIT_SECTION();
 
-                                       TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
-                                       tuple.t_data->t_infomask &=
-                                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
+                                       tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                                                 HEAP_XMIN_INVALID |
+                                                                                                 HEAP_MOVED_IN);
                                        tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
+                                       HeapTupleHeaderSetXvac(tuple.t_data, myXID);
 
                                        /*
                                         * If this page was not used before - clean it.
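
The new MOVED_OFF marking replaces the old trick of storing the vacuum XID in t_cmin with the dedicated xvac field set via HeapTupleHeaderSetXvac(). The infomask manipulation itself reduces to the following (flag values shown are illustrative):

    #include <stdint.h>

    #define HEAP_XMIN_COMMITTED 0x0100  /* illustrative values */
    #define HEAP_XMIN_INVALID   0x0200
    #define HEAP_MOVED_OFF      0x4000
    #define HEAP_MOVED_IN       0x8000

    /*
     * Mark a tuple as moved off its page by vacuum: clear the xmin hint
     * bits (visibility now hinges on the vacuum XID in xvac), clear
     * MOVED_IN, set MOVED_OFF.
     */
    static uint16_t
    mark_moved_off(uint16_t infomask)
    {
        infomask &= (uint16_t) ~(HEAP_XMIN_COMMITTED |
                                 HEAP_XMIN_INVALID |
                                 HEAP_MOVED_IN);
        infomask |= HEAP_MOVED_OFF;
        return infomask;
    }
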
@@ -1834,15 +1948,19 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                         * Update the state of the copied tuple, and store it
                                         * on the destination page.
                                         */
-                                       TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
-                                       newtup.t_data->t_infomask &=
-                                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
+                                       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                                                  HEAP_XMIN_INVALID |
+                                                                                                  HEAP_MOVED_OFF);
                                        newtup.t_data->t_infomask |= HEAP_MOVED_IN;
-                                       newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
-                                                                                InvalidOffsetNumber, LP_USED);
+                                       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
+                                       newoff = PageAddItem(ToPage,
+                                                                                (Item) newtup.t_data,
+                                                                                tuple_len,
+                                                                                InvalidOffsetNumber,
+                                                                                LP_USED);
                                        if (newoff == InvalidOffsetNumber)
                                        {
-                                               elog(STOP, "moving chain: failed to add item with len = %lu to page %u",
+                                               elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
                                                  (unsigned long) tuple_len, destvacpage->blkno);
                                        }
                                        newitemid = PageGetItemId(ToPage, newoff);
@@ -1851,6 +1969,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
                                        ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
 
+                                       /* XLOG stuff */
+                                       if (!onerel->rd_istemp)
                                        {
                                                XLogRecPtr      recptr =
                                                log_heap_move(onerel, Cbuf, tuple.t_self,
@@ -1864,6 +1984,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                PageSetLSN(ToPage, recptr);
                                                PageSetSUI(ToPage, ThisStartUpID);
                                        }
+                                       else
+                                       {
+                                               /*
+                                                * No XLOG record, but still need to flag that XID
+                                                * exists on disk
+                                                */
+                                               MyXactMadeTempRelUpdate = true;
+                                       }
+
                                        END_CRIT_SECTION();
 
                                        if (destvacpage->blkno > last_move_dest_block)
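
This hunk introduces the WAL-versus-temp-relation pattern repeated below: permanent relations log the move and stamp the page LSN, while temp relations skip WAL but must still record that the transaction's XID reached disk. Its shape, with the real calls left as placeholder comments:

    #include <stdbool.h>

    static bool MyXactMadeTempRelUpdate = false;    /* mirrors the global above */

    static void
    log_page_change(bool rel_is_temp)
    {
        if (!rel_is_temp)
        {
            /* recptr = log_heap_move(...); PageSetLSN(page, recptr);
             * PageSetSUI(page, ThisStartUpID); */
        }
        else
            MyXactMadeTempRelUpdate = true;     /* commit must hit disk */
    }
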
@@ -1906,12 +2035,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
                                        WriteBuffer(cur_buffer);
                                        WriteBuffer(Cbuf);
-                               }
+                               }                               /* end of move-the-tuple-chain loop */
+
                                cur_buffer = InvalidBuffer;
                                pfree(vtmove);
                                chain_tuple_moved = true;
+
+                               /* advance to next tuple in walk-along-page loop */
                                continue;
-                       }
+                       }                                       /* end of is-tuple-in-chain test */
 
                        /* try to find new page for this tuple */
                        if (cur_buffer == InvalidBuffer ||
@@ -1946,26 +2078,33 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        /* copy tuple */
                        heap_copytuple_with_tuple(&tuple, &newtup);
 
-                       RelationInvalidateHeapTuple(onerel, &tuple);
+                       /*
+                        * register invalidation of source tuple in catcaches.
+                        *
+                        * (Note: we do not need to register the copied tuple, because we
+                        * are not changing the tuple contents and so there cannot be
+                        * any need to flush negative catcache entries.)
+                        */
+                       CacheInvalidateHeapTuple(onerel, &tuple);
 
-                       /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
+                       /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
                        START_CRIT_SECTION();
 
                        /*
-                        * Mark new tuple as moved_in by vacuum and store vacuum XID
-                        * in t_cmin !!!
+                        * Mark new tuple as MOVED_IN by me.
                         */
-                       TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
-                       newtup.t_data->t_infomask &=
-                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
+                       newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                                  HEAP_XMIN_INVALID |
+                                                                                  HEAP_MOVED_OFF);
                        newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+                       HeapTupleHeaderSetXvac(newtup.t_data, myXID);
 
                        /* add tuple to the page */
                        newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
                                                                 InvalidOffsetNumber, LP_USED);
                        if (newoff == InvalidOffsetNumber)
                        {
-                               elog(STOP, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
+                               elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
                                         (unsigned long) tuple_len,
                                         cur_page->blkno, (unsigned long) cur_page->free,
                                         cur_page->offsets_used, cur_page->offsets_free);
@@ -1978,14 +2117,16 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        newtup.t_self = newtup.t_data->t_ctid;
 
                        /*
-                        * Mark old tuple as moved_off by vacuum and store vacuum XID
-                        * in t_cmin !!!
+                        * Mark old tuple as MOVED_OFF by me.
                         */
-                       TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
-                       tuple.t_data->t_infomask &=
-                               ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
+                       tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+                                                                                 HEAP_XMIN_INVALID |
+                                                                                 HEAP_MOVED_IN);
                        tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
+                       HeapTupleHeaderSetXvac(tuple.t_data, myXID);
 
+                       /* XLOG stuff */
+                       if (!onerel->rd_istemp)
                        {
                                XLogRecPtr      recptr =
                                log_heap_move(onerel, buf, tuple.t_self,
@@ -1996,6 +2137,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                PageSetLSN(ToPage, recptr);
                                PageSetSUI(ToPage, ThisStartUpID);
                        }
+                       else
+                       {
+                               /*
+                                * No XLOG record, but still need to flag that XID exists
+                                * on disk
+                                */
+                               MyXactMadeTempRelUpdate = true;
+                       }
+
                        END_CRIT_SECTION();
 
                        cur_page->offsets_used++;
@@ -2017,10 +2167,20 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        }
                }                                               /* walk along page */
 
+               /*
+                * If we broke out of the walk-along-page loop early (ie, still
+                * have offnum <= maxoff), then we failed to move some tuple off
+                * this page.  No point in shrinking any more, so clean up and
+                * exit the per-page loop.
+                */
                if (offnum < maxoff && keep_tuples > 0)
                {
                        OffsetNumber off;
 
+                       /*
+                        * Fix vacpage state for any unvisited tuples remaining on
+                        * page
+                        */
                        for (off = OffsetNumberNext(offnum);
                                 off <= maxoff;
                                 off = OffsetNumberNext(off))
@@ -2032,12 +2192,12 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                                if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
                                        continue;
-                               if ((TransactionId) tuple.t_data->t_cmin != myXID)
-                                       elog(ERROR, "Invalid XID in t_cmin (4)");
                                if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
-                                       elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
+                                       elog(ERROR, "HEAP_MOVED_IN was not expected");
                                if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                                {
+                                       if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+                                               elog(ERROR, "invalid XVAC in tuple header");
                                        /* some chains were moved while */
                                        if (chain_tuple_moved)
                                        {                       /* cleaning this page */
@@ -2061,6 +2221,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                keep_tuples--;
                                        }
                                }
+                               else
+                                       elog(ERROR, "HEAP_MOVED_OFF was expected");
                        }
                }
 
@@ -2080,7 +2242,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        ReleaseBuffer(buf);
 
                if (offnum <= maxoff)
-                       break;                          /* some item(s) left */
+                       break;                          /* had to quit early, see above note */
 
        }                                                       /* walk along relation */
 
@@ -2109,14 +2271,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        /*
         * We are not going to move any more tuples across pages, but we still
         * need to apply vacuum_page to compact free space in the remaining
-        * pages in vacuum_pages list.  Note that some of these pages may also
-        * be in the fraged_pages list, and may have had tuples moved onto them;
-        * if so, we already did vacuum_page and needn't do it again.
+        * pages in vacuum_pages list.  Note that some of these pages may also
+        * be in the fraged_pages list, and may have had tuples moved onto
+        * them; if so, we already did vacuum_page and needn't do it again.
         */
        for (i = 0, curpage = vacuum_pages->pagedesc;
                 i < vacuumed_pages;
                 i++, curpage++)
        {
+               CHECK_FOR_INTERRUPTS();
                Assert((*curpage)->blkno < blkno);
                if ((*curpage)->offsets_used == 0)
                {
@@ -2132,21 +2295,22 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        }
 
        /*
-        * Now scan all the pages that we moved tuples onto and update
-        * tuple status bits.  This is not really necessary, but will save time
-        * for future transactions examining these tuples.
+        * Now scan all the pages that we moved tuples onto and update tuple
+        * status bits.  This is not really necessary, but will save time for
+        * future transactions examining these tuples.
         *
-        * XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from
-        * pages that were move source pages but not move dest pages.  One also
-        * wonders whether it wouldn't be better to skip this step and let the
-        * tuple status updates happen someplace that's not holding an exclusive
-        * lock on the relation.
+        * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
+        * pages that were move source pages but not move dest pages.  One
+        * also wonders whether it wouldn't be better to skip this step and
+        * let the tuple status updates happen someplace that's not holding an
+        * exclusive lock on the relation.
         */
        checked_moved = 0;
        for (i = 0, curpage = fraged_pages->pagedesc;
                 i < num_fraged_pages;
                 i++, curpage++)
        {
+               CHECK_FOR_INTERRUPTS();
                Assert((*curpage)->blkno < blkno);
                if ((*curpage)->blkno > last_move_dest_block)
                        break;                          /* no need to scan any further */
@@ -2168,17 +2332,18 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                        tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                        if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                        {
-                               if ((TransactionId) tuple.t_data->t_cmin != myXID)
-                                       elog(ERROR, "Invalid XID in t_cmin (2)");
+                               if (!(tuple.t_data->t_infomask & HEAP_MOVED))
+                                       elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+                               if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+                                       elog(ERROR, "invalid XVAC in tuple header");
                                if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
                                {
                                        tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
+                                       tuple.t_data->t_infomask &= ~HEAP_MOVED;
                                        num_tuples++;
                                }
-                               else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
-                                       tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
                                else
-                                       elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+                                       tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
                        }
                }
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
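
The second-pass fixup rewritten here settles the hint bits once the moving transaction is known committed: MOVED_IN copies become plainly visible, and everything else (MOVED_OFF) becomes plainly dead. A sketch with illustrative flag values:

    #include <stdint.h>

    #define HEAP_XMIN_COMMITTED 0x0100  /* illustrative values */
    #define HEAP_XMIN_INVALID   0x0200
    #define HEAP_MOVED_OFF      0x4000
    #define HEAP_MOVED_IN       0x8000
    #define HEAP_MOVED          (HEAP_MOVED_OFF | HEAP_MOVED_IN)

    static uint16_t
    settle_moved_tuple(uint16_t infomask)
    {
        if (infomask & HEAP_MOVED_IN)
        {
            infomask |= HEAP_XMIN_COMMITTED;    /* copy is now visible */
            infomask &= (uint16_t) ~HEAP_MOVED;
        }
        else
            infomask |= HEAP_XMIN_INVALID;      /* original is now dead */
        return infomask;
    }
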
@@ -2188,10 +2353,18 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        }
        Assert(num_moved == checked_moved);
 
-       elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
-                RelationGetRelationName(onerel),
-                nblocks, blkno, num_moved,
-                vac_show_rusage(&ru0));
+       /*
+        * It'd be cleaner to make this report at the bottom of this routine,
+        * but then the rusage would double-count the second pass of index
+        * vacuuming.  So do it here and ignore the relatively small amount of
+        * processing that occurs below.
+        */
+       ereport(elevel,
+                       (errmsg("\"%s\": moved %u tuples, truncated %u to %u pages",
+                                       RelationGetRelationName(onerel),
+                                       num_moved, nblocks, blkno),
+                        errdetail("%s",
+                                          vac_show_rusage(&ru0))));
 
        /*
         * Reflect the motion of system tuples to catalog cache here.
@@ -2226,8 +2399,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                if (vacpage->blkno == (blkno - 1) &&
                        vacpage->offsets_free > 0)
                {
-                       OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
-                       OffsetNumber *unused = unbuf;
+                       OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
                        int                     uncnt;
 
                        buf = ReadBuffer(onerel, vacpage->blkno);
@@ -2247,30 +2419,44 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
                                if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
                                {
-                                       if ((TransactionId) tuple.t_data->t_cmin != myXID)
-                                               elog(ERROR, "Invalid XID in t_cmin (3)");
                                        if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
                                        {
+                                               if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+                                                       elog(ERROR, "invalid XVAC in tuple header");
                                                itemid->lp_flags &= ~LP_USED;
                                                num_tuples++;
                                        }
                                        else
-                                               elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
+                                               elog(ERROR, "HEAP_MOVED_OFF was expected");
                                }
 
                        }
                        Assert(vacpage->offsets_free == num_tuples);
+
                        START_CRIT_SECTION();
+
                        uncnt = PageRepairFragmentation(page, unused);
+
+                       /* XLOG stuff */
+                       if (!onerel->rd_istemp)
                        {
                                XLogRecPtr      recptr;
 
-                               recptr = log_heap_clean(onerel, buf, (char *) unused,
-                                                 (char *) (&(unused[uncnt])) - (char *) unused);
+                               recptr = log_heap_clean(onerel, buf, unused, uncnt);
                                PageSetLSN(page, recptr);
                                PageSetSUI(page, ThisStartUpID);
                        }
+                       else
+                       {
+                               /*
+                                * No XLOG record, but still need to flag that XID exists
+                                * on disk
+                                */
+                               MyXactMadeTempRelUpdate = true;
+                       }
+
                        END_CRIT_SECTION();
+
                        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
                        WriteBuffer(buf);
                }
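
Note the log_heap_clean() signature change above: it now takes the unused-offset array and a count rather than a raw byte range. A toy stand-in for collecting those offsets (the real PageRepairFragmentation also compacts the tuple data on the page):

    #include <stdio.h>

    typedef unsigned short OffsetNumber;

    /* report which line pointers were left unused after compaction */
    static int
    collect_unused(const int *lp_used, int nlp, OffsetNumber *unused)
    {
        int     uncnt = 0;

        for (int i = 0; i < nlp; i++)
            if (!lp_used[i])
                unused[uncnt++] = (OffsetNumber) (i + 1);   /* 1-based */
        return uncnt;
    }

    int
    main(void)
    {
        int             lp_used[] = {1, 0, 1, 0, 0};    /* hypothetical page */
        OffsetNumber    unused[5];
        int             uncnt = collect_unused(lp_used, 5, unused);

        printf("%d unused offsets, first is %u\n", uncnt, unused[0]);
        return 0;               /* prints: 3 unused offsets, first is 2 */
    }
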
@@ -2290,14 +2476,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
         */
        i = FlushRelationBuffers(onerel, blkno);
        if (i < 0)
-               elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
-                        i);
+               elog(ERROR, "FlushRelationBuffers returned %d", i);
 
        /* truncate relation, if needed */
        if (blkno < nblocks)
        {
                blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
-               onerel->rd_nblocks = blkno;     /* update relcache immediately */
+               onerel->rd_nblocks = blkno;             /* update relcache immediately */
                onerel->rd_targblock = InvalidBlockNumber;
                vacrelstats->rel_pages = blkno; /* set new number of blocks */
        }
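
The truncation step keeps the relcache entry honest immediately: record the block count the storage manager reports back, and invalidate the cached insertion target, which may now point past the end of the relation. A stubbed sketch of that invariant (all names here are stand-ins):

    #include <stdio.h>
    #include <stdint.h>

    typedef uint32_t BlockNumber;
    #define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

    typedef struct {                 /* stand-in for the relcache fields touched */
        BlockNumber rd_nblocks;
        BlockNumber rd_targblock;
    } RelcacheStub;

    /* stub: pretend the storage manager truncated and echoed back the new size */
    static BlockNumber smgrtruncate_stub(BlockNumber nblocks) { return nblocks; }

    int main(void)
    {
        RelcacheStub rel = { 100, 97 };
        BlockNumber blkno = 60;          /* first block to chop off */

        if (blkno < rel.rd_nblocks)
        {
            blkno = smgrtruncate_stub(blkno);
            rel.rd_nblocks = blkno;                  /* update relcache immediately */
            rel.rd_targblock = InvalidBlockNumber;   /* cached target may be gone */
        }
        printf("relation now %u blocks\n", (unsigned) rel.rd_nblocks);
        return 0;
    }
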
@@ -2310,6 +2495,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
        ExecDropTupleTable(tupleTable, true);
 
        ExecCloseIndices(resultRelInfo);
+
+       FreeExecutorState(estate);
 }
 
 /*
@@ -2323,7 +2510,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 {
        Buffer          buf;
        VacPage    *vacpage;
-       BlockNumber     relblocks;
+       BlockNumber relblocks;
        int                     nblocks;
        int                     i;
 
@@ -2332,6 +2519,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 
        for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
        {
+               CHECK_FOR_INTERRUPTS();
                if ((*vacpage)->offsets_free > 0)
                {
                        buf = ReadBuffer(onerel, (*vacpage)->blkno);
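
The CHECK_FOR_INTERRUPTS() added at the top of this loop keeps a long VACUUM cancelable between pages. Roughly the following shape; the stub macro below merely returns, where the real one raises an error via the backend's interrupt machinery:

    #include <stdio.h>

    static volatile int InterruptPending = 0;   /* stub for the real flag */
    #define CHECK_FOR_INTERRUPTS() \
        do { if (InterruptPending) { puts("query canceled"); return; } } while (0)

    static void per_page_loop(int nblocks)
    {
        for (int i = 0; i < nblocks; i++)
        {
            CHECK_FOR_INTERRUPTS();     /* poll for cancel once per page */
            /* ... read and clean page i ... */
            printf("page %d vacuumed\n", i);
        }
    }

    int main(void)
    {
        per_page_loop(3);
        return 0;
    }
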
@@ -2353,17 +2541,17 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 
        i = FlushRelationBuffers(onerel, relblocks);
        if (i < 0)
-               elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
-                        i);
+               elog(ERROR, "FlushRelationBuffers returned %d", i);
 
        /* truncate relation if there are some empty end-pages */
        if (vacuum_pages->empty_end_pages > 0)
        {
-               elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
-                        RelationGetRelationName(onerel),
-                        vacrelstats->rel_pages, relblocks);
+               ereport(elevel,
+                               (errmsg("\"%s\": truncated %u to %u pages",
+                                               RelationGetRelationName(onerel),
+                                               vacrelstats->rel_pages, relblocks)));
                relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
-               onerel->rd_nblocks = relblocks; /* update relcache immediately */
+               onerel->rd_nblocks = relblocks; /* update relcache immediately */
                onerel->rd_targblock = InvalidBlockNumber;
                vacrelstats->rel_pages = relblocks;             /* set new number of
                                                                                                 * blocks */
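
This hunk is part of the file-wide migration from bare elog() calls to ereport(), which splits a report into a primary message plus optional detail and hint fields. A toy expansion with hypothetical printf-based stand-ins for the real elog.h macros, purely to show the shape of the call:

    #include <stdio.h>

    /* hypothetical stand-ins; the real macros build an ErrorData instead */
    #define ereport(level, rest)  do { rest; } while (0)
    #define errmsg(...)    (printf("MESSAGE: " __VA_ARGS__), printf("\n"))
    #define errhint(...)   (printf("HINT:    " __VA_ARGS__), printf("\n"))

    int main(void)
    {
        ereport(0, (errmsg("\"%s\": truncated %u to %u pages",
                           "some_table", 100u, 60u)));
        ereport(0, (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
                           "some_index", 900.0, 1000.0),
                    errhint("Rebuild the index with REINDEX.")));
        return 0;
    }
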
@@ -2377,8 +2565,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
 static void
 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
 {
-       OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
-       OffsetNumber *unused = unbuf;
+       OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
        int                     uncnt;
        Page            page = BufferGetPage(buffer);
        ItemId          itemid;
@@ -2388,20 +2575,30 @@ vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
        Assert(vacpage->offsets_used == 0);
 
        START_CRIT_SECTION();
+
        for (i = 0; i < vacpage->offsets_free; i++)
        {
                itemid = PageGetItemId(page, vacpage->offsets[i]);
                itemid->lp_flags &= ~LP_USED;
        }
+
        uncnt = PageRepairFragmentation(page, unused);
+
+       /* XLOG stuff */
+       if (!onerel->rd_istemp)
        {
                XLogRecPtr      recptr;
 
-               recptr = log_heap_clean(onerel, buffer, (char *) unused,
-                                                 (char *) (&(unused[uncnt])) - (char *) unused);
+               recptr = log_heap_clean(onerel, buffer, unused, uncnt);
                PageSetLSN(page, recptr);
                PageSetSUI(page, ThisStartUpID);
        }
+       else
+       {
+               /* No XLOG record, but still need to flag that XID exists on disk */
+               MyXactMadeTempRelUpdate = true;
+       }
+
        END_CRIT_SECTION();
 }
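
vacuum_page brackets the entire page edit in a critical section, so an error midway escalates instead of leaving a half-compacted page behind. A stub showing only the bracketing discipline; the real macros also interact with the backend's error handling:

    #include <assert.h>
    #include <stdio.h>

    static int CritSectionCount = 0;   /* stub; errors while > 0 become PANIC */
    #define START_CRIT_SECTION() (CritSectionCount++)
    #define END_CRIT_SECTION() \
        do { assert(CritSectionCount > 0); CritSectionCount--; } while (0)

    static void vacuum_page_like(void)
    {
        START_CRIT_SECTION();
        /* clear LP_USED on reaped line pointers */
        /* compact the page (PageRepairFragmentation) */
        /* WAL-log the cleanup, or flag MyXactMadeTempRelUpdate for a temp rel */
        END_CRIT_SECTION();
    }

    int main(void)
    {
        vacuum_page_like();
        printf("critical sections balanced: %d\n", CritSectionCount);
        return 0;
    }
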
 
@@ -2414,17 +2611,25 @@ static void
 scan_index(Relation indrel, double num_tuples)
 {
        IndexBulkDeleteResult *stats;
+       IndexVacuumCleanupInfo vcinfo;
        VacRUsage       ru0;
 
        vac_init_rusage(&ru0);
 
        /*
-        * Even though we're not planning to delete anything, use the
-        * ambulkdelete call, so that the scan happens within the index AM
-        * for more speed.
+        * Even though we're not planning to delete anything, we use the
+        * ambulkdelete call, because (a) the scan happens within the index AM
+        * for more speed, and (b) it may want to pass private statistics to
+        * the amvacuumcleanup call.
         */
        stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
 
+       /* Do post-VACUUM cleanup, even though we deleted nothing */
+       vcinfo.vacuum_full = true;
+       vcinfo.message_level = elevel;
+
+       stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
+
        if (!stats)
                return;
 
@@ -2433,23 +2638,29 @@ scan_index(Relation indrel, double num_tuples)
                                                stats->num_pages, stats->num_index_tuples,
                                                false);
 
-       elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
-                RelationGetRelationName(indrel),
-                stats->num_pages, stats->num_index_tuples,
-                vac_show_rusage(&ru0));
+       ereport(elevel,
+                       (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
+                                       RelationGetRelationName(indrel),
+                                       stats->num_index_tuples,
+                                       stats->num_pages),
+                        errdetail("%u index pages have been deleted, %u are currently reusable.\n"
+                                          "%s",
+                                          stats->pages_deleted, stats->pages_free,
+                                          vac_show_rusage(&ru0))));
 
        /*
-        * Check for tuple count mismatch.  If the index is partial, then
-        * it's OK for it to have fewer tuples than the heap; else we got trouble.
+        * Check for tuple count mismatch.      If the index is partial, then it's
+        * OK for it to have fewer tuples than the heap; else we got trouble.
         */
        if (stats->num_index_tuples != num_tuples)
        {
                if (stats->num_index_tuples > num_tuples ||
-                       ! vac_is_partial_index(indrel))
-                       elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
-\n\tRecreate the index.",
-                                RelationGetRelationName(indrel),
-                                stats->num_index_tuples, num_tuples);
+                       !vac_is_partial_index(indrel))
+                       ereport(WARNING,
+                                       (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
+                                                       RelationGetRelationName(indrel),
+                                                       stats->num_index_tuples, num_tuples),
+                                        errhint("Rebuild the index with REINDEX.")));
        }
 
        pfree(stats);
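
scan_index now drives the index AM in two phases: ambulkdelete runs even when nothing will be deleted, and amvacuumcleanup then gets a chance to post-process, possibly returning updated (or NULL) statistics. A stubbed sketch of that calling convention; every type and function here is a stand-in for the real index AM interface:

    #include <stdio.h>
    #include <stdlib.h>
    #include <stdbool.h>

    typedef struct { double num_index_tuples; unsigned num_pages; } BulkDelResultStub;
    typedef struct { bool vacuum_full; int message_level; } VacuumCleanupInfoStub;

    static BulkDelResultStub *index_bulk_delete_stub(void)
    {
        BulkDelResultStub *r = malloc(sizeof *r);
        if (!r)
            return NULL;
        r->num_index_tuples = 1000;
        r->num_pages = 10;
        return r;
    }

    /* cleanup may adjust, replace, or simply pass through the stats */
    static BulkDelResultStub *index_vacuum_cleanup_stub(VacuumCleanupInfoStub *info,
                                                        BulkDelResultStub *stats)
    {
        (void) info;
        return stats;
    }

    int main(void)
    {
        VacuumCleanupInfoStub vcinfo = { true /* full VACUUM */, 0 };
        BulkDelResultStub *stats;

        stats = index_bulk_delete_stub();
        stats = index_vacuum_cleanup_stub(&vcinfo, stats);
        if (!stats)                 /* AM had nothing to report */
            return 0;
        printf("index now contains %.0f tuples in %u pages\n",
               stats->num_index_tuples, stats->num_pages);
        free(stats);
        return 0;
    }
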
@@ -2472,6 +2683,7 @@ vacuum_index(VacPageList vacpagelist, Relation indrel,
                         double num_tuples, int keep_tuples)
 {
        IndexBulkDeleteResult *stats;
+       IndexVacuumCleanupInfo vcinfo;
        VacRUsage       ru0;
 
        vac_init_rusage(&ru0);
@@ -2479,6 +2691,12 @@ vacuum_index(VacPageList vacpagelist, Relation indrel,
        /* Do bulk deletion */
        stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
 
+       /* Do post-VACUUM cleanup */
+       vcinfo.vacuum_full = true;
+       vcinfo.message_level = elevel;
+
+       stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
+
        if (!stats)
                return;
 
@@ -2487,23 +2705,31 @@ vacuum_index(VacPageList vacpagelist, Relation indrel,
                                                stats->num_pages, stats->num_index_tuples,
                                                false);
 
-       elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
-                RelationGetRelationName(indrel), stats->num_pages,
-                stats->num_index_tuples - keep_tuples, stats->tuples_removed,
-                vac_show_rusage(&ru0));
+       ereport(elevel,
+                       (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
+                                       RelationGetRelationName(indrel),
+                                       stats->num_index_tuples,
+                                       stats->num_pages),
+                        errdetail("%.0f index tuples were removed.\n"
+                "%u index pages have been deleted, %u are currently reusable.\n"
+                                          "%s",
+                                          stats->tuples_removed,
+                                          stats->pages_deleted, stats->pages_free,
+                                          vac_show_rusage(&ru0))));
 
        /*
-        * Check for tuple count mismatch.  If the index is partial, then
-        * it's OK for it to have fewer tuples than the heap; else we got trouble.
+        * Check for tuple count mismatch.      If the index is partial, then it's
+        * OK for it to have fewer tuples than the heap; else we got trouble.
         */
        if (stats->num_index_tuples != num_tuples + keep_tuples)
        {
                if (stats->num_index_tuples > num_tuples + keep_tuples ||
-                       ! vac_is_partial_index(indrel))
-                       elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
-\n\tRecreate the index.",
-                                RelationGetRelationName(indrel),
-                                stats->num_index_tuples, num_tuples);
+                       !vac_is_partial_index(indrel))
+                       ereport(WARNING,
+                                       (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
+                                                       RelationGetRelationName(indrel),
+                                         stats->num_index_tuples, num_tuples + keep_tuples),
+                                        errhint("Rebuild the index with REINDEX.")));
        }
 
        pfree(stats);
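
Both index routines end with the same sanity check: an index may legitimately hold fewer tuples than the heap only if it is partial; more tuples, or a shortfall in a non-partial index, draws the REINDEX warning. The logic condensed, with made-up numbers:

    #include <stdbool.h>
    #include <stdio.h>

    int main(void)
    {
        double num_index_tuples = 900;   /* hypothetical counts */
        double num_tuples = 1000;
        int    keep_tuples = 0;
        bool   is_partial = true;

        if (num_index_tuples != num_tuples + keep_tuples)
        {
            if (num_index_tuples > num_tuples + keep_tuples || !is_partial)
                puts("WARNING: tuple counts differ; rebuild the index with REINDEX");
            else
                puts("partial index: fewer index tuples than heap rows is expected");
        }
        return 0;
    }
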
@@ -2519,7 +2745,7 @@ vacuum_index(VacPageList vacpagelist, Relation indrel,
 static bool
 tid_reaped(ItemPointer itemptr, void *state)
 {
-       VacPageList     vacpagelist = (VacPageList) state;
+       VacPageList vacpagelist = (VacPageList) state;
        OffsetNumber ioffno;
        OffsetNumber *voff;
        VacPage         vp,
@@ -2579,34 +2805,51 @@ vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                           BlockNumber rel_pages)
 {
        int                     nPages = fraged_pages->num_pages;
+       VacPage    *pagedesc = fraged_pages->pagedesc;
+       Size            threshold;
+       PageFreeSpaceInfo *pageSpaces;
+       int                     outPages;
        int                     i;
-       BlockNumber *pages;
-       Size       *spaceAvail;
+
+       /*
+        * We only report pages with free space at least equal to the average
+        * request size --- this avoids cluttering FSM with uselessly-small
+        * bits of space.  Although FSM would discard pages with little free
+        * space anyway, it's important to do this prefiltering because (a) it
+        * reduces the time spent holding the FSM lock in
+        * RecordRelationFreeSpace, and (b) FSM uses the number of pages
+        * reported as a statistic for guiding space management.  If we didn't
+        * threshold our reports the same way vacuumlazy.c does, we'd be
+        * skewing that statistic.
+        */
+       threshold = GetAvgFSMRequestSize(&onerel->rd_node);
 
        /* +1 to avoid palloc(0) */
-       pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
-       spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
+       pageSpaces = (PageFreeSpaceInfo *)
+               palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
+       outPages = 0;
 
        for (i = 0; i < nPages; i++)
        {
-               pages[i] = fraged_pages->pagedesc[i]->blkno;
-               spaceAvail[i] = fraged_pages->pagedesc[i]->free;
                /*
-                * fraged_pages may contain entries for pages that we later decided
-                * to truncate from the relation; don't enter them into the map!
+                * fraged_pages may contain entries for pages that we later
+                * decided to truncate from the relation; don't enter them into
+                * the free space map!
                 */
-               if (pages[i] >= rel_pages)
-               {
-                       nPages = i;
+               if (pagedesc[i]->blkno >= rel_pages)
                        break;
+
+               if (pagedesc[i]->free >= threshold)
+               {
+                       pageSpaces[outPages].blkno = pagedesc[i]->blkno;
+                       pageSpaces[outPages].avail = pagedesc[i]->free;
+                       outPages++;
                }
        }
 
-       MultiRecordFreeSpace(&onerel->rd_node,
-                                                0, MaxBlockNumber,
-                                                nPages, pages, spaceAvail);
-       pfree(pages);
-       pfree(spaceAvail);
+       RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
+
+       pfree(pageSpaces);
 }
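
The rewritten vac_update_fsm prefilters what it reports: stop at the truncation boundary, and keep only pages whose free space meets the average-request threshold, mirroring vacuumlazy.c so the FSM's page-count statistic stays comparable. The filter in isolation, with invented data and a fixed threshold standing in for GetAvgFSMRequestSize():

    #include <stdio.h>
    #include <stdint.h>

    typedef uint32_t BlockNumber;
    typedef struct { BlockNumber blkno; size_t avail; } PageFreeSpaceInfoStub;
    typedef struct { BlockNumber blkno; size_t free; } VacPageStub;

    int main(void)
    {
        VacPageStub pagedesc[] = { {2, 400}, {5, 64}, {7, 900}, {12, 300} };
        int nPages = 4;
        BlockNumber rel_pages = 10;    /* pages >= this were truncated away */
        size_t threshold = 128;        /* stand-in for the average FSM request */
        PageFreeSpaceInfoStub pageSpaces[4];
        int outPages = 0;

        for (int i = 0; i < nPages; i++)
        {
            if (pagedesc[i].blkno >= rel_pages)
                break;                 /* truncated: nothing past here counts */
            if (pagedesc[i].free >= threshold)
            {
                pageSpaces[outPages].blkno = pagedesc[i].blkno;
                pageSpaces[outPages].avail = pagedesc[i].free;
                outPages++;
            }
        }
        /* real code would now hand (outPages, pageSpaces) to RecordRelationFreeSpace */
        for (int i = 0; i < outPages; i++)
            printf("block %u: %zu bytes free\n",
                   (unsigned) pageSpaces[i].blkno, pageSpaces[i].avail);
        return 0;
    }
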
 
 /* Copy a VacPage structure */
@@ -2617,7 +2860,7 @@ copy_vac_page(VacPage vacpage)
 
        /* allocate a VacPageData entry */
        newvacpage = (VacPage) palloc(sizeof(VacPageData) +
-                                                                 vacpage->offsets_free * sizeof(OffsetNumber));
+                                                  vacpage->offsets_free * sizeof(OffsetNumber));
 
        /* fill it in */
        if (vacpage->offsets_free > 0)
@@ -2661,7 +2904,7 @@ vpage_insert(VacPageList vacpagelist, VacPage vpnew)
 /*
  * vac_bsearch: just like standard C library routine bsearch(),
  * except that we first test to see whether the target key is outside
- * the range of the table entries.  This case is handled relatively slowly
+ * the range of the table entries.     This case is handled relatively slowly
  * by the normal binary search algorithm (ie, no faster than any other key)
  * but it occurs often enough in VACUUM to be worth optimizing.
  */
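
The optimization the comment describes, sketched generically: test the probe against the first and last entries before falling back to an ordinary binary search, since VACUUM probes out-of-range keys often enough for the O(1) rejection to pay off:

    #include <stdio.h>
    #include <stdlib.h>

    static int cmp_int(const void *a, const void *b)
    {
        int x = *(const int *) a, y = *(const int *) b;
        return (x > y) - (x < y);
    }

    static void *vac_bsearch_like(const void *key, const void *base,
                                  size_t nelem, size_t size,
                                  int (*cmp) (const void *, const void *))
    {
        if (nelem == 0)
            return NULL;
        /* fast exits for keys outside the table's range */
        if (cmp(key, base) < 0)
            return NULL;
        if (cmp(key, (const char *) base + (nelem - 1) * size) > 0)
            return NULL;
        return bsearch(key, base, nelem, size, cmp);
    }

    int main(void)
    {
        int table[] = {3, 7, 9, 14};
        int probe = 99;                /* out of range: rejected without searching */
        printf("%s\n",
               vac_bsearch_like(&probe, table, 4, sizeof(int), cmp_int)
               ? "found" : "not found");
        return 0;
    }
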
@@ -2769,7 +3012,7 @@ vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
        i = 0;
        foreach(indexoidscan, indexoidlist)
        {
-               Oid                     indexoid = lfirsti(indexoidscan);
+               Oid                     indexoid = lfirsto(indexoidscan);
 
                (*Irel)[i] = index_open(indexoid);
                i++;
@@ -2797,29 +3040,18 @@ vac_close_indexes(int nindexes, Relation *Irel)
 bool
 vac_is_partial_index(Relation indrel)
 {
-       bool            result;
-       HeapTuple       cachetuple;
-       Form_pg_index indexStruct;
-
        /*
-        * If the index's AM doesn't support nulls, it's partial for our purposes
+        * If the index's AM doesn't support nulls, it's partial for our
+        * purposes
         */
-       if (! indrel->rd_am->amindexnulls)
+       if (!indrel->rd_am->amindexnulls)
                return true;
 
        /* Otherwise, look to see if there's a partial-index predicate */
-       cachetuple = SearchSysCache(INDEXRELID,
-                                                               ObjectIdGetDatum(RelationGetRelid(indrel)),
-                                                               0, 0, 0);
-       if (!HeapTupleIsValid(cachetuple))
-               elog(ERROR, "vac_is_partial_index: index %u not found",
-                        RelationGetRelid(indrel));
-       indexStruct = (Form_pg_index) GETSTRUCT(cachetuple);
-
-       result = (VARSIZE(&indexStruct->indpred) > VARHDRSZ);
+       if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
+               return true;
 
-       ReleaseSysCache(cachetuple);
-       return result;
+       return false;
 }
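
The rewritten test avoids the syscache round-trip: an index counts as partial for VACUUM's purposes if its AM cannot store nulls, or if pg_index.indpred is non-null (now checked with heap_attisnull on the relcache's cached index tuple). Reduced to two booleans in a stub:

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {                   /* hypothetical stand-in for the relcache data */
        bool amindexnulls;             /* does the AM index null entries? */
        bool indpred_is_null;          /* is pg_index.indpred null (no predicate)? */
    } IndexStub;

    static bool vac_is_partial_index_like(const IndexStub *ind)
    {
        if (!ind->amindexnulls)        /* AM omits nulls: partial for our purposes */
            return true;
        if (!ind->indpred_is_null)     /* explicit partial-index predicate */
            return true;
        return false;
    }

    int main(void)
    {
        IndexStub plain_btree = { true, true };
        IndexStub partial_idx = { true, false };
        printf("plain=%d partial=%d\n",
               (int) vac_is_partial_index_like(&plain_btree),
               (int) vac_is_partial_index_like(&partial_idx));
        return 0;
    }
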
 
 
@@ -2888,9 +3120,9 @@ vac_show_rusage(VacRUsage *ru0)
        snprintf(result, sizeof(result),
                         "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
                         (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
-                        (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
+         (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
                         (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
-                        (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
+         (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
                         (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
                         (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);