#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
+#include "utils/typcache.h"
/*
static void ATPrepAddInherit(Relation child_rel);
static void ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode);
static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode);
+static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid);
+static void ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode);
+static void ATExecDropOf(Relation rel, LOCKMODE lockmode);
static void ATExecGenericOptions(Relation rel, List *options);
static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
cmd_lockmode = ShareUpdateExclusiveLock;
break;
+ /*
+ * These subcommands affect implicit row type conversion. They
+ * have affects similar to CREATE/DROP CAST on queries. We
+ * don't provide for invalidating parse trees as a result of
+ * such changes. Do avoid concurrent pg_class updates, though.
+ */
+ case AT_AddOf:
+ case AT_DropOf:
+ cmd_lockmode = ShareUpdateExclusiveLock;
+
/*
* These subcommands affect general strategies for performance
* and maintenance, though don't change the semantic results
case AT_EnableAlwaysRule:
case AT_EnableReplicaRule:
case AT_DisableRule:
- ATSimplePermissions(rel, ATT_TABLE);
- /* These commands never recurse */
- /* No command-specific prep needed */
- pass = AT_PASS_MISC;
- break;
case AT_DropInherit: /* NO INHERIT */
+ case AT_AddOf: /* OF */
+ case AT_DropOf: /* NOT OF */
ATSimplePermissions(rel, ATT_TABLE);
+ /* These commands never recurse */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_DropInherit:
ATExecDropInherit(rel, (RangeVar *) cmd->def, lockmode);
break;
+ case AT_AddOf:
+ ATExecAddOf(rel, (TypeName *) cmd->def, lockmode);
+ break;
+ case AT_DropOf:
+ ATExecDropOf(rel, lockmode);
+ break;
case AT_GenericOptions:
ATExecGenericOptions(rel, (List *) cmd->def);
break;
}
+/*
+ * check_of_type
+ *
+ * Check whether a type is suitable for CREATE TABLE OF/ALTER TABLE OF. If it
+ * isn't suitable, throw an error. Currently, we require that the type
+ * originated with CREATE TABLE AS. We could support any row type, but doing so
+ * would require handling a number of extra corner cases in the DDL commands.
+ */
+void
+check_of_type(HeapTuple typetuple)
+{
+ Form_pg_type typ = (Form_pg_type) GETSTRUCT(typetuple);
+ bool typeOk = false;
+
+ if (typ->typtype == TYPTYPE_COMPOSITE)
+ {
+ Relation typeRelation;
+
+ Assert(OidIsValid(typ->typrelid));
+ typeRelation = relation_open(typ->typrelid, AccessShareLock);
+ typeOk = (typeRelation->rd_rel->relkind == RELKIND_COMPOSITE_TYPE);
+ /*
+ * Close the parent rel, but keep our AccessShareLock on it until xact
+ * commit. That will prevent someone else from deleting or ALTERing
+ * the type before the typed table creation/conversion commits.
+ */
+ relation_close(typeRelation, NoLock);
+ }
+ if (!typeOk)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("type %s is not a composite type",
+ format_type_be(HeapTupleGetOid(typetuple)))));
+}
+
+
/*
* ALTER TABLE ADD COLUMN
*
ScanKeyData key[3];
HeapTuple inheritsTuple,
attributeTuple,
- constraintTuple,
- depTuple;
+ constraintTuple;
List *connames;
bool found = false;
systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
- /*
- * Drop the dependency
- *
- * There's no convenient way to do this, so go trawling through pg_depend
- */
+ drop_parent_dependency(RelationGetRelid(rel),
+ RelationRelationId,
+ RelationGetRelid(parent_rel));
+
+ /* keep our lock on the parent relation until commit */
+ heap_close(parent_rel, NoLock);
+}
+
+/*
+ * Drop the dependency created by StoreCatalogInheritance1 (CREATE TABLE
+ * INHERITS/ALTER TABLE INHERIT -- refclassid will be RelationRelationId) or
+ * heap_create_with_catalog (CREATE TABLE OF/ALTER TABLE OF -- refclassid will
+ * be TypeRelationId). There's no convenient way to do this, so go trawling
+ * through pg_depend.
+ */
+static void
+drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid)
+{
+ Relation catalogRelation;
+ SysScanDesc scan;
+ ScanKeyData key[3];
+ HeapTuple depTuple;
+
catalogRelation = heap_open(DependRelationId, RowExclusiveLock);
ScanKeyInit(&key[0],
ScanKeyInit(&key[1],
Anum_pg_depend_objid,
BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(RelationGetRelid(rel)));
+ ObjectIdGetDatum(relid));
ScanKeyInit(&key[2],
Anum_pg_depend_objsubid,
BTEqualStrategyNumber, F_INT4EQ,
{
Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple);
- if (dep->refclassid == RelationRelationId &&
- dep->refobjid == RelationGetRelid(parent_rel) &&
+ if (dep->refclassid == refclassid &&
+ dep->refobjid == refobjid &&
dep->refobjsubid == 0 &&
dep->deptype == DEPENDENCY_NORMAL)
simple_heap_delete(catalogRelation, &depTuple->t_self);
systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
+}
- /* keep our lock on the parent relation until commit */
- heap_close(parent_rel, NoLock);
+/*
+ * ALTER TABLE OF
+ *
+ * Attach a table to a composite type, as though it had been created with CREATE
+ * TABLE OF. All attname, atttypid, atttypmod and attcollation must match. The
+ * subject table must not have inheritance parents. These restrictions ensure
+ * that you cannot create a configuration impossible with CREATE TABLE OF alone.
+ */
+static void
+ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode)
+{
+ Oid relid = RelationGetRelid(rel);
+ Type typetuple;
+ Form_pg_type typ;
+ Oid typeid;
+ Relation inheritsRelation,
+ relationRelation;
+ SysScanDesc scan;
+ ScanKeyData key;
+ AttrNumber table_attno,
+ type_attno;
+ TupleDesc typeTupleDesc,
+ tableTupleDesc;
+ ObjectAddress tableobj,
+ typeobj;
+ HeapTuple classtuple;
+
+ /* Validate the type. */
+ typetuple = typenameType(NULL, ofTypename, NULL);
+ check_of_type(typetuple);
+ typ = (Form_pg_type) GETSTRUCT(typetuple);
+ typeid = HeapTupleGetOid(typetuple);
+
+ /* Fail if the table has any inheritance parents. */
+ inheritsRelation = heap_open(InheritsRelationId, AccessShareLock);
+ ScanKeyInit(&key,
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
+ scan = systable_beginscan(inheritsRelation, InheritsRelidSeqnoIndexId,
+ true, SnapshotNow, 1, &key);
+ if (HeapTupleIsValid(systable_getnext(scan)))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("typed tables cannot inherit")));
+ systable_endscan(scan);
+ heap_close(inheritsRelation, AccessShareLock);
+
+ /*
+ * Check the tuple descriptors for compatibility. Unlike inheritance, we
+ * require that the order also match. However, attnotnull need not match.
+ * Also unlike inheritance, we do not require matching relhasoids.
+ */
+ typeTupleDesc = lookup_rowtype_tupdesc(typeid, -1);
+ tableTupleDesc = RelationGetDescr(rel);
+ table_attno = 1;
+ for (type_attno = 1; type_attno <= typeTupleDesc->natts; type_attno++)
+ {
+ Form_pg_attribute type_attr,
+ table_attr;
+ const char *type_attname,
+ *table_attname;
+
+ /* Get the next non-dropped type attribute. */
+ type_attr = typeTupleDesc->attrs[type_attno - 1];
+ if (type_attr->attisdropped)
+ continue;
+ type_attname = NameStr(type_attr->attname);
+
+ /* Get the next non-dropped table attribute. */
+ do
+ {
+ if (table_attno > tableTupleDesc->natts)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("table is missing column \"%s\"",
+ type_attname)));
+ table_attr = tableTupleDesc->attrs[table_attno++ - 1];
+ } while (table_attr->attisdropped);
+ table_attname = NameStr(table_attr->attname);
+
+ /* Compare name. */
+ if (strncmp(table_attname, type_attname, NAMEDATALEN) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("table has column \"%s\" where type requires \"%s\"",
+ table_attname, type_attname)));
+
+ /* Compare type. */
+ if (table_attr->atttypid != type_attr->atttypid ||
+ table_attr->atttypmod != type_attr->atttypmod ||
+ table_attr->attcollation != type_attr->attcollation)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("table \"%s\" has different type for column \"%s\"",
+ RelationGetRelationName(rel), type_attname)));
+ }
+ DecrTupleDescRefCount(typeTupleDesc);
+
+ /* Any remaining columns at the end of the table had better be dropped. */
+ for (; table_attno <= tableTupleDesc->natts; table_attno++)
+ {
+ Form_pg_attribute table_attr = tableTupleDesc->attrs[table_attno - 1];
+ if (!table_attr->attisdropped)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("table has extra column \"%s\"",
+ NameStr(table_attr->attname))));
+ }
+
+ /* If the table was already typed, drop the existing dependency. */
+ if (rel->rd_rel->reloftype)
+ drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype);
+
+ /* Record a dependency on the new type. */
+ tableobj.classId = RelationRelationId;
+ tableobj.objectId = relid;
+ tableobj.objectSubId = 0;
+ typeobj.classId = TypeRelationId;
+ typeobj.objectId = typeid;
+ typeobj.objectSubId = 0;
+ recordDependencyOn(&tableobj, &typeobj, DEPENDENCY_NORMAL);
+
+ /* Update pg_class.reloftype */
+ relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
+ classtuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(classtuple))
+ elog(ERROR, "cache lookup failed for relation %u", relid);
+ ((Form_pg_class) GETSTRUCT(classtuple))->reloftype = typeid;
+ simple_heap_update(relationRelation, &classtuple->t_self, classtuple);
+ CatalogUpdateIndexes(relationRelation, classtuple);
+ heap_freetuple(classtuple);
+ heap_close(relationRelation, RowExclusiveLock);
+
+ ReleaseSysCache(typetuple);
+}
+
+/*
+ * ALTER TABLE NOT OF
+ *
+ * Detach a typed table from its originating type. Just clear reloftype and
+ * remove the dependency.
+ */
+static void
+ATExecDropOf(Relation rel, LOCKMODE lockmode)
+{
+ Oid relid = RelationGetRelid(rel);
+ Relation relationRelation;
+ HeapTuple tuple;
+
+ if (!OidIsValid(rel->rd_rel->reloftype))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a typed table",
+ RelationGetRelationName(rel))));
+
+ /*
+ * We don't bother to check ownership of the type --- ownership of the table
+ * is presumed enough rights. No lock required on the type, either.
+ */
+
+ drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype);
+
+ /* Clear pg_class.reloftype */
+ relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
+ tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for relation %u", relid);
+ ((Form_pg_class) GETSTRUCT(tuple))->reloftype = InvalidOid;
+ simple_heap_update(relationRelation, &tuple->t_self, tuple);
+ CatalogUpdateIndexes(relationRelation, tuple);
+ heap_freetuple(tuple);
+ heap_close(relationRelation, RowExclusiveLock);
}
/*