]> granicus.if.org Git - postgresql/blobdiff - src/backend/catalog/heap.c
Revise collation derivation method and expression-tree representation.
[postgresql] / src / backend / catalog / heap.c
index 2ee79aef9f0c40a21df9ff2aa97b309fdcde3e30..567eb7fd6ebb42acff259e71235c651f2440a3a6 100644 (file)
@@ -3,12 +3,12 @@
  * heap.c
  *       code to create and destroy POSTGRES heap relations
  *
- * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.331 2008/03/26 21:10:37 alvherre Exp $
+ *       src/backend/catalog/heap.c
  *
  *
  * INTERFACE ROUTINES
@@ -31,6 +31,7 @@
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/sysattr.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/heap.h"
 #include "catalog/index.h"
 #include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/objectaccess.h"
 #include "catalog/pg_attrdef.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_foreign_table.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_type_fn.h"
+#include "catalog/storage.h"
 #include "commands/tablecmds.h"
 #include "commands/typecmds.h"
 #include "miscadmin.h"
-#include "optimizer/clauses.h"
+#include "nodes/nodeFuncs.h"
 #include "optimizer/var.h"
 #include "parser/parse_coerce.h"
+#include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
+#include "storage/bufmgr.h"
+#include "storage/freespace.h"
 #include "storage/smgr.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
 #include "utils/tqual.h"
 
 
+/* Potentially set by contrib/pg_upgrade_support functions */
+Oid                    binary_upgrade_next_heap_pg_class_oid = InvalidOid;
+Oid                    binary_upgrade_next_toast_pg_class_oid = InvalidOid;
+
 static void AddNewRelationTuple(Relation pg_class_desc,
                                        Relation new_rel_desc,
-                                       Oid new_rel_oid, Oid new_type_oid,
+                                       Oid new_rel_oid,
+                                       Oid new_type_oid,
+                                       Oid reloftype,
                                        Oid relowner,
                                        char relkind,
+                                       Datum relacl,
                                        Datum reloptions);
 static Oid AddNewRelationType(const char *typeName,
                                   Oid typeNamespace,
                                   Oid new_rel_oid,
                                   char new_rel_kind,
+                                  Oid ownerid,
+                                  Oid new_row_type,
                                   Oid new_array_type);
 static void RelationRemoveInheritance(Oid relid);
-static void StoreRelCheck(Relation rel, char *ccname, char *ccbin);
-static void StoreConstraints(Relation rel, TupleDesc tupdesc);
+static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
+                         bool is_local, int inhcount);
+static void StoreConstraints(Relation rel, List *cooked_constraints);
+static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
+                                                       bool allow_merge, bool is_local);
 static void SetRelationNumChecks(Relation rel, int numchecks);
+static Node *cookConstraint(ParseState *pstate,
+                          Node *raw_constraint,
+                          char *relname);
 static List *insert_ordered_unique_oid(List *list, Oid datum);
 
 
@@ -98,6 +124,12 @@ static List *insert_ordered_unique_oid(List *list, Oid datum);
  *             Disadvantage:  special cases will be all over the place.
  */
 
+/*
+ * The initializers below do not include the attoptions or attacl fields,
+ * but that's OK - we're never going to reference anything beyond the
+ * fixed-size portion of the structure anyway.
+ */
+
 static FormData_pg_attribute a1 = {
        0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
        SelfItemPointerAttributeNumber, 0, -1, -1,
@@ -209,7 +241,9 @@ heap_create(const char *relname,
                        Oid relid,
                        TupleDesc tupDesc,
                        char relkind,
+                       char relpersistence,
                        bool shared_relation,
+                       bool mapped_relation,
                        bool allow_system_table_mods)
 {
        bool            create_storage;
@@ -238,6 +272,7 @@ heap_create(const char *relname,
        {
                case RELKIND_VIEW:
                case RELKIND_COMPOSITE_TYPE:
+               case RELKIND_FOREIGN_TABLE:
                        create_storage = false;
 
                        /*
@@ -280,16 +315,20 @@ heap_create(const char *relname,
                                                                         tupDesc,
                                                                         relid,
                                                                         reltablespace,
-                                                                        shared_relation);
+                                                                        shared_relation,
+                                                                        mapped_relation,
+                                                                        relpersistence);
 
        /*
-        * have the storage manager create the relation's disk file, if needed.
+        * Have the storage manager create the relation's disk file, if needed.
+        *
+        * We only create the main fork here, other forks will be created on
+        * demand.
         */
        if (create_storage)
        {
-               Assert(rel->rd_smgr == NULL);
                RelationOpenSmgr(rel);
-               smgrcreate(rel->rd_smgr, rel->rd_istemp, false);
+               RelationCreateStorage(rel->rd_node, relpersistence);
        }
 
        return rel;
@@ -335,7 +374,8 @@ heap_create(const char *relname,
  * --------------------------------
  */
 void
-CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
+CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
+                                                bool allow_system_table_mods)
 {
        int                     i;
        int                     j;
@@ -389,7 +429,9 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
        for (i = 0; i < natts; i++)
        {
                CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
-                                                  tupdesc->attrs[i]->atttypid);
+                                                  tupdesc->attrs[i]->atttypid,
+                                                  tupdesc->attrs[i]->attcollation,
+                                                  allow_system_table_mods);
        }
 }
 
@@ -402,7 +444,8 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
  * --------------------------------
  */
 void
-CheckAttributeType(const char *attname, Oid atttypid)
+CheckAttributeType(const char *attname, Oid atttypid, Oid attcollation,
+                                  bool allow_system_table_mods)
 {
        char            att_typtype = get_typtype(atttypid);
 
@@ -421,9 +464,11 @@ CheckAttributeType(const char *attname, Oid atttypid)
        {
                /*
                 * Refuse any attempt to create a pseudo-type column, except for a
-                * special hack for pg_statistic: allow ANYARRAY during initdb
+                * special hack for pg_statistic: allow ANYARRAY when modifying system
+                * catalogs (this allows creating pg_statistic and cloning it during
+                * VACUUM FULL)
                 */
-               if (atttypid != ANYARRAYOID || IsUnderPostmaster)
+               if (atttypid != ANYARRAYOID || !allow_system_table_mods)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                                         errmsg("column \"%s\" has pseudo-type %s",
@@ -450,11 +495,85 @@ CheckAttributeType(const char *attname, Oid atttypid)
 
                        if (attr->attisdropped)
                                continue;
-                       CheckAttributeType(NameStr(attr->attname), attr->atttypid);
+                       CheckAttributeType(NameStr(attr->attname), attr->atttypid, attr->attcollation,
+                                                          allow_system_table_mods);
                }
 
                relation_close(relation, AccessShareLock);
        }
+
+       /*
+        * This might not be strictly invalid per SQL standard, but it is
+        * pretty useless, and it cannot be dumped, so we must disallow
+        * it.
+        */
+       if (type_is_collatable(atttypid) && !OidIsValid(attcollation))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                        errmsg("no collation was derived for column \"%s\" with collatable type %s",
+                                                       attname, format_type_be(atttypid)),
+                                        errhint("Use the COLLATE clause to set the collation explicitly.")));
+}
+
+/*
+ * InsertPgAttributeTuple
+ *             Construct and insert a new tuple in pg_attribute.
+ *
+ * Caller has already opened and locked pg_attribute.  new_attribute is the
+ * attribute to insert (but we ignore attacl and attoptions, which are always
+ * initialized to NULL).
+ *
+ * indstate is the index state for CatalogIndexInsert. It can be passed as
+ * NULL, in which case we'll fetch the necessary info.  (Don't do this when
+ * inserting multiple attributes, because it's a tad more expensive.)
+ */
+void
+InsertPgAttributeTuple(Relation pg_attribute_rel,
+                                          Form_pg_attribute new_attribute,
+                                          CatalogIndexState indstate)
+{
+       Datum           values[Natts_pg_attribute];
+       bool            nulls[Natts_pg_attribute];
+       HeapTuple       tup;
+
+       /* This is a tad tedious, but way cleaner than what we used to do... */
+       memset(values, 0, sizeof(values));
+       memset(nulls, false, sizeof(nulls));
+
+       values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
+       values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
+       values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
+       values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
+       values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
+       values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
+       values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
+       values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff);
+       values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
+       values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
+       values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
+       values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
+       values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
+       values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
+       values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
+       values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
+       values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
+       values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
+
+       /* start out with empty permissions and empty options */
+       nulls[Anum_pg_attribute_attacl - 1] = true;
+       nulls[Anum_pg_attribute_attoptions - 1] = true;
+
+       tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
+
+       /* finally insert the new tuple, update the indexes, and clean up */
+       simple_heap_insert(pg_attribute_rel, tup);
+
+       if (indstate != NULL)
+               CatalogIndexInsert(indstate, tup);
+       else
+               CatalogUpdateIndexes(pg_attribute_rel, tup);
+
+       heap_freetuple(tup);
 }
 
 /* --------------------------------
@@ -471,9 +590,8 @@ AddNewAttributeTuples(Oid new_rel_oid,
                                          bool oidislocal,
                                          int oidinhcount)
 {
-       const Form_pg_attribute *dpp;
+       Form_pg_attribute attr;
        int                     i;
-       HeapTuple       tup;
        Relation        rel;
        CatalogIndexState indstate;
        int                     natts = tupdesc->natts;
@@ -491,35 +609,33 @@ AddNewAttributeTuples(Oid new_rel_oid,
         * First we add the user attributes.  This is also a convenient place to
         * add dependencies on their datatypes.
         */
