]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/vacuum.c
Fix "ANALYZE t, t" inside a transaction block.
[postgresql] / src / backend / commands / vacuum.c
index b50c554c517677b9076a4e28fb7d67ff5d98f6c9..7d6c50b49d9301bbacdbfd280d5633a23cc91c93 100644 (file)
@@ -9,7 +9,7 @@
  * in cluster.c.
  *
  *
- * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
-#include "catalog/pg_inherits_fn.h"
+#include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "commands/cluster.h"
+#include "commands/defrem.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
@@ -50,7 +52,6 @@
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
-#include "utils/tqual.h"
 
 
 /*
@@ -68,14 +69,14 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static List *expand_vacuum_rel(VacuumRelation *vrel);
-static List *get_all_vacuum_rels(void);
+static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
+static List *get_all_vacuum_rels(int options);
 static void vac_truncate_clog(TransactionId frozenXID,
-                                 MultiXactId minMulti,
-                                 TransactionId lastSaneFrozenXid,
-                                 MultiXactId lastSaneMinMulti);
-static bool vacuum_rel(Oid relid, RangeVar *relation, int options,
-                  VacuumParams *params);
+                                                         MultiXactId minMulti,
+                                                         TransactionId lastSaneFrozenXid,
+                                                         MultiXactId lastSaneMinMulti);
+static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
+static VacOptTernaryValue get_vacopt_ternary_value(DefElem *def);
 
 /*
  * Primary entry point for manual VACUUM and ANALYZE commands
@@ -84,20 +85,77 @@ static bool vacuum_rel(Oid relid, RangeVar *relation, int options,
  * happen in vacuum().
  */
 void
-ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
+ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 {
        VacuumParams params;
+       bool            verbose = false;
+       bool            skip_locked = false;
+       bool            analyze = false;
+       bool            freeze = false;
+       bool            full = false;
+       bool            disable_page_skipping = false;
+       ListCell   *lc;
+
+       /* Set default value */
+       params.index_cleanup = VACOPT_TERNARY_DEFAULT;
+       params.truncate = VACOPT_TERNARY_DEFAULT;
+
+       /* Parse options list */
+       foreach(lc, vacstmt->options)
+       {
+               DefElem    *opt = (DefElem *) lfirst(lc);
+
+               /* Parse common options for VACUUM and ANALYZE */
+               if (strcmp(opt->defname, "verbose") == 0)
+                       verbose = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "skip_locked") == 0)
+                       skip_locked = defGetBoolean(opt);
+               else if (!vacstmt->is_vacuumcmd)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("unrecognized ANALYZE option \"%s\"", opt->defname),
+                                        parser_errposition(pstate, opt->location)));
+
+               /* Parse options available on VACUUM */
+               else if (strcmp(opt->defname, "analyze") == 0)
+                       analyze = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "freeze") == 0)
+                       freeze = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "full") == 0)
+                       full = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "disable_page_skipping") == 0)
+                       disable_page_skipping = defGetBoolean(opt);
+               else if (strcmp(opt->defname, "index_cleanup") == 0)
+                       params.index_cleanup = get_vacopt_ternary_value(opt);
+               else if (strcmp(opt->defname, "truncate") == 0)
+                       params.truncate = get_vacopt_ternary_value(opt);
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("unrecognized VACUUM option \"%s\"", opt->defname),
+                                        parser_errposition(pstate, opt->location)));
+       }
+
+       /* Set vacuum options */
+       params.options =
+               (vacstmt->is_vacuumcmd ? VACOPT_VACUUM : VACOPT_ANALYZE) |
+               (verbose ? VACOPT_VERBOSE : 0) |
+               (skip_locked ? VACOPT_SKIP_LOCKED : 0) |
+               (analyze ? VACOPT_ANALYZE : 0) |
+               (freeze ? VACOPT_FREEZE : 0) |
+               (full ? VACOPT_FULL : 0) |
+               (disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0);
 
        /* sanity checks on options */
