]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/vacuum.c
Fix "ANALYZE t, t" inside a transaction block.
[postgresql] / src / backend / commands / vacuum.c
index 0563e6347430d43c6cd5d4be5c8456fcc3406946..7d6c50b49d9301bbacdbfd280d5633a23cc91c93 100644 (file)
@@ -9,7 +9,7 @@
  * in cluster.c.
  *
  *
- * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
+#include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "commands/cluster.h"
+#include "commands/defrem.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
@@ -48,7 +52,6 @@
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
-#include "utils/tqual.h"
 
 
 /*
@@ -66,13 +69,14 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static List *get_rel_oids(Oid relid, const RangeVar *vacrel);
+static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
+static List *get_all_vacuum_rels(int options);
 static void vac_truncate_clog(TransactionId frozenXID,
-                                 MultiXactId minMulti,
-                                 TransactionId lastSaneFrozenXid,
-                                 MultiXactId lastSaneMinMulti);
-static bool vacuum_rel(Oid relid, RangeVar *relation, int options,
-                  VacuumParams *params);
+                                                         MultiXactId minMulti,
+                                                         TransactionId lastSaneFrozenXid,
+                                                         MultiXactId lastSaneMinMulti);
+static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
+static VacOptTernaryValue get_vacopt_ternary_value(DefElem *def);
 
 /*
  * Primary entry point for manual VACUUM and ANALYZE commands
@@ -81,22 +85,96 @@ static bool vacuum_rel(Oid relid, RangeVar *relation, int options,
  * happen in vacuum().
  */
 void
-ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
+ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 {
        VacuumParams params;
+       bool            verbose = false;
+       bool            skip_locked = false;
+       bool            analyze = false;
+       bool            freeze = false;
+       bool            full = false;
+       bool            disable_page_skipping = false;
+       ListCell   *lc;
+
+       /* Set default value */
+       params.index_cleanup = VACOPT_TERNARY_DEFAULT;
+       params.truncate = VACOPT_TERNARY_DEFAULT;
+
+       /* Parse options list */
+       foreach(lc, vacstmt->options)
+       {
+               DefElem    *opt = (DefElem *) lfirst(lc);
+
+               /* Parse common options for VACUUM and ANALYZE */
+               if (strcmp(opt->defname, "verbose") == 0)
+                       verbose = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "skip_locked") == 0)
+                       skip_locked = defGetBoolean(opt);
+               else if (!vacstmt->is_vacuumcmd)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("unrecognized ANALYZE option \"%s\"", opt->defname),
+                                        parser_errposition(pstate, opt->location)));
+
+               /* Parse options available on VACUUM */
+               else if (strcmp(opt->defname, "analyze") == 0)
+                       analyze = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "freeze") == 0)
+                       freeze = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "full") == 0)
+                       full = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "disable_page_skipping") == 0)
+                       disable_page_skipping = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "index_cleanup") == 0)
+                       params.index_cleanup = get_vacopt_ternary_value(opt);
+               else if (strcmp(opt->defname, "truncate") == 0)
+                       params.truncate = get_vacopt_ternary_value(opt);
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("unrecognized VACUUM option \"%s\"", opt->defname),
+                                        parser_errposition(pstate, opt->location)));
+       }
+
+       /* Set vacuum options */
+       params.options =
+               (vacstmt->is_vacuumcmd ? VACOPT_VACUUM : VACOPT_ANALYZE) |
+               (verbose ? VACOPT_VERBOSE : 0) |
+               (skip_locked ? VACOPT_SKIP_LOCKED : 0) |
+               (analyze ? VACOPT_ANALYZE : 0) |
+               (freeze ? VACOPT_FREEZE : 0) |
+               (full ? VACOPT_FULL : 0) |
+               (disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0);
 
        /* sanity checks on options */
-       Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE));
-       Assert((vacstmt->options & VACOPT_VACUUM) ||
-                  !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
-       Assert((vacstmt->options & VACOPT_ANALYZE) || vacstmt->va_cols == NIL);
-       Assert(!(vacstmt->options & VACOPT_SKIPTOAST));
+       Assert(params.options & (VACOPT_VACUUM | VACOPT_ANALYZE));
+       Assert((params.options & VACOPT_VACUUM) ||
+                  !(params.options & (VACOPT_FULL | VACOPT_FREEZE)));
+       Assert(!(params.options & VACOPT_SKIPTOAST));
+
+       /*
+        * Make sure VACOPT_ANALYZE is specified if any column lists are present.
+        */
+       if (!(params.options & VACOPT_ANALYZE))
+       {
+               ListCell   *lc;
+
+               foreach(lc, vacstmt->rels)
+               {
+                       VacuumRelation *vrel = lfirst_node(VacuumRelation, lc);
+
+                       if (vrel->va_cols != NIL)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("ANALYZE option must be specified when a column list is provided")));
+               }
+       }
 
        /*
         * All freeze ages are zero if the FREEZE option is given; otherwise pass
         * them as -1 which means to use the default values.
         */
-       if (vacstmt->options & VACOPT_FREEZE)
+       if (params.options & VACOPT_FREEZE)
        {
                params.freeze_min_age = 0;
                params.freeze_table_age = 0;
@@ -118,24 +196,20 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
        params.log_min_duration = -1;
 
        /* Now go through the common routine */
-       vacuum(vacstmt->options, vacstmt->relation, InvalidOid, &params,
-                  vacstmt->va_cols, NULL, isTopLevel);
+       vacuum(vacstmt->rels, &params, NULL, isTopLevel);
 }
 
 /*
- * Primary entry point for VACUUM and ANALYZE commands.
+ * Internal entry point for VACUUM and ANALYZE commands.
  *
- * options is a bitmask of VacuumOption flags, indicating what to do.
- *
- * relid, if not InvalidOid, indicate the relation to process; otherwise,
- * the RangeVar is used.  (The latter must always be passed, because it's
- * used for error messages.)
+ * relations, if not NIL, is a list of VacuumRelation to process; otherwise,
+ * we process all relevant tables in the database.  For each VacuumRelation,
+ * if a valid OID is supplied, the table with that OID is what to process;
+ * otherwise, the VacuumRelation's RangeVar indicates what to process.
  *
  * params contains a set of parameters that can be used to customize the
  * behavior.
  *
- * va_cols is a list of columns to analyze, or NIL to process them all.
- *
  * bstrategy is normally given as NULL, but in autovacuum it can be passed
  * in to use the same buffer strategy object across multiple vacuum() calls.
  *
@@ -145,18 +219,18 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
  * memory context that will not disappear at transaction commit.
  */
 void
-vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
-          List *va_cols, BufferAccessStrategy bstrategy, bool isTopLevel)
+vacuum(List *relations, VacuumParams *params,
+          BufferAccessStrategy bstrategy, bool isTopLevel)
 {
+       static bool in_vacuum = false;
+
        const char *stmttype;
        volatile bool in_outer_xact,
                                use_own_xacts;
-       List       *relations;
-       static bool in_vacuum = false;
 
        Assert(params != NULL);
 
-       stmttype = (options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
+       stmttype = (params->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
 
        /*
         * We cannot run VACUUM inside a user transaction block; if we were inside
@@ -166,13 +240,13 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
         *
         * ANALYZE (without VACUUM) can run either way.
         */
-       if (options & VACOPT_VACUUM)
+       if (params->options & VACOPT_VACUUM)
        {
-               PreventTransactionChain(isTopLevel, stmttype);
+               PreventInTransactionBlock(isTopLevel, stmttype);
                in_outer_xact = false;
        }
        else
-               in_outer_xact = IsInTransactionChain(isTopLevel);
+               in_outer_xact = IsInTransactionBlock(isTopLevel);
 
        /*
         * Due to static variables vac_context, anl_context and vac_strategy,
@@ -188,8 +262,8 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
        /*
         * Sanity check DISABLE_PAGE_SKIPPING option.
         */
-       if ((options & VACOPT_FULL) != 0 &&
-               (options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
+       if ((params->options & VACOPT_FULL) != 0 &&
+               (params->options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));
@@ -198,7 +272,7 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
         * Send info about dead objects to the statistics collector, unless we are
         * in autovacuum --- autovacuum.c does this for itself.
         */
-       if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+       if ((params->options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
                pgstat_vacuum_stat();
 
        /*
@@ -209,9 +283,7 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
         */
        vac_context = AllocSetContextCreate(PortalContext,
                                                                                "Vacuum",
-                                                                               ALLOCSET_DEFAULT_MINSIZE,
-                                                                               ALLOCSET_DEFAULT_INITSIZE,
-                                                                               ALLOCSET_DEFAULT_MAXSIZE);
+                                                                               ALLOCSET_DEFAULT_SIZES);
 
        /*
         * If caller didn't give us a buffer strategy object, make one in the
@@ -227,10 +299,29 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
        vac_strategy = bstrategy;
 
        /*
-        * Build list of relations to process, unless caller gave us one. (If we
-        * build one, we put it in vac_context for safekeeping.)
+        * Build list of relation(s) to process, putting any new data in
+        * vac_context for safekeeping.
         */
-       relations = get_rel_oids(relid, relation);
+       if (relations != NIL)
+       {
+               List       *newrels = NIL;
+               ListCell   *lc;
+
+               foreach(lc, relations)
+               {
+                       VacuumRelation *vrel = lfirst_node(VacuumRelation, lc);
+                       List       *sublist;
+                       MemoryContext old_context;
+
+                       sublist = expand_vacuum_rel(vrel, params->options);
+                       old_context = MemoryContextSwitchTo(vac_context);
+                       newrels = list_concat(newrels, sublist);
+                       MemoryContextSwitchTo(old_context);
+               }
+               relations = newrels;
+       }
+       else
+               relations = get_all_vacuum_rels(params->options);
 
        /*
         * Decide whether we need to start/commit our own transactions.
@@ -246,11 +337,11 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
         * transaction block, and also in an autovacuum worker, use own
         * transactions so we can release locks sooner.
         */
-       if (options & VACOPT_VACUUM)
+       if (params->options & VACOPT_VACUUM)
                use_own_xacts = true;
        else
        {
-               Assert(options & VACOPT_ANALYZE);
+               Assert(params->options & VACOPT_ANALYZE);
                if (IsAutoVacuumWorkerProcess())
                        use_own_xacts = true;
                else if (in_outer_xact)
@@ -281,7 +372,7 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
                CommitTransactionCommand();
        }
 
-       /* Turn vacuum cost accounting on or off */
+       /* Turn vacuum cost accounting on or off, and set/clear in_vacuum */
        PG_TRY();
        {
                ListCell   *cur;
@@ -298,15 +389,15 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
                 */
                foreach(cur, relations)
                {
-                       Oid                     relid = lfirst_oid(cur);
+                       VacuumRelation *vrel = lfirst_node(VacuumRelation, cur);
 
-                       if (options & VACOPT_VACUUM)
+                       if (params->options & VACOPT_VACUUM)
                        {
-                               if (!vacuum_rel(relid, relation, options, params))
+                               if (!vacuum_rel(vrel->oid, vrel->relation, params))
                                        continue;
                        }
 
-                       if (options & VACOPT_ANALYZE)
+                       if (params->options & VACOPT_ANALYZE)
                        {
                                /*
                                 * If using separate xacts, start one for analyze. Otherwise,
@@ -319,14 +410,23 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
                                        PushActiveSnapshot(GetTransactionSnapshot());
                                }
 
-                               analyze_rel(relid, relation, options, params,
-                                                       va_cols, in_outer_xact, vac_strategy);
+                               analyze_rel(vrel->oid, vrel->relation, params,
+                                                       vrel->va_cols, in_outer_xact, vac_strategy);
 
                                if (use_own_xacts)
                                {
                                        PopActiveSnapshot();
                                        CommitTransactionCommand();
                                }
+                               else
+                               {
+                                       /*
+                                        * If we're not using separate xacts, better separate the
+                                        * ANALYZE actions with CCIs.  This avoids trouble if user
+                                        * says "ANALYZE t, t".
+                                        */
+                                       CommandCounterIncrement();
+                               }
                        }
                }
        }
@@ -355,10 +455,10 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
                StartTransactionCommand();
        }
 
-       if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+       if ((params->options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
        {
                /*
-                * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
+                * Update pg_database.datfrozenxid, and truncate pg_xact if possible.
                 * (autovacuum.c does this for itself.)
                 */
                vac_update_datfrozenxid();
@@ -374,82 +474,389 @@ vacuum(int options, RangeVar *relation, Oid relid, VacuumParams *params,
 }
 
 /*
- * Build a list of Oids for each relation to be processed
+ * Check if a given relation can be safely vacuumed or analyzed.  If the
+ * user is not the relation owner, issue a WARNING log message and return
+ * false to let the caller decide what to do with this relation.  This
+ * routine is used to decide if a relation can be processed for VACUUM or
+ * ANALYZE.
+ */
+bool
+vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple, int options)
+{
+       char       *relname;
+
+       Assert((options & (VACOPT_VACUUM | VACOPT_ANALYZE)) != 0);
+
+       /*
+        * Check permissions.
+        *
+        * We allow the user to vacuum or analyze a table if he is superuser, the
+        * table owner, or the database owner (but in the latter case, only if
+        * it's not a shared relation).  pg_class_ownercheck includes the
+        * superuser case.
+        *
+        * Note we choose to treat permissions failure as a WARNING and keep
+        * trying to vacuum or analyze the rest of the DB --- is this appropriate?
+        */
+       if (pg_class_ownercheck(relid, GetUserId()) ||
+               (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !reltuple->relisshared))
+               return true;
+
+       relname = NameStr(reltuple->relname);
+
+       if ((options & VACOPT_VACUUM) != 0)
+       {
+               if (reltuple->relisshared)
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only superuser can vacuum it",
+                                                       relname)));
+               else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
+                                                       relname)));
+               else
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
+                                                       relname)));
+
+               /*
+                * For VACUUM ANALYZE, both logs could show up, but just generate
+                * information for VACUUM as that would be the first one to be
+                * processed.
+                */
+               return false;
+       }
+
+       if ((options & VACOPT_ANALYZE) != 0)
+       {
+               if (reltuple->relisshared)
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only superuser can analyze it",
+                                                       relname)));
+               else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only superuser or database owner can analyze it",
+                                                       relname)));
+               else
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only table or database owner can analyze it",
+                                                       relname)));
+       }
+
+       return false;
+}
+
+
+/*
+ * vacuum_open_relation
  *
- * The list is built in vac_context so that it will survive across our
- * per-relation transactions.
+ * This routine is used for attempting to open and lock a relation which
+ * is going to be vacuumed or analyzed.  If the relation cannot be opened
+ * or locked, a log is emitted if possible.
  */
