]> granicus.if.org Git - postgresql/blobdiff - src/backend/catalog/heap.c
Revise collation derivation method and expression-tree representation.
[postgresql] / src / backend / catalog / heap.c
index a3e629307452ab6fff7b49f5436e8c86d69b5a72..567eb7fd6ebb42acff259e71235c651f2440a3a6 100644 (file)
@@ -3,12 +3,12 @@
  * heap.c
  *       code to create and destroy POSTGRES heap relations
  *
- * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.299 2006/05/10 23:18:39 tgl Exp $
+ *       src/backend/catalog/heap.c
  *
  *
  * INTERFACE ROUTINES
  */
 #include "postgres.h"
 
-#include "access/heapam.h"
 #include "access/genam.h"
+#include "access/heapam.h"
+#include "access/sysattr.h"
+#include "access/transam.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
 #include "catalog/heap.h"
 #include "catalog/index.h"
 #include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/objectaccess.h"
 #include "catalog/pg_attrdef.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_foreign_table.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_statistic.h"
+#include "catalog/pg_tablespace.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_type_fn.h"
+#include "catalog/storage.h"
 #include "commands/tablecmds.h"
-#include "commands/trigger.h"
+#include "commands/typecmds.h"
 #include "miscadmin.h"
-#include "nodes/makefuncs.h"
-#include "optimizer/clauses.h"
-#include "optimizer/planmain.h"
+#include "nodes/nodeFuncs.h"
 #include "optimizer/var.h"
 #include "parser/parse_coerce.h"
+#include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
-#include "rewrite/rewriteRemove.h"
+#include "storage/bufmgr.h"
+#include "storage/freespace.h"
 #include "storage/smgr.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/relcache.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#include "utils/tqual.h"
 
 
+/* Potentially set by contrib/pg_upgrade_support functions */
+Oid                    binary_upgrade_next_heap_pg_class_oid = InvalidOid;
+Oid                    binary_upgrade_next_toast_pg_class_oid = InvalidOid;
+
 static void AddNewRelationTuple(Relation pg_class_desc,
                                        Relation new_rel_desc,
-                                       Oid new_rel_oid, Oid new_type_oid,
+                                       Oid new_rel_oid,
+                                       Oid new_type_oid,
+                                       Oid reloftype,
                                        Oid relowner,
-                                       char relkind);
+                                       char relkind,
+                                       Datum relacl,
+                                       Datum reloptions);
 static Oid AddNewRelationType(const char *typeName,
                                   Oid typeNamespace,
                                   Oid new_rel_oid,
-                                  char new_rel_kind);
+                                  char new_rel_kind,
+                                  Oid ownerid,
+                                  Oid new_row_type,
+                                  Oid new_array_type);
 static void RelationRemoveInheritance(Oid relid);
-static void StoreRelCheck(Relation rel, char *ccname, char *ccbin);
-static void StoreConstraints(Relation rel, TupleDesc tupdesc);
+static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
+                         bool is_local, int inhcount);
+static void StoreConstraints(Relation rel, List *cooked_constraints);
+static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
+                                                       bool allow_merge, bool is_local);
 static void SetRelationNumChecks(Relation rel, int numchecks);
+static Node *cookConstraint(ParseState *pstate,
+                          Node *raw_constraint,
+                          char *relname);
+static List *insert_ordered_unique_oid(List *list, Oid datum);
 
 
 /* ----------------------------------------------------------------
@@ -93,6 +124,12 @@ static void SetRelationNumChecks(Relation rel, int numchecks);
  *             Disadvantage:  special cases will be all over the place.
  */
 
+/*
+ * The initializers below do not include the attoptions or attacl fields,
+ * but that's OK - we're never going to reference anything beyond the
+ * fixed-size portion of the structure anyway.
+ */
+
 static FormData_pg_attribute a1 = {
        0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
        SelfItemPointerAttributeNumber, 0, -1, -1,
@@ -204,7 +241,9 @@ heap_create(const char *relname,
                        Oid relid,
                        TupleDesc tupDesc,
                        char relkind,
+                       char relpersistence,
                        bool shared_relation,
+                       bool mapped_relation,
                        bool allow_system_table_mods)
 {
        bool            create_storage;
@@ -233,6 +272,7 @@ heap_create(const char *relname,
        {
                case RELKIND_VIEW:
                case RELKIND_COMPOSITE_TYPE:
+               case RELKIND_FOREIGN_TABLE:
                        create_storage = false;
 
                        /*
@@ -275,16 +315,20 @@ heap_create(const char *relname,
                                                                         tupDesc,
                                                                         relid,
                                                                         reltablespace,
-                                                                        shared_relation);
+                                                                        shared_relation,
+                                                                        mapped_relation,
+                                                                        relpersistence);
 
        /*
-        * have the storage manager create the relation's disk file, if needed.
+        * Have the storage manager create the relation's disk file, if needed.
+        *
+        * We only create the main fork here, other forks will be created on
+        * demand.
         */
        if (create_storage)
        {
-               Assert(rel->rd_smgr == NULL);
                RelationOpenSmgr(rel);
-               smgrcreate(rel->rd_smgr, rel->rd_istemp, false);
+               RelationCreateStorage(rel->rd_node, relpersistence);
        }
 
        return rel;
@@ -330,7 +374,8 @@ heap_create(const char *relname,
  * --------------------------------
  */
 void
-CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
+CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
+                                                bool allow_system_table_mods)
 {
        int                     i;
        int                     j;
@@ -373,7 +418,7 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
                                           NameStr(tupdesc->attrs[i]->attname)) == 0)
                                ereport(ERROR,
                                                (errcode(ERRCODE_DUPLICATE_COLUMN),
-                                                errmsg("column name \"%s\" is duplicated",
+                                                errmsg("column name \"%s\" specified more than once",
                                                                NameStr(tupdesc->attrs[j]->attname))));
                }
        }
@@ -384,7 +429,9 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
        for (i = 0; i < natts; i++)
        {
                CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
-                                                  tupdesc->attrs[i]->atttypid);
+                                                  tupdesc->attrs[i]->atttypid,
+                                                  tupdesc->attrs[i]->attcollation,
+                                                  allow_system_table_mods);
        }
 }
 
@@ -397,30 +444,136 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
  * --------------------------------
  */
 void
-CheckAttributeType(const char *attname, Oid atttypid)
+CheckAttributeType(const char *attname, Oid atttypid, Oid attcollation,
+                                  bool allow_system_table_mods)
 {
        char            att_typtype = get_typtype(atttypid);
 
-       /*
-        * Warn user, but don't fail, if column to be created has UNKNOWN type
-        * (usually as a result of a 'retrieve into' - jolly)
-        *
-        * Refuse any attempt to create a pseudo-type column.
-        */
        if (atttypid == UNKNOWNOID)
+       {
+               /*
+                * Warn user, but don't fail, if column to be created has UNKNOWN type
+                * (usually as a result of a 'retrieve into' - jolly)
+                */
                ereport(WARNING,
                                (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                                 errmsg("column \"%s\" has type \"unknown\"", attname),
                                 errdetail("Proceeding with relation creation anyway.")));
-       else if (att_typtype == 'p')
+       }
+       else if (att_typtype == TYPTYPE_PSEUDO)
        {
-               /* Special hack for pg_statistic: allow ANYARRAY during initdb */
-               if (atttypid != ANYARRAYOID || IsUnderPostmaster)
+               /*
+                * Refuse any attempt to create a pseudo-type column, except for a
+                * special hack for pg_statistic: allow ANYARRAY when modifying system
+                * catalogs (this allows creating pg_statistic and cloning it during
+                * VACUUM FULL)
+                */
+               if (atttypid != ANYARRAYOID || !allow_system_table_mods)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                                         errmsg("column \"%s\" has pseudo-type %s",
                                                        attname, format_type_be(atttypid))));
        }
