]> granicus.if.org Git - postgresql/blobdiff - src/backend/parser/parse_utilcmd.c
Allow WITH clauses to be attached to INSERT, UPDATE, DELETE statements.
[postgresql] / src / backend / parser / parse_utilcmd.c
index 89d81bff62431876654ad7b2e3eb87f2cb80fb73..a8aee204c74a5c66fb7bf43271bbc8d226787fc2 100644 (file)
  * a quick copyObject() call before manipulating the query tree.
  *
  *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- *     $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.33 2009/12/20 18:28:14 tgl Exp $
+ *     src/backend/parser/parse_utilcmd.c
  *
  *-------------------------------------------------------------------------
  */
 #include "parser/parse_clause.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
+#include "parser/parse_target.h"
 #include "parser/parse_type.h"
 #include "parser/parse_utilcmd.h"
 #include "parser/parser.h"
 #include "rewrite/rewriteManip.h"
+#include "storage/lock.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/relcache.h"
 #include "utils/syscache.h"
+#include "utils/typcache.h"
 
 
 /* State shared by transformCreateStmt and its subroutines */
@@ -103,6 +106,8 @@ static void transformTableConstraint(ParseState *pstate,
                                                 Constraint *constraint);
 static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
                                         InhRelation *inhrelation);
+static void transformOfType(ParseState *pstate, CreateStmtContext *cxt,
+                               TypeName *ofTypename);
 static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt);
 static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
                                                Relation parent_index, AttrNumber *attmap);
@@ -182,6 +187,11 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
        cxt.pkey = NULL;
        cxt.hasoids = interpretOidsOption(stmt->options);
 
+       Assert(!stmt->ofTypename || !stmt->inhRelations);       /* grammar enforces */
+
+       if (stmt->ofTypename)
+               transformOfType(pstate, &cxt, stmt->ofTypename);
+
        /*
         * Run through each primary element in the table creation clause. Separate
         * column defs from constraints, and do preliminary analysis.
@@ -265,8 +275,9 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
 
        /* Check for SERIAL pseudo-types */
        is_serial = false;
-       if (list_length(column->typeName->names) == 1 &&
-               !column->typeName->pct_type)
+       if (column->typeName
+               && list_length(column->typeName->names) == 1
+               && !column->typeName->pct_type)
        {
                char       *typname = strVal(linitial(column->typeName->names));
 
@@ -298,7 +309,8 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
        }
 
        /* Do necessary work on the column type declaration */
-       transformColumnType(pstate, column);
+       if (column->typeName)
+               transformColumnType(pstate, column);
 
        /* Special actions for SERIAL pseudo-types */
        if (is_serial)
@@ -349,6 +361,18 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
                seqstmt->sequence = makeRangeVar(snamespace, sname, -1);
                seqstmt->options = NIL;
 