-static List *
-get_rel_oids(Oid relid, const RangeVar *vacrel)
+Relation
+vacuum_open_relation(Oid relid, RangeVar *relation, int options,
+                                        bool verbose, LOCKMODE lmode)
 {
-       List       *oid_list = NIL;
-       MemoryContext oldcontext;
+       Relation        onerel;
+       bool            rel_lock = true;
+       int                     elevel;
+
+       Assert((options & (VACOPT_VACUUM | VACOPT_ANALYZE)) != 0);
 
-       /* OID supplied by VACUUM's caller? */
-       if (OidIsValid(relid))
+       /*
+        * Open the relation and get the appropriate lock on it.
+        *
+        * There's a race condition here: the relation may have gone away since
+        * the last time we saw it.  If so, we don't need to vacuum or analyze it.
+        *
+        * If we've been asked not to wait for the relation lock, acquire it first
+        * in non-blocking mode, before calling try_relation_open().
+        */
+       if (!(options & VACOPT_SKIP_LOCKED))
+               onerel = try_relation_open(relid, lmode);
+       else if (ConditionalLockRelationOid(relid, lmode))
+               onerel = try_relation_open(relid, NoLock);
+       else
        {
-               oldcontext = MemoryContextSwitchTo(vac_context);
-               oid_list = lappend_oid(oid_list, relid);
-               MemoryContextSwitchTo(oldcontext);
+               onerel = NULL;
+               rel_lock = false;
        }
-       else if (vacrel)
+
+       /* if relation is opened, leave */
+       if (onerel)
+               return onerel;
+
+       /*
+        * Relation could not be opened, hence generate if possible a log
+        * informing on the situation.
+        *
+        * If the RangeVar is not defined, we do not have enough information to
+        * provide a meaningful log statement.  Chances are that the caller has
+        * intentionally not provided this information so that this logging is
+        * skipped, anyway.
+        */
+       if (relation == NULL)
+               return NULL;
+
+       /*
+        * Determine the log level.
+        *
+        * For manual VACUUM or ANALYZE, we emit a WARNING to match the log
+        * statements in the permission checks; otherwise, only log if the caller
+        * so requested.
+        */
+       if (!IsAutoVacuumWorkerProcess())
+               elevel = WARNING;
+       else if (verbose)
+               elevel = LOG;
+       else
+               return NULL;
+
+       if ((options & VACOPT_VACUUM) != 0)
        {
-               /* Process a specific relation */
-               Oid                     relid;
+               if (!rel_lock)
+                       ereport(elevel,
+                                       (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                        errmsg("skipping vacuum of \"%s\" --- lock not available",
+                                                       relation->relname)));
+               else
+                       ereport(elevel,
+                                       (errcode(ERRCODE_UNDEFINED_TABLE),
+                                        errmsg("skipping vacuum of \"%s\" --- relation no longer exists",
+                                                       relation->relname)));
 
                /*
-                * Since we don't take a lock here, the relation might be gone, or the
-                * RangeVar might no longer refer to the OID we look up here.  In the
-                * former case, VACUUM will do nothing; in the latter case, it will
-                * process the OID we looked up here, rather than the new one. Neither
-                * is ideal, but there's little practical alternative, since we're
-                * going to commit this transaction and begin a new one between now
-                * and then.
+                * For VACUUM ANALYZE, both logs could show up, but just generate
+                * information for VACUUM as that would be the first one to be
+                * processed.
                 */
-               relid = RangeVarGetRelid(vacrel, NoLock, false);
+               return NULL;
+       }
+
+       if ((options & VACOPT_ANALYZE) != 0)
+       {
+               if (!rel_lock)
+                       ereport(elevel,
+                                       (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                        errmsg("skipping analyze of \"%s\" --- lock not available",
+                                                       relation->relname)));
+               else
+                       ereport(elevel,
+                                       (errcode(ERRCODE_UNDEFINED_TABLE),
+                                        errmsg("skipping analyze of \"%s\" --- relation no longer exists",
+                                                       relation->relname)));
+       }
+
+       return NULL;
+}
+
+
+/*
+ * Given a VacuumRelation, fill in the table OID if it wasn't specified,
+ * and optionally add VacuumRelations for partitions of the table.
+ *
+ * If a VacuumRelation does not have an OID supplied and is a partitioned
+ * table, an extra entry will be added to the output for each partition.
+ * Presently, only autovacuum supplies OIDs when calling vacuum(), and
+ * it does not want us to expand partitioned tables.
+ *
+ * We take care not to modify the input data structure, but instead build
+ * new VacuumRelation(s) to return.  (But note that they will reference
+ * unmodified parts of the input, eg column lists.)  New data structures
+ * are made in vac_context.
+ */
+static List *
+expand_vacuum_rel(VacuumRelation *vrel, int options)
+{
+       List       *vacrels = NIL;
+       MemoryContext oldcontext;
 
-               /* Make a relation list entry for this guy */
+       /* If caller supplied OID, there's nothing we need do here. */
+       if (OidIsValid(vrel->oid))
+       {
                oldcontext = MemoryContextSwitchTo(vac_context);
-               oid_list = lappend_oid(oid_list, relid);
+               vacrels = lappend(vacrels, vrel);
                MemoryContextSwitchTo(oldcontext);
        }
        else
        {
-               /*
-                * Process all plain relations and materialized views listed in
-                * pg_class
-                */
-               Relation        pgclass;
-               HeapScanDesc scan;
+               /* Process a specific relation, and possibly partitions thereof */
+               Oid                     relid;
                HeapTuple       tuple;
+               Form_pg_class classForm;
+               bool            include_parts;
+               int                     rvr_opts;
 
-               pgclass = heap_open(RelationRelationId, AccessShareLock);
+               /*
+                * Since autovacuum workers supply OIDs when calling vacuum(), no
+                * autovacuum worker should reach this code.
+                */
+               Assert(!IsAutoVacuumWorkerProcess());
 
-               scan = heap_beginscan_catalog(pgclass, 0, NULL);
+               /*
+                * We transiently take AccessShareLock to protect the syscache lookup
+                * below, as well as find_all_inheritors's expectation that the caller
+                * holds some lock on the starting relation.
+                */
+               rvr_opts = (options & VACOPT_SKIP_LOCKED) ? RVR_SKIP_LOCKED : 0;
+               relid = RangeVarGetRelidExtended(vrel->relation,
+                                                                                AccessShareLock,
+                                                                                rvr_opts,
+                                                                                NULL, NULL);
 
-               while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+               /*
+                * If the lock is unavailable, emit the same log statement that
+                * vacuum_rel() and analyze_rel() would.
+                */
+               if (!OidIsValid(relid))
                {
-                       Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
+                       if (options & VACOPT_VACUUM)
+                               ereport(WARNING,
+                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                                errmsg("skipping vacuum of \"%s\" --- lock not available",
+                                                               vrel->relation->relname)));
+                       else
+                               ereport(WARNING,
+                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                                errmsg("skipping analyze of \"%s\" --- lock not available",
+                                                               vrel->relation->relname)));
+                       return vacrels;
+               }
 