+       else if (att_typtype == TYPTYPE_COMPOSITE)
+       {
+               /*
+                * For a composite type, recurse into its attributes.  You might think
+                * this isn't necessary, but since we allow system catalogs to break
+                * the rule, we have to guard against the case.
+                */
+               Relation        relation;
+               TupleDesc       tupdesc;
+               int                     i;
+
+               relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
+
+               tupdesc = RelationGetDescr(relation);
+
+               for (i = 0; i < tupdesc->natts; i++)
+               {
+                       Form_pg_attribute attr = tupdesc->attrs[i];
+
+                       if (attr->attisdropped)
+                               continue;
+                       CheckAttributeType(NameStr(attr->attname), attr->atttypid, attr->attcollation,
+                                                          allow_system_table_mods);
+               }
+
+               relation_close(relation, AccessShareLock);
+       }
+
+       /*
+        * This might not be strictly invalid per SQL standard, but it is
+        * pretty useless, and it cannot be dumped, so we must disallow
+        * it.
+        */
+       if (type_is_collatable(atttypid) && !OidIsValid(attcollation))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                        errmsg("no collation was derived for column \"%s\" with collatable type %s",
+                                                       attname, format_type_be(atttypid)),
+                                        errhint("Use the COLLATE clause to set the collation explicitly.")));
+}
+
+/*
+ * InsertPgAttributeTuple
+ *             Construct and insert a new tuple in pg_attribute.
+ *
+ * Caller has already opened and locked pg_attribute.  new_attribute is the
+ * attribute to insert (but we ignore attacl and attoptions, which are always
+ * initialized to NULL).
+ *
+ * indstate is the index state for CatalogIndexInsert. It can be passed as
+ * NULL, in which case we'll fetch the necessary info.  (Don't do this when
+ * inserting multiple attributes, because it's a tad more expensive.)
+ */
+void
+InsertPgAttributeTuple(Relation pg_attribute_rel,
+                                          Form_pg_attribute new_attribute,
+                                          CatalogIndexState indstate)
+{
+       Datum           values[Natts_pg_attribute];
+       bool            nulls[Natts_pg_attribute];
+       HeapTuple       tup;
+
+       /* This is a tad tedious, but way cleaner than what we used to do... */
+       memset(values, 0, sizeof(values));
+       memset(nulls, false, sizeof(nulls));
+
+       values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
+       values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
+       values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
+       values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
+       values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
+       values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
+       values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
+       values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff);
+       values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
+       values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
+       values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
+       values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
+       values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
+       values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
+       values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
+       values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
+       values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
+       values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
+
+       /* start out with empty permissions and empty options */
+       nulls[Anum_pg_attribute_attacl - 1] = true;
+       nulls[Anum_pg_attribute_attoptions - 1] = true;
+
+       tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
+
+       /* finally insert the new tuple, update the indexes, and clean up */
+       simple_heap_insert(pg_attribute_rel, tup);
+
+       if (indstate != NULL)
+               CatalogIndexInsert(indstate, tup);
+       else
+               CatalogUpdateIndexes(pg_attribute_rel, tup);
+
+       heap_freetuple(tup);
 }
 
 /* --------------------------------
@@ -437,9 +590,8 @@ AddNewAttributeTuples(Oid new_rel_oid,
                                          bool oidislocal,
                                          int oidinhcount)
 {
-       const Form_pg_attribute *dpp;
+       Form_pg_attribute attr;
        int                     i;
-       HeapTuple       tup;
        Relation        rel;
        CatalogIndexState indstate;
        int                     natts = tupdesc->natts;
@@ -457,35 +609,33 @@ AddNewAttributeTuples(Oid new_rel_oid,
         * First we add the user attributes.  This is also a convenient place to
         * add dependencies on their datatypes.
         */
-       dpp = tupdesc->attrs;
        for (i = 0; i < natts; i++)
        {
+               attr = tupdesc->attrs[i];
                /* Fill in the correct relation OID */
-               (*dpp)->attrelid = new_rel_oid;
+               attr->attrelid = new_rel_oid;
                /* Make sure these are OK, too */
-               (*dpp)->attstattarget = -1;
-               (*dpp)->attcacheoff = -1;
-
-               tup = heap_addheader(Natts_pg_attribute,
-                                                        false,
-                                                        ATTRIBUTE_TUPLE_SIZE,
-                                                        (void *) *dpp);
-
-               simple_heap_insert(rel, tup);
-
-               CatalogIndexInsert(indstate, tup);
+               attr->attstattarget = -1;
+               attr->attcacheoff = -1;
 
-               heap_freetuple(tup);
+               InsertPgAttributeTuple(rel, attr, indstate);
 
+               /* Add dependency info */
                myself.classId = RelationRelationId;
                myself.objectId = new_rel_oid;
                myself.objectSubId = i + 1;
                referenced.classId = TypeRelationId;
-               referenced.objectId = (*dpp)->atttypid;
+               referenced.objectId = attr->atttypid;
                referenced.objectSubId = 0;
                recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
 
-               dpp++;
+               if (OidIsValid(attr->attcollation))
+               {
+                       referenced.classId = CollationRelationId;
+                       referenced.objectId = attr->attcollation;
+                       referenced.objectSubId = 0;
+                       recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+               }
        }
 
        /*
@@ -495,44 +645,28 @@ AddNewAttributeTuples(Oid new_rel_oid,
         */
        if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
        {
-               dpp = SysAtt;
-               for (i = 0; i < -1 - FirstLowInvalidHeapAttributeNumber; i++)
+               for (i = 0; i < (int) lengthof(SysAtt); i++)
                {
-                       if (tupdesc->tdhasoid ||
-                               (*dpp)->attnum != ObjectIdAttributeNumber)
-                       {
-                               Form_pg_attribute attStruct;
-
-                               tup = heap_addheader(Natts_pg_attribute,
-                                                                        false,
-                                                                        ATTRIBUTE_TUPLE_SIZE,
-                                                                        (void *) *dpp);
-                               attStruct = (Form_pg_attribute) GETSTRUCT(tup);
-
-                               /* Fill in the correct relation OID in the copied tuple */
-                               attStruct->attrelid = new_rel_oid;
-
-                               /* Fill in correct inheritance info for the OID column */
-                               if (attStruct->attnum == ObjectIdAttributeNumber)
-                               {
-                                       attStruct->attislocal = oidislocal;
-                                       attStruct->attinhcount = oidinhcount;
-                               }
+                       FormData_pg_attribute attStruct;
 
-                               /*
-                                * Unneeded since they should be OK in the constant data
-                                * anyway
-                                */
-                               /* attStruct->attstattarget = 0; */
-                               /* attStruct->attcacheoff = -1; */
+                       /* skip OID where appropriate */
+                       if (!tupdesc->tdhasoid &&
+                               SysAtt[i]->attnum == ObjectIdAttributeNumber)
+                               continue;
 
-                               simple_heap_insert(rel, tup);
+                       memcpy(&attStruct, (char *) SysAtt[i], sizeof(FormData_pg_attribute));
 
-                               CatalogIndexInsert(indstate, tup);
+                       /* Fill in the correct relation OID in the copied tuple */
+                       attStruct.attrelid = new_rel_oid;
 
-                               heap_freetuple(tup);
+                       /* Fill in correct inheritance info for the OID column */
+                       if (attStruct.attnum == ObjectIdAttributeNumber)
+                       {
+                               attStruct.attislocal = oidislocal;
+                               attStruct.attinhcount = oidinhcount;
                        }
-                       dpp++;
+
+                       InsertPgAttributeTuple(rel, &attStruct, indstate);
                }
        }
 
@@ -544,6 +678,84 @@ AddNewAttributeTuples(Oid new_rel_oid,
        heap_close(rel, RowExclusiveLock);
 }
 
+/* --------------------------------
+ *             InsertPgClassTuple
+ *
+ *             Construct and insert a new tuple in pg_class.
+ *
+ * Caller has already opened and locked pg_class.
+ * Tuple data is taken from new_rel_desc->rd_rel, except for the
+ * variable-width fields which are not present in a cached reldesc.
+ * relacl and reloptions are passed in Datum form (to avoid having
+ * to reference the data types in heap.h).     Pass (Datum) 0 to set them
+ * to NULL.
+ * --------------------------------
+ */
+void
+InsertPgClassTuple(Relation pg_class_desc,
+                                  Relation new_rel_desc,
+                                  Oid new_rel_oid,
+                                  Datum relacl,
+                                  Datum reloptions)
+{
+       Form_pg_class rd_rel = new_rel_desc->rd_rel;
+       Datum           values[Natts_pg_class];
+       bool            nulls[Natts_pg_class];
+       HeapTuple       tup;
+
+       /* This is a tad tedious, but way cleaner than what we used to do... */
+       memset(values, 0, sizeof(values));
+       memset(nulls, false, sizeof(nulls));
+
+       values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
+       values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
+       values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
+       values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
+       values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
+       values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
+       values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
+       values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
+       values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
+       values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
+       values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
+       values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(rd_rel->reltoastidxid);
+       values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
+       values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
+       values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
+       values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
+       values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
+       values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
+       values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
+       values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
+       values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
+       values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
+       values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
+       values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
+       if (relacl != (Datum) 0)
+               values[Anum_pg_class_relacl - 1] = relacl;
+       else
+               nulls[Anum_pg_class_relacl - 1] = true;
+       if (reloptions != (Datum) 0)
+               values[Anum_pg_class_reloptions - 1] = reloptions;
+       else
+               nulls[Anum_pg_class_reloptions - 1] = true;
+
+       tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
+
+       /*
+        * The new tuple must have the oid already chosen for the rel.  Sure would
+        * be embarrassing to do this sort of thing in polite company.
+        */
+       HeapTupleSetOid(tup, new_rel_oid);
+
+       /* finally insert the new tuple, update the indexes, and clean up */
+       simple_heap_insert(pg_class_desc, tup);
+
+       CatalogUpdateIndexes(pg_class_desc, tup);
+
+       heap_freetuple(tup);
+}
+
 /* --------------------------------
  *             AddNewRelationTuple
  *
@@ -556,11 +768,13 @@ AddNewRelationTuple(Relation pg_class_desc,
                                        Relation new_rel_desc,
                                        Oid new_rel_oid,
                                        Oid new_type_oid,
+                                       Oid reloftype,
                                        Oid relowner,
-                                       char relkind)
+                                       char relkind,
+                                       Datum relacl,
+                                       Datum reloptions)
 {
        Form_pg_class new_rel_reltup;
-       HeapTuple       tup;
 
        /*
         * first we update some of the information in our uncataloged relation's
@@ -589,33 +803,38 @@ AddNewRelationTuple(Relation pg_class_desc,
                        break;
        }
 
+       /* Initialize relfrozenxid */
+       if (relkind == RELKIND_RELATION ||
+               relkind == RELKIND_TOASTVALUE)
+       {
+               /*
+                * Initialize to the minimum XID that could put tuples in the table.
+                * We know that no xacts older than RecentXmin are still running, so
+                * that will do.
+                */
+               new_rel_reltup->relfrozenxid = RecentXmin;
+       }
+       else
+       {
+               /*
+                * Other relation types will not contain XIDs, so set relfrozenxid to
+                * InvalidTransactionId.  (Note: a sequence does contain a tuple, but
+                * we force its xmin to be FrozenTransactionId always; see
+                * commands/sequence.c.)
+                */
+               new_rel_reltup->relfrozenxid = InvalidTransactionId;
+       }
+
        new_rel_reltup->relowner = relowner;
        new_rel_reltup->reltype = new_type_oid;
+       new_rel_reltup->reloftype = reloftype;
        new_rel_reltup->relkind = relkind;
 
        new_rel_desc->rd_att->tdtypeid = new_type_oid;
 
-       /* ----------------
-        *      now form a tuple to add to pg_class
-        *      XXX Natts_pg_class_fixed is a hack - see pg_class.h
-        * ----------------
-        */
-       tup = heap_addheader(Natts_pg_class_fixed,
-                                                true,
-                                                CLASS_TUPLE_SIZE,
-                                                (void *) new_rel_reltup);
-
-       /* force tuple to have the desired OID */
-       HeapTupleSetOid(tup, new_rel_oid);
-
-       /*
-        * finally insert the new tuple, update the indexes, and clean up.
-        */
-       simple_heap_insert(pg_class_desc, tup);
-
-       CatalogUpdateIndexes(pg_class_desc, tup);
-
-       heap_freetuple(tup);
+       /* Now build and insert the tuple */
+       InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
+                                          relacl, reloptions);
 }
 
 
