granicus.if.org Git - postgresql/blob - src/backend/rewrite/rewriteDefine.c
Fix initialization of fake LSN for unlogged relations
[postgresql] / src / backend / rewrite / rewriteDefine.c
1 /*-------------------------------------------------------------------------
2  *
3  * rewriteDefine.c
4  *        routines for defining a rewrite rule
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/rewrite/rewriteDefine.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "access/htup_details.h"
19 #include "access/multixact.h"
20 #include "access/tableam.h"
21 #include "access/transam.h"
22 #include "access/xact.h"
23 #include "catalog/catalog.h"
24 #include "catalog/dependency.h"
25 #include "catalog/heap.h"
26 #include "catalog/indexing.h"
27 #include "catalog/namespace.h"
28 #include "catalog/objectaccess.h"
29 #include "catalog/pg_rewrite.h"
30 #include "catalog/storage.h"
31 #include "commands/policy.h"
32 #include "miscadmin.h"
33 #include "nodes/nodeFuncs.h"
34 #include "parser/parse_utilcmd.h"
35 #include "rewrite/rewriteDefine.h"
36 #include "rewrite/rewriteManip.h"
37 #include "rewrite/rewriteSupport.h"
38 #include "utils/acl.h"
39 #include "utils/builtins.h"
40 #include "utils/inval.h"
41 #include "utils/lsyscache.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/syscache.h"
45
46
/* Prototypes for file-local (static) helper functions. */
47 static void checkRuleResultList(List *targetList, TupleDesc resultDesc,
48                                                                 bool isSelect, bool requireColumnNameMatch);
49 static bool setRuleCheckAsUser_walker(Node *node, Oid *context);
50 static void setRuleCheckAsUser_Query(Query *qry, Oid userid);
51
52
53 /*
54  * InsertRule -
55  *        takes the arguments and inserts them as a row into the system
56  *        relation "pg_rewrite"
57  */
58 static Oid
59 InsertRule(const char *rulname,
60                    int evtype,
61                    Oid eventrel_oid,
62                    bool evinstead,
63                    Node *event_qual,
64                    List *action,
65                    bool replace)
66 {
67         char       *evqual = nodeToString(event_qual);
68         char       *actiontree = nodeToString((Node *) action);
69         Datum           values[Natts_pg_rewrite];
70         bool            nulls[Natts_pg_rewrite];
71         bool            replaces[Natts_pg_rewrite];
72         NameData        rname;
73         Relation        pg_rewrite_desc;
74         HeapTuple       tup,
75                                 oldtup;
76         Oid                     rewriteObjectId;
77         ObjectAddress myself,
78                                 referenced;
79         bool            is_update = false;
80
81         /*
82          * Set up *nulls and *values arrays
83          */
84         MemSet(nulls, false, sizeof(nulls));
85
86         namestrcpy(&rname, rulname);
87         values[Anum_pg_rewrite_rulename - 1] = NameGetDatum(&rname);
88         values[Anum_pg_rewrite_ev_class - 1] = ObjectIdGetDatum(eventrel_oid);
89         values[Anum_pg_rewrite_ev_type - 1] = CharGetDatum(evtype + '0');
90         values[Anum_pg_rewrite_ev_enabled - 1] = CharGetDatum(RULE_FIRES_ON_ORIGIN);
91         values[Anum_pg_rewrite_is_instead - 1] = BoolGetDatum(evinstead);
92         values[Anum_pg_rewrite_ev_qual - 1] = CStringGetTextDatum(evqual);
93         values[Anum_pg_rewrite_ev_action - 1] = CStringGetTextDatum(actiontree);
94
95         /*
96          * Ready to store new pg_rewrite tuple
97          */
98         pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock);
99
100         /*
101          * Check to see if we are replacing an existing tuple
102          */
103         oldtup = SearchSysCache2(RULERELNAME,
104                                                          ObjectIdGetDatum(eventrel_oid),
105                                                          PointerGetDatum(rulname));
106
107         if (HeapTupleIsValid(oldtup))
108         {
109                 if (!replace)
110                         ereport(ERROR,
111                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
112                                          errmsg("rule \"%s\" for relation \"%s\" already exists",
113                                                         rulname, get_rel_name(eventrel_oid))));
114
115                 /*
116                  * When replacing, we don't need to replace every attribute
117                  */
118                 MemSet(replaces, false, sizeof(replaces));
119                 replaces[Anum_pg_rewrite_ev_type - 1] = true;
120                 replaces[Anum_pg_rewrite_is_instead - 1] = true;
121                 replaces[Anum_pg_rewrite_ev_qual - 1] = true;
122                 replaces[Anum_pg_rewrite_ev_action - 1] = true;
123
124                 tup = heap_modify_tuple(oldtup, RelationGetDescr(pg_rewrite_desc),
125                                                                 values, nulls, replaces);
126
127                 CatalogTupleUpdate(pg_rewrite_desc, &tup->t_self, tup);
128
129                 ReleaseSysCache(oldtup);
130
131                 rewriteObjectId = ((Form_pg_rewrite) GETSTRUCT(tup))->oid;
132                 is_update = true;
133         }
134         else
135         {
136                 rewriteObjectId = GetNewOidWithIndex(pg_rewrite_desc,
137                                                                                          RewriteOidIndexId,
138                                                                                          Anum_pg_rewrite_oid);
139                 values[Anum_pg_rewrite_oid - 1] = ObjectIdGetDatum(rewriteObjectId);
140
141                 tup = heap_form_tuple(pg_rewrite_desc->rd_att, values, nulls);
142
143                 CatalogTupleInsert(pg_rewrite_desc, tup);
144         }
145
146
147         heap_freetuple(tup);
148
149         /* If replacing, get rid of old dependencies and make new ones */
150         if (is_update)
151                 deleteDependencyRecordsFor(RewriteRelationId, rewriteObjectId, false);
152
153         /*
154          * Install dependency on rule's relation to ensure it will go away on
155          * relation deletion.  If the rule is ON SELECT, make the dependency
156          * implicit --- this prevents deleting a view's SELECT rule.  Other kinds
157          * of rules can be AUTO.
158          */
159         myself.classId = RewriteRelationId;
160         myself.objectId = rewriteObjectId;
161         myself.objectSubId = 0;
162
163         referenced.classId = RelationRelationId;
164         referenced.objectId = eventrel_oid;
165         referenced.objectSubId = 0;
166
167         recordDependencyOn(&myself, &referenced,
168                                            (evtype == CMD_SELECT) ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO);
169
170         /*
171          * Also install dependencies on objects referenced in action and qual.
172          */
173         recordDependencyOnExpr(&myself, (Node *) action, NIL,
174                                                    DEPENDENCY_NORMAL);
175
176         if (event_qual != NULL)
177         {
178                 /* Find query containing OLD/NEW rtable entries */
179                 Query      *qry = linitial_node(Query, action);
180
181                 qry = getInsertSelectQuery(qry, NULL);
182                 recordDependencyOnExpr(&myself, event_qual, qry->rtable,
183                                                            DEPENDENCY_NORMAL);
184         }
185
186         /* Post creation hook for new rule */
187         InvokeObjectPostCreateHook(RewriteRelationId, rewriteObjectId, 0);
188
189         table_close(pg_rewrite_desc, RowExclusiveLock);
190
191         return rewriteObjectId;
192 }
193
194 /*
195  * DefineRule
196  *              Execute a CREATE RULE command.
197  */
198 ObjectAddress
199 DefineRule(RuleStmt *stmt, const char *queryString)
200 {
201         List       *actions;
202         Node       *whereClause;
203         Oid                     relId;
204
205         /* Parse analysis. */
206         transformRuleStmt(stmt, queryString, &actions, &whereClause);
207
208         /*
209          * Find and lock the relation.  Lock level should match
210          * DefineQueryRewrite.
211          */
212         relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false);
213
214         /* ... and execute */
215         return DefineQueryRewrite(stmt->rulename,
216                                                           relId,
217                                                           whereClause,
218                                                           stmt->event,
219                                                           stmt->instead,
220                                                           stmt->replace,
221                                                           actions);
222 }
223
224
225 /*
226  * DefineQueryRewrite
227  *              Create a rule
228  *
229  * This is essentially the same as DefineRule() except that the rule's
230  * action and qual have already been passed through parse analysis.
     *
     * rulename:    name of the rule.  An ON SELECT rule must be named
     *              ViewSelectRuleName; the legacy "_RET<relname>" spelling is
     *              accepted and silently replaced by ViewSelectRuleName.
     * event_relid: OID of the relation the rule is attached to.  It is opened
     *              here with AccessExclusiveLock and the lock is not released
     *              when the relation is closed.
     * event_qual:  rule qualification, or NULL for an unconditional rule.
     * event_type:  command type the rule fires on.
     * is_instead:  true for an INSTEAD rule.
     * replace:     true if an existing rule of the same name may be replaced.
     * action:      list of Query nodes forming the rule's actions (may be NIL).
     *
     * Returns an ObjectAddress built from RewriteRelationId and the rule's OID.
     * If the rule is discarded (no actions and not INSTEAD), nothing is stored
     * and the returned address carries InvalidOid.
     *
     * Side effect: installing an ON SELECT rule on a relation that is neither a
     * view nor a materialized view converts that relation into a view; its
     * storage, system attribute entries and toast table are dropped and its
     * pg_class row is rewritten accordingly.
231  */
232 ObjectAddress
233 DefineQueryRewrite(const char *rulename,
234                                    Oid event_relid,
235                                    Node *event_qual,
236                                    CmdType event_type,
237                                    bool is_instead,
238                                    bool replace,
239                                    List *action)
240 {
241         Relation        event_relation;
242         ListCell   *l;
243         Query      *query;
244         bool            RelisBecomingView = false;
245         Oid                     ruleId = InvalidOid;
246         ObjectAddress address;
247
248         /*
249          * If we are installing an ON SELECT rule, we had better grab
250          * AccessExclusiveLock to ensure no SELECTs are currently running on the
251          * event relation. For other types of rules, it would be sufficient to
252          * grab ShareRowExclusiveLock to lock out insert/update/delete actions and
253          * to ensure that we lock out current CREATE RULE statements; but because
254          * of race conditions in access to catalog entries, we can't do that yet.
255          *
256          * Note that this lock level should match the one used in DefineRule.
257          */
258         event_relation = table_open(event_relid, AccessExclusiveLock);
259
260         /*
261          * Verify relation is of a type that rules can sensibly be applied to.
262          * Internal callers can target materialized views, but transformRuleStmt()
263          * blocks them for users.  Don't mention them in the error message.
264          */
265         if (event_relation->rd_rel->relkind != RELKIND_RELATION &&
266                 event_relation->rd_rel->relkind != RELKIND_MATVIEW &&
267                 event_relation->rd_rel->relkind != RELKIND_VIEW &&
268                 event_relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
269                 ereport(ERROR,
270                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
271                                  errmsg("\"%s\" is not a table or view",
272                                                 RelationGetRelationName(event_relation))));
273
274         if (!allowSystemTableMods && IsSystemRelation(event_relation))
275                 ereport(ERROR,
276                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
277                                  errmsg("permission denied: \"%s\" is a system catalog",
278                                                 RelationGetRelationName(event_relation))));
279
280         /*
281          * Check user has permission to apply rules to this relation.
282          */
283         if (!pg_class_ownercheck(event_relid, GetUserId()))
284                 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(event_relation->rd_rel->relkind),
285                                            RelationGetRelationName(event_relation));
286
287         /*
288          * No rule actions that modify OLD or NEW
289          */
290         foreach(l, action)
291         {
292                 query = lfirst_node(Query, l);
293                 if (query->resultRelation == 0)
294                         continue;
295                 /* Don't be fooled by INSERT/SELECT */
296                 if (query != getInsertSelectQuery(query, NULL))
297                         continue;
298                 if (query->resultRelation == PRS2_OLD_VARNO)
299                         ereport(ERROR,
300                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
301                                          errmsg("rule actions on OLD are not implemented"),
302                                          errhint("Use views or triggers instead.")));
303                 if (query->resultRelation == PRS2_NEW_VARNO)
304                         ereport(ERROR,
305                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
306                                          errmsg("rule actions on NEW are not implemented"),
307                                          errhint("Use triggers instead.")));
308         }
309
310         if (event_type == CMD_SELECT)
311         {
312                 /*
313                  * Rules ON SELECT are restricted to view definitions
314                  *
315                  * So there cannot be INSTEAD NOTHING, ...
316                  */
317                 if (list_length(action) == 0)
318                         ereport(ERROR,
319                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
320                                          errmsg("INSTEAD NOTHING rules on SELECT are not implemented"),
321                                          errhint("Use views instead.")));
322
323                 /*
324                  * ... there cannot be multiple actions, ...
325                  */
326                 if (list_length(action) > 1)
327                         ereport(ERROR,
328                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
329                                          errmsg("multiple actions for rules on SELECT are not implemented")));
330
331                 /*
332                  * ... the one action must be a SELECT, ...
333                  */
334                 query = linitial_node(Query, action);
335                 if (!is_instead ||
336                         query->commandType != CMD_SELECT)
337                         ereport(ERROR,
338                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
339                                          errmsg("rules on SELECT must have action INSTEAD SELECT")));
340
341                 /*
342                  * ... it cannot contain data-modifying WITH ...
343                  */
344                 if (query->hasModifyingCTE)
345                         ereport(ERROR,
346                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
347                                          errmsg("rules on SELECT must not contain data-modifying statements in WITH")));
348
349                 /*
350                  * ... there can be no rule qual, ...
351                  */
352                 if (event_qual != NULL)
353                         ereport(ERROR,
354                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
355                                          errmsg("event qualifications are not implemented for rules on SELECT")));
356
357                 /*
358                  * ... the targetlist of the SELECT action must exactly match the
359                  * event relation, ...
360                  */
361                 checkRuleResultList(query->targetList,
362                                                         RelationGetDescr(event_relation),
363                                                         true,
364                                                         event_relation->rd_rel->relkind !=
365                                                         RELKIND_MATVIEW);
366
367                 /*
368                  * ... there must not be another ON SELECT rule already ...
369                  */
370                 if (!replace && event_relation->rd_rules != NULL)
371                 {
372                         int                     i;
373
374                         for (i = 0; i < event_relation->rd_rules->numLocks; i++)
375                         {
376                                 RewriteRule *rule;
377
378                                 rule = event_relation->rd_rules->rules[i];
379                                 if (rule->event == CMD_SELECT)
380                                         ereport(ERROR,
381                                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
382                                                          errmsg("\"%s\" is already a view",
383                                                                         RelationGetRelationName(event_relation))));
384                         }
385                 }
386
387                 /*
388                  * ... and finally the rule must be named _RETURN.
389                  */
390                 if (strcmp(rulename, ViewSelectRuleName) != 0)
391                 {
392                         /*
393                          * In versions before 7.3, the expected name was _RETviewname. For
394                          * backwards compatibility with old pg_dump output, accept that
395                          * and silently change it to _RETURN.  Since this is just a quick
396                          * backwards-compatibility hack, limit the number of characters
397                          * checked to a few less than NAMEDATALEN; this saves having to
398                          * worry about where a multibyte character might have gotten
399                          * truncated.
400                          */
401                         if (strncmp(rulename, "_RET", 4) != 0 ||
402                                 strncmp(rulename + 4, RelationGetRelationName(event_relation),
403                                                 NAMEDATALEN - 4 - 4) != 0)
404                                 ereport(ERROR,
405                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
406                                                  errmsg("view rule for \"%s\" must be named \"%s\"",
407                                                                 RelationGetRelationName(event_relation),
408                                                                 ViewSelectRuleName)));
409                         rulename = pstrdup(ViewSelectRuleName);
410                 }
411
412                 /*
413                  * Are we converting a relation to a view?
414                  *
415                  * If so, check that the relation is empty because the storage for the
416                  * relation is going to be deleted.  Also insist that the rel not have
417                  * any triggers, indexes, child tables, policies, or RLS enabled.
418                  * (Note: these tests are too strict, because they will reject
419                  * relations that once had such but don't anymore.  But we don't
420                  * really care, because this whole business of converting relations to
421                  * views is just a kluge to allow dump/reload of views that
422                  * participate in circular dependencies.)
423                  */
424                 if (event_relation->rd_rel->relkind != RELKIND_VIEW &&
425                         event_relation->rd_rel->relkind != RELKIND_MATVIEW)
426                 {
427                         TableScanDesc scanDesc;
428                         Snapshot        snapshot;
429                         TupleTableSlot *slot;
430
431                         if (event_relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
432                                 ereport(ERROR,
433                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
434                                                  errmsg("cannot convert partitioned table \"%s\" to a view",
435                                                                 RelationGetRelationName(event_relation))));
436
437                         if (event_relation->rd_rel->relispartition)
438                                 ereport(ERROR,
439                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
440                                                  errmsg("cannot convert partition \"%s\" to a view",
441                                                                 RelationGetRelationName(event_relation))));
442
                            /*
                             * Emptiness test: try to fetch a single row from the relation;
                             * getting one means the table is not empty and cannot be converted.
                             */