-       Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE));
-       Assert((vacstmt->options & VACOPT_VACUUM) ||
-                  !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
-       Assert(!(vacstmt->options & VACOPT_SKIPTOAST));
+       Assert(params.options & (VACOPT_VACUUM | VACOPT_ANALYZE));
+       Assert((params.options & VACOPT_VACUUM) ||
+                  !(params.options & (VACOPT_FULL | VACOPT_FREEZE)));
+       Assert(!(params.options & VACOPT_SKIPTOAST));
 
        /*
         * Make sure VACOPT_ANALYZE is specified if any column lists are present.
         */
-       if (!(vacstmt->options & VACOPT_ANALYZE))
+       if (!(params.options & VACOPT_ANALYZE))
        {
                ListCell   *lc;
 
@@ -116,7 +174,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
         * All freeze ages are zero if the FREEZE option is given; otherwise pass
         * them as -1 which means to use the default values.
         */
-       if (vacstmt->options & VACOPT_FREEZE)
+       if (params.options & VACOPT_FREEZE)
        {
                params.freeze_min_age = 0;
                params.freeze_table_age = 0;
@@ -138,14 +196,12 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
        params.log_min_duration = -1;
 
        /* Now go through the common routine */
-       vacuum(vacstmt->options, vacstmt->rels, &params, NULL, isTopLevel);
+       vacuum(vacstmt->rels, &params, NULL, isTopLevel);
 }
 
 /*
  * Internal entry point for VACUUM and ANALYZE commands.
  *
- * options is a bitmask of VacuumOption flags, indicating what to do.
- *
  * relations, if not NIL, is a list of VacuumRelation to process; otherwise,
  * we process all relevant tables in the database.  For each VacuumRelation,
  * if a valid OID is supplied, the table with that OID is what to process;
@@ -163,7 +219,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
  * memory context that will not disappear at transaction commit.
  */
 void
-vacuum(int options, List *relations, VacuumParams *params,
+vacuum(List *relations, VacuumParams *params,
           BufferAccessStrategy bstrategy, bool isTopLevel)
 {
        static bool in_vacuum = false;
@@ -174,7 +230,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 
        Assert(params != NULL);
 
-       stmttype = (options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
+       stmttype = (params->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
 
        /*
         * We cannot run VACUUM inside a user transaction block; if we were inside
@@ -184,13 +240,13 @@ vacuum(int options, List *relations, VacuumParams *params,
         *
         * ANALYZE (without VACUUM) can run either way.
         */
-       if (options & VACOPT_VACUUM)
+       if (params->options & VACOPT_VACUUM)
        {
-               PreventTransactionChain(isTopLevel, stmttype);
+               PreventInTransactionBlock(isTopLevel, stmttype);
                in_outer_xact = false;
        }
        else
-               in_outer_xact = IsInTransactionChain(isTopLevel);
+               in_outer_xact = IsInTransactionBlock(isTopLevel);
 
        /*
         * Due to static variables vac_context, anl_context and vac_strategy,
@@ -206,8 +262,8 @@ vacuum(int options, List *relations, VacuumParams *params,
        /*
         * Sanity check DISABLE_PAGE_SKIPPING option.
         */
-       if ((options & VACOPT_FULL) != 0 &&
-               (options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
+       if ((params->options & VACOPT_FULL) != 0 &&
+               (params->options & VACOPT_DISABLE_PAGE_SKIPPING) != 0)
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("VACUUM option DISABLE_PAGE_SKIPPING cannot be used with FULL")));
@@ -216,7 +272,7 @@ vacuum(int options, List *relations, VacuumParams *params,
         * Send info about dead objects to the statistics collector, unless we are
         * in autovacuum --- autovacuum.c does this for itself.
         */
-       if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+       if ((params->options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
                pgstat_vacuum_stat();
 
        /*
@@ -257,7 +313,7 @@ vacuum(int options, List *relations, VacuumParams *params,
                        List       *sublist;
                        MemoryContext old_context;
 
-                       sublist = expand_vacuum_rel(vrel);
+                       sublist = expand_vacuum_rel(vrel, params->options);
                        old_context = MemoryContextSwitchTo(vac_context);
                        newrels = list_concat(newrels, sublist);
                        MemoryContextSwitchTo(old_context);
@@ -265,7 +321,7 @@ vacuum(int options, List *relations, VacuumParams *params,
                relations = newrels;
        }
        else
-               relations = get_all_vacuum_rels();
+               relations = get_all_vacuum_rels(params->options);
 
        /*
         * Decide whether we need to start/commit our own transactions.
@@ -281,11 +337,11 @@ vacuum(int options, List *relations, VacuumParams *params,
         * transaction block, and also in an autovacuum worker, use own
         * transactions so we can release locks sooner.
         */
-       if (options & VACOPT_VACUUM)
+       if (params->options & VACOPT_VACUUM)
                use_own_xacts = true;
        else
        {
-               Assert(options & VACOPT_ANALYZE);
+               Assert(params->options & VACOPT_ANALYZE);
                if (IsAutoVacuumWorkerProcess())
                        use_own_xacts = true;
                else if (in_outer_xact)
@@ -335,13 +391,13 @@ vacuum(int options, List *relations, VacuumParams *params,
                {
                        VacuumRelation *vrel = lfirst_node(VacuumRelation, cur);
 
-                       if (options & VACOPT_VACUUM)
+                       if (params->options & VACOPT_VACUUM)
                        {
-                               if (!vacuum_rel(vrel->oid, vrel->relation, options, params))
+                               if (!vacuum_rel(vrel->oid, vrel->relation, params))
                                        continue;
                        }
 
-                       if (options & VACOPT_ANALYZE)
+                       if (params->options & VACOPT_ANALYZE)
                        {
                                /*
                                 * If using separate xacts, start one for analyze. Otherwise,
@@ -354,7 +410,7 @@ vacuum(int options, List *relations, VacuumParams *params,
                                        PushActiveSnapshot(GetTransactionSnapshot());
                                }
 
-                               analyze_rel(vrel->oid, vrel->relation, options, params,
+                               analyze_rel(vrel->oid, vrel->relation, params,
                                                        vrel->va_cols, in_outer_xact, vac_strategy);
 
                                if (use_own_xacts)
@@ -362,6 +418,15 @@ vacuum(int options, List *relations, VacuumParams *params,
                                        PopActiveSnapshot();
                                        CommitTransactionCommand();
                                }
+                               else
+                               {
+                                       /*
+                                        * If we're not using separate xacts, better separate the
+                                        * ANALYZE actions with CCIs.  This avoids trouble if user
+                                        * says "ANALYZE t, t".
+                                        */
+                                       CommandCounterIncrement();
+                               }
                        }
                }
        }
@@ -390,7 +455,7 @@ vacuum(int options, List *relations, VacuumParams *params,
                StartTransactionCommand();
        }
 
-       if ((options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
+       if ((params->options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
        {
                /*
                 * Update pg_database.datfrozenxid, and truncate pg_xact if possible.
@@ -408,6 +473,185 @@ vacuum(int options, List *relations, VacuumParams *params,
        vac_context = NULL;
 }
 
+/*
+ * Check if a given relation can be safely vacuumed or analyzed.  If the
+ * user is not the relation owner, issue a WARNING log message and return
+ * false to let the caller decide what to do with this relation.  This
+ * routine is used to decide if a relation can be processed for VACUUM or
+ * ANALYZE.
+ */
+bool
+vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple, int options)
+{
+       char       *relname;
+
+       Assert((options & (VACOPT_VACUUM | VACOPT_ANALYZE)) != 0);
+
+       /*
+        * Check permissions.
+        *
+        * We allow the user to vacuum or analyze a table if he is superuser, the
+        * table owner, or the database owner (but in the latter case, only if
+        * it's not a shared relation).  pg_class_ownercheck includes the
+        * superuser case.
+        *
+        * Note we choose to treat permissions failure as a WARNING and keep
+        * trying to vacuum or analyze the rest of the DB --- is this appropriate?
+        */
+       if (pg_class_ownercheck(relid, GetUserId()) ||
+               (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !reltuple->relisshared))
+               return true;
+
+       relname = NameStr(reltuple->relname);
+
+       if ((options & VACOPT_VACUUM) != 0)
+       {
+               if (reltuple->relisshared)
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only superuser can vacuum it",
+                                                       relname)));
+               else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
+                                                       relname)));
+               else
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
+                                                       relname)));
+
+               /*
+                * For VACUUM ANALYZE, both logs could show up, but just generate
+                * information for VACUUM as that would be the first one to be
+                * processed.
+                */
+               return false;
+       }
+
+       if ((options & VACOPT_ANALYZE) != 0)
+       {
+               if (reltuple->relisshared)
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only superuser can analyze it",
+                                                       relname)));
+               else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only superuser or database owner can analyze it",
+                                                       relname)));
+               else
+                       ereport(WARNING,
+                                       (errmsg("skipping \"%s\" --- only table or database owner can analyze it",
+                                                       relname)));
+       }
+
+       return false;
+}
+
+
+/*
+ * vacuum_open_relation
+ *
+ * This routine is used for attempting to open and lock a relation which
+ * is going to be vacuumed or analyzed.  If the relation cannot be opened
+ * or locked, a log is emitted if possible.
+ */
+Relation
+vacuum_open_relation(Oid relid, RangeVar *relation, int options,
+                                        bool verbose, LOCKMODE lmode)
+{
+       Relation        onerel;
+       bool            rel_lock = true;
+       int                     elevel;
+
+       Assert((options & (VACOPT_VACUUM | VACOPT_ANALYZE)) != 0);
+
+       /*
+        * Open the relation and get the appropriate lock on it.
+        *
+        * There's a race condition here: the relation may have gone away since
+        * the last time we saw it.  If so, we don't need to vacuum or analyze it.
+        *
+        * If we've been asked not to wait for the relation lock, acquire it first
+        * in non-blocking mode, before calling try_relation_open().
+        */
+       if (!(options & VACOPT_SKIP_LOCKED))
+               onerel = try_relation_open(relid, lmode);
+       else if (ConditionalLockRelationOid(relid, lmode))
+               onerel = try_relation_open(relid, NoLock);
+       else
+       {
+               onerel = NULL;
+               rel_lock = false;
+       }
+
+       /* if relation is opened, leave */
+       if (onerel)
+               return onerel;
+
+       /*
+        * Relation could not be opened, hence generate if possible a log
+        * informing on the situation.
+        *
+        * If the RangeVar is not defined, we do not have enough information to
+        * provide a meaningful log statement.  Chances are that the caller has
+        * intentionally not provided this information so that this logging is
+        * skipped, anyway.
+        */
+       if (relation == NULL)
+               return NULL;
+
+       /*
+        * Determine the log level.
+        *
+        * For manual VACUUM or ANALYZE, we emit a WARNING to match the log
+        * statements in the permission checks; otherwise, only log if the caller
+        * so requested.
+        */
+       if (!IsAutoVacuumWorkerProcess())
+               elevel = WARNING;
+       else if (verbose)
+               elevel = LOG;
+       else
+               return NULL;
+
+       if ((options & VACOPT_VACUUM) != 0)
+       {
+               if (!rel_lock)
+                       ereport(elevel,
+                                       (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                        errmsg("skipping vacuum of \"%s\" --- lock not available",
+                                                       relation->relname)));
+               else
+                       ereport(elevel,
+                                       (errcode(ERRCODE_UNDEFINED_TABLE),
+                                        errmsg("skipping vacuum of \"%s\" --- relation no longer exists",
+                                                       relation->relname)));
+
+               /*
+                * For VACUUM ANALYZE, both logs could show up, but just generate
+                * information for VACUUM as that would be the first one to be
+                * processed.
+                */
+               return NULL;
+       }
+
+       if ((options & VACOPT_ANALYZE) != 0)
+       {
+               if (!rel_lock)
+                       ereport(elevel,
+                                       (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                        errmsg("skipping analyze of \"%s\" --- lock not available",
+                                                       relation->relname)));
+               else
+                       ereport(elevel,
+                                       (errcode(ERRCODE_UNDEFINED_TABLE),
+                                        errmsg("skipping analyze of \"%s\" --- relation no longer exists",
+                                                       relation->relname)));
+       }
+
+       return NULL;
+}
+
+
 /*
  * Given a VacuumRelation, fill in the table OID if it wasn't specified,
  * and optionally add VacuumRelations for partitions of the table.
@@ -423,7 +667,7 @@ vacuum(int options, List *relations, VacuumParams *params,
  * are made in vac_context.
  */
 static List *