@@ -629,22 +848,33 @@ static Oid
 AddNewRelationType(const char *typeName,
                                   Oid typeNamespace,
                                   Oid new_rel_oid,
-                                  char new_rel_kind)
+                                  char new_rel_kind,
+                                  Oid ownerid,
+                                  Oid new_row_type,
+                                  Oid new_array_type)
 {
        return
-               TypeCreate(typeName,    /* type name */
+               TypeCreate(new_row_type,        /* optional predetermined OID */
+                                  typeName,    /* type name */
                                   typeNamespace,               /* type namespace */
                                   new_rel_oid, /* relation oid */
                                   new_rel_kind,        /* relation kind */
+                                  ownerid,             /* owner's ID */
                                   -1,                  /* internal size (varlena) */
-                                  'c',                 /* type-type (complex) */
-                                  ',',                 /* default array delimiter */
+                                  TYPTYPE_COMPOSITE,   /* type-type (composite) */
+                                  TYPCATEGORY_COMPOSITE,               /* type-category (ditto) */
+                                  false,               /* composite types are never preferred */
+                                  DEFAULT_TYPDELIM,    /* default array delimiter */
                                   F_RECORD_IN, /* input procedure */
                                   F_RECORD_OUT,        /* output procedure */
                                   F_RECORD_RECV,               /* receive procedure */
                                   F_RECORD_SEND,               /* send procedure */
+                                  InvalidOid,  /* typmodin procedure - none */
+                                  InvalidOid,  /* typmodout procedure - none */
                                   InvalidOid,  /* analyze procedure - default */
                                   InvalidOid,  /* array element type - irrelevant */
+                                  false,               /* this is not an array type */
+                                  new_array_type,              /* array type if any */
                                   InvalidOid,  /* domain base type - irrelevant */
                                   NULL,                /* default value - none */
                                   NULL,                /* default binary representation */
@@ -653,13 +883,36 @@ AddNewRelationType(const char *typeName,
                                   'x',                 /* fully TOASTable */
                                   -1,                  /* typmod */
                                   0,                   /* array dimensions for typBaseType */
-                                  false);              /* Type NOT NULL */
+                                  false,               /* Type NOT NULL */
+                                  InvalidOid); /* typcollation */
 }
 
 /* --------------------------------
  *             heap_create_with_catalog
  *
  *             creates a new cataloged relation.  see comments above.
+ *
+ * Arguments:
+ *     relname: name to give to new rel
+ *     relnamespace: OID of namespace it goes in
+ *     reltablespace: OID of tablespace it goes in
+ *     relid: OID to assign to new rel, or InvalidOid to select a new OID
+ *     reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
+ *     ownerid: OID of new rel's owner
+ *     tupdesc: tuple descriptor (source of column definitions)
+ *     cooked_constraints: list of precooked check constraints and defaults
+ *     relkind: relkind for new rel
+ *     shared_relation: TRUE if it's to be a shared relation
+ *     mapped_relation: TRUE if the relation will use the relfilenode map
+ *     oidislocal: TRUE if oid column (if any) should be marked attislocal
+ *     oidinhcount: attinhcount to assign to oid column (if any)
+ *     oncommit: ON COMMIT marking (only relevant if it's a temp table)
+ *     reloptions: reloptions in Datum form, or (Datum) 0 if none
+ *     use_user_acl: TRUE if should look for user-defined default permissions;
+ *             if FALSE, relacl is always set NULL
+ *     allow_system_table_mods: TRUE to allow creation in system namespaces
+ *
+ * Returns the OID of the new relation
  * --------------------------------
  */
 Oid
@@ -667,18 +920,30 @@ heap_create_with_catalog(const char *relname,
                                                 Oid relnamespace,
                                                 Oid reltablespace,
                                                 Oid relid,
+                                                Oid reltypeid,
+                                                Oid reloftypeid,
                                                 Oid ownerid,
                                                 TupleDesc tupdesc,
+                                                List *cooked_constraints,
                                                 char relkind,
+                                                char relpersistence,
                                                 bool shared_relation,
+                                                bool mapped_relation,
                                                 bool oidislocal,
                                                 int oidinhcount,
                                                 OnCommitAction oncommit,
-                                                bool allow_system_table_mods)
+                                                Datum reloptions,
+                                                bool use_user_acl,
+                                                bool allow_system_table_mods,
+                                                bool if_not_exists)
 {
        Relation        pg_class_desc;
        Relation        new_rel_desc;
+       Acl                *relacl;
+       Oid                     existing_relid;
+       Oid                     old_type_oid;
        Oid                     new_type_oid;
+       Oid                     new_array_oid = InvalidOid;
 
        pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
 
@@ -687,12 +952,55 @@ heap_create_with_catalog(const char *relname,
         */
        Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
 
-       CheckAttributeNamesTypes(tupdesc, relkind);
+       CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
 
-       if (get_relname_relid(relname, relnamespace))
+       /*
+        * If the relation already exists, it's an error, unless the user specifies
+        * "IF NOT EXISTS".  In that case, we just print a notice and do nothing
+        * further.
+        */
+       existing_relid = get_relname_relid(relname, relnamespace);
+       if (existing_relid != InvalidOid)
+       {
+               if (if_not_exists)
+               {
+                       ereport(NOTICE,
+                                       (errcode(ERRCODE_DUPLICATE_TABLE),
+                                        errmsg("relation \"%s\" already exists, skipping",
+                                        relname)));
+                       heap_close(pg_class_desc, RowExclusiveLock);
+                       return InvalidOid;
+               }
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_TABLE),
                                 errmsg("relation \"%s\" already exists", relname)));
+       }
+
+       /*
+        * Since we are going to create a rowtype as well, also check for
+        * collision with an existing type name.  If there is one and it's an
+        * autogenerated array, we can rename it out of the way; otherwise we can
+        * at least give a good error message.
+        */
+       old_type_oid = GetSysCacheOid2(TYPENAMENSP,
+                                                                  CStringGetDatum(relname),
+                                                                  ObjectIdGetDatum(relnamespace));
+       if (OidIsValid(old_type_oid))
+       {
+               if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DUPLICATE_OBJECT),
+                                        errmsg("type \"%s\" already exists", relname),
+                          errhint("A relation has an associated type of the same name, "
+                                          "so you must use a name that doesn't conflict "
+                                          "with any existing type.")));
+       }
+
+       /*
+        * Shared relations must be in pg_global (last-ditch check)
+        */
+       if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
+               elog(ERROR, "shared relations must be placed in pg_global tablespace");
 
        /*
         * Allocate an OID for the relation, unless we were told what to use.
@@ -701,8 +1009,54 @@ heap_create_with_catalog(const char *relname,
         * collide with either pg_class OIDs or existing physical files.
         */
        if (!OidIsValid(relid))