+               /*
+                * If this is ALTER ADD COLUMN, make sure the sequence will be owned
+                * by the table's owner.  The current user might be someone else
+                * (perhaps a superuser, or someone who's only a member of the owning
+                * role), but the SEQUENCE OWNED BY mechanisms will bleat unless
+                * table and sequence have exactly the same owning role.
+                */
+               if (cxt->rel)
+                       seqstmt->ownerId = cxt->rel->rd_rel->relowner;
+               else
+                       seqstmt->ownerId = InvalidOid;
+
                cxt->blist = lappend(cxt->blist, seqstmt);
 
                /*
@@ -475,6 +499,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
                                break;
 
                        case CONSTR_FOREIGN:
+
                                /*
                                 * Fill in the current attribute's name and throw it into the
                                 * list of FK constraints to be processed later.
@@ -700,8 +725,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
 
                        /* Copy comment on constraint */
                        if ((inhRelation->options & CREATE_TABLE_LIKE_COMMENTS) &&
-                               (comment = GetComment(GetConstraintByName(RelationGetRelid(relation),
-                                                                                                                 n->conname),
+                               (comment = GetComment(get_constraint_oid(RelationGetRelid(relation),
+                                                                                                                 n->conname, false),
                                                                          ConstraintRelationId,
                                                                          0)) != NULL)
                        {
@@ -745,17 +770,15 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
                        /* Copy comment on index */
                        if (inhRelation->options & CREATE_TABLE_LIKE_COMMENTS)
                        {
-                               Form_pg_attribute *attrs;
-                               CommentStmt        *stmt;
-                               int                             colno;
-
                                comment = GetComment(parent_index_oid, RelationRelationId, 0);
-                               
+
                                if (comment != NULL)
                                {
+                                       CommentStmt *stmt;
+
                                        /*
-                                        * We have to assign the index a name now, so that we
-                                        * can reference it in CommentStmt.
+                                        * We have to assign the index a name now, so that we can
+                                        * reference it in CommentStmt.
                                         */
                                        if (index_stmt->idxname == NULL)
                                                index_stmt->idxname = chooseIndexName(cxt->relation,
@@ -771,41 +794,6 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
 
                                        cxt->alist = lappend(cxt->alist, stmt);
                                }
-
-                               /* Copy comments on index's columns */
-                               attrs = RelationGetDescr(parent_index)->attrs;
-                               for (colno = 1;
-                                        colno <= RelationGetNumberOfAttributes(parent_index);
-                                        colno++)
-                               {
-                                       char       *attname;
-
-                                       comment = GetComment(parent_index_oid, RelationRelationId,
-                                                                                colno);
-                                       if (comment == NULL)
-                                               continue;
-
-                                       /*
-                                        * We have to assign the index a name now, so that we
-                                        * can reference it in CommentStmt.
-                                        */
-                                       if (index_stmt->idxname == NULL)
-                                               index_stmt->idxname = chooseIndexName(cxt->relation,
-                                                                                                                         index_stmt);
-
-                                       attname = NameStr(attrs[colno - 1]->attname);
-
-                                       stmt = makeNode(CommentStmt);
-                                       stmt->objtype = OBJECT_COLUMN;
-                                       stmt->objname =
-                                               list_make3(makeString(cxt->relation->schemaname),
-                                                                  makeString(index_stmt->idxname),
-                                                                  makeString(attname));
-                                       stmt->objargs = NIL;
-                                       stmt->comment = comment;
-
-                                       cxt->alist = lappend(cxt->alist, stmt);
-                               }
                        }
 
                        /* Save it in the inh_indexes list for the time being */
@@ -823,37 +811,70 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
        heap_close(relation, NoLock);
 }
 
+static void
+transformOfType(ParseState *pstate, CreateStmtContext *cxt, TypeName *ofTypename)
+{
+       HeapTuple       tuple;
+       Form_pg_type typ;
+       TupleDesc       tupdesc;
+       int                     i;
+       Oid                     ofTypeId;
+
+       AssertArg(ofTypename);
+
+       tuple = typenameType(NULL, ofTypename, NULL);
+       typ = (Form_pg_type) GETSTRUCT(tuple);
+       ofTypeId = HeapTupleGetOid(tuple);
+       ofTypename->typeOid = ofTypeId;         /* cached for later */
+
+       if (typ->typtype != TYPTYPE_COMPOSITE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("type %s is not a composite type",
+                                               format_type_be(ofTypeId))));
+
+       tupdesc = lookup_rowtype_tupdesc(ofTypeId, -1);
+       for (i = 0; i < tupdesc->natts; i++)
+       {
+               Form_pg_attribute attr = tupdesc->attrs[i];
+               ColumnDef  *n = makeNode(ColumnDef);
+
+               if (attr->attisdropped)
+                       continue;
+
+               n->colname = pstrdup(NameStr(attr->attname));
+               n->typeName = makeTypeNameFromOid(attr->atttypid, attr->atttypmod);
+               n->constraints = NULL;
+               n->is_local = true;
+               n->is_from_type = true;
+               cxt->columns = lappend(cxt->columns, n);
+       }
+       DecrTupleDescRefCount(tupdesc);
+
+       ReleaseSysCache(tuple);
+}
+
 /*
  * chooseIndexName
  *
- * Set name for unnamed index. See also the same logic in DefineIndex.
+ * Compute name for an index.  This must match code in indexcmds.c.
+ *
+ * XXX this is inherently broken because the indexes aren't created
+ * immediately, so we fail to resolve conflicts when the same name is
+ * derived for multiple indexes.  However, that's a reasonably uncommon
+ * situation, so we'll live with it for now.
  */
 static char *
 chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt)
 {
-       Oid     namespaceId;
-       
-       namespaceId = RangeVarGetCreationNamespace(relation);
-       if (index_stmt->primary)
-       {
-               /* no need for column list with pkey */
-               return ChooseRelationName(relation->relname, NULL, 
-                                                                 "pkey", namespaceId);
-       }
-       else if (index_stmt->excludeOpNames != NIL)
-       {
-               IndexElem  *iparam = (IndexElem *) linitial(index_stmt->indexParams);
+       Oid                     namespaceId;
+       List       *colnames;
 
-               return ChooseRelationName(relation->relname, iparam->name,
-                                                                 "exclusion", namespaceId);
-       }
-       else
-       {
-               IndexElem  *iparam = (IndexElem *) linitial(index_stmt->indexParams);
-
-               return ChooseRelationName(relation->relname, iparam->name,
-                                                                 "key", namespaceId);
-       }
+       namespaceId = RangeVarGetCreationNamespace(relation);
+       colnames = ChooseIndexColumnNames(index_stmt->indexParams);
+       return ChooseIndexName(relation->relname, namespaceId,
+                                                  colnames, index_stmt->excludeOpNames,
+                                                  index_stmt->primary, index_stmt->isconstraint);
 }
 
 /*
@@ -865,6 +886,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
                                                AttrNumber *attmap)
 {
        Oid                     source_relid = RelationGetRelid(source_idx);
+       Form_pg_attribute *attrs = RelationGetDescr(source_idx)->attrs;
        HeapTuple       ht_idxrel;
        HeapTuple       ht_idx;
        Form_pg_class idxrelrec;
@@ -884,9 +906,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
         * Fetch pg_class tuple of source index.  We can't use the copy in the
         * relcache entry because it doesn't include optional fields.
         */
-       ht_idxrel = SearchSysCache(RELOID,
-                                                          ObjectIdGetDatum(source_relid),
-                                                          0, 0, 0);
+       ht_idxrel = SearchSysCache1(RELOID, ObjectIdGetDatum(source_relid));
        if (!HeapTupleIsValid(ht_idxrel))
                elog(ERROR, "cache lookup failed for relation %u", source_relid);
        idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel);
