]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/indexcmds.c
Add FILLFACTOR to CREATE INDEX.
[postgresql] / src / backend / commands / indexcmds.c
index 3a1519a50072a70d87db695f25e62d60a931ccb9..7e35258ac09e17facb4b19a5294eac674b4d7f33 100644 (file)
  * indexcmds.c
  *       POSTGRES define and remove index code.
  *
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.75 2002/06/20 20:29:27 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.142 2006/07/02 02:23:19 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/heapam.h"
 #include "catalog/catalog.h"
-#include "catalog/catname.h"
+#include "catalog/dependency.h"
+#include "catalog/heap.h"
 #include "catalog/index.h"
-#include "catalog/namespace.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_opclass.h"
-#include "catalog/pg_proc.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/dbcommands.h"
 #include "commands/defrem.h"
+#include "commands/tablecmds.h"
+#include "commands/tablespace.h"
+#include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "optimizer/clauses.h"
-#include "optimizer/planmain.h"
-#include "optimizer/prep.h"
 #include "parser/parsetree.h"
 #include "parser/parse_coerce.h"
+#include "parser/parse_expr.h"
 #include "parser/parse_func.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
 #include "utils/syscache.h"
 
 
-#define IsFuncIndex(ATTR_LIST) (((IndexElem*)lfirst(ATTR_LIST))->funcname != NIL)
-
 /* non-export function prototypes */
-static void CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid);
-static void FuncIndexArgs(IndexInfo *indexInfo, Oid *classOidP,
-                         IndexElem *funcIndex,
-                         Oid relId,
-                         char *accessMethodName, Oid accessMethodId);
-static void NormIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
-                          List *attList,
-                          Oid relId,
-                          char *accessMethodName, Oid accessMethodId);
-static Oid GetAttrOpClass(IndexElem *attribute, Oid attrType,
-                          char *accessMethodName, Oid accessMethodId);
-static Oid     GetDefaultOpClass(Oid attrType, Oid accessMethodId);
+static void CheckPredicate(Expr *predicate);
+static void ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
+                                 List *attList,
+                                 Oid relId,
+                                 char *accessMethodName, Oid accessMethodId,
+                                 bool isconstraint);
+static Oid GetIndexOpClass(List *opclass, Oid attrType,
+                               char *accessMethodName, Oid accessMethodId);
+static bool relationHasPrimaryKey(Relation rel);
+
 
 /*
  * DefineIndex
  *             Creates a new index.
  *
- * 'attributeList' is a list of IndexElem specifying either a functional
- *             index or a list of attributes to index on.
- * 'predicate' is the qual specified in the where clause.
- * 'rangetable' is needed to interpret the predicate.
+ * 'heapRelation': the relation the index will apply to.
+ * 'indexRelationName': the name for the new index, or NULL to indicate
+ *             that a nonconflicting default name should be picked.
+ * 'indexRelationId': normally InvalidOid, but during bootstrap can be
+ *             nonzero to specify a preselected OID for the index.
+ * 'accessMethodName': name of the AM to use.
+ * 'tableSpaceName': name of the tablespace to create the index in.
+ *             NULL specifies using the appropriate default.
+ * 'attributeList': a list of IndexElem specifying columns and expressions
+ *             to index on.
+ * 'predicate': the partial-index condition, or NULL if none.
+ * 'rangetable': needed to interpret the predicate.
+ * 'unique': make the index enforce uniqueness.
+ * 'primary': mark the index as a primary key in the catalogs.
+ * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint,
+ *             so build a pg_constraint entry for it.
+ * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
+ * 'options': options passed by WITH.
+ * 'check_rights': check for CREATE rights in the namespace.  (This should
+ *             be true except when ALTER is deleting/recreating an index.)
+ * 'skip_build': make the catalog entries but leave the index file empty;
+ *             it will be filled later.
+ * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
  */
 void
 DefineIndex(RangeVar *heapRelation,
                        char *indexRelationName,
+                       Oid indexRelationId,
                        char *accessMethodName,
+                       char *tableSpaceName,
                        List *attributeList,
+                       Expr *predicate,
+                       List *rangetable,
+                       List *options,
                        bool unique,
                        bool primary,
-                       Expr *predicate,
-                       List *rangetable)
+                       bool isconstraint,
+                       bool is_alter_table,
+                       bool check_rights,
+                       bool skip_build,
+                       bool quiet)
 {
        Oid                *classObjectId;
        Oid                     accessMethodId;
        Oid                     relationId;
        Oid                     namespaceId;
+       Oid                     tablespaceId;
        Relation        rel;
        HeapTuple       tuple;
        Form_pg_am      accessMethodForm;
        IndexInfo  *indexInfo;
        int                     numberOfAttributes;
-       List       *cnfPred = NIL;
 
        /*
         * count attributes in index
         */
-       numberOfAttributes = length(attributeList);
+       numberOfAttributes = list_length(attributeList);
        if (numberOfAttributes <= 0)
-               elog(ERROR, "DefineIndex: must specify at least one attribute");
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                                errmsg("must specify at least one column")));
        if (numberOfAttributes > INDEX_MAX_KEYS)
-               elog(ERROR, "Cannot use more than %d attributes in an index",
-                        INDEX_MAX_KEYS);
+               ereport(ERROR,
+                               (errcode(ERRCODE_TOO_MANY_COLUMNS),
+                                errmsg("cannot use more than %d columns in an index",
+                                               INDEX_MAX_KEYS)));
 
        /*
         * Open heap relation, acquire a suitable lock on it, remember its OID
@@ -100,131 +135,278 @@ DefineIndex(RangeVar *heapRelation,
        /* Note: during bootstrap may see uncataloged relation */
        if (rel->rd_rel->relkind != RELKIND_RELATION &&
                rel->rd_rel->relkind != RELKIND_UNCATALOGED)
-               elog(ERROR, "DefineIndex: relation \"%s\" is not a table",
-                        heapRelation->relname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("\"%s\" is not a table",
+                                               heapRelation->relname)));
 
        relationId = RelationGetRelid(rel);
        namespaceId = RelationGetNamespace(rel);
 
-       if (!IsBootstrapProcessingMode() &&
-               IsSystemRelation(rel) &&
-               !IndexesAreActive(relationId, false))
-               elog(ERROR, "Existing indexes are inactive. REINDEX first");
-
-       heap_close(rel, NoLock);
-
        /*
         * Verify we (still) have CREATE rights in the rel's namespace.
         * (Presumably we did when the rel was created, but maybe not anymore.)
-        * Skip check if bootstrapping, since permissions machinery may not
-        * be working yet; also, always allow if it's a temp table.
+        * Skip check if caller doesn't want it.  Also skip check if
+        * bootstrapping, since permissions machinery may not be working yet.
         */
