1 /*-------------------------------------------------------------------------
4 * routines for defining a rewrite rule
6 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/rewrite/rewriteDefine.c,v 1.130 2008/10/04 21:56:54 tgl Exp $
13 *-------------------------------------------------------------------------
17 #include "access/heapam.h"
18 #include "catalog/dependency.h"
19 #include "catalog/indexing.h"
20 #include "catalog/namespace.h"
21 #include "catalog/pg_rewrite.h"
22 #include "miscadmin.h"
23 #include "nodes/nodeFuncs.h"
24 #include "parser/parse_utilcmd.h"
25 #include "rewrite/rewriteDefine.h"
26 #include "rewrite/rewriteManip.h"
27 #include "rewrite/rewriteSupport.h"
28 #include "storage/smgr.h"
29 #include "utils/acl.h"
30 #include "utils/builtins.h"
31 #include "utils/inval.h"
32 #include "utils/lsyscache.h"
33 #include "utils/rel.h"
34 #include "utils/syscache.h"
35 #include "utils/tqual.h"
/*
 * Forward declarations for the file-local helpers defined further down.
 *
 * NOTE(review): the embedded listing numbers jump from 38 to 40, so the
 * continuation line of the checkRuleResultList prototype is not visible
 * here.  The definition later in this file takes a third parameter,
 * "bool isSelect"; confirm the prototype against a pristine copy.
 */