443                         snapshot = RegisterSnapshot(GetLatestSnapshot());
444                         scanDesc = table_beginscan(event_relation, snapshot, 0, NULL);
445                         slot = table_slot_create(event_relation, NULL);
446                         if (table_scan_getnextslot(scanDesc, ForwardScanDirection, slot))
447                                 ereport(ERROR,
448                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
449                                                  errmsg("could not convert table \"%s\" to a view because it is not empty",
450                                                                 RelationGetRelationName(event_relation))));
451                         ExecDropSingleTupleTableSlot(slot);
452                         table_endscan(scanDesc);
453                         UnregisterSnapshot(snapshot);
454
455                         if (event_relation->rd_rel->relhastriggers)
456                                 ereport(ERROR,
457                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
458                                                  errmsg("could not convert table \"%s\" to a view because it has triggers",
459                                                                 RelationGetRelationName(event_relation)),
460                                                  errhint("In particular, the table cannot be involved in any foreign key relationships.")));
461
462                         if (event_relation->rd_rel->relhasindex)
463                                 ereport(ERROR,
464                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
465                                                  errmsg("could not convert table \"%s\" to a view because it has indexes",
466                                                                 RelationGetRelationName(event_relation))));
467
468                         if (event_relation->rd_rel->relhassubclass)
469                                 ereport(ERROR,
470                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
471                                                  errmsg("could not convert table \"%s\" to a view because it has child tables",
472                                                                 RelationGetRelationName(event_relation))));
473
474                         if (event_relation->rd_rel->relrowsecurity)
475                                 ereport(ERROR,
476                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
477                                                  errmsg("could not convert table \"%s\" to a view because it has row security enabled",
478                                                                 RelationGetRelationName(event_relation))));
479
480                         if (relation_has_policies(event_relation))
481                                 ereport(ERROR,
482                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
483                                                  errmsg("could not convert table \"%s\" to a view because it has row security policies",
484                                                                 RelationGetRelationName(event_relation))));
485
486                         RelisBecomingView = true;
487                 }
488         }
489         else
490         {
491                 /*
492                  * For non-SELECT rules, a RETURNING list can appear in at most one of
493                  * the actions ... and there can't be any RETURNING list at all in a
494                  * conditional or non-INSTEAD rule.  (Actually, there can be at most
495                  * one RETURNING list across all rules on the same event, but it seems
496                  * best to enforce that at rule expansion time.)  If there is a
497                  * RETURNING list, it must match the event relation.
498                  */
499                 bool            haveReturning = false;
500
501                 foreach(l, action)
502                 {
503                         query = lfirst_node(Query, l);
504
505                         if (!query->returningList)
506                                 continue;
507                         if (haveReturning)
508                                 ereport(ERROR,
509                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
510                                                  errmsg("cannot have multiple RETURNING lists in a rule")));
511                         haveReturning = true;
512                         if (event_qual != NULL)
513                                 ereport(ERROR,
514                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
515                                                  errmsg("RETURNING lists are not supported in conditional rules")));
516                         if (!is_instead)
517                                 ereport(ERROR,
518                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519                                                  errmsg("RETURNING lists are not supported in non-INSTEAD rules")));
520                         checkRuleResultList(query->returningList,
521                                                                 RelationGetDescr(event_relation),
522                                                                 false, false);
523                 }
524         }
525
526         /*
527          * This rule is allowed - prepare to install it.
528          */
529
530         /* discard rule if it's null action and not INSTEAD; it's a no-op */
531         if (action != NIL || is_instead)
532         {
533                 ruleId = InsertRule(rulename,
534                                                         event_type,
535                                                         event_relid,
536                                                         is_instead,
537                                                         event_qual,
538                                                         action,
539                                                         replace);
540
541                 /*
542                  * Set pg_class 'relhasrules' field true for event relation.
543                  *
544                  * Important side effect: an SI notice is broadcast to force all
545                  * backends (including me!) to update relcache entries with the new
546                  * rule.
547                  */
548                 SetRelationRuleStatus(event_relid, true);
549         }
550
551         /* ---------------------------------------------------------------------
552          * If the relation is becoming a view:
553          * - delete the associated storage files
554          * - get rid of any system attributes in pg_attribute; a view shouldn't
555          *       have any of those
556          * - remove the toast table; there is no need for it anymore, and its
557          *       presence would make vacuum slightly more complicated
558          * - set relkind to RELKIND_VIEW, and adjust other pg_class fields
559          *       to be appropriate for a view
560          *
561          * NB: we had better have AccessExclusiveLock to do this ...
562          * ---------------------------------------------------------------------
563          */
564         if (RelisBecomingView)
565         {
566                 Relation        relationRelation;
567                 Oid                     toastrelid;
568                 HeapTuple       classTup;
569                 Form_pg_class classForm;
570
571                 relationRelation = table_open(RelationRelationId, RowExclusiveLock);
572                 toastrelid = event_relation->rd_rel->reltoastrelid;
573
574                 /* drop storage while table still looks like a table  */
575                 RelationDropStorage(event_relation);
576                 DeleteSystemAttributeTuples(event_relid);
577
578                 /*
579                  * Drop the toast table if any.  (This won't take care of updating the
580                  * toast fields in the relation's own pg_class entry; we handle that
581                  * below.)
582                  */
583                 if (OidIsValid(toastrelid))
584                 {
585                         ObjectAddress toastobject;
586
587                         /*
588                          * Delete the dependency of the toast relation on the main
589                          * relation so we can drop the former without dropping the latter.
590                          */
591                         deleteDependencyRecordsFor(RelationRelationId, toastrelid,
592                                                                            false);
593
594                         /* Make deletion of dependency record visible */
595                         CommandCounterIncrement();
596
597                         /* Now drop toast table, including its index */
598                         toastobject.classId = RelationRelationId;
599                         toastobject.objectId = toastrelid;
600                         toastobject.objectSubId = 0;
601                         performDeletion(&toastobject, DROP_RESTRICT,
602                                                         PERFORM_DELETION_INTERNAL);
603                 }
604
605                 /*
606                  * SetRelationRuleStatus may have updated the pg_class row, so we must
607                  * advance the command counter before trying to update it again.
608                  */
609                 CommandCounterIncrement();
610
611                 /*
612                  * Fix pg_class entry to look like a normal view's, including setting
613                  * the correct relkind and removal of reltoastrelid of the toast table
614                  * we potentially removed above.
615                  */
616                 classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(event_relid));
617                 if (!HeapTupleIsValid(classTup))
618                         elog(ERROR, "cache lookup failed for relation %u", event_relid);
619                 classForm = (Form_pg_class) GETSTRUCT(classTup);
620
621                 classForm->relam = InvalidOid;
622                 classForm->reltablespace = InvalidOid;
623                 classForm->relpages = 0;
624                 classForm->reltuples = 0;
625                 classForm->relallvisible = 0;
626                 classForm->reltoastrelid = InvalidOid;
627                 classForm->relhasindex = false;
628                 classForm->relkind = RELKIND_VIEW;
629                 classForm->relfrozenxid = InvalidTransactionId;
630                 classForm->relminmxid = InvalidMultiXactId;
631                 classForm->relreplident = REPLICA_IDENTITY_NOTHING;
632
633                 CatalogTupleUpdate(relationRelation, &classTup->t_self, classTup);
634
635                 heap_freetuple(classTup);
636                 table_close(relationRelation, RowExclusiveLock);
637         }
638
            /* ruleId is still InvalidOid here if the rule was discarded as a no-op */