-       if (!IsBootstrapProcessingMode() && !isTempNamespace(namespaceId))
+       if (check_rights && !IsBootstrapProcessingMode())
        {
                AclResult       aclresult;
 
                aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
                                                                                  ACL_CREATE);
                if (aclresult != ACLCHECK_OK)
-                       aclcheck_error(aclresult, get_namespace_name(namespaceId));
+                       aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+                                                  get_namespace_name(namespaceId));
+       }
+
+       /*
+        * Select tablespace to use.  If not specified, use default_tablespace
+        * (which may in turn default to database's default).
+        */
+       if (tableSpaceName)
+       {
+               tablespaceId = get_tablespace_oid(tableSpaceName);
+               if (!OidIsValid(tablespaceId))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                        errmsg("tablespace \"%s\" does not exist",
+                                                       tableSpaceName)));
+       }
+       else
+       {
+               tablespaceId = GetDefaultTablespace();
+               /* note InvalidOid is OK in this case */
+       }
+
+       /* Check permissions except when using database's default */
+       if (OidIsValid(tablespaceId))
+       {
+               AclResult       aclresult;
+
+               aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
+                                                                                  ACL_CREATE);
+               if (aclresult != ACLCHECK_OK)
+                       aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
+                                                  get_tablespace_name(tablespaceId));
+       }
+
+       /*
+        * Force shared indexes into the pg_global tablespace.  This is a bit of a
+        * hack but seems simpler than marking them in the BKI commands.
+        */
+       if (rel->rd_rel->relisshared)
+               tablespaceId = GLOBALTABLESPACE_OID;
+
+       /*
+        * Select name for index if caller didn't specify
+        */
+       if (indexRelationName == NULL)
+       {
+               if (primary)
+                       indexRelationName = ChooseRelationName(RelationGetRelationName(rel),
+                                                                                                  NULL,
+                                                                                                  "pkey",
+                                                                                                  namespaceId);
+               else
+               {
+                       IndexElem  *iparam = (IndexElem *) linitial(attributeList);
+
+                       indexRelationName = ChooseRelationName(RelationGetRelationName(rel),
+                                                                                                  iparam->name,
+                                                                                                  "key",
+                                                                                                  namespaceId);
+               }
        }
 
        /*
-        * look up the access method, verify it can handle the requested
-        * features
+        * look up the access method, verify it can handle the requested features
         */
        tuple = SearchSysCache(AMNAME,
                                                   PointerGetDatum(accessMethodName),
                                                   0, 0, 0);
        if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "DefineIndex: access method \"%s\" not found",
-                        accessMethodName);
-       accessMethodId = tuple->t_data->t_oid;
+       {
+               /*
+                * Hack to provide more-or-less-transparent updating of old RTREE
+                * indexes to GIST: if RTREE is requested and not found, use GIST.
+                */
+               if (strcmp(accessMethodName, "rtree") == 0)
+               {
+                       ereport(NOTICE,
+                                       (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
+                       accessMethodName = "gist";
+                       tuple = SearchSysCache(AMNAME,
+                                                                  PointerGetDatum(accessMethodName),
+                                                                  0, 0, 0);
+               }
+
+               if (!HeapTupleIsValid(tuple))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                        errmsg("access method \"%s\" does not exist",
+                                                       accessMethodName)));
+       }
+       accessMethodId = HeapTupleGetOid(tuple);
        accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
 
        if (unique && !accessMethodForm->amcanunique)
-               elog(ERROR, "DefineIndex: access method \"%s\" does not support UNIQUE indexes",
-                        accessMethodName);
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                          errmsg("access method \"%s\" does not support unique indexes",
+                                         accessMethodName)));
        if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
-               elog(ERROR, "DefineIndex: access method \"%s\" does not support multi-column indexes",
-                        accessMethodName);
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("access method \"%s\" does not support multicolumn indexes",
+                                accessMethodName)));
 
        ReleaseSysCache(tuple);
 
        /*
-        * Convert the partial-index predicate from parsetree form to an
-        * implicit-AND qual expression, for easier evaluation at runtime.
-        * While we are at it, we reduce it to a canonical (CNF or DNF) form
-        * to simplify the task of proving implications.
+        * If a range table was created then check that only the base rel is
+        * mentioned.
         */
-       if (predicate != NULL && rangetable != NIL)
+       if (rangetable != NIL)
        {
-               cnfPred = canonicalize_qual((Expr *) copyObject(predicate), true);
-               fix_opids((Node *) cnfPred);
-               CheckPredicate(cnfPred, rangetable, relationId);
+               if (list_length(rangetable) != 1 || getrelid(1, rangetable) != relationId)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+                                        errmsg("index expressions and predicates may refer only to the table being indexed")));
        }
 
        /*
-        * Prepare arguments for index_create, primarily an IndexInfo
-        * structure
+        * Validate predicate, if given
         */
-       indexInfo = makeNode(IndexInfo);
-       indexInfo->ii_Predicate = cnfPred;
-       indexInfo->ii_FuncOid = InvalidOid;
-       indexInfo->ii_Unique = unique;
+       if (predicate)
+               CheckPredicate(predicate);
 
-       if (IsFuncIndex(attributeList))
+       /*
+        * Extra checks when creating a PRIMARY KEY index.
+        */
+       if (primary)
        {
-               IndexElem  *funcIndex = (IndexElem *) lfirst(attributeList);
-               int                     nargs;
+               List       *cmds;
+               ListCell   *keys;
+
+               /*
+                * If ALTER TABLE, check that there isn't already a PRIMARY KEY. In
+                * CREATE TABLE, we have faith that the parser rejected multiple pkey
+                * clauses; and CREATE INDEX doesn't have a way to say PRIMARY KEY, so
+                * it's no problem either.
+                */
+               if (is_alter_table &&
+                       relationHasPrimaryKey(rel))
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                        errmsg("multiple primary keys for table \"%s\" are not allowed",
+                                       RelationGetRelationName(rel))));
+               }
 
-               /* Parser should have given us only one list item, but check */
-               if (numberOfAttributes != 1)
-                       elog(ERROR, "Functional index can only have one attribute");
+               /*
+                * Check that all of the attributes in a primary key are marked as not
+                * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
+                */
+               cmds = NIL;
+               foreach(keys, attributeList)
+               {
+                       IndexElem  *key = (IndexElem *) lfirst(keys);
+                       HeapTuple       atttuple;
 
-               nargs = length(funcIndex->args);
-               if (nargs > INDEX_MAX_KEYS)
-                       elog(ERROR, "Index function can take at most %d arguments",
-                                INDEX_MAX_KEYS);
+                       if (!key->name)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("primary keys cannot be expressions")));
 
-               indexInfo->ii_NumIndexAttrs = 1;
-               indexInfo->ii_NumKeyAttrs = nargs;
+                       /* System attributes are never null, so no problem */
+                       if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids))
+                               continue;
 