-       dpp = tupdesc->attrs;
        for (i = 0; i < natts; i++)
        {
+               attr = tupdesc->attrs[i];
                /* Fill in the correct relation OID */
-               (*dpp)->attrelid = new_rel_oid;
+               attr->attrelid = new_rel_oid;
                /* Make sure these are OK, too */
-               (*dpp)->attstattarget = -1;
-               (*dpp)->attcacheoff = -1;
-
-               tup = heap_addheader(Natts_pg_attribute,
-                                                        false,
-                                                        ATTRIBUTE_TUPLE_SIZE,
-                                                        (void *) *dpp);
-
-               simple_heap_insert(rel, tup);
+               attr->attstattarget = -1;
+               attr->attcacheoff = -1;
 
-               CatalogIndexInsert(indstate, tup);
-
-               heap_freetuple(tup);
+               InsertPgAttributeTuple(rel, attr, indstate);
 
+               /* Add dependency info */
                myself.classId = RelationRelationId;
                myself.objectId = new_rel_oid;
                myself.objectSubId = i + 1;
                referenced.classId = TypeRelationId;
-               referenced.objectId = (*dpp)->atttypid;
+               referenced.objectId = attr->atttypid;
                referenced.objectSubId = 0;
                recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
 
-               dpp++;
+               if (OidIsValid(attr->attcollation))
+               {
+                       referenced.classId = CollationRelationId;
+                       referenced.objectId = attr->attcollation;
+                       referenced.objectSubId = 0;
+                       recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+               }
        }
 
        /*
@@ -529,44 +645,28 @@ AddNewAttributeTuples(Oid new_rel_oid,
         */
        if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
        {
-               dpp = SysAtt;
-               for (i = 0; i < -1 - FirstLowInvalidHeapAttributeNumber; i++)
+               for (i = 0; i < (int) lengthof(SysAtt); i++)
                {
-                       if (tupdesc->tdhasoid ||
-                               (*dpp)->attnum != ObjectIdAttributeNumber)
-                       {
-                               Form_pg_attribute attStruct;
-
-                               tup = heap_addheader(Natts_pg_attribute,
-                                                                        false,
-                                                                        ATTRIBUTE_TUPLE_SIZE,
-                                                                        (void *) *dpp);
-                               attStruct = (Form_pg_attribute) GETSTRUCT(tup);
-
-                               /* Fill in the correct relation OID in the copied tuple */
-                               attStruct->attrelid = new_rel_oid;
-
-                               /* Fill in correct inheritance info for the OID column */
-                               if (attStruct->attnum == ObjectIdAttributeNumber)
-                               {
-                                       attStruct->attislocal = oidislocal;
-                                       attStruct->attinhcount = oidinhcount;
-                               }
+                       FormData_pg_attribute attStruct;
 
-                               /*
-                                * Unneeded since they should be OK in the constant data
-                                * anyway
-                                */
-                               /* attStruct->attstattarget = 0; */
-                               /* attStruct->attcacheoff = -1; */
+                       /* skip OID where appropriate */
+                       if (!tupdesc->tdhasoid &&
+                               SysAtt[i]->attnum == ObjectIdAttributeNumber)
+                               continue;
 
-                               simple_heap_insert(rel, tup);
+                       memcpy(&attStruct, (char *) SysAtt[i], sizeof(FormData_pg_attribute));
 
-                               CatalogIndexInsert(indstate, tup);
+                       /* Fill in the correct relation OID in the copied tuple */
+                       attStruct.attrelid = new_rel_oid;
 
-                               heap_freetuple(tup);
+                       /* Fill in correct inheritance info for the OID column */
+                       if (attStruct.attnum == ObjectIdAttributeNumber)
+                       {
+                               attStruct.attislocal = oidislocal;
+                               attStruct.attinhcount = oidinhcount;
                        }
-                       dpp++;
+
+                       InsertPgAttributeTuple(rel, &attStruct, indstate);
                }
        }
 
@@ -586,28 +686,31 @@ AddNewAttributeTuples(Oid new_rel_oid,
  * Caller has already opened and locked pg_class.
  * Tuple data is taken from new_rel_desc->rd_rel, except for the
  * variable-width fields which are not present in a cached reldesc.
- * We always initialize relacl to NULL (i.e., default permissions),
- * and reloptions is set to the passed-in text array (if any).
+ * relacl and reloptions are passed in Datum form (to avoid having
+ * to reference the data types in heap.h).     Pass (Datum) 0 to set them
+ * to NULL.
  * --------------------------------
  */
 void
 InsertPgClassTuple(Relation pg_class_desc,
                                   Relation new_rel_desc,
                                   Oid new_rel_oid,
+                                  Datum relacl,
                                   Datum reloptions)
 {
        Form_pg_class rd_rel = new_rel_desc->rd_rel;
        Datum           values[Natts_pg_class];
-       char            nulls[Natts_pg_class];
+       bool            nulls[Natts_pg_class];
        HeapTuple       tup;
 
        /* This is a tad tedious, but way cleaner than what we used to do... */
        memset(values, 0, sizeof(values));
-       memset(nulls, ' ', sizeof(nulls));
+       memset(nulls, false, sizeof(nulls));
 
        values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
        values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
        values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
+       values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
        values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
        values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
        values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
@@ -618,26 +721,26 @@ InsertPgClassTuple(Relation pg_class_desc,
        values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(rd_rel->reltoastidxid);
        values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
        values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
+       values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
        values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
        values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
        values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
-       values[Anum_pg_class_reltriggers - 1] = Int16GetDatum(rd_rel->reltriggers);
-       values[Anum_pg_class_relukeys - 1] = Int16GetDatum(rd_rel->relukeys);
-       values[Anum_pg_class_relfkeys - 1] = Int16GetDatum(rd_rel->relfkeys);
-       values[Anum_pg_class_relrefs - 1] = Int16GetDatum(rd_rel->relrefs);
        values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
        values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
        values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
+       values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
        values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
        values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
-       /* start out with empty permissions */
-       nulls[Anum_pg_class_relacl - 1] = 'n';
+       if (relacl != (Datum) 0)
+               values[Anum_pg_class_relacl - 1] = relacl;
+       else
+               nulls[Anum_pg_class_relacl - 1] = true;
        if (reloptions != (Datum) 0)
                values[Anum_pg_class_reloptions - 1] = reloptions;
        else
-               nulls[Anum_pg_class_reloptions - 1] = 'n';
+               nulls[Anum_pg_class_reloptions - 1] = true;
 
-       tup = heap_formtuple(RelationGetDescr(pg_class_desc), values, nulls);
+       tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
 
        /*
         * The new tuple must have the oid already chosen for the rel.  Sure would
@@ -665,8 +768,10 @@ AddNewRelationTuple(Relation pg_class_desc,
                                        Relation new_rel_desc,
                                        Oid new_rel_oid,
                                        Oid new_type_oid,
+                                       Oid reloftype,
                                        Oid relowner,
                                        char relkind,
+                                       Datum relacl,
                                        Datum reloptions)
 {
        Form_pg_class new_rel_reltup;
@@ -722,12 +827,14 @@ AddNewRelationTuple(Relation pg_class_desc,
 
        new_rel_reltup->relowner = relowner;
        new_rel_reltup->reltype = new_type_oid;
+       new_rel_reltup->reloftype = reloftype;
        new_rel_reltup->relkind = relkind;
 
        new_rel_desc->rd_att->tdtypeid = new_type_oid;
 
        /* Now build and insert the tuple */
-       InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid, reloptions);
+       InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
+                                          relacl, reloptions);
 }
 
 
@@ -742,16 +849,21 @@ AddNewRelationType(const char *typeName,
                                   Oid typeNamespace,
                                   Oid new_rel_oid,
                                   char new_rel_kind,
+                                  Oid ownerid,
+                                  Oid new_row_type,
                                   Oid new_array_type)
 {
        return
-               TypeCreate(InvalidOid,  /* no predetermined OID */
+               TypeCreate(new_row_type,        /* optional predetermined OID */
                                   typeName,    /* type name */
                                   typeNamespace,               /* type namespace */
                                   new_rel_oid, /* relation oid */
                                   new_rel_kind,        /* relation kind */
+                                  ownerid,             /* owner's ID */
                                   -1,                  /* internal size (varlena) */
                                   TYPTYPE_COMPOSITE,   /* type-type (composite) */
+                                  TYPCATEGORY_COMPOSITE,               /* type-category (ditto) */
+                                  false,               /* composite types are never preferred */
                                   DEFAULT_TYPDELIM,    /* default array delimiter */
                                   F_RECORD_IN, /* input procedure */
                                   F_RECORD_OUT,        /* output procedure */
@@ -771,13 +883,36 @@ AddNewRelationType(const char *typeName,
                                   'x',                 /* fully TOASTable */
                                   -1,                  /* typmod */
                                   0,                   /* array dimensions for typBaseType */
-                                  false);              /* Type NOT NULL */
+                                  false,               /* Type NOT NULL */
+                                  InvalidOid); /* typcollation */
 }
 
 /* --------------------------------
  *             heap_create_with_catalog
  *
  *             creates a new cataloged relation.  see comments above.
+ *
+ * Arguments:
+ *     relname: name to give to new rel
+ *     relnamespace: OID of namespace it goes in
+ *     reltablespace: OID of tablespace it goes in
+ *     relid: OID to assign to new rel, or InvalidOid to select a new OID
+ *     reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
+ *     ownerid: OID of new rel's owner
+ *     tupdesc: tuple descriptor (source of column definitions)
+ *     cooked_constraints: list of precooked check constraints and defaults
+ *     relkind: relkind for new rel
+ *     shared_relation: TRUE if it's to be a shared relation
+ *     mapped_relation: TRUE if the relation will use the relfilenode map
+ *     oidislocal: TRUE if oid column (if any) should be marked attislocal
+ *     oidinhcount: attinhcount to assign to oid column (if any)
+ *     oncommit: ON COMMIT marking (only relevant if it's a temp table)
+ *     reloptions: reloptions in Datum form, or (Datum) 0 if none
+ *     use_user_acl: TRUE if should look for user-defined default permissions;
+ *             if FALSE, relacl is always set NULL
+ *     allow_system_table_mods: TRUE to allow creation in system namespaces
+ *
+ * Returns the OID of the new relation
  * --------------------------------
  */
 Oid
@@ -785,18 +920,27 @@ heap_create_with_catalog(const char *relname,
                                                 Oid relnamespace,
                                                 Oid reltablespace,
                                                 Oid relid,
+                                                Oid reltypeid,
+                                                Oid reloftypeid,
                                                 Oid ownerid,
                                                 TupleDesc tupdesc,
+                                                List *cooked_constraints,
                                                 char relkind,
+                                                char relpersistence,
                                                 bool shared_relation,
+                                                bool mapped_relation,
                                                 bool oidislocal,
                                                 int oidinhcount,
                                                 OnCommitAction oncommit,
                                                 Datum reloptions,
-                                                bool allow_system_table_mods)
+                                                bool use_user_acl,
+                                                bool allow_system_table_mods,
+                                                bool if_not_exists)
 {
        Relation        pg_class_desc;
        Relation        new_rel_desc;
+       Acl                *relacl;
+       Oid                     existing_relid;
        Oid                     old_type_oid;
        Oid                     new_type_oid;
        Oid                     new_array_oid = InvalidOid;
@@ -808,12 +952,29 @@ heap_create_with_catalog(const char *relname,
         */
        Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
 
-       CheckAttributeNamesTypes(tupdesc, relkind);
+       CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
 
-       if (get_relname_relid(relname, relnamespace))
+       /*
+        * If the relation already exists, it's an error, unless the user specifies
+        * "IF NOT EXISTS".  In that case, we just print a notice and do nothing
+        * further.
+        */
+       existing_relid = get_relname_relid(relname, relnamespace);
+       if (existing_relid != InvalidOid)
+       {
+               if (if_not_exists)
+               {
+                       ereport(NOTICE,
+                                       (errcode(ERRCODE_DUPLICATE_TABLE),
+                                        errmsg("relation \"%s\" already exists, skipping",
+                                        relname)));
+                       heap_close(pg_class_desc, RowExclusiveLock);
+                       return InvalidOid;
+               }
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_TABLE),
                                 errmsg("relation \"%s\" already exists", relname)));
+       }
 
        /*
         * Since we are going to create a rowtype as well, also check for
@@ -821,10 +982,9 @@ heap_create_with_catalog(const char *relname,
         * autogenerated array, we can rename it out of the way; otherwise we can
         * at least give a good error message.
         */
