Revert no-op changes to BufferGetPage()

diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index acc6c427075be5db709e65f541579535723a760a..426e75609303901b77eed8b0e0b808b7d6379411 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
  * relations with finite memory space usage.  To do that, we set upper bounds
  * on the number of tuples and pages we will keep track of at once.
  *
- * We are willing to use at most maintenance_work_mem memory space to keep
- * track of dead tuples.  We initially allocate an array of TIDs of that size,
- * with an upper limit that depends on table size (this limit ensures we don't
- * allocate a huge area uselessly for vacuuming small tables). If the array
- * threatens to overflow, we suspend the heap scan phase and perform a pass of
- * index cleanup and page compaction, then resume the heap scan with an empty
- * TID array.
+ * We are willing to use at most maintenance_work_mem (or perhaps
+ * autovacuum_work_mem) memory space to keep track of dead tuples.  We
+ * initially allocate an array of TIDs of that size, with an upper limit that
+ * depends on table size (this limit ensures we don't allocate a huge area
+ * uselessly for vacuuming small tables).  If the array threatens to overflow,
+ * we suspend the heap scan phase and perform a pass of index cleanup and page
+ * compaction, then resume the heap scan with an empty TID array.
  *
  * If we're processing a table with no indexes, we can just vacuum each page
  * as we go; there's no need to save up multiple tuples to minimize the number
  * the TID array, just enough to hold as many heap tuples as fit on one page.
  *
  *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/vacuumlazy.c,v 1.125 2009/12/19 01:32:34 sriggs Exp $
+ *       src/backend/commands/vacuumlazy.c
  *
  *-------------------------------------------------------------------------
  */
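The header comment above describes a memory-bounded, multi-pass strategy: dead-tuple TIDs are collected into a fixed-size array and, when another page's worth of TIDs might no longer fit, a round of index cleanup empties the array before the heap scan resumes. The following standalone sketch illustrates that flow only; the names (DeadTidBuffer, note_dead_tuple, flush_dead_tids) and the 64 kB budget are hypothetical and are not PostgreSQL's actual structures or defaults.

/*
 * Standalone sketch of the bounded dead-TID strategy described above.
 * All names here are illustrative stand-ins, not PostgreSQL code.
 */
#include <stdio.h>
#include <stdlib.h>

#define MAX_TUPLES_PER_PAGE 291		/* stand-in for MaxHeapTuplesPerPage */

typedef struct
{
	unsigned	block;
	unsigned	offset;
} DeadTid;

typedef struct
{
	DeadTid    *tids;
	long		ntids;
	long		maxtids;			/* derived from the memory budget */
	int			cleanup_passes;
} DeadTidBuffer;

/* Stand-in for one pass of index cleanup plus heap page compaction. */
static void
flush_dead_tids(DeadTidBuffer *buf)
{
	buf->cleanup_passes++;
	buf->ntids = 0;					/* resume the heap scan with an empty array */
}

static void
note_dead_tuple(DeadTidBuffer *buf, unsigned block, unsigned offset)
{
	/* If another page's worth of TIDs might not fit, flush first. */
	if (buf->maxtids - buf->ntids < MAX_TUPLES_PER_PAGE && buf->ntids > 0)
		flush_dead_tids(buf);
	buf->tids[buf->ntids].block = block;
	buf->tids[buf->ntids].offset = offset;
	buf->ntids++;
}

int
main(void)
{
	long		budget_bytes = 64 * 1024;	/* pretend memory budget */
	DeadTidBuffer buf;

	buf.maxtids = budget_bytes / (long) sizeof(DeadTid);
	buf.tids = malloc(sizeof(DeadTid) * buf.maxtids);
	buf.ntids = 0;
	buf.cleanup_passes = 0;

	/* pretend every page of a 1000-page table has 50 dead tuples */
	for (unsigned blk = 0; blk < 1000; blk++)
		for (unsigned off = 1; off <= 50; off++)
			note_dead_tuple(&buf, blk, off);

	printf("cleanup passes: %d\n", buf.cleanup_passes);
	free(buf.tids);
	return 0;
}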
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/htup_details.h"
+#include "access/multixact.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "catalog/storage.h"
 #include "commands/dbcommands.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
-#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
+#include "utils/timestamp.h"
 #include "utils/tqual.h"
 
 
 #define REL_TRUNCATE_MINIMUM   1000
 #define REL_TRUNCATE_FRACTION  16
 
+/*
+ * Timing parameters for truncate locking heuristics.
+ *
+ * These were not exposed as user tunable GUC values because it didn't seem
+ * that the potential for improvement was great enough to merit the cost of
+ * supporting them.
+ */
+#define VACUUM_TRUNCATE_LOCK_CHECK_INTERVAL            20              /* ms */
+#define VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL             50              /* ms */
+#define VACUUM_TRUNCATE_LOCK_TIMEOUT                   5000    /* ms */
+
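For reference, wait and timeout intervals like the two above are the kind of values that drive a conditional-lock retry loop: attempt the lock, sleep for the wait interval between attempts, and give up once the total delay reaches the timeout. (The 20 ms check interval serves the other side of the handshake: while the backward scan holds the lock, it periodically checks whether anyone else is waiting.) The sketch below is a simplified standalone illustration with hypothetical helpers, not the actual lazy_truncate_heap() code.

/*
 * Standalone sketch of a wait-interval/timeout retry loop.  The fake lock
 * that frees up after a few attempts is an assumption for the example.
 */
#define _POSIX_C_SOURCE 199309L
#include <stdbool.h>
#include <stdio.h>
#include <time.h>

#define TRUNCATE_LOCK_WAIT_INTERVAL		50		/* ms between attempts */
#define TRUNCATE_LOCK_TIMEOUT			5000	/* ms before giving up */

/* Hypothetical stand-in: pretend the lock frees up after a few attempts. */
static int	attempts_until_free = 4;

static bool
try_acquire_exclusive_lock(void)
{
	return --attempts_until_free <= 0;
}

static void
sleep_ms(int ms)
{
	struct timespec ts = {ms / 1000, (ms % 1000) * 1000000L};

	nanosleep(&ts, NULL);
}

static bool
acquire_lock_with_timeout(void)
{
	int			waited = 0;

	while (!try_acquire_exclusive_lock())
	{
		if (waited >= TRUNCATE_LOCK_TIMEOUT)
			return false;			/* give up; skip the truncation attempt */
		sleep_ms(TRUNCATE_LOCK_WAIT_INTERVAL);
		waited += TRUNCATE_LOCK_WAIT_INTERVAL;
	}
	return true;
}

int
main(void)
{
	printf("got lock: %s\n", acquire_lock_with_timeout() ? "yes" : "no");
	return 0;
}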
 /*
  * Guesstimation of number of dead tuples per page.  This is used to
  * provide an upper limit to memory allocated when vacuuming small
  * Before we consider skipping a page that's marked as clean in the
  * visibility map, we must've seen at least this many clean pages.
  */
-#define SKIP_PAGES_THRESHOLD   32
+#define SKIP_PAGES_THRESHOLD   ((BlockNumber) 32)
 
 typedef struct LVRelStats
 {
        /* hasindex = true means two-pass strategy; false means one-pass */
        bool            hasindex;
-       bool            scanned_all;    /* have we scanned all pages (this far)? */
        /* Overall statistics about rel */
-       BlockNumber rel_pages;
+       BlockNumber old_rel_pages;      /* previous value of pg_class.relpages */
+       BlockNumber rel_pages;          /* total number of pages */
+       BlockNumber scanned_pages;      /* number of pages we examined */
+       BlockNumber pinskipped_pages;           /* # of pages we skipped due to a pin */
+       BlockNumber frozenskipped_pages;        /* # of frozen pages we skipped */
+       double          scanned_tuples; /* counts only tuples on scanned pages */
        double          old_rel_tuples; /* previous value of pg_class.reltuples */
-       double          rel_tuples;             /* counts only tuples on scanned pages */
+       double          new_rel_tuples; /* new estimated total # of tuples */
+       double          new_dead_tuples;        /* new estimated total # of dead tuples */
        BlockNumber pages_removed;
        double          tuples_deleted;
        BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
@@ -99,6 +122,7 @@ typedef struct LVRelStats
        ItemPointer dead_tuples;        /* array of ItemPointerData */
        int                     num_index_scans;
        TransactionId latestRemovedXid;
+       bool            lock_waiter_detected;
 } LVRelStats;
 
 
@@ -107,14 +131,16 @@ static int        elevel = -1;
 
 static TransactionId OldestXmin;
 static TransactionId FreezeLimit;
+static MultiXactId MultiXactCutoff;
 
 static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
 static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
-                          Relation *Irel, int nindexes, bool scan_all);
+                          Relation *Irel, int nindexes, bool aggressive);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
+static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
 static void lazy_vacuum_index(Relation indrel,
                                  IndexBulkDeleteResult **stats,
                                  LVRelStats *vacrelstats);
@@ -122,7 +148,8 @@ static void lazy_cleanup_index(Relation indrel,
                                   IndexBulkDeleteResult *stats,
                                   LVRelStats *vacrelstats);
 static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-                                int tupindex, LVRelStats *vacrelstats);
+                                int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
+static bool should_attempt_truncation(LVRelStats *vacrelstats);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
                                                 LVRelStats *vacrelstats);
@@ -131,6 +158,8 @@ static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
                                           ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int     vac_cmp_itemptr(const void *left, const void *right);
+static bool heap_page_is_all_visible(Relation rel, Buffer buf,
+                                        TransactionId *visibility_cutoff_xid, bool *all_frozen);
 
 
 /*
@@ -141,129 +170,225 @@ static int      vac_cmp_itemptr(const void *left, const void *right);
  *
  *             At entry, we have already established a transaction and opened
  *             and locked the relation.
- *
- *             The return value indicates whether this function has held off
- *             interrupts -- caller must RESUME_INTERRUPTS() after commit if true.
  */
-bool
-lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
-                               BufferAccessStrategy bstrategy, bool *scanned_all)
+void
+lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
+                               BufferAccessStrategy bstrategy)
 {
        LVRelStats *vacrelstats;
        Relation   *Irel;
        int                     nindexes;
-       BlockNumber possibly_freeable;
        PGRUsage        ru0;
        TimestampTz starttime = 0;
-       bool            scan_all;
-       TransactionId freezeTableLimit;
-       bool            heldoff = false;
+       long            secs;
+       int                     usecs;
+       double          read_rate,
+                               write_rate;
+       bool            aggressive;             /* should we scan all unfrozen pages? */
+       bool            scanned_all_unfrozen;   /* actually scanned all such pages? */
+       TransactionId xidFullScanLimit;
+       MultiXactId mxactFullScanLimit;
+       BlockNumber new_rel_pages;
+       double          new_rel_tuples;
+       BlockNumber new_rel_allvisible;
+       double          new_live_tuples;
+       TransactionId new_frozen_xid;
+       MultiXactId new_min_multi;
 
-       pg_rusage_init(&ru0);
+       Assert(params != NULL);
 
        /* measure elapsed time iff autovacuum logging requires it */
-       if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration > 0)
+       if (IsAutoVacuumWorkerProcess() && params->log_min_duration >= 0)
+       {
+               pg_rusage_init(&ru0);
                starttime = GetCurrentTimestamp();
+       }
 
-       if (vacstmt->options & VACOPT_VERBOSE)
+       if (options & VACOPT_VERBOSE)
                elevel = INFO;
        else
                elevel = DEBUG2;
 
+       pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM,
+                                                                 RelationGetRelid(onerel));
+
        vac_strategy = bstrategy;
 
-       vacuum_set_xid_limits(vacstmt->freeze_min_age, vacstmt->freeze_table_age,
-                                                 onerel->rd_rel->relisshared,
-                                                 &OldestXmin, &FreezeLimit, &freezeTableLimit);
-       scan_all = TransactionIdPrecedesOrEquals(onerel->rd_rel->relfrozenxid,
-                                                                                        freezeTableLimit);
+       vacuum_set_xid_limits(onerel,
+                                                 params->freeze_min_age,
+                                                 params->freeze_table_age,
+                                                 params->multixact_freeze_min_age,
+                                                 params->multixact_freeze_table_age,
+                                                 &OldestXmin, &FreezeLimit, &xidFullScanLimit,
+                                                 &MultiXactCutoff, &mxactFullScanLimit);
+
+       /*
+        * We request an aggressive scan if either the table's frozen Xid is now
+        * older than or equal to the requested Xid full-table scan limit; or if
+        * the table's minimum MultiXactId is older than or equal to the requested
+        * mxid full-table scan limit.
+        */
+       aggressive = TransactionIdPrecedesOrEquals(onerel->rd_rel->relfrozenxid,
+                                                                                          xidFullScanLimit);
+       aggressive |= MultiXactIdPrecedesOrEquals(onerel->rd_rel->relminmxid,
+                                                                                         mxactFullScanLimit);
 
        vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));
 
-       vacrelstats->scanned_all = true;        /* will be cleared if we skip a page */
+       vacrelstats->old_rel_pages = onerel->rd_rel->relpages;
        vacrelstats->old_rel_tuples = onerel->rd_rel->reltuples;
        vacrelstats->num_index_scans = 0;
+       vacrelstats->pages_removed = 0;
+       vacrelstats->lock_waiter_detected = false;
 
        /* Open all indexes of the relation */
        vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
        vacrelstats->hasindex = (nindexes > 0);
 
        /* Do the vacuuming */
-       lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, scan_all);
+       lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, aggressive);
 
        /* Done with indexes */
        vac_close_indexes(nindexes, Irel, NoLock);
 
        /*
-        * Optionally truncate the relation.
+        * Compute whether we actually scanned the whole relation. If we did, we
+        * can adjust relfrozenxid and relminmxid.
         *
-        * Don't even think about it unless we have a shot at releasing a goodly
-        * number of pages.  Otherwise, the time taken isn't worth it.
-        *
-        * Note that after we've truncated the heap, it's too late to abort the
-        * transaction; doing so would lose the sinval messages needed to tell
-        * the other backends about the table being shrunk.  We prevent interrupts
-        * in that case; caller is responsible for re-enabling them after
-        * committing the transaction.
+        * NB: We need to check this before truncating the relation, because that
+        * will change ->rel_pages.
         */
-       possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
-       if (possibly_freeable > 0 &&
-               (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
-                possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
+       if ((vacrelstats->scanned_pages + vacrelstats->frozenskipped_pages)
+               < vacrelstats->rel_pages)
        {
-               HOLD_INTERRUPTS();
-               heldoff = true;
-               lazy_truncate_heap(onerel, vacrelstats);
+               Assert(!aggressive);
+               scanned_all_unfrozen = false;
        }
+       else
+               scanned_all_unfrozen = true;
+
+       /*
+        * Optionally truncate the relation.
+        */
+       if (should_attempt_truncation(vacrelstats))
+               lazy_truncate_heap(onerel, vacrelstats);
+
+       /* Report that we are now doing final cleanup */
+       pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+                                                                PROGRESS_VACUUM_PHASE_FINAL_CLEANUP);
 
        /* Vacuum the Free Space Map */
        FreeSpaceMapVacuum(onerel);
 
        /*
-        * Update statistics in pg_class.  But only if we didn't skip any pages;
-        * the tuple count only includes tuples from the pages we've visited, and
-        * we haven't frozen tuples in unvisited pages either.  The page count is
-        * accurate in any case, but because we use the reltuples / relpages ratio
-        * in the planner, it's better to not update relpages either if we can't
-        * update reltuples.
+        * Update statistics in pg_class.
+        *
+        * A corner case here is that if we scanned no pages at all because every
+        * page is all-visible, we should not update relpages/reltuples, because
+        * we have no new information to contribute.  In particular this keeps us
+        * from replacing relpages=reltuples=0 (which means "unknown tuple
+        * density") with nonzero relpages and reltuples=0 (which means "zero
+        * tuple density") unless there's some actual evidence for the latter.
+        *
+        * We do update relallvisible even in the corner case, since if the table
+        * is all-visible we'd definitely like to know that.  But clamp the value
+        * to be not more than what we're setting relpages to.
+        *
+        * Also, don't change relfrozenxid/relminmxid if we skipped any pages,
+        * since then we don't know for certain that all tuples have a newer xmin.
         */
-       if (vacrelstats->scanned_all)
-               vac_update_relstats(onerel,
-                                                       vacrelstats->rel_pages, vacrelstats->rel_tuples,
-                                                       vacrelstats->hasindex,
-                                                       FreezeLimit);
+       new_rel_pages = vacrelstats->rel_pages;
+       new_rel_tuples = vacrelstats->new_rel_tuples;
+       if (vacrelstats->scanned_pages == 0 && new_rel_pages > 0)
+       {
+               new_rel_pages = vacrelstats->old_rel_pages;
+               new_rel_tuples = vacrelstats->old_rel_tuples;
+       }
+
+       visibilitymap_count(onerel, &new_rel_allvisible, NULL);
+       if (new_rel_allvisible > new_rel_pages)
+               new_rel_allvisible = new_rel_pages;
+
+       new_frozen_xid = scanned_all_unfrozen ? FreezeLimit : InvalidTransactionId;
+       new_min_multi = scanned_all_unfrozen ? MultiXactCutoff : InvalidMultiXactId;
+
+       vac_update_relstats(onerel,
+                                               new_rel_pages,
+                                               new_rel_tuples,
+                                               new_rel_allvisible,
+                                               vacrelstats->hasindex,
+                                               new_frozen_xid,
+                                               new_min_multi,
+                                               false);
 
        /* report results to the stats collector, too */
+       new_live_tuples = new_rel_tuples - vacrelstats->new_dead_tuples;
+       if (new_live_tuples < 0)
+               new_live_tuples = 0;    /* just in case */
+
        pgstat_report_vacuum(RelationGetRelid(onerel),
                                                 onerel->rd_rel->relisshared,
-                                                vacrelstats->scanned_all,
-                                                (vacstmt->options & VACOPT_ANALYZE) != 0,
-                                                vacrelstats->rel_tuples);
+                                                new_live_tuples,
+                                                vacrelstats->new_dead_tuples);
+       pgstat_progress_end_command();
 
        /* and log the action if appropriate */
-       if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
+       if (IsAutoVacuumWorkerProcess() && params->log_min_duration >= 0)
        {
-               if (Log_autovacuum_min_duration == 0 ||
-                       TimestampDifferenceExceeds(starttime, GetCurrentTimestamp(),
-                                                                          Log_autovacuum_min_duration))
-                       ereport(LOG,
-                                       (errmsg("automatic vacuum of table \"%s.%s.%s\": index scans: %d\n"
-                                                       "pages: %d removed, %d remain\n"
-                                                       "tuples: %.0f removed, %.0f remain\n"
-                                                       "system usage: %s",
-                                                       get_database_name(MyDatabaseId),
-                                                       get_namespace_name(RelationGetNamespace(onerel)),
-                                                       RelationGetRelationName(onerel),
-                                                       vacrelstats->num_index_scans,
-                                                 vacrelstats->pages_removed, vacrelstats->rel_pages,
-                                               vacrelstats->tuples_deleted, vacrelstats->rel_tuples,
-                                                       pg_rusage_show(&ru0))));
-       }
+               TimestampTz endtime = GetCurrentTimestamp();
+
+               if (params->log_min_duration == 0 ||
+                       TimestampDifferenceExceeds(starttime, endtime,
+                                                                          params->log_min_duration))
+               {
+                       StringInfoData buf;
+
+                       TimestampDifference(starttime, endtime, &secs, &usecs);
+
+                       read_rate = 0;
+                       write_rate = 0;
+                       if ((secs > 0) || (usecs > 0))
+                       {
+                               read_rate = (double) BLCKSZ * VacuumPageMiss / (1024 * 1024) /
+                                                       (secs + usecs / 1000000.0);
+                               write_rate = (double) BLCKSZ * VacuumPageDirty / (1024 * 1024) /
+                                                       (secs + usecs / 1000000.0);
+                       }
 
-       if (scanned_all)
-               *scanned_all = vacrelstats->scanned_all;
+                       /*
+                        * This is pretty messy, but we split it up so that we can skip
+                        * emitting individual parts of the message when not applicable.
+                        */
+                       initStringInfo(&buf);
+                       appendStringInfo(&buf, _("automatic vacuum of table \"%s.%s.%s\": index scans: %d\n"),
+                                                        get_database_name(MyDatabaseId),
+                                                        get_namespace_name(RelationGetNamespace(onerel)),
+                                                        RelationGetRelationName(onerel),
+                                                        vacrelstats->num_index_scans);
+                       appendStringInfo(&buf, _("pages: %u removed, %u remain, %u skipped due to pins, %u skipped frozen\n"),
+                                                        vacrelstats->pages_removed,
+                                                        vacrelstats->rel_pages,
+                                                        vacrelstats->pinskipped_pages,
+                                                        vacrelstats->frozenskipped_pages);
+                       appendStringInfo(&buf,
+                                                        _("tuples: %.0f removed, %.0f remain, %.0f are dead but not yet removable\n"),
+                                                        vacrelstats->tuples_deleted,
+                                                        vacrelstats->new_rel_tuples,
+                                                        vacrelstats->new_dead_tuples);
+                       appendStringInfo(&buf,
+                                                _("buffer usage: %d hits, %d misses, %d dirtied\n"),
+                                                        VacuumPageHit,
+                                                        VacuumPageMiss,
+                                                        VacuumPageDirty);
+                       appendStringInfo(&buf, _("avg read rate: %.3f MB/s, avg write rate: %.3f MB/s\n"),
+                                                        read_rate, write_rate);
+                       appendStringInfo(&buf, _("system usage: %s"), pg_rusage_show(&ru0));
 
-       return heldoff;
+                       ereport(LOG,
+                                       (errmsg_internal("%s", buf.data)));
+                       pfree(buf.data);
+               }
+       }
 }
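The read and write rates reported in the log message above are simple unit conversions: the buffer-miss and buffer-dirty counts times the block size, expressed in megabytes and divided by the elapsed time. A minimal standalone illustration follows; the 8192-byte block size and the sample counts are assumptions for the example, not values from a real run.

/*
 * Standalone illustration of the MB/s arithmetic used in the log message.
 */
#include <stdio.h>

#define EXAMPLE_BLCKSZ 8192			/* assumed block size for the example */

static double
mb_per_second(long pages, long secs, int usecs)
{
	double		elapsed = secs + usecs / 1000000.0;

	if (elapsed <= 0)
		return 0;
	return (double) EXAMPLE_BLCKSZ * pages / (1024 * 1024) / elapsed;
}

int
main(void)
{
	/* e.g. 25600 pages read in 12.5 s works out to 16.000 MB/s */
	printf("avg read rate: %.3f MB/s\n", mb_per_second(25600, 12, 500000));
	return 0;
}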
 
 /*
@@ -286,36 +411,44 @@ static void
 vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 {
        /*
-        * No need to log changes for temp tables, they do not contain
-        * data visible on the standby server.
+        * Skip this for relations for which no WAL is to be written, or if we're
+        * not trying to support archive recovery.
         */
-       if (rel->rd_istemp || !XLogArchivingActive())
+       if (!RelationNeedsWAL(rel) || !XLogIsNeeded())
                return;
 
-       (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+       /*
+        * No need to write the record at all unless it contains a valid value
+        */
+       if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
+               (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
 }
 
 /*
  *     lazy_scan_heap() -- scan an open heap relation
  *
- *             This routine sets commit status bits, builds lists of dead tuples
- *             and pages with free space, and calculates statistics on the number
- *             of live tuples in the heap.  When done, or when we run low on space
- *             for dead-tuple TIDs, invoke vacuuming of indexes and heap.
+ *             This routine prunes each page in the heap, which will among other
+ *             things truncate dead tuples to dead line pointers, defragment the
+ *             page, and set commit status bits (see heap_page_prune).  It also builds
+ *             lists of dead tuples and pages with free space, calculates statistics
+ *             on the number of live tuples in the heap, and marks pages as
+ *             all-visible if appropriate.  When done, or when we run low on space for
+ *             dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
+ *             to reclaim dead line pointers.
  *
- *             If there are no indexes then we just vacuum each dirty page as we
- *             process it, since there's no point in gathering many tuples.
+ *             If there are no indexes then we can reclaim line pointers on the fly;
+ *             dead line pointers need only be retained until all index pointers that
+ *             reference them have been killed.
  */
 static void
 lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
-                          Relation *Irel, int nindexes, bool scan_all)
+                          Relation *Irel, int nindexes, bool aggressive)
 {
        BlockNumber nblocks,
                                blkno;
        HeapTupleData tuple;
        char       *relname;
        BlockNumber empty_pages,
-                               scanned_pages,
                                vacuumed_pages;
        double          num_tuples,
                                tups_vacuumed,
@@ -325,7 +458,16 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
        int                     i;
        PGRUsage        ru0;
        Buffer          vmbuffer = InvalidBuffer;
-       BlockNumber all_visible_streak;
+       BlockNumber next_unskippable_block;
+       bool            skipping_blocks;
+       xl_heap_freeze_tuple *frozen;
+       StringInfoData buf;
+       const int       initprog_index[] = {
+               PROGRESS_VACUUM_PHASE,
+               PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
+               PROGRESS_VACUUM_MAX_DEAD_TUPLES
+       };
+       int64           initprog_val[3];
 
        pg_rusage_init(&ru0);
 
@@ -335,7 +477,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                                        get_namespace_name(RelationGetNamespace(onerel)),
                                        relname)));
 
-       empty_pages = vacuumed_pages = scanned_pages = 0;
+       empty_pages = vacuumed_pages = 0;
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
 
        indstats = (IndexBulkDeleteResult **)
@@ -343,12 +485,89 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 
        nblocks = RelationGetNumberOfBlocks(onerel);
        vacrelstats->rel_pages = nblocks;
+       vacrelstats->scanned_pages = 0;
        vacrelstats->nonempty_pages = 0;
        vacrelstats->latestRemovedXid = InvalidTransactionId;
 
        lazy_space_alloc(vacrelstats, nblocks);
+       frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
+
+       /* Report that we're scanning the heap, advertising total # of blocks */
+       initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
+       initprog_val[1] = nblocks;
+       initprog_val[2] = vacrelstats->max_dead_tuples;
+       pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
+
+       /*
+        * Except when aggressive is set, we want to skip pages that are
+        * all-visible according to the visibility map, but only when we can skip
+        * at least SKIP_PAGES_THRESHOLD consecutive pages.  Since we're reading
+        * sequentially, the OS should be doing readahead for us, so there's no
+        * gain in skipping a page now and then; that's likely to disable
+        * readahead and so be counterproductive. Also, skipping even a single
+        * page means that we can't update relfrozenxid, so we only want to do it
+        * if we can skip a goodly number of pages.
+        *
+        * When aggressive is set, we can't skip pages just because they are
+        * all-visible, but we can still skip pages that are all-frozen, since
+        * such pages do not need freezing and do not affect the value that we can
+        * safely set for relfrozenxid or relminmxid.
+        *
+        * Before entering the main loop, establish the invariant that
+        * next_unskippable_block is the next block number >= blkno that we
+        * can't skip based on the visibility map, either all-visible for a
+        * regular scan or all-frozen for an aggressive scan.  We set it to
+        * nblocks if there's no such block.  We also set up the skipping_blocks
+        * flag correctly at this stage.
+        *
+        * Note: The value returned by visibilitymap_get_status could be slightly
+        * out-of-date, since we make this test before reading the corresponding
+        * heap page or locking the buffer.  This is OK.  If we mistakenly think
+        * that the page is all-visible or all-frozen when in fact the flag's just
+        * been cleared, we might fail to vacuum the page.  It's easy to see that
+        * skipping a page when aggressive is not set is not a very big deal; we
+        * might leave some dead tuples lying around, but the next vacuum will
+        * find them.  But even when aggressive *is* set, it's still OK if we miss
+        * a page whose all-frozen marking has just been cleared.  Any new XIDs
+        * just added to that page are necessarily newer than the GlobalXmin we
+        * computed, so they'll have no effect on the value to which we can safely
+        * set relfrozenxid.  A similar argument applies for MXIDs and relminmxid.
+        *
+        * We will scan the table's last page, at least to the extent of
+        * determining whether it has tuples or not, even if it should be skipped
+        * according to the above rules; except when we've already determined that
+        * it's not worth trying to truncate the table.  This avoids having
+        * lazy_truncate_heap() take access-exclusive lock on the table to attempt
+        * a truncation that just fails immediately because there are tuples in
+        * the last page.  This is worth avoiding mainly because such a lock must
+        * be replayed on any hot standby, where it can be disruptive.
+        */
+       for (next_unskippable_block = 0;
+                next_unskippable_block < nblocks;
+                next_unskippable_block++)
+       {
+               uint8           vmstatus;
+
+               vmstatus = visibilitymap_get_status(onerel, next_unskippable_block,
+                                                                                       &vmbuffer);
+               if (aggressive)
+               {
+                       if ((vmstatus & VISIBILITYMAP_ALL_FROZEN) == 0)
+                               break;
+               }
+               else
+               {
+                       if ((vmstatus & VISIBILITYMAP_ALL_VISIBLE) == 0)
+                               break;
+               }
+               vacuum_delay_point();
+       }
+
+       if (next_unskippable_block >= SKIP_PAGES_THRESHOLD)
+               skipping_blocks = true;
+       else
+               skipping_blocks = false;
 
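The skipping heuristic explained in the comment above reduces to a small standalone model: track the next block that cannot be skipped, and only actually skip when the run of skippable blocks is at least SKIP_PAGES_THRESHOLD long, so short runs are read anyway and OS readahead is not defeated. The sketch below uses a plain bool array in place of the visibility map and omits the forced check of the last page; names are illustrative only.

/*
 * Standalone sketch of run-length-based page skipping.
 */
#include <stdbool.h>
#include <stdio.h>

#define SKIP_THRESHOLD 32

static unsigned
scan_with_skipping(const bool *skippable, unsigned nblocks)
{
	unsigned	scanned = 0;
	unsigned	next_unskippable = 0;
	bool		skipping;

	while (next_unskippable < nblocks && skippable[next_unskippable])
		next_unskippable++;
	skipping = (next_unskippable >= SKIP_THRESHOLD);

	for (unsigned blkno = 0; blkno < nblocks; blkno++)
	{
		if (blkno == next_unskippable)
		{
			/* advance past the next run of skippable blocks */
			next_unskippable++;
			while (next_unskippable < nblocks && skippable[next_unskippable])
				next_unskippable++;
			skipping = (next_unskippable - blkno > SKIP_THRESHOLD);
		}
		else if (skipping)
			continue;			/* run is long enough: really skip this page */
		scanned++;				/* otherwise "read" the page */
	}
	return scanned;
}

int
main(void)
{
	bool		vm[200] = {false};

	/* a 100-page all-visible run: long enough to be skipped */
	for (int i = 50; i < 150; i++)
		vm[i] = true;
	printf("pages scanned: %u of 200\n", scan_with_skipping(vm, 200));
	return 0;
}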
-       all_visible_streak = 0;
        for (blkno = 0; blkno < nblocks; blkno++)
        {
                Buffer          buf;
@@ -358,44 +577,93 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                bool            tupgone,
                                        hastup;
                int                     prev_dead_count;
-               OffsetNumber frozen[MaxOffsetNumber];
                int                     nfrozen;
                Size            freespace;
                bool            all_visible_according_to_vm = false;
                bool            all_visible;
+               bool            all_frozen = true;      /* provided all_visible is also true */
+               bool            has_dead_tuples;
+               TransactionId visibility_cutoff_xid = InvalidTransactionId;
 
-               /*
-                * Skip pages that don't require vacuuming according to the visibility
-                * map. But only if we've seen a streak of at least
-                * SKIP_PAGES_THRESHOLD pages marked as clean. Since we're reading
-                * sequentially, the OS should be doing readahead for us and there's
-                * no gain in skipping a page now and then. You need a longer run of
-                * consecutive skipped pages before it's worthwhile. Also, skipping
-                * even a single page means that we can't update relfrozenxid or
-                * reltuples, so we only want to do it if there's a good chance to
-                * skip a goodly number of pages.
-                */
-               if (!scan_all)
+               /* see note above about forcing scanning of last page */
+#define FORCE_CHECK_PAGE() \
+               (blkno == nblocks - 1 && should_attempt_truncation(vacrelstats))
+
+               pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+
+               if (blkno == next_unskippable_block)
                {
-                       all_visible_according_to_vm =
-                               visibilitymap_test(onerel, blkno, &vmbuffer);
-                       if (all_visible_according_to_vm)
+                       /* Time to advance next_unskippable_block */
+                       for (next_unskippable_block++;
+                                next_unskippable_block < nblocks;
+                                next_unskippable_block++)
                        {
-                               all_visible_streak++;
-                               if (all_visible_streak >= SKIP_PAGES_THRESHOLD)
+                               uint8           vmskipflags;
+
+                               vmskipflags = visibilitymap_get_status(onerel,
+                                                                                                          next_unskippable_block,
+                                                                                                          &vmbuffer);
+                               if (aggressive)
+                               {
+                                       if ((vmskipflags & VISIBILITYMAP_ALL_FROZEN) == 0)
+                                               break;
+                               }
+                               else
                                {
-                                       vacrelstats->scanned_all = false;
-                                       continue;
+                                       if ((vmskipflags & VISIBILITYMAP_ALL_VISIBLE) == 0)
+                                               break;
                                }
+                               vacuum_delay_point();
                        }
+
+                       /*
+                        * We know we can't skip the current block.  But set up
+                        * skipping_blocks to do the right thing at the following
+                        * blocks.
+                        */
+                       if (next_unskippable_block - blkno > SKIP_PAGES_THRESHOLD)
+                               skipping_blocks = true;
                        else
-                               all_visible_streak = 0;
+                               skipping_blocks = false;
+
+                       /*
+                        * Normally, the fact that we can't skip this block must mean that
+                        * it's not all-visible.  But in an aggressive vacuum we know only
+                        * that it's not all-frozen, so it might still be all-visible.
+                        */
+                       if (aggressive && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer))
+                               all_visible_according_to_vm = true;
+               }
+               else
+               {
+                       /*
+                        * The current block is potentially skippable; if we've seen a
+                        * long enough run of skippable blocks to justify skipping it, and
+                        * we're not forced to check it, then go ahead and skip.
+                        * Otherwise, the page must be at least all-visible if not
+                        * all-frozen, so we can set all_visible_according_to_vm = true.
+                        */
+                       if (skipping_blocks && !FORCE_CHECK_PAGE())
+                       {
+                               /*
+                                * Tricky, tricky.  If this is in aggressive vacuum, the page
+                                * must have been all-frozen at the time we checked whether it
+                                * was skippable, but it might not be any more.  We must be
+                                * careful to count it as a skipped all-frozen page in that
+                                * case, or else we'll think we can't update relfrozenxid and
+                                * relminmxid.  If it's not an aggressive vacuum, we don't
+                                * know whether it was all-frozen, so we have to recheck; but
+                                * in this case an approximate answer is OK.
+                                */
+                               if (aggressive || VM_ALL_FROZEN(onerel, blkno, &vmbuffer))
+                                       vacrelstats->frozenskipped_pages++;
+                               continue;
+                       }
+                       all_visible_according_to_vm = true;
                }
 
                vacuum_delay_point();
 
-               scanned_pages++;
-
                /*
                 * If we are close to overrunning the available space for dead-tuple
                 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
@@ -403,27 +671,137 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
                        vacrelstats->num_dead_tuples > 0)
                {
+                       const int       hvp_index[] = {
+                               PROGRESS_VACUUM_PHASE,
+                               PROGRESS_VACUUM_NUM_INDEX_VACUUMS
+                       };
+                       int64           hvp_val[2];
+
+                       /*
+                        * Before beginning index vacuuming, we release any pin we may
+                        * hold on the visibility map page.  This isn't necessary for
+                        * correctness, but we do it anyway to avoid holding the pin
+                        * across a lengthy, unrelated operation.
+                        */
+                       if (BufferIsValid(vmbuffer))
+                       {
+                               ReleaseBuffer(vmbuffer);
+                               vmbuffer = InvalidBuffer;
+                       }
+
                        /* Log cleanup info before we touch indexes */
                        vacuum_log_cleanup_info(onerel, vacrelstats);
 
+                       /* Report that we are now vacuuming indexes */
+                       pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+                                                                                PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
+
                        /* Remove index entries */
                        for (i = 0; i < nindexes; i++)
                                lazy_vacuum_index(Irel[i],
                                                                  &indstats[i],
                                                                  vacrelstats);
+
+                       /*
+                        * Report that we are now vacuuming the heap.  We also increase
+                        * the number of index scans here; note that by using
+                        * pgstat_progress_update_multi_param we can update both
+                        * parameters atomically.
+                        */
+                       hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
+                       hvp_val[1] = vacrelstats->num_index_scans + 1;
+                       pgstat_progress_update_multi_param(2, hvp_index, hvp_val);
+
                        /* Remove tuples from heap */
                        lazy_vacuum_heap(onerel, vacrelstats);
-                       /* Forget the now-vacuumed tuples, and press on */
+
+                       /*
+                        * Forget the now-vacuumed tuples, and press on, but be careful
+                        * not to reset latestRemovedXid since we want that value to be
+                        * valid.
+                        */
                        vacrelstats->num_dead_tuples = 0;
-                       vacrelstats->latestRemovedXid = InvalidTransactionId;
                        vacrelstats->num_index_scans++;
+
+                       /* Report that we are once again scanning the heap */
+                       pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+                                                                                PROGRESS_VACUUM_PHASE_SCAN_HEAP);
                }
 
+               /*
+                * Pin the visibility map page in case we need to mark the page
+                * all-visible.  In most cases this will be very cheap, because we'll
+                * already have the correct page pinned anyway.  However, it's
+                * possible that (a) next_unskippable_block is covered by a different
+                * VM page than the current block or (b) we released our pin and did a
+                * cycle of index vacuuming.
+                */
+               visibilitymap_pin(onerel, blkno, &vmbuffer);
+
                buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
                                                                 RBM_NORMAL, vac_strategy);
 
                /* We need buffer cleanup lock so that we can prune HOT chains. */
-               LockBufferForCleanup(buf);
+               if (!ConditionalLockBufferForCleanup(buf))
+               {
+                       /*
+                        * If we're not performing an aggressive scan to guard against XID
+                        * wraparound, and we don't want to forcibly check the page, then
+                        * it's OK to skip vacuuming pages we get a lock conflict on. They
+                        * will be dealt with in some future vacuum.
+                        */
+                       if (!aggressive && !FORCE_CHECK_PAGE())
+                       {
+                               ReleaseBuffer(buf);
+                               vacrelstats->pinskipped_pages++;
+                               continue;
+                       }
+
+                       /*
+                        * Read the page with share lock to see if any xids on it need to
+                        * be frozen.  If not we just skip the page, after updating our
+                        * scan statistics.  If there are some, we wait for cleanup lock.
+                        *
+                        * We could defer the lock request further by remembering the page
+                        * and coming back to it later, or we could even register
+                        * ourselves for multiple buffers and then service whichever one
+                        * is received first.  For now, this seems good enough.
+                        *
+                        * If we get here with aggressive false, then we're just forcibly
+                        * checking the page, and so we don't want to insist on getting
+                        * the lock; we only need to know if the page contains tuples, so
+                        * that we can update nonempty_pages correctly.  It's convenient
+                        * to use lazy_check_needs_freeze() for both situations, though.
+                        */
+                       LockBuffer(buf, BUFFER_LOCK_SHARE);
+                       if (!lazy_check_needs_freeze(buf, &hastup))
+                       {
+                               UnlockReleaseBuffer(buf);
+                               vacrelstats->scanned_pages++;
+                               vacrelstats->pinskipped_pages++;
+                               if (hastup)
+                                       vacrelstats->nonempty_pages = blkno + 1;
+                               continue;
+                       }
+                       if (!aggressive)
+                       {
+                               /*
+                                * Here, we must not advance scanned_pages; that would amount
+                                * to claiming that the page contains no freezable tuples.
+                                */
+                               UnlockReleaseBuffer(buf);
+                               vacrelstats->pinskipped_pages++;
+                               if (hastup)
+                                       vacrelstats->nonempty_pages = blkno + 1;
+                               continue;
+                       }
+                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                       LockBufferForCleanup(buf);
+                       /* drop through to normal processing */
+               }
+
+               vacrelstats->scanned_pages++;
 
                page = BufferGetPage(buf);
 
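The branch above amounts to a three-way decision when the cleanup lock cannot be taken immediately: skip the page entirely, take only a share lock to check it and then skip, or wait for the cleanup lock. A simplified standalone sketch of that decision follows; page_needs_freeze and must_check_last_page are hypothetical stand-ins for lazy_check_needs_freeze() and FORCE_CHECK_PAGE(), and the enum names are illustrative only (the sketch also glosses over the different page-count accounting in the non-aggressive forced-check case).

/*
 * Standalone sketch of the pin-conflict decision described above.
 */
#include <stdbool.h>
#include <stdio.h>

typedef enum
{
	SKIP_PAGE,					/* leave it for a future vacuum */
	SKIP_AFTER_SHARE_CHECK,		/* share-lock, note whether it has tuples, move on */
	WAIT_FOR_CLEANUP_LOCK		/* aggressive vacuum and old XIDs: must wait */
} PinConflictAction;

static PinConflictAction
decide_on_pin_conflict(bool aggressive, bool must_check_last_page,
					   bool page_needs_freeze)
{
	if (!aggressive && !must_check_last_page)
		return SKIP_PAGE;
	if (!page_needs_freeze)
		return SKIP_AFTER_SHARE_CHECK;
	if (!aggressive)
		return SKIP_AFTER_SHARE_CHECK;	/* only needed the has-tuples check */
	return WAIT_FOR_CLEANUP_LOCK;
}

int
main(void)
{
	printf("%d %d %d\n",
		   decide_on_pin_conflict(false, false, true),	/* 0: skip */
		   decide_on_pin_conflict(true, false, false),	/* 1: share check */
		   decide_on_pin_conflict(true, false, true));	/* 2: wait */
	return 0;
}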
@@ -474,25 +852,36 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                        empty_pages++;
                        freespace = PageGetHeapFreeSpace(page);
 
+                       /* empty pages are always all-visible and all-frozen */
                        if (!PageIsAllVisible(page))
                        {
-                               PageSetAllVisible(page);
-                               SetBufferCommitInfoNeedsSave(buf);
-                       }
+                               START_CRIT_SECTION();
 
-                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                               /* mark buffer dirty before writing a WAL record */
+                               MarkBufferDirty(buf);
 
-                       /* Update the visibility map */
-                       if (!all_visible_according_to_vm)
-                       {
-                               visibilitymap_pin(onerel, blkno, &vmbuffer);
-                               LockBuffer(buf, BUFFER_LOCK_SHARE);
-                               if (PageIsAllVisible(page))
-                                       visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
-                               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                               /*
+                                * It's possible that another backend has extended the heap,
+                                * initialized the page, and then failed to WAL-log the page
+                                * due to an ERROR.  Since heap extension is not WAL-logged,
+                                * recovery might try to replay our record setting the page
+                                * all-visible and find that the page isn't initialized, which
+                                * will cause a PANIC.  To prevent that, check whether the
+                                * page has been previously WAL-logged, and if not, do that
+                                * now.
+                                */
+                               if (RelationNeedsWAL(onerel) &&
+                                       PageGetLSN(page) == InvalidXLogRecPtr)
+                                       log_newpage_buffer(buf, true);
+
+                               PageSetAllVisible(page);
+                               visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                                                 vmbuffer, InvalidTransactionId,
+                                          VISIBILITYMAP_ALL_VISIBLE | VISIBILITYMAP_ALL_FROZEN);
+                               END_CRIT_SECTION();
                        }
 
-                       ReleaseBuffer(buf);
+                       UnlockReleaseBuffer(buf);
                        RecordPageWithFreeSpace(onerel, blkno, freespace);
                        continue;
                }
@@ -502,18 +891,24 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                 *
                 * We count tuples removed by the pruning step as removed by VACUUM.
                 */
-               tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
-                                                                                false, false);
+               tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin, false,
+                                                                                &vacrelstats->latestRemovedXid);
 
                /*
                 * Now scan the page to collect vacuumable items and check for tuples
                 * requiring freezing.
                 */
                all_visible = true;
+               has_dead_tuples = false;
                nfrozen = 0;
                hastup = false;
                prev_dead_count = vacrelstats->num_dead_tuples;
                maxoff = PageGetMaxOffsetNumber(page);
+
+               /*
+                * Note: If you change anything in the loop below, also look at
+                * heap_page_is_all_visible to see if that needs to be changed.
+                */
                for (offnum = FirstOffsetNumber;
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
@@ -555,10 +950,11 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 
                        tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                        tuple.t_len = ItemIdGetLength(itemid);
+                       tuple.t_tableOid = RelationGetRelid(onerel);
 
                        tupgone = false;
 
-                       switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
+                       switch (HeapTupleSatisfiesVacuum(&tuple, OldestXmin, buf))
                        {
                                case HEAPTUPLE_DEAD:
 
@@ -597,14 +993,14 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                                         * NB: Like with per-tuple hint bits, we can't set the
                                         * PD_ALL_VISIBLE flag if the inserter committed
                                         * asynchronously. See SetHintBits for more info. Check
-                                        * that the HEAP_XMIN_COMMITTED hint bit is set because of
+                                        * that the tuple is hinted xmin-committed because of
                                         * that.
                                         */
                                        if (all_visible)
                                        {
                                                TransactionId xmin;
 
-                                               if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+                                               if (!HeapTupleHeaderXminCommitted(tuple.t_data))
                                                {
                                                        all_visible = false;
                                                        break;
@@ -620,6 +1016,10 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                                                        all_visible = false;
                                                        break;
                                                }
+
+                                               /* Track newest xmin on page. */
+                                               if (TransactionIdFollows(xmin, visibility_cutoff_xid))
+                                                       visibility_cutoff_xid = xmin;
                                        }
                                        break;
                                case HEAPTUPLE_RECENTLY_DEAD:
@@ -648,8 +1048,9 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                        {
                                lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
                                HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
-                                                                                               &vacrelstats->latestRemovedXid);
+                                                                                        &vacrelstats->latestRemovedXid);
                                tups_vacuumed += 1;
+                               has_dead_tuples = true;
                        }
                        else
                        {
@@ -660,9 +1061,11 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                                 * Each non-removable tuple must be checked to see if it needs
                                 * freezing.  Note we already have exclusive buffer lock.
                                 */
-                               if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
-                                                                         InvalidBuffer))
-                                       frozen[nfrozen++] = offnum;
+                               if (heap_prepare_freeze_tuple(tuple.t_data, FreezeLimit,
+                                                                                 MultiXactCutoff, &frozen[nfrozen]))
+                                       frozen[nfrozen++].offset = offnum;
+                               else if (heap_tuple_needs_eventual_freeze(tuple.t_data))
+                                       all_frozen = false;
                        }
                }                                               /* scan along page */
 
@@ -673,17 +1076,33 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                 */
                if (nfrozen > 0)
                {
+                       START_CRIT_SECTION();
+
                        MarkBufferDirty(buf);
-                       /* no XLOG for temp tables, though */
-                       if (!onerel->rd_istemp)
+
+                       /* execute collected freezes */
+                       for (i = 0; i < nfrozen; i++)
+                       {
+                               ItemId          itemid;
+                               HeapTupleHeader htup;
+
+                               itemid = PageGetItemId(page, frozen[i].offset);
+                               htup = (HeapTupleHeader) PageGetItem(page, itemid);
+
+                               heap_execute_freeze_tuple(htup, &frozen[i]);
+                       }
+
+                       /* Now WAL-log freezing if necessary */
+                       if (RelationNeedsWAL(onerel))
                        {
                                XLogRecPtr      recptr;
 
                                recptr = log_heap_freeze(onerel, buf, FreezeLimit,
                                                                                 frozen, nfrozen);
                                PageSetLSN(page, recptr);
-                               PageSetTLI(page, ThisTimeLineID);
                        }
+
+                       END_CRIT_SECTION();
                }
 
                /*
@@ -694,49 +1113,103 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                        vacrelstats->num_dead_tuples > 0)
                {
                        /* Remove tuples from heap */
-                       lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
-                       /* Forget the now-vacuumed tuples, and press on */
+                       lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
+                       has_dead_tuples = false;
+
+                       /*
+                        * Forget the now-vacuumed tuples, and press on, but be careful
+                        * not to reset latestRemovedXid since we want that value to be
+                        * valid.
+                        */
                        vacrelstats->num_dead_tuples = 0;
-                       vacrelstats->latestRemovedXid = InvalidTransactionId;
                        vacuumed_pages++;
                }
 
                freespace = PageGetHeapFreeSpace(page);
 
-               /* Update the all-visible flag on the page */
-               if (!PageIsAllVisible(page) && all_visible)
+               /* mark page all-visible, if appropriate */
+               if (all_visible && !all_visible_according_to_vm)
                {
+                       uint8           flags = VISIBILITYMAP_ALL_VISIBLE;
+
+                       if (all_frozen)
+                               flags |= VISIBILITYMAP_ALL_FROZEN;
+
+                       /*
+                        * It should never be the case that the visibility map bit is set
+                        * while the page-level bit is clear, but the reverse is allowed
+                        * (if checksums are not enabled).  Regardless, set both bits so
+                        * that we get back in sync.
+                        *
+                        * NB: If the heap page is all-visible but the VM bit is not set,
+                        * we don't need to dirty the heap page.  However, if checksums
+                        * are enabled, we do need to make sure that the heap page is
+                        * dirtied before passing it to visibilitymap_set(), because it
+                        * may be logged.  Given that this situation should only happen in
+                        * rare cases after a crash, it is not worth optimizing.
+                        */
                        PageSetAllVisible(page);
-                       SetBufferCommitInfoNeedsSave(buf);
+                       MarkBufferDirty(buf);
+                       visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                                         vmbuffer, visibility_cutoff_xid, flags);
                }
-               else if (PageIsAllVisible(page) && !all_visible)
+
+               /*
+                * As of PostgreSQL 9.2, the visibility map bit should never be set if
+                * the page-level bit is clear.  However, it's possible that the bit
+                * got cleared after we checked it and before we took the buffer
+                * content lock, so we must recheck before jumping to the conclusion
+                * that something bad has happened.
+                */
+               else if (all_visible_according_to_vm && !PageIsAllVisible(page)
+                                && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer))
                {
-                       elog(WARNING, "PD_ALL_VISIBLE flag was incorrectly set in relation \"%s\" page %u",
+                       elog(WARNING, "page is not marked all-visible but visibility map bit is set in relation \"%s\" page %u",
                                 relname, blkno);
-                       PageClearAllVisible(page);
-                       SetBufferCommitInfoNeedsSave(buf);
-
-                       /*
-                        * Normally, we would drop the lock on the heap page before
-                        * updating the visibility map, but since this case shouldn't
-                        * happen anyway, don't worry about that.
-                        */
-                       visibilitymap_clear(onerel, blkno);
+                       visibilitymap_clear(onerel, blkno, vmbuffer);
                }
 
-               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+               /*
+                * It's possible for the value returned by GetOldestXmin() to move
+                * backwards, so it's not wrong for us to see tuples that appear not
+                * to be visible to everyone yet, while PD_ALL_VISIBLE is already
+                * set.  The real safe xmin value never moves backwards, but
+                * GetOldestXmin() is conservative and sometimes returns a value
+                * that's unnecessarily small.  If we see that contradiction, it just
+                * means that the tuples we think are not yet visible to everyone
+                * actually are, and the PD_ALL_VISIBLE flag is correct.
+                *
+                * There should never be dead tuples on a page with PD_ALL_VISIBLE
+                * set, however.
+                */
+               else if (PageIsAllVisible(page) && has_dead_tuples)
+               {
+                       elog(WARNING, "page containing dead tuples is marked as all-visible in relation \"%s\" page %u",
+                                relname, blkno);
+                       PageClearAllVisible(page);
+                       MarkBufferDirty(buf);
+                       visibilitymap_clear(onerel, blkno, vmbuffer);
+               }
 
-               /* Update the visibility map */
-               if (!all_visible_according_to_vm && all_visible)
+               /*
+                * If the page is marked all-visible in the visibility map but is in
+                * fact all-frozen and the all-frozen bit is not yet set in the map,
+                * set it now.  Note that all_frozen is only valid if all_visible is
+                * true, so we must check both.
+                */
+               else if (all_visible_according_to_vm && all_visible && all_frozen &&
+                                !VM_ALL_FROZEN(onerel, blkno, &vmbuffer))
                {
-                       visibilitymap_pin(onerel, blkno, &vmbuffer);
-                       LockBuffer(buf, BUFFER_LOCK_SHARE);
-                       if (PageIsAllVisible(page))
-                               visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
-                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                       /*
+                        * We can pass InvalidTransactionId as the cutoff XID here,
+                        * because setting the all-frozen bit doesn't cause recovery
+                        * conflicts.
+                        */
+                       visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+                                                         vmbuffer, InvalidTransactionId,
+                                                         VISIBILITYMAP_ALL_FROZEN);
                }
 
-               ReleaseBuffer(buf);
+               UnlockReleaseBuffer(buf);
 
                /* Remember the location of the last page with nonremovable tuples */
                if (hastup)
@@ -745,41 +1218,78 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                /*
                 * If we remembered any tuples for deletion, then the page will be
                 * visited again by lazy_vacuum_heap, which will compute and record
-                * its post-compaction free space.      If not, then we're done with this
-                * page, so remember its free space as-is.      (This path will always be
+                * its post-compaction free space.  If not, then we're done with this
+                * page, so remember its free space as-is.  (This path will always be
                 * taken if there are no indexes.)
                 */
                if (vacrelstats->num_dead_tuples == prev_dead_count)
                        RecordPageWithFreeSpace(onerel, blkno, freespace);
        }
 
+       /* report that everything is scanned and vacuumed */
+       pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+
+       pfree(frozen);
+
        /* save stats for use later */
-       vacrelstats->rel_tuples = num_tuples;
+       vacrelstats->scanned_tuples = num_tuples;
        vacrelstats->tuples_deleted = tups_vacuumed;
+       vacrelstats->new_dead_tuples = nkeep;
+
+       /* now we can compute the new value for pg_class.reltuples */
+       vacrelstats->new_rel_tuples = vac_estimate_reltuples(onerel, false,
+                                                                                                                nblocks,
+                                                                                                 vacrelstats->scanned_pages,
+                                                                                                                num_tuples);
+
+       /*
+        * Release any remaining pin on the visibility map page.
+        */
+       if (BufferIsValid(vmbuffer))
+       {
+               ReleaseBuffer(vmbuffer);
+               vmbuffer = InvalidBuffer;
+       }
 
        /* If any tuples need to be deleted, perform final vacuum cycle */
        /* XXX put a threshold on min number of tuples here? */
        if (vacrelstats->num_dead_tuples > 0)
        {
+               const int       hvp_index[] = {
+                       PROGRESS_VACUUM_PHASE,
+                       PROGRESS_VACUUM_NUM_INDEX_VACUUMS
+               };
+               int64           hvp_val[2];
+
                /* Log cleanup info before we touch indexes */
                vacuum_log_cleanup_info(onerel, vacrelstats);
 
+               /* Report that we are now vacuuming indexes */
+               pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+                                                                        PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
+
                /* Remove index entries */
                for (i = 0; i < nindexes; i++)
                        lazy_vacuum_index(Irel[i],
                                                          &indstats[i],
                                                          vacrelstats);
+
+               /* Report that we are now vacuuming the heap */
+               hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
+               hvp_val[1] = vacrelstats->num_index_scans + 1;
+               pgstat_progress_update_multi_param(2, hvp_index, hvp_val);
+
                /* Remove tuples from heap */
+               pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+                                                                        PROGRESS_VACUUM_PHASE_VACUUM_HEAP);
                lazy_vacuum_heap(onerel, vacrelstats);
                vacrelstats->num_index_scans++;
        }
 
-       /* Release the pin on the visibility map page */
-       if (BufferIsValid(vmbuffer))
-       {
-               ReleaseBuffer(vmbuffer);
-               vmbuffer = InvalidBuffer;
-       }
+       /* report all blocks vacuumed; and that we're cleaning up */
+       pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
+       pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+                                                                PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
        /* Do post-vacuum cleanup and statistics update for each index */
        for (i = 0; i < nindexes; i++)
@@ -792,18 +1302,34 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                                                RelationGetRelationName(onerel),
                                                tups_vacuumed, vacuumed_pages)));
 
+       /*
+        * This is pretty messy, but we split it up so that we can skip emitting
+        * individual parts of the message when not applicable.
+        */
+       initStringInfo(&buf);
+       appendStringInfo(&buf,
+                                        _("%.0f dead row versions cannot be removed yet.\n"),
+                                        nkeep);
+       appendStringInfo(&buf, _("There were %.0f unused item pointers.\n"),
+                                        nunused);
+       appendStringInfo(&buf, ngettext("Skipped %u page due to buffer pins.\n",
+                                                                       "Skipped %u pages due to buffer pins.\n",
+                                                                       vacrelstats->pinskipped_pages),
+                                        vacrelstats->pinskipped_pages);
+       appendStringInfo(&buf, ngettext("%u page is entirely empty.\n",
+                                                                       "%u pages are entirely empty.\n",
+                                                                       empty_pages),
+                                        empty_pages);
+       appendStringInfo(&buf, _("%s."),
+                                        pg_rusage_show(&ru0));
+
        ereport(elevel,
                        (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages",
                                        RelationGetRelationName(onerel),
-                                       tups_vacuumed, num_tuples, scanned_pages, nblocks),
-                        errdetail("%.0f dead row versions cannot be removed yet.\n"
-                                          "There were %.0f unused item pointers.\n"
-                                          "%u pages are entirely empty.\n"
-                                          "%s.",
-                                          nkeep,
-                                          nunused,
-                                          empty_pages,
-                                          pg_rusage_show(&ru0))));
+                                       tups_vacuumed, num_tuples,
+                                       vacrelstats->scanned_pages, nblocks),
+                        errdetail_internal("%s", buf.data)));
+       pfree(buf.data);
 }
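
As a standalone illustration of the message assembly in the hunk above (and not part of this patch), the sketch below builds the same kind of pluralized, piecewise detail string using GNU gettext's ngettext() and plain snprintf() in place of PostgreSQL's StringInfo and ereport() machinery; the counts and the buffer size are invented for the example.

/*
 * Sketch only: pick singular or plural wording per count and append the
 * pieces to one detail string, as the code above does with ngettext()
 * and appendStringInfo().
 */
#include <libintl.h>
#include <stdio.h>

int
main(void)
{
	unsigned int pinskipped_pages = 1;
	unsigned int empty_pages = 3;
	char		detail[256];
	int			len = 0;

	len += snprintf(detail + len, sizeof(detail) - len,
					ngettext("Skipped %u page due to buffer pins.\n",
							 "Skipped %u pages due to buffer pins.\n",
							 pinskipped_pages),
					pinskipped_pages);
	len += snprintf(detail + len, sizeof(detail) - len,
					ngettext("%u page is entirely empty.\n",
							 "%u pages are entirely empty.\n",
							 empty_pages),
					empty_pages);
	fputs(detail, stdout);
	return 0;
}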
 
 
@@ -824,6 +1350,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
        int                     tupindex;
        int                     npages;
        PGRUsage        ru0;
+       Buffer          vmbuffer = InvalidBuffer;
 
        pg_rusage_init(&ru0);
        npages = 0;
@@ -841,8 +1368,14 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
                tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
                buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
                                                                 vac_strategy);
-               LockBufferForCleanup(buf);
-               tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
+               if (!ConditionalLockBufferForCleanup(buf))
+               {
+                       ReleaseBuffer(buf);
+                       ++tupindex;
+                       continue;
+               }
+               tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats,
+                                                                       &vmbuffer);
 
                /* Now that we've compacted the page, record its available space */
                page = BufferGetPage(buf);
@@ -853,6 +1386,12 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
                npages++;
        }
 
+       if (BufferIsValid(vmbuffer))
+       {
+               ReleaseBuffer(vmbuffer);
+               vmbuffer = InvalidBuffer;
+       }
+
        ereport(elevel,
                        (errmsg("\"%s\": removed %d row versions in %d pages",
                                        RelationGetRelationName(onerel),
@@ -873,11 +1412,15 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
  */
 static int
 lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-                                int tupindex, LVRelStats *vacrelstats)
+                                int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
 {
        Page            page = BufferGetPage(buffer);
        OffsetNumber unused[MaxOffsetNumber];
        int                     uncnt = 0;
+       TransactionId visibility_cutoff_xid;
+       bool            all_frozen;
+
+       pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
 
        START_CRIT_SECTION();
 
@@ -898,26 +1441,119 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 
        PageRepairFragmentation(page);
 
+       /*
+        * Mark buffer dirty before we write WAL.
+        */
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
-       if (!onerel->rd_istemp)
+       if (RelationNeedsWAL(onerel))
        {
                XLogRecPtr      recptr;
 
                recptr = log_heap_clean(onerel, buffer,
                                                                NULL, 0, NULL, 0,
                                                                unused, uncnt,
-                                                               vacrelstats->latestRemovedXid, false);
+                                                               vacrelstats->latestRemovedXid);
                PageSetLSN(page, recptr);
-               PageSetTLI(page, ThisTimeLineID);
        }
 
+       /*
+        * End critical section, so we can safely do visibility tests (which
+        * may need to perform I/O and allocate memory!).  If we crash now, the
+        * page (including the corresponding vm bit) might not be marked
+        * all-visible, but that's fine; a later vacuum will fix that.
+        */
        END_CRIT_SECTION();
 
+       /*
+        * Now that we have removed the dead tuples from the page, once again
+        * check if the page has become all-visible.  The page is already marked
+        * dirty, exclusively locked, and, if needed, a full page image has been
+        * emitted in the log_heap_clean() above.
+        */
+       if (heap_page_is_all_visible(onerel, buffer, &visibility_cutoff_xid,
+                                                                &all_frozen))
+               PageSetAllVisible(page);
+
+       /*
+        * All the changes to the heap page have been done. If the all-visible
+        * flag is now set, also set the VM all-visible bit (and, if possible, the
+        * all-frozen bit) unless this has already been done previously.
+        */
+       if (PageIsAllVisible(page))
+       {
+               uint8           vm_status = visibilitymap_get_status(onerel, blkno, vmbuffer);
+               uint8           flags = 0;
+
+               /* Work out which VM bits still need to be set */
+               if ((vm_status & VISIBILITYMAP_ALL_VISIBLE) == 0)
+                       flags |= VISIBILITYMAP_ALL_VISIBLE;
+               if ((vm_status & VISIBILITYMAP_ALL_FROZEN) == 0 && all_frozen)
+                       flags |= VISIBILITYMAP_ALL_FROZEN;
+
+               Assert(BufferIsValid(*vmbuffer));
+               if (flags != 0)
+                       visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr,
+                                                         *vmbuffer, visibility_cutoff_xid, flags);
+       }
+
        return tupindex;
 }
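
As a standalone illustration of the flag computation above (and not part of this patch), the sketch below reproduces the bit arithmetic that decides which visibility-map bits still have to be set; the two flag values are defined locally for the example only, whereas the real code takes VISIBILITYMAP_ALL_VISIBLE and VISIBILITYMAP_ALL_FROZEN from access/visibilitymap.h.

/*
 * Sketch only: given the bits already set in the visibility map and whether
 * the page is now all-frozen, compute the bits that remain to be set.
 */
#include <stdint.h>
#include <stdio.h>

#define SKETCH_ALL_VISIBLE	0x01	/* stand-in for VISIBILITYMAP_ALL_VISIBLE */
#define SKETCH_ALL_FROZEN	0x02	/* stand-in for VISIBILITYMAP_ALL_FROZEN */

static uint8_t
vm_bits_to_set(uint8_t vm_status, int all_frozen)
{
	uint8_t		flags = 0;

	if ((vm_status & SKETCH_ALL_VISIBLE) == 0)
		flags |= SKETCH_ALL_VISIBLE;
	if ((vm_status & SKETCH_ALL_FROZEN) == 0 && all_frozen)
		flags |= SKETCH_ALL_FROZEN;
	return flags;
}

int
main(void)
{
	/* VM already says all-visible; page turned out all-frozen as well */
	printf("flags = 0x%02x\n", (unsigned) vm_bits_to_set(SKETCH_ALL_VISIBLE, 1));
	return 0;
}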
 
+/*
+ *     lazy_check_needs_freeze() -- scan page to see if any tuples
+ *                                      need to be cleaned to avoid wraparound
+ *
+ * Returns true if the page needs to be vacuumed using a cleanup lock.
+ * Also returns a flag indicating whether the page contains any tuples at all.
+ */
+static bool
+lazy_check_needs_freeze(Buffer buf, bool *hastup)
+{
+       Page            page = BufferGetPage(buf);
+       OffsetNumber offnum,
+                               maxoff;
+       HeapTupleHeader tupleheader;
+
+       *hastup = false;
+
+       /* If we hit an uninitialized page, we want to force vacuuming it. */
+       if (PageIsNew(page))
+               return true;
+
+       /* Quick out for ordinary empty page. */
+       if (PageIsEmpty(page))
+               return false;
+
+       maxoff = PageGetMaxOffsetNumber(page);
+       for (offnum = FirstOffsetNumber;
+                offnum <= maxoff;
+                offnum = OffsetNumberNext(offnum))
+       {
+               ItemId          itemid;
+
+               itemid = PageGetItemId(page, offnum);
+
+               /* this should match hastup test in count_nondeletable_pages() */
+               if (ItemIdIsUsed(itemid))
+                       *hastup = true;
+
+               /* dead and redirect items never need freezing */
+               if (!ItemIdIsNormal(itemid))
+                       continue;
+
+               tupleheader = (HeapTupleHeader) PageGetItem(page, itemid);
+
+               if (heap_tuple_needs_freeze(tupleheader, FreezeLimit,
+                                                                       MultiXactCutoff, buf))
+                       return true;
+       }                                                       /* scan along page */
+
+       return false;
+}
+
+
 /*
  *     lazy_vacuum_index() -- vacuum one index relation.
  *
@@ -935,7 +1571,6 @@ lazy_vacuum_index(Relation indrel,
        pg_rusage_init(&ru0);
 
        ivinfo.index = indrel;
-       ivinfo.vacuum_full = false;
        ivinfo.analyze_only = false;
        ivinfo.estimated_count = true;
        ivinfo.message_level = elevel;
@@ -967,12 +1602,10 @@ lazy_cleanup_index(Relation indrel,
        pg_rusage_init(&ru0);
 
        ivinfo.index = indrel;
-       ivinfo.vacuum_full = false;
        ivinfo.analyze_only = false;
-       ivinfo.estimated_count = !vacrelstats->scanned_all;
+       ivinfo.estimated_count = (vacrelstats->scanned_pages < vacrelstats->rel_pages);
        ivinfo.message_level = elevel;
-       /* use rel_tuples only if we scanned all pages, else fall back */
-       ivinfo.num_heap_tuples = vacrelstats->scanned_all ? vacrelstats->rel_tuples : vacrelstats->old_rel_tuples;
+       ivinfo.num_heap_tuples = vacrelstats->new_rel_tuples;
        ivinfo.strategy = vac_strategy;
 
        stats = index_vacuum_cleanup(&ivinfo, stats);
@@ -986,8 +1619,13 @@ lazy_cleanup_index(Relation indrel,
         */
        if (!stats->estimated_count)
                vac_update_relstats(indrel,
-                                                       stats->num_pages, stats->num_index_tuples,
-                                                       false, InvalidTransactionId);
+                                                       stats->num_pages,
+                                                       stats->num_index_tuples,
+                                                       0,
+                                                       false,
+                                                       InvalidTransactionId,
+                                                       InvalidMultiXactId,
+                                                       false);
 
        ereport(elevel,
                        (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
@@ -1004,6 +1642,31 @@ lazy_cleanup_index(Relation indrel,
        pfree(stats);
 }
 
+/*
+ * should_attempt_truncation - should we attempt to truncate the heap?
+ *
+ * Don't even think about it unless we have a shot at releasing a goodly
+ * number of pages.  Otherwise, the time taken isn't worth it.
+ *
+ * This is split out so that we can test whether truncation is going to be
+ * called for before we actually do it.  If you change the logic here, be
+ * careful to depend only on fields that lazy_scan_heap updates on-the-fly.
+ */
+static bool
+should_attempt_truncation(LVRelStats *vacrelstats)
+{
+       BlockNumber possibly_freeable;
+
+       possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
+       if (possibly_freeable > 0 &&
+               (possibly_freeable >= REL_TRUNCATE_MINIMUM ||
+                possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
+               old_snapshot_threshold < 0)
+               return true;
+       else
+               return false;
+}
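
As a standalone illustration of the heuristic above (and not part of this patch), the sketch below takes the thresholds as parameters instead of using REL_TRUNCATE_MINIMUM, REL_TRUNCATE_FRACTION and the old_snapshot_threshold GUC directly; the sample numbers are invented.

/*
 * Sketch only: truncation is worthwhile when the tail of possibly-freeable
 * pages is non-empty and is either large in absolute terms or a sizable
 * fraction of the table, and "snapshot too old" is disabled.
 */
#include <stdbool.h>
#include <stdio.h>

static bool
worth_truncating(unsigned rel_pages, unsigned nonempty_pages,
				 unsigned abs_minimum, unsigned fraction,
				 int old_snapshot_threshold)
{
	unsigned	possibly_freeable = rel_pages - nonempty_pages;

	return possibly_freeable > 0 &&
		(possibly_freeable >= abs_minimum ||
		 possibly_freeable >= rel_pages / fraction) &&
		old_snapshot_threshold < 0;
}

int
main(void)
{
	/* a 10000-page table whose last 800 pages look empty: 800 >= 10000/16 */
	printf("%d\n", worth_truncating(10000, 9200, 1000, 16, -1));
	return 0;
}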
+
 /*
  * lazy_truncate_heap - try to truncate off any empty pages at the end
  */
@@ -1013,72 +1676,121 @@ lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
        BlockNumber old_rel_pages = vacrelstats->rel_pages;
        BlockNumber new_rel_pages;
        PGRUsage        ru0;
+       int                     lock_retry;
 
        pg_rusage_init(&ru0);
 
-       /*
-        * We need full exclusive lock on the relation in order to do truncation.
-        * If we can't get it, give up rather than waiting --- we don't want to
-        * block other backends, and we don't want to deadlock (which is quite
-        * possible considering we already hold a lower-grade lock).
-        */
-       if (!ConditionalLockRelation(onerel, AccessExclusiveLock))
-               return;
+       /* Report that we are now truncating */
+       pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+                                                                PROGRESS_VACUUM_PHASE_TRUNCATE);
 
        /*
-        * Now that we have exclusive lock, look to see if the rel has grown
-        * whilst we were vacuuming with non-exclusive lock.  If so, give up; the
-        * newly added pages presumably contain non-deletable tuples.
+        * Loop until no more truncating can be done.
         */
-       new_rel_pages = RelationGetNumberOfBlocks(onerel);
-       if (new_rel_pages != old_rel_pages)
+       do
        {
-               /* might as well use the latest news when we update pg_class stats */
-               vacrelstats->rel_pages = new_rel_pages;
-               UnlockRelation(onerel, AccessExclusiveLock);
-               return;
-       }
+               /*
+                * We need full exclusive lock on the relation in order to do
+                * truncation. If we can't get it, give up rather than waiting --- we
+                * don't want to block other backends, and we don't want to deadlock
+                * (which is quite possible considering we already hold a lower-grade
+                * lock).
+                */
+               vacrelstats->lock_waiter_detected = false;
+               lock_retry = 0;
+               while (true)
+               {
+                       if (ConditionalLockRelation(onerel, AccessExclusiveLock))
+                               break;
 
-       /*
-        * Scan backwards from the end to verify that the end pages actually
-        * contain no tuples.  This is *necessary*, not optional, because other
-        * backends could have added tuples to these pages whilst we were
-        * vacuuming.
-        */
-       new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
+                       /*
+                        * Check for interrupts while trying to (re-)acquire the exclusive
+                        * lock.
+                        */
+                       CHECK_FOR_INTERRUPTS();
 
-       if (new_rel_pages >= old_rel_pages)
-       {
-               /* can't do anything after all */
-               UnlockRelation(onerel, AccessExclusiveLock);
-               return;
-       }
+                       if (++lock_retry > (VACUUM_TRUNCATE_LOCK_TIMEOUT /
+                                                               VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL))
+                       {
+                               /*
+                                * We failed to establish the lock in the specified number of
+                                * retries, so give up truncating.
+                                */
+                               vacrelstats->lock_waiter_detected = true;
+                               ereport(elevel,
+                                               (errmsg("\"%s\": stopping truncate due to conflicting lock request",
+                                                               RelationGetRelationName(onerel))));
+                               return;
+                       }
 
-       /*
-        * Okay to truncate.
-        */
-       RelationTruncate(onerel, new_rel_pages);
+                       pg_usleep(VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL);
+               }
 
-       /* force relcache inval so all backends reset their rd_targblock */
-       CacheInvalidateRelcache(onerel);
+               /*
+                * Now that we have exclusive lock, look to see if the rel has grown
+                * whilst we were vacuuming with non-exclusive lock.  If so, give up;
+                * the newly added pages presumably contain non-deletable tuples.
+                */
+               new_rel_pages = RelationGetNumberOfBlocks(onerel);
+               if (new_rel_pages != old_rel_pages)
+               {
+                       /*
+                        * Note: we intentionally don't update vacrelstats->rel_pages with
+                        * the new rel size here.  If we did, it would amount to assuming
+                        * that the new pages are empty, which is unlikely. Leaving the
+                        * numbers alone amounts to assuming that the new pages have the
+                        * same tuple density as existing ones, which is less unlikely.
+                        */
+                       UnlockRelation(onerel, AccessExclusiveLock);
+                       return;
+               }
 
-       /*
-        * Note: once we have truncated, we *must* keep the exclusive lock until
-        * commit.      The sinval message won't be sent until commit, and other
-        * backends must see it and reset their rd_targblock values before they
-        * can safely access the table again.
-        */
+               /*
+                * Scan backwards from the end to verify that the end pages actually
+                * contain no tuples.  This is *necessary*, not optional, because
+                * other backends could have added tuples to these pages whilst we
+                * were vacuuming.
+                */
+               new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
 
-       /* update statistics */
-       vacrelstats->rel_pages = new_rel_pages;
-       vacrelstats->pages_removed = old_rel_pages - new_rel_pages;
+               if (new_rel_pages >= old_rel_pages)
+               {
+                       /* can't do anything after all */
+                       UnlockRelation(onerel, AccessExclusiveLock);
+                       return;
+               }
 
-       ereport(elevel,
-                       (errmsg("\"%s\": truncated %u to %u pages",
-                                       RelationGetRelationName(onerel),
-                                       old_rel_pages, new_rel_pages),
-                        errdetail("%s.",
-                                          pg_rusage_show(&ru0))));
+               /*
+                * Okay to truncate.
+                */
+               RelationTruncate(onerel, new_rel_pages);
+
+               /*
+                * We can release the exclusive lock as soon as we have truncated.
+                * Other backends can't safely access the relation until they have
+                * processed the smgr invalidation that smgrtruncate sent out ... but
+                * that should happen as part of standard invalidation processing once
+                * they acquire lock on the relation.
+                */
+               UnlockRelation(onerel, AccessExclusiveLock);
+
+               /*
+                * Update statistics.  Here, it *is* correct to adjust rel_pages
+                * without also touching reltuples, since the tuple count wasn't
+                * changed by the truncation.
+                */
+               vacrelstats->pages_removed += old_rel_pages - new_rel_pages;
+               vacrelstats->rel_pages = new_rel_pages;
+
+               ereport(elevel,
+                               (errmsg("\"%s\": truncated %u to %u pages",
+                                               RelationGetRelationName(onerel),
+                                               old_rel_pages, new_rel_pages),
+                                errdetail("%s.",
+                                                  pg_rusage_show(&ru0))));
+               old_rel_pages = new_rel_pages;
+       } while (new_rel_pages > vacrelstats->nonempty_pages &&
+                        vacrelstats->lock_waiter_detected);
 }
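
As a standalone illustration of the retry loop above (and not part of this patch), the sketch below polls a hypothetical try_lock() in place of ConditionalLockRelation(), sleeps a fixed interval between attempts, and gives up once the attempts add up to the timeout; the interval and timeout values here are invented for the example.

/*
 * Sketch only: conditional-lock retry with a bounded total wait.
 */
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

#define WAIT_MS		10
#define TIMEOUT_MS	1000

static bool
try_lock(void)
{
	static int	calls = 0;

	return ++calls >= 3;		/* pretend the lock frees up on the third try */
}

static bool
acquire_with_timeout(void)
{
	int			retries = 0;

	for (;;)
	{
		if (try_lock())
			return true;
		if (++retries > TIMEOUT_MS / WAIT_MS)
			return false;		/* give up, as vacuum gives up truncating */
		usleep(WAIT_MS * 1000);	/* usleep() takes microseconds */
	}
}

int
main(void)
{
	printf("acquired: %d\n", acquire_with_timeout());
	return 0;
}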
 
 /*
@@ -1090,6 +1802,10 @@ static BlockNumber
 count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 {
        BlockNumber blkno;
+       instr_time      starttime;
+
+       /* Initialize the start time, used in the lock-conflict checks below */
+       INSTR_TIME_SET_CURRENT(starttime);
 
        /* Strange coding of loop control is needed because blkno is unsigned */
        blkno = vacrelstats->rel_pages;
@@ -1101,6 +1817,38 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
                                        maxoff;
                bool            hastup;
 
+               /*
+                * Check whether another process has requested a lock on our relation.
+                * We are holding an AccessExclusiveLock here, so it will have to wait.
+                * We only do this once per VACUUM_TRUNCATE_LOCK_CHECK_INTERVAL, and we
+                * only check whether that interval has elapsed once every 32 blocks,
+                * to keep the number of system calls and actual shared lock table
+                * lookups to a minimum.
+                */
+               if ((blkno % 32) == 0)
+               {
+                       instr_time      currenttime;
+                       instr_time      elapsed;
+
+                       INSTR_TIME_SET_CURRENT(currenttime);
+                       elapsed = currenttime;
+                       INSTR_TIME_SUBTRACT(elapsed, starttime);
+                       if ((INSTR_TIME_GET_MICROSEC(elapsed) / 1000)
+                               >= VACUUM_TRUNCATE_LOCK_CHECK_INTERVAL)
+                       {
+                               if (LockHasWaitersRelation(onerel, AccessExclusiveLock))
+                               {
+                                       ereport(elevel,
+                                                       (errmsg("\"%s\": suspending truncate due to conflicting lock request",
+                                                                       RelationGetRelationName(onerel))));
+
+                                       vacrelstats->lock_waiter_detected = true;
+                                       return blkno;
+                               }
+                               starttime = currenttime;
+                       }
+               }
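
As a standalone illustration of the check above (and not part of this patch), the sketch below consults the clock only every 32nd iteration and acts once a given number of milliseconds has elapsed, using clock_gettime() in place of the instr_time macros; the interval and loop bounds are invented for the example.

/*
 * Sketch only: cheap per-iteration work, with the clock consulted on every
 * 32nd iteration and the expensive check run only after the interval elapses.
 */
#include <stdio.h>
#include <time.h>

#define CHECK_INTERVAL_MS 100

static long
elapsed_ms(const struct timespec *start, const struct timespec *now)
{
	return (now->tv_sec - start->tv_sec) * 1000 +
		(now->tv_nsec - start->tv_nsec) / 1000000;
}

int
main(void)
{
	struct timespec start;
	unsigned int blkno;

	clock_gettime(CLOCK_MONOTONIC, &start);
	for (blkno = 5000000; blkno > 0; blkno--)
	{
		if ((blkno % 32) == 0)
		{
			struct timespec now;

			clock_gettime(CLOCK_MONOTONIC, &now);
			if (elapsed_ms(&start, &now) >= CHECK_INTERVAL_MS)
			{
				/* the real code checks LockHasWaitersRelation() here */
				printf("interval elapsed with %u blocks left\n", blkno);
				start = now;
			}
		}
		/* per-block work would go here */
	}
	return 0;
}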
+
                /*
                 * We don't insert a vacuum delay point here, because we have an
                 * exclusive lock on the table which we want to hold for as short a
@@ -1172,10 +1920,13 @@ static void
 lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
 {
        long            maxtuples;
+       int                     vac_work_mem = (IsAutoVacuumWorkerProcess() &&
+                                                                       autovacuum_work_mem != -1) ?
+                                       autovacuum_work_mem : maintenance_work_mem;
 
        if (vacrelstats->hasindex)
        {
-               maxtuples = (maintenance_work_mem * 1024L) / sizeof(ItemPointerData);
+               maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
                maxtuples = Min(maxtuples, INT_MAX);
                maxtuples = Min(maxtuples, MaxAllocSize / sizeof(ItemPointerData));
 
@@ -1213,6 +1964,8 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
        {
                vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
                vacrelstats->num_dead_tuples++;
+               pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
+                                                                        vacrelstats->num_dead_tuples);
        }
 }
 
@@ -1267,3 +2020,119 @@ vac_cmp_itemptr(const void *left, const void *right)
 
        return 0;
 }
+
+/*
+ * Check whether every tuple on the given page is visible to all current and
+ * future transactions.  Also return the visibility_cutoff_xid, which is the
+ * highest xmin amongst the visible tuples.  Set *all_frozen to true if every
+ * tuple on this page is frozen.
+ */
+static bool
+heap_page_is_all_visible(Relation rel, Buffer buf,
+                                                TransactionId *visibility_cutoff_xid,
+                                                bool *all_frozen)
+{
+       Page            page = BufferGetPage(buf);
+       BlockNumber blockno = BufferGetBlockNumber(buf);
+       OffsetNumber offnum,
+                               maxoff;
+       bool            all_visible = true;
+
+       *visibility_cutoff_xid = InvalidTransactionId;
+       *all_frozen = true;
+
+       /*
+        * This is a stripped down version of the line pointer scan in
+        * lazy_scan_heap(). So if you change anything here, also check that code.
+        */
+       maxoff = PageGetMaxOffsetNumber(page);
+       for (offnum = FirstOffsetNumber;
+                offnum <= maxoff && all_visible;
+                offnum = OffsetNumberNext(offnum))
+       {
+               ItemId          itemid;
+               HeapTupleData tuple;
+
+               itemid = PageGetItemId(page, offnum);
+
+               /* Unused or redirect line pointers are of no interest */
+               if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
+                       continue;
+
+               ItemPointerSet(&(tuple.t_self), blockno, offnum);
+
+               /*
+                * Dead line pointers can have index pointers pointing to them, so
+                * they can't be treated as visible.
+                */
+               if (ItemIdIsDead(itemid))
+               {
+                       all_visible = false;
+                       break;
+               }
+
+               Assert(ItemIdIsNormal(itemid));
+
+               tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+               tuple.t_len = ItemIdGetLength(itemid);
+               tuple.t_tableOid = RelationGetRelid(rel);
+
+               switch (HeapTupleSatisfiesVacuum(&tuple, OldestXmin, buf))
+               {
+                       case HEAPTUPLE_LIVE:
+                               {
+                                       TransactionId xmin;
+
+                                       /* Check comments in lazy_scan_heap. */
+                                       if (!HeapTupleHeaderXminCommitted(tuple.t_data))
+                                       {
+                                               all_visible = false;
+                                               break;
+                                       }
+
+                                       /*
+                                        * The inserter definitely committed. But is it old enough
+                                        * that everyone sees it as committed?
+                                        */
+                                       xmin = HeapTupleHeaderGetXmin(tuple.t_data);
+                                       if (!TransactionIdPrecedes(xmin, OldestXmin))
+                                       {
+                                               all_visible = false;
+                                               break;
+                                       }
+
+                                       /* Track newest xmin on page. */
+                                       if (TransactionIdFollows(xmin, *visibility_cutoff_xid))
+                                               *visibility_cutoff_xid = xmin;
+
+                                       /* Check whether this tuple is already frozen or not */
+                                       if (all_visible && *all_frozen &&
+                                               heap_tuple_needs_eventual_freeze(tuple.t_data))
+                                               *all_frozen = false;
+                               }
+                               break;
+
+                       case HEAPTUPLE_DEAD:
+                       case HEAPTUPLE_RECENTLY_DEAD:
+                       case HEAPTUPLE_INSERT_IN_PROGRESS:
+                       case HEAPTUPLE_DELETE_IN_PROGRESS:
+                               all_visible = false;
+                               break;
+
+                       default:
+                               elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+                               break;
+               }
+       }                                                       /* scan along page */
+
+       /*
+        * We don't bother clearing *all_frozen when the page is discovered not to
+        * be all-visible, so do that now if necessary.  The page might fail to be
+        * all-frozen for other reasons anyway, but if it's not all-visible, then
+        * it definitely isn't all-frozen.
+        */
+       if (!all_visible)
+               *all_frozen = false;
+
+       return all_visible;
+}