-               classObjectId = (Oid *) palloc(sizeof(Oid));
+                       atttuple = SearchSysCacheAttName(relationId, key->name);
+                       if (HeapTupleIsValid(atttuple))
+                       {
+                               if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
+                               {
+                                       /* Add a subcommand to make this one NOT NULL */
+                                       AlterTableCmd *cmd = makeNode(AlterTableCmd);
 
-               FuncIndexArgs(indexInfo, classObjectId, funcIndex,
-                                         relationId, accessMethodName, accessMethodId);
-       }
-       else
-       {
-               indexInfo->ii_NumIndexAttrs = numberOfAttributes;
-               indexInfo->ii_NumKeyAttrs = numberOfAttributes;
+                                       cmd->subtype = AT_SetNotNull;
+                                       cmd->name = key->name;
 
-               classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
+                                       cmds = lappend(cmds, cmd);
+                               }
+                               ReleaseSysCache(atttuple);
+                       }
+                       else
+                       {
+                               /*
+                                * This shouldn't happen during CREATE TABLE, but can happen
+                                * during ALTER TABLE.  Keep message in sync with
+                                * transformIndexConstraints() in parser/analyze.c.
+                                */
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_COLUMN),
+                                                errmsg("column \"%s\" named in key does not exist",
+                                                               key->name)));
+                       }
+               }
 
-               NormIndexAttrs(indexInfo, classObjectId, attributeList,
-                                          relationId, accessMethodName, accessMethodId);
+               /*
+                * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade to child
+                * tables?      Currently, since the PRIMARY KEY itself doesn't cascade,
+                * we don't cascade the notnull constraint(s) either; but this is
+                * pretty debatable.
+                *
+                * XXX: possible future improvement: when being called from ALTER
+                * TABLE, it would be more efficient to merge this with the outer
+                * ALTER TABLE, so as to avoid two scans.  But that seems to
+                * complicate DefineIndex's API unduly.
+                */
+               if (cmds)
+                       AlterTableInternal(relationId, cmds, false);
        }
 
-       index_create(relationId, indexRelationName,
-                                indexInfo, accessMethodId, classObjectId,
-                                primary, allowSystemTableMods);
+       /*
+        * Prepare arguments for index_create, primarily an IndexInfo structure.
+        * Note that ii_Predicate must be in implicit-AND format.
+        */
+       indexInfo = makeNode(IndexInfo);
+       indexInfo->ii_NumIndexAttrs = numberOfAttributes;
+       indexInfo->ii_Expressions = NIL;        /* for now */
+       indexInfo->ii_ExpressionsState = NIL;
+       indexInfo->ii_Predicate = make_ands_implicit(predicate);
+       indexInfo->ii_PredicateState = NIL;
+       indexInfo->ii_Unique = unique;
+
+       classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
+       ComputeIndexAttrs(indexInfo, classObjectId, attributeList,
+                                         relationId, accessMethodName, accessMethodId,
+                                         isconstraint);
+
+       heap_close(rel, NoLock);
 
        /*
-        * We update the relation's pg_class tuple even if it already has
-        * relhasindex = true.  This is needed to cause a shared-cache-inval
-        * message to be sent for the pg_class tuple, which will cause other
-        * backends to flush their relcache entries and in particular their
-        * cached lists of the indexes for this relation.
+        * Report index creation if appropriate (delay this till after most of the
+        * error checks)
         */
-       setRelhasindex(relationId, true, primary, InvalidOid);
+       if (isconstraint && !quiet)
+               ereport(NOTICE,
+                 (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
+                                 is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
+                                 primary ? "PRIMARY KEY" : "UNIQUE",
+                                 indexRelationName, RelationGetRelationName(rel))));
+
+       index_create(relationId, indexRelationName, indexRelationId,
+                                indexInfo, accessMethodId, tablespaceId, classObjectId,
+                                options, primary, false, isconstraint,
+                                allowSystemTableMods, skip_build);
 }
 
 
 /*
  * CheckPredicate
- *             Checks that the given list of partial-index predicates refer
- *             (via the given range table) only to the given base relation oid.
+ *             Checks that the given partial-index predicate is valid.
  *
  * This used to also constrain the form of the predicate to forms that
  * indxpath.c could do something with. However, that seems overly
@@ -233,144 +415,42 @@ DefineIndex(RangeVar *heapRelation,
  * any evaluatable predicate will work.  So accept any predicate here
  * (except ones requiring a plan), and let indxpath.c fend for itself.
  */
-
 static void
-CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid)
+CheckPredicate(Expr *predicate)
 {
-       if (length(rangeTable) != 1 || getrelid(1, rangeTable) != baseRelOid)
-               elog(ERROR,
-                "Partial-index predicates may refer only to the base relation");
-
        /*
         * We don't currently support generation of an actual query plan for a
-        * predicate, only simple scalar expressions; hence these
-        * restrictions.
-        */
-       if (contain_subplans((Node *) predList))
-               elog(ERROR, "Cannot use subselect in index predicate");
-       if (contain_agg_clause((Node *) predList))
-               elog(ERROR, "Cannot use aggregate in index predicate");
-
-       /*
-        * A predicate using mutable functions is probably wrong, for the
-        * same reasons that we don't allow a functional index to use one.
+        * predicate, only simple scalar expressions; hence these restrictions.
         */
-       if (contain_mutable_functions((Node *) predList))
-               elog(ERROR, "Functions in index predicate must be marked isImmutable");
-}
-
-
-static void
-FuncIndexArgs(IndexInfo *indexInfo,
-                         Oid *classOidP,
-                         IndexElem *funcIndex,
-                         Oid relId,
-                         char *accessMethodName,
-                         Oid accessMethodId)
-{
-       Oid                     argTypes[FUNC_MAX_ARGS];
-       List       *arglist;
-       int                     nargs = 0;
-       int                     i;
-       FuncDetailCode fdresult;
-       Oid                     funcid;
-       Oid                     rettype;
-       bool            retset;
-       Oid                *true_typeids;
+       if (contain_subplans((Node *) predicate))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot use subquery in index predicate")));
+       if (contain_agg_clause((Node *) predicate))
+               ereport(ERROR,
+                               (errcode(ERRCODE_GROUPING_ERROR),
+                                errmsg("cannot use aggregate in index predicate")));
 
        /*
-        * process the function arguments, which are a list of T_String
-        * (someday ought to allow more general expressions?)
-        *
-        * Note caller already checked that list is not too long.
+        * A predicate using mutable functions is probably wrong, for the same
+        * reasons that we don't allow an index expression to use one.
         */
-       MemSet(argTypes, 0, sizeof(argTypes));
-
-       foreach(arglist, funcIndex->args)
-       {
-               char       *arg = strVal(lfirst(arglist));
-               HeapTuple       tuple;
-               Form_pg_attribute att;
-
-               tuple = SearchSysCache(ATTNAME,
-                                                          ObjectIdGetDatum(relId),
-                                                          PointerGetDatum(arg),
-                                                          0, 0);
-               if (!HeapTupleIsValid(tuple))
-                       elog(ERROR, "DefineIndex: attribute \"%s\" not found", arg);
-               att = (Form_pg_attribute) GETSTRUCT(tuple);
-               indexInfo->ii_KeyAttrNumbers[nargs] = att->attnum;
-               argTypes[nargs] = att->atttypid;
-               ReleaseSysCache(tuple);
-               nargs++;
-       }
-
-       /*
-        * Lookup the function procedure to get its OID and result type.
-        *
-        * We rely on parse_func.c to find the correct function in the possible
-        * presence of binary-compatible types.  However, parse_func may do
-        * too much: it will accept a function that requires run-time coercion
-        * of input types, and the executor is not currently set up to support
-        * that.  So, check to make sure that the selected function has
-        * exact-match or binary-compatible input types.
-        */
-       fdresult = func_get_detail(funcIndex->funcname, funcIndex->args,
-                                                          nargs, argTypes,
-                                                          &funcid, &rettype, &retset,
-                                                          &true_typeids);
-       if (fdresult != FUNCDETAIL_NORMAL)
-       {
-               if (fdresult == FUNCDETAIL_AGGREGATE)
-                       elog(ERROR, "DefineIndex: functional index may not use an aggregate function");
-               else if (fdresult == FUNCDETAIL_COERCION)
-                       elog(ERROR, "DefineIndex: functional index must use a real function, not a type coercion"
-                                "\n\tTry specifying the index opclass you want to use, instead");
-               else
-                       func_error("DefineIndex", funcIndex->funcname, nargs, argTypes,
-                                          NULL);
-       }
-
-       if (retset)
-               elog(ERROR, "DefineIndex: cannot index on a function returning a set");
-
-       for (i = 0; i < nargs; i++)
-       {
-               if (!IsBinaryCompatible(argTypes[i], true_typeids[i]))
-                       func_error("DefineIndex", funcIndex->funcname, nargs, argTypes,
-                                          "Index function must be binary-compatible with table datatype");
-       }
-
-       /*
-        * Require that the function be marked immutable.  Using a mutable
-        * function for a functional index is highly questionable, since if
-        * you aren't going to get the same result for the same data every
-        * time, it's not clear what the index entries mean at all.
-        */
-       if (func_volatile(funcid) != PROVOLATILE_IMMUTABLE)
-               elog(ERROR, "DefineIndex: index function must be marked isImmutable");
-
-       /* Process opclass, using func return type as default type */
-
-       classOidP[0] = GetAttrOpClass(funcIndex, rettype,
-                                                                 accessMethodName, accessMethodId);
-
-       /* OK, return results */
-
-       indexInfo->ii_FuncOid = funcid;
-       /* Need to do the fmgr function lookup now, too */
-       fmgr_info(funcid, &indexInfo->ii_FuncInfo);
+       if (contain_mutable_functions((Node *) predicate))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                  errmsg("functions in index predicate must be marked IMMUTABLE")));
 }
 
 static void
-NormIndexAttrs(IndexInfo *indexInfo,
-                          Oid *classOidP,
-                          List *attList,       /* list of IndexElem's */
-                          Oid relId,
-                          char *accessMethodName,
-                          Oid accessMethodId)
+ComputeIndexAttrs(IndexInfo *indexInfo,
+                                 Oid *classOidP,
+                                 List *attList,        /* list of IndexElem's */
+                                 Oid relId,
+                                 char *accessMethodName,
+                                 Oid accessMethodId,
+                                 bool isconstraint)
 {
-       List       *rest;
+       ListCell   *rest;
        int                     attn = 0;
 
        /*
@@ -379,51 +459,136 @@ NormIndexAttrs(IndexInfo *indexInfo,
        foreach(rest, attList)
        {
                IndexElem  *attribute = (IndexElem *) lfirst(rest);
-               HeapTuple       atttuple;
-               Form_pg_attribute attform;
+               Oid                     atttype;
+
+               if (attribute->name != NULL)
+               {
+                       /* Simple index attribute */
+                       HeapTuple       atttuple;
+                       Form_pg_attribute attform;
 
-               if (attribute->name == NULL)
-                       elog(ERROR, "missing attribute for define index");
+                       Assert(attribute->expr == NULL);
+                       atttuple = SearchSysCacheAttName(relId, attribute->name);
+                       if (!HeapTupleIsValid(atttuple))
+                       {
+                               /* difference in error message spellings is historical */
+                               if (isconstraint)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_UNDEFINED_COLUMN),
+                                                 errmsg("column \"%s\" named in key does not exist",
+                                                                attribute->name)));
+                               else
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_UNDEFINED_COLUMN),
+                                                        errmsg("column \"%s\" does not exist",
+                                                                       attribute->name)));
+                       }
+                       attform = (Form_pg_attribute) GETSTRUCT(atttuple);
+                       indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
+                       atttype = attform->atttypid;
+                       ReleaseSysCache(atttuple);
+               }
+               else if (attribute->expr && IsA(attribute->expr, Var))
+               {
+                       /* Tricky tricky, he wrote (column) ... treat as simple attr */
+                       Var                *var = (Var *) attribute->expr;
 
-               atttuple = SearchSysCache(ATTNAME,
-                                                                 ObjectIdGetDatum(relId),
-                                                                 PointerGetDatum(attribute->name),
-                                                                 0, 0);
-               if (!HeapTupleIsValid(atttuple))
-                       elog(ERROR, "DefineIndex: attribute \"%s\" not found",
-                                attribute->name);
-               attform = (Form_pg_attribute) GETSTRUCT(atttuple);
+                       indexInfo->ii_KeyAttrNumbers[attn] = var->varattno;
+                       atttype = get_atttype(relId, var->varattno);
+               }
+               else
+               {
+                       /* Index expression */
+                       Assert(attribute->expr != NULL);
+                       indexInfo->ii_KeyAttrNumbers[attn] = 0;         /* marks expression */
+                       indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
+                                                                                               attribute->expr);
+                       atttype = exprType(attribute->expr);
 
-               indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
+                       /*
+                        * We don't currently support generation of an actual query plan
+                        * for an index expression, only simple scalar expressions; hence
+                        * these restrictions.
+                        */
+                       if (contain_subplans(attribute->expr))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("cannot use subquery in index expression")));
+                       if (contain_agg_clause(attribute->expr))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_GROUPING_ERROR),
+                               errmsg("cannot use aggregate function in index expression")));
 