-               relid = GetNewRelFileNode(reltablespace, shared_relation,
-                                                                 pg_class_desc);
+       {
+               /*
+                *      Use binary-upgrade override for pg_class.oid/relfilenode,
+                *      if supplied.
+                */
+               if (OidIsValid(binary_upgrade_next_heap_pg_class_oid) &&
+                       (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
+                        relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE ||
+                        relkind == RELKIND_FOREIGN_TABLE))
+               {
+                       relid = binary_upgrade_next_heap_pg_class_oid;
+                       binary_upgrade_next_heap_pg_class_oid = InvalidOid;
+               }
+               else if (OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
+                                relkind == RELKIND_TOASTVALUE)
+               {
+                       relid = binary_upgrade_next_toast_pg_class_oid;
+                       binary_upgrade_next_toast_pg_class_oid = InvalidOid;
+               }
+               else
+                       relid = GetNewRelFileNode(reltablespace, pg_class_desc,
+                                                                         relpersistence);
+       }
+
+       /*
+        * Determine the relation's initial permissions.
+        */
+       if (use_user_acl)
+       {
+               switch (relkind)
+               {
+                       case RELKIND_RELATION:
+                       case RELKIND_VIEW:
+                       case RELKIND_FOREIGN_TABLE:
+                               relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
+                                                                                         relnamespace);
+                               break;
+                       case RELKIND_SEQUENCE:
+                               relacl = get_user_default_acl(ACL_OBJECT_SEQUENCE, ownerid,
+                                                                                         relnamespace);
+                               break;
+                       default:
+                               relacl = NULL;
+                               break;
+               }
+       }
+       else
+               relacl = NULL;
 
        /*
         * Create the relcache entry (mostly dummy at this point) and the physical
@@ -715,22 +1069,87 @@ heap_create_with_catalog(const char *relname,
                                                           relid,
                                                           tupdesc,
                                                           relkind,
+                                                          relpersistence,
                                                           shared_relation,
+                                                          mapped_relation,
                                                           allow_system_table_mods);
 
        Assert(relid == RelationGetRelid(new_rel_desc));
 
        /*
-        * since defining a relation also defines a complex type, we add a new
-        * system type corresponding to the new relation.
+        * Decide whether to create an array type over the relation's rowtype. We
+        * do not create any array types for system catalogs (ie, those made
+        * during initdb).      We create array types for regular relations, views,
+        * composite types and foreign tables ... but not, eg, for toast tables or
+        * sequences.
+        */
+       if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
+                                                         relkind == RELKIND_VIEW ||
+                                                         relkind == RELKIND_FOREIGN_TABLE ||
+                                                         relkind == RELKIND_COMPOSITE_TYPE))
+               new_array_oid = AssignTypeArrayOid();
+
+       /*
+        * Since defining a relation also defines a complex type, we add a new
+        * system type corresponding to the new relation.  The OID of the type can
+        * be preselected by the caller, but if reltypeid is InvalidOid, we'll
+        * generate a new OID for it.
         *
-        * NOTE: we could get a unique-index failure here, in case the same name
-        * has already been used for a type.
+        * NOTE: we could get a unique-index failure here, in case someone else is
+        * creating the same type name in parallel but hadn't committed yet when
+        * we checked for a duplicate name above.
         */
        new_type_oid = AddNewRelationType(relname,
                                                                          relnamespace,
                                                                          relid,
-                                                                         relkind);
+                                                                         relkind,
+                                                                         ownerid,
+                                                                         reltypeid,
+                                                                         new_array_oid);
+
+       /*
+        * Now make the array type if wanted.
+        */
+       if (OidIsValid(new_array_oid))
+       {
+               char       *relarrayname;
+
+               relarrayname = makeArrayTypeName(relname, relnamespace);
+
+               TypeCreate(new_array_oid,               /* force the type's OID to this */
+                                  relarrayname,        /* Array type name */
+                                  relnamespace,        /* Same namespace as parent */
+                                  InvalidOid,  /* Not composite, no relationOid */
+                                  0,                   /* relkind, also N/A here */
+                                  ownerid,             /* owner's ID */
+                                  -1,                  /* Internal size (varlena) */
+                                  TYPTYPE_BASE,        /* Not composite - typelem is */
+                                  TYPCATEGORY_ARRAY,   /* type-category (array) */
+                                  false,               /* array types are never preferred */
+                                  DEFAULT_TYPDELIM,    /* default array delimiter */
+                                  F_ARRAY_IN,  /* array input proc */
+                                  F_ARRAY_OUT, /* array output proc */
+                                  F_ARRAY_RECV,        /* array recv (bin) proc */
+                                  F_ARRAY_SEND,        /* array send (bin) proc */
+                                  InvalidOid,  /* typmodin procedure - none */
+                                  InvalidOid,  /* typmodout procedure - none */
+                                  InvalidOid,  /* analyze procedure - default */
+                                  new_type_oid,        /* array element type - the rowtype */
+                                  true,                /* yes, this is an array type */
+                                  InvalidOid,  /* this has no array type */
+                                  InvalidOid,  /* domain base type - irrelevant */
+                                  NULL,                /* default value - none */
+                                  NULL,                /* default binary representation */
+                                  false,               /* passed by reference */
+                                  'd',                 /* alignment - must be the largest! */
+                                  'x',                 /* fully TOASTable */
+                                  -1,                  /* typmod */
+                                  0,                   /* array dimensions for typBaseType */
+                                  false,               /* Type NOT NULL */
+                                  InvalidOid); /* typcollation */
+
+               pfree(relarrayname);
+       }
 
        /*
         * now create an entry in pg_class for the relation.
@@ -743,8 +1162,11 @@ heap_create_with_catalog(const char *relname,
                                                new_rel_desc,
                                                relid,
                                                new_type_oid,
+                                               reloftypeid,
                                                ownerid,
-                                               relkind);
+                                               relkind,
+                                               PointerGetDatum(relacl),
+                                               reloptions);
 
        /*
         * now add tuples to pg_attribute for the attributes in our new relation.
@@ -753,13 +1175,23 @@ heap_create_with_catalog(const char *relname,
                                                  oidislocal, oidinhcount);
 
        /*
-        * make a dependency link to force the relation to be deleted if its
-        * namespace is.  Skip this in bootstrap mode, since we don't make
-        * dependencies while bootstrapping.
+        * Make a dependency link to force the relation to be deleted if its
+        * namespace is.  Also make a dependency link to its owner, as well as
+        * dependencies for any roles mentioned in the default ACL.
+        *
+        * For composite types, these dependencies are tracked for the pg_type
+        * entry, so we needn't record them here.  Likewise, TOAST tables don't
+        * need a namespace dependency (they live in a pinned namespace) nor an
+        * owner dependency (they depend indirectly through the parent table), nor
+        * should they have any ACL entries.  The same applies for extension
+        * dependencies.
         *
-        * Also make a dependency link to its owner.
+        * Also, skip this in bootstrap mode, since we don't make dependencies
+        * while bootstrapping.
         */
-       if (!IsBootstrapProcessingMode())
+       if (relkind != RELKIND_COMPOSITE_TYPE &&
+               relkind != RELKIND_TOASTVALUE &&
+               !IsBootstrapProcessingMode())
        {
                ObjectAddress myself,
                                        referenced;
@@ -772,23 +1204,42 @@ heap_create_with_catalog(const char *relname,
                referenced.objectSubId = 0;
                recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
 
-               /*
-                * For composite types, the dependency on owner is tracked for the
-                * pg_type entry, so don't record it here.  All other relkinds need
-                * their ownership tracked.
-                */
-               if (relkind != RELKIND_COMPOSITE_TYPE)
-                       recordDependencyOnOwner(RelationRelationId, relid, ownerid);
+               recordDependencyOnOwner(RelationRelationId, relid, ownerid);
+
+               recordDependencyOnCurrentExtension(&myself);
+
+               if (reloftypeid)
+               {
+                       referenced.classId = TypeRelationId;
+                       referenced.objectId = reloftypeid;
+                       referenced.objectSubId = 0;
+                       recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+               }
+
+               if (relacl != NULL)
+               {
+                       int                     nnewmembers;
+                       Oid                *newmembers;
+
+                       nnewmembers = aclmembers(relacl, &newmembers);
+                       updateAclDependencies(RelationRelationId, relid, 0,
+                                                                 ownerid,
+                                                                 0, NULL,
+                                                                 nnewmembers, newmembers);
+               }
        }
 
+       /* Post creation hook for new relation */
+       InvokeObjectAccessHook(OAT_POST_CREATE, RelationRelationId, relid, 0);
+
        /*
-        * store constraints and defaults passed in the tupdesc, if any.
+        * Store any supplied constraints and defaults.
         *
         * NB: this may do a CommandCounterIncrement and rebuild the relcache
         * entry, so the relation must be valid and self-consistent at this point.
         * In particular, there are not yet constraints and defaults anywhere.
         */
-       StoreConstraints(new_rel_desc, tupdesc);
+       StoreConstraints(new_rel_desc, cooked_constraints);
 
        /*
         * If there's a special on-commit action, remember it
@@ -796,6 +1247,25 @@ heap_create_with_catalog(const char *relname,
        if (oncommit != ONCOMMIT_NOOP)
                register_on_commit_action(relid, oncommit);
 
+       /*
+        * If this is an unlogged relation, it needs an init fork so that it
+        * can be correctly reinitialized on restart.  Since we're going to
+        * do an immediate sync, we ony need to xlog this if archiving or
+        * streaming is enabled.  And the immediate sync is required, because
+        * otherwise there's no guarantee that this will hit the disk before
+        * the next checkpoint moves the redo pointer.
+        */
+       if (relpersistence == RELPERSISTENCE_UNLOGGED)
+       {
+               Assert(relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE);
+
+               smgrcreate(new_rel_desc->rd_smgr, INIT_FORKNUM, false);
+               if (XLogIsNeeded())
+                       log_smgrcreate(&new_rel_desc->rd_smgr->smgr_rnode.node,
+                                                  INIT_FORKNUM);
+               smgrimmedsync(new_rel_desc->rd_smgr, INIT_FORKNUM);
+       }
+
        /*
         * ok, the relation has been cataloged, so close our relations and return
         * the OID of the newly created relation.
@@ -858,9 +1328,7 @@ DeleteRelationTuple(Oid relid)
        /* Grab an appropriate lock on the pg_class relation */
        pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
 
-       tup = SearchSysCache(RELOID,
-                                                ObjectIdGetDatum(relid),
-                                                0, 0, 0);
+       tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -937,10 +1405,9 @@ RemoveAttributeById(Oid relid, AttrNumber attnum)
 
        attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy(ATTNUM,
-                                                          ObjectIdGetDatum(relid),
-                                                          Int16GetDatum(attnum),
-                                                          0, 0);
+       tuple = SearchSysCacheCopy2(ATTNUM,
+                                                               ObjectIdGetDatum(relid),
+                                                               Int16GetDatum(attnum));
        if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
                elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                         attnum, relid);