-                       if (classForm->relkind != RELKIND_RELATION &&
-                               classForm->relkind != RELKIND_MATVIEW)
-                               continue;
+               /*
+                * To check whether the relation is a partitioned table and its
+                * ownership, fetch its syscache entry.
+                */
+               tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+               if (!HeapTupleIsValid(tuple))
+                       elog(ERROR, "cache lookup failed for relation %u", relid);
+               classForm = (Form_pg_class) GETSTRUCT(tuple);
 
-                       /* Make a relation list entry for this guy */
+               /*
+                * Make a returnable VacuumRelation for this rel if user is a proper
+                * owner.
+                */
+               if (vacuum_is_relation_owner(relid, classForm, options))
+               {
                        oldcontext = MemoryContextSwitchTo(vac_context);
-                       oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
+                       vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation,
+                                                                                                                 relid,
+                                                                                                                 vrel->va_cols));
                        MemoryContextSwitchTo(oldcontext);
                }
 
-               heap_endscan(scan);
-               heap_close(pgclass, AccessShareLock);
+
+               include_parts = (classForm->relkind == RELKIND_PARTITIONED_TABLE);
+               ReleaseSysCache(tuple);
+
+               /*
+                * If it is, make relation list entries for its partitions.  Note that
+                * the list returned by find_all_inheritors() includes the passed-in
+                * OID, so we have to skip that.  There's no point in taking locks on
+                * the individual partitions yet, and doing so would just add
+                * unnecessary deadlock risk.  For this last reason we do not check
+                * yet the ownership of the partitions, which get added to the list to
+                * process.  Ownership will be checked later on anyway.
+                */
+               if (include_parts)
+               {
+                       List       *part_oids = find_all_inheritors(relid, NoLock, NULL);
+                       ListCell   *part_lc;
+
+                       foreach(part_lc, part_oids)
+                       {
+                               Oid                     part_oid = lfirst_oid(part_lc);
+
+                               if (part_oid == relid)
+                                       continue;       /* ignore original table */
+
+                               /*
+                                * We omit a RangeVar since it wouldn't be appropriate to
+                                * complain about failure to open one of these relations
+                                * later.
+                                */
+                               oldcontext = MemoryContextSwitchTo(vac_context);
+                               vacrels = lappend(vacrels, makeVacuumRelation(NULL,
+                                                                                                                         part_oid,
+                                                                                                                         vrel->va_cols));
+                               MemoryContextSwitchTo(oldcontext);
+                       }
+               }
+
+               /*
+                * Release lock again.  This means that by the time we actually try to
+                * process the table, it might be gone or renamed.  In the former case
+                * we'll silently ignore it; in the latter case we'll process it
+                * anyway, but we must beware that the RangeVar doesn't necessarily
+                * identify it anymore.  This isn't ideal, perhaps, but there's little
+                * practical alternative, since we're typically going to commit this
+                * transaction and begin a new one between now and then.  Moreover,
+                * holding locks on multiple relations would create significant risk
+                * of deadlock.
+                */
+               UnlockRelationOid(relid, AccessShareLock);
        }
 