-               classOidP[attn] = GetAttrOpClass(attribute, attform->atttypid,
-                                                                          accessMethodName, accessMethodId);
+                       /*
+                        * A expression using mutable functions is probably wrong, since
+                        * if you aren't going to get the same result for the same data
+                        * every time, it's not clear what the index entries mean at all.
+                        */
+                       if (contain_mutable_functions(attribute->expr))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                                                errmsg("functions in index expression must be marked IMMUTABLE")));
+               }
 
-               ReleaseSysCache(atttuple);
+               classOidP[attn] = GetIndexOpClass(attribute->opclass,
+                                                                                 atttype,
+                                                                                 accessMethodName,
+                                                                                 accessMethodId);
                attn++;
        }
 }
 
+/*
+ * Resolve possibly-defaulted operator class specification
+ */
 static Oid
-GetAttrOpClass(IndexElem *attribute, Oid attrType,
-                          char *accessMethodName, Oid accessMethodId)
+GetIndexOpClass(List *opclass, Oid attrType,
+                               char *accessMethodName, Oid accessMethodId)
 {
-       char       *catalogname;
-       char       *schemaname = NULL;
-       char       *opcname = NULL;
+       char       *schemaname;
+       char       *opcname;
        HeapTuple       tuple;
        Oid                     opClassId,
                                opInputType;
 
-       if (attribute->opclass == NIL)
+       /*
+        * Release 7.0 removed network_ops, timespan_ops, and datetime_ops, so we
+        * ignore those opclass names so the default *_ops is used.  This can be
+        * removed in some later release.  bjm 2000/02/07
+        *
+        * Release 7.1 removes lztext_ops, so suppress that too for a while.  tgl
+        * 2000/07/30
+        *
+        * Release 7.2 renames timestamp_ops to timestamptz_ops, so suppress that
+        * too for awhile.      I'm starting to think we need a better approach. tgl
+        * 2000/10/01
+        *
+        * Release 8.0 removes bigbox_ops (which was dead code for a long while
+        * anyway).  tgl 2003/11/11
+        */
+       if (list_length(opclass) == 1)
+       {
+               char       *claname = strVal(linitial(opclass));
+
+               if (strcmp(claname, "network_ops") == 0 ||
+                       strcmp(claname, "timespan_ops") == 0 ||
+                       strcmp(claname, "datetime_ops") == 0 ||
+                       strcmp(claname, "lztext_ops") == 0 ||
+                       strcmp(claname, "timestamp_ops") == 0 ||
+                       strcmp(claname, "bigbox_ops") == 0)
+                       opclass = NIL;
+       }
+
+       if (opclass == NIL)
        {
                /* no operator class specified, so find the default */
                opClassId = GetDefaultOpClass(attrType, accessMethodId);
                if (!OidIsValid(opClassId))
-                       elog(ERROR, "data type %s has no default operator class for access method \"%s\""
-                                "\n\tYou must specify an operator class for the index or define a"
-                                "\n\tdefault operator class for the data type",
-                                format_type_be(attrType), accessMethodName);
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                        errmsg("data type %s has no default operator class for access method \"%s\"",
+                                                       format_type_be(attrType), accessMethodName),
+                                        errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
                return opClassId;
        }
 
@@ -432,42 +597,14 @@ GetAttrOpClass(IndexElem *attribute, Oid attrType,
         */
 
        /* deconstruct the name list */
-       switch (length(attribute->opclass))
-       {
-               case 1:
-                       opcname = strVal(lfirst(attribute->opclass));
-                       break;
-               case 2:
-                       schemaname = strVal(lfirst(attribute->opclass));
-                       opcname = strVal(lsecond(attribute->opclass));
-                       break;
-               case 3:
-                       catalogname = strVal(lfirst(attribute->opclass));
-                       schemaname = strVal(lsecond(attribute->opclass));
-                       opcname = strVal(lfirst(lnext(lnext(attribute->opclass))));
-                       /*
-                        * We check the catalog name and then ignore it.
-                        */
-                       if (strcmp(catalogname, DatabaseName) != 0)
-                               elog(ERROR, "Cross-database references are not implemented");
-                       break;
-               default:
-                       elog(ERROR, "Improper opclass name (too many dotted names): %s",
-                                NameListToString(attribute->opclass));
-                       break;
-       }
+       DeconstructQualifiedName(opclass, &schemaname, &opcname);
 
        if (schemaname)
        {
                /* Look in specific schema only */
-               Oid             namespaceId;
-
-               namespaceId = GetSysCacheOid(NAMESPACENAME,
-                                                                        CStringGetDatum(schemaname),
-                                                                        0, 0, 0);
-               if (!OidIsValid(namespaceId))
-                       elog(ERROR, "Namespace \"%s\" does not exist",
-                                schemaname);
+               Oid                     namespaceId;
+
+               namespaceId = LookupExplicitNamespace(schemaname);
                tuple = SearchSysCache(CLAAMNAMENSP,
                                                           ObjectIdGetDatum(accessMethodId),
                                                           PointerGetDatum(opcname),
@@ -479,41 +616,59 @@ GetAttrOpClass(IndexElem *attribute, Oid attrType,
                /* Unqualified opclass name, so search the search path */
                opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
                if (!OidIsValid(opClassId))
-                       elog(ERROR, "DefineIndex: operator class \"%s\" not supported by access method \"%s\"",
-                                opcname, accessMethodName);
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                        errmsg("operator class \"%s\" does not exist for access method \"%s\"",
+                                                       opcname, accessMethodName)));
                tuple = SearchSysCache(CLAOID,
                                                           ObjectIdGetDatum(opClassId),
                                                           0, 0, 0);
        }
 
        if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "DefineIndex: operator class \"%s\" not supported by access method \"%s\"",
-                        NameListToString(attribute->opclass), accessMethodName);
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                errmsg("operator class \"%s\" does not exist for access method \"%s\"",
+                                               NameListToString(opclass), accessMethodName)));
 
        /*
-        * Verify that the index operator class accepts this
-        * datatype.  Note we will accept binary compatibility.
+        * Verify that the index operator class accepts this datatype.  Note we
+        * will accept binary compatibility.
         */
-       opClassId = tuple->t_data->t_oid;
+       opClassId = HeapTupleGetOid(tuple);
        opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
 
-       if (!IsBinaryCompatible(attrType, opInputType))
-               elog(ERROR, "operator class \"%s\" does not accept data type %s",
-                        NameListToString(attribute->opclass), format_type_be(attrType));
+       if (!IsBinaryCoercible(attrType, opInputType))
+               ereport(ERROR,
+                               (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                errmsg("operator class \"%s\" does not accept data type %s",
+                                         NameListToString(opclass), format_type_be(attrType))));
 
        ReleaseSysCache(tuple);
 
        return opClassId;
 }
 