@@ -1105,10 +1572,9 @@ RemoveAttrDefaultById(Oid attrdefId)
        /* Fix the pg_attribute row */
        attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy(ATTNUM,
-                                                          ObjectIdGetDatum(myrelid),
-                                                          Int16GetDatum(myattnum),
-                                                          0, 0);
+       tuple = SearchSysCacheCopy2(ATTNUM,
+                                                               ObjectIdGetDatum(myrelid),
+                                                               Int16GetDatum(myattnum));
        if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
                elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                         myattnum, myrelid);
@@ -1150,13 +1616,40 @@ heap_drop_with_catalog(Oid relid)
        rel = relation_open(relid, AccessExclusiveLock);
 
        /*
-        * Schedule unlinking of the relation's physical file at commit.
+        * There can no longer be anyone *else* touching the relation, but we
+        * might still have open queries or cursors, or pending trigger events,
+        * in our own session.
+        */
+       CheckTableNotInUse(rel, "DROP TABLE");
+
+       /*
+        * Delete pg_foreign_table tuple first.
+        */
+       if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+       {
+               Relation    rel;
+               HeapTuple   tuple;
+
+               rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
+
+               tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
+               if (!HeapTupleIsValid(tuple))
+                       elog(ERROR, "cache lookup failed for foreign table %u", relid);
+
+               simple_heap_delete(rel, &tuple->t_self);
+
+               ReleaseSysCache(tuple);
+               heap_close(rel, RowExclusiveLock);
+       }
+
+       /*
+        * Schedule unlinking of the relation's physical files at commit.
         */
        if (rel->rd_rel->relkind != RELKIND_VIEW &&
-               rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
+               rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
+               rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
        {
-               RelationOpenSmgr(rel);
-               smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
+               RelationDropStorage(rel);
        }
 
        /*
@@ -1204,17 +1697,16 @@ heap_drop_with_catalog(Oid relid)
 
 /*
  * Store a default expression for column attnum of relation rel.
- * The expression must be presented as a nodeToString() string.
  */
 void
-StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
+StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
 {
-       Node       *expr;
+       char       *adbin;
        char       *adsrc;
        Relation        adrel;
        HeapTuple       tuple;
        Datum           values[4];
-       static char nulls[4] = {' ', ' ', ' ', ' '};
+       static bool nulls[4] = {false, false, false, false};
        Relation        attrrel;
        HeapTuple       atttup;
        Form_pg_attribute attStruct;
@@ -1223,12 +1715,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
                                defobject;
 
        /*
-        * Need to construct source equivalent of given node-string.
+        * Flatten expression to string form for storage.
         */
-       expr = stringToNode(adbin);
+       adbin = nodeToString(expr);
 
        /*
-        * deparse it
+        * Also deparse it to form the mostly-obsolete adsrc field.
         */
        adsrc = deparse_expression(expr,
                                                        deparse_context_for(RelationGetRelationName(rel),
@@ -1240,14 +1732,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
         */
        values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
        values[Anum_pg_attrdef_adnum - 1] = attnum;
-       values[Anum_pg_attrdef_adbin - 1] = DirectFunctionCall1(textin,
-                                                                                                        CStringGetDatum(adbin));
-       values[Anum_pg_attrdef_adsrc - 1] = DirectFunctionCall1(textin,
-                                                                                                        CStringGetDatum(adsrc));
+       values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
+       values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);
 
        adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
 
-       tuple = heap_formtuple(adrel->rd_att, values, nulls);
+       tuple = heap_form_tuple(adrel->rd_att, values, nulls);
        attrdefOid = simple_heap_insert(adrel, tuple);
 
        CatalogUpdateIndexes(adrel, tuple);
@@ -1262,6 +1752,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
        pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
        pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
        heap_freetuple(tuple);
+       pfree(adbin);
        pfree(adsrc);
 
        /*
@@ -1269,10 +1760,9 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
         * exists.
         */
        attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
-       atttup = SearchSysCacheCopy(ATTNUM,
-                                                               ObjectIdGetDatum(RelationGetRelid(rel)),
-                                                               Int16GetDatum(attnum),
-                                                               0, 0);
+       atttup = SearchSysCacheCopy2(ATTNUM,
+                                                                ObjectIdGetDatum(RelationGetRelid(rel)),
+                                                                Int16GetDatum(attnum));
        if (!HeapTupleIsValid(atttup))
                elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                         attnum, RelationGetRelid(rel));
@@ -1305,27 +1795,27 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
 
 /*
  * Store a check-constraint expression for the given relation.
- * The expression must be presented as a nodeToString() string.
  *
  * Caller is responsible for updating the count of constraints
  * in the pg_class entry for the relation.
  */
 static void
-StoreRelCheck(Relation rel, char *ccname, char *ccbin)
+StoreRelCheck(Relation rel, char *ccname, Node *expr,
+                         bool is_local, int inhcount)
 {
-       Node       *expr;
+       char       *ccbin;
        char       *ccsrc;
        List       *varList;
        int                     keycount;
        int16      *attNos;
 
        /*
-        * Convert condition to an expression tree.
+        * Flatten expression to string form for storage.
         */
-       expr = stringToNode(ccbin);
+       ccbin = nodeToString(expr);
 
        /*
-        * deparse it
+        * Also deparse it to form the mostly-obsolete consrc field.
         */
        ccsrc = deparse_expression(expr,
                                                        deparse_context_for(RelationGetRelationName(rel),
@@ -1333,13 +1823,13 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin)
                                                           false, false);
 
        /*
-        * Find columns of rel that are used in ccbin
+        * Find columns of rel that are used in expr
         *
         * NB: pull_var_clause is okay here only because we don't allow subselects
         * in check constraints; it would fail to examine the contents of
         * subselects.
         */
-       varList = pull_var_clause(expr, false);
+       varList = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
        keycount = list_length(varList);
 
        if (keycount > 0)
@@ -1372,39 +1862,47 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin)
                                                  CONSTRAINT_CHECK,             /* Constraint Type */
                                                  false,        /* Is Deferrable */
                                                  false,        /* Is Deferred */
+                                                 true,         /* Is Validated */
                                                  RelationGetRelid(rel),                /* relation */
                                                  attNos,               /* attrs in the constraint */
                                                  keycount,             /* # attrs in the constraint */
                                                  InvalidOid,   /* not a domain constraint */
+                                                 InvalidOid,   /* no associated index */
                                                  InvalidOid,   /* Foreign key fields */
                                                  NULL,
+                                                 NULL,
+                                                 NULL,
+                                                 NULL,
                                                  0,
                                                  ' ',
                                                  ' ',
                                                  ' ',
-                                                 InvalidOid,   /* no associated index */
-                                                 expr, /* Tree form check constraint */
-                                                 ccbin,        /* Binary form check constraint */
-                                                 ccsrc);               /* Source form check constraint */
-
+                                                 NULL, /* not an exclusion constraint */
+                                                 expr, /* Tree form of check constraint */
+                                                 ccbin,        /* Binary form of check constraint */
+                                                 ccsrc,        /* Source form of check constraint */
+                                                 is_local,             /* conislocal */
+                                                 inhcount);    /* coninhcount */
+
+       pfree(ccbin);
        pfree(ccsrc);
 }
 
 /*
- * Store defaults and constraints passed in via the tuple constraint struct.
+ * Store defaults and constraints (passed as a list of CookedConstraint).
  *
  * NOTE: only pre-cooked expressions will be passed this way, which is to
  * say expressions inherited from an existing relation.  Newly parsed
  * expressions can be added later, by direct calls to StoreAttrDefault
- * and StoreRelCheck (see AddRelationRawConstraints()).
+ * and StoreRelCheck (see AddRelationNewConstraints()).
  */
 static void
-StoreConstraints(Relation rel, TupleDesc tupdesc)
+StoreConstraints(Relation rel, List *cooked_constraints)
 {
-       TupleConstr *constr = tupdesc->constr;
-       int                     i;
+       int                     numchecks = 0;
+       ListCell   *lc;
 
-       if (!constr)
+       if (!cooked_constraints)
                return;                                 /* nothing to do */
 
        /*
@@ -1414,33 +1912,46 @@ StoreConstraints(Relation rel, TupleDesc tupdesc)
         */
        CommandCounterIncrement();
 
-       for (i = 0; i < constr->num_defval; i++)
-               StoreAttrDefault(rel, constr->defval[i].adnum,
-                                                constr->defval[i].adbin);
+       foreach(lc, cooked_constraints)
+       {
+               CookedConstraint *con = (CookedConstraint *) lfirst(lc);
 
-       for (i = 0; i < constr->num_check; i++)
-               StoreRelCheck(rel, constr->check[i].ccname,
-                                         constr->check[i].ccbin);
+               switch (con->contype)
+               {
+                       case CONSTR_DEFAULT:
+                               StoreAttrDefault(rel, con->attnum, con->expr);
+                               break;
+                       case CONSTR_CHECK:
+                               StoreRelCheck(rel, con->name, con->expr,
+                                                         con->is_local, con->inhcount);
+                               numchecks++;
+                               break;
+                       default:
+                               elog(ERROR, "unrecognized constraint type: %d",
+                                        (int) con->contype);
+               }
+       }
 
-       if (constr->num_check > 0)
-               SetRelationNumChecks(rel, constr->num_check);
+       if (numchecks > 0)
+               SetRelationNumChecks(rel, numchecks);
 }
 
 /*
- * AddRelationRawConstraints
+ * AddRelationNewConstraints
  *
- * Add raw (not-yet-transformed) column default expressions and/or constraint
- * check expressions to an existing relation.  This is defined to do both
- * for efficiency in DefineRelation, but of course you can do just one or
- * the other by passing empty lists.
+ * Add new column default expressions and/or constraint check expressions
+ * to an existing relation.  This is defined to do both for efficiency in
+ * DefineRelation, but of course you can do just one or the other by passing
+ * empty lists.
  *
  * rel: relation to be modified
- * rawColDefaults: list of RawColumnDefault structures
- * rawConstraints: list of Constraint nodes
+ * newColDefaults: list of RawColumnDefault structures
+ * newConstraints: list of Constraint nodes
+ * allow_merge: TRUE if check constraints may be merged with existing ones
+ * is_local: TRUE if definition is local, FALSE if it's inherited
  *
- * All entries in rawColDefaults will be processed.  Entries in rawConstraints
- * will be processed only if they are CONSTR_CHECK type and contain a "raw"
- * expression.
+ * All entries in newColDefaults will be processed.  Entries in newConstraints
+ * will be processed only if they are CONSTR_CHECK type.
  *
  * Returns a list of CookedConstraint nodes that shows the cooked form of
  * the default and constraint expressions added to the relation.
@@ -1451,9 +1962,11 @@ StoreConstraints(Relation rel, TupleDesc tupdesc)
  * tuples visible.
  */
 List *
