]> granicus.if.org Git - postgresql/blobdiff - src/backend/catalog/heap.c
Revise collation derivation method and expression-tree representation.
[postgresql] / src / backend / catalog / heap.c
index 39aec680c086926ba232082a07ad58e83cf8536a..567eb7fd6ebb42acff259e71235c651f2440a3a6 100644 (file)
@@ -3,12 +3,12 @@
  * heap.c
  *       code to create and destroy POSTGRES heap relations
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.372 2010/02/26 02:00:36 momjian Exp $
+ *       src/backend/catalog/heap.c
  *
  *
  * INTERFACE ROUTINES
 #include "catalog/heap.h"
 #include "catalog/index.h"
 #include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/objectaccess.h"
 #include "catalog/pg_attrdef.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_foreign_table.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_statistic.h"
@@ -54,6 +58,7 @@
 #include "nodes/nodeFuncs.h"
 #include "optimizer/var.h"
 #include "parser/parse_coerce.h"
+#include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
 #include "storage/bufmgr.h"
@@ -70,9 +75,9 @@
 #include "utils/tqual.h"
 
 
-/* Kluge for upgrade-in-place support */
-Oid                    binary_upgrade_next_heap_relfilenode = InvalidOid;
-Oid                    binary_upgrade_next_toast_relfilenode = InvalidOid;
+/* Potentially set by contrib/pg_upgrade_support functions */
+Oid                    binary_upgrade_next_heap_pg_class_oid = InvalidOid;
+Oid                    binary_upgrade_next_toast_pg_class_oid = InvalidOid;
 
 static void AddNewRelationTuple(Relation pg_class_desc,
                                        Relation new_rel_desc,
@@ -236,6 +241,7 @@ heap_create(const char *relname,
                        Oid relid,
                        TupleDesc tupDesc,
                        char relkind,
+                       char relpersistence,
                        bool shared_relation,
                        bool mapped_relation,
                        bool allow_system_table_mods)
@@ -266,6 +272,7 @@ heap_create(const char *relname,
        {
                case RELKIND_VIEW:
                case RELKIND_COMPOSITE_TYPE:
+               case RELKIND_FOREIGN_TABLE:
                        create_storage = false;
 
                        /*
@@ -309,7 +316,8 @@ heap_create(const char *relname,
                                                                         relid,
                                                                         reltablespace,
                                                                         shared_relation,
-                                                                        mapped_relation);
+                                                                        mapped_relation,
+                                                                        relpersistence);
 
        /*
         * Have the storage manager create the relation's disk file, if needed.
@@ -320,7 +328,7 @@ heap_create(const char *relname,
        if (create_storage)
        {
                RelationOpenSmgr(rel);
-               RelationCreateStorage(rel->rd_node, rel->rd_istemp);
+               RelationCreateStorage(rel->rd_node, relpersistence);
        }
 
        return rel;
@@ -422,6 +430,7 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
        {
                CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
                                                   tupdesc->attrs[i]->atttypid,
+                                                  tupdesc->attrs[i]->attcollation,
                                                   allow_system_table_mods);
        }
 }
@@ -435,7 +444,7 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
  * --------------------------------
  */
 void
-CheckAttributeType(const char *attname, Oid atttypid,
+CheckAttributeType(const char *attname, Oid atttypid, Oid attcollation,
                                   bool allow_system_table_mods)
 {
        char            att_typtype = get_typtype(atttypid);
@@ -486,12 +495,24 @@ CheckAttributeType(const char *attname, Oid atttypid,
 
                        if (attr->attisdropped)
                                continue;
-                       CheckAttributeType(NameStr(attr->attname), attr->atttypid,
+                       CheckAttributeType(NameStr(attr->attname), attr->atttypid, attr->attcollation,
                                                           allow_system_table_mods);
                }
 
                relation_close(relation, AccessShareLock);
        }
+
+       /*
+        * This might not be strictly invalid per SQL standard, but it is
+        * pretty useless, and it cannot be dumped, so we must disallow
+        * it.
+        */
+       if (type_is_collatable(atttypid) && !OidIsValid(attcollation))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                        errmsg("no collation was derived for column \"%s\" with collatable type %s",
+                                                       attname, format_type_be(atttypid)),
+                                        errhint("Use the COLLATE clause to set the collation explicitly.")));
 }
 
 /*
@@ -536,6 +557,7 @@ InsertPgAttributeTuple(Relation pg_attribute_rel,
        values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
        values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
        values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
+       values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
 
        /* start out with empty permissions and empty options */
        nulls[Anum_pg_attribute_attacl - 1] = true;
@@ -606,6 +628,14 @@ AddNewAttributeTuples(Oid new_rel_oid,
                referenced.objectId = attr->atttypid;
                referenced.objectSubId = 0;
                recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+               if (OidIsValid(attr->attcollation))
+               {
+                       referenced.classId = CollationRelationId;
+                       referenced.objectId = attr->attcollation;
+                       referenced.objectSubId = 0;
+                       recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+               }
        }
 
        /*
@@ -691,13 +721,12 @@ InsertPgClassTuple(Relation pg_class_desc,
        values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(rd_rel->reltoastidxid);
        values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
        values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
-       values[Anum_pg_class_relistemp - 1] = BoolGetDatum(rd_rel->relistemp);
+       values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
        values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
        values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
        values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
        values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
        values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
-       values[Anum_pg_class_relhasexclusion - 1] = BoolGetDatum(rd_rel->relhasexclusion);
        values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
        values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
        values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
@@ -854,7 +883,8 @@ AddNewRelationType(const char *typeName,
                                   'x',                 /* fully TOASTable */
                                   -1,                  /* typmod */
                                   0,                   /* array dimensions for typBaseType */
-                                  false);              /* Type NOT NULL */
+                                  false,               /* Type NOT NULL */
+                                  InvalidOid); /* typcollation */
 }
 
 /* --------------------------------
@@ -896,6 +926,7 @@ heap_create_with_catalog(const char *relname,
                                                 TupleDesc tupdesc,
                                                 List *cooked_constraints,
                                                 char relkind,
+                                                char relpersistence,
                                                 bool shared_relation,
                                                 bool mapped_relation,
                                                 bool oidislocal,
@@ -903,11 +934,13 @@ heap_create_with_catalog(const char *relname,
                                                 OnCommitAction oncommit,
                                                 Datum reloptions,
                                                 bool use_user_acl,
-                                                bool allow_system_table_mods)
+                                                bool allow_system_table_mods,
+                                                bool if_not_exists)
 {
        Relation        pg_class_desc;
        Relation        new_rel_desc;
        Acl                *relacl;
+       Oid                     existing_relid;
        Oid                     old_type_oid;
        Oid                     new_type_oid;
        Oid                     new_array_oid = InvalidOid;
@@ -921,10 +954,27 @@ heap_create_with_catalog(const char *relname,
 
        CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
 
-       if (get_relname_relid(relname, relnamespace))
+       /*
+        * If the relation already exists, it's an error, unless the user specifies
+        * "IF NOT EXISTS".  In that case, we just print a notice and do nothing
+        * further.
+        */
+       existing_relid = get_relname_relid(relname, relnamespace);
+       if (existing_relid != InvalidOid)
+       {
+               if (if_not_exists)
+               {
+                       ereport(NOTICE,
+                                       (errcode(ERRCODE_DUPLICATE_TABLE),
+                                        errmsg("relation \"%s\" already exists, skipping",
+                                        relname)));
+                       heap_close(pg_class_desc, RowExclusiveLock);
+                       return InvalidOid;
+               }
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_TABLE),
                                 errmsg("relation \"%s\" already exists", relname)));