-       old_type_oid = GetSysCacheOid(TYPENAMENSP,
-                                                                 CStringGetDatum(relname),
-                                                                 ObjectIdGetDatum(relnamespace),
-                                                                 0, 0);
+       old_type_oid = GetSysCacheOid2(TYPENAMENSP,
+                                                                  CStringGetDatum(relname),
+                                                                  ObjectIdGetDatum(relnamespace));
        if (OidIsValid(old_type_oid))
        {
                if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
@@ -837,23 +997,10 @@ heap_create_with_catalog(const char *relname,
        }
 
        /*
-        * Validate shared/non-shared tablespace (must check this before doing
-        * GetNewRelFileNode, to prevent Assert therein)
+        * Shared relations must be in pg_global (last-ditch check)
         */
-       if (shared_relation)
-       {
-               if (reltablespace != GLOBALTABLESPACE_OID)
-                       /* elog since this is not a user-facing error */
-                       elog(ERROR,
-                                "shared relations must be placed in pg_global tablespace");
-       }
-       else
-       {
-               if (reltablespace == GLOBALTABLESPACE_OID)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                        errmsg("only shared relations can be placed in pg_global tablespace")));
-       }
+       if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
+               elog(ERROR, "shared relations must be placed in pg_global tablespace");
 
        /*
         * Allocate an OID for the relation, unless we were told what to use.
@@ -862,8 +1009,54 @@ heap_create_with_catalog(const char *relname,
         * collide with either pg_class OIDs or existing physical files.
         */
        if (!OidIsValid(relid))