-AddRelationRawConstraints(Relation rel,
-                                                 List *rawColDefaults,
-                                                 List *rawConstraints)
+AddRelationNewConstraints(Relation rel,
+                                                 List *newColDefaults,
+                                                 List *newConstraints,
+                                                 bool allow_merge,
+                                                 bool is_local)
 {
        List       *cookedConstraints = NIL;
        TupleDesc       tupleDesc;
@@ -1492,7 +2005,7 @@ AddRelationRawConstraints(Relation rel,
        /*
         * Process column default expressions.
         */
-       foreach(cell, rawColDefaults)
+       foreach(cell, newColDefaults)
        {
                RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
                Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
@@ -1501,13 +2014,30 @@ AddRelationRawConstraints(Relation rel,
                                                   atp->atttypid, atp->atttypmod,
                                                   NameStr(atp->attname));
 
-               StoreAttrDefault(rel, colDef->attnum, nodeToString(expr));
+               /*
+                * If the expression is just a NULL constant, we do not bother to make
+                * an explicit pg_attrdef entry, since the default behavior is
+                * equivalent.
+                *
+                * Note a nonobvious property of this test: if the column is of a
+                * domain type, what we'll get is not a bare null Const but a
+                * CoerceToDomain expr, so we will not discard the default.  This is
+                * critical because the column default needs to be retained to
+                * override any default that the domain might have.
+                */
+               if (expr == NULL ||
+                       (IsA(expr, Const) &&((Const *) expr)->constisnull))
+                       continue;
+
+               StoreAttrDefault(rel, colDef->attnum, expr);
 
                cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
                cooked->contype = CONSTR_DEFAULT;
                cooked->name = NULL;
                cooked->attnum = colDef->attnum;
                cooked->expr = expr;
+               cooked->is_local = is_local;
+               cooked->inhcount = is_local ? 0 : 1;
                cookedConstraints = lappend(cookedConstraints, cooked);
        }
 
@@ -1516,63 +2046,44 @@ AddRelationRawConstraints(Relation rel,
         */
        numchecks = numoldchecks;
        checknames = NIL;
-       foreach(cell, rawConstraints)
+       foreach(cell, newConstraints)
        {
                Constraint *cdef = (Constraint *) lfirst(cell);
                char       *ccname;
 
-               if (cdef->contype != CONSTR_CHECK || cdef->raw_expr == NULL)
+               if (cdef->contype != CONSTR_CHECK)
                        continue;
-               Assert(cdef->cooked_expr == NULL);
-
-               /*
-                * Transform raw parsetree to executable expression.
-                */
-               expr = transformExpr(pstate, cdef->raw_expr);
 
-               /*
-                * Make sure it yields a boolean result.
-                */
-               expr = coerce_to_boolean(pstate, expr, "CHECK");
+               if (cdef->raw_expr != NULL)
+               {
+                       Assert(cdef->cooked_expr == NULL);
 
-               /*
-                * Make sure no outside relations are referred to.
-                */
-               if (list_length(pstate->p_rtable) != 1)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-                       errmsg("only table \"%s\" can be referenced in check constraint",
-                                  RelationGetRelationName(rel))));
+                       /*
+                        * Transform raw parsetree to executable expression, and verify
+                        * it's valid as a CHECK constraint.
+                        */
+                       expr = cookConstraint(pstate, cdef->raw_expr,
+                                                                 RelationGetRelationName(rel));
+               }
+               else
+               {
+                       Assert(cdef->cooked_expr != NULL);
 
-               /*
-                * No subplans or aggregates, either...
-                */
-               if (pstate->p_hasSubLinks)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                        errmsg("cannot use subquery in check constraint")));
-               if (pstate->p_hasAggs)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_GROUPING_ERROR),
-                          errmsg("cannot use aggregate function in check constraint")));
+                       /*
+                        * Here, we assume the parser will only pass us valid CHECK
+                        * expressions, so we do no particular checking.
+                        */
+                       expr = stringToNode(cdef->cooked_expr);
+               }
 
                /*
                 * Check name uniqueness, or generate a name if none was given.
                 */
-               if (cdef->name != NULL)
+               if (cdef->conname != NULL)
                {
                        ListCell   *cell2;
 
-                       ccname = cdef->name;
-                       /* Check against pre-existing constraints */
-                       if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
-                                                                        RelationGetRelid(rel),
-                                                                        RelationGetNamespace(rel),
-                                                                        ccname))
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_DUPLICATE_OBJECT),
-                               errmsg("constraint \"%s\" for relation \"%s\" already exists",
-                                          ccname, RelationGetRelationName(rel))));
+                       ccname = cdef->conname;
                        /* Check against other new constraints */
                        /* Needed because we don't do CommandCounterIncrement in loop */
                        foreach(cell2, checknames)
@@ -1583,6 +2094,19 @@ AddRelationRawConstraints(Relation rel,
                                                         errmsg("check constraint \"%s\" already exists",
                                                                        ccname)));
                        }
+
+                       /* save name for future checks */
+                       checknames = lappend(checknames, ccname);
+
+                       /*
+                        * Check against pre-existing constraints.      If we are allowed to
+                        * merge with an existing constraint, there's no more to do here.
+                        * (We omit the duplicate constraint from the result, which is
+                        * what ATAddCheckConstraint wants.)
+                        */
+                       if (MergeWithExistingConstraint(rel, ccname, expr,
+                                                                                       allow_merge, is_local))
+                               continue;
                }
                else
                {
@@ -1601,7 +2125,7 @@ AddRelationRawConstraints(Relation rel,
                        List       *vars;
                        char       *colname;
 
-                       vars = pull_var_clause(expr, false);
+                       vars = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
 
                        /* eliminate duplicates */
                        vars = list_union(NIL, vars);
@@ -1617,15 +2141,15 @@ AddRelationRawConstraints(Relation rel,
                                                                                  "check",
                                                                                  RelationGetNamespace(rel),
                                                                                  checknames);
-               }
 
-               /* save name for future checks */
-               checknames = lappend(checknames, ccname);
+                       /* save name for future checks */
+                       checknames = lappend(checknames, ccname);
+               }
 
                /*
                 * OK, store it.
                 */
-               StoreRelCheck(rel, ccname, nodeToString(expr));
+               StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1);
 
                numchecks++;
 
@@ -1634,6 +2158,8 @@ AddRelationRawConstraints(Relation rel,
                cooked->name = ccname;
                cooked->attnum = 0;
                cooked->expr = expr;
+               cooked->is_local = is_local;
+               cooked->inhcount = is_local ? 0 : 1;
                cookedConstraints = lappend(cookedConstraints, cooked);
        }
 
@@ -1649,6 +2175,90 @@ AddRelationRawConstraints(Relation rel,
        return cookedConstraints;
 }
 