-expand_vacuum_rel(VacuumRelation *vrel)
+expand_vacuum_rel(VacuumRelation *vrel, int options)
 {
        List       *vacrels = NIL;
        MemoryContext oldcontext;
@@ -442,31 +686,67 @@ expand_vacuum_rel(VacuumRelation *vrel)
                HeapTuple       tuple;
                Form_pg_class classForm;
                bool            include_parts;
+               int                     rvr_opts;
+
+               /*
+                * Since autovacuum workers supply OIDs when calling vacuum(), no
+                * autovacuum worker should reach this code.
+                */
+               Assert(!IsAutoVacuumWorkerProcess());
 
                /*
                 * We transiently take AccessShareLock to protect the syscache lookup
                 * below, as well as find_all_inheritors's expectation that the caller
                 * holds some lock on the starting relation.
                 */
-               relid = RangeVarGetRelid(vrel->relation, AccessShareLock, false);
+               rvr_opts = (options & VACOPT_SKIP_LOCKED) ? RVR_SKIP_LOCKED : 0;
+               relid = RangeVarGetRelidExtended(vrel->relation,
+                                                                                AccessShareLock,
+                                                                                rvr_opts,
+                                                                                NULL, NULL);
 
                /*
-                * Make a returnable VacuumRelation for this rel.
+                * If the lock is unavailable, emit the same log statement that
+                * vacuum_rel() and analyze_rel() would.
                 */
-               oldcontext = MemoryContextSwitchTo(vac_context);
-               vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation,
-                                                                                                         relid,
-                                                                                                         vrel->va_cols));
-               MemoryContextSwitchTo(oldcontext);
+               if (!OidIsValid(relid))
+               {
+                       if (options & VACOPT_VACUUM)
+                               ereport(WARNING,
+                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                                errmsg("skipping vacuum of \"%s\" --- lock not available",
+                                                               vrel->relation->relname)));
+                       else
+                               ereport(WARNING,
+                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                                errmsg("skipping analyze of \"%s\" --- lock not available",
+                                                               vrel->relation->relname)));
+                       return vacrels;
+               }
 
                /*
-                * To check whether the relation is a partitioned table, fetch its
-                * syscache entry.
+                * To check whether the relation is a partitioned table and its
+                * ownership, fetch its syscache entry.
                 */
                tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for relation %u", relid);
                classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+               /*
+                * Make a returnable VacuumRelation for this rel if user is a proper
+                * owner.
+                */
+               if (vacuum_is_relation_owner(relid, classForm, options))
+               {
+                       oldcontext = MemoryContextSwitchTo(vac_context);
+                       vacrels = lappend(vacrels, makeVacuumRelation(vrel->relation,
+                                                                                                                 relid,
+                                                                                                                 vrel->va_cols));
+                       MemoryContextSwitchTo(oldcontext);
+               }
+
+
                include_parts = (classForm->relkind == RELKIND_PARTITIONED_TABLE);
                ReleaseSysCache(tuple);
 