@@ -931,16 +951,15 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
         */
        if (index->primary || index->unique || idxrelrec->relhasexclusion)
        {
-               Oid             constraintId = get_index_constraint(source_relid);
+               Oid                     constraintId = get_index_constraint(source_relid);
 
                if (OidIsValid(constraintId))
                {
                        HeapTuple       ht_constr;
                        Form_pg_constraint conrec;
 
-                       ht_constr = SearchSysCache(CONSTROID,
-                                                                          ObjectIdGetDatum(constraintId),
-                                                                          0, 0, 0);
+                       ht_constr = SearchSysCache1(CONSTROID,
+                                                                               ObjectIdGetDatum(constraintId));
                        if (!HeapTupleIsValid(ht_constr))
                                elog(ERROR, "cache lookup failed for constraint %u",
                                         constraintId);
@@ -953,9 +972,9 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
                        /* If it's an exclusion constraint, we need the operator names */
                        if (idxrelrec->relhasexclusion)
                        {
-                               Datum  *elems;
-                               int             nElems;
-                               int             i;
+                               Datum      *elems;
+                               int                     nElems;
+                               int                     i;
 
                                Assert(conrec->contype == CONSTRAINT_EXCLUSION);
                                /* Extract operator OIDs from the pg_constraint tuple */
@@ -979,9 +998,8 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
                                        char       *nspname;
                                        List       *namelist;
 
-                                       opertup = SearchSysCache(OPEROID,
-                                                                                        ObjectIdGetDatum(operid),
-                                                                                        0, 0, 0);
+                                       opertup = SearchSysCache1(OPEROID,
+                                                                                         ObjectIdGetDatum(operid));
                                        if (!HeapTupleIsValid(opertup))
                                                elog(ERROR, "cache lookup failed for operator %u",
                                                         operid);
@@ -1060,6 +1078,9 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
                        keycoltype = exprType(indexkey);
                }
 
+               /* Copy the original index column name */
+               iparam->indexcolname = pstrdup(NameStr(attrs[keyno]->attname));
+
                /* Add the operator class name, if non-default */
                iparam->opclass = get_opclass(indclass->values[keyno], keycoltype);
 
@@ -1130,9 +1151,7 @@ get_opclass(Oid opclass, Oid actual_datatype)
        Form_pg_opclass opc_rec;
        List       *result = NIL;
 
-       ht_opc = SearchSysCache(CLAOID,
-                                                       ObjectIdGetDatum(opclass),
-                                                       0, 0, 0);
+       ht_opc = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
        if (!HeapTupleIsValid(ht_opc))
                elog(ERROR, "cache lookup failed for opclass %u", opclass);
        opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc);
@@ -1308,17 +1327,17 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
        index->concurrent = false;
 
        /*
-        * If it's an EXCLUDE constraint, the grammar returns a list of pairs
-        * of IndexElems and operator names.  We have to break that apart into
+        * If it's an EXCLUDE constraint, the grammar returns a list of pairs of
+        * IndexElems and operator names.  We have to break that apart into
         * separate lists.
         */
        if (constraint->contype == CONSTR_EXCLUSION)
        {
                foreach(lc, constraint->exclusions)
                {
-                       List    *pair = (List *) lfirst(lc);
-                       IndexElem *elem;
-                       List   *opname;
+                       List       *pair = (List *) lfirst(lc);
+                       IndexElem  *elem;
+                       List       *opname;
 
                        Assert(list_length(pair) == 2);
                        elem = (IndexElem *) linitial(pair);
@@ -1453,6 +1472,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
                iparam = makeNode(IndexElem);
                iparam->name = pstrdup(key);
                iparam->expr = NULL;
+               iparam->indexcolname = NULL;
                iparam->opclass = NIL;
                iparam->ordering = SORTBY_DEFAULT;
                iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
@@ -1524,7 +1544,7 @@ transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt,
 }
 
 /*
- * transformIndexStmt - parse analysis for CREATE INDEX
+ * transformIndexStmt - parse analysis for CREATE INDEX and ALTER TABLE
  *
  * Note: this is a no-op for an index not using either index expressions or
  * a predicate expression.     There are several code paths that create indexes
@@ -1550,7 +1570,8 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString)
         * because addRangeTableEntry() would acquire only AccessShareLock,
         * leaving DefineIndex() needing to do a lock upgrade with consequent risk
         * of deadlock.  Make sure this stays in sync with the type of lock
-        * DefineIndex() wants.
+        * DefineIndex() wants. If we are being called by ALTER TABLE, we will
+        * already hold a higher lock.
         */
        rel = heap_openrv(stmt->relation,
                                  (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
@@ -1581,6 +1602,11 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString)
 
                if (ielem->expr)
                {
+                       /* Extract preliminary index col name before transforming expr */
+                       if (ielem->indexcolname == NULL)
+                               ielem->indexcolname = FigureIndexColname(ielem->expr);
+
+                       /* Now do parse transformation of the expression */
                        ielem->expr = transformExpr(pstate, ielem->expr);
 
                        /*
@@ -1841,6 +1867,35 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString,
                                        break;
                        }
 
+                       /*
+                        * OLD/NEW are not allowed in WITH queries, because they would
+                        * amount to outer references for the WITH, which we disallow.
+                        * However, they were already in the outer rangetable when we
+                        * analyzed the query, so we have to check.
+                        *
+                        * Note that in the INSERT...SELECT case, we need to examine
+                        * the CTE lists of both top_subqry and sub_qry.
+                        *
+                        * Note that we aren't digging into the body of the query
+                        * looking for WITHs in nested sub-SELECTs.  A WITH down there
+                        * can legitimately refer to OLD/NEW, because it'd be an
+                        * indirect-correlated outer reference.
+                        */
+                       if (rangeTableEntry_used((Node *) top_subqry->cteList,
+                                                                        PRS2_OLD_VARNO, 0) ||
+                               rangeTableEntry_used((Node *) sub_qry->cteList,
+                                                                         PRS2_OLD_VARNO, 0))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("cannot refer to OLD within WITH query")));
+                       if (rangeTableEntry_used((Node *) top_subqry->cteList,
+                                                                        PRS2_NEW_VARNO, 0) ||
+                               rangeTableEntry_used((Node *) sub_qry->cteList,
+                                                                        PRS2_NEW_VARNO, 0))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("cannot refer to NEW within WITH query")));
+
                        /*
                         * For efficiency's sake, add OLD to the rule action's jointree
                         * only if it was actually referenced in the statement or qual.
@@ -1910,6 +1965,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
        List       *newcmds = NIL;
        bool            skipValidation = true;
        AlterTableCmd *newcmd;
+       LOCKMODE        lockmode;
 
        /*
         * We must not scribble on the passed-in AlterTableStmt, so copy it. (This
@@ -1918,13 +1974,19 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
        stmt = (AlterTableStmt *) copyObject(stmt);
 
        /*
-        * Acquire exclusive lock on the target relation, which will be held until
+        * Assign the appropriate lock level for this list of subcommands.
+        */
+       lockmode = AlterTableGetLockLevel(stmt->cmds);
+
+       /*
+        * Acquire appropriate lock on the target relation, which will be held until
         * end of transaction.  This ensures any decisions we make here based on
         * the state of the relation will still be good at execution. We must get
-        * exclusive lock now because execution will; taking a lower grade lock
-        * now and trying to upgrade later risks deadlock.
+        * lock now because execution will later require it; taking a lower grade lock
+        * now and trying to upgrade later risks deadlock.  Any new commands we add
+        * after this must not upgrade the lock level requested here.
         */
-       rel = relation_openrv(stmt->relation, AccessExclusiveLock);
+       rel = relation_openrv(stmt->relation, lockmode);
 
        /* Set up pstate */
        pstate = make_parsestate(NULL);