-               relid = GetNewRelFileNode(reltablespace, shared_relation,
-                                                                 pg_class_desc);
+       {
+               /*
+                *      Use binary-upgrade override for pg_class.oid/relfilenode,
+                *      if supplied.
+                */
+               if (OidIsValid(binary_upgrade_next_heap_pg_class_oid) &&
+                       (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
+                        relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE ||
+                        relkind == RELKIND_FOREIGN_TABLE))
+               {
+                       relid = binary_upgrade_next_heap_pg_class_oid;
+                       binary_upgrade_next_heap_pg_class_oid = InvalidOid;
+               }
+               else if (OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
+                                relkind == RELKIND_TOASTVALUE)
+               {
+                       relid = binary_upgrade_next_toast_pg_class_oid;
+                       binary_upgrade_next_toast_pg_class_oid = InvalidOid;
+               }
+               else
+                       relid = GetNewRelFileNode(reltablespace, pg_class_desc,
+                                                                         relpersistence);
+       }
+
+       /*
+        * Determine the relation's initial permissions.
+        */
+       if (use_user_acl)
+       {
+               switch (relkind)
+               {
+                       case RELKIND_RELATION:
+                       case RELKIND_VIEW:
+                       case RELKIND_FOREIGN_TABLE:
+                               relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
+                                                                                         relnamespace);
+                               break;
+                       case RELKIND_SEQUENCE:
+                               relacl = get_user_default_acl(ACL_OBJECT_SEQUENCE, ownerid,
+                                                                                         relnamespace);
+                               break;
+                       default:
+                               relacl = NULL;
+                               break;
+               }
+       }
+       else
+               relacl = NULL;
 
        /*
         * Create the relcache entry (mostly dummy at this point) and the physical
@@ -876,7 +1069,9 @@ heap_create_with_catalog(const char *relname,
                                                           relid,
                                                           tupdesc,
                                                           relkind,
+                                                          relpersistence,
                                                           shared_relation,
+                                                          mapped_relation,
                                                           allow_system_table_mods);
 
        Assert(relid == RelationGetRelid(new_rel_desc));
@@ -885,22 +1080,20 @@ heap_create_with_catalog(const char *relname,
         * Decide whether to create an array type over the relation's rowtype. We
         * do not create any array types for system catalogs (ie, those made
         * during initdb).      We create array types for regular relations, views,
-        * and composite types ... but not, eg, for toast tables or sequences.
+        * composite types and foreign tables ... but not, eg, for toast tables or
+        * sequences.
         */
        if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
                                                          relkind == RELKIND_VIEW ||
+                                                         relkind == RELKIND_FOREIGN_TABLE ||
                                                          relkind == RELKIND_COMPOSITE_TYPE))
-       {
-               /* OK, so pre-assign a type OID for the array type */
-               Relation        pg_type = heap_open(TypeRelationId, AccessShareLock);
-
-               new_array_oid = GetNewOid(pg_type);
-               heap_close(pg_type, AccessShareLock);
-       }
+               new_array_oid = AssignTypeArrayOid();
 
        /*
         * Since defining a relation also defines a complex type, we add a new
-        * system type corresponding to the new relation.
+        * system type corresponding to the new relation.  The OID of the type can
+        * be preselected by the caller, but if reltypeid is InvalidOid, we'll
+        * generate a new OID for it.
         *
         * NOTE: we could get a unique-index failure here, in case someone else is
         * creating the same type name in parallel but hadn't committed yet when
@@ -910,6 +1103,8 @@ heap_create_with_catalog(const char *relname,
                                                                          relnamespace,
                                                                          relid,
                                                                          relkind,
+                                                                         ownerid,
+                                                                         reltypeid,
                                                                          new_array_oid);
 
        /*
@@ -926,8 +1121,11 @@ heap_create_with_catalog(const char *relname,
                                   relnamespace,        /* Same namespace as parent */
                                   InvalidOid,  /* Not composite, no relationOid */
                                   0,                   /* relkind, also N/A here */
+                                  ownerid,             /* owner's ID */
                                   -1,                  /* Internal size (varlena) */
                                   TYPTYPE_BASE,        /* Not composite - typelem is */
+                                  TYPCATEGORY_ARRAY,   /* type-category (array) */
+                                  false,               /* array types are never preferred */
                                   DEFAULT_TYPDELIM,    /* default array delimiter */
                                   F_ARRAY_IN,  /* array input proc */
                                   F_ARRAY_OUT, /* array output proc */
@@ -947,7 +1145,8 @@ heap_create_with_catalog(const char *relname,
                                   'x',                 /* fully TOASTable */
                                   -1,                  /* typmod */
                                   0,                   /* array dimensions for typBaseType */
-                                  false);              /* Type NOT NULL */
+                                  false,               /* Type NOT NULL */
+                                  InvalidOid); /* typcollation */
 
                pfree(relarrayname);
        }
@@ -963,8 +1162,10 @@ heap_create_with_catalog(const char *relname,
                                                new_rel_desc,
                                                relid,
                                                new_type_oid,
+                                               reloftypeid,
                                                ownerid,
                                                relkind,
+                                               PointerGetDatum(relacl),
                                                reloptions);
 
        /*
@@ -975,12 +1176,16 @@ heap_create_with_catalog(const char *relname,
 
        /*
         * Make a dependency link to force the relation to be deleted if its
-        * namespace is.  Also make a dependency link to its owner.
+        * namespace is.  Also make a dependency link to its owner, as well as
+        * dependencies for any roles mentioned in the default ACL.
         *
         * For composite types, these dependencies are tracked for the pg_type
         * entry, so we needn't record them here.  Likewise, TOAST tables don't
         * need a namespace dependency (they live in a pinned namespace) nor an
-        * owner dependency (they depend indirectly through the parent table).
+        * owner dependency (they depend indirectly through the parent table), nor
+        * should they have any ACL entries.  The same applies for extension
+        * dependencies.
+        *
         * Also, skip this in bootstrap mode, since we don't make dependencies
         * while bootstrapping.
         */
@@ -1000,16 +1205,41 @@ heap_create_with_catalog(const char *relname,
                recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
 
                recordDependencyOnOwner(RelationRelationId, relid, ownerid);
+
+               recordDependencyOnCurrentExtension(&myself);
+
+               if (reloftypeid)
+               {
+                       referenced.classId = TypeRelationId;
+                       referenced.objectId = reloftypeid;
+                       referenced.objectSubId = 0;
+                       recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+               }
+
+               if (relacl != NULL)
+               {
+                       int                     nnewmembers;
+                       Oid                *newmembers;
+
+                       nnewmembers = aclmembers(relacl, &newmembers);
+                       updateAclDependencies(RelationRelationId, relid, 0,
+                                                                 ownerid,
+                                                                 0, NULL,
+                                                                 nnewmembers, newmembers);
+               }
        }
 