+       }
 
        /*
         * Since we are going to create a rowtype as well, also check for
@@ -960,22 +1010,27 @@ heap_create_with_catalog(const char *relname,
         */
        if (!OidIsValid(relid))
        {
-               /* Use binary-upgrade overrides if applicable */
-               if (OidIsValid(binary_upgrade_next_heap_relfilenode) &&
+               /*
+                *      Use binary-upgrade override for pg_class.oid/relfilenode,
+                *      if supplied.
+                */
+               if (OidIsValid(binary_upgrade_next_heap_pg_class_oid) &&
                        (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
-                        relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE))
+                        relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE ||
+                        relkind == RELKIND_FOREIGN_TABLE))
                {
-                       relid = binary_upgrade_next_heap_relfilenode;
-                       binary_upgrade_next_heap_relfilenode = InvalidOid;
+                       relid = binary_upgrade_next_heap_pg_class_oid;
+                       binary_upgrade_next_heap_pg_class_oid = InvalidOid;
                }
-               else if (OidIsValid(binary_upgrade_next_toast_relfilenode) &&
+               else if (OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
                                 relkind == RELKIND_TOASTVALUE)
                {
-                       relid = binary_upgrade_next_toast_relfilenode;
-                       binary_upgrade_next_toast_relfilenode = InvalidOid;
+                       relid = binary_upgrade_next_toast_pg_class_oid;
+                       binary_upgrade_next_toast_pg_class_oid = InvalidOid;
                }
                else
-                       relid = GetNewRelFileNode(reltablespace, pg_class_desc);
+                       relid = GetNewRelFileNode(reltablespace, pg_class_desc,
+                                                                         relpersistence);
        }
 
        /*
@@ -987,6 +1042,7 @@ heap_create_with_catalog(const char *relname,
                {
                        case RELKIND_RELATION:
                        case RELKIND_VIEW:
+                       case RELKIND_FOREIGN_TABLE:
                                relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
                                                                                          relnamespace);
                                break;
@@ -1013,6 +1069,7 @@ heap_create_with_catalog(const char *relname,
                                                           relid,
                                                           tupdesc,
                                                           relkind,
+                                                          relpersistence,
                                                           shared_relation,
                                                           mapped_relation,
                                                           allow_system_table_mods);
@@ -1023,10 +1080,12 @@ heap_create_with_catalog(const char *relname,
         * Decide whether to create an array type over the relation's rowtype. We
         * do not create any array types for system catalogs (ie, those made
         * during initdb).      We create array types for regular relations, views,
-        * and composite types ... but not, eg, for toast tables or sequences.
+        * composite types and foreign tables ... but not, eg, for toast tables or
+        * sequences.
         */
        if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
                                                          relkind == RELKIND_VIEW ||