-static Oid
-GetDefaultOpClass(Oid attrType, Oid accessMethodId)
+/*
+ * GetDefaultOpClass
+ *
+ * Given the OIDs of a datatype and an access method, find the default
+ * operator class, if any.     Returns InvalidOid if there is none.
+ */
+Oid
+GetDefaultOpClass(Oid type_id, Oid am_id)
 {
-       OpclassCandidateList opclass;
        int                     nexact = 0;
        int                     ncompatible = 0;
        Oid                     exactOid = InvalidOid;
        Oid                     compatibleOid = InvalidOid;
+       Relation        rel;
+       ScanKeyData skey[1];
+       SysScanDesc scan;
+       HeapTuple       tup;
+
+       /* If it's a domain, look at the base type instead */
+       type_id = getBaseType(type_id);
 
        /*
         * We scan through all the opclasses available for the access method,
@@ -521,36 +676,49 @@ GetDefaultOpClass(Oid attrType, Oid accessMethodId)
         * (either exactly or binary-compatibly, but prefer an exact match).
         *
         * We could find more than one binary-compatible match, in which case we
-        * require the user to specify which one he wants.      If we find more
-        * than one exact match, then someone put bogus entries in pg_opclass.
-        *
-        * The initial search is done by namespace.c so that we only consider
-        * opclasses visible in the current namespace search path.
+        * require the user to specify which one he wants.      If we find more than
+        * one exact match, then someone put bogus entries in pg_opclass.
         */
-       for (opclass = OpclassGetCandidates(accessMethodId);
-                opclass != NULL;
-                opclass = opclass->next)
+       rel = heap_open(OperatorClassRelationId, AccessShareLock);
+
+       ScanKeyInit(&skey[0],
+                               Anum_pg_opclass_opcamid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(am_id));
+
+       scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
+                                                         SnapshotNow, 1, skey);
+
+       while (HeapTupleIsValid(tup = systable_getnext(scan)))
        {
+               Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
+
                if (opclass->opcdefault)
                {
-                       if (opclass->opcintype == attrType)
+                       if (opclass->opcintype == type_id)
                        {
                                nexact++;
-                               exactOid = opclass->oid;
+                               exactOid = HeapTupleGetOid(tup);
                        }
-                       else if (IsBinaryCompatible(opclass->opcintype, attrType))
+                       else if (IsBinaryCoercible(type_id, opclass->opcintype))
                        {
                                ncompatible++;
-                               compatibleOid = opclass->oid;
+                               compatibleOid = HeapTupleGetOid(tup);
                        }
                }
        }
 
+       systable_endscan(scan);
+
+       heap_close(rel, AccessShareLock);
+
        if (nexact == 1)
                return exactOid;
        if (nexact != 0)
-               elog(ERROR, "pg_opclass contains multiple default opclasses for data type %s",
-                        format_type_be(attrType));
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_OBJECT),
+               errmsg("there are multiple default operator classes for data type %s",
+                          format_type_be(type_id))));
        if (ncompatible == 1)
                return compatibleOid;
 
@@ -558,221 +726,393 @@ GetDefaultOpClass(Oid attrType, Oid accessMethodId)
 }
 
 /*
- * RemoveIndex
- *             Deletes an index.
+ *     makeObjectName()
+ *
+ *     Create a name for an implicitly created index, sequence, constraint, etc.
  *
- * Exceptions:
- *             BadArg if name is invalid.
- *             "ERROR" if index nonexistent.
- *             ...
+ *     The parameters are typically: the original table name, the original field
+ *     name, and a "type" string (such as "seq" or "pkey").    The field name
+ *     and/or type can be NULL if not relevant.
+ *
+ *     The result is a palloc'd string.
+ *
+ *     The basic result we want is "name1_name2_label", omitting "_name2" or
+ *     "_label" when those parameters are NULL.  However, we must generate
+ *     a name with less than NAMEDATALEN characters!  So, we truncate one or
+ *     both names if necessary to make a short-enough string.  The label part
+ *     is never truncated (so it had better be reasonably short).
+ *
+ *     The caller is responsible for checking uniqueness of the generated
+ *     name and retrying as needed; retrying will be done by altering the
+ *     "label" string (which is why we never truncate that part).
  */
-void
-RemoveIndex(RangeVar *relation)
+char *
+makeObjectName(const char *name1, const char *name2, const char *label)
 {
-       Oid                     indOid;
-       HeapTuple       tuple;
+       char       *name;
+       int                     overhead = 0;   /* chars needed for label and underscores */
+       int                     availchars;             /* chars available for name(s) */
+       int                     name1chars;             /* chars allocated to name1 */
+       int                     name2chars;             /* chars allocated to name2 */
+       int                     ndx;
+
+       name1chars = strlen(name1);
+       if (name2)
+       {
+               name2chars = strlen(name2);
+               overhead++;                             /* allow for separating underscore */
+       }
+       else
+               name2chars = 0;
+       if (label)
+               overhead += strlen(label) + 1;
 
-       indOid = RangeVarGetRelid(relation, false);
-       tuple = SearchSysCache(RELOID,
-                                                  ObjectIdGetDatum(indOid),
-                                                  0, 0, 0);
-       if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "index \"%s\" does not exist", relation->relname);
+       availchars = NAMEDATALEN - 1 - overhead;
+       Assert(availchars > 0);         /* else caller chose a bad label */
 
-       if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
-               elog(ERROR, "relation \"%s\" is of type \"%c\"",
-                        relation->relname, ((Form_pg_class) GETSTRUCT(tuple))->relkind);
+       /*
+        * If we must truncate,  preferentially truncate the longer name. This
+        * logic could be expressed without a loop, but it's simple and obvious as
+        * a loop.
+        */
+       while (name1chars + name2chars > availchars)
+       {
+               if (name1chars > name2chars)
+                       name1chars--;
+               else
+                       name2chars--;
+       }
 
-       ReleaseSysCache(tuple);
+       name1chars = pg_mbcliplen(name1, name1chars, name1chars);
+       if (name2)
+               name2chars = pg_mbcliplen(name2, name2chars, name2chars);
+
+       /* Now construct the string using the chosen lengths */
+       name = palloc(name1chars + name2chars + overhead + 1);
+       memcpy(name, name1, name1chars);
+       ndx = name1chars;
+       if (name2)
+       {
+               name[ndx++] = '_';
+               memcpy(name + ndx, name2, name2chars);
+               ndx += name2chars;
+       }
+       if (label)
+       {
+               name[ndx++] = '_';
+               strcpy(name + ndx, label);
+       }
+       else
+               name[ndx] = '\0';
 