+       /* Post creation hook for new relation */
+       InvokeObjectAccessHook(OAT_POST_CREATE, RelationRelationId, relid, 0);
+
        /*
-        * store constraints and defaults passed in the tupdesc, if any.
+        * Store any supplied constraints and defaults.
         *
         * NB: this may do a CommandCounterIncrement and rebuild the relcache
         * entry, so the relation must be valid and self-consistent at this point.
         * In particular, there are not yet constraints and defaults anywhere.
         */
-       StoreConstraints(new_rel_desc, tupdesc);
+       StoreConstraints(new_rel_desc, cooked_constraints);
 
        /*
         * If there's a special on-commit action, remember it
@@ -1017,6 +1247,25 @@ heap_create_with_catalog(const char *relname,
        if (oncommit != ONCOMMIT_NOOP)
                register_on_commit_action(relid, oncommit);
 
+       /*
+        * If this is an unlogged relation, it needs an init fork so that it
+        * can be correctly reinitialized on restart.  Since we're going to
+        * do an immediate sync, we ony need to xlog this if archiving or
+        * streaming is enabled.  And the immediate sync is required, because
+        * otherwise there's no guarantee that this will hit the disk before
+        * the next checkpoint moves the redo pointer.
+        */
+       if (relpersistence == RELPERSISTENCE_UNLOGGED)
+       {
+               Assert(relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE);
+
+               smgrcreate(new_rel_desc->rd_smgr, INIT_FORKNUM, false);
+               if (XLogIsNeeded())
+                       log_smgrcreate(&new_rel_desc->rd_smgr->smgr_rnode.node,
+                                                  INIT_FORKNUM);
+               smgrimmedsync(new_rel_desc->rd_smgr, INIT_FORKNUM);
+       }
+
        /*
         * ok, the relation has been cataloged, so close our relations and return
         * the OID of the newly created relation.
@@ -1079,9 +1328,7 @@ DeleteRelationTuple(Oid relid)
        /* Grab an appropriate lock on the pg_class relation */
        pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
 
-       tup = SearchSysCache(RELOID,
-                                                ObjectIdGetDatum(relid),
-                                                0, 0, 0);
+       tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -1158,10 +1405,9 @@ RemoveAttributeById(Oid relid, AttrNumber attnum)
 
        attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy(ATTNUM,
-                                                          ObjectIdGetDatum(relid),
-                                                          Int16GetDatum(attnum),
-                                                          0, 0);
+       tuple = SearchSysCacheCopy2(ATTNUM,
+                                                               ObjectIdGetDatum(relid),
+                                                               Int16GetDatum(attnum));
        if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
                elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                         attnum, relid);
@@ -1326,10 +1572,9 @@ RemoveAttrDefaultById(Oid attrdefId)
        /* Fix the pg_attribute row */
        attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy(ATTNUM,
-                                                          ObjectIdGetDatum(myrelid),
-                                                          Int16GetDatum(myattnum),
-                                                          0, 0);
+       tuple = SearchSysCacheCopy2(ATTNUM,
+                                                               ObjectIdGetDatum(myrelid),
+                                                               Int16GetDatum(myattnum));
        if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
                elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                         myattnum, myrelid);
@@ -1371,13 +1616,40 @@ heap_drop_with_catalog(Oid relid)
        rel = relation_open(relid, AccessExclusiveLock);
 
        /*
-        * Schedule unlinking of the relation's physical file at commit.
+        * There can no longer be anyone *else* touching the relation, but we
+        * might still have open queries or cursors, or pending trigger events,
+        * in our own session.
+        */
+       CheckTableNotInUse(rel, "DROP TABLE");
+
+       /*
+        * Delete pg_foreign_table tuple first.
+        */
+       if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+       {
+               Relation    rel;
+               HeapTuple   tuple;
+
+               rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
+
+               tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
+               if (!HeapTupleIsValid(tuple))
+                       elog(ERROR, "cache lookup failed for foreign table %u", relid);
+
+               simple_heap_delete(rel, &tuple->t_self);
+
+               ReleaseSysCache(tuple);
+               heap_close(rel, RowExclusiveLock);
+       }
+
+       /*
+        * Schedule unlinking of the relation's physical files at commit.
         */
        if (rel->rd_rel->relkind != RELKIND_VIEW &&
-               rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
+               rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
+               rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
        {
-               RelationOpenSmgr(rel);
-               smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
+               RelationDropStorage(rel);
        }
 
        /*
@@ -1425,17 +1697,16 @@ heap_drop_with_catalog(Oid relid)
 
 /*
  * Store a default expression for column attnum of relation rel.
- * The expression must be presented as a nodeToString() string.
  */
 void
-StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
+StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
 {
-       Node       *expr;
+       char       *adbin;
        char       *adsrc;
        Relation        adrel;
        HeapTuple       tuple;
        Datum           values[4];
-       static char nulls[4] = {' ', ' ', ' ', ' '};
+       static bool nulls[4] = {false, false, false, false};
        Relation        attrrel;
        HeapTuple       atttup;
        Form_pg_attribute attStruct;
@@ -1444,12 +1715,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
                                defobject;
 
        /*
-        * Need to construct source equivalent of given node-string.
+        * Flatten expression to string form for storage.
         */
-       expr = stringToNode(adbin);
+       adbin = nodeToString(expr);
 
        /*
-        * deparse it
+        * Also deparse it to form the mostly-obsolete adsrc field.
         */
        adsrc = deparse_expression(expr,
                                                        deparse_context_for(RelationGetRelationName(rel),
@@ -1466,7 +1737,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
 
        adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
 
-       tuple = heap_formtuple(adrel->rd_att, values, nulls);
+       tuple = heap_form_tuple(adrel->rd_att, values, nulls);
        attrdefOid = simple_heap_insert(adrel, tuple);
 
        CatalogUpdateIndexes(adrel, tuple);
@@ -1481,6 +1752,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
        pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
        pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
        heap_freetuple(tuple);
+       pfree(adbin);
        pfree(adsrc);
 
        /*
@@ -1488,10 +1760,9 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
         * exists.
         */
        attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
-       atttup = SearchSysCacheCopy(ATTNUM,
-                                                               ObjectIdGetDatum(RelationGetRelid(rel)),
-                                                               Int16GetDatum(attnum),
-                                                               0, 0);
+       atttup = SearchSysCacheCopy2(ATTNUM,
+                                                                ObjectIdGetDatum(RelationGetRelid(rel)),
+                                                                Int16GetDatum(attnum));
        if (!HeapTupleIsValid(atttup))
                elog(ERROR, "cache lookup failed for attribute %d of relation %u",
                         attnum, RelationGetRelid(rel));
@@ -1524,27 +1795,27 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
 
 /*
  * Store a check-constraint expression for the given relation.
- * The expression must be presented as a nodeToString() string.
  *
  * Caller is responsible for updating the count of constraints
  * in the pg_class entry for the relation.
  */
 static void
-StoreRelCheck(Relation rel, char *ccname, char *ccbin)
+StoreRelCheck(Relation rel, char *ccname, Node *expr,
+                         bool is_local, int inhcount)
 {
-       Node       *expr;
+       char       *ccbin;
        char       *ccsrc;
        List       *varList;
        int                     keycount;
        int16      *attNos;
 
        /*
-        * Convert condition to an expression tree.
+        * Flatten expression to string form for storage.
         */
-       expr = stringToNode(ccbin);
+       ccbin = nodeToString(expr);
 
        /*
-        * deparse it
+        * Also deparse it to form the mostly-obsolete consrc field.
         */
        ccsrc = deparse_expression(expr,
                                                        deparse_context_for(RelationGetRelationName(rel),
@@ -1552,13 +1823,13 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin)
                                                           false, false);
 
        /*
-        * Find columns of rel that are used in ccbin
+        * Find columns of rel that are used in expr
         *
         * NB: pull_var_clause is okay here only because we don't allow subselects
         * in check constraints; it would fail to examine the contents of
         * subselects.
         */
-       varList = pull_var_clause(expr, false);
+       varList = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
        keycount = list_length(varList);
 
        if (keycount > 0)
@@ -1591,10 +1862,12 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin)
                                                  CONSTRAINT_CHECK,             /* Constraint Type */
                                                  false,        /* Is Deferrable */
                                                  false,        /* Is Deferred */
+                                                 true,         /* Is Validated */
                                                  RelationGetRelid(rel),                /* relation */
                                                  attNos,               /* attrs in the constraint */
                                                  keycount,             /* # attrs in the constraint */
                                                  InvalidOid,   /* not a domain constraint */
+                                                 InvalidOid,   /* no associated index */
                                                  InvalidOid,   /* Foreign key fields */
                                                  NULL,
                                                  NULL,
@@ -1604,29 +1877,32 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin)
                                                  ' ',
                                                  ' ',
                                                  ' ',
-                                                 InvalidOid,   /* no associated index */
-                                                 expr, /* Tree form check constraint */
-                                                 ccbin,        /* Binary form check constraint */
-                                                 ccsrc);               /* Source form check constraint */
-
+                                                 NULL, /* not an exclusion constraint */
+                                                 expr, /* Tree form of check constraint */
+                                                 ccbin,        /* Binary form of check constraint */
+                                                 ccsrc,        /* Source form of check constraint */
+                                                 is_local,             /* conislocal */
+                                                 inhcount);    /* coninhcount */
+
+       pfree(ccbin);
        pfree(ccsrc);
 }
 
 /*
- * Store defaults and constraints passed in via the tuple constraint struct.
+ * Store defaults and constraints (passed as a list of CookedConstraint).
  *
  * NOTE: only pre-cooked expressions will be passed this way, which is to
  * say expressions inherited from an existing relation.  Newly parsed
  * expressions can be added later, by direct calls to StoreAttrDefault
- * and StoreRelCheck (see AddRelationRawConstraints()).
+ * and StoreRelCheck (see AddRelationNewConstraints()).
  */
 static void