+/*
+ * Check for a pre-existing check constraint that conflicts with a proposed
+ * new one, and either adjust its conislocal/coninhcount settings or throw
+ * error as needed.
+ *
+ * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
+ * got a so-far-unique name, or throws error if conflict.
+ */
+static bool
+MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
+                                                       bool allow_merge, bool is_local)
+{
+       bool            found;
+       Relation        conDesc;
+       SysScanDesc conscan;
+       ScanKeyData skey[2];
+       HeapTuple       tup;
+
+       /* Search for a pg_constraint entry with same name and relation */
+       conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
+
+       found = false;
+
+       ScanKeyInit(&skey[0],
+                               Anum_pg_constraint_conname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               CStringGetDatum(ccname));
+
+       ScanKeyInit(&skey[1],
+                               Anum_pg_constraint_connamespace,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetNamespace(rel)));
+
+       conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
+                                                                SnapshotNow, 2, skey);
+
+       while (HeapTupleIsValid(tup = systable_getnext(conscan)))
+       {
+               Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
+
+               if (con->conrelid == RelationGetRelid(rel))
+               {
+                       /* Found it.  Conflicts if not identical check constraint */
+                       if (con->contype == CONSTRAINT_CHECK)
+                       {
+                               Datum           val;
+                               bool            isnull;
+
+                               val = fastgetattr(tup,
+                                                                 Anum_pg_constraint_conbin,
+                                                                 conDesc->rd_att, &isnull);
+                               if (isnull)
+                                       elog(ERROR, "null conbin for rel %s",
+                                                RelationGetRelationName(rel));
+                               if (equal(expr, stringToNode(TextDatumGetCString(val))))
+                                       found = true;
+                       }
+                       if (!found || !allow_merge)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_DUPLICATE_OBJECT),
+                               errmsg("constraint \"%s\" for relation \"%s\" already exists",
+                                          ccname, RelationGetRelationName(rel))));
+                       /* OK to update the tuple */
+                       ereport(NOTICE,
+                          (errmsg("merging constraint \"%s\" with inherited definition",
+                                          ccname)));
+                       tup = heap_copytuple(tup);
+                       con = (Form_pg_constraint) GETSTRUCT(tup);
+                       if (is_local)
+                               con->conislocal = true;
+                       else
+                               con->coninhcount++;
+                       simple_heap_update(conDesc, &tup->t_self, tup);
+                       CatalogUpdateIndexes(conDesc, tup);
+                       break;
+               }
+       }
+
+       systable_endscan(conscan);
+       heap_close(conDesc, RowExclusiveLock);
+
+       return found;
+}
+
 /*
  * Update the count of constraints in the relation's pg_class tuple.
  *
@@ -1667,9 +2277,8 @@ SetRelationNumChecks(Relation rel, int numchecks)
        Form_pg_class relStruct;
 
        relrel = heap_open(RelationRelationId, RowExclusiveLock);
-       reltup = SearchSysCacheCopy(RELOID,
-                                                               ObjectIdGetDatum(RelationGetRelid(rel)),
-                                                               0, 0, 0);
+       reltup = SearchSysCacheCopy1(RELOID,
+                                                                ObjectIdGetDatum(RelationGetRelid(rel)));
        if (!HeapTupleIsValid(reltup))
                elog(ERROR, "cache lookup failed for relation %u",
                         RelationGetRelid(rel));
@@ -1749,11 +2358,15 @@ cookDefault(ParseState *pstate,
                ereport(ERROR,
                                (errcode(ERRCODE_GROUPING_ERROR),
                         errmsg("cannot use aggregate function in default expression")));
+       if (pstate->p_hasWindowFuncs)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WINDOWING_ERROR),
+                                errmsg("cannot use window function in default expression")));
 
        /*
         * Coerce the expression to the correct type and typmod, if given. This
         * should match the parser's processing of non-defaulted expressions ---
-        * see updateTargetListEntry().
+        * see transformAssignedExpr().
         */
        if (OidIsValid(atttypid))
        {
@@ -1762,7 +2375,8 @@ cookDefault(ParseState *pstate,
                expr = coerce_to_target_type(pstate, expr, type_id,
                                                                         atttypid, atttypmod,
                                                                         COERCION_ASSIGNMENT,
-                                                                        COERCE_IMPLICIT_CAST);
+                                                                        COERCE_IMPLICIT_CAST,
+                                                                        -1);
                if (expr == NULL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
@@ -1774,73 +2388,76 @@ cookDefault(ParseState *pstate,
                           errhint("You will need to rewrite or cast the expression.")));
        }
 
+       /*
+        * Finally, take care of collations in the finished expression.
+        */
+       assign_expr_collations(pstate, expr);
+
        return expr;
 }
 
-
 /*
- * Removes all constraints on a relation that match the given name.
- *
- * It is the responsibility of the calling function to acquire a suitable
- * lock on the relation.
+ * Take a raw CHECK constraint expression and convert it to a cooked format
+ * ready for storage.
  *
- * Returns: The number of constraints removed.
+ * Parse state must be set up to recognize any vars that might appear
+ * in the expression.
  */
-int
-RemoveRelConstraints(Relation rel, const char *constrName,
-                                        DropBehavior behavior)
+static Node *
+cookConstraint(ParseState *pstate,
+                          Node *raw_constraint,
+                          char *relname)
 {
-       int                     ndeleted = 0;
-       Relation        conrel;
-       SysScanDesc conscan;
-       ScanKeyData key[1];
-       HeapTuple       contup;
-
-       /* Grab an appropriate lock on the pg_constraint relation */
-       conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
-
-       /* Use the index to scan only constraints of the target relation */
-       ScanKeyInit(&key[0],
-                               Anum_pg_constraint_conrelid,
-                               BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(RelationGetRelid(rel)));
-
-       conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
-                                                                SnapshotNow, 1, key);
+       Node       *expr;
 
        /*
-        * Scan over the result set, removing any matching entries.
+        * Transform raw parsetree to executable expression.
         */
-       while ((contup = systable_getnext(conscan)) != NULL)
-       {
-               Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(contup);
-
-               if (strcmp(NameStr(con->conname), constrName) == 0)
-               {
-                       ObjectAddress conobj;
+       expr = transformExpr(pstate, raw_constraint);
 
-                       conobj.classId = ConstraintRelationId;
-                       conobj.objectId = HeapTupleGetOid(contup);
-                       conobj.objectSubId = 0;
+       /*
+        * Make sure it yields a boolean result.
+        */
+       expr = coerce_to_boolean(pstate, expr, "CHECK");
 
-                       performDeletion(&conobj, behavior);
+       /*
+        * Take care of collations.
+        */
+       assign_expr_collations(pstate, expr);
 
-                       ndeleted++;
-               }
-       }
+       /*
+        * Make sure no outside relations are referred to.
+        */
+       if (list_length(pstate->p_rtable) != 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+                       errmsg("only table \"%s\" can be referenced in check constraint",
+                                  relname)));
 
-       /* Clean up after the scan */
-       systable_endscan(conscan);
-       heap_close(conrel, RowExclusiveLock);
+       /*
+        * No subplans or aggregates, either...
+        */
+       if (pstate->p_hasSubLinks)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot use subquery in check constraint")));
+       if (pstate->p_hasAggs)
+               ereport(ERROR,
+                               (errcode(ERRCODE_GROUPING_ERROR),
+                          errmsg("cannot use aggregate function in check constraint")));
+       if (pstate->p_hasWindowFuncs)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WINDOWING_ERROR),
+                                errmsg("cannot use window function in check constraint")));
 
-       return ndeleted;
+       return expr;
 }
 
 
 /*
  * RemoveStatistics --- remove entries in pg_statistic for a rel or column
  *
- * If attnum is zero, remove all entries for rel; else remove only the one
+ * If attnum is zero, remove all entries for rel; else remove only the one(s)
  * for that column.
  */
 void
@@ -1870,9 +2487,10 @@ RemoveStatistics(Oid relid, AttrNumber attnum)
                nkeys = 2;
        }
 
-       scan = systable_beginscan(pgstatistic, StatisticRelidAttnumIndexId, true,
+       scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
                                                          SnapshotNow, nkeys, key);
 
+       /* we must loop even when attnum != 0, in case of inherited stats */
        while (HeapTupleIsValid(tuple = systable_getnext(scan)))
                simple_heap_delete(pgstatistic, &tuple->t_self);
 
@@ -1883,24 +2501,17 @@ RemoveStatistics(Oid relid, AttrNumber attnum)
 
 
 /*
- * RelationTruncateIndexes - truncate all
- * indexes associated with the heap relation to zero tuples.
+ * RelationTruncateIndexes - truncate all indexes associated
+ * with the heap relation to zero tuples.
  *
  * The routine will truncate and then reconstruct the indexes on
- * the relation specified by the heapId parameter.
+ * the specified relation.     Caller must hold exclusive lock on rel.
  */
 static void
-RelationTruncateIndexes(Oid heapId)
+RelationTruncateIndexes(Relation heapRelation)
 {
-       Relation        heapRelation;
        ListCell   *indlist;
 
-       /*
-        * Open the heap rel.  We need grab no lock because we assume
-        * heap_truncate is holding an exclusive lock on the heap rel.
-        */
-       heapRelation = heap_open(heapId, NoLock);
-
        /* Ask the relcache to produce a list of the indexes of the rel */
        foreach(indlist, RelationGetIndexList(heapRelation))
        {
@@ -1908,28 +2519,24 @@ RelationTruncateIndexes(Oid heapId)
                Relation        currentIndex;
                IndexInfo  *indexInfo;
 
-               /* Open the index relation */
-               currentIndex = index_open(indexId);
-
-               /* Obtain exclusive lock on it, just to be sure */
-               LockRelation(currentIndex, AccessExclusiveLock);
+               /* Open the index relation; use exclusive lock, just to be sure */
+               currentIndex = index_open(indexId, AccessExclusiveLock);
 
                /* Fetch info needed for index_build */
                indexInfo = BuildIndexInfo(currentIndex);
 
-               /* Now truncate the actual file (and discard buffers) */
+               /*
+                * Now truncate the actual file (and discard buffers).
+                */
                RelationTruncate(currentIndex, 0);
 
                /* Initialize the index and rebuild */
-               /* Note: we do not need to re-establish pkey or toast settings */
-               index_build(heapRelation, currentIndex, indexInfo, false, false);
+               /* Note: we do not need to re-establish pkey setting */
+               index_build(heapRelation, currentIndex, indexInfo, false);
 
                /* We're done with this index */
-               index_close(currentIndex);
+               index_close(currentIndex, NoLock);
        }
-
-       /* And now done with the heap; but keep lock until commit */
-       heap_close(heapRelation, NoLock);
 }
 
 /*
@@ -1952,18 +2559,9 @@ heap_truncate(List *relids)
        {
                Oid                     rid = lfirst_oid(cell);
                Relation        rel;
-               Oid                     toastrelid;
 
                rel = heap_open(rid, AccessExclusiveLock);
                relations = lappend(relations, rel);
-
-               /* If there is a toast table, add it to the list too */
-               toastrelid = rel->rd_rel->reltoastrelid;
-               if (OidIsValid(toastrelid))
-               {
-                       rel = heap_open(toastrelid, AccessExclusiveLock);
-                       relations = lappend(relations, rel);
-               }
        }
 
        /* Don't allow truncate on tables that are referenced by foreign keys */