-       return oid_list;
+       return vacrels;
 }
 
 /*
- * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
+ * Construct a list of VacuumRelations for all vacuumable rels in
+ * the current database.  The list is built in vac_context.
+ */
+static List *
+get_all_vacuum_rels(int options)
+{
+       List       *vacrels = NIL;
+       Relation        pgclass;
+       TableScanDesc scan;
+       HeapTuple       tuple;
+
+       pgclass = table_open(RelationRelationId, AccessShareLock);
+
+       scan = table_beginscan_catalog(pgclass, 0, NULL);
+
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
+               MemoryContext oldcontext;
+               Oid                     relid = classForm->oid;
+
+               /* check permissions of relation */
+               if (!vacuum_is_relation_owner(relid, classForm, options))
+                       continue;
+
+               /*
+                * We include partitioned tables here; depending on which operation is
+                * to be performed, caller will decide whether to process or ignore
+                * them.
+                */
+               if (classForm->relkind != RELKIND_RELATION &&
+                       classForm->relkind != RELKIND_MATVIEW &&
+                       classForm->relkind != RELKIND_PARTITIONED_TABLE)
+                       continue;
+
+               /*
+                * Build VacuumRelation(s) specifying the table OIDs to be processed.
+                * We omit a RangeVar since it wouldn't be appropriate to complain
+                * about failure to open one of these relations later.
+                */
+               oldcontext = MemoryContextSwitchTo(vac_context);
+               vacrels = lappend(vacrels, makeVacuumRelation(NULL,
+                                                                                                         relid,
+                                                                                                         NIL));
+               MemoryContextSwitchTo(oldcontext);
+       }
+
+       table_endscan(scan);
+       table_close(pgclass, AccessShareLock);
+
+       return vacrels;
+}
+
+/*
+ * vacuum_set_xid_limits() -- compute oldestXmin and freeze cutoff points
  *
  * The output parameters are:
  * - oldestXmin is the cutoff value used to distinguish whether tuples are
@@ -499,7 +906,7 @@ vacuum_set_xid_limits(Relation rel,
         * always an independent transaction.
         */
        *oldestXmin =