+                                                         relkind == RELKIND_FOREIGN_TABLE ||
                                                          relkind == RELKIND_COMPOSITE_TYPE))
                new_array_oid = AssignTypeArrayOid();
 
@@ -1086,7 +1145,8 @@ heap_create_with_catalog(const char *relname,
                                   'x',                 /* fully TOASTable */
                                   -1,                  /* typmod */
                                   0,                   /* array dimensions for typBaseType */
-                                  false);              /* Type NOT NULL */
+                                  false,               /* Type NOT NULL */
+                                  InvalidOid); /* typcollation */
 
                pfree(relarrayname);
        }
@@ -1123,7 +1183,8 @@ heap_create_with_catalog(const char *relname,
         * entry, so we needn't record them here.  Likewise, TOAST tables don't
         * need a namespace dependency (they live in a pinned namespace) nor an
         * owner dependency (they depend indirectly through the parent table), nor
-        * should they have any ACL entries.
+        * should they have any ACL entries.  The same applies for extension
+        * dependencies.
         *
         * Also, skip this in bootstrap mode, since we don't make dependencies
         * while bootstrapping.
@@ -1145,6 +1206,8 @@ heap_create_with_catalog(const char *relname,
 
                recordDependencyOnOwner(RelationRelationId, relid, ownerid);
 
+               recordDependencyOnCurrentExtension(&myself);
+
                if (reloftypeid)
                {
                        referenced.classId = TypeRelationId;
@@ -1160,12 +1223,15 @@ heap_create_with_catalog(const char *relname,
 
                        nnewmembers = aclmembers(relacl, &newmembers);
                        updateAclDependencies(RelationRelationId, relid, 0,
-                                                                 ownerid, true,
+                                                                 ownerid,
                                                                  0, NULL,
                                                                  nnewmembers, newmembers);
                }
        }
 
+       /* Post creation hook for new relation */
+       InvokeObjectAccessHook(OAT_POST_CREATE, RelationRelationId, relid, 0);
+
        /*
         * Store any supplied constraints and defaults.
         *
@@ -1181,6 +1247,25 @@ heap_create_with_catalog(const char *relname,
        if (oncommit != ONCOMMIT_NOOP)
                register_on_commit_action(relid, oncommit);
 
+       /*
+        * If this is an unlogged relation, it needs an init fork so that it
+        * can be correctly reinitialized on restart.  Since we're going to
+        * do an immediate sync, we ony need to xlog this if archiving or
+        * streaming is enabled.  And the immediate sync is required, because
+        * otherwise there's no guarantee that this will hit the disk before
+        * the next checkpoint moves the redo pointer.
+        */
+       if (relpersistence == RELPERSISTENCE_UNLOGGED)
+       {
+               Assert(relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE);
+
+               smgrcreate(new_rel_desc->rd_smgr, INIT_FORKNUM, false);
+               if (XLogIsNeeded())
+                       log_smgrcreate(&new_rel_desc->rd_smgr->smgr_rnode.node,
+                                                  INIT_FORKNUM);
+               smgrimmedsync(new_rel_desc->rd_smgr, INIT_FORKNUM);
+       }
+
        /*
         * ok, the relation has been cataloged, so close our relations and return
         * the OID of the newly created relation.
@@ -1532,20 +1617,37 @@ heap_drop_with_catalog(Oid relid)
 
        /*
         * There can no longer be anyone *else* touching the relation, but we
-        * might still have open queries or cursors in our own session.
+        * might still have open queries or cursors, or pending trigger events,
+        * in our own session.
         */
-       if (rel->rd_refcnt != 1)
-               ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_IN_USE),
-                                errmsg("cannot drop \"%s\" because "
-                                               "it is being used by active queries in this session",
-                                               RelationGetRelationName(rel))));
+       CheckTableNotInUse(rel, "DROP TABLE");
+
+       /*
+        * Delete pg_foreign_table tuple first.
+        */
+       if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+       {
+               Relation    rel;
+               HeapTuple   tuple;
+
+               rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
+
+               tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
+               if (!HeapTupleIsValid(tuple))
+                       elog(ERROR, "cache lookup failed for foreign table %u", relid);
+
+               simple_heap_delete(rel, &tuple->t_self);
+
+               ReleaseSysCache(tuple);
+               heap_close(rel, RowExclusiveLock);
+       }
 
        /*
         * Schedule unlinking of the relation's physical files at commit.
         */
        if (rel->rd_rel->relkind != RELKIND_VIEW &&
-               rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
+               rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
+               rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
        {
                RelationDropStorage(rel);
        }
@@ -1760,6 +1862,7 @@ StoreRelCheck(Relation rel, char *ccname, Node *expr,
                                                  CONSTRAINT_CHECK,             /* Constraint Type */
                                                  false,        /* Is Deferrable */
                                                  false,        /* Is Deferred */
+                                                 true,         /* Is Validated */
                                                  RelationGetRelid(rel),                /* relation */
                                                  attNos,               /* attrs in the constraint */
                                                  keycount,             /* # attrs in the constraint */
@@ -2285,6 +2388,11 @@ cookDefault(ParseState *pstate,
                           errhint("You will need to rewrite or cast the expression.")));
        }
 
+       /*
+        * Finally, take care of collations in the finished expression.
+        */
+       assign_expr_collations(pstate, expr);
+
        return expr;
 }
 
@@ -2312,6 +2420,11 @@ cookConstraint(ParseState *pstate,
         */
        expr = coerce_to_boolean(pstate, expr, "CHECK");
 
+       /*
+        * Take care of collations.
+        */
+       assign_expr_collations(pstate, expr);
+
        /*
         * Make sure no outside relations are referred to.
         */