38 static void checkRuleResultList(List *targetList, TupleDesc resultDesc,
40 static bool setRuleCheckAsUser_walker(Node *node, Oid *context);
41 static void setRuleCheckAsUser_Query(Query *qry, Oid userid);
46 * takes the arguments and inserts them as a row into the system
47 * relation "pg_rewrite"
/*
 * InsertRule
 *
 * Stores one rule as a row of the system relation pg_rewrite and records
 * its dependencies.  The visible code looks up (eventrel_oid, rulname) in
 * the RULERELNAME syscache: when a row already exists it is overwritten
 * through heap_modifytuple()/simple_heap_update(), otherwise a new row is
 * formed and inserted.  The value returned is the OID of that pg_rewrite
 * row.
 *
 * NOTE(review): this text is a numbered listing with gaps -- for example
 * listing lines 51-52, 54-58 and 129-132 are absent -- so part of the
 * parameter list, some local declarations, and the brace/else/ereport
 * lines are not visible here.  Restore them from a pristine copy before
 * compiling; the comments added below describe only the visible lines.
 */
50 InsertRule(char *rulname,
53 AttrNumber evslot_index,
/* Turn the qualification and action trees into strings for storage. */
59 char *evqual = nodeToString(event_qual);
60 char *actiontree = nodeToString((Node *) action);
62 Datum values[Natts_pg_rewrite];
63 char nulls[Natts_pg_rewrite];
64 char replaces[Natts_pg_rewrite];
66 Relation pg_rewrite_desc;
72 bool is_update = false;
75 * Set up *nulls and *values arrays
/*
 * Every nulls[] slot is set to ' ' (presumably the "not null" marker of
 * this char-flag tuple API -- confirm against heap_formtuple()).
 */
77 MemSet(nulls, ' ', sizeof(nulls));
80 namestrcpy(&rname, rulname);
/* values[] is filled in column order; i advances once per column. */
81 values[i++] = NameGetDatum(&rname); /* rulename */
82 values[i++] = ObjectIdGetDatum(eventrel_oid); /* ev_class */
83 values[i++] = Int16GetDatum(evslot_index); /* ev_attr */
/* ev_type is stored as a character: evtype offset from '0'. */
84 values[i++] = CharGetDatum(evtype + '0'); /* ev_type */
85 values[i++] = CharGetDatum(RULE_FIRES_ON_ORIGIN); /* ev_enabled */
86 values[i++] = BoolGetDatum(evinstead); /* is_instead */
87 values[i++] = CStringGetTextDatum(evqual); /* ev_qual */
88 values[i++] = CStringGetTextDatum(actiontree); /* ev_action */
91 * Ready to store new pg_rewrite tuple
93 pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
96 * Check to see if we are replacing an existing tuple
98 oldtup = SearchSysCache(RULERELNAME,
99 ObjectIdGetDatum(eventrel_oid),
100 PointerGetDatum(rulname),
103 if (HeapTupleIsValid(oldtup))
/*
 * Error reported for an already-existing rule.  NOTE(review): the
 * condition guarding this report sits on listing lines that are absent
 * here (105-106).
 */
107 (errcode(ERRCODE_DUPLICATE_OBJECT),
108 errmsg("rule \"%s\" for relation \"%s\" already exists",
109 rulname, get_rel_name(eventrel_oid))));
112 * When replacing, we don't need to replace every attribute
/*
 * Only the five columns below are flagged 'r'; rulename, ev_class and
 * ev_enabled stay ' ' and so are not among the replaced attributes.
 */
114 MemSet(replaces, ' ', sizeof(replaces));
115 replaces[Anum_pg_rewrite_ev_attr - 1] = 'r';
116 replaces[Anum_pg_rewrite_ev_type - 1] = 'r';
117 replaces[Anum_pg_rewrite_is_instead - 1] = 'r';
118 replaces[Anum_pg_rewrite_ev_qual - 1] = 'r';
119 replaces[Anum_pg_rewrite_ev_action - 1] = 'r';
121 tup = heap_modifytuple(oldtup, RelationGetDescr(pg_rewrite_desc),
122 values, nulls, replaces);
124 simple_heap_update(pg_rewrite_desc, &tup->t_self, tup);
126 ReleaseSysCache(oldtup);
/* On this path the rule OID is read back from the modified tuple. */
128 rewriteObjectId = HeapTupleGetOid(tup);
/* New-row path: the insert call supplies the rule OID. */
133 tup = heap_formtuple(pg_rewrite_desc->rd_att, values, nulls);
135 rewriteObjectId = simple_heap_insert(pg_rewrite_desc, tup);
138 /* Need to update indexes in either case */
139 CatalogUpdateIndexes(pg_rewrite_desc, tup);
143 /* If replacing, get rid of old dependencies and make new ones */
145 deleteDependencyRecordsFor(RewriteRelationId, rewriteObjectId);
148 * Install dependency on rule's relation to ensure it will go away on
149 * relation deletion. If the rule is ON SELECT, make the dependency
150 * implicit --- this prevents deleting a view's SELECT rule. Other kinds
151 * of rules can be AUTO.
153 myself.classId = RewriteRelationId;
154 myself.objectId = rewriteObjectId;
155 myself.objectSubId = 0;
157 referenced.classId = RelationRelationId;
158 referenced.objectId = eventrel_oid;
159 referenced.objectSubId = 0;
/* ON SELECT rules get DEPENDENCY_INTERNAL, all other rules DEPENDENCY_AUTO. */
161 recordDependencyOn(&myself, &referenced,
162 (evtype == CMD_SELECT) ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO);
165 * Also install dependencies on objects referenced in action and qual.
167 recordDependencyOnExpr(&myself, (Node *) action, NIL,
170 if (event_qual != NULL)
172 /* Find query containing OLD/NEW rtable entries */
173 Query *qry = (Query *) linitial(action);
/* The qual's dependencies are recorded against that query's rtable. */
175 qry = getInsertSelectQuery(qry, NULL);
176 recordDependencyOnExpr(&myself, event_qual, qry->rtable,
180 heap_close(pg_rewrite_desc, RowExclusiveLock);
182 return rewriteObjectId;
187 * Execute a CREATE RULE command.
/*
 * DefineRule
 *
 * Handles a CREATE RULE statement in three visible steps: parse analysis
 * of stmt via transformRuleStmt(), which fills in the action list and the
 * WHERE clause; lookup of the relation OID for stmt->relation; and a call
 * to DefineQueryRewrite() starting with stmt->rulename.
 *
 * NOTE(review): listing lines 191-195 and everything after 203 up to the
 * next comment are absent, so the local declarations and all but the
 * first DefineQueryRewrite() argument are not visible here.
 */
190 DefineRule(RuleStmt *stmt, const char *queryString)
196 /* Parse analysis ... */
197 transformRuleStmt(stmt, queryString, &actions, &whereClause);
199 /* ... find the relation ... */
200 relId = RangeVarGetRelid(stmt->relation, false);
202 /* ... and execute */
203 DefineQueryRewrite(stmt->rulename,
217 * This is essentially the same as DefineRule() except that the rule's
218 * action and qual have already been passed through parse analysis.
/*
 * DefineQueryRewrite
 *
 * Validates an already-analyzed rule definition against its event
 * relation and, when acceptable, stores it through InsertRule().
 *
 * Steps visible below:
 *   - open the event relation with AccessExclusiveLock and require that
 *     the current user owns it;
 *   - reject actions whose result relation is OLD or NEW;
 *   - for ON SELECT rules, enforce the view restrictions: exactly one
 *     action, which must be a SELECT with no utility statement and no INTO
 *     clause; no rule qual; a target list matching the relation; no other
 *     ON SELECT rule unless replacing; and a rule name equal to
 *     ViewSelectRuleName or the legacy "_RET" + relation-name spelling.
 *     When the relation is not yet a view it must also be empty and have
 *     no triggers, indexes or child tables;
 *   - for other rules, validate the RETURNING lists;
 *   - insert the rule, update the relation's rule status, and, when the
 *     relation is becoming a view, schedule its storage forks for unlink.
 *
 * The relation is closed with NoLock at the end, so the lock taken at the
 * top is not released by this function.
 *
 * NOTE(review): this text is a numbered listing with gaps -- for example
 * listing lines 222-228, 230-233 and 254-256 are absent -- so most of the
 * parameter list, several local declarations, the loop headers, and the
 * brace/else/continue/ereport lines are not visible here.  Restore them
 * from a pristine copy before compiling; the comments added below
 * describe only the visible lines.
 */
221 DefineQueryRewrite(char *rulename,
229 Relation event_relation;
234 bool RelisBecomingView = false;
237 * If we are installing an ON SELECT rule, we had better grab
238 * AccessExclusiveLock to ensure no SELECTs are currently running on the
239 * event relation. For other types of rules, it might be sufficient to
240 * grab ShareLock to lock out insert/update/delete actions. But for now,
241 * let's just grab AccessExclusiveLock all the time.
243 event_relation = heap_open(event_relid, AccessExclusiveLock);
246 * Check user has permission to apply rules to this relation.
248 if (!pg_class_ownercheck(event_relid, GetUserId()))
249 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
250 RelationGetRelationName(event_relation));
253 * No rule actions that modify OLD or NEW
/* Each element of the action list is examined as a Query. */
257 query = (Query *) lfirst(l);
258 if (query->resultRelation == 0)
260 /* Don't be fooled by INSERT/SELECT */
261 if (query != getInsertSelectQuery(query, NULL))
263 if (query->resultRelation == PRS2_OLD_VARNO)
265 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
266 errmsg("rule actions on OLD are not implemented"),
267 errhint("Use views or triggers instead.")));
268 if (query->resultRelation == PRS2_NEW_VARNO)
270 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
271 errmsg("rule actions on NEW are not implemented"),
272 errhint("Use triggers instead.")));
275 if (event_type == CMD_SELECT)
278 * Rules ON SELECT are restricted to view definitions
280 * So there cannot be INSTEAD NOTHING, ...
282 if (list_length(action) == 0)
284 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
285 errmsg("INSTEAD NOTHING rules on SELECT are not implemented"),
286 errhint("Use views instead.")));
289 * ... there cannot be multiple actions, ...
291 if (list_length(action) > 1)
293 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
294 errmsg("multiple actions for rules on SELECT are not implemented")));
297 * ... the one action must be a SELECT, ...
299 query = (Query *) linitial(action);
/*
 * NOTE(review): the start of this condition (listing line 300) is absent;
 * the visible terms reject a non-SELECT command, a utility statement, or
 * an INTO clause.
 */