-               TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+               TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, PROCARRAY_FLAGS_VACUUM), rel);
 
        Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -535,7 +942,8 @@ vacuum_set_xid_limits(Relation rel,
        {
                ereport(WARNING,
                                (errmsg("oldest xmin is far in the past"),
-                                errhint("Close open transactions soon to avoid wraparound problems.")));
+                                errhint("Close open transactions soon to avoid wraparound problems.\n"
+                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
                limit = *oldestXmin;
        }
 
@@ -645,16 +1053,17 @@ vacuum_set_xid_limits(Relation rel,
  * vac_estimate_reltuples() -- estimate the new value for pg_class.reltuples
  *
  *             If we scanned the whole relation then we should just use the count of
- *             live tuples seen; but if we did not, we should not trust the count
- *             unreservedly, especially not in VACUUM, which may have scanned a quite
- *             nonrandom subset of the table.  When we have only partial information,
- *             we take the old value of pg_class.reltuples as a measurement of the
+ *             live tuples seen; but if we did not, we should not blindly extrapolate
+ *             from that number, since VACUUM may have scanned a quite nonrandom
+ *             subset of the table.  When we have only partial information, we take
+ *             the old value of pg_class.reltuples as a measurement of the
  *             tuple density in the unscanned pages.
  *
- *             This routine is shared by VACUUM and ANALYZE.
+ *             Note: scanned_tuples should count only *live* tuples, since
+ *             pg_class.reltuples is defined that way.
  */
 double
-vac_estimate_reltuples(Relation relation, bool is_analyze,
+vac_estimate_reltuples(Relation relation,
                                           BlockNumber total_pages,
                                           BlockNumber scanned_pages,
                                           double scanned_tuples)
@@ -662,9 +1071,8 @@ vac_estimate_reltuples(Relation relation, bool is_analyze,
        BlockNumber old_rel_pages = relation->rd_rel->relpages;
        double          old_rel_tuples = relation->rd_rel->reltuples;
        double          old_density;
-       double          new_density;
-       double          multiplier;
-       double          updated_density;
+       double          unscanned_pages;
+       double          total_tuples;
 
        /* If we did scan the whole table, just use the count as-is */
        if (scanned_pages >= total_pages)
@@ -688,31 +1096,14 @@ vac_estimate_reltuples(Relation relation, bool is_analyze,
 
        /*
         * Okay, we've covered the corner cases.  The normal calculation is to
-        * convert the old measurement to a density (tuples per page), then update
-        * the density using an exponential-moving-average approach, and finally
-        * compute reltuples as updated_density * total_pages.
-        *
-        * For ANALYZE, the moving average multiplier is just the fraction of the
-        * table's pages we scanned.  This is equivalent to assuming that the
-        * tuple density in the unscanned pages didn't change.  Of course, it
-        * probably did, if the new density measurement is different. But over
-        * repeated cycles, the value of reltuples will converge towards the
-        * correct value, if repeated measurements show the same new density.
-        *
-        * For VACUUM, the situation is a bit different: we have looked at a
-        * nonrandom sample of pages, but we know for certain that the pages we
-        * didn't look at are precisely the ones that haven't changed lately.
-        * Thus, there is a reasonable argument for doing exactly the same thing
-        * as for the ANALYZE case, that is use the old density measurement as the
-        * value for the unscanned pages.
-        *
-        * This logic could probably use further refinement.
+        * convert the old measurement to a density (tuples per page), then
+        * estimate the number of tuples in the unscanned pages using that figure,
+        * and finally add on the number of tuples in the scanned pages.
         */
        old_density = old_rel_tuples / old_rel_pages;
-       new_density = scanned_tuples / scanned_pages;
-       multiplier = (double) scanned_pages / (double) total_pages;
-       updated_density = old_density + (new_density - old_density) * multiplier;
-       return floor(updated_density * total_pages + 0.5);
+       unscanned_pages = (double) total_pages - (double) scanned_pages;
+       total_tuples = old_density * unscanned_pages + scanned_tuples;
+       return floor(total_tuples + 0.5);
 }
 
 
@@ -751,6 +1142,9 @@ vac_estimate_reltuples(Relation relation, bool is_analyze,
  *             transaction.  This is OK since postponing the flag maintenance is
  *             always allowable.
  *
+ *             Note: num_tuples should count only *live* tuples, since
+ *             pg_class.reltuples is defined that way.
+ *
  *             This routine is shared by VACUUM and ANALYZE.
  */
 void
@@ -767,7 +1161,7 @@ vac_update_relstats(Relation relation,
        Form_pg_class pgcform;
        bool            dirty;
 
-       rd = heap_open(RelationRelationId, RowExclusiveLock);
+       rd = table_open(RelationRelationId, RowExclusiveLock);
 
        /* Fetch a copy of the tuple to scribble on */
        ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
@@ -808,16 +1202,6 @@ vac_update_relstats(Relation relation,
                        dirty = true;
                }
 
-               /*
-                * If we have discovered that there are no indexes, then there's no
-                * primary key either.  This could be done more thoroughly...
-                */
-               if (pgcform->relhaspkey && !hasindex)
-               {
-                       pgcform->relhaspkey = false;
-                       dirty = true;
-               }
-
                /* We also clear relhasrules and relhastriggers if needed */
                if (pgcform->relhasrules && relation->rd_rules == NULL)
                {
@@ -868,7 +1252,7 @@ vac_update_relstats(Relation relation,
        if (dirty)
                heap_inplace_update(rd, ctup);
 
-       heap_close(rd, RowExclusiveLock);
+       table_close(rd, RowExclusiveLock);
 }
 
 
@@ -882,7 +1266,7 @@ vac_update_relstats(Relation relation,
  *             pg_class.relminmxid values.
  *
  *             If we are able to advance either pg_database value, also try to
- *             truncate pg_clog and pg_multixact.
+ *             truncate pg_xact and pg_multixact.
  *
  *             We violate transaction semantics here by overwriting the database's
  *             existing pg_database tuple with the new values.  This is reasonably
@@ -911,7 +1295,7 @@ vac_update_datfrozenxid(void)
         * committed pg_class entries for new tables; see AddNewRelationTuple().
         * So we cannot produce a wrong minimum by starting with this.
         */
-       newFrozenXid = GetOldestXmin(NULL, true);
+       newFrozenXid = GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM);
 
        /*
         * Similarly, initialize the MultiXact "min" with the value that would be
@@ -931,7 +1315,7 @@ vac_update_datfrozenxid(void)
         * We must seqscan pg_class to find the minimum Xid, because there is no
         * index that can help us here.
         */
-       relation = heap_open(RelationRelationId, AccessShareLock);
+       relation = table_open(RelationRelationId, AccessShareLock);
 
        scan = systable_beginscan(relation, InvalidOid, false,
                                                          NULL, 0, NULL);
@@ -942,41 +1326,66 @@ vac_update_datfrozenxid(void)
 
                /*
                 * Only consider relations able to hold unfrozen XIDs (anything else
-                * should have InvalidTransactionId in relfrozenxid anyway.)
+                * should have InvalidTransactionId in relfrozenxid anyway).
                 */
                if (classForm->relkind != RELKIND_RELATION &&
                        classForm->relkind != RELKIND_MATVIEW &&
                        classForm->relkind != RELKIND_TOASTVALUE)
+               {
+                       Assert(!TransactionIdIsValid(classForm->relfrozenxid));
+                       Assert(!MultiXactIdIsValid(classForm->relminmxid));
                        continue;
-
-               Assert(TransactionIdIsNormal(classForm->relfrozenxid));
-               Assert(MultiXactIdIsValid(classForm->relminmxid));
+               }
 
                /*
+                * Some table AMs might not need per-relation xid / multixid horizons.
+                * It therefore seems reasonable to allow relfrozenxid and relminmxid
+                * to not be set (i.e. set to their respective Invalid*Id)
+                * independently. Thus validate and compute horizon for each only if
+                * set.
+                *
                 * If things are working properly, no relation should have a
                 * relfrozenxid or relminmxid that is "in the future".  However, such
                 * cases have been known to arise due to bugs in pg_upgrade.  If we
                 * see any entries that are "in the future", chicken out and don't do
-                * anything.  This ensures we won't truncate clog before those
-                * relations have been scanned and cleaned up.
+                * anything.  This ensures we won't truncate clog & multixact SLRUs
+                * before those relations have been scanned and cleaned up.
                 */
-               if (TransactionIdPrecedes(lastSaneFrozenXid, classForm->relfrozenxid) ||
-                       MultiXactIdPrecedes(lastSaneMinMulti, classForm->relminmxid))
+
+               if (TransactionIdIsValid(classForm->relfrozenxid))
                {
-                       bogus = true;
-                       break;
+                       Assert(TransactionIdIsNormal(classForm->relfrozenxid));
+
+                       /* check for values in the future */
+                       if (TransactionIdPrecedes(lastSaneFrozenXid, classForm->relfrozenxid))
+                       {
+                               bogus = true;
+                               break;
+                       }
+
+                       /* determine new horizon */
+                       if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
+                               newFrozenXid = classForm->relfrozenxid;
                }
 
-               if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
-                       newFrozenXid = classForm->relfrozenxid;
+               if (MultiXactIdIsValid(classForm->relminmxid))
+               {
+                       /* check for values in the future */
+                       if (MultiXactIdPrecedes(lastSaneMinMulti, classForm->relminmxid))
+                       {
+                               bogus = true;
+                               break;
+                       }
 
-               if (MultiXactIdPrecedes(classForm->relminmxid, newMinMulti))
-                       newMinMulti = classForm->relminmxid;
+                       /* determine new horizon */
+                       if (MultiXactIdPrecedes(classForm->relminmxid, newMinMulti))
+                               newMinMulti = classForm->relminmxid;
+               }
        }
 
        /* we're done with pg_class */
        systable_endscan(scan);
-       heap_close(relation, AccessShareLock);
+       table_close(relation, AccessShareLock);
 
        /* chicken out if bogus data found */
        if (bogus)
@@ -986,7 +1395,7 @@ vac_update_datfrozenxid(void)
        Assert(MultiXactIdIsValid(newMinMulti));
 
        /* Now fetch the pg_database tuple we need to update. */
-       relation = heap_open(DatabaseRelationId, RowExclusiveLock);
+       relation = table_open(DatabaseRelationId, RowExclusiveLock);
 
        /* Fetch a copy of the tuple to scribble on */
        tuple = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
@@ -1024,11 +1433,11 @@ vac_update_datfrozenxid(void)
                heap_inplace_update(relation, tuple);
 
        heap_freetuple(tuple);
-       heap_close(relation, RowExclusiveLock);
+       table_close(relation, RowExclusiveLock);
 
        /*
         * If we were able to advance datfrozenxid or datminmxid, see if we can
-        * truncate pg_clog and/or pg_multixact.  Also do it if the shared
+        * truncate pg_xact and/or pg_multixact.  Also do it if the shared
         * XID-wrap-limit info is stale, since this action will update that too.
         */
        if (dirty || ForceTransactionIdLimitUpdate())
@@ -1041,7 +1450,7 @@ vac_update_datfrozenxid(void)
  *     vac_truncate_clog() -- attempt to truncate the commit log
  *
  *             Scan pg_database to determine the system-wide oldest datfrozenxid,
- *             and use it to truncate the transaction commit log (pg_clog).
+ *             and use it to truncate the transaction commit log (pg_xact).
  *             Also update the XID wrap limit info maintained by varsup.c.
  *             Likewise for datminmxid.
  *
@@ -1062,7 +1471,7 @@ vac_truncate_clog(TransactionId frozenXID,
 {
        TransactionId nextXID = ReadNewTransactionId();
        Relation        relation;
-       HeapScanDesc scan;
+       TableScanDesc scan;
        HeapTuple       tuple;
        Oid                     oldestxid_datoid;
        Oid                     minmulti_datoid;
@@ -1088,12 +1497,12 @@ vac_truncate_clog(TransactionId frozenXID,
         * of the interlock against copying a DB containing an active backend.
         * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
         * concurrently modify the datfrozenxid's of different databases, the
-        * worst possible outcome is that pg_clog is not truncated as aggressively
+        * worst possible outcome is that pg_xact is not truncated as aggressively
         * as it could be.
         */
-       relation = heap_open(DatabaseRelationId, AccessShareLock);
+       relation = table_open(DatabaseRelationId, AccessShareLock);
 
-       scan = heap_beginscan_catalog(relation, 0, NULL);
+       scan = table_beginscan_catalog(relation, 0, NULL);
 
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
@@ -1122,19 +1531,19 @@ vac_truncate_clog(TransactionId frozenXID,
                else if (TransactionIdPrecedes(datfrozenxid, frozenXID))
                {
                        frozenXID = datfrozenxid;
-                       oldestxid_datoid = HeapTupleGetOid(tuple);
+                       oldestxid_datoid = dbform->oid;
                }
 
                if (MultiXactIdPrecedes(datminmxid, minMulti))
                {
                        minMulti = datminmxid;
-                       minmulti_datoid = HeapTupleGetOid(tuple);
+                       minmulti_datoid = dbform->oid;
                }
        }
 
-       heap_endscan(scan);
+       table_endscan(scan);
 
-       heap_close(relation, AccessShareLock);
+       table_close(relation, AccessShareLock);
 
        /*
         * Do not truncate CLOG if we seem to have suffered wraparound already;
@@ -1154,10 +1563,19 @@ vac_truncate_clog(TransactionId frozenXID,
        if (bogus)
                return;
 
+       /*
+        * Advance the oldest value for commit timestamps before truncating, so
+        * that if a user requests a timestamp for a transaction we're truncating
+        * away right after this point, they get NULL instead of an ugly "file not
+        * found" error from slru.c.  This doesn't matter for xact/multixact
+        * because they are not subject to arbitrary lookups from users.
+        */
+       AdvanceOldestCommitTsXid(frozenXID);
+
        /*
         * Truncate CLOG, multixact and CommitTs to the oldest computed value.
         */
-       TruncateCLOG(frozenXID);
+       TruncateCLOG(frozenXID, oldestxid_datoid);
        TruncateCommitTs(frozenXID);
        TruncateMultiXact(minMulti, minmulti_datoid);
 
@@ -1168,14 +1586,21 @@ vac_truncate_clog(TransactionId frozenXID,
         * signalling twice?
         */
        SetTransactionIdLimit(frozenXID, oldestxid_datoid);
-       SetMultiXactIdLimit(minMulti, minmulti_datoid);
-       AdvanceOldestCommitTsXid(frozenXID);
+       SetMultiXactIdLimit(minMulti, minmulti_datoid, false);
 }
 
 
 /*
  *     vacuum_rel() -- vacuum one heap relation
  *
+ *             relid identifies the relation to vacuum.  If relation is supplied,
+ *             use the name therein for reporting any failure to open/lock the rel;
+ *             do not use it once we've successfully opened the rel, since it might
+ *             be stale.
+ *
+ *             Returns true if it's okay to proceed with a requested ANALYZE
+ *             operation on this table.
+ *
  *             Doing one heap at a time incurs extra overhead, since we need to
  *             check that the heap exists again just before we vacuum it.  The
  *             reason that we do this is so that vacuuming can be spread across
@@ -1185,7 +1610,7 @@ vac_truncate_clog(TransactionId frozenXID,
  *             At entry and exit, we are not inside a transaction.
  */
 static bool
-vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
+vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
 {
        LOCKMODE        lmode;
        Relation        onerel;
@@ -1206,7 +1631,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
         */
        PushActiveSnapshot(GetTransactionSnapshot());
 
-       if (!(options & VACOPT_FULL))
+       if (!(params->options & VACOPT_FULL))
        {
                /*
                 * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
@@ -1246,31 +1671,14 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
         * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
         * way, we can be sure that no other backend is vacuuming the same table.
         */
-       lmode = (options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+       lmode = (params->options & VACOPT_FULL) ?
+               AccessExclusiveLock : ShareUpdateExclusiveLock;
 
-       /*
-        * Open the relation and get the appropriate lock on it.
-        *
-        * There's a race condition here: the rel may have gone away since the
-        * last time we saw it.  If so, we don't need to vacuum it.
-        *
-        * If we've been asked not to wait for the relation lock, acquire it first
-        * in non-blocking mode, before calling try_relation_open().
-        */
-       if (!(options & VACOPT_NOWAIT))
-               onerel = try_relation_open(relid, lmode);
-       else if (ConditionalLockRelationOid(relid, lmode))
-               onerel = try_relation_open(relid, NoLock);
-       else
-       {
-               onerel = NULL;
-               if (IsAutoVacuumWorkerProcess() && params->log_min_duration >= 0)
-                       ereport(LOG,
-                                       (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                                  errmsg("skipping vacuum of \"%s\" --- lock not available",
-                                                 relation->relname)));
-       }
+       /* open the relation and get the appropriate lock on it */
+       onerel = vacuum_open_relation(relid, relation, params->options,
+                                                                 params->log_min_duration >= 0, lmode);
 
+       /* leave if relation could not be opened or locked */
        if (!onerel)
        {
                PopActiveSnapshot();
@@ -1279,30 +1687,17 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
        }
 
        /*
-        * Check permissions.
-        *
-        * We allow the user to vacuum a table if he is superuser, the table
-        * owner, or the database owner (but in the latter case, only if it's not
-        * a shared relation).  pg_class_ownercheck includes the superuser case.
-        *
-        * Note we choose to treat permissions failure as a WARNING and keep
-        * trying to vacuum the rest of the DB --- is this appropriate?
+        * Check if relation needs to be skipped based on ownership.  This check
+        * happens also when building the relation list to vacuum for a manual
+        * operation, and needs to be done additionally here as VACUUM could
+        * happen across multiple transactions where relation ownership could have
+        * changed in-between.  Make sure to only generate logs for VACUUM in this
+        * case.
         */
-       if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
-                 (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
+       if (!vacuum_is_relation_owner(RelationGetRelid(onerel),
+                                                                 onerel->rd_rel,
+                                                                 params->options & VACOPT_VACUUM))
        {
-               if (onerel->rd_rel->relisshared)
-                       ereport(WARNING,
-                                 (errmsg("skipping \"%s\" --- only superuser can vacuum it",
-                                                 RelationGetRelationName(onerel))));
-               else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
-                       ereport(WARNING,
-                                       (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
-                                                       RelationGetRelationName(onerel))));
-               else
-                       ereport(WARNING,
-                                       (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
-                                                       RelationGetRelationName(onerel))));
                relation_close(onerel, lmode);
                PopActiveSnapshot();
                CommitTransactionCommand();
@@ -1310,13 +1705,12 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
        }
 
        /*
-        * Check that it's a vacuumable relation; we used to do this in
-        * get_rel_oids() but seems safer to check after we've locked the
-        * relation.
+        * Check that it's of a vacuumable relkind.
         */
        if (onerel->rd_rel->relkind != RELKIND_RELATION &&
                onerel->rd_rel->relkind != RELKIND_MATVIEW &&
-               onerel->rd_rel->relkind != RELKIND_TOASTVALUE)
+               onerel->rd_rel->relkind != RELKIND_TOASTVALUE &&
+               onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
        {
                ereport(WARNING,
                                (errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
@@ -1342,6 +1736,20 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
                return false;
        }
 
+       /*
+        * Silently ignore partitioned tables as there is no work to be done.  The
+        * useful work is on their child partitions, which have been queued up for
+        * us separately.
+        */
+       if (onerel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       {
+               relation_close(onerel, lmode);
+               PopActiveSnapshot();
+               CommitTransactionCommand();
+               /* It's OK to proceed with ANALYZE on this table */
+               return true;
+       }
+
        /*
         * Get a session-level lock too. This will protect our access to the
         * relation across multiple transactions, so that we can vacuum the
@@ -1355,12 +1763,32 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
        onerelid = onerel->rd_lockInfo.lockRelId;
        LockRelationIdForSession(&onerelid, lmode);
 
+       /* Set index cleanup option based on reloptions if not yet */
+       if (params->index_cleanup == VACOPT_TERNARY_DEFAULT)
+       {
+               if (onerel->rd_options == NULL ||
+                       ((StdRdOptions *) onerel->rd_options)->vacuum_index_cleanup)
+                       params->index_cleanup = VACOPT_TERNARY_ENABLED;
+               else
+                       params->index_cleanup = VACOPT_TERNARY_DISABLED;
+       }
+
+       /* Set truncate option based on reloptions if not yet */
+       if (params->truncate == VACOPT_TERNARY_DEFAULT)
+       {
+               if (onerel->rd_options == NULL ||
+                       ((StdRdOptions *) onerel->rd_options)->vacuum_truncate)
+                       params->truncate = VACOPT_TERNARY_ENABLED;
+               else
+                       params->truncate = VACOPT_TERNARY_DISABLED;
+       }
+
        /*
         * Remember the relation's TOAST relation for later, if the caller asked
         * us to process it.  In VACUUM FULL, though, the toast table is
         * automatically rebuilt by cluster_rel so we shouldn't recurse to it.
         */
-       if (!(options & VACOPT_SKIPTOAST) && !(options & VACOPT_FULL))
+       if (!(params->options & VACOPT_SKIPTOAST) && !(params->options & VACOPT_FULL))
                toast_relid = onerel->rd_rel->reltoastrelid;
        else
                toast_relid = InvalidOid;
@@ -1379,18 +1807,22 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
        /*
         * Do the actual work --- either FULL or "lazy" vacuum
         */
-       if (options & VACOPT_FULL)
+       if (params->options & VACOPT_FULL)
        {
+               int                     cluster_options = 0;
+
                /* close relation before vacuuming, but hold lock until commit */
                relation_close(onerel, NoLock);
                onerel = NULL;
 
+               if ((params->options & VACOPT_VERBOSE) != 0)
+                       cluster_options |= CLUOPT_VERBOSE;
+
                /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
-               cluster_rel(relid, InvalidOid, false,
-                                       (options & VACOPT_VERBOSE) != 0);
+               cluster_rel(relid, InvalidOid, cluster_options);
        }
        else
-               lazy_vacuum_rel(onerel, options, params, vac_strategy);
+               table_relation_vacuum(onerel, params, vac_strategy);
 
        /* Roll back any GUC changes executed by index functions */
        AtEOXact_GUC(false, save_nestlevel);
@@ -1416,7 +1848,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
         * totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
-               vacuum_rel(toast_relid, relation, options, params);
+               vacuum_rel(toast_relid, NULL, params);
 
        /*
         * Now release the session-level lock on the master table.
@@ -1433,7 +1865,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
  * specified kind of lock on each.  Return an array of Relation pointers for
  * the indexes into *Irel, and the number of indexes into *nindexes.
  *
- * We consider an index vacuumable if it is marked insertable (IndexIsReady).
+ * We consider an index vacuumable if it is marked insertable (indisready).
  * If it isn't, probably a CREATE INDEX CONCURRENTLY command failed early in
  * execution, and what we have is too corrupt to be processable.  We will
  * vacuum even if the index isn't indisvalid; this is important because in a
@@ -1468,7 +1900,7 @@ vac_open_indexes(Relation relation, LOCKMODE lockmode,
                Relation        indrel;
 
                indrel = index_open(indexoid, lockmode);
-               if (IndexIsReady(indrel->rd_index))
+               if (indrel->rd_index->indisready)
                        (*Irel)[i++] = indrel;
                else
                        index_close(indrel, lockmode);
@@ -1514,13 +1946,13 @@ vacuum_delay_point(void)
        if (VacuumCostActive && !InterruptPending &&
                VacuumCostBalance >= VacuumCostLimit)
        {
-               int                     msec;
+               double          msec;
 
                msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
                if (msec > VacuumCostDelay * 4)
                        msec = VacuumCostDelay * 4;
 
-               pg_usleep(msec * 1000L);
+               pg_usleep((long) (msec * 1000));
 
                VacuumCostBalance = 0;
 
@@ -1531,3 +1963,15 @@ vacuum_delay_point(void)
                CHECK_FOR_INTERRUPTS();
        }
 }
+
+/*
+ * A wrapper function of defGetBoolean().
+ *
+ * This function returns VACOPT_TERNARY_ENABLED and VACOPT_TERNARY_DISABLED
+ * instead of true and false.
+ */
+static VacOptTernaryValue
+get_vacopt_ternary_value(DefElem *def)
+{
+       return defGetBoolean(def) ? VACOPT_TERNARY_ENABLED : VACOPT_TERNARY_DISABLED;
+}