-       index_drop(indOid);
+       return name;
 }
 
 /*
- * Reindex
- *             Recreate an index.
+ * Select a nonconflicting name for a new relation.  This is ordinarily
+ * used to choose index names (which is why it's here) but it can also
+ * be used for sequences, or any autogenerated relation kind.
  *
- * Exceptions:
- *             "ERROR" if index nonexistent.
- *             ...
+ * name1, name2, and label are used the same way as for makeObjectName(),
+ * except that the label can't be NULL; digits will be appended to the label
+ * if needed to create a name that is unique within the specified namespace.
+ *
+ * Note: it is theoretically possible to get a collision anyway, if someone
+ * else chooses the same name concurrently.  This is fairly unlikely to be
+ * a problem in practice, especially if one is holding an exclusive lock on
+ * the relation identified by name1.  However, if choosing multiple names
+ * within a single command, you'd better create the new object and do
+ * CommandCounterIncrement before choosing the next one!
+ *
+ * Returns a palloc'd string.
  */
-void
-ReindexIndex(RangeVar *indexRelation, bool force /* currently unused */ )
+char *
+ChooseRelationName(const char *name1, const char *name2,
+                                  const char *label, Oid namespace)
 {
-       Oid                     indOid;
-       HeapTuple       tuple;
-       bool            overwrite = false;
+       int                     pass = 0;
+       char       *relname = NULL;
+       char            modlabel[NAMEDATALEN];
+
+       /* try the unmodified label first */
+       StrNCpy(modlabel, label, sizeof(modlabel));
+
+       for (;;)
+       {
+               relname = makeObjectName(name1, name2, modlabel);
+
+               if (!OidIsValid(get_relname_relid(relname, namespace)))
+                       break;
+
+               /* found a conflict, so try a new name component */
+               pfree(relname);
+               snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
+       }
+
+       return relname;
+}
+
+/*
+ * relationHasPrimaryKey -
+ *
+ *     See whether an existing relation has a primary key.
+ */
+static bool
+relationHasPrimaryKey(Relation rel)
+{
+       bool            result = false;
+       List       *indexoidlist;
+       ListCell   *indexoidscan;
 
        /*
-        * REINDEX within a transaction block is dangerous, because if the
-        * transaction is later rolled back we have no way to undo truncation
-        * of the index's physical file.  Disallow it.
+        * Get the list of index OIDs for the table from the relcache, and look up
+        * each one in the pg_index syscache until we find one marked primary key
+        * (hopefully there isn't more than one such).
         */
-       if (IsTransactionBlock())
-               elog(ERROR, "REINDEX cannot run inside a BEGIN/END block");
+       indexoidlist = RelationGetIndexList(rel);
+
+       foreach(indexoidscan, indexoidlist)
+       {
+               Oid                     indexoid = lfirst_oid(indexoidscan);
+               HeapTuple       indexTuple;
+
+               indexTuple = SearchSysCache(INDEXRELID,
+                                                                       ObjectIdGetDatum(indexoid),
+                                                                       0, 0, 0);
+               if (!HeapTupleIsValid(indexTuple))              /* should not happen */
+                       elog(ERROR, "cache lookup failed for index %u", indexoid);
+               result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary;
+               ReleaseSysCache(indexTuple);
+               if (result)
+                       break;
+       }
+
+       list_free(indexoidlist);
+
+       return result;
+}
+
+
+/*
+ * RemoveIndex
+ *             Deletes an index.
+ */
+void
+RemoveIndex(RangeVar *relation, DropBehavior behavior)
+{
+       Oid                     indOid;
+       char            relkind;
+       ObjectAddress object;
+
+       indOid = RangeVarGetRelid(relation, false);
+       relkind = get_rel_relkind(indOid);
+       if (relkind != RELKIND_INDEX)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("\"%s\" is not an index",
+                                               relation->relname)));
+
+       object.classId = RelationRelationId;
+       object.objectId = indOid;
+       object.objectSubId = 0;
+
+       performDeletion(&object, behavior);
+}
+
+/*
+ * ReindexIndex
+ *             Recreate a specific index.
+ */
+void
+ReindexIndex(RangeVar *indexRelation)
+{
+       Oid                     indOid;
+       HeapTuple       tuple;
 
        indOid = RangeVarGetRelid(indexRelation, false);
        tuple = SearchSysCache(RELOID,
                                                   ObjectIdGetDatum(indOid),
                                                   0, 0, 0);
-       if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "index \"%s\" does not exist", indexRelation->relname);
+       if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
+               elog(ERROR, "cache lookup failed for relation %u", indOid);
 
        if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
-               elog(ERROR, "relation \"%s\" is of type \"%c\"",
-                        indexRelation->relname,
-                        ((Form_pg_class) GETSTRUCT(tuple))->relkind);
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("\"%s\" is not an index",
+                                               indexRelation->relname)));
 
-       if (IsSystemClass((Form_pg_class) GETSTRUCT(tuple)))
-       {
-               if (!allowSystemTableMods)
-                       elog(ERROR, "\"%s\" is a system index. call REINDEX under standalone postgres with -O -P options",
-                                indexRelation->relname);
-               if (!IsIgnoringSystemIndexes())
-                       elog(ERROR, "\"%s\" is a system index. call REINDEX under standalone postgres with -P -O options",
-                                indexRelation->relname);
-       }
+       /* Check permissions */
+       if (!pg_class_ownercheck(indOid, GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+                                          indexRelation->relname);
 
        ReleaseSysCache(tuple);
 
-       if (IsIgnoringSystemIndexes())
-               overwrite = true;
-       if (!reindex_index(indOid, force, overwrite))
-               elog(WARNING, "index \"%s\" wasn't reindexed", indexRelation->relname);
+       reindex_index(indOid);
 }
 
 /*
  * ReindexTable
- *             Recreate indexes of a table.
- *
- * Exceptions:
- *             "ERROR" if table nonexistent.
- *             ...
+ *             Recreate all indexes of a table (and of its toast table, if any)
  */
 void
-ReindexTable(RangeVar *relation, bool force)
+ReindexTable(RangeVar *relation)
 {
        Oid                     heapOid;
        HeapTuple       tuple;
 
-       /*
-        * REINDEX within a transaction block is dangerous, because if the
-        * transaction is later rolled back we have no way to undo truncation
-        * of the index's physical file.  Disallow it.
-        */
-       if (IsTransactionBlock())
-               elog(ERROR, "REINDEX cannot run inside a BEGIN/END block");
-
        heapOid = RangeVarGetRelid(relation, false);
        tuple = SearchSysCache(RELOID,
                                                   ObjectIdGetDatum(heapOid),
                                                   0, 0, 0);
-       if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "table \"%s\" does not exist", relation->relname);
-
-       if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION)
-               elog(ERROR, "relation \"%s\" is of type \"%c\"",
-                        relation->relname,
-                        ((Form_pg_class) GETSTRUCT(tuple))->relkind);
+       if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
+               elog(ERROR, "cache lookup failed for relation %u", heapOid);
+
+       if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
+               ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("\"%s\" is not a table",
+                                               relation->relname)));
+
+       /* Check permissions */
+       if (!pg_class_ownercheck(heapOid, GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+                                          relation->relname);
+
+       /* Can't reindex shared tables except in standalone mode */
+       if (((Form_pg_class) GETSTRUCT(tuple))->relisshared && IsUnderPostmaster)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("shared table \"%s\" can only be reindexed in stand-alone mode",
+                                               relation->relname)));
 
        ReleaseSysCache(tuple);
 