301 query->commandType != CMD_SELECT ||
302 query->utilityStmt != NULL ||
303 query->intoClause != NULL)
305 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
306 errmsg("rules on SELECT must have action INSTEAD SELECT")));
309 * ... there can be no rule qual, ...
311 if (event_qual != NULL)
313 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
314 errmsg("event qualifications are not implemented for rules on SELECT")));
317 * ... the targetlist of the SELECT action must exactly match the
318 * event relation, ...
320 checkRuleResultList(query->targetList,
321 RelationGetDescr(event_relation),
325 * ... there must not be another ON SELECT rule already ...
/* The scan of existing rules is skipped when replacing. */
327 if (!replace && event_relation->rd_rules != NULL)
331 for (i = 0; i < event_relation->rd_rules->numLocks; i++)
335 rule = event_relation->rd_rules->rules[i];
336 if (rule->event == CMD_SELECT)
338 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
339 errmsg("\"%s\" is already a view",
340 RelationGetRelationName(event_relation))));
345 * ... and finally the rule must be named _RETURN.
347 if (strcmp(rulename, ViewSelectRuleName) != 0)
350 * In versions before 7.3, the expected name was _RETviewname. For
351 * backwards compatibility with old pg_dump output, accept that
352 * and silently change it to _RETURN. Since this is just a quick
353 * backwards-compatibility hack, limit the number of characters
354 * checked to a few less than NAMEDATALEN; this saves having to
355 * worry about where a multibyte character might have gotten
/*
 * After the 4-character "_RET" prefix, at most NAMEDATALEN - 4 - 4
 * characters are compared against the relation name.
 */