-StoreConstraints(Relation rel, TupleDesc tupdesc)
+StoreConstraints(Relation rel, List *cooked_constraints)
 {
-       TupleConstr *constr = tupdesc->constr;
-       int                     i;
+       int                     numchecks = 0;
+       ListCell   *lc;
 
-       if (!constr)
+       if (!cooked_constraints)
                return;                                 /* nothing to do */
 
        /*
@@ -1636,33 +1912,46 @@ StoreConstraints(Relation rel, TupleDesc tupdesc)
         */
        CommandCounterIncrement();
 
-       for (i = 0; i < constr->num_defval; i++)
-               StoreAttrDefault(rel, constr->defval[i].adnum,
-                                                constr->defval[i].adbin);
+       foreach(lc, cooked_constraints)
+       {
+               CookedConstraint *con = (CookedConstraint *) lfirst(lc);
 
-       for (i = 0; i < constr->num_check; i++)
-               StoreRelCheck(rel, constr->check[i].ccname,
-                                         constr->check[i].ccbin);
+               switch (con->contype)
+               {
+                       case CONSTR_DEFAULT:
+                               StoreAttrDefault(rel, con->attnum, con->expr);
+                               break;
+                       case CONSTR_CHECK:
+                               StoreRelCheck(rel, con->name, con->expr,
+                                                         con->is_local, con->inhcount);
+                               numchecks++;
+                               break;
+                       default:
+                               elog(ERROR, "unrecognized constraint type: %d",
+                                        (int) con->contype);
+               }
+       }
 
-       if (constr->num_check > 0)
-               SetRelationNumChecks(rel, constr->num_check);
+       if (numchecks > 0)
+               SetRelationNumChecks(rel, numchecks);
 }
 
 /*
- * AddRelationRawConstraints
+ * AddRelationNewConstraints
  *
- * Add raw (not-yet-transformed) column default expressions and/or constraint
- * check expressions to an existing relation.  This is defined to do both
- * for efficiency in DefineRelation, but of course you can do just one or
- * the other by passing empty lists.
+ * Add new column default expressions and/or constraint check expressions
+ * to an existing relation.  This is defined to do both for efficiency in
+ * DefineRelation, but of course you can do just one or the other by passing
+ * empty lists.
  *
  * rel: relation to be modified
- * rawColDefaults: list of RawColumnDefault structures
- * rawConstraints: list of Constraint nodes
+ * newColDefaults: list of RawColumnDefault structures
+ * newConstraints: list of Constraint nodes
+ * allow_merge: TRUE if check constraints may be merged with existing ones
+ * is_local: TRUE if definition is local, FALSE if it's inherited
  *
- * All entries in rawColDefaults will be processed.  Entries in rawConstraints
- * will be processed only if they are CONSTR_CHECK type and contain a "raw"
- * expression.
+ * All entries in newColDefaults will be processed.  Entries in newConstraints
+ * will be processed only if they are CONSTR_CHECK type.
  *
  * Returns a list of CookedConstraint nodes that shows the cooked form of
  * the default and constraint expressions added to the relation.
@@ -1673,9 +1962,11 @@ StoreConstraints(Relation rel, TupleDesc tupdesc)
  * tuples visible.
  */
 List *
-AddRelationRawConstraints(Relation rel,
-                                                 List *rawColDefaults,
-                                                 List *rawConstraints)
+AddRelationNewConstraints(Relation rel,
+                                                 List *newColDefaults,
+                                                 List *newConstraints,
+                                                 bool allow_merge,
+                                                 bool is_local)
 {
        List       *cookedConstraints = NIL;
        TupleDesc       tupleDesc;
@@ -1714,7 +2005,7 @@ AddRelationRawConstraints(Relation rel,
        /*
         * Process column default expressions.
         */
-       foreach(cell, rawColDefaults)
+       foreach(cell, newColDefaults)
        {
                RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
                Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
@@ -1738,13 +2029,15 @@ AddRelationRawConstraints(Relation rel,
                        (IsA(expr, Const) &&((Const *) expr)->constisnull))
                        continue;
 
-               StoreAttrDefault(rel, colDef->attnum, nodeToString(expr));
+               StoreAttrDefault(rel, colDef->attnum, expr);
 
                cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
                cooked->contype = CONSTR_DEFAULT;
                cooked->name = NULL;
                cooked->attnum = colDef->attnum;
                cooked->expr = expr;
+               cooked->is_local = is_local;
+               cooked->inhcount = is_local ? 0 : 1;
                cookedConstraints = lappend(cookedConstraints, cooked);
        }
 
@@ -1753,63 +2046,44 @@ AddRelationRawConstraints(Relation rel,
         */
        numchecks = numoldchecks;
        checknames = NIL;
-       foreach(cell, rawConstraints)
+       foreach(cell, newConstraints)
        {
                Constraint *cdef = (Constraint *) lfirst(cell);
                char       *ccname;
 
-               if (cdef->contype != CONSTR_CHECK || cdef->raw_expr == NULL)
+               if (cdef->contype != CONSTR_CHECK)
                        continue;
-               Assert(cdef->cooked_expr == NULL);
 
-               /*
-                * Transform raw parsetree to executable expression.
-                */
-               expr = transformExpr(pstate, cdef->raw_expr);
-
-               /*
-                * Make sure it yields a boolean result.
-                */
-               expr = coerce_to_boolean(pstate, expr, "CHECK");
+               if (cdef->raw_expr != NULL)
+               {
+                       Assert(cdef->cooked_expr == NULL);
 
-               /*
-                * Make sure no outside relations are referred to.
-                */
-               if (list_length(pstate->p_rtable) != 1)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-                       errmsg("only table \"%s\" can be referenced in check constraint",
-                                  RelationGetRelationName(rel))));
+                       /*
+                        * Transform raw parsetree to executable expression, and verify
+                        * it's valid as a CHECK constraint.
+                        */
+                       expr = cookConstraint(pstate, cdef->raw_expr,
+                                                                 RelationGetRelationName(rel));
+               }
+               else
+               {
+                       Assert(cdef->cooked_expr != NULL);
 
-               /*
-                * No subplans or aggregates, either...
-                */
-               if (pstate->p_hasSubLinks)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                        errmsg("cannot use subquery in check constraint")));
-               if (pstate->p_hasAggs)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_GROUPING_ERROR),
-                          errmsg("cannot use aggregate function in check constraint")));
+                       /*
+                        * Here, we assume the parser will only pass us valid CHECK
+                        * expressions, so we do no particular checking.
+                        */
+                       expr = stringToNode(cdef->cooked_expr);
+               }
 
                /*
                 * Check name uniqueness, or generate a name if none was given.
                 */
