* heap.c
* code to create and destroy POSTGRES heap relations
*
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.292 2005/10/18 01:06:23 tgl Exp $
+ * src/backend/catalog/heap.c
*
*
* INTERFACE ROUTINES
*/
#include "postgres.h"
-#include "access/heapam.h"
#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/sysattr.h"
+#include "access/transam.h"
+#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/objectaccess.h"
#include "catalog/pg_attrdef.h"
+#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
+#include "catalog/pg_foreign_table.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_statistic.h"
+#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_type_fn.h"
+#include "catalog/storage.h"
#include "commands/tablecmds.h"
-#include "commands/trigger.h"
+#include "commands/typecmds.h"
#include "miscadmin.h"
-#include "nodes/makefuncs.h"
-#include "optimizer/clauses.h"
-#include "optimizer/planmain.h"
+#include "nodes/nodeFuncs.h"
#include "optimizer/var.h"
#include "parser/parse_coerce.h"
+#include "parser/parse_collate.h"
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
-#include "rewrite/rewriteRemove.h"
+#include "storage/bufmgr.h"
+#include "storage/freespace.h"
#include "storage/smgr.h"
+#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
+#include "utils/snapmgr.h"
#include "utils/syscache.h"
+#include "utils/tqual.h"
+/* Potentially set by contrib/pg_upgrade_support functions */
+Oid binary_upgrade_next_heap_pg_class_oid = InvalidOid;
+Oid binary_upgrade_next_toast_pg_class_oid = InvalidOid;
+
static void AddNewRelationTuple(Relation pg_class_desc,
Relation new_rel_desc,
- Oid new_rel_oid, Oid new_type_oid,
+ Oid new_rel_oid,
+ Oid new_type_oid,
+ Oid reloftype,
Oid relowner,
- char relkind);
+ char relkind,
+ Datum relacl,
+ Datum reloptions);
static Oid AddNewRelationType(const char *typeName,
Oid typeNamespace,
Oid new_rel_oid,
- char new_rel_kind);
+ char new_rel_kind,
+ Oid ownerid,
+ Oid new_row_type,
+ Oid new_array_type);
static void RelationRemoveInheritance(Oid relid);
-static void StoreRelCheck(Relation rel, char *ccname, char *ccbin);
-static void StoreConstraints(Relation rel, TupleDesc tupdesc);
+static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
+ bool is_local, int inhcount);
+static void StoreConstraints(Relation rel, List *cooked_constraints);
+static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
+ bool allow_merge, bool is_local);
static void SetRelationNumChecks(Relation rel, int numchecks);
+static Node *cookConstraint(ParseState *pstate,
+ Node *raw_constraint,
+ char *relname);
+static List *insert_ordered_unique_oid(List *list, Oid datum);
/* ----------------------------------------------------------------
* Disadvantage: special cases will be all over the place.
*/
+/*
+ * The initializers below do not include the attoptions or attacl fields,
+ * but that's OK - we're never going to reference anything beyond the
+ * fixed-size portion of the structure anyway.
+ */
+
static FormData_pg_attribute a1 = {
0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
SelfItemPointerAttributeNumber, 0, -1, -1,
Oid relid,
TupleDesc tupDesc,
char relkind,
+ char relpersistence,
bool shared_relation,
+ bool mapped_relation,
bool allow_system_table_mods)
{
bool create_storage;
{
case RELKIND_VIEW:
case RELKIND_COMPOSITE_TYPE:
+ case RELKIND_FOREIGN_TABLE:
create_storage = false;
/*
tupDesc,
relid,
reltablespace,
- shared_relation);
+ shared_relation,
+ mapped_relation,
+ relpersistence);
/*
- * have the storage manager create the relation's disk file, if needed.
+ * Have the storage manager create the relation's disk file, if needed.
+ *
+ * We only create the main fork here, other forks will be created on
+ * demand.
*/
if (create_storage)
{
- Assert(rel->rd_smgr == NULL);
RelationOpenSmgr(rel);
- smgrcreate(rel->rd_smgr, rel->rd_istemp, false);
+ RelationCreateStorage(rel->rd_node, relpersistence);
}
return rel;
* --------------------------------
*/
void
-CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
+CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
+ bool allow_system_table_mods)
{
int i;
int j;
NameStr(tupdesc->attrs[i]->attname)) == 0)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
- errmsg("column name \"%s\" is duplicated",
+ errmsg("column name \"%s\" specified more than once",
NameStr(tupdesc->attrs[j]->attname))));
}
}
for (i = 0; i < natts; i++)
{
CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
- tupdesc->attrs[i]->atttypid);
+ tupdesc->attrs[i]->atttypid,
+ tupdesc->attrs[i]->attcollation,
+ allow_system_table_mods);
}
}
* --------------------------------
*/
void
-CheckAttributeType(const char *attname, Oid atttypid)
+CheckAttributeType(const char *attname, Oid atttypid, Oid attcollation,
+ bool allow_system_table_mods)
{
char att_typtype = get_typtype(atttypid);
- /*
- * Warn user, but don't fail, if column to be created has UNKNOWN type
- * (usually as a result of a 'retrieve into' - jolly)
- *
- * Refuse any attempt to create a pseudo-type column.
- */
if (atttypid == UNKNOWNOID)
+ {
+ /*
+ * Warn user, but don't fail, if column to be created has UNKNOWN type
+ * (usually as a result of a 'retrieve into' - jolly)
+ */
ereport(WARNING,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("column \"%s\" has type \"unknown\"", attname),
errdetail("Proceeding with relation creation anyway.")));
- else if (att_typtype == 'p')
+ }
+ else if (att_typtype == TYPTYPE_PSEUDO)
{
- /* Special hack for pg_statistic: allow ANYARRAY during initdb */
- if (atttypid != ANYARRAYOID || IsUnderPostmaster)
+ /*
+ * Refuse any attempt to create a pseudo-type column, except for a
+ * special hack for pg_statistic: allow ANYARRAY when modifying system
+ * catalogs (this allows creating pg_statistic and cloning it during
+ * VACUUM FULL)
+ */
+ if (atttypid != ANYARRAYOID || !allow_system_table_mods)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("column \"%s\" has pseudo-type %s",
attname, format_type_be(atttypid))));
}
+ else if (att_typtype == TYPTYPE_COMPOSITE)
+ {
+ /*
+ * For a composite type, recurse into its attributes. You might think
+ * this isn't necessary, but since we allow system catalogs to break
+ * the rule, we have to guard against the case.
+ */
+ Relation relation;
+ TupleDesc tupdesc;
+ int i;
+
+ relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
+
+ tupdesc = RelationGetDescr(relation);
+
+ for (i = 0; i < tupdesc->natts; i++)
+ {
+ Form_pg_attribute attr = tupdesc->attrs[i];
+
+ if (attr->attisdropped)
+ continue;
+ CheckAttributeType(NameStr(attr->attname), attr->atttypid, attr->attcollation,
+ allow_system_table_mods);
+ }
+
+ relation_close(relation, AccessShareLock);
+ }
+
+ /*
+ * This might not be strictly invalid per SQL standard, but it is
+ * pretty useless, and it cannot be dumped, so we must disallow
+ * it.
+ */
+ if (type_is_collatable(atttypid) && !OidIsValid(attcollation))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("no collation was derived for column \"%s\" with collatable type %s",
+ attname, format_type_be(atttypid)),
+ errhint("Use the COLLATE clause to set the collation explicitly.")));
+}
+
+/*
+ * InsertPgAttributeTuple
+ * Construct and insert a new tuple in pg_attribute.
+ *
+ * Caller has already opened and locked pg_attribute. new_attribute is the
+ * attribute to insert (but we ignore attacl and attoptions, which are always
+ * initialized to NULL).
+ *
+ * indstate is the index state for CatalogIndexInsert. It can be passed as
+ * NULL, in which case we'll fetch the necessary info. (Don't do this when
+ * inserting multiple attributes, because it's a tad more expensive.)
+ */
+void
+InsertPgAttributeTuple(Relation pg_attribute_rel,
+ Form_pg_attribute new_attribute,
+ CatalogIndexState indstate)
+{
+ Datum values[Natts_pg_attribute];
+ bool nulls[Natts_pg_attribute];
+ HeapTuple tup;
+
+ /* This is a tad tedious, but way cleaner than what we used to do... */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+
+ values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
+ values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
+ values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
+ values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
+ values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
+ values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
+ values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
+ values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff);
+ values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
+ values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
+ values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
+ values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
+ values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
+ values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
+ values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
+ values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
+ values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
+ values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
+
+ /* start out with empty permissions and empty options */
+ nulls[Anum_pg_attribute_attacl - 1] = true;
+ nulls[Anum_pg_attribute_attoptions - 1] = true;
+
+ tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
+
+ /* finally insert the new tuple, update the indexes, and clean up */
+ simple_heap_insert(pg_attribute_rel, tup);
+
+ if (indstate != NULL)
+ CatalogIndexInsert(indstate, tup);
+ else
+ CatalogUpdateIndexes(pg_attribute_rel, tup);
+
+ heap_freetuple(tup);
}
/* --------------------------------
bool oidislocal,
int oidinhcount)
{
- const Form_pg_attribute *dpp;
+ Form_pg_attribute attr;
int i;
- HeapTuple tup;
Relation rel;
CatalogIndexState indstate;
int natts = tupdesc->natts;
* First we add the user attributes. This is also a convenient place to
* add dependencies on their datatypes.
*/
- dpp = tupdesc->attrs;
for (i = 0; i < natts; i++)
{
+ attr = tupdesc->attrs[i];
/* Fill in the correct relation OID */
- (*dpp)->attrelid = new_rel_oid;
+ attr->attrelid = new_rel_oid;
/* Make sure these are OK, too */
- (*dpp)->attstattarget = -1;
- (*dpp)->attcacheoff = -1;
+ attr->attstattarget = -1;
+ attr->attcacheoff = -1;
- tup = heap_addheader(Natts_pg_attribute,
- false,
- ATTRIBUTE_TUPLE_SIZE,
- (void *) *dpp);
-
- simple_heap_insert(rel, tup);
-
- CatalogIndexInsert(indstate, tup);
-
- heap_freetuple(tup);
+ InsertPgAttributeTuple(rel, attr, indstate);
+ /* Add dependency info */
myself.classId = RelationRelationId;
myself.objectId = new_rel_oid;
myself.objectSubId = i + 1;
referenced.classId = TypeRelationId;
- referenced.objectId = (*dpp)->atttypid;
+ referenced.objectId = attr->atttypid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
- dpp++;
+ if (OidIsValid(attr->attcollation))
+ {
+ referenced.classId = CollationRelationId;
+ referenced.objectId = attr->attcollation;
+ referenced.objectSubId = 0;
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
}
/*
*/
if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
{
- dpp = SysAtt;
- for (i = 0; i < -1 - FirstLowInvalidHeapAttributeNumber; i++)
+ for (i = 0; i < (int) lengthof(SysAtt); i++)
{
- if (tupdesc->tdhasoid ||
- (*dpp)->attnum != ObjectIdAttributeNumber)
- {
- Form_pg_attribute attStruct;
-
- tup = heap_addheader(Natts_pg_attribute,
- false,
- ATTRIBUTE_TUPLE_SIZE,
- (void *) *dpp);
- attStruct = (Form_pg_attribute) GETSTRUCT(tup);
+ FormData_pg_attribute attStruct;
- /* Fill in the correct relation OID in the copied tuple */
- attStruct->attrelid = new_rel_oid;
+ /* skip OID where appropriate */
+ if (!tupdesc->tdhasoid &&
+ SysAtt[i]->attnum == ObjectIdAttributeNumber)
+ continue;
- /* Fill in correct inheritance info for the OID column */
- if (attStruct->attnum == ObjectIdAttributeNumber)
- {
- attStruct->attislocal = oidislocal;
- attStruct->attinhcount = oidinhcount;
- }
+ memcpy(&attStruct, (char *) SysAtt[i], sizeof(FormData_pg_attribute));
- /*
- * Unneeded since they should be OK in the constant data
- * anyway
- */
- /* attStruct->attstattarget = 0; */
- /* attStruct->attcacheoff = -1; */
+ /* Fill in the correct relation OID in the copied tuple */
+ attStruct.attrelid = new_rel_oid;
- simple_heap_insert(rel, tup);
-
- CatalogIndexInsert(indstate, tup);
-
- heap_freetuple(tup);
+ /* Fill in correct inheritance info for the OID column */
+ if (attStruct.attnum == ObjectIdAttributeNumber)
+ {
+ attStruct.attislocal = oidislocal;
+ attStruct.attinhcount = oidinhcount;
}
- dpp++;
+
+ InsertPgAttributeTuple(rel, &attStruct, indstate);
}
}
heap_close(rel, RowExclusiveLock);
}
+/* --------------------------------
+ * InsertPgClassTuple
+ *
+ * Construct and insert a new tuple in pg_class.
+ *
+ * Caller has already opened and locked pg_class.
+ * Tuple data is taken from new_rel_desc->rd_rel, except for the
+ * variable-width fields which are not present in a cached reldesc.
+ * relacl and reloptions are passed in Datum form (to avoid having
+ * to reference the data types in heap.h). Pass (Datum) 0 to set them
+ * to NULL.
+ * --------------------------------
+ */
+void
+InsertPgClassTuple(Relation pg_class_desc,
+ Relation new_rel_desc,
+ Oid new_rel_oid,
+ Datum relacl,
+ Datum reloptions)
+{
+ Form_pg_class rd_rel = new_rel_desc->rd_rel;
+ Datum values[Natts_pg_class];
+ bool nulls[Natts_pg_class];
+ HeapTuple tup;
+
+ /* This is a tad tedious, but way cleaner than what we used to do... */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+
+ values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
+ values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
+ values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
+ values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
+ values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
+ values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
+ values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
+ values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
+ values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
+ values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
+ values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
+ values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(rd_rel->reltoastidxid);
+ values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
+ values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
+ values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
+ values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
+ values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
+ values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
+ values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
+ values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
+ values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
+ values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
+ values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
+ values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
+ if (relacl != (Datum) 0)
+ values[Anum_pg_class_relacl - 1] = relacl;
+ else
+ nulls[Anum_pg_class_relacl - 1] = true;
+ if (reloptions != (Datum) 0)
+ values[Anum_pg_class_reloptions - 1] = reloptions;
+ else
+ nulls[Anum_pg_class_reloptions - 1] = true;
+
+ tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
+
+ /*
+ * The new tuple must have the oid already chosen for the rel. Sure would
+ * be embarrassing to do this sort of thing in polite company.
+ */
+ HeapTupleSetOid(tup, new_rel_oid);
+
+ /* finally insert the new tuple, update the indexes, and clean up */
+ simple_heap_insert(pg_class_desc, tup);
+
+ CatalogUpdateIndexes(pg_class_desc, tup);
+
+ heap_freetuple(tup);
+}
+
/* --------------------------------
* AddNewRelationTuple
*
Relation new_rel_desc,
Oid new_rel_oid,
Oid new_type_oid,
+ Oid reloftype,
Oid relowner,
- char relkind)
+ char relkind,
+ Datum relacl,
+ Datum reloptions)
{
Form_pg_class new_rel_reltup;
- HeapTuple tup;
/*
* first we update some of the information in our uncataloged relation's
break;
}
+ /* Initialize relfrozenxid */
+ if (relkind == RELKIND_RELATION ||
+ relkind == RELKIND_TOASTVALUE)
+ {
+ /*
+ * Initialize to the minimum XID that could put tuples in the table.
+ * We know that no xacts older than RecentXmin are still running, so
+ * that will do.
+ */
+ new_rel_reltup->relfrozenxid = RecentXmin;
+ }
+ else
+ {
+ /*
+ * Other relation types will not contain XIDs, so set relfrozenxid to
+ * InvalidTransactionId. (Note: a sequence does contain a tuple, but
+ * we force its xmin to be FrozenTransactionId always; see
+ * commands/sequence.c.)
+ */
+ new_rel_reltup->relfrozenxid = InvalidTransactionId;
+ }
+
new_rel_reltup->relowner = relowner;
new_rel_reltup->reltype = new_type_oid;
+ new_rel_reltup->reloftype = reloftype;
new_rel_reltup->relkind = relkind;
new_rel_desc->rd_att->tdtypeid = new_type_oid;
- /* ----------------
- * now form a tuple to add to pg_class
- * XXX Natts_pg_class_fixed is a hack - see pg_class.h
- * ----------------
- */
- tup = heap_addheader(Natts_pg_class_fixed,
- true,
- CLASS_TUPLE_SIZE,
- (void *) new_rel_reltup);
-
- /* force tuple to have the desired OID */
- HeapTupleSetOid(tup, new_rel_oid);
-
- /*
- * finally insert the new tuple, update the indexes, and clean up.
- */
- simple_heap_insert(pg_class_desc, tup);
-
- CatalogUpdateIndexes(pg_class_desc, tup);
-
- heap_freetuple(tup);
+ /* Now build and insert the tuple */
+ InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
+ relacl, reloptions);
}
AddNewRelationType(const char *typeName,
Oid typeNamespace,
Oid new_rel_oid,
- char new_rel_kind)
+ char new_rel_kind,
+ Oid ownerid,
+ Oid new_row_type,
+ Oid new_array_type)
{
return
- TypeCreate(typeName, /* type name */
+ TypeCreate(new_row_type, /* optional predetermined OID */
+ typeName, /* type name */
typeNamespace, /* type namespace */
new_rel_oid, /* relation oid */
new_rel_kind, /* relation kind */
+ ownerid, /* owner's ID */
-1, /* internal size (varlena) */
- 'c', /* type-type (complex) */
- ',', /* default array delimiter */
+ TYPTYPE_COMPOSITE, /* type-type (composite) */
+ TYPCATEGORY_COMPOSITE, /* type-category (ditto) */
+ false, /* composite types are never preferred */
+ DEFAULT_TYPDELIM, /* default array delimiter */
F_RECORD_IN, /* input procedure */
F_RECORD_OUT, /* output procedure */
F_RECORD_RECV, /* receive procedure */
F_RECORD_SEND, /* send procedure */
+ InvalidOid, /* typmodin procedure - none */
+ InvalidOid, /* typmodout procedure - none */
InvalidOid, /* analyze procedure - default */
InvalidOid, /* array element type - irrelevant */
+ false, /* this is not an array type */
+ new_array_type, /* array type if any */
InvalidOid, /* domain base type - irrelevant */
NULL, /* default value - none */
NULL, /* default binary representation */
'x', /* fully TOASTable */
-1, /* typmod */
0, /* array dimensions for typBaseType */
- false); /* Type NOT NULL */
+ false, /* Type NOT NULL */
+ InvalidOid); /* typcollation */
}
/* --------------------------------
* heap_create_with_catalog
*
* creates a new cataloged relation. see comments above.
+ *
+ * Arguments:
+ * relname: name to give to new rel
+ * relnamespace: OID of namespace it goes in
+ * reltablespace: OID of tablespace it goes in
+ * relid: OID to assign to new rel, or InvalidOid to select a new OID
+ * reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
+ * ownerid: OID of new rel's owner
+ * tupdesc: tuple descriptor (source of column definitions)
+ * cooked_constraints: list of precooked check constraints and defaults
+ * relkind: relkind for new rel
+ * shared_relation: TRUE if it's to be a shared relation
+ * mapped_relation: TRUE if the relation will use the relfilenode map
+ * oidislocal: TRUE if oid column (if any) should be marked attislocal
+ * oidinhcount: attinhcount to assign to oid column (if any)
+ * oncommit: ON COMMIT marking (only relevant if it's a temp table)
+ * reloptions: reloptions in Datum form, or (Datum) 0 if none
+ * use_user_acl: TRUE if should look for user-defined default permissions;
+ * if FALSE, relacl is always set NULL
+ * allow_system_table_mods: TRUE to allow creation in system namespaces
+ *
+ * Returns the OID of the new relation
* --------------------------------
*/
Oid
Oid relnamespace,
Oid reltablespace,
Oid relid,
+ Oid reltypeid,
+ Oid reloftypeid,
Oid ownerid,
TupleDesc tupdesc,
+ List *cooked_constraints,
char relkind,
+ char relpersistence,
bool shared_relation,
+ bool mapped_relation,
bool oidislocal,
int oidinhcount,
OnCommitAction oncommit,
- bool allow_system_table_mods)
+ Datum reloptions,
+ bool use_user_acl,
+ bool allow_system_table_mods,
+ bool if_not_exists)
{
Relation pg_class_desc;
Relation new_rel_desc;
+ Acl *relacl;
+ Oid existing_relid;
+ Oid old_type_oid;
Oid new_type_oid;
+ Oid new_array_oid = InvalidOid;
pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
*/
Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
- CheckAttributeNamesTypes(tupdesc, relkind);
+ CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
- if (get_relname_relid(relname, relnamespace))
+ /*
+ * If the relation already exists, it's an error, unless the user specifies
+ * "IF NOT EXISTS". In that case, we just print a notice and do nothing
+ * further.
+ */
+ existing_relid = get_relname_relid(relname, relnamespace);
+ if (existing_relid != InvalidOid)
+ {
+ if (if_not_exists)
+ {
+ ereport(NOTICE,
+ (errcode(ERRCODE_DUPLICATE_TABLE),
+ errmsg("relation \"%s\" already exists, skipping",
+ relname)));
+ heap_close(pg_class_desc, RowExclusiveLock);
+ return InvalidOid;
+ }
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists", relname)));
+ }
+
+ /*
+ * Since we are going to create a rowtype as well, also check for
+ * collision with an existing type name. If there is one and it's an
+ * autogenerated array, we can rename it out of the way; otherwise we can
+ * at least give a good error message.
+ */
+ old_type_oid = GetSysCacheOid2(TYPENAMENSP,
+ CStringGetDatum(relname),
+ ObjectIdGetDatum(relnamespace));
+ if (OidIsValid(old_type_oid))
+ {
+ if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("type \"%s\" already exists", relname),
+ errhint("A relation has an associated type of the same name, "
+ "so you must use a name that doesn't conflict "
+ "with any existing type.")));
+ }
+
+ /*
+ * Shared relations must be in pg_global (last-ditch check)
+ */
+ if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
+ elog(ERROR, "shared relations must be placed in pg_global tablespace");
/*
* Allocate an OID for the relation, unless we were told what to use.
*
- * The OID will be the relfilenode as well, so make sure it doesn't collide
- * with either pg_class OIDs or existing physical files.
+ * The OID will be the relfilenode as well, so make sure it doesn't
+ * collide with either pg_class OIDs or existing physical files.
*/
if (!OidIsValid(relid))
- relid = GetNewRelFileNode(reltablespace, shared_relation,
- pg_class_desc);
+ {
+ /*
+ * Use binary-upgrade override for pg_class.oid/relfilenode,
+ * if supplied.
+ */
+ if (OidIsValid(binary_upgrade_next_heap_pg_class_oid) &&
+ (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
+ relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE ||
+ relkind == RELKIND_FOREIGN_TABLE))
+ {
+ relid = binary_upgrade_next_heap_pg_class_oid;
+ binary_upgrade_next_heap_pg_class_oid = InvalidOid;
+ }
+ else if (OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
+ relkind == RELKIND_TOASTVALUE)
+ {
+ relid = binary_upgrade_next_toast_pg_class_oid;
+ binary_upgrade_next_toast_pg_class_oid = InvalidOid;
+ }
+ else
+ relid = GetNewRelFileNode(reltablespace, pg_class_desc,
+ relpersistence);
+ }
+
+ /*
+ * Determine the relation's initial permissions.
+ */
+ if (use_user_acl)
+ {
+ switch (relkind)
+ {
+ case RELKIND_RELATION:
+ case RELKIND_VIEW:
+ case RELKIND_FOREIGN_TABLE:
+ relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
+ relnamespace);
+ break;
+ case RELKIND_SEQUENCE:
+ relacl = get_user_default_acl(ACL_OBJECT_SEQUENCE, ownerid,
+ relnamespace);
+ break;
+ default:
+ relacl = NULL;
+ break;
+ }
+ }
+ else
+ relacl = NULL;
/*
* Create the relcache entry (mostly dummy at this point) and the physical
relid,
tupdesc,
relkind,
+ relpersistence,
shared_relation,
+ mapped_relation,
allow_system_table_mods);
Assert(relid == RelationGetRelid(new_rel_desc));
/*
- * since defining a relation also defines a complex type, we add a new
- * system type corresponding to the new relation.
+ * Decide whether to create an array type over the relation's rowtype. We
+ * do not create any array types for system catalogs (ie, those made
+ * during initdb). We create array types for regular relations, views,
+ * composite types and foreign tables ... but not, eg, for toast tables or
+ * sequences.
+ */
+ if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
+ relkind == RELKIND_VIEW ||
+ relkind == RELKIND_FOREIGN_TABLE ||
+ relkind == RELKIND_COMPOSITE_TYPE))
+ new_array_oid = AssignTypeArrayOid();
+
+ /*
+ * Since defining a relation also defines a complex type, we add a new
+ * system type corresponding to the new relation. The OID of the type can
+ * be preselected by the caller, but if reltypeid is InvalidOid, we'll
+ * generate a new OID for it.
*
- * NOTE: we could get a unique-index failure here, in case the same name has
- * already been used for a type.
+ * NOTE: we could get a unique-index failure here, in case someone else is
+ * creating the same type name in parallel but hadn't committed yet when
+ * we checked for a duplicate name above.
*/
new_type_oid = AddNewRelationType(relname,
relnamespace,
relid,
- relkind);
+ relkind,
+ ownerid,
+ reltypeid,
+ new_array_oid);
+
+ /*
+ * Now make the array type if wanted.
+ */
+ if (OidIsValid(new_array_oid))
+ {
+ char *relarrayname;
+
+ relarrayname = makeArrayTypeName(relname, relnamespace);
+
+ TypeCreate(new_array_oid, /* force the type's OID to this */
+ relarrayname, /* Array type name */
+ relnamespace, /* Same namespace as parent */
+ InvalidOid, /* Not composite, no relationOid */
+ 0, /* relkind, also N/A here */
+ ownerid, /* owner's ID */
+ -1, /* Internal size (varlena) */
+ TYPTYPE_BASE, /* Not composite - typelem is */
+ TYPCATEGORY_ARRAY, /* type-category (array) */
+ false, /* array types are never preferred */
+ DEFAULT_TYPDELIM, /* default array delimiter */
+ F_ARRAY_IN, /* array input proc */
+ F_ARRAY_OUT, /* array output proc */
+ F_ARRAY_RECV, /* array recv (bin) proc */
+ F_ARRAY_SEND, /* array send (bin) proc */
+ InvalidOid, /* typmodin procedure - none */
+ InvalidOid, /* typmodout procedure - none */
+ InvalidOid, /* analyze procedure - default */
+ new_type_oid, /* array element type - the rowtype */
+ true, /* yes, this is an array type */
+ InvalidOid, /* this has no array type */
+ InvalidOid, /* domain base type - irrelevant */
+ NULL, /* default value - none */
+ NULL, /* default binary representation */
+ false, /* passed by reference */
+ 'd', /* alignment - must be the largest! */
+ 'x', /* fully TOASTable */
+ -1, /* typmod */
+ 0, /* array dimensions for typBaseType */
+ false, /* Type NOT NULL */
+ InvalidOid); /* typcollation */
+
+ pfree(relarrayname);
+ }
/*
* now create an entry in pg_class for the relation.
new_rel_desc,
relid,
new_type_oid,
+ reloftypeid,
ownerid,
- relkind);
+ relkind,
+ PointerGetDatum(relacl),
+ reloptions);
/*
* now add tuples to pg_attribute for the attributes in our new relation.
oidislocal, oidinhcount);
/*
- * make a dependency link to force the relation to be deleted if its
- * namespace is. Skip this in bootstrap mode, since we don't make
- * dependencies while bootstrapping.
+ * Make a dependency link to force the relation to be deleted if its
+ * namespace is. Also make a dependency link to its owner, as well as
+ * dependencies for any roles mentioned in the default ACL.
+ *
+ * For composite types, these dependencies are tracked for the pg_type
+ * entry, so we needn't record them here. Likewise, TOAST tables don't
+ * need a namespace dependency (they live in a pinned namespace) nor an
+ * owner dependency (they depend indirectly through the parent table), nor
+ * should they have any ACL entries. The same applies for extension
+ * dependencies.
*
- * Also make a dependency link to its owner.
+ * Also, skip this in bootstrap mode, since we don't make dependencies
+ * while bootstrapping.
*/
- if (!IsBootstrapProcessingMode())
+ if (relkind != RELKIND_COMPOSITE_TYPE &&
+ relkind != RELKIND_TOASTVALUE &&
+ !IsBootstrapProcessingMode())
{
ObjectAddress myself,
referenced;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
recordDependencyOnOwner(RelationRelationId, relid, ownerid);
+
+ recordDependencyOnCurrentExtension(&myself);
+
+ if (reloftypeid)
+ {
+ referenced.classId = TypeRelationId;
+ referenced.objectId = reloftypeid;
+ referenced.objectSubId = 0;
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ }
+
+ if (relacl != NULL)
+ {
+ int nnewmembers;
+ Oid *newmembers;
+
+ nnewmembers = aclmembers(relacl, &newmembers);
+ updateAclDependencies(RelationRelationId, relid, 0,
+ ownerid,
+ 0, NULL,
+ nnewmembers, newmembers);
+ }
}
+ /* Post creation hook for new relation */
+ InvokeObjectAccessHook(OAT_POST_CREATE, RelationRelationId, relid, 0);
+
/*
- * store constraints and defaults passed in the tupdesc, if any.
+ * Store any supplied constraints and defaults.
*
- * NB: this may do a CommandCounterIncrement and rebuild the relcache entry,
- * so the relation must be valid and self-consistent at this point. In
- * particular, there are not yet constraints and defaults anywhere.
+ * NB: this may do a CommandCounterIncrement and rebuild the relcache
+ * entry, so the relation must be valid and self-consistent at this point.
+ * In particular, there are not yet constraints and defaults anywhere.
*/
- StoreConstraints(new_rel_desc, tupdesc);
+ StoreConstraints(new_rel_desc, cooked_constraints);
/*
* If there's a special on-commit action, remember it
if (oncommit != ONCOMMIT_NOOP)
register_on_commit_action(relid, oncommit);
+ /*
+ * If this is an unlogged relation, it needs an init fork so that it
+ * can be correctly reinitialized on restart. Since we're going to
+ * do an immediate sync, we ony need to xlog this if archiving or
+ * streaming is enabled. And the immediate sync is required, because
+ * otherwise there's no guarantee that this will hit the disk before
+ * the next checkpoint moves the redo pointer.
+ */
+ if (relpersistence == RELPERSISTENCE_UNLOGGED)
+ {
+ Assert(relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE);
+
+ smgrcreate(new_rel_desc->rd_smgr, INIT_FORKNUM, false);
+ if (XLogIsNeeded())
+ log_smgrcreate(&new_rel_desc->rd_smgr->smgr_rnode.node,
+ INIT_FORKNUM);
+ smgrimmedsync(new_rel_desc->rd_smgr, INIT_FORKNUM);
+ }
+
/*
* ok, the relation has been cataloged, so close our relations and return
* the OID of the newly created relation.
/* Grab an appropriate lock on the pg_class relation */
pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
- tup = SearchSysCache(RELOID,
- ObjectIdGetDatum(relid),
- 0, 0, 0);
+ tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for relation %u", relid);
attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
- tuple = SearchSysCacheCopy(ATTNUM,
- ObjectIdGetDatum(relid),
- Int16GetDatum(attnum),
- 0, 0);
+ tuple = SearchSysCacheCopy2(ATTNUM,
+ ObjectIdGetDatum(relid),
+ Int16GetDatum(attnum));
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for attribute %d of relation %u",
attnum, relid);
/* Fix the pg_attribute row */
attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
- tuple = SearchSysCacheCopy(ATTNUM,
- ObjectIdGetDatum(myrelid),
- Int16GetDatum(myattnum),
- 0, 0);
+ tuple = SearchSysCacheCopy2(ATTNUM,
+ ObjectIdGetDatum(myrelid),
+ Int16GetDatum(myattnum));
if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
elog(ERROR, "cache lookup failed for attribute %d of relation %u",
myattnum, myrelid);
rel = relation_open(relid, AccessExclusiveLock);
/*
- * Schedule unlinking of the relation's physical file at commit.
+ * There can no longer be anyone *else* touching the relation, but we
+ * might still have open queries or cursors, or pending trigger events,
+ * in our own session.
+ */
+ CheckTableNotInUse(rel, "DROP TABLE");
+
+ /*
+ * Delete pg_foreign_table tuple first.
+ */
+ if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ {
+ Relation rel;
+ HeapTuple tuple;
+
+ rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
+
+ tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for foreign table %u", relid);
+
+ simple_heap_delete(rel, &tuple->t_self);
+
+ ReleaseSysCache(tuple);
+ heap_close(rel, RowExclusiveLock);
+ }
+
+ /*
+ * Schedule unlinking of the relation's physical files at commit.
*/
if (rel->rd_rel->relkind != RELKIND_VIEW &&
- rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
+ rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
+ rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
{
- RelationOpenSmgr(rel);
- smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
+ RelationDropStorage(rel);
}
/*
/*
* Store a default expression for column attnum of relation rel.
- * The expression must be presented as a nodeToString() string.
*/
void
-StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
+StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
{
- Node *expr;
+ char *adbin;
char *adsrc;
Relation adrel;
HeapTuple tuple;
Datum values[4];
- static char nulls[4] = {' ', ' ', ' ', ' '};
+ static bool nulls[4] = {false, false, false, false};
Relation attrrel;
HeapTuple atttup;
Form_pg_attribute attStruct;
defobject;
/*
- * Need to construct source equivalent of given node-string.
+ * Flatten expression to string form for storage.
*/
- expr = stringToNode(adbin);
+ adbin = nodeToString(expr);
/*
- * deparse it
+ * Also deparse it to form the mostly-obsolete adsrc field.
*/
adsrc = deparse_expression(expr,
deparse_context_for(RelationGetRelationName(rel),
*/
values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
values[Anum_pg_attrdef_adnum - 1] = attnum;
- values[Anum_pg_attrdef_adbin - 1] = DirectFunctionCall1(textin,
- CStringGetDatum(adbin));
- values[Anum_pg_attrdef_adsrc - 1] = DirectFunctionCall1(textin,
- CStringGetDatum(adsrc));
+ values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
+ values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);
adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
- tuple = heap_formtuple(adrel->rd_att, values, nulls);
+ tuple = heap_form_tuple(adrel->rd_att, values, nulls);
attrdefOid = simple_heap_insert(adrel, tuple);
CatalogUpdateIndexes(adrel, tuple);
pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
heap_freetuple(tuple);
+ pfree(adbin);
pfree(adsrc);
/*
* exists.
*/
attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
- atttup = SearchSysCacheCopy(ATTNUM,
- ObjectIdGetDatum(RelationGetRelid(rel)),
- Int16GetDatum(attnum),
- 0, 0);
+ atttup = SearchSysCacheCopy2(ATTNUM,
+ ObjectIdGetDatum(RelationGetRelid(rel)),
+ Int16GetDatum(attnum));
if (!HeapTupleIsValid(atttup))
elog(ERROR, "cache lookup failed for attribute %d of relation %u",
attnum, RelationGetRelid(rel));
/*
* Store a check-constraint expression for the given relation.
- * The expression must be presented as a nodeToString() string.
*
* Caller is responsible for updating the count of constraints
* in the pg_class entry for the relation.
*/
static void
-StoreRelCheck(Relation rel, char *ccname, char *ccbin)
+StoreRelCheck(Relation rel, char *ccname, Node *expr,
+ bool is_local, int inhcount)
{
- Node *expr;
+ char *ccbin;
char *ccsrc;
List *varList;
int keycount;
int16 *attNos;
/*
- * Convert condition to an expression tree.
+ * Flatten expression to string form for storage.
*/
- expr = stringToNode(ccbin);
+ ccbin = nodeToString(expr);
/*
- * deparse it
+ * Also deparse it to form the mostly-obsolete consrc field.
*/
ccsrc = deparse_expression(expr,
deparse_context_for(RelationGetRelationName(rel),
false, false);
/*
- * Find columns of rel that are used in ccbin
+ * Find columns of rel that are used in expr
*
- * NB: pull_var_clause is okay here only because we don't allow subselects in
- * check constraints; it would fail to examine the contents of subselects.
+ * NB: pull_var_clause is okay here only because we don't allow subselects
+ * in check constraints; it would fail to examine the contents of
+ * subselects.
*/
- varList = pull_var_clause(expr, false);
+ varList = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
keycount = list_length(varList);
if (keycount > 0)
CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */
false, /* Is Deferred */
+ true, /* Is Validated */
RelationGetRelid(rel), /* relation */
attNos, /* attrs in the constraint */
keycount, /* # attrs in the constraint */
InvalidOid, /* not a domain constraint */
+ InvalidOid, /* no associated index */
InvalidOid, /* Foreign key fields */
NULL,
+ NULL,
+ NULL,
+ NULL,
0,
' ',
' ',
' ',
- InvalidOid, /* no associated index */
- expr, /* Tree form check constraint */
- ccbin, /* Binary form check constraint */
- ccsrc); /* Source form check constraint */
-
+ NULL, /* not an exclusion constraint */
+ expr, /* Tree form of check constraint */
+ ccbin, /* Binary form of check constraint */
+ ccsrc, /* Source form of check constraint */
+ is_local, /* conislocal */
+ inhcount); /* coninhcount */
+
+ pfree(ccbin);
pfree(ccsrc);
}
/*
- * Store defaults and constraints passed in via the tuple constraint struct.
+ * Store defaults and constraints (passed as a list of CookedConstraint).
*
* NOTE: only pre-cooked expressions will be passed this way, which is to
* say expressions inherited from an existing relation. Newly parsed
* expressions can be added later, by direct calls to StoreAttrDefault
- * and StoreRelCheck (see AddRelationRawConstraints()).
+ * and StoreRelCheck (see AddRelationNewConstraints()).
*/
static void
-StoreConstraints(Relation rel, TupleDesc tupdesc)
+StoreConstraints(Relation rel, List *cooked_constraints)
{
- TupleConstr *constr = tupdesc->constr;
- int i;
+ int numchecks = 0;
+ ListCell *lc;
- if (!constr)
+ if (!cooked_constraints)
return; /* nothing to do */
/*
*/
CommandCounterIncrement();
- for (i = 0; i < constr->num_defval; i++)
- StoreAttrDefault(rel, constr->defval[i].adnum,
- constr->defval[i].adbin);
+ foreach(lc, cooked_constraints)
+ {
+ CookedConstraint *con = (CookedConstraint *) lfirst(lc);
- for (i = 0; i < constr->num_check; i++)
- StoreRelCheck(rel, constr->check[i].ccname,
- constr->check[i].ccbin);
+ switch (con->contype)
+ {
+ case CONSTR_DEFAULT:
+ StoreAttrDefault(rel, con->attnum, con->expr);
+ break;
+ case CONSTR_CHECK:
+ StoreRelCheck(rel, con->name, con->expr,
+ con->is_local, con->inhcount);
+ numchecks++;
+ break;
+ default:
+ elog(ERROR, "unrecognized constraint type: %d",
+ (int) con->contype);
+ }
+ }
- if (constr->num_check > 0)
- SetRelationNumChecks(rel, constr->num_check);
+ if (numchecks > 0)
+ SetRelationNumChecks(rel, numchecks);
}
/*
- * AddRelationRawConstraints
+ * AddRelationNewConstraints
*
- * Add raw (not-yet-transformed) column default expressions and/or constraint
- * check expressions to an existing relation. This is defined to do both
- * for efficiency in DefineRelation, but of course you can do just one or
- * the other by passing empty lists.
+ * Add new column default expressions and/or constraint check expressions
+ * to an existing relation. This is defined to do both for efficiency in
+ * DefineRelation, but of course you can do just one or the other by passing
+ * empty lists.
*
* rel: relation to be modified
- * rawColDefaults: list of RawColumnDefault structures
- * rawConstraints: list of Constraint nodes
+ * newColDefaults: list of RawColumnDefault structures
+ * newConstraints: list of Constraint nodes
+ * allow_merge: TRUE if check constraints may be merged with existing ones
+ * is_local: TRUE if definition is local, FALSE if it's inherited
*
- * All entries in rawColDefaults will be processed. Entries in rawConstraints
- * will be processed only if they are CONSTR_CHECK type and contain a "raw"
- * expression.
+ * All entries in newColDefaults will be processed. Entries in newConstraints
+ * will be processed only if they are CONSTR_CHECK type.
*
* Returns a list of CookedConstraint nodes that shows the cooked form of
* the default and constraint expressions added to the relation.
* tuples visible.
*/
List *
-AddRelationRawConstraints(Relation rel,
- List *rawColDefaults,
- List *rawConstraints)
+AddRelationNewConstraints(Relation rel,
+ List *newColDefaults,
+ List *newConstraints,
+ bool allow_merge,
+ bool is_local)
{
List *cookedConstraints = NIL;
TupleDesc tupleDesc;
/*
* Process column default expressions.
*/
- foreach(cell, rawColDefaults)
+ foreach(cell, newColDefaults)
{
RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
atp->atttypid, atp->atttypmod,
NameStr(atp->attname));
- StoreAttrDefault(rel, colDef->attnum, nodeToString(expr));
+ /*
+ * If the expression is just a NULL constant, we do not bother to make
+ * an explicit pg_attrdef entry, since the default behavior is
+ * equivalent.
+ *
+ * Note a nonobvious property of this test: if the column is of a
+ * domain type, what we'll get is not a bare null Const but a
+ * CoerceToDomain expr, so we will not discard the default. This is
+ * critical because the column default needs to be retained to
+ * override any default that the domain might have.
+ */
+ if (expr == NULL ||
+ (IsA(expr, Const) &&((Const *) expr)->constisnull))
+ continue;
+
+ StoreAttrDefault(rel, colDef->attnum, expr);
cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
cooked->contype = CONSTR_DEFAULT;
cooked->name = NULL;
cooked->attnum = colDef->attnum;
cooked->expr = expr;
+ cooked->is_local = is_local;
+ cooked->inhcount = is_local ? 0 : 1;
cookedConstraints = lappend(cookedConstraints, cooked);
}
*/
numchecks = numoldchecks;
checknames = NIL;
- foreach(cell, rawConstraints)
+ foreach(cell, newConstraints)
{
Constraint *cdef = (Constraint *) lfirst(cell);
char *ccname;
- if (cdef->contype != CONSTR_CHECK || cdef->raw_expr == NULL)
+ if (cdef->contype != CONSTR_CHECK)
continue;
- Assert(cdef->cooked_expr == NULL);
-
- /*
- * Transform raw parsetree to executable expression.
- */
- expr = transformExpr(pstate, cdef->raw_expr);
- /*
- * Make sure it yields a boolean result.
- */
- expr = coerce_to_boolean(pstate, expr, "CHECK");
+ if (cdef->raw_expr != NULL)
+ {
+ Assert(cdef->cooked_expr == NULL);
- /*
- * Make sure no outside relations are referred to.
- */
- if (list_length(pstate->p_rtable) != 1)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
- errmsg("only table \"%s\" can be referenced in check constraint",
- RelationGetRelationName(rel))));
+ /*
+ * Transform raw parsetree to executable expression, and verify
+ * it's valid as a CHECK constraint.
+ */
+ expr = cookConstraint(pstate, cdef->raw_expr,
+ RelationGetRelationName(rel));
+ }
+ else
+ {
+ Assert(cdef->cooked_expr != NULL);
- /*
- * No subplans or aggregates, either...
- */
- if (pstate->p_hasSubLinks)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot use subquery in check constraint")));
- if (pstate->p_hasAggs)
- ereport(ERROR,
- (errcode(ERRCODE_GROUPING_ERROR),
- errmsg("cannot use aggregate function in check constraint")));
+ /*
+ * Here, we assume the parser will only pass us valid CHECK
+ * expressions, so we do no particular checking.
+ */
+ expr = stringToNode(cdef->cooked_expr);
+ }
/*
* Check name uniqueness, or generate a name if none was given.
*/
- if (cdef->name != NULL)
+ if (cdef->conname != NULL)
{
ListCell *cell2;
- ccname = cdef->name;
- /* Check against pre-existing constraints */
- if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
- RelationGetRelid(rel),
- RelationGetNamespace(rel),
- ccname))
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("constraint \"%s\" for relation \"%s\" already exists",
- ccname, RelationGetRelationName(rel))));
+ ccname = cdef->conname;
/* Check against other new constraints */
/* Needed because we don't do CommandCounterIncrement in loop */
foreach(cell2, checknames)
errmsg("check constraint \"%s\" already exists",
ccname)));
}
+
+ /* save name for future checks */
+ checknames = lappend(checknames, ccname);
+
+ /*
+ * Check against pre-existing constraints. If we are allowed to
+ * merge with an existing constraint, there's no more to do here.
+ * (We omit the duplicate constraint from the result, which is
+ * what ATAddCheckConstraint wants.)
+ */
+ if (MergeWithExistingConstraint(rel, ccname, expr,
+ allow_merge, is_local))
+ continue;
}
else
{
List *vars;
char *colname;
- vars = pull_var_clause(expr, false);
+ vars = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
/* eliminate duplicates */
vars = list_union(NIL, vars);
"check",
RelationGetNamespace(rel),
checknames);
- }
- /* save name for future checks */
- checknames = lappend(checknames, ccname);
+ /* save name for future checks */
+ checknames = lappend(checknames, ccname);
+ }
/*
* OK, store it.
*/
- StoreRelCheck(rel, ccname, nodeToString(expr));
+ StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1);
numchecks++;
cooked->name = ccname;
cooked->attnum = 0;
cooked->expr = expr;
+ cooked->is_local = is_local;
+ cooked->inhcount = is_local ? 0 : 1;
cookedConstraints = lappend(cookedConstraints, cooked);
}
return cookedConstraints;
}
+/*
+ * Check for a pre-existing check constraint that conflicts with a proposed
+ * new one, and either adjust its conislocal/coninhcount settings or throw
+ * error as needed.
+ *
+ * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
+ * got a so-far-unique name, or throws error if conflict.
+ */
+static bool
+MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
+ bool allow_merge, bool is_local)
+{
+ bool found;
+ Relation conDesc;
+ SysScanDesc conscan;
+ ScanKeyData skey[2];
+ HeapTuple tup;
+
+ /* Search for a pg_constraint entry with same name and relation */
+ conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
+
+ found = false;
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(ccname));
+
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_connamespace,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetNamespace(rel)));
+
+ conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
+ SnapshotNow, 2, skey);
+
+ while (HeapTupleIsValid(tup = systable_getnext(conscan)))
+ {
+ Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
+
+ if (con->conrelid == RelationGetRelid(rel))
+ {
+ /* Found it. Conflicts if not identical check constraint */
+ if (con->contype == CONSTRAINT_CHECK)
+ {
+ Datum val;
+ bool isnull;
+
+ val = fastgetattr(tup,
+ Anum_pg_constraint_conbin,
+ conDesc->rd_att, &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for rel %s",
+ RelationGetRelationName(rel));
+ if (equal(expr, stringToNode(TextDatumGetCString(val))))
+ found = true;
+ }
+ if (!found || !allow_merge)
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("constraint \"%s\" for relation \"%s\" already exists",
+ ccname, RelationGetRelationName(rel))));
+ /* OK to update the tuple */
+ ereport(NOTICE,
+ (errmsg("merging constraint \"%s\" with inherited definition",
+ ccname)));
+ tup = heap_copytuple(tup);
+ con = (Form_pg_constraint) GETSTRUCT(tup);
+ if (is_local)
+ con->conislocal = true;
+ else
+ con->coninhcount++;
+ simple_heap_update(conDesc, &tup->t_self, tup);
+ CatalogUpdateIndexes(conDesc, tup);
+ break;
+ }
+ }
+
+ systable_endscan(conscan);
+ heap_close(conDesc, RowExclusiveLock);
+
+ return found;
+}
+
/*
* Update the count of constraints in the relation's pg_class tuple.
*
Form_pg_class relStruct;
relrel = heap_open(RelationRelationId, RowExclusiveLock);
- reltup = SearchSysCacheCopy(RELOID,
- ObjectIdGetDatum(RelationGetRelid(rel)),
- 0, 0, 0);
+ reltup = SearchSysCacheCopy1(RELOID,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
if (!HeapTupleIsValid(reltup))
elog(ERROR, "cache lookup failed for relation %u",
RelationGetRelid(rel));
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("cannot use aggregate function in default expression")));
+ if (pstate->p_hasWindowFuncs)
+ ereport(ERROR,
+ (errcode(ERRCODE_WINDOWING_ERROR),
+ errmsg("cannot use window function in default expression")));
/*
* Coerce the expression to the correct type and typmod, if given. This
* should match the parser's processing of non-defaulted expressions ---
- * see updateTargetListEntry().
+ * see transformAssignedExpr().
*/
if (OidIsValid(atttypid))
{
expr = coerce_to_target_type(pstate, expr, type_id,
atttypid, atttypmod,
COERCION_ASSIGNMENT,
- COERCE_IMPLICIT_CAST);
+ COERCE_IMPLICIT_CAST,
+ -1);
if (expr == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errhint("You will need to rewrite or cast the expression.")));
}
+ /*
+ * Finally, take care of collations in the finished expression.
+ */
+ assign_expr_collations(pstate, expr);
+
return expr;
}
-
/*
- * Removes all constraints on a relation that match the given name.
- *
- * It is the responsibility of the calling function to acquire a suitable
- * lock on the relation.
+ * Take a raw CHECK constraint expression and convert it to a cooked format
+ * ready for storage.
*
- * Returns: The number of constraints removed.
+ * Parse state must be set up to recognize any vars that might appear
+ * in the expression.
*/
-int
-RemoveRelConstraints(Relation rel, const char *constrName,
- DropBehavior behavior)
+static Node *
+cookConstraint(ParseState *pstate,
+ Node *raw_constraint,
+ char *relname)
{
- int ndeleted = 0;
- Relation conrel;
- SysScanDesc conscan;
- ScanKeyData key[1];
- HeapTuple contup;
-
- /* Grab an appropriate lock on the pg_constraint relation */
- conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
-
- /* Use the index to scan only constraints of the target relation */
- ScanKeyInit(&key[0],
- Anum_pg_constraint_conrelid,
- BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(rel)));
-
- conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
- SnapshotNow, 1, key);
+ Node *expr;
/*
- * Scan over the result set, removing any matching entries.
+ * Transform raw parsetree to executable expression.
*/
- while ((contup = systable_getnext(conscan)) != NULL)
- {
- Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(contup);
-
- if (strcmp(NameStr(con->conname), constrName) == 0)
- {
- ObjectAddress conobj;
+ expr = transformExpr(pstate, raw_constraint);
- conobj.classId = ConstraintRelationId;
- conobj.objectId = HeapTupleGetOid(contup);
- conobj.objectSubId = 0;
+ /*
+ * Make sure it yields a boolean result.
+ */
+ expr = coerce_to_boolean(pstate, expr, "CHECK");
- performDeletion(&conobj, behavior);
+ /*
+ * Take care of collations.
+ */
+ assign_expr_collations(pstate, expr);
- ndeleted++;
- }
- }
+ /*
+ * Make sure no outside relations are referred to.
+ */
+ if (list_length(pstate->p_rtable) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("only table \"%s\" can be referenced in check constraint",
+ relname)));
- /* Clean up after the scan */
- systable_endscan(conscan);
- heap_close(conrel, RowExclusiveLock);
+ /*
+ * No subplans or aggregates, either...
+ */
+ if (pstate->p_hasSubLinks)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use subquery in check constraint")));
+ if (pstate->p_hasAggs)
+ ereport(ERROR,
+ (errcode(ERRCODE_GROUPING_ERROR),
+ errmsg("cannot use aggregate function in check constraint")));
+ if (pstate->p_hasWindowFuncs)
+ ereport(ERROR,
+ (errcode(ERRCODE_WINDOWING_ERROR),
+ errmsg("cannot use window function in check constraint")));
- return ndeleted;
+ return expr;
}
/*
* RemoveStatistics --- remove entries in pg_statistic for a rel or column
*
- * If attnum is zero, remove all entries for rel; else remove only the one
+ * If attnum is zero, remove all entries for rel; else remove only the one(s)
* for that column.
*/
void
nkeys = 2;
}
- scan = systable_beginscan(pgstatistic, StatisticRelidAttnumIndexId, true,
+ scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
SnapshotNow, nkeys, key);
+ /* we must loop even when attnum != 0, in case of inherited stats */
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
simple_heap_delete(pgstatistic, &tuple->t_self);
/*
- * RelationTruncateIndexes - truncate all
- * indexes associated with the heap relation to zero tuples.
+ * RelationTruncateIndexes - truncate all indexes associated
+ * with the heap relation to zero tuples.
*
* The routine will truncate and then reconstruct the indexes on
- * the relation specified by the heapId parameter.
+ * the specified relation. Caller must hold exclusive lock on rel.
*/
static void
-RelationTruncateIndexes(Oid heapId)
+RelationTruncateIndexes(Relation heapRelation)
{
- Relation heapRelation;
ListCell *indlist;
- /*
- * Open the heap rel. We need grab no lock because we assume
- * heap_truncate is holding an exclusive lock on the heap rel.
- */
- heapRelation = heap_open(heapId, NoLock);
-
/* Ask the relcache to produce a list of the indexes of the rel */
foreach(indlist, RelationGetIndexList(heapRelation))
{
Relation currentIndex;
IndexInfo *indexInfo;
- /* Open the index relation */
- currentIndex = index_open(indexId);
-
- /* Obtain exclusive lock on it, just to be sure */
- LockRelation(currentIndex, AccessExclusiveLock);
+ /* Open the index relation; use exclusive lock, just to be sure */
+ currentIndex = index_open(indexId, AccessExclusiveLock);
/* Fetch info needed for index_build */
indexInfo = BuildIndexInfo(currentIndex);
- /* Now truncate the actual file (and discard buffers) */
+ /*
+ * Now truncate the actual file (and discard buffers).
+ */
RelationTruncate(currentIndex, 0);
/* Initialize the index and rebuild */
- index_build(heapRelation, currentIndex, indexInfo);
+ /* Note: we do not need to re-establish pkey setting */
+ index_build(heapRelation, currentIndex, indexInfo, false);
- /*
- * index_build will close both the heap and index relations (but not
- * give up the locks we hold on them). We're done with this index,
- * but we must re-open the heap rel.
- */
- heapRelation = heap_open(heapId, NoLock);
+ /* We're done with this index */
+ index_close(currentIndex, NoLock);
}
-
- /* Finish by closing the heap rel again */
- heap_close(heapRelation, NoLock);
}
/*
{
Oid rid = lfirst_oid(cell);
Relation rel;
- Oid toastrelid;
rel = heap_open(rid, AccessExclusiveLock);
relations = lappend(relations, rel);
-
- /* If there is a toast table, add it to the list too */
- toastrelid = rel->rd_rel->reltoastrelid;
- if (OidIsValid(toastrelid))
- {
- rel = heap_open(toastrelid, AccessExclusiveLock);
- relations = lappend(relations, rel);
- }
}
/* Don't allow truncate on tables that are referenced by foreign keys */
{
Relation rel = lfirst(cell);
- /* Truncate the actual file (and discard buffers) */
- RelationTruncate(rel, 0);
+ /* Truncate the relation */
+ heap_truncate_one_rel(rel);
- /* If this relation has indexes, truncate the indexes too */
- RelationTruncateIndexes(RelationGetRelid(rel));
-
- /*
- * Close the relation, but keep exclusive lock on it until commit.
- */
+ /* Close the relation, but keep exclusive lock on it until commit */
heap_close(rel, NoLock);
}
}
+/*
+ * heap_truncate_one_rel
+ *
+ * This routine deletes all data within the specified relation.
+ *
+ * This is not transaction-safe, because the truncation is done immediately
+ * and cannot be rolled back later. Caller is responsible for having
+ * checked permissions etc, and must have obtained AccessExclusiveLock.
+ */
+void
+heap_truncate_one_rel(Relation rel)
+{
+ Oid toastrelid;
+
+ /* Truncate the actual file (and discard buffers) */
+ RelationTruncate(rel, 0);
+
+ /* If the relation has indexes, truncate the indexes too */
+ RelationTruncateIndexes(rel);
+
+ /* If there is a toast table, truncate that too */
+ toastrelid = rel->rd_rel->reltoastrelid;
+ if (OidIsValid(toastrelid))
+ {
+ Relation toastrel = heap_open(toastrelid, AccessExclusiveLock);
+
+ RelationTruncate(toastrel, 0);
+ RelationTruncateIndexes(toastrel);
+ /* keep the lock... */
+ heap_close(toastrel, NoLock);
+ }
+}
+
/*
* heap_truncate_check_FKs
* Check for foreign keys referencing a list of relations that
- * are to be truncated
+ * are to be truncated, and raise error if there are any
*
* We disallow such FKs (except self-referential ones) since the whole point
* of TRUNCATE is to not scan the individual rows to be thrown away.
heap_truncate_check_FKs(List *relations, bool tempTables)
{
List *oids = NIL;
+ List *dependents;
ListCell *cell;
- Relation fkeyRel;
- SysScanDesc fkeyScan;
- HeapTuple tuple;
/*
* Build a list of OIDs of the interesting relations.
{
Relation rel = lfirst(cell);
- if (rel->rd_rel->reltriggers != 0)
+ if (rel->rd_rel->relhastriggers)
oids = lappend_oid(oids, RelationGetRelid(rel));
}
return;
/*
- * Otherwise, must scan pg_constraint. Right now, it is a seqscan because
- * there is no available index on confrelid.
+ * Otherwise, must scan pg_constraint. We make one pass with all the
+ * relations considered; if this finds nothing, then all is well.
+ */
+ dependents = heap_truncate_find_FKs(oids);
+ if (dependents == NIL)
+ return;
+
+ /*
+ * Otherwise we repeat the scan once per relation to identify a particular
+ * pair of relations to complain about. This is pretty slow, but
+ * performance shouldn't matter much in a failure path. The reason for
+ * doing things this way is to ensure that the message produced is not
+ * dependent on chance row locations within pg_constraint.
+ */
+ foreach(cell, oids)
+ {
+ Oid relid = lfirst_oid(cell);
+ ListCell *cell2;
+
+ dependents = heap_truncate_find_FKs(list_make1_oid(relid));
+
+ foreach(cell2, dependents)
+ {
+ Oid relid2 = lfirst_oid(cell2);
+
+ if (!list_member_oid(oids, relid2))
+ {
+ char *relname = get_rel_name(relid);
+ char *relname2 = get_rel_name(relid2);
+
+ if (tempTables)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("unsupported ON COMMIT and foreign key combination"),
+ errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
+ relname2, relname)));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot truncate a table referenced in a foreign key constraint"),
+ errdetail("Table \"%s\" references \"%s\".",
+ relname2, relname),
+ errhint("Truncate table \"%s\" at the same time, "
+ "or use TRUNCATE ... CASCADE.",
+ relname2)));
+ }
+ }
+ }
+}
+
+/*
+ * heap_truncate_find_FKs
+ * Find relations having foreign keys referencing any of the given rels
+ *
+ * Input and result are both lists of relation OIDs. The result contains
+ * no duplicates, does *not* include any rels that were already in the input
+ * list, and is sorted in OID order. (The last property is enforced mainly
+ * to guarantee consistent behavior in the regression tests; we don't want
+ * behavior to change depending on chance locations of rows in pg_constraint.)
+ *
+ * Note: caller should already have appropriate lock on all rels mentioned
+ * in relationIds. Since adding or dropping an FK requires exclusive lock
+ * on both rels, this ensures that the answer will be stable.
+ */
+List *
+heap_truncate_find_FKs(List *relationIds)
+{
+ List *result = NIL;
+ Relation fkeyRel;
+ SysScanDesc fkeyScan;
+ HeapTuple tuple;
+
+ /*
+ * Must scan pg_constraint. Right now, it is a seqscan because there is
+ * no available index on confrelid.
*/
fkeyRel = heap_open(ConstraintRelationId, AccessShareLock);
if (con->contype != CONSTRAINT_FOREIGN)
continue;
- /* Not for one of our list of tables */
- if (!list_member_oid(oids, con->confrelid))
+ /* Not referencing one of our list of tables */
+ if (!list_member_oid(relationIds, con->confrelid))
continue;
- /* The referencer should be in our list too */
- if (!list_member_oid(oids, con->conrelid))
- {
- if (tempTables)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("unsupported ON COMMIT and foreign key combination"),
- errdetail("Table \"%s\" references \"%s\" via foreign key constraint \"%s\", but they do not have the same ON COMMIT setting.",
- get_rel_name(con->conrelid),
- get_rel_name(con->confrelid),
- NameStr(con->conname))));
- else
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot truncate a table referenced in a foreign key constraint"),
- errdetail("Table \"%s\" references \"%s\" via foreign key constraint \"%s\".",
- get_rel_name(con->conrelid),
- get_rel_name(con->confrelid),
- NameStr(con->conname)),
- errhint("Truncate table \"%s\" at the same time.",
- get_rel_name(con->conrelid))));
- }
+ /* Add referencer unless already in input or result list */
+ if (!list_member_oid(relationIds, con->conrelid))
+ result = insert_ordered_unique_oid(result, con->conrelid);
}
systable_endscan(fkeyScan);
heap_close(fkeyRel, AccessShareLock);
+
+ return result;
+}
+
+/*
+ * insert_ordered_unique_oid
+ * Insert a new Oid into a sorted list of Oids, preserving ordering,
+ * and eliminating duplicates
+ *
+ * Building the ordered list this way is O(N^2), but with a pretty small
+ * constant, so for the number of entries we expect it will probably be
+ * faster than trying to apply qsort(). It seems unlikely someone would be
+ * trying to truncate a table with thousands of dependent tables ...
+ */
+static List *
+insert_ordered_unique_oid(List *list, Oid datum)
+{
+ ListCell *prev;
+
+ /* Does the datum belong at the front? */
+ if (list == NIL || datum < linitial_oid(list))
+ return lcons_oid(datum, list);
+ /* Does it match the first entry? */
+ if (datum == linitial_oid(list))
+ return list; /* duplicate, so don't insert */
+ /* No, so find the entry it belongs after */
+ prev = list_head(list);
+ for (;;)
+ {
+ ListCell *curr = lnext(prev);
+
+ if (curr == NULL || datum < lfirst_oid(curr))
+ break; /* it belongs after 'prev', before 'curr' */
+
+ if (datum == lfirst_oid(curr))
+ return list; /* duplicate, so don't insert */
+
+ prev = curr;
+ }
+ /* Insert datum into list after 'prev' */
+ lappend_cell_oid(list, prev, datum);
+ return list;
}