]> granicus.if.org Git - postgresql/blob - src/backend/rewrite/rewriteDefine.c
Remove global variable scanCommandId in favor of storing a command ID
[postgresql] / src / backend / rewrite / rewriteDefine.c
1 /*-------------------------------------------------------------------------
2  *
3  * rewriteDefine.c
4  *        routines for defining a rewrite rule
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/rewrite/rewriteDefine.c,v 1.72 2002/05/21 22:05:55 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "catalog/catname.h"
19 #include "catalog/indexing.h"
20 #include "catalog/pg_rewrite.h"
21 #include "commands/view.h"
22 #include "miscadmin.h"
23 #include "optimizer/clauses.h"
24 #include "parser/parse_relation.h"
25 #include "rewrite/rewriteDefine.h"
26 #include "rewrite/rewriteManip.h"
27 #include "rewrite/rewriteSupport.h"
28 #include "storage/smgr.h"
29 #include "utils/acl.h"
30 #include "utils/builtins.h"
31 #include "utils/syscache.h"
32
33
34 static void setRuleCheckAsUser(Query *qry, Oid userid);
35 static bool setRuleCheckAsUser_walker(Node *node, Oid *context);
36
37
38 /*
39  * InsertRule -
40  *        takes the arguments and inserts them as a row into the system
41  *        relation "pg_rewrite"
42  */
43 static Oid
44 InsertRule(char *rulname,
45                    int evtype,
46                    Oid eventrel_oid,
47                    AttrNumber evslot_index,
48                    bool evinstead,
49                    char *evqual,
50                    char *actiontree)
51 {
52         int                     i;
53         Datum           values[Natts_pg_rewrite];
54         char            nulls[Natts_pg_rewrite];
55         NameData        rname;
56         Relation        pg_rewrite_desc;
57         TupleDesc       tupDesc;
58         HeapTuple       tup;
59         Oid                     rewriteObjectId;
60
61         if (IsDefinedRewriteRule(eventrel_oid, rulname))
62                 elog(ERROR, "Attempt to insert rule \"%s\" failed: already exists",
63                          rulname);
64
65         /*
66          * Set up *nulls and *values arrays
67          */
68         MemSet(nulls, ' ', sizeof(nulls));
69
70         i = 0;
71         namestrcpy(&rname, rulname);
72         values[i++] = NameGetDatum(&rname);                             /* rulename */
73         values[i++] = ObjectIdGetDatum(eventrel_oid);   /* ev_class */
74         values[i++] = Int16GetDatum(evslot_index);              /* ev_attr */
75         values[i++] = CharGetDatum(evtype + '0');               /* ev_type */
76         values[i++] = BoolGetDatum(evinstead);                  /* is_instead */
77         values[i++] = DirectFunctionCall1(textin, CStringGetDatum(evqual));     /* ev_qual */
78         values[i++] = DirectFunctionCall1(textin, CStringGetDatum(actiontree)); /* ev_action */
79
80         /*
81          * create a new pg_rewrite tuple
82          */
83         pg_rewrite_desc = heap_openr(RewriteRelationName, RowExclusiveLock);
84
85         tupDesc = pg_rewrite_desc->rd_att;
86
87         tup = heap_formtuple(tupDesc,
88                                                  values,
89                                                  nulls);
90
91         rewriteObjectId = simple_heap_insert(pg_rewrite_desc, tup);
92
93         if (RelationGetForm(pg_rewrite_desc)->relhasindex)
94         {
95                 Relation        idescs[Num_pg_rewrite_indices];
96
97                 CatalogOpenIndices(Num_pg_rewrite_indices, Name_pg_rewrite_indices,
98                                                    idescs);
99                 CatalogIndexInsert(idescs, Num_pg_rewrite_indices, pg_rewrite_desc,
100                                                    tup);
101                 CatalogCloseIndices(Num_pg_rewrite_indices, idescs);
102         }
103
104         heap_freetuple(tup);
105
106         heap_close(pg_rewrite_desc, RowExclusiveLock);
107
108         return rewriteObjectId;
109 }
110
111 void
112 DefineQueryRewrite(RuleStmt *stmt)
113 {
114         RangeVar   *event_obj = stmt->relation;
115         Node       *event_qual = stmt->whereClause;
116         CmdType         event_type = stmt->event;
117         bool            is_instead = stmt->instead;
118         List       *action = stmt->actions;
119         Relation        event_relation;
120         Oid                     ev_relid;
121         Oid                     ruleId;
122         int                     event_attno;
123         Oid                     event_attype;
124         char       *actionP,
125                            *event_qualP;
126         List       *l;
127         Query      *query;
128         AclResult       aclresult;
129         bool            RelisBecomingView = false;
130
131         /*
132          * If we are installing an ON SELECT rule, we had better grab
133          * AccessExclusiveLock to ensure no SELECTs are currently running on
134          * the event relation.  For other types of rules, it might be
135          * sufficient to grab ShareLock to lock out insert/update/delete
136          * actions.  But for now, let's just grab AccessExclusiveLock all the
137          * time.
138          */
139         event_relation = heap_openrv(event_obj, AccessExclusiveLock);
140         ev_relid = RelationGetRelid(event_relation);
141
142         /*
143          * Check user has permission to apply rules to this relation.
144          */
145         aclresult = pg_class_aclcheck(ev_relid, GetUserId(), ACL_RULE);
146         if (aclresult != ACLCHECK_OK)
147                 aclcheck_error(aclresult, RelationGetRelationName(event_relation));
148
149         /*
150          * No rule actions that modify OLD or NEW
151          */
152         foreach(l, action)
153         {
154                 query = (Query *) lfirst(l);
155                 if (query->resultRelation == 0)
156                         continue;
157                 /* Don't be fooled by INSERT/SELECT */
158                 if (query != getInsertSelectQuery(query, NULL))
159                         continue;
160                 if (query->resultRelation == PRS2_OLD_VARNO)
161                         elog(ERROR, "rule actions on OLD currently not supported"
162                                  "\n\tuse views or triggers instead");
163                 if (query->resultRelation == PRS2_NEW_VARNO)
164                         elog(ERROR, "rule actions on NEW currently not supported"
165                                  "\n\tuse triggers instead");
166         }
167
168         /*
169          * Rules ON SELECT are restricted to view definitions
170          */
171         if (event_type == CMD_SELECT)
172         {
173                 List       *tllist;
174                 int                     i;
175
176                 /*
177                  * So there cannot be INSTEAD NOTHING, ...
178                  */
179                 if (length(action) == 0)
180                 {
181                         elog(ERROR, "instead nothing rules on select currently not supported"
182                                  "\n\tuse views instead");
183                 }
184
185                 /*
186                  * ... there cannot be multiple actions, ...
187                  */
188                 if (length(action) > 1)
189                         elog(ERROR, "multiple action rules on select currently not supported");
190
191                 /*
192                  * ... the one action must be a SELECT, ...
193                  */
194                 query = (Query *) lfirst(action);
195                 if (!is_instead || query->commandType != CMD_SELECT)
196                         elog(ERROR, "only instead-select rules currently supported on select");
197
198                 /*
199                  * ... there can be no rule qual, ...
200                  */
201                 if (event_qual != NULL)
202                         elog(ERROR, "event qualifications not supported for rules on select");
203
204                 /*
205                  * ... the targetlist of the SELECT action must exactly match the
206                  * event relation, ...
207                  */
208                 i = 0;
209                 foreach(tllist, query->targetList)
210                 {
211                         TargetEntry *tle = (TargetEntry *) lfirst(tllist);
212                         Resdom     *resdom = tle->resdom;
213                         Form_pg_attribute attr;
214                         char       *attname;
215
216                         if (resdom->resjunk)
217                                 continue;
218                         i++;
219                         if (i > event_relation->rd_att->natts)
220                                 elog(ERROR, "select rule's target list has too many entries");
221
222                         attr = event_relation->rd_att->attrs[i - 1];
223                         attname = NameStr(attr->attname);
224
225                         if (strcmp(resdom->resname, attname) != 0)
226                                 elog(ERROR, "select rule's target entry %d has different column name from %s", i, attname);
227
228                         if (attr->atttypid != resdom->restype)
229                                 elog(ERROR, "select rule's target entry %d has different type from attribute %s", i, attname);
230
231                         /*
232                          * Allow typmods to be different only if one of them is -1,
233                          * ie, "unspecified".  This is necessary for cases like
234                          * "numeric", where the table will have a filled-in default
235                          * length but the select rule's expression will probably have
236                          * typmod = -1.
237                          */
238                         if (attr->atttypmod != resdom->restypmod &&
239                                 attr->atttypmod != -1 && resdom->restypmod != -1)
240                                 elog(ERROR, "select rule's target entry %d has different size from attribute %s", i, attname);
241                 }
242
243                 if (i != event_relation->rd_att->natts)
244                         elog(ERROR, "select rule's target list has too few entries");
245
246                 /*
247                  * ... there must not be another ON SELECT rule already ...
248                  */
249                 if (event_relation->rd_rules != NULL)
250                 {
251                         for (i = 0; i < event_relation->rd_rules->numLocks; i++)
252                         {
253                                 RewriteRule *rule;
254
255                                 rule = event_relation->rd_rules->rules[i];
256                                 if (rule->event == CMD_SELECT)
257                                         elog(ERROR, "\"%s\" is already a view",
258                                                  RelationGetRelationName(event_relation));
259                         }
260                 }
261
262                 /*
263                  * ... and finally the rule must be named _RETURN.
264                  */
265                 if (strcmp(stmt->rulename, ViewSelectRuleName) != 0)
266                 {
267                         /*
268                          * In versions before 7.3, the expected name was _RETviewname.
269                          * For backwards compatibility with old pg_dump output, accept
270                          * that and silently change it to _RETURN.  Since this is just
271                          * a quick backwards-compatibility hack, limit the number of
272                          * characters checked to a few less than NAMEDATALEN; this
273                          * saves having to worry about where a multibyte character might
274                          * have gotten truncated.
275                          */
276                         if (strncmp(stmt->rulename, "_RET", 4) != 0 ||
277                                 strncmp(stmt->rulename + 4, event_obj->relname,
278                                                 NAMEDATALEN - 4 - 4) != 0)
279                                 elog(ERROR, "view rule for \"%s\" must be named \"%s\"",
280                                          event_obj->relname, ViewSelectRuleName);
281                         stmt->rulename = pstrdup(ViewSelectRuleName);
282                 }
283
284                 /*
285                  * Are we converting a relation to a view?
286                  *
287                  * If so, check that the relation is empty because the storage for
288                  * the relation is going to be deleted.
289                  */
290                 if (event_relation->rd_rel->relkind != RELKIND_VIEW)
291                 {
292                         HeapScanDesc scanDesc;
293
294                         scanDesc = heap_beginscan(event_relation, SnapshotNow, 0, NULL);
295                         if (heap_getnext(scanDesc, ForwardScanDirection) != NULL)
296                                 elog(ERROR, "Relation \"%s\" is not empty. Cannot convert it to view",
297                                          event_obj->relname);
298                         heap_endscan(scanDesc);
299
300                         RelisBecomingView = true;
301                 }
302         }
303
304         /*
305          * This rule is allowed - prepare to install it.
306          */
307         event_attno = -1;
308         event_attype = InvalidOid;
309
310         /*
311          * We want the rule's table references to be checked as though by the
312          * rule owner, not the user referencing the rule.  Therefore, scan
313          * through the rule's rtables and set the checkAsUser field on all
314          * rtable entries.
315          */
316         foreach(l, action)
317         {
318                 query = (Query *) lfirst(l);
319                 setRuleCheckAsUser(query, GetUserId());
320         }
321
322         /* discard rule if it's null action and not INSTEAD; it's a no-op */
323         if (action != NIL || is_instead)
324         {
325                 event_qualP = nodeToString(event_qual);
326                 actionP = nodeToString(action);
327
328                 ruleId = InsertRule(stmt->rulename,
329                                                         event_type,
330                                                         ev_relid,
331                                                         event_attno,
332                                                         is_instead,
333                                                         event_qualP,
334                                                         actionP);
335
336                 /*
337                  * Set pg_class 'relhasrules' field TRUE for event relation. If
338                  * appropriate, also modify the 'relkind' field to show that the
339                  * relation is now a view.
340                  *
341                  * Important side effect: an SI notice is broadcast to force all
342                  * backends (including me!) to update relcache entries with the
343                  * new rule.
344                  */
345                 SetRelationRuleStatus(ev_relid, true, RelisBecomingView);
346         }
347
348         /*
349          * IF the relation is becoming a view, delete the storage files
350          * associated with it.  NB: we had better have AccessExclusiveLock to
351          * do this ...
352          */
353         if (RelisBecomingView)
354                 smgrunlink(DEFAULT_SMGR, event_relation);
355
356         /* Close rel, but keep lock till commit... */
357         heap_close(event_relation, NoLock);
358 }
359
360 /*
361  * setRuleCheckAsUser
362  *              Recursively scan a query and set the checkAsUser field to the
363  *              given userid in all rtable entries.
364  *
365  * Note: for a view (ON SELECT rule), the checkAsUser field of the *OLD*
366  * RTE entry will be overridden when the view rule is expanded, and the
367  * checkAsUser field of the *NEW* entry is irrelevant because that entry's
368  * checkFor bits will never be set.  However, for other types of rules it's
369  * important to set these fields to match the rule owner.  So we just set
370  * them always.
371  */
372 static void
373 setRuleCheckAsUser(Query *qry, Oid userid)
374 {
375         List       *l;
376
377         /* Set all the RTEs in this query node */
378         foreach(l, qry->rtable)
379         {
380                 RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
381
382                 if (rte->rtekind == RTE_SUBQUERY)
383                 {
384                         /* Recurse into subquery in FROM */
385                         setRuleCheckAsUser(rte->subquery, userid);
386                 }
387                 else
388                         rte->checkAsUser = userid;
389         }
390
391         /* If there are sublinks, search for them and process their RTEs */
392         if (qry->hasSubLinks)
393                 query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid,
394                                                   false /* already did the ones in rtable */ );
395 }
396
397 /*
398  * Expression-tree walker to find sublink queries
399  */
400 static bool
401 setRuleCheckAsUser_walker(Node *node, Oid *context)
402 {
403         if (node == NULL)
404                 return false;
405         if (IsA(node, Query))
406         {
407                 Query      *qry = (Query *) node;
408
409                 setRuleCheckAsUser(qry, *context);
410                 return false;
411         }
412         return expression_tree_walker(node, setRuleCheckAsUser_walker,
413                                                                   (void *) context);
414 }
415
416
417 /*
418  * Rename an existing rewrite rule.
419  *
420  * This is unused code at the moment.
421  */
422 void
423 RenameRewriteRule(Oid owningRel, const char *oldName,
424                                   const char *newName)
425 {
426         Relation        pg_rewrite_desc;
427         HeapTuple       ruletup;
428
429         pg_rewrite_desc = heap_openr(RewriteRelationName, RowExclusiveLock);
430
431         ruletup = SearchSysCacheCopy(RULERELNAME,
432                                                                  ObjectIdGetDatum(owningRel),
433                                                                  PointerGetDatum(oldName),
434                                                                  0, 0);
435         if (!HeapTupleIsValid(ruletup))
436                 elog(ERROR, "RenameRewriteRule: rule \"%s\" does not exist", oldName);
437
438         /* should not already exist */
439         if (IsDefinedRewriteRule(owningRel, newName))
440                 elog(ERROR, "Attempt to rename rule \"%s\" failed: \"%s\" already exists",
441                          oldName, newName);
442
443         namestrcpy(&(((Form_pg_rewrite) GETSTRUCT(ruletup))->rulename), newName);
444
445         simple_heap_update(pg_rewrite_desc, &ruletup->t_self, ruletup);
446
447         /* keep system catalog indices current */
448         if (RelationGetForm(pg_rewrite_desc)->relhasindex)
449         {
450                 Relation        idescs[Num_pg_rewrite_indices];
451
452                 CatalogOpenIndices(Num_pg_rewrite_indices, Name_pg_rewrite_indices,
453                                                    idescs);
454                 CatalogIndexInsert(idescs, Num_pg_rewrite_indices, pg_rewrite_desc,
455                                                    ruletup);
456                 CatalogCloseIndices(Num_pg_rewrite_indices, idescs);
457         }
458
459         heap_freetuple(ruletup);
460         heap_close(pg_rewrite_desc, RowExclusiveLock);
461 }