-               if (cdef->name != NULL)
+               if (cdef->conname != NULL)
                {
                        ListCell   *cell2;
 
-                       ccname = cdef->name;
-                       /* Check against pre-existing constraints */
-                       if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
-                                                                        RelationGetRelid(rel),
-                                                                        RelationGetNamespace(rel),
-                                                                        ccname))
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_DUPLICATE_OBJECT),
-                               errmsg("constraint \"%s\" for relation \"%s\" already exists",
-                                          ccname, RelationGetRelationName(rel))));
+                       ccname = cdef->conname;
                        /* Check against other new constraints */
                        /* Needed because we don't do CommandCounterIncrement in loop */
                        foreach(cell2, checknames)
@@ -1820,6 +2094,19 @@ AddRelationRawConstraints(Relation rel,
                                                         errmsg("check constraint \"%s\" already exists",
                                                                        ccname)));
                        }
+
+                       /* save name for future checks */
+                       checknames = lappend(checknames, ccname);
+
+                       /*
+                        * Check against pre-existing constraints.      If we are allowed to
+                        * merge with an existing constraint, there's no more to do here.
+                        * (We omit the duplicate constraint from the result, which is
+                        * what ATAddCheckConstraint wants.)
+                        */
+                       if (MergeWithExistingConstraint(rel, ccname, expr,
+                                                                                       allow_merge, is_local))
+                               continue;
                }
                else
                {
@@ -1838,7 +2125,7 @@ AddRelationRawConstraints(Relation rel,
                        List       *vars;
                        char       *colname;
 
-                       vars = pull_var_clause(expr, false);
+                       vars = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
 
                        /* eliminate duplicates */
                        vars = list_union(NIL, vars);
@@ -1854,15 +2141,15 @@ AddRelationRawConstraints(Relation rel,
                                                                                  "check",
                                                                                  RelationGetNamespace(rel),
                                                                                  checknames);
-               }
 
-               /* save name for future checks */
-               checknames = lappend(checknames, ccname);
+                       /* save name for future checks */
+                       checknames = lappend(checknames, ccname);
+               }
 
                /*
                 * OK, store it.
                 */
-               StoreRelCheck(rel, ccname, nodeToString(expr));
+               StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1);
 
                numchecks++;
 
@@ -1871,6 +2158,8 @@ AddRelationRawConstraints(Relation rel,
                cooked->name = ccname;
                cooked->attnum = 0;
                cooked->expr = expr;
+               cooked->is_local = is_local;
+               cooked->inhcount = is_local ? 0 : 1;
                cookedConstraints = lappend(cookedConstraints, cooked);
        }
 
@@ -1886,6 +2175,90 @@ AddRelationRawConstraints(Relation rel,
        return cookedConstraints;
 }
 
+/*
+ * Check for a pre-existing check constraint that conflicts with a proposed
+ * new one, and either adjust its conislocal/coninhcount settings or throw
+ * error as needed.
+ *
+ * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
+ * got a so-far-unique name, or throws error if conflict.
+ */
+static bool
+MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
+                                                       bool allow_merge, bool is_local)
+{
+       bool            found;
+       Relation        conDesc;
+       SysScanDesc conscan;
+       ScanKeyData skey[2];
+       HeapTuple       tup;
+
+       /* Search for a pg_constraint entry with same name and relation */
+       conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
+
+       found = false;
+
+       ScanKeyInit(&skey[0],
+                               Anum_pg_constraint_conname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               CStringGetDatum(ccname));
+
+       ScanKeyInit(&skey[1],
+                               Anum_pg_constraint_connamespace,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(RelationGetNamespace(rel)));
+
+       conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
+                                                                SnapshotNow, 2, skey);
+
+       while (HeapTupleIsValid(tup = systable_getnext(conscan)))
+       {
+               Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
+
+               if (con->conrelid == RelationGetRelid(rel))
+               {
+                       /* Found it.  Conflicts if not identical check constraint */
+                       if (con->contype == CONSTRAINT_CHECK)
+                       {
+                               Datum           val;
+                               bool            isnull;
+
+                               val = fastgetattr(tup,
+                                                                 Anum_pg_constraint_conbin,
+                                                                 conDesc->rd_att, &isnull);
+                               if (isnull)
+                                       elog(ERROR, "null conbin for rel %s",
+                                                RelationGetRelationName(rel));
+                               if (equal(expr, stringToNode(TextDatumGetCString(val))))
+                                       found = true;
+                       }
+                       if (!found || !allow_merge)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_DUPLICATE_OBJECT),
+                               errmsg("constraint \"%s\" for relation \"%s\" already exists",
+                                          ccname, RelationGetRelationName(rel))));
+                       /* OK to update the tuple */
+                       ereport(NOTICE,
+                          (errmsg("merging constraint \"%s\" with inherited definition",
+                                          ccname)));
+                       tup = heap_copytuple(tup);
+                       con = (Form_pg_constraint) GETSTRUCT(tup);
+                       if (is_local)
+                               con->conislocal = true;
+                       else
+                               con->coninhcount++;
+                       simple_heap_update(conDesc, &tup->t_self, tup);
+                       CatalogUpdateIndexes(conDesc, tup);
+                       break;
+               }
+       }
+
+       systable_endscan(conscan);
+       heap_close(conDesc, RowExclusiveLock);
+
+       return found;
+}
+
 /*
  * Update the count of constraints in the relation's pg_class tuple.
  *
@@ -1904,9 +2277,8 @@ SetRelationNumChecks(Relation rel, int numchecks)
        Form_pg_class relStruct;
 
        relrel = heap_open(RelationRelationId, RowExclusiveLock);
-       reltup = SearchSysCacheCopy(RELOID,
-                                                               ObjectIdGetDatum(RelationGetRelid(rel)),
-                                                               0, 0, 0);
+       reltup = SearchSysCacheCopy1(RELOID,
+                                                                ObjectIdGetDatum(RelationGetRelid(rel)));
        if (!HeapTupleIsValid(reltup))
                elog(ERROR, "cache lookup failed for relation %u",
                         RelationGetRelid(rel));
@@ -1986,6 +2358,10 @@ cookDefault(ParseState *pstate,
                ereport(ERROR,
                                (errcode(ERRCODE_GROUPING_ERROR),
                         errmsg("cannot use aggregate function in default expression")));
+       if (pstate->p_hasWindowFuncs)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WINDOWING_ERROR),
+                                errmsg("cannot use window function in default expression")));
 
        /*
         * Coerce the expression to the correct type and typmod, if given. This
@@ -1999,7 +2375,8 @@ cookDefault(ParseState *pstate,
                expr = coerce_to_target_type(pstate, expr, type_id,
                                                                         atttypid, atttypmod,
                                                                         COERCION_ASSIGNMENT,
-                                                                        COERCE_IMPLICIT_CAST);
+                                                                        COERCE_IMPLICIT_CAST,
+                                                                        -1);
                if (expr == NULL)
                        ereport(ERROR,
                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
@@ -2011,73 +2388,76 @@ cookDefault(ParseState *pstate,
                           errhint("You will need to rewrite or cast the expression.")));
        }
 
+       /*
+        * Finally, take care of collations in the finished expression.
+        */
+       assign_expr_collations(pstate, expr);
+
        return expr;
 }
 