@@ -1974,23 +2572,51 @@ heap_truncate(List *relids)
        {
                Relation        rel = lfirst(cell);
 
-               /* Truncate the actual file (and discard buffers) */
-               RelationTruncate(rel, 0);
-
-               /* If this relation has indexes, truncate the indexes too */
-               RelationTruncateIndexes(RelationGetRelid(rel));
+               /* Truncate the relation */
+               heap_truncate_one_rel(rel);
 
-               /*
-                * Close the relation, but keep exclusive lock on it until commit.
-                */
+               /* Close the relation, but keep exclusive lock on it until commit */
                heap_close(rel, NoLock);
        }
 }
 
+/*
+ *      heap_truncate_one_rel
+ *
+ *      This routine deletes all data within the specified relation.
+ *
+ * This is not transaction-safe, because the truncation is done immediately
+ * and cannot be rolled back later.  Caller is responsible for having
+ * checked permissions etc, and must have obtained AccessExclusiveLock.
+ */
+void
+heap_truncate_one_rel(Relation rel)
+{
+       Oid                     toastrelid;
+
+       /* Truncate the actual file (and discard buffers) */
+       RelationTruncate(rel, 0);
+
+       /* If the relation has indexes, truncate the indexes too */
+       RelationTruncateIndexes(rel);
+
+       /* If there is a toast table, truncate that too */
+       toastrelid = rel->rd_rel->reltoastrelid;
+       if (OidIsValid(toastrelid))
+       {
+               Relation        toastrel = heap_open(toastrelid, AccessExclusiveLock);
+
+               RelationTruncate(toastrel, 0);
+               RelationTruncateIndexes(toastrel);
+               /* keep the lock... */
+               heap_close(toastrel, NoLock);
+       }
+}
+
 /*
  * heap_truncate_check_FKs
  *             Check for foreign keys referencing a list of relations that
- *             are to be truncated
+ *             are to be truncated, and raise error if there are any
  *
  * We disallow such FKs (except self-referential ones) since the whole point
  * of TRUNCATE is to not scan the individual rows to be thrown away.
@@ -2004,10 +2630,8 @@ void
 heap_truncate_check_FKs(List *relations, bool tempTables)
 {
        List       *oids = NIL;
+       List       *dependents;
        ListCell   *cell;
-       Relation        fkeyRel;
-       SysScanDesc fkeyScan;
-       HeapTuple       tuple;
 
        /*
         * Build a list of OIDs of the interesting relations.
@@ -2019,7 +2643,7 @@ heap_truncate_check_FKs(List *relations, bool tempTables)
        {
                Relation        rel = lfirst(cell);
 
-               if (rel->rd_rel->reltriggers != 0)
+               if (rel->rd_rel->relhastriggers)
                        oids = lappend_oid(oids, RelationGetRelid(rel));
        }
 
@@ -2030,68 +2654,68 @@ heap_truncate_check_FKs(List *relations, bool tempTables)
                return;
 
        /*
-        * Otherwise, must scan pg_constraint.  Right now, it is a seqscan because
-        * there is no available index on confrelid.
+        * Otherwise, must scan pg_constraint.  We make one pass with all the
+        * relations considered; if this finds nothing, then all is well.
         */
-       fkeyRel = heap_open(ConstraintRelationId, AccessShareLock);
-
-       fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
-                                                                 SnapshotNow, 0, NULL);
+       dependents = heap_truncate_find_FKs(oids);
+       if (dependents == NIL)
+               return;
 
-       while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
+       /*
+        * Otherwise we repeat the scan once per relation to identify a particular
+        * pair of relations to complain about.  This is pretty slow, but
+        * performance shouldn't matter much in a failure path.  The reason for
+        * doing things this way is to ensure that the message produced is not
+        * dependent on chance row locations within pg_constraint.
+        */
+       foreach(cell, oids)
        {
-               Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
-
-               /* Not a foreign key */
-               if (con->contype != CONSTRAINT_FOREIGN)
-                       continue;
+               Oid                     relid = lfirst_oid(cell);
+               ListCell   *cell2;
 
-               /* Not referencing one of our list of tables */
-               if (!list_member_oid(oids, con->confrelid))
-                       continue;
+               dependents = heap_truncate_find_FKs(list_make1_oid(relid));
 
-               /* The referencer should be in our list too */
-               if (!list_member_oid(oids, con->conrelid))
+               foreach(cell2, dependents)
                {
-                       if (tempTables)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("unsupported ON COMMIT and foreign key combination"),
-                                                errdetail("Table \"%s\" references \"%s\" via foreign key constraint \"%s\", but they do not have the same ON COMMIT setting.",
-                                                                  get_rel_name(con->conrelid),
-                                                                  get_rel_name(con->confrelid),
-                                                                  NameStr(con->conname))));
-                       else
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                                errmsg("cannot truncate a table referenced in a foreign key constraint"),
-                                                errdetail("Table \"%s\" references \"%s\" via foreign key constraint \"%s\".",
-                                                                  get_rel_name(con->conrelid),
-                                                                  get_rel_name(con->confrelid),
-                                                                  NameStr(con->conname)),
-                                                errhint("Truncate table \"%s\" at the same time, "
-                                                                "or use TRUNCATE ... CASCADE.",
-                                                                get_rel_name(con->conrelid))));
+                       Oid                     relid2 = lfirst_oid(cell2);
+
+                       if (!list_member_oid(oids, relid2))
+                       {
+                               char       *relname = get_rel_name(relid);
+                               char       *relname2 = get_rel_name(relid2);
+
+                               if (tempTables)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                        errmsg("unsupported ON COMMIT and foreign key combination"),
+                                                        errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
+                                                                          relname2, relname)));
+                               else
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                        errmsg("cannot truncate a table referenced in a foreign key constraint"),
+                                                        errdetail("Table \"%s\" references \"%s\".",
+                                                                          relname2, relname),
+                                                  errhint("Truncate table \"%s\" at the same time, "
+                                                                  "or use TRUNCATE ... CASCADE.",
+                                                                  relname2)));
+                       }
                }
        }
-
-       systable_endscan(fkeyScan);
-       heap_close(fkeyRel, AccessShareLock);
 }
 
 /*
  * heap_truncate_find_FKs
- *             Find relations having foreign keys referencing any relations that
- *             are to be truncated
+ *             Find relations having foreign keys referencing any of the given rels
  *
- * This is almost the same code as heap_truncate_check_FKs, but we don't
- * raise an error if we find such relations; instead we return a list of
- * their OIDs.  Also note that the input is a list of OIDs not a list
- * of Relations.  The result list does *not* include any rels that are
- * already in the input list.
+ * Input and result are both lists of relation OIDs.  The result contains
+ * no duplicates, does *not* include any rels that were already in the input
+ * list, and is sorted in OID order.  (The last property is enforced mainly
+ * to guarantee consistent behavior in the regression tests; we don't want
+ * behavior to change depending on chance locations of rows in pg_constraint.)
  *
- * Note: caller should already have exclusive lock on all rels mentioned
- * in relationIds.  Since adding or dropping an FK requires exclusive lock
+ * Note: caller should already have appropriate lock on all rels mentioned
+ * in relationIds.     Since adding or dropping an FK requires exclusive lock
  * on both rels, this ensures that the answer will be stable.
  */
 List *
@@ -2103,8 +2727,8 @@ heap_truncate_find_FKs(List *relationIds)
        HeapTuple       tuple;
 
        /*
-        * Must scan pg_constraint.  Right now, it is a seqscan because
-        * there is no available index on confrelid.
+        * Must scan pg_constraint.  Right now, it is a seqscan because there is
+        * no available index on confrelid.
         */
        fkeyRel = heap_open(ConstraintRelationId, AccessShareLock);
 
@@ -2125,7 +2749,7 @@ heap_truncate_find_FKs(List *relationIds)
 
                /* Add referencer unless already in input or result list */
                if (!list_member_oid(relationIds, con->conrelid))
-                       result = list_append_unique_oid(result, con->conrelid);
+                       result = insert_ordered_unique_oid(result, con->conrelid);
        }
 
        systable_endscan(fkeyScan);
@@ -2133,3 +2757,43 @@ heap_truncate_find_FKs(List *relationIds)
 
        return result;
 }
+
+/*
+ * insert_ordered_unique_oid
+ *             Insert a new Oid into a sorted list of Oids, preserving ordering,
+ *             and eliminating duplicates
+ *
+ * Building the ordered list this way is O(N^2), but with a pretty small
+ * constant, so for the number of entries we expect it will probably be
+ * faster than trying to apply qsort().  It seems unlikely someone would be
+ * trying to truncate a table with thousands of dependent tables ...
+ */
+static List *
+insert_ordered_unique_oid(List *list, Oid datum)
+{
+       ListCell   *prev;
+
+       /* Does the datum belong at the front? */
+       if (list == NIL || datum < linitial_oid(list))
+               return lcons_oid(datum, list);
+       /* Does it match the first entry? */
+       if (datum == linitial_oid(list))
+               return list;                    /* duplicate, so don't insert */
+       /* No, so find the entry it belongs after */
+       prev = list_head(list);
+       for (;;)
+       {
+               ListCell   *curr = lnext(prev);
+
+               if (curr == NULL || datum < lfirst_oid(curr))
+                       break;                          /* it belongs after 'prev', before 'curr' */
+
+               if (datum == lfirst_oid(curr))
+                       return list;            /* duplicate, so don't insert */
+
+               prev = curr;
+       }
+       /* Insert datum into list after 'prev' */
+       lappend_cell_oid(list, prev, datum);
+       return list;
+}