@@ -475,7 +755,9 @@ expand_vacuum_rel(VacuumRelation *vrel)
                 * the list returned by find_all_inheritors() includes the passed-in
                 * OID, so we have to skip that.  There's no point in taking locks on
                 * the individual partitions yet, and doing so would just add
-                * unnecessary deadlock risk.
+                * unnecessary deadlock risk.  For this last reason we do not check
+                * yet the ownership of the partitions, which get added to the list to
+                * process.  Ownership will be checked later on anyway.
                 */
                if (include_parts)
                {
@@ -524,21 +806,26 @@ expand_vacuum_rel(VacuumRelation *vrel)
  * the current database.  The list is built in vac_context.
  */
 static List *
-get_all_vacuum_rels(void)
+get_all_vacuum_rels(int options)
 {
        List       *vacrels = NIL;
        Relation        pgclass;
-       HeapScanDesc scan;
+       TableScanDesc scan;
        HeapTuple       tuple;
 
-       pgclass = heap_open(RelationRelationId, AccessShareLock);
+       pgclass = table_open(RelationRelationId, AccessShareLock);
 
-       scan = heap_beginscan_catalog(pgclass, 0, NULL);
+       scan = table_beginscan_catalog(pgclass, 0, NULL);
 
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
                Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
                MemoryContext oldcontext;
+               Oid                     relid = classForm->oid;
+
+               /* check permissions of relation */
+               if (!vacuum_is_relation_owner(relid, classForm, options))
+                       continue;
 
                /*
                 * We include partitioned tables here; depending on which operation is
@@ -557,19 +844,19 @@ get_all_vacuum_rels(void)
                 */
                oldcontext = MemoryContextSwitchTo(vac_context);
                vacrels = lappend(vacrels, makeVacuumRelation(NULL,
-                                                                                                         HeapTupleGetOid(tuple),
+                                                                                                         relid,
                                                                                                          NIL));
                MemoryContextSwitchTo(oldcontext);
        }
 
-       heap_endscan(scan);
-       heap_close(pgclass, AccessShareLock);
+       table_endscan(scan);
+       table_close(pgclass, AccessShareLock);
 
        return vacrels;
 }
 
 /*
- * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
+ * vacuum_set_xid_limits() -- compute oldestXmin and freeze cutoff points
  *
  * The output parameters are:
  * - oldestXmin is the cutoff value used to distinguish whether tuples are
@@ -771,6 +1058,9 @@ vacuum_set_xid_limits(Relation rel,
  *             subset of the table.  When we have only partial information, we take
  *             the old value of pg_class.reltuples as a measurement of the
  *             tuple density in the unscanned pages.
+ *
+ *             Note: scanned_tuples should count only *live* tuples, since
+ *             pg_class.reltuples is defined that way.
  */
 double
 vac_estimate_reltuples(Relation relation,
@@ -852,6 +1142,9 @@ vac_estimate_reltuples(Relation relation,
  *             transaction.  This is OK since postponing the flag maintenance is
  *             always allowable.
  *
+ *             Note: num_tuples should count only *live* tuples, since
+ *             pg_class.reltuples is defined that way.
+ *
  *             This routine is shared by VACUUM and ANALYZE.
  */
 void
@@ -868,7 +1161,7 @@ vac_update_relstats(Relation relation,
        Form_pg_class pgcform;
        bool            dirty;
 
-       rd = heap_open(RelationRelationId, RowExclusiveLock);
+       rd = table_open(RelationRelationId, RowExclusiveLock);
 
        /* Fetch a copy of the tuple to scribble on */
        ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
@@ -909,16 +1202,6 @@ vac_update_relstats(Relation relation,
                        dirty = true;
                }
 
-               /*
-                * If we have discovered that there are no indexes, then there's no
-                * primary key either.  This could be done more thoroughly...
-                */
-               if (pgcform->relhaspkey && !hasindex)
-               {
-                       pgcform->relhaspkey = false;
-                       dirty = true;
-               }
-
                /* We also clear relhasrules and relhastriggers if needed */
                if (pgcform->relhasrules && relation->rd_rules == NULL)
                {
@@ -969,7 +1252,7 @@ vac_update_relstats(Relation relation,
        if (dirty)
                heap_inplace_update(rd, ctup);
 
-       heap_close(rd, RowExclusiveLock);
+       table_close(rd, RowExclusiveLock);
 }
 
 
@@ -1032,7 +1315,7 @@ vac_update_datfrozenxid(void)
         * We must seqscan pg_class to find the minimum Xid, because there is no
         * index that can help us here.
         */
-       relation = heap_open(RelationRelationId, AccessShareLock);
+       relation = table_open(RelationRelationId, AccessShareLock);
 
        scan = systable_beginscan(relation, InvalidOid, false,
                                                          NULL, 0, NULL);
@@ -1043,41 +1326,66 @@ vac_update_datfrozenxid(void)
 
                /*
                 * Only consider relations able to hold unfrozen XIDs (anything else
-                * should have InvalidTransactionId in relfrozenxid anyway.)
+                * should have InvalidTransactionId in relfrozenxid anyway).
                 */
                if (classForm->relkind != RELKIND_RELATION &&
                        classForm->relkind != RELKIND_MATVIEW &&
                        classForm->relkind != RELKIND_TOASTVALUE)
+               {
+                       Assert(!TransactionIdIsValid(classForm->relfrozenxid));
+                       Assert(!MultiXactIdIsValid(classForm->relminmxid));
                        continue;
-
-               Assert(TransactionIdIsNormal(classForm->relfrozenxid));
-               Assert(MultiXactIdIsValid(classForm->relminmxid));
+               }
 
                /*
+                * Some table AMs might not need per-relation xid / multixid horizons.
+                * It therefore seems reasonable to allow relfrozenxid and relminmxid
+                * to not be set (i.e. set to their respective Invalid*Id)
+                * independently. Thus validate and compute horizon for each only if
+                * set.
+                *
                 * If things are working properly, no relation should have a
                 * relfrozenxid or relminmxid that is "in the future".  However, such
                 * cases have been known to arise due to bugs in pg_upgrade.  If we
                 * see any entries that are "in the future", chicken out and don't do
-                * anything.  This ensures we won't truncate clog before those
-                * relations have been scanned and cleaned up.
+                * anything.  This ensures we won't truncate clog & multixact SLRUs
+                * before those relations have been scanned and cleaned up.
                 */
-               if (TransactionIdPrecedes(lastSaneFrozenXid, classForm->relfrozenxid) ||
-                       MultiXactIdPrecedes(lastSaneMinMulti, classForm->relminmxid))
+
+               if (TransactionIdIsValid(classForm->relfrozenxid))
                {
-                       bogus = true;
-                       break;
+                       Assert(TransactionIdIsNormal(classForm->relfrozenxid));
+
+                       /* check for values in the future */
+                       if (TransactionIdPrecedes(lastSaneFrozenXid, classForm->relfrozenxid))
+                       {
+                               bogus = true;
+                               break;
+                       }
+
+                       /* determine new horizon */
+                       if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
+                               newFrozenXid = classForm->relfrozenxid;
                }
 
-               if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
-                       newFrozenXid = classForm->relfrozenxid;
+               if (MultiXactIdIsValid(classForm->relminmxid))
+               {
+                       /* check for values in the future */
+                       if (MultiXactIdPrecedes(lastSaneMinMulti, classForm->relminmxid))
+                       {
+                               bogus = true;
+                               break;
+                       }
 
-               if (MultiXactIdPrecedes(classForm->relminmxid, newMinMulti))
-                       newMinMulti = classForm->relminmxid;
+                       /* determine new horizon */
+                       if (MultiXactIdPrecedes(classForm->relminmxid, newMinMulti))
+                               newMinMulti = classForm->relminmxid;
+               }
        }
 
        /* we're done with pg_class */
        systable_endscan(scan);
-       heap_close(relation, AccessShareLock);
+       table_close(relation, AccessShareLock);
 
        /* chicken out if bogus data found */
        if (bogus)
@@ -1087,7 +1395,7 @@ vac_update_datfrozenxid(void)
        Assert(MultiXactIdIsValid(newMinMulti));
 
        /* Now fetch the pg_database tuple we need to update. */
-       relation = heap_open(DatabaseRelationId, RowExclusiveLock);
+       relation = table_open(DatabaseRelationId, RowExclusiveLock);
 
        /* Fetch a copy of the tuple to scribble on */
        tuple = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
@@ -1125,7 +1433,7 @@ vac_update_datfrozenxid(void)
                heap_inplace_update(relation, tuple);
 
        heap_freetuple(tuple);
-       heap_close(relation, RowExclusiveLock);
+       table_close(relation, RowExclusiveLock);
 
        /*
         * If we were able to advance datfrozenxid or datminmxid, see if we can
@@ -1163,7 +1471,7 @@ vac_truncate_clog(TransactionId frozenXID,
 {
        TransactionId nextXID = ReadNewTransactionId();
        Relation        relation;
-       HeapScanDesc scan;
+       TableScanDesc scan;
        HeapTuple       tuple;
        Oid                     oldestxid_datoid;
        Oid                     minmulti_datoid;
@@ -1192,9 +1500,9 @@ vac_truncate_clog(TransactionId frozenXID,
         * worst possible outcome is that pg_xact is not truncated as aggressively
         * as it could be.
         */
-       relation = heap_open(DatabaseRelationId, AccessShareLock);
+       relation = table_open(DatabaseRelationId, AccessShareLock);
 
-       scan = heap_beginscan_catalog(relation, 0, NULL);
+       scan = table_beginscan_catalog(relation, 0, NULL);
 
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
@@ -1223,19 +1531,19 @@ vac_truncate_clog(TransactionId frozenXID,
                else if (TransactionIdPrecedes(datfrozenxid, frozenXID))
                {
                        frozenXID = datfrozenxid;
-                       oldestxid_datoid = HeapTupleGetOid(tuple);
+                       oldestxid_datoid = dbform->oid;
                }
 
                if (MultiXactIdPrecedes(datminmxid, minMulti))
                {
                        minMulti = datminmxid;
-                       minmulti_datoid = HeapTupleGetOid(tuple);
+                       minmulti_datoid = dbform->oid;
                }
        }
 
-       heap_endscan(scan);
+       table_endscan(scan);
 
-       heap_close(relation, AccessShareLock);
+       table_close(relation, AccessShareLock);
 
        /*
         * Do not truncate CLOG if we seem to have suffered wraparound already;
@@ -1302,7 +1610,7 @@ vac_truncate_clog(TransactionId frozenXID,
  *             At entry and exit, we are not inside a transaction.
  */
 static bool
-vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
+vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params)
 {
        LOCKMODE        lmode;
        Relation        onerel;
@@ -1311,7 +1619,6 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
        Oid                     save_userid;
        int                     save_sec_context;
        int                     save_nestlevel;
-       bool            rel_lock = true;
 
        Assert(params != NULL);
 
@@ -1324,7 +1631,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
         */
        PushActiveSnapshot(GetTransactionSnapshot());
 
-       if (!(options & VACOPT_FULL))
+       if (!(params->options & VACOPT_FULL))
        {
                /*
                 * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
@@ -1364,100 +1671,33 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
         * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
         * way, we can be sure that no other backend is vacuuming the same table.
         */
-       lmode = (options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+       lmode = (params->options & VACOPT_FULL) ?
+               AccessExclusiveLock : ShareUpdateExclusiveLock;
 
-       /*
-        * Open the relation and get the appropriate lock on it.
-        *
-        * There's a race condition here: the rel may have gone away since the
-        * last time we saw it.  If so, we don't need to vacuum it.
-        *
-        * If we've been asked not to wait for the relation lock, acquire it first
-        * in non-blocking mode, before calling try_relation_open().
-        */
-       if (!(options & VACOPT_NOWAIT))
-               onerel = try_relation_open(relid, lmode);
-       else if (ConditionalLockRelationOid(relid, lmode))
-               onerel = try_relation_open(relid, NoLock);
-       else
-       {
-               onerel = NULL;
-               rel_lock = false;
-       }
+       /* open the relation and get the appropriate lock on it */
+       onerel = vacuum_open_relation(relid, relation, params->options,
+                                                                 params->log_min_duration >= 0, lmode);
 
-       /*
-        * If we failed to open or lock the relation, emit a log message before
-        * exiting.
-        */
+       /* leave if relation could not be opened or locked */
        if (!onerel)
        {
-               int                     elevel = 0;
-
-               /*
-                * Determine the log level.
-                *
-                * If the RangeVar is not defined, we do not have enough information
-                * to provide a meaningful log statement.  Chances are that
-                * vacuum_rel's caller has intentionally not provided this information
-                * so that this logging is skipped, anyway.
-                *
-                * Otherwise, for autovacuum logs, we emit a LOG if
-                * log_autovacuum_min_duration is not disabled.  For manual VACUUM, we
-                * emit a WARNING to match the log statements in the permission
-                * checks.
-                */
-               if (relation != NULL)
-               {
-                       if (!IsAutoVacuumWorkerProcess())
-                               elevel = WARNING;
-                       else if (params->log_min_duration >= 0)
-                               elevel = LOG;
-               }
-
-               if (elevel != 0)
-               {
-                       if (!rel_lock)
-                               ereport(elevel,
-                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                                                errmsg("skipping vacuum of \"%s\" --- lock not available",
-                                                               relation->relname)));
-                       else
-                               ereport(elevel,
-                                               (errcode(ERRCODE_UNDEFINED_TABLE),
-                                                errmsg("skipping vacuum of \"%s\" --- relation no longer exists",
-                                                               relation->relname)));
-               }
-
                PopActiveSnapshot();
                CommitTransactionCommand();
                return false;
        }
 
        /*
-        * Check permissions.
-        *
-        * We allow the user to vacuum a table if he is superuser, the table
-        * owner, or the database owner (but in the latter case, only if it's not
-        * a shared relation).  pg_class_ownercheck includes the superuser case.
-        *
-        * Note we choose to treat permissions failure as a WARNING and keep
-        * trying to vacuum the rest of the DB --- is this appropriate?
+        * Check if relation needs to be skipped based on ownership.  This check
+        * happens also when building the relation list to vacuum for a manual
+        * operation, and needs to be done additionally here as VACUUM could
+        * happen across multiple transactions where relation ownership could have
+        * changed in-between.  Make sure to only generate logs for VACUUM in this
+        * case.
         */
-       if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
-                 (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
+       if (!vacuum_is_relation_owner(RelationGetRelid(onerel),
+                                                                 onerel->rd_rel,
+                                                                 params->options & VACOPT_VACUUM))
        {
-               if (onerel->rd_rel->relisshared)
-                       ereport(WARNING,
-                                       (errmsg("skipping \"%s\" --- only superuser can vacuum it",
-                                                       RelationGetRelationName(onerel))));
-               else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
-                       ereport(WARNING,
-                                       (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
-                                                       RelationGetRelationName(onerel))));
-               else
-                       ereport(WARNING,
-                                       (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
-                                                       RelationGetRelationName(onerel))));
                relation_close(onerel, lmode);
                PopActiveSnapshot();
                CommitTransactionCommand();
@@ -1523,12 +1763,32 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
        onerelid = onerel->rd_lockInfo.lockRelId;
        LockRelationIdForSession(&onerelid, lmode);
 
+       /* Set index cleanup option based on reloptions if not yet */
+       if (params->index_cleanup == VACOPT_TERNARY_DEFAULT)
+       {
+               if (onerel->rd_options == NULL ||
+                       ((StdRdOptions *) onerel->rd_options)->vacuum_index_cleanup)
+                       params->index_cleanup = VACOPT_TERNARY_ENABLED;
+               else
+                       params->index_cleanup = VACOPT_TERNARY_DISABLED;
+       }
+
+       /* Set truncate option based on reloptions if not yet */
+       if (params->truncate == VACOPT_TERNARY_DEFAULT)
+       {
+               if (onerel->rd_options == NULL ||
+                       ((StdRdOptions *) onerel->rd_options)->vacuum_truncate)
+                       params->truncate = VACOPT_TERNARY_ENABLED;
+               else
+                       params->truncate = VACOPT_TERNARY_DISABLED;
+       }
+
        /*
         * Remember the relation's TOAST relation for later, if the caller asked
         * us to process it.  In VACUUM FULL, though, the toast table is
         * automatically rebuilt by cluster_rel so we shouldn't recurse to it.
         */
-       if (!(options & VACOPT_SKIPTOAST) && !(options & VACOPT_FULL))
+       if (!(params->options & VACOPT_SKIPTOAST) && !(params->options & VACOPT_FULL))
                toast_relid = onerel->rd_rel->reltoastrelid;
        else
                toast_relid = InvalidOid;
@@ -1547,18 +1807,22 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
        /*
         * Do the actual work --- either FULL or "lazy" vacuum
         */
-       if (options & VACOPT_FULL)
+       if (params->options & VACOPT_FULL)
        {
+               int                     cluster_options = 0;
+
                /* close relation before vacuuming, but hold lock until commit */
                relation_close(onerel, NoLock);
                onerel = NULL;
 
+               if ((params->options & VACOPT_VERBOSE) != 0)
+                       cluster_options |= CLUOPT_VERBOSE;
+
                /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
-               cluster_rel(relid, InvalidOid, false,
-                                       (options & VACOPT_VERBOSE) != 0);
+               cluster_rel(relid, InvalidOid, cluster_options);
        }
        else
-               lazy_vacuum_rel(onerel, options, params, vac_strategy);
+               table_relation_vacuum(onerel, params, vac_strategy);
 
        /* Roll back any GUC changes executed by index functions */
        AtEOXact_GUC(false, save_nestlevel);
@@ -1584,7 +1848,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
         * totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
-               vacuum_rel(toast_relid, NULL, options, params);
+               vacuum_rel(toast_relid, NULL, params);
 
        /*
         * Now release the session-level lock on the master table.
@@ -1601,7 +1865,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
  * specified kind of lock on each.  Return an array of Relation pointers for
  * the indexes into *Irel, and the number of indexes into *nindexes.
  *
- * We consider an index vacuumable if it is marked insertable (IndexIsReady).
+ * We consider an index vacuumable if it is marked insertable (indisready).
  * If it isn't, probably a CREATE INDEX CONCURRENTLY command failed early in
  * execution, and what we have is too corrupt to be processable.  We will
  * vacuum even if the index isn't indisvalid; this is important because in a
@@ -1636,7 +1900,7 @@ vac_open_indexes(Relation relation, LOCKMODE lockmode,
                Relation        indrel;
 
                indrel = index_open(indexoid, lockmode);
-               if (IndexIsReady(indrel->rd_index))
+               if (indrel->rd_index->indisready)
                        (*Irel)[i++] = indrel;
                else
                        index_close(indrel, lockmode);
@@ -1682,13 +1946,13 @@ vacuum_delay_point(void)
        if (VacuumCostActive && !InterruptPending &&
                VacuumCostBalance >= VacuumCostLimit)
        {
-               int                     msec;
+               double          msec;
 
                msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
                if (msec > VacuumCostDelay * 4)
                        msec = VacuumCostDelay * 4;
 
-               pg_usleep(msec * 1000L);
+               pg_usleep((long) (msec * 1000));
 
                VacuumCostBalance = 0;
 
@@ -1699,3 +1963,15 @@ vacuum_delay_point(void)
                CHECK_FOR_INTERRUPTS();
        }
 }
+
+/*
+ * A wrapper function of defGetBoolean().
+ *
+ * This function returns VACOPT_TERNARY_ENABLED and VACOPT_TERNARY_DISABLED
+ * instead of true and false.
+ */
+static VacOptTernaryValue
+get_vacopt_ternary_value(DefElem *def)
+{
+       return defGetBoolean(def) ? VACOPT_TERNARY_ENABLED : VACOPT_TERNARY_DISABLED;
+}