358 if (strncmp(rulename, "_RET", 4) != 0 ||
359 strncmp(rulename + 4, RelationGetRelationName(event_relation),
360 NAMEDATALEN - 4 - 4) != 0)
362 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
363 errmsg("view rule for \"%s\" must be named \"%s\"",
364 RelationGetRelationName(event_relation),
365 ViewSelectRuleName)));
/* Legacy spelling accepted: continue with a copy of the canonical name. */
366 rulename = pstrdup(ViewSelectRuleName);
370 * Are we converting a relation to a view?
372 * If so, check that the relation is empty because the storage for the
373 * relation is going to be deleted. Also insist that the rel not have
374 * any triggers, indexes, or child tables.
376 if (event_relation->rd_rel->relkind != RELKIND_VIEW)
378 HeapScanDesc scanDesc;
/* A single tuple returned by the scan is enough to show the table is not empty. */
380 scanDesc = heap_beginscan(event_relation, SnapshotNow, 0, NULL);
381 if (heap_getnext(scanDesc, ForwardScanDirection) != NULL)
383 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
384 errmsg("could not convert table \"%s\" to a view because it is not empty",
385 RelationGetRelationName(event_relation))));
386 heap_endscan(scanDesc);
388 if (event_relation->rd_rel->reltriggers != 0)
390 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
391 errmsg("could not convert table \"%s\" to a view because it has triggers",
392 RelationGetRelationName(event_relation)),
393 errhint("In particular, the table cannot be involved in any foreign key relationships.")));
395 if (event_relation->rd_rel->relhasindex)
397 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
398 errmsg("could not convert table \"%s\" to a view because it has indexes",
399 RelationGetRelationName(event_relation))));
401 if (event_relation->rd_rel->relhassubclass)
403 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
404 errmsg("could not convert table \"%s\" to a view because it has child tables",
405 RelationGetRelationName(event_relation))));
/* All conversion checks passed; remembered for the steps further down. */
407 RelisBecomingView = true;
413 * For non-SELECT rules, a RETURNING list can appear in at most one of
414 * the actions ... and there can't be any RETURNING list at all in a
415 * conditional or non-INSTEAD rule. (Actually, there can be at most
416 * one RETURNING list across all rules on the same event, but it seems
417 * best to enforce that at rule expansion time.) If there is a
418 * RETURNING list, it must match the event relation.
420 bool haveReturning = false;
424 query = (Query *) lfirst(l);
426 if (!query->returningList)
/*
 * NOTE(review): the conditions guarding the first and third reports
 * below sit on absent listing lines (427-429 and 437-438).
 */