639         ObjectAddressSet(address, RewriteRelationId, ruleId);
640
641         /* Close rel, but keep lock till commit... */
642         table_close(event_relation, NoLock);
643
644         return address;
645 }
646
647 /*
648  * checkRuleResultList
649  *              Verify that targetList produces output compatible with a tupledesc
650  *
651  * The targetList might be either a SELECT targetlist, or a RETURNING list;
652  * isSelect tells which.  This is used for choosing error messages.
653  *
654  * A SELECT targetlist may optionally require that column names match.
655  */
656 static void
657 checkRuleResultList(List *targetList, TupleDesc resultDesc, bool isSelect,
658                                         bool requireColumnNameMatch)
659 {
660         ListCell   *tllist;
661         int                     i;
662
663         /* Only a SELECT may require a column name match. */
664         Assert(isSelect || !requireColumnNameMatch);
665
666         i = 0;
667         foreach(tllist, targetList)
668         {
669                 TargetEntry *tle = (TargetEntry *) lfirst(tllist);
670                 Oid                     tletypid;
671                 int32           tletypmod;
672                 Form_pg_attribute attr;
673                 char       *attname;
674
675                 /* resjunk entries may be ignored */
676                 if (tle->resjunk)
677                         continue;
678                 i++;
679                 if (i > resultDesc->natts)
680                         ereport(ERROR,
681                                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
682                                          isSelect ?
683                                          errmsg("SELECT rule's target list has too many entries") :
684                                          errmsg("RETURNING list has too many entries")));
685
686                 attr = TupleDescAttr(resultDesc, i - 1);
687                 attname = NameStr(attr->attname);
688
689                 /*
690                  * Disallow dropped columns in the relation.  This is not really
691                  * expected to happen when creating an ON SELECT rule.  It'd be
692                  * possible if someone tried to convert a relation with dropped
693                  * columns to a view, but the only case we care about supporting
694                  * table-to-view conversion for is pg_dump, and pg_dump won't do that.
695                  *
696                  * Unfortunately, the situation is also possible when adding a rule
697                  * with RETURNING to a regular table, and rejecting that case is
698                  * altogether more annoying.  In principle we could support it by
699                  * modifying the targetlist to include dummy NULL columns
700                  * corresponding to the dropped columns in the tupdesc.  However,
701                  * places like ruleutils.c would have to be fixed to not process such
702                  * entries, and that would take an uncertain and possibly rather large
703                  * amount of work.  (Note we could not dodge that by marking the dummy
704                  * columns resjunk, since it's precisely the non-resjunk tlist columns
705                  * that are expected to correspond to table columns.)
706                  */
707                 if (attr->attisdropped)
708                         ereport(ERROR,
709                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
710                                          isSelect ?
711                                          errmsg("cannot convert relation containing dropped columns to view") :
712                                          errmsg("cannot create a RETURNING list for a relation containing dropped columns")));
713
714                 /* Check name match if required; no need for two error texts here */
715                 if (requireColumnNameMatch && strcmp(tle->resname, attname) != 0)
716                         ereport(ERROR,
717                                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
718                                          errmsg("SELECT rule's target entry %d has different column name from column \"%s\"",
719                                                         i, attname),
720                                          errdetail("SELECT target entry is named \"%s\".",
721                                                            tle->resname)));
722
723                 /* Check type match. */
724                 tletypid = exprType((Node *) tle->expr);
725                 if (attr->atttypid != tletypid)
726                         ereport(ERROR,
727                                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
728                                          isSelect ?
729                                          errmsg("SELECT rule's target entry %d has different type from column \"%s\"",
730                                                         i, attname) :
731                                          errmsg("RETURNING list's entry %d has different type from column \"%s\"",
732                                                         i, attname),
733                                          isSelect ?
734                                          errdetail("SELECT target entry has type %s, but column has type %s.",
735                                                            format_type_be(tletypid),
736                                                            format_type_be(attr->atttypid)) :
737                                          errdetail("RETURNING list entry has type %s, but column has type %s.",
738                                                            format_type_be(tletypid),
739                                                            format_type_be(attr->atttypid))));
740
741                 /*
742                  * Allow typmods to be different only if one of them is -1, ie,
743                  * "unspecified".  This is necessary for cases like "numeric", where
744                  * the table will have a filled-in default length but the select
745                  * rule's expression will probably have typmod = -1.
746                  */
747                 tletypmod = exprTypmod((Node *) tle->expr);
748                 if (attr->atttypmod != tletypmod &&
749                         attr->atttypmod != -1 && tletypmod != -1)
750                         ereport(ERROR,
751                                         (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
752                                          isSelect ?
753                                          errmsg("SELECT rule's target entry %d has different size from column \"%s\"",
754                                                         i, attname) :
755                                          errmsg("RETURNING list's entry %d has different size from column \"%s\"",
756                                                         i, attname),
757                                          isSelect ?
758                                          errdetail("SELECT target entry has type %s, but column has type %s.",
759                                                            format_type_with_typemod(tletypid, tletypmod),
760                                                            format_type_with_typemod(attr->atttypid,
761                                                                                                                 attr->atttypmod)) :
762                                          errdetail("RETURNING list entry has type %s, but column has type %s.",
763                                                            format_type_with_typemod(tletypid, tletypmod),
764                                                            format_type_with_typemod(attr->atttypid,
765                                                                                                                 attr->atttypmod))));
766         }
767
768         if (i != resultDesc->natts)
769                 ereport(ERROR,
770                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
771                                  isSelect ?
772                                  errmsg("SELECT rule's target list has too few entries") :
773                                  errmsg("RETURNING list has too few entries")));
774 }
775
776 /*
777  * setRuleCheckAsUser
778  *              Recursively scan a query or expression tree and set the checkAsUser
779  *              field to the given userid in all rtable entries.
780  *
781  * Note: for a view (ON SELECT rule), the checkAsUser field of the OLD
782  * RTE entry will be overridden when the view rule is expanded, and the
783  * checkAsUser field of the NEW entry is irrelevant because that entry's
784  * requiredPerms bits will always be zero.  However, for other types of rules
785  * it's important to set these fields to match the rule owner.  So we just set
786  * them always.
787  */
788 void
789 setRuleCheckAsUser(Node *node, Oid userid)
790 {
791         (void) setRuleCheckAsUser_walker(node, &userid);
792 }
793
794 static bool
795 setRuleCheckAsUser_walker(Node *node, Oid *context)
796 {
797         if (node == NULL)
798                 return false;
799         if (IsA(node, Query))
800         {
801                 setRuleCheckAsUser_Query((Query *) node, *context);
802                 return false;
803         }
804         return expression_tree_walker(node, setRuleCheckAsUser_walker,
805                                                                   (void *) context);
806 }
807
808 static void
809 setRuleCheckAsUser_Query(Query *qry, Oid userid)
810 {
811         ListCell   *l;
812
813         /* Set all the RTEs in this query node */
814         foreach(l, qry->rtable)
815         {
816                 RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
817
818                 if (rte->rtekind == RTE_SUBQUERY)
819                 {
820                         /* Recurse into subquery in FROM */
821                         setRuleCheckAsUser_Query(rte->subquery, userid);
822                 }
823                 else
824                         rte->checkAsUser = userid;
825         }
826
827         /* Recurse into subquery-in-WITH */
828         foreach(l, qry->cteList)
829         {
830                 CommonTableExpr *cte = (CommonTableExpr *) lfirst(l);
831
832                 setRuleCheckAsUser_Query(castNode(Query, cte->ctequery), userid);
833         }
834
835         /* If there are sublinks, search for them and process their RTEs */
836         if (qry->hasSubLinks)
837                 query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid,
838                                                   QTW_IGNORE_RC_SUBQUERIES);
839 }
840
841
842 /*
843  * Change the firing semantics of an existing rule.
844  */
845 void
846 EnableDisableRule(Relation rel, const char *rulename,
847                                   char fires_when)
848 {
849         Relation        pg_rewrite_desc;
850         Oid                     owningRel = RelationGetRelid(rel);
851         Oid                     eventRelationOid;
852         HeapTuple       ruletup;
853         Form_pg_rewrite ruleform;
854         bool            changed = false;
855
856         /*
857          * Find the rule tuple to change.
858          */
859         pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock);
860         ruletup = SearchSysCacheCopy2(RULERELNAME,
861                                                                   ObjectIdGetDatum(owningRel),
862                                                                   PointerGetDatum(rulename));
863         if (!HeapTupleIsValid(ruletup))
864                 ereport(ERROR,
865                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
866                                  errmsg("rule \"%s\" for relation \"%s\" does not exist",
867                                                 rulename, get_rel_name(owningRel))));
868
869         ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup);
870
871         /*
872          * Verify that the user has appropriate permissions.
873          */
874         eventRelationOid = ruleform->ev_class;
875         Assert(eventRelationOid == owningRel);
876         if (!pg_class_ownercheck(eventRelationOid, GetUserId()))
877                 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(eventRelationOid)),
878                                            get_rel_name(eventRelationOid));
879
880         /*
881          * Change ev_enabled if it is different from the desired new state.
882          */
883         if (DatumGetChar(ruleform->ev_enabled) !=
884                 fires_when)
885         {
886                 ruleform->ev_enabled = CharGetDatum(fires_when);
887                 CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup);
888
889                 changed = true;
890         }
891
892         InvokeObjectPostAlterHook(RewriteRelationId, ruleform->oid, 0);
893
894         heap_freetuple(ruletup);
895         table_close(pg_rewrite_desc, RowExclusiveLock);
896
897         /*
898          * If we changed anything, broadcast a SI inval message to force each
899          * backend (including our own!) to rebuild relation's relcache entry.
900          * Otherwise they will fail to apply the change promptly.
901          */
902         if (changed)
903                 CacheInvalidateRelcache(rel);
904 }
905
906
907 /*
908  * Perform permissions and integrity checks before acquiring a relation lock.
909  */
910 static void
911 RangeVarCallbackForRenameRule(const RangeVar *rv, Oid relid, Oid oldrelid,
912                                                           void *arg)
913 {
914         HeapTuple       tuple;
915         Form_pg_class form;
916
917         tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
918         if (!HeapTupleIsValid(tuple))
919                 return;                                 /* concurrently dropped */
920         form = (Form_pg_class) GETSTRUCT(tuple);
921
922         /* only tables and views can have rules */
923         if (form->relkind != RELKIND_RELATION &&
924                 form->relkind != RELKIND_VIEW &&
925                 form->relkind != RELKIND_PARTITIONED_TABLE)
926                 ereport(ERROR,
927                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
928                                  errmsg("\"%s\" is not a table or view", rv->relname)));
929
930         if (!allowSystemTableMods && IsSystemClass(relid, form))
931                 ereport(ERROR,
932                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
933                                  errmsg("permission denied: \"%s\" is a system catalog",
934                                                 rv->relname)));
935
936         /* you must own the table to rename one of its rules */
937         if (!pg_class_ownercheck(relid, GetUserId()))
938                 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
939
940         ReleaseSysCache(tuple);
941 }
942
943 /*
944  * Rename an existing rewrite rule.
945  */
946 ObjectAddress
947 RenameRewriteRule(RangeVar *relation, const char *oldName,
948                                   const char *newName)
949 {
950         Oid                     relid;
951         Relation        targetrel;
952         Relation        pg_rewrite_desc;
953         HeapTuple       ruletup;
954         Form_pg_rewrite ruleform;
955         Oid                     ruleOid;
956         ObjectAddress address;
957
958         /*
959          * Look up name, check permissions, and acquire lock (which we will NOT
960          * release until end of transaction).
961          */
962         relid = RangeVarGetRelidExtended(relation, AccessExclusiveLock,
963                                                                          0,
964                                                                          RangeVarCallbackForRenameRule,
965                                                                          NULL);
966
967         /* Have lock already, so just need to build relcache entry. */
968         targetrel = relation_open(relid, NoLock);
969
970         /* Prepare to modify pg_rewrite */
971         pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock);
972
973         /* Fetch the rule's entry (it had better exist) */
974         ruletup = SearchSysCacheCopy2(RULERELNAME,
975                                                                   ObjectIdGetDatum(relid),
976                                                                   PointerGetDatum(oldName));
977         if (!HeapTupleIsValid(ruletup))
978                 ereport(ERROR,
979                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
980                                  errmsg("rule \"%s\" for relation \"%s\" does not exist",
981                                                 oldName, RelationGetRelationName(targetrel))));
982         ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup);
983         ruleOid = ruleform->oid;
984
985         /* rule with the new name should not already exist */
986         if (IsDefinedRewriteRule(relid, newName))
987                 ereport(ERROR,
988                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
989                                  errmsg("rule \"%s\" for relation \"%s\" already exists",
990                                                 newName, RelationGetRelationName(targetrel))));
991
992         /*
993          * We disallow renaming ON SELECT rules, because they should always be
994          * named "_RETURN".
995          */
996         if (ruleform->ev_type == CMD_SELECT + '0')
997                 ereport(ERROR,
998                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
999                                  errmsg("renaming an ON SELECT rule is not allowed")));
1000
1001         /* OK, do the update */
1002         namestrcpy(&(ruleform->rulename), newName);
1003
1004         CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup);
1005
1006         heap_freetuple(ruletup);
1007         table_close(pg_rewrite_desc, RowExclusiveLock);
1008
1009         /*
1010          * Invalidate relation's relcache entry so that other backends (and this
1011          * one too!) are sent SI message to make them rebuild relcache entries.
1012          * (Ideally this should happen automatically...)
1013          */
1014         CacheInvalidateRelcache(targetrel);
1015
1016         ObjectAddressSet(address, RewriteRelationId, ruleOid);
1017
1018         /*
1019          * Close rel, but keep exclusive lock!
1020          */
1021         relation_close(targetrel, NoLock);
1022
1023         return address;
1024 }