-
 /*
- * Removes all constraints on a relation that match the given name.
+ * Take a raw CHECK constraint expression and convert it to a cooked format
+ * ready for storage.
  *
- * It is the responsibility of the calling function to acquire a suitable
- * lock on the relation.
- *
- * Returns: The number of constraints removed.
+ * Parse state must be set up to recognize any vars that might appear
+ * in the expression.
  */
-int
-RemoveRelConstraints(Relation rel, const char *constrName,
-                                        DropBehavior behavior)
+static Node *
+cookConstraint(ParseState *pstate,
+                          Node *raw_constraint,
+                          char *relname)
 {
-       int                     ndeleted = 0;
-       Relation        conrel;
-       SysScanDesc conscan;
-       ScanKeyData key[1];
-       HeapTuple       contup;
-
-       /* Grab an appropriate lock on the pg_constraint relation */
-       conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
-
-       /* Use the index to scan only constraints of the target relation */
-       ScanKeyInit(&key[0],
-                               Anum_pg_constraint_conrelid,
-                               BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(RelationGetRelid(rel)));
-
-       conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
-                                                                SnapshotNow, 1, key);
+       Node       *expr;
 
        /*
-        * Scan over the result set, removing any matching entries.
+        * Transform raw parsetree to executable expression.
         */
-       while ((contup = systable_getnext(conscan)) != NULL)
-       {
-               Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(contup);
-
-               if (strcmp(NameStr(con->conname), constrName) == 0)
-               {
-                       ObjectAddress conobj;
+       expr = transformExpr(pstate, raw_constraint);
 
-                       conobj.classId = ConstraintRelationId;
-                       conobj.objectId = HeapTupleGetOid(contup);
-                       conobj.objectSubId = 0;
+       /*
+        * Make sure it yields a boolean result.
+        */
+       expr = coerce_to_boolean(pstate, expr, "CHECK");
 
-                       performDeletion(&conobj, behavior);
+       /*
+        * Take care of collations.
+        */
+       assign_expr_collations(pstate, expr);
 
-                       ndeleted++;
-               }
-       }
+       /*
+        * Make sure no outside relations are referred to.
+        */
+       if (list_length(pstate->p_rtable) != 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+                       errmsg("only table \"%s\" can be referenced in check constraint",
+                                  relname)));
 
-       /* Clean up after the scan */
-       systable_endscan(conscan);
-       heap_close(conrel, RowExclusiveLock);
+       /*
+        * No subplans or aggregates, either...
+        */
+       if (pstate->p_hasSubLinks)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot use subquery in check constraint")));
+       if (pstate->p_hasAggs)
+               ereport(ERROR,
+                               (errcode(ERRCODE_GROUPING_ERROR),
+                          errmsg("cannot use aggregate function in check constraint")));
+       if (pstate->p_hasWindowFuncs)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WINDOWING_ERROR),
+                                errmsg("cannot use window function in check constraint")));
 
-       return ndeleted;
+       return expr;
 }
 
 
 /*
  * RemoveStatistics --- remove entries in pg_statistic for a rel or column
  *
- * If attnum is zero, remove all entries for rel; else remove only the one
+ * If attnum is zero, remove all entries for rel; else remove only the one(s)
  * for that column.
  */
 void
@@ -2107,9 +2487,10 @@ RemoveStatistics(Oid relid, AttrNumber attnum)
                nkeys = 2;
        }
 
-       scan = systable_beginscan(pgstatistic, StatisticRelidAttnumIndexId, true,
+       scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
                                                          SnapshotNow, nkeys, key);
 
+       /* we must loop even when attnum != 0, in case of inherited stats */
        while (HeapTupleIsValid(tuple = systable_getnext(scan)))
                simple_heap_delete(pgstatistic, &tuple->t_self);
 
@@ -2144,7 +2525,9 @@ RelationTruncateIndexes(Relation heapRelation)
                /* Fetch info needed for index_build */
                indexInfo = BuildIndexInfo(currentIndex);
 
-               /* Now truncate the actual file (and discard buffers) */
+               /*
+                * Now truncate the actual file (and discard buffers).
+                */
                RelationTruncate(currentIndex, 0);
 
                /* Initialize the index and rebuild */
@@ -2176,18 +2559,9 @@ heap_truncate(List *relids)
        {
                Oid                     rid = lfirst_oid(cell);
                Relation        rel;
-               Oid                     toastrelid;
 
                rel = heap_open(rid, AccessExclusiveLock);
                relations = lappend(relations, rel);
-
-               /* If there is a toast table, add it to the list too */
-               toastrelid = rel->rd_rel->reltoastrelid;
-               if (OidIsValid(toastrelid))
-               {
-                       rel = heap_open(toastrelid, AccessExclusiveLock);
-                       relations = lappend(relations, rel);
-               }
        }
 
        /* Don't allow truncate on tables that are referenced by foreign keys */
@@ -2198,19 +2572,47 @@ heap_truncate(List *relids)
        {
                Relation        rel = lfirst(cell);
 
-               /* Truncate the actual file (and discard buffers) */
-               RelationTruncate(rel, 0);
+               /* Truncate the relation */
+               heap_truncate_one_rel(rel);
 
-               /* If this relation has indexes, truncate the indexes too */
-               RelationTruncateIndexes(rel);
-
-               /*
-                * Close the relation, but keep exclusive lock on it until commit.
-                */
+               /* Close the relation, but keep exclusive lock on it until commit */
                heap_close(rel, NoLock);
        }
 }
 
+/*
+ *      heap_truncate_one_rel
+ *
+ *      This routine deletes all data within the specified relation.
+ *
+ * This is not transaction-safe, because the truncation is done immediately
+ * and cannot be rolled back later.  Caller is responsible for having
+ * checked permissions etc, and must have obtained AccessExclusiveLock.
+ */
+void
+heap_truncate_one_rel(Relation rel)
+{
+       Oid                     toastrelid;
+
+       /* Truncate the actual file (and discard buffers) */
+       RelationTruncate(rel, 0);
+
+       /* If the relation has indexes, truncate the indexes too */
+       RelationTruncateIndexes(rel);
+
+       /* If there is a toast table, truncate that too */
+       toastrelid = rel->rd_rel->reltoastrelid;
+       if (OidIsValid(toastrelid))
+       {
+               Relation        toastrel = heap_open(toastrelid, AccessExclusiveLock);
+
+               RelationTruncate(toastrel, 0);
+               RelationTruncateIndexes(toastrel);
+               /* keep the lock... */
+               heap_close(toastrel, NoLock);
+       }
+}
+
 /*
  * heap_truncate_check_FKs
  *             Check for foreign keys referencing a list of relations that
@@ -2241,7 +2643,7 @@ heap_truncate_check_FKs(List *relations, bool tempTables)
        {
                Relation        rel = lfirst(cell);
 
-               if (rel->rd_rel->reltriggers != 0)
+               if (rel->rd_rel->relhastriggers)
                        oids = lappend_oid(oids, RelationGetRelid(rel));
        }