430 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
431 errmsg("cannot have multiple RETURNING lists in a rule")));
432 haveReturning = true;
433 if (event_qual != NULL)
435 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
436 errmsg("RETURNING lists are not supported in conditional rules")));
439 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
440 errmsg("RETURNING lists are not supported in non-INSTEAD rules")));
441 checkRuleResultList(query->returningList,
442 RelationGetDescr(event_relation),
448 * This rule is allowed - prepare to install it.
452 /* discard rule if it's null action and not INSTEAD; it's a no-op */
453 if (action != NIL || is_instead)
455 ruleId = InsertRule(rulename,
465 * Set pg_class 'relhasrules' field TRUE for event relation. If
466 * appropriate, also modify the 'relkind' field to show that the
467 * relation is now a view.
469 * Important side effect: an SI notice is broadcast to force all
470 * backends (including me!) to update relcache entries with the new
473 SetRelationRuleStatus(event_relid, true, RelisBecomingView);
477 * IF the relation is becoming a view, delete the storage files associated
478 * with it. NB: we had better have AccessExclusiveLock to do this ...
480 * XXX what about getting rid of its TOAST table? For now, we don't.
482 if (RelisBecomingView)
/*
 * Every fork number from 0 through MAX_FORKNUM is probed; only forks
 * that exist are scheduled for unlink.
 */
486 RelationOpenSmgr(event_relation);
487 for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
488 if (smgrexists(event_relation->rd_smgr, forknum))
489 smgrscheduleunlink(event_relation->rd_smgr, forknum,
490 event_relation->rd_istemp);
491 RelationCloseSmgr(event_relation);
494 /* Close rel, but keep lock till commit... */
495 heap_close(event_relation, NoLock);
499 * checkRuleResultList
500 * Verify that targetList produces output compatible with a tupledesc
502 * The targetList might be either a SELECT targetlist, or a RETURNING list;
503 * isSelect tells which. (This is mostly used for choosing error messages,
504 * but also we don't enforce column name matching for RETURNING.)
/*
 * checkRuleResultList
 *
 * Walks targetList and compares its entries, in order, against the
 * columns of resultDesc, reporting an error on a mismatch.  The visible
 * checks are: more entries than columns, a dropped column in the
 * descriptor, a differing column name (only when isSelect), a differing
 * type OID, a differing typmod when neither side is -1, and, after the
 * loop, an entry count that differs from the column count.  isSelect
 * also chooses between the "SELECT rule" and "RETURNING list" wording of
 * the messages.
 *
 * NOTE(review): this text is a numbered listing with gaps -- for example
 * listing lines 508-512 and 521-523 are absent -- so the declaration and
 * stepping of "i", the resjunk test itself, and the brace/ereport lines
 * are not visible here.  Restore them from a pristine copy before
 * compiling.
 */
507 checkRuleResultList(List *targetList, TupleDesc resultDesc, bool isSelect)
513 foreach(tllist, targetList)
515 TargetEntry *tle = (TargetEntry *) lfirst(tllist);
517 Form_pg_attribute attr;
520 /* resjunk entries may be ignored */
524 if (i > resultDesc->natts)
526 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
528 errmsg("SELECT rule's target list has too many entries") :
529 errmsg("RETURNING list has too many entries")));
/* i is used as a 1-based position here, hence the i - 1 array index. */
531 attr = resultDesc->attrs[i - 1];
532 attname = NameStr(attr->attname);
535 * Disallow dropped columns in the relation. This won't happen in the
536 * cases we actually care about (namely creating a view via CREATE
537 * TABLE then CREATE RULE, or adding a RETURNING rule to a view).
538 * Trying to cope with it is much more trouble than it's worth,
539 * because we'd have to modify the rule to insert dummy NULLs at the
542 if (attr->attisdropped)
544 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
545 errmsg("cannot convert relation containing dropped columns to view")));
/* Column names are compared only for SELECT target lists. */
547 if (isSelect && strcmp(tle->resname, attname) != 0)
549 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
550 errmsg("SELECT rule's target entry %d has different column name from \"%s\"", i, attname)));
552 if (attr->atttypid != exprType((Node *) tle->expr))
554 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
556 errmsg("SELECT rule's target entry %d has different type from column \"%s\"",
558 errmsg("RETURNING list's entry %d has different type from column \"%s\"",
562 * Allow typmods to be different only if one of them is -1, ie,
563 * "unspecified". This is necessary for cases like "numeric", where
564 * the table will have a filled-in default length but the select
565 * rule's expression will probably have typmod = -1.
567 tletypmod = exprTypmod((Node *) tle->expr);
568 if (attr->atttypmod != tletypmod &&
569 attr->atttypmod != -1 && tletypmod != -1)
571 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
573 errmsg("SELECT rule's target entry %d has different size from column \"%s\"",
575 errmsg("RETURNING list's entry %d has different size from column \"%s\"",
/*
 * The "too many" case is reported inside the loop, so the report below
 * carries the "too few entries" wording.
 */
579 if (i != resultDesc->natts)
581 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
583 errmsg("SELECT rule's target list has too few entries") :
584 errmsg("RETURNING list has too few entries")));
589 * Recursively scan a query or expression tree and set the checkAsUser
590 * field to the given userid in all rtable entries.
592 * Note: for a view (ON SELECT rule), the checkAsUser field of the *OLD*
593 * RTE entry will be overridden when the view rule is expanded, and the
594 * checkAsUser field of the *NEW* entry is irrelevant because that entry's
595 * requiredPerms bits will always be zero. However, for other types of rules
596 * it's important to set these fields to match the rule owner. So we just set them always.
/*
 * setRuleCheckAsUser
 *
 * Entry point: passes the tree and the address of userid to
 * setRuleCheckAsUser_walker() and discards the walker's result.
 *
 * NOTE(review): the return-type line and the braces sit on listing lines
 * that are absent here (599, 601, 603).
 */
600 setRuleCheckAsUser(Node *node, Oid userid)
602 (void) setRuleCheckAsUser_walker(node, &userid);
/*
 * setRuleCheckAsUser_walker
 *
 * Tree-walker callback.  A Query node is handed to
 * setRuleCheckAsUser_Query() together with the userid that context
 * points at; the remaining visible path passes the node on to
 * expression_tree_walker() with this function as the callback.
 *
 * NOTE(review): listing lines 607-609, 611, 613-614 and 616 are absent, so
 * any NULL check, the return after the Query case, and the final walker
 * argument are not visible here.
 */
606 setRuleCheckAsUser_walker(Node *node, Oid *context)
610 if (IsA(node, Query))
612 setRuleCheckAsUser_Query((Query *) node, *context);
615 return expression_tree_walker(node, setRuleCheckAsUser_walker,
/*
 * setRuleCheckAsUser_Query
 *
 * Applies userid to one Query: walks qry->rtable, recursing into
 * subqueries in FROM and assigning rte->checkAsUser; recurses into every
 * query in qry->cteList; and, when the query has sublinks, runs
 * query_tree_walker() with setRuleCheckAsUser_walker so the sublinks'
 * range tables are processed as well.
 *
 * NOTE(review): listing lines 621-623 and 633-634 are absent, so the
 * declaration of "l" and whatever separates the subquery recursion from
 * the checkAsUser assignment (an else branch, presumably) are not visible
 * here -- confirm against a pristine copy.
 */
620 setRuleCheckAsUser_Query(Query *qry, Oid userid)
624 /* Set all the RTEs in this query node */
625 foreach(l, qry->rtable)
627 RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
629 if (rte->rtekind == RTE_SUBQUERY)
631 /* Recurse into subquery in FROM */
632 setRuleCheckAsUser_Query(rte->subquery, userid);
635 rte->checkAsUser = userid;
638 /* Recurse into subquery-in-WITH */
639 foreach(l, qry->cteList)
641 CommonTableExpr *cte = (CommonTableExpr *) lfirst(l);
643 setRuleCheckAsUser_Query((Query *) cte->ctequery, userid);
646 /* If there are sublinks, search for them and process their RTEs */
647 if (qry->hasSubLinks)
/*
 * QTW_IGNORE_RC_SUBQUERIES is passed here; the loops above have already
 * recursed into the rtable subqueries and the CTE queries.
 */
648 query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid,
649 QTW_IGNORE_RC_SUBQUERIES);
654 * Change the firing semantics of an existing rule.
/*
 * EnableDisableRule
 *
 * Sets the ev_enabled field of rule "rulename" on relation rel.  The rule
 * is fetched as a modifiable copy from the RULERELNAME syscache (an error
 * is reported when it does not exist), the current user must own the
 * event relation, and the catalog row and its indexes are updated when
 * the stored ev_enabled differs.  A relcache invalidation for rel is
 * issued near the end.
 *
 * NOTE(review): listing lines 658-659, 663, 693-694, 701-704 and 712-713
 * are absent, so the third parameter (used below as fires_when), the
 * declaration of ruletup, the right-hand side of the ev_enabled
 * comparison, the assignment to "changed", and the condition guarding the
 * invalidation call are not visible here.
 */
657 EnableDisableRule(Relation rel, const char *rulename,
660 Relation pg_rewrite_desc;
661 Oid owningRel = RelationGetRelid(rel);
662 Oid eventRelationOid;
664 bool changed = false;
667 * Find the rule tuple to change.
669 pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
/* A copy is fetched because the tuple is modified in place below. */
670 ruletup = SearchSysCacheCopy(RULERELNAME,
671 ObjectIdGetDatum(owningRel),
672 PointerGetDatum(rulename),
674 if (!HeapTupleIsValid(ruletup))
676 (errcode(ERRCODE_UNDEFINED_OBJECT),
677 errmsg("rule \"%s\" for relation \"%s\" does not exist",
678 rulename, get_rel_name(owningRel))));
681 * Verify that the user has appropriate permissions.
683 eventRelationOid = ((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_class;
/* The lookup was keyed by owningRel, so ev_class is expected to match it. */
684 Assert(eventRelationOid == owningRel);
685 if (!pg_class_ownercheck(eventRelationOid, GetUserId()))
686 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
687 get_rel_name(eventRelationOid));
690 * Change ev_enabled if it is different from the desired new state.
692 if (DatumGetChar(((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_enabled) !=
695 ((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_enabled =
696 CharGetDatum(fires_when);
697 simple_heap_update(pg_rewrite_desc, &ruletup->t_self, ruletup);
699 /* keep system catalog indexes current */
700 CatalogUpdateIndexes(pg_rewrite_desc, ruletup);
/* Release the tuple copy and close the catalog before invalidating. */
705 heap_freetuple(ruletup);
706 heap_close(pg_rewrite_desc, RowExclusiveLock);
709 * If we changed anything, broadcast a SI inval message to force each
710 * backend (including our own!) to rebuild relation's relcache entry.
711 * Otherwise they will fail to apply the change promptly.
714 CacheInvalidateRelcache(rel);
719 * Rename an existing rewrite rule.
721 * This is unused code at the moment. Note that it lacks a permissions check.
/*
 * RenameRewriteRule
 *
 * Renames rule oldName on relation owningRel to newName: fetches a
 * modifiable copy of the rule's pg_rewrite tuple (reporting an error when
 * it does not exist), reports an error when a rule called newName is
 * already defined on the relation, overwrites the rulename field, and
 * updates the catalog row and its indexes.
 *
 * NOTE(review): listing lines 726-727 and 729-730 are absent, so the third
 * parameter (used below as newName) and the declaration of ruletup are
 * not visible here, nor are the brace/ereport lines or anything after
 * listing line 758.
 */
725 RenameRewriteRule(Oid owningRel, const char *oldName,
728 Relation pg_rewrite_desc;
731 pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
/* A copy is fetched because the tuple is modified in place below. */
733 ruletup = SearchSysCacheCopy(RULERELNAME,
734 ObjectIdGetDatum(owningRel),
735 PointerGetDatum(oldName),
737 if (!HeapTupleIsValid(ruletup))
739 (errcode(ERRCODE_UNDEFINED_OBJECT),
740 errmsg("rule \"%s\" for relation \"%s\" does not exist",
741 oldName, get_rel_name(owningRel))));
743 /* should not already exist */
744 if (IsDefinedRewriteRule(owningRel, newName))
746 (errcode(ERRCODE_DUPLICATE_OBJECT),
747 errmsg("rule \"%s\" for relation \"%s\" already exists",
748 newName, get_rel_name(owningRel))));
/* Overwrite the name inside the copied tuple, then write the tuple back. */
750 namestrcpy(&(((Form_pg_rewrite) GETSTRUCT(ruletup))->rulename), newName);
752 simple_heap_update(pg_rewrite_desc, &ruletup->t_self, ruletup);
754 /* keep system catalog indexes current */
755 CatalogUpdateIndexes(pg_rewrite_desc, ruletup);
757 heap_freetuple(ruletup);
758 heap_close(pg_rewrite_desc, RowExclusiveLock);