-       if (!reindex_relation(heapOid, force))
-               elog(WARNING, "table \"%s\" wasn't reindexed", relation->relname);
+       if (!reindex_relation(heapOid, true))
+               ereport(NOTICE,
+                               (errmsg("table \"%s\" has no indexes",
+                                               relation->relname)));
 }
 
 /*
  * ReindexDatabase
  *             Recreate indexes of a database.
+ *
+ * To reduce the probability of deadlocks, each table is reindexed in a
+ * separate transaction, so we can release the lock on it right away.
  */
 void
-ReindexDatabase(const char *dbname, bool force, bool all)
+ReindexDatabase(const char *databaseName, bool do_system, bool do_user)
 {
        Relation        relationRelation;
        HeapScanDesc scan;
        HeapTuple       tuple;
        MemoryContext private_context;
        MemoryContext old;
-       int                     relcnt,
-                               relalc,
-                               i,
-                               oncealc = 200;
-       Oid                *relids = (Oid *) NULL;
-
-       AssertArg(dbname);
+       List       *relids = NIL;
+       ListCell   *l;
 
-       if (strcmp(dbname, DatabaseName) != 0)
-               elog(ERROR, "REINDEX DATABASE: Can be executed only on the currently open database.");
+       AssertArg(databaseName);
 
-       if (!(superuser() || is_dbadmin(MyDatabaseId)))
-               elog(ERROR, "REINDEX DATABASE: Permission denied.");
+       if (strcmp(databaseName, get_database_name(MyDatabaseId)) != 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("can only reindex the currently open database")));
 
-       if (!allowSystemTableMods)
-               elog(ERROR, "must be called under standalone postgres with -O -P options");
-       if (!IsIgnoringSystemIndexes())
-               elog(ERROR, "must be called under standalone postgres with -P -O options");
+       if (!pg_database_ownercheck(MyDatabaseId, GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          databaseName);
 
        /*
         * We cannot run inside a user transaction block; if we were inside a
-        * transaction, then our commit- and start-transaction-command calls
-        * would not have the intended effect!
+        * transaction, then our commit- and start-transaction-command calls would
+        * not have the intended effect!
         */
-       if (IsTransactionBlock())
-               elog(ERROR, "REINDEX DATABASE cannot run inside a BEGIN/END block");
+       PreventTransactionChain((void *) databaseName, "REINDEX DATABASE");
 
        /*
-        * Create a memory context that will survive forced transaction
-        * commits we do below.  Since it is a child of QueryContext, it will
-        * go away eventually even if we suffer an error; there's no need for
-        * special abort cleanup logic.
+        * Create a memory context that will survive forced transaction commits we
+        * do below.  Since it is a child of PortalContext, it will go away
+        * eventually even if we suffer an error; there's no need for special
+        * abort cleanup logic.
         */
-       private_context = AllocSetContextCreate(QueryContext,
+       private_context = AllocSetContextCreate(PortalContext,
                                                                                        "ReindexDatabase",
                                                                                        ALLOCSET_DEFAULT_MINSIZE,
                                                                                        ALLOCSET_DEFAULT_INITSIZE,
                                                                                        ALLOCSET_DEFAULT_MAXSIZE);
 
+       /*
+        * We always want to reindex pg_class first.  This ensures that if there
+        * is any corruption in pg_class' indexes, they will be fixed before we
+        * process any other tables.  This is critical because reindexing itself
+        * will try to update pg_class.
+        */
+       if (do_system)
+       {
+               old = MemoryContextSwitchTo(private_context);
+               relids = lappend_oid(relids, RelationRelationId);
+               MemoryContextSwitchTo(old);
+       }
+
        /*
         * Scan pg_class to build a list of the relations we need to reindex.
+        *
+        * We only consider plain relations here (toast rels will be processed
+        * indirectly by reindex_relation).
         */
-       relationRelation = heap_openr(RelationRelationName, AccessShareLock);
+       relationRelation = heap_open(RelationRelationId, AccessShareLock);
        scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
-       relcnt = relalc = 0;
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               if (!all)
+               Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
+
+               if (classtuple->relkind != RELKIND_RELATION)
+                       continue;
+
+               /* Check user/system classification, and optionally skip */
+               if (IsSystemClass(classtuple))
                {
-                       if (!IsSystemClass((Form_pg_class) GETSTRUCT(tuple)))
+                       if (!do_system)
                                continue;
                }
-               if (((Form_pg_class) GETSTRUCT(tuple))->relkind == RELKIND_RELATION)
+               else
                {
-                       old = MemoryContextSwitchTo(private_context);
-                       if (relcnt == 0)
-                       {
-                               relalc = oncealc;
-                               relids = palloc(sizeof(Oid) * relalc);
-                       }
-                       else if (relcnt >= relalc)
-                       {
-                               relalc *= 2;
-                               relids = repalloc(relids, sizeof(Oid) * relalc);
-                       }
-                       MemoryContextSwitchTo(old);
-                       relids[relcnt] = tuple->t_data->t_oid;
-                       relcnt++;
+                       if (!do_user)
+                               continue;
                }
+
+               if (IsUnderPostmaster)  /* silently ignore shared tables */
+               {
+                       if (classtuple->relisshared)
+                               continue;
+               }
+
+               if (HeapTupleGetOid(tuple) == RelationRelationId)
+                       continue;                       /* got it already */
+
+               old = MemoryContextSwitchTo(private_context);
+               relids = lappend_oid(relids, HeapTupleGetOid(tuple));
+               MemoryContextSwitchTo(old);
        }
        heap_endscan(scan);
        heap_close(relationRelation, AccessShareLock);
 
        /* Now reindex each rel in a separate transaction */
        CommitTransactionCommand();
-       for (i = 0; i < relcnt; i++)
+       foreach(l, relids)
        {
+               Oid                     relid = lfirst_oid(l);
+
                StartTransactionCommand();
-               if (reindex_relation(relids[i], force))
-                       elog(WARNING, "relation %u was reindexed", relids[i]);
+               /* functions in indexes may want a snapshot set */
+               ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
+               if (reindex_relation(relid, true))
+                       ereport(NOTICE,
+                                       (errmsg("table \"%s\" was reindexed",
+                                                       get_rel_name(relid))));
                CommitTransactionCommand();
        }
        StartTransactionCommand();