]> granicus.if.org Git - postgresql/commitdiff
Make security barrier views automatically updatable
authorStephen Frost <sfrost@snowman.net>
Sun, 13 Apr 2014 01:04:58 +0000 (21:04 -0400)
committerStephen Frost <sfrost@snowman.net>
Sun, 13 Apr 2014 01:04:58 +0000 (21:04 -0400)
Views which are marked as security_barrier must have their quals
applied before any user-defined quals are called, to prevent
user-defined functions from being able to see rows which the
security barrier view is intended to prevent them from seeing.

Remove the restriction on security barrier views being automatically
updatable by adding a new securityQuals list to the RTE structure
which keeps track of the quals from security barrier views at each
level, independently of the user-supplied quals.  When RTEs are
later discovered which have securityQuals populated, they are turned
into subquery RTEs which are marked as security_barrier to prevent
any user-supplied quals being pushed down (modulo LEAKPROOF quals).

Dean Rasheed, reviewed by Craig Ringer, Simon Riggs, KaiGai Kohei

19 files changed:
doc/src/sgml/ref/create_view.sgml
src/backend/commands/tablecmds.c
src/backend/commands/view.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/nodeFuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/plan/planner.c
src/backend/optimizer/prep/Makefile
src/backend/optimizer/prep/prepsecurity.c [new file with mode: 0644]
src/backend/optimizer/prep/prepunion.c
src/backend/rewrite/rewriteHandler.c
src/include/nodes/parsenodes.h
src/include/optimizer/prep.h
src/include/rewrite/rewriteHandler.h
src/test/regress/expected/create_view.out
src/test/regress/expected/updatable_views.out
src/test/regress/sql/updatable_views.sql

index 57bfae3084964b54e36346aa1ebee3b32ce91506..a13f1cbde35defb0ed5cd7247f3ac4a089ccf42b 100644 (file)
@@ -323,12 +323,6 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello;
        or set-returning functions.
       </para>
      </listitem>
-
-     <listitem>
-      <para>
-       The view must not have the <literal>security_barrier</> property.
-      </para>
-     </listitem>
     </itemizedlist>
    </para>
 
@@ -361,6 +355,19 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello;
     such rows that are not visible through the view.
    </para>
 
+   <para>
+    If an automatically updatable view is marked with the
+    <literal>security_barrier</> property then all the view's <literal>WHERE</>
+    conditions (and any conditions using operators which are marked as LEAKPROOF)
+    will always be evaluated before any conditions that a user of the view has
+    added.   See <xref linkend="rules-privileges"> for full details.  Note that,
+    due to this, rows which are not ultimately returned (because they do not
+    pass the user's <literal>WHERE</> conditions) may still end up being locked.
+    <command>EXPLAIN</command> can be used to see which conditions are
+    applied at the relation level (and therefore do not lock rows) and which are
+    not.
+   </para>
+
    <para>
     A more complex view that does not satisfy all these conditions is
     read-only by default: the system will not allow an insert, update, or
index 8f9e5e56079b9ad2e6a2228790233cb100ba4a3e..f5ae98ff80e99d407f7e9158cb867c54997e5bf7 100644 (file)
@@ -8910,7 +8910,6 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
                List       *view_options = untransformRelOptions(newOptions);
                ListCell   *cell;
                bool            check_option = false;
-               bool            security_barrier = false;
 
                foreach(cell, view_options)
                {
@@ -8918,8 +8917,6 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 
                        if (pg_strcasecmp(defel->defname, "check_option") == 0)
                                check_option = true;
-                       if (pg_strcasecmp(defel->defname, "security_barrier") == 0)
-                               security_barrier = defGetBoolean(defel);
                }
 
                /*
@@ -8929,8 +8926,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
                if (check_option)
                {
                        const char *view_updatable_error =
-                               view_query_is_auto_updatable(view_query,
-                                                                                        security_barrier, true);
+                               view_query_is_auto_updatable(view_query, true);
 
                        if (view_updatable_error)
                                ereport(ERROR,
index 173576217a733728f872aab37fd88ca4c08929e4..bc085666fbdd3305f832e5513ea966e4d83ffbd9 100644 (file)
@@ -396,7 +396,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
        RangeVar   *view;
        ListCell   *cell;
        bool            check_option;
-       bool            security_barrier;
 
        /*
         * Run parse analysis to convert the raw parse tree to a Query.  Note this
@@ -451,7 +450,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
         * specified.
         */
        check_option = false;
-       security_barrier = false;
 
        foreach(cell, stmt->options)
        {
@@ -459,8 +457,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
 
                if (pg_strcasecmp(defel->defname, "check_option") == 0)
                        check_option = true;
-               if (pg_strcasecmp(defel->defname, "security_barrier") == 0)
-                       security_barrier = defGetBoolean(defel);
        }
 
        /*
@@ -470,7 +466,7 @@ DefineView(ViewStmt *stmt, const char *queryString)
        if (check_option)
        {
                const char *view_updatable_error =
-                       view_query_is_auto_updatable(viewParse, security_barrier, true);
+                       view_query_is_auto_updatable(viewParse, true);
 
                if (view_updatable_error)
                        ereport(ERROR,
index c89d8083c67a55eb4783e9cb16b3faf89bd90d30..98ad91078ed4b3ca69d6c6a9253e97d1d87ee9b4 100644 (file)
@@ -1998,6 +1998,7 @@ _copyRangeTblEntry(const RangeTblEntry *from)
        COPY_SCALAR_FIELD(checkAsUser);
        COPY_BITMAPSET_FIELD(selectedCols);
        COPY_BITMAPSET_FIELD(modifiedCols);
+       COPY_NODE_FIELD(securityQuals);
 
        return newnode;
 }
index 9793cf52a8c2b0f1ab638754c64ea10412a940da..9901d231cdb80b282c91860e66955ca67055300a 100644 (file)
@@ -2296,6 +2296,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b)
        COMPARE_SCALAR_FIELD(checkAsUser);
        COMPARE_BITMAPSET_FIELD(selectedCols);
        COMPARE_BITMAPSET_FIELD(modifiedCols);
+       COMPARE_NODE_FIELD(securityQuals);
 
        return true;
 }
index 123f2a6a3c7cbc3b1bc5e003b03154ead9fb992d..1e48a7f8890e9de8bb68df9152fb304e40d7972d 100644 (file)
@@ -2020,6 +2020,9 @@ range_table_walker(List *rtable,
                                        return true;
                                break;
                }
+
+               if (walker(rte->securityQuals, context))
+                       return true;
        }
        return false;
 }
@@ -2755,6 +2758,7 @@ range_table_mutator(List *rtable,
                                MUTATE(newrte->values_lists, rte->values_lists, List *);
                                break;
                }
+               MUTATE(newrte->securityQuals, rte->securityQuals, List *);
                newrt = lappend(newrt, newrte);
        }
        return newrt;
index bfb4b9fb03d16872e2f8f05a6602553e78859437..10e81391b13612de6e0e70102c046664aefe2cc9 100644 (file)
@@ -2409,6 +2409,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node)
        WRITE_OID_FIELD(checkAsUser);
        WRITE_BITMAPSET_FIELD(selectedCols);
        WRITE_BITMAPSET_FIELD(modifiedCols);
+       WRITE_NODE_FIELD(securityQuals);
 }
 
 static void
index 216d75e8d1a453520c4a1e6b3e27be6766c0d517..ef1eae91bf7108aa2b2c06260f1d13ed7a15c956 100644 (file)
@@ -1252,6 +1252,7 @@ _readRangeTblEntry(void)
        READ_OID_FIELD(checkAsUser);
        READ_BITMAPSET_FIELD(selectedCols);
        READ_BITMAPSET_FIELD(modifiedCols);
+       READ_NODE_FIELD(securityQuals);
 
        READ_DONE();
 }
index 35bda67b1f1c3988cf48b2455f1d72b000535d1b..0508d16902be68270616e88277e00a5569fcaf03 100644 (file)
@@ -915,6 +915,12 @@ inheritance_planner(PlannerInfo *root)
                /* Generate plan */
                subplan = grouping_planner(&subroot, 0.0 /* retrieve all tuples */ );
 
+               /*
+                * Planning may have modified the query result relation (if there
+                * were security barrier quals on the result RTE).
+                */
+               appinfo->child_relid = subroot.parse->resultRelation;
+
                /*
                 * If this child rel was excluded by constraint exclusion, exclude it
                 * from the result plan.
@@ -932,9 +938,40 @@ inheritance_planner(PlannerInfo *root)
                if (final_rtable == NIL)
                        final_rtable = subroot.parse->rtable;
                else
-                       final_rtable = list_concat(final_rtable,
+               {
+                       List       *tmp_rtable = NIL;
+                       ListCell   *cell1, *cell2;
+
+                       /*
+                        * Check to see if any of the original RTEs were turned into
+                        * subqueries during planning.  Currently, this should only ever
+                        * happen due to securityQuals being involved which push a
+                        * relation down under a subquery, to ensure that the security
+                        * barrier quals are evaluated first.
+                        *
+                        * When this happens, we want to use the new subqueries in the
+                        * final rtable.
+                        */
+                       forboth(cell1, final_rtable, cell2, subroot.parse->rtable)
+                       {
+                               RangeTblEntry *rte1 = (RangeTblEntry *) lfirst(cell1);
+                               RangeTblEntry *rte2 = (RangeTblEntry *) lfirst(cell2);
+
+                               if (rte1->rtekind == RTE_RELATION &&
+                                       rte2->rtekind == RTE_SUBQUERY)
+                               {
+                                       /* Should only be when there are securityQuals today */
+                                       Assert(rte1->securityQuals != NIL);
+                                       tmp_rtable = lappend(tmp_rtable, rte2);
+                               }
+                               else
+                                       tmp_rtable = lappend(tmp_rtable, rte1);
+                       }
+
+                       final_rtable = list_concat(tmp_rtable,
                                                                           list_copy_tail(subroot.parse->rtable,
                                                                                                 list_length(final_rtable)));
+               }
 
                /*
                 * We need to collect all the RelOptInfos from all child plans into
@@ -1162,6 +1199,12 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
                /* Preprocess targetlist */
                tlist = preprocess_targetlist(root, tlist);
 
+               /*
+                * Expand any rangetable entries that have security barrier quals.
+                * This may add new security barrier subquery RTEs to the rangetable.
+                */
+               expand_security_quals(root, tlist);
+
                /*
                 * Locate any window functions in the tlist.  (We don't need to look
                 * anywhere else, since expressions used in ORDER BY will be in there
index 86301bfbd32a706788815477eaa8e710d045361b..5195d9b0ba7aad112d76545a21b8c2e16db1b574 100644 (file)
@@ -12,6 +12,6 @@ subdir = src/backend/optimizer/prep
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = prepjointree.o prepqual.o preptlist.o prepunion.o
+OBJS = prepjointree.o prepqual.o prepsecurity.o preptlist.o prepunion.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/optimizer/prep/prepsecurity.c b/src/backend/optimizer/prep/prepsecurity.c
new file mode 100644 (file)
index 0000000..7daaa33
--- /dev/null
@@ -0,0 +1,466 @@
+/*-------------------------------------------------------------------------
+ *
+ * prepsecurity.c
+ *       Routines for preprocessing security barrier quals.
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/optimizer/prep/prepsecurity.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/sysattr.h"
+#include "catalog/heap.h"
+#include "nodes/makefuncs.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/prep.h"
+#include "parser/analyze.h"
+#include "parser/parsetree.h"
+#include "rewrite/rewriteManip.h"
+#include "utils/rel.h"
+
+
+typedef struct
+{
+       int                     rt_index;               /* Index of security barrier RTE */
+       int                     sublevels_up;   /* Current nesting depth */
+       Relation        rel;                    /* RTE relation at rt_index */
+       List       *targetlist;         /* Targetlist for new subquery RTE */
+       List       *colnames;           /* Column names in subquery RTE */
+       List       *vars_processed;     /* List of Vars already processed */
+} security_barrier_replace_vars_context;
+
+static void expand_security_qual(PlannerInfo *root, List *tlist, int rt_index,
+                                        RangeTblEntry *rte, Node *qual);
+
+static void security_barrier_replace_vars(Node *node,
+                                                         security_barrier_replace_vars_context *context);
+
+static bool security_barrier_replace_vars_walker(Node *node,
+                                                                        security_barrier_replace_vars_context *context);
+
+
+/*
+ * expand_security_quals -
+ *       expands any security barrier quals on RTEs in the query rtable, turning
+ *       them into security barrier subqueries.
+ *
+ * Any given RTE may have multiple security barrier quals in a list, from which
+ * we create a set of nested subqueries to isolate each security barrier from
+ * the others, providing protection against malicious user-defined security
+ * barriers.  The first security barrier qual in the list will be used in the
+ * innermost subquery.
+ */
+void
+expand_security_quals(PlannerInfo *root, List *tlist)
+{
+       Query      *parse = root->parse;
+       int                     rt_index;
+       ListCell   *cell;
+
+       /*
+        * Process each RTE in the rtable list.
+        *
+        * We only ever modify entries in place and append to the rtable, so it is
+        * safe to use a foreach loop here.
+        */
+       rt_index = 0;
+       foreach(cell, parse->rtable)
+       {
+               RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell);
+
+               rt_index++;
+
+               if (rte->securityQuals == NIL)
+                       continue;
+
+               /*
+                * Ignore any RTEs that aren't used in the query (such RTEs may be
+                * present for permissions checks).
+                */
+               if (rt_index != parse->resultRelation &&
+                       !rangeTableEntry_used((Node *) parse, rt_index, 0))
+                       continue;
+
+               /*
+                * If this RTE is the target then we need to make a copy of it before
+                * expanding it.  The unexpanded copy will become the new target, and
+                * the original RTE will be expanded to become the source of rows to
+                * update/delete.
+                */
+               if (rt_index == parse->resultRelation)
+               {
+                       RangeTblEntry *newrte = copyObject(rte);
+                       parse->rtable = lappend(parse->rtable, newrte);
+                       parse->resultRelation = list_length(parse->rtable);
+
+                       /*
+                        * Wipe out any copied security barrier quals on the new target to
+                        * prevent infinite recursion.
+                        */
+                       newrte->securityQuals = NIL;
+
+                       /*
+                        * There's no need to do permissions checks twice, so wipe out the
+                        * permissions info for the original RTE (we prefer to keep the
+                        * bits set on the result RTE).
+                        */
+                       rte->requiredPerms = 0;
+                       rte->checkAsUser = InvalidOid;
+                       rte->selectedCols = NULL;
+                       rte->modifiedCols = NULL;
+
+                       /*
+                        * For the most part, Vars referencing the original relation should
+                        * remain as they are, meaning that they pull OLD values from the
+                        * expanded RTE.  But in the RETURNING list and in any WITH CHECK
+                        * OPTION quals, we want such Vars to represent NEW values, so
+                        * change them to reference the new RTE.
+                        */
+                       ChangeVarNodes((Node *) parse->returningList, rt_index,
+                                                  parse->resultRelation, 0);
+
+                       ChangeVarNodes((Node *) parse->withCheckOptions, rt_index,
+                                                  parse->resultRelation, 0);
+               }
+
+               /*
+                * Process each security barrier qual in turn, starting with the
+                * innermost one (the first in the list) and working outwards.
+                *
+                * We remove each qual from the list before processing it, so that its
+                * variables aren't modified by expand_security_qual.  Also we don't
+                * necessarily want the attributes referred to by the qual to be
+                * exposed by the newly built subquery.
+                */
+               while (rte->securityQuals != NIL)
+               {
+                       Node   *qual = (Node *) linitial(rte->securityQuals);
+                       rte->securityQuals = list_delete_first(rte->securityQuals);
+
+                       ChangeVarNodes(qual, rt_index, 1, 0);
+                       expand_security_qual(root, tlist, rt_index, rte, qual);
+               }
+       }
+}
+
+
+/*
+ * expand_security_qual -
+ *       expand the specified security barrier qual on a query RTE, turning the
+ *       RTE into a security barrier subquery.
+ */
+static void
+expand_security_qual(PlannerInfo *root, List *tlist, int rt_index,
+                                        RangeTblEntry *rte, Node *qual)
+{
+       Query              *parse = root->parse;
+       Oid                             relid = rte->relid;
+       Query              *subquery;
+       RangeTblEntry  *subrte;
+       RangeTblRef        *subrtr;
+       PlanRowMark        *rc;
+       security_barrier_replace_vars_context context;
+       ListCell           *cell;
+
+       /*
+        * There should only be 2 possible cases:
+        *
+        * 1. A relation RTE, which we turn into a subquery RTE containing all
+        * referenced columns.
+        *
+        * 2. A subquery RTE (either from a prior call to this function or from an
+        * expanded view).  In this case we build a new subquery on top of it to
+        * isolate this security barrier qual from any other quals.
+        */
+       switch (rte->rtekind)
+       {
+               case RTE_RELATION:
+                       /*
+                        * Turn the relation RTE into a security barrier subquery RTE,
+                        * moving all permissions checks down into the subquery.
+                        */
+                       subquery = makeNode(Query);
+                       subquery->commandType = CMD_SELECT;
+                       subquery->querySource = QSRC_INSTEAD_RULE;
+
+                       subrte = copyObject(rte);
+                       subrte->inFromCl = true;
+                       subrte->securityQuals = NIL;
+                       subquery->rtable = list_make1(subrte);
+
+                       subrtr = makeNode(RangeTblRef);
+                       subrtr->rtindex = 1;
+                       subquery->jointree = makeFromExpr(list_make1(subrtr), qual);
+                       subquery->hasSubLinks = checkExprHasSubLink(qual);
+
+                       rte->rtekind = RTE_SUBQUERY;
+                       rte->relid = InvalidOid;
+                       rte->subquery = subquery;
+                       rte->security_barrier = true;
+                       rte->inh = false;                       /* must not be set for a subquery */
+
+                       /* the permissions checks have now been moved down */
+                       rte->requiredPerms = 0;
+                       rte->checkAsUser = InvalidOid;
+                       rte->selectedCols = NULL;
+                       rte->modifiedCols = NULL;
+
+                       /*
+                        * Now deal with any PlanRowMark on this RTE by requesting a lock
+                        * of the same strength on the RTE copied down to the subquery.
+                        *
+                        * Note that we can't push the user-defined quals down since they
+                        * may included untrusted functions and that means that we will
+                        * end up locking all rows which pass the securityQuals, even if
+                        * those rows don't pass the user-defined quals.  This is currently
+                        * documented behavior, but it'd be nice to come up with a better
+                        * solution some day.
+                        */
+                       rc = get_plan_rowmark(root->rowMarks, rt_index);
+                       if (rc != NULL)
+                       {
+                               switch (rc->markType)
+                               {
+                                       case ROW_MARK_EXCLUSIVE:
+                                               applyLockingClause(subquery, 1, LCS_FORUPDATE,
+                                                                                  rc->noWait, false);
+                                               break;
+                                       case ROW_MARK_NOKEYEXCLUSIVE:
+                                               applyLockingClause(subquery, 1, LCS_FORNOKEYUPDATE,
+                                                                                  rc->noWait, false);
+                                               break;
+                                       case ROW_MARK_SHARE:
+                                               applyLockingClause(subquery, 1, LCS_FORSHARE,
+                                                                                  rc->noWait, false);
+                                               break;
+                                       case ROW_MARK_KEYSHARE:
+                                               applyLockingClause(subquery, 1, LCS_FORKEYSHARE,
+                                                                                  rc->noWait, false);
+                                               break;
+                                       case ROW_MARK_REFERENCE:
+                                       case ROW_MARK_COPY:
+                                               /* No locking needed */
+                                               break;
+                               }
+                               root->rowMarks = list_delete(root->rowMarks, rc);
+                       }
+
+                       /*
+                        * Replace any variables in the outer query that refer to the
+                        * original relation RTE with references to columns that we will
+                        * expose in the new subquery, building the subquery's targetlist
+                        * as we go.
+                        */
+                       context.rt_index = rt_index;
+                       context.sublevels_up = 0;
+                       context.rel = heap_open(relid, NoLock);
+                       context.targetlist = NIL;
+                       context.colnames = NIL;
+                       context.vars_processed = NIL;
+
+                       security_barrier_replace_vars((Node *) parse, &context);
+                       security_barrier_replace_vars((Node *) tlist, &context);
+
+                       heap_close(context.rel, NoLock);
+
+                       /* Now we know what columns the subquery needs to expose */
+                       rte->subquery->targetList = context.targetlist;
+                       rte->eref = makeAlias(rte->eref->aliasname, context.colnames);
+
+                       break;
+
+               case RTE_SUBQUERY:
+                       /*
+                        * Build a new subquery that includes all the same columns as the
+                        * original subquery.
+                        */
+                       subquery = makeNode(Query);
+                       subquery->commandType = CMD_SELECT;
+                       subquery->querySource = QSRC_INSTEAD_RULE;
+                       subquery->targetList = NIL;
+
+                       foreach(cell, rte->subquery->targetList)
+                       {
+                               TargetEntry        *tle;
+                               Var                        *var;
+
+                               tle = (TargetEntry *) lfirst(cell);
+                               var = makeVarFromTargetEntry(1, tle);
+
+                               tle = makeTargetEntry((Expr *) var,
+                                                                         list_length(subquery->targetList) + 1,
+                                                                         pstrdup(tle->resname),
+                                                                         tle->resjunk);
+                               subquery->targetList = lappend(subquery->targetList, tle);
+                       }
+
+                       subrte = makeNode(RangeTblEntry);
+                       subrte->rtekind = RTE_SUBQUERY;
+                       subrte->subquery = rte->subquery;
+                       subrte->security_barrier = rte->security_barrier;
+                       subrte->eref = copyObject(rte->eref);
+                       subrte->inFromCl = true;
+                       subquery->rtable = list_make1(subrte);
+
+                       subrtr = makeNode(RangeTblRef);
+                       subrtr->rtindex = 1;
+                       subquery->jointree = makeFromExpr(list_make1(subrtr), qual);
+                       subquery->hasSubLinks = checkExprHasSubLink(qual);
+
+                       rte->subquery = subquery;
+                       rte->security_barrier = true;
+
+                       break;
+
+               default:
+                       elog(ERROR, "invalid range table entry for security barrier qual");
+       }
+}
+
+
+/*
+ * security_barrier_replace_vars -
+ *       Apply security barrier variable replacement to an expression tree.
+ *
+ * This also builds/updates a targetlist with entries for each replacement
+ * variable that needs to be exposed by the security barrier subquery RTE.
+ *
+ * NOTE: although this has the form of a walker, we cheat and modify the
+ * nodes in-place.     The given expression tree should have been copied
+ * earlier to ensure that no unwanted side-effects occur!
+ */
+static void
+security_barrier_replace_vars(Node *node,
+                                                         security_barrier_replace_vars_context *context)
+{
+       /*
+        * Must be prepared to start with a Query or a bare expression tree; if
+        * it's a Query, go straight to query_tree_walker to make sure that
+        * sublevels_up doesn't get incremented prematurely.
+        */
+       if (node && IsA(node, Query))
+               query_tree_walker((Query *) node,
+                                                 security_barrier_replace_vars_walker,
+                                                 (void *) context, 0);
+       else
+               security_barrier_replace_vars_walker(node, context);
+}
+
+static bool
+security_barrier_replace_vars_walker(Node *node,
+                                                                        security_barrier_replace_vars_context *context)
+{
+       if (node == NULL)
+               return false;
+
+       if (IsA(node, Var))
+       {
+               Var                *var = (Var *) node;
+
+               /*
+                * Note that the same Var may be present in different lists, so we
+                * need to take care not to process it multiple times.
+                */
+               if (var->varno == context->rt_index &&
+                       var->varlevelsup == context->sublevels_up &&
+                       !list_member_ptr(context->vars_processed, var))
+               {
+                       /*
+                        * Found a matching variable. Make sure that it is in the subquery
+                        * targetlist and map its attno accordingly.
+                        */
+                       AttrNumber      attno;
+                       ListCell   *l;
+                       TargetEntry *tle;
+                       char       *attname;
+                       Var                *newvar;
+
+                       /* Search for the base attribute in the subquery targetlist */
+                       attno = InvalidAttrNumber;
+                       foreach(l, context->targetlist)
+                       {
+                               tle = (TargetEntry *) lfirst(l);
+                               attno++;
+
+                               Assert(IsA(tle->expr, Var));
+                               if (((Var *) tle->expr)->varattno == var->varattno &&
+                                       ((Var *) tle->expr)->varcollid == var->varcollid)
+                               {
+                                       /* Map the variable onto this subquery targetlist entry */
+                                       var->varattno = attno;
+                                       return false;
+                               }
+                       }
+
+                       /* Not in the subquery targetlist, so add it. Get its name. */
+                       if (var->varattno < 0)
+                       {
+                               Form_pg_attribute att_tup;
+
+                               att_tup = SystemAttributeDefinition(var->varattno,
+                                                                                                       context->rel->rd_rel->relhasoids);
+                               attname = NameStr(att_tup->attname);
+                       }
+                       else if (var->varattno == InvalidAttrNumber)
+                       {
+                               attname = "wholerow";
+                       }
+                       else if (var->varattno <= context->rel->rd_att->natts)
+                       {
+                               Form_pg_attribute att_tup;
+
+                               att_tup = context->rel->rd_att->attrs[var->varattno - 1];
+                               attname = NameStr(att_tup->attname);
+                       }
+                       else
+                       {
+                               elog(ERROR, "invalid attribute number %d in security_barrier_replace_vars", var->varattno);
+                       }
+
+                       /* New variable for subquery targetlist */
+                       newvar = copyObject(var);
+                       newvar->varno = 1;
+
+                       attno = list_length(context->targetlist) + 1;
+                       tle = makeTargetEntry((Expr *) newvar,
+                                                                 attno,
+                                                                 pstrdup(attname),
+                                                                 false);
+
+                       context->targetlist = lappend(context->targetlist, tle);
+
+                       context->colnames = lappend(context->colnames,
+                                                                               makeString(pstrdup(attname)));
+
+                       /* Update the outer query's variable */
+                       var->varattno = attno;
+
+                       /* Remember this Var so that we don't process it again */
+                       context->vars_processed = lappend(context->vars_processed, var);
+               }
+               return false;
+       }
+
+       if (IsA(node, Query))
+       {
+               /* Recurse into subselects */
+               bool            result;
+
+               context->sublevels_up++;
+               result = query_tree_walker((Query *) node,
+                                                                  security_barrier_replace_vars_walker,
+                                                                  (void *) context, 0);
+               context->sublevels_up--;
+               return result;
+       }
+
+       return expression_tree_walker(node, security_barrier_replace_vars_walker,
+                                                                 (void *) context);
+}
index 52dcc720de7cc491057e64570ab1155b0cb943fc..cdf541d34d5f55c5d4e333b1f09251ec555f89be 100644 (file)
@@ -55,6 +55,7 @@ typedef struct
 {
        PlannerInfo *root;
        AppendRelInfo *appinfo;
+       int                     sublevels_up;
 } adjust_appendrel_attrs_context;
 
 static Plan *recurse_set_operations(Node *setOp, PlannerInfo *root,
@@ -1580,8 +1581,9 @@ translate_col_privs(const Bitmapset *parent_privs,
  *       child rel instead.  We also update rtindexes appearing outside Vars,
  *       such as resultRelation and jointree relids.
  *
- * Note: this is only applied after conversion of sublinks to subplans,
- * so we don't need to cope with recursion into sub-queries.
+ * Note: this is applied after conversion of sublinks to subplans in the
+ * query jointree, but there may still be sublinks in the security barrier
+ * quals of RTEs, so we do need to cope with recursion into sub-queries.
  *
  * Note: this is not hugely different from what pullup_replace_vars() does;
  * maybe we should try to fold the two routines together.
@@ -1594,9 +1596,12 @@ adjust_appendrel_attrs(PlannerInfo *root, Node *node, AppendRelInfo *appinfo)
 
        context.root = root;
        context.appinfo = appinfo;
+       context.sublevels_up = 0;
 
        /*
-        * Must be prepared to start with a Query or a bare expression tree.
+        * Must be prepared to start with a Query or a bare expression tree; if
+        * it's a Query, go straight to query_tree_walker to make sure that
+        * sublevels_up doesn't get incremented prematurely.
         */
        if (node && IsA(node, Query))
        {
@@ -1635,7 +1640,7 @@ adjust_appendrel_attrs_mutator(Node *node,
        {
                Var                *var = (Var *) copyObject(node);
 
-               if (var->varlevelsup == 0 &&
+               if (var->varlevelsup == context->sublevels_up &&
                        var->varno == appinfo->parent_relid)
                {
                        var->varno = appinfo->child_relid;
@@ -1652,6 +1657,7 @@ adjust_appendrel_attrs_mutator(Node *node,
                                if (newnode == NULL)
                                        elog(ERROR, "attribute %d of relation \"%s\" does not exist",
                                                 var->varattno, get_rel_name(appinfo->parent_reloid));
+                               ((Var *) newnode)->varlevelsup += context->sublevels_up;
                                return newnode;
                        }
                        else if (var->varattno == 0)
@@ -1694,10 +1700,16 @@ adjust_appendrel_attrs_mutator(Node *node,
                                        RowExpr    *rowexpr;
                                        List       *fields;
                                        RangeTblEntry *rte;
+                                       ListCell   *lc;
 
                                        rte = rt_fetch(appinfo->parent_relid,
                                                                   context->root->parse->rtable);
                                        fields = (List *) copyObject(appinfo->translated_vars);
+                                       foreach(lc, fields)
+                                       {
+                                               Var                *field = (Var *) lfirst(lc);
+                                               field->varlevelsup += context->sublevels_up;
+                                       }
                                        rowexpr = makeNode(RowExpr);
                                        rowexpr->args = fields;
                                        rowexpr->row_typeid = var->vartype;
@@ -1716,7 +1728,8 @@ adjust_appendrel_attrs_mutator(Node *node,
        {
                CurrentOfExpr *cexpr = (CurrentOfExpr *) copyObject(node);
 
-               if (cexpr->cvarno == appinfo->parent_relid)
+               if (context->sublevels_up == 0 &&
+                       cexpr->cvarno == appinfo->parent_relid)
                        cexpr->cvarno = appinfo->child_relid;
                return (Node *) cexpr;
        }
@@ -1724,7 +1737,8 @@ adjust_appendrel_attrs_mutator(Node *node,
        {
                RangeTblRef *rtr = (RangeTblRef *) copyObject(node);
 
-               if (rtr->rtindex == appinfo->parent_relid)
+               if (context->sublevels_up == 0 &&
+                       rtr->rtindex == appinfo->parent_relid)
                        rtr->rtindex = appinfo->child_relid;
                return (Node *) rtr;
        }
@@ -1737,7 +1751,8 @@ adjust_appendrel_attrs_mutator(Node *node,
                                                                                          adjust_appendrel_attrs_mutator,
                                                                                                 (void *) context);
                /* now fix JoinExpr's rtindex (probably never happens) */
-               if (j->rtindex == appinfo->parent_relid)
+               if (context->sublevels_up == 0 &&
+                       j->rtindex == appinfo->parent_relid)
                        j->rtindex = appinfo->child_relid;
                return (Node *) j;
        }
@@ -1750,7 +1765,7 @@ adjust_appendrel_attrs_mutator(Node *node,
                                                                                          adjust_appendrel_attrs_mutator,
                                                                                                                 (void *) context);
                /* now fix PlaceHolderVar's relid sets */
-               if (phv->phlevelsup == 0)
+               if (phv->phlevelsup == context->sublevels_up)
                        phv->phrels = adjust_relid_set(phv->phrels,
                                                                                   appinfo->parent_relid,
                                                                                   appinfo->child_relid);
@@ -1822,12 +1837,29 @@ adjust_appendrel_attrs_mutator(Node *node,
                return (Node *) newinfo;
        }
 
-       /*
-        * NOTE: we do not need to recurse into sublinks, because they should
-        * already have been converted to subplans before we see them.
-        */
-       Assert(!IsA(node, SubLink));
-       Assert(!IsA(node, Query));
+       if (IsA(node, Query))
+       {
+               /*
+                * Recurse into sublink subqueries. This should only be possible in
+                * security barrier quals of top-level RTEs. All other sublinks should
+                * have already been converted to subplans during expression
+                * preprocessing, but this doesn't happen for security barrier quals,
+                * since they are destined to become quals of a subquery RTE, which
+                * will be recursively planned, and so should not be preprocessed at
+                * this stage.
+                *
+                * We don't explicitly Assert() for securityQuals here simply because
+                * it's not trivial to do so.
+                */
+               Query      *newnode;
+
+               context->sublevels_up++;
+               newnode = query_tree_mutator((Query *) node,
+                                                                        adjust_appendrel_attrs_mutator,
+                                                                        (void *) context, 0);
+               context->sublevels_up--;
+               return (Node *) newnode;
+       }
 
        return expression_tree_mutator(node, adjust_appendrel_attrs_mutator,
                                                                   (void *) context);
index 5dbcce3e550b0ed5a76e86361fc4442c933e771c..caed8caee6b222ee3b082a6a023b18219ce2a2a7 100644 (file)
@@ -2023,8 +2023,7 @@ view_col_is_auto_updatable(RangeTblRef *rtr, TargetEntry *tle)
  * updatable.
  */
 const char *
-view_query_is_auto_updatable(Query *viewquery, bool security_barrier,
-                                                        bool check_cols)
+view_query_is_auto_updatable(Query *viewquery, bool check_cols)
 {
        RangeTblRef *rtr;
        RangeTblEntry *base_rte;
@@ -2097,14 +2096,6 @@ view_query_is_auto_updatable(Query *viewquery, bool security_barrier,
        if (expression_returns_set((Node *) viewquery->targetList))
                return gettext_noop("Views that return set-returning functions are not automatically updatable.");
 
-       /*
-        * For now, we also don't support security-barrier views, because of the
-        * difficulty of keeping upper-level qual expressions away from
-        * lower-level data.  This might get relaxed in the future.
-        */
-       if (security_barrier)
-               return gettext_noop("Security-barrier views are not automatically updatable.");
-
        /*
         * The view query should select from a single base relation, which must be
         * a table or another view.
@@ -2353,9 +2344,7 @@ relation_is_updatable(Oid reloid,
        {
                Query      *viewquery = get_view_query(rel);
 
-               if (view_query_is_auto_updatable(viewquery,
-                                                                                RelationIsSecurityView(rel),
-                                                                                false) == NULL)
+               if (view_query_is_auto_updatable(viewquery, false) == NULL)
                {
                        Bitmapset  *updatable_cols;
                        int                     auto_events;
@@ -2510,7 +2499,6 @@ rewriteTargetView(Query *parsetree, Relation view)
 
        auto_update_detail =
                view_query_is_auto_updatable(viewquery,
-                                                                        RelationIsSecurityView(view),
                                                                         parsetree->commandType != CMD_DELETE);
 
        if (auto_update_detail)
@@ -2713,6 +2701,14 @@ rewriteTargetView(Query *parsetree, Relation view)
        new_rte->modifiedCols = adjust_view_column_set(view_rte->modifiedCols,
                                                                                                   view_targetlist);
 
+       /*
+        * Move any security barrier quals from the view RTE onto the new target
+        * RTE.  Any such quals should now apply to the new target RTE and will not
+        * reference the original view RTE in the rewritten query.
+        */
+       new_rte->securityQuals = view_rte->securityQuals;
+       view_rte->securityQuals = NIL;
+
        /*
         * For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk
         * TLE for the view to the end of the targetlist, which we no longer need.
@@ -2793,6 +2789,10 @@ rewriteTargetView(Query *parsetree, Relation view)
         * only adjust their varnos to reference the new target (just the same as
         * we did with the view targetlist).
         *
+        * Note that there is special-case handling for the quals of a security
+        * barrier view, since they need to be kept separate from any user-supplied
+        * quals, so these quals are kept on the new target RTE.
+        *
         * For INSERT, the view's quals can be ignored in the main query.
         */
        if (parsetree->commandType != CMD_INSERT &&
@@ -2801,7 +2801,25 @@ rewriteTargetView(Query *parsetree, Relation view)
                Node       *viewqual = (Node *) copyObject(viewquery->jointree->quals);
 
                ChangeVarNodes(viewqual, base_rt_index, new_rt_index, 0);
-               AddQual(parsetree, (Node *) viewqual);
+
+               if (RelationIsSecurityView(view))
+               {
+                       /*
+                        * Note: the parsetree has been mutated, so the new_rte pointer is
+                        * stale and needs to be re-computed.
+                        */
+                       new_rte = rt_fetch(new_rt_index, parsetree->rtable);
+                       new_rte->securityQuals = lcons(viewqual, new_rte->securityQuals);
+
+                       /*
+                        * Make sure that the query is marked correctly if the added qual
+                        * has sublinks.
+                        */
+                       if (!parsetree->hasSubLinks)
+                               parsetree->hasSubLinks = checkExprHasSubLink(viewqual);
+               }
+               else
+                       AddQual(parsetree, (Node *) viewqual);
        }
 
        /*
@@ -2863,9 +2881,8 @@ rewriteTargetView(Query *parsetree, Relation view)
                                 * Make sure that the query is marked correctly if the added
                                 * qual has sublinks.  We can skip this check if the query is
                                 * already marked, or if the command is an UPDATE, in which
-                                * case the same qual will have already been added to the
-                                * query's WHERE clause, and AddQual will have already done
-                                * this check.
+                                * case the same qual will have already been added, and this
+                                * check will already have been done.
                                 */
                                if (!parsetree->hasSubLinks &&
                                        parsetree->commandType != CMD_UPDATE)
index b5011af3e7faeedc6646cb161e3746a5061ec55a..18d499100889b4f987379233302cd280ee9127fd 100644 (file)
@@ -801,6 +801,7 @@ typedef struct RangeTblEntry
        Oid                     checkAsUser;    /* if valid, check access as this role */
        Bitmapset  *selectedCols;       /* columns needing SELECT permission */
        Bitmapset  *modifiedCols;       /* columns needing INSERT/UPDATE permission */
+       List       *securityQuals;      /* any security barrier quals to apply */
 } RangeTblEntry;
 
 /*
index 0f5a7d36ec1f4b917ec535255453e31121e210bf..f5fc7e8e71351a9b334b2bfb44ff22f1f27310a5 100644 (file)
@@ -35,6 +35,11 @@ extern Relids get_relids_for_join(PlannerInfo *root, int joinrelid);
 extern Node *negate_clause(Node *node);
 extern Expr *canonicalize_qual(Expr *qual);
 
+/*
+ * prototypes for prepsecurity.c
+ */
+extern void expand_security_quals(PlannerInfo *root, List *tlist);
+
 /*
  * prototypes for preptlist.c
  */
index 22551ece3ec7caa449d55f04b878311f0b279a42..1b5121314d34965ed7d5f8b87f0ce0d425cfd39f 100644 (file)
@@ -25,7 +25,6 @@ extern void AcquireRewriteLocks(Query *parsetree,
 extern Node *build_column_default(Relation rel, int attrno);
 extern Query *get_view_query(Relation view);
 extern const char *view_query_is_auto_updatable(Query *viewquery,
-                                                                                bool security_barrier,
                                                                                 bool check_cols);
 extern int     relation_is_updatable(Oid reloid,
                                                  bool include_triggers,
index 91d163961060f8aca194ead3c202b197ecd1018b..f6db582afda509885adec397cbba454c4dcd2979 100644 (file)
@@ -252,7 +252,7 @@ CREATE VIEW mysecview4 WITH (security_barrier)
        AS SELECT * FROM tbl1 WHERE a <> 0;
 CREATE VIEW mysecview5 WITH (security_barrier=100)     -- Error
        AS SELECT * FROM tbl1 WHERE a > 100;
-ERROR:  security_barrier requires a Boolean value
+ERROR:  invalid value for boolean option "security_barrier": 100
 CREATE VIEW mysecview6 WITH (invalid_option)           -- Error
        AS SELECT * FROM tbl1 WHERE a < 100;
 ERROR:  unrecognized parameter "invalid_option"
index 99c9165a95fbd6d6a0d9db01fbb75ce31b651020..83a33772cd6d7ce7998306b5070bfebed58adb3d 100644 (file)
@@ -22,12 +22,10 @@ CREATE VIEW rw_view14 AS SELECT ctid, a, b FROM base_tbl; -- System columns may
 CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view
 CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view
 CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable
-CREATE VIEW ro_view18 WITH (security_barrier = true)
-  AS SELECT * FROM base_tbl; -- Security barrier views not updatable
-CREATE VIEW ro_view19 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable
+CREATE VIEW ro_view18 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable
 CREATE SEQUENCE seq;
-CREATE VIEW ro_view20 AS SELECT * FROM seq; -- View based on a sequence
-CREATE VIEW ro_view21 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported
+CREATE VIEW ro_view19 AS SELECT * FROM seq; -- View based on a sequence
+CREATE VIEW ro_view20 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported
 SELECT table_name, is_insertable_into
   FROM information_schema.tables
  WHERE table_name LIKE E'r_\\_view%'
@@ -44,7 +42,6 @@ SELECT table_name, is_insertable_into
  ro_view19  | NO
  ro_view2   | NO
  ro_view20  | NO
- ro_view21  | NO
  ro_view3   | NO
  ro_view4   | NO
  ro_view5   | NO
@@ -55,7 +52,7 @@ SELECT table_name, is_insertable_into
  rw_view14  | YES
  rw_view15  | YES
  rw_view16  | YES
-(21 rows)
+(20 rows)
 
 SELECT table_name, is_updatable, is_insertable_into
   FROM information_schema.views
@@ -73,7 +70,6 @@ SELECT table_name, is_updatable, is_insertable_into
  ro_view19  | NO           | NO
  ro_view2   | NO           | NO
  ro_view20  | NO           | NO
- ro_view21  | NO           | NO
  ro_view3   | NO           | NO
  ro_view4   | NO           | NO
  ro_view5   | NO           | NO
@@ -84,7 +80,7 @@ SELECT table_name, is_updatable, is_insertable_into
  rw_view14  | YES          | YES
  rw_view15  | YES          | YES
  rw_view16  | YES          | YES
-(21 rows)
+(20 rows)
 
 SELECT table_name, column_name, is_updatable
   FROM information_schema.columns
@@ -103,23 +99,21 @@ SELECT table_name, column_name, is_updatable
  ro_view17  | a             | NO
  ro_view17  | b             | NO
  ro_view18  | a             | NO
- ro_view18  | b             | NO
- ro_view19  | a             | NO
+ ro_view19  | sequence_name | NO
+ ro_view19  | last_value    | NO
+ ro_view19  | start_value   | NO
+ ro_view19  | increment_by  | NO
+ ro_view19  | max_value     | NO
+ ro_view19  | min_value     | NO
+ ro_view19  | cache_value   | NO
+ ro_view19  | log_cnt       | NO
+ ro_view19  | is_cycled     | NO
+ ro_view19  | is_called     | NO
  ro_view2   | a             | NO
  ro_view2   | b             | NO
- ro_view20  | sequence_name | NO
- ro_view20  | last_value    | NO
- ro_view20  | start_value   | NO
- ro_view20  | increment_by  | NO
- ro_view20  | max_value     | NO
- ro_view20  | min_value     | NO
- ro_view20  | cache_value   | NO
- ro_view20  | log_cnt       | NO
- ro_view20  | is_cycled     | NO
- ro_view20  | is_called     | NO
- ro_view21  | a             | NO
- ro_view21  | b             | NO
- ro_view21  | g             | NO
+ ro_view20  | a             | NO
+ ro_view20  | b             | NO
+ ro_view20  | g             | NO
  ro_view3   | ?column?      | NO
  ro_view4   | count         | NO
  ro_view5   | a             | NO
@@ -140,7 +134,7 @@ SELECT table_name, column_name, is_updatable
  rw_view16  | a             | YES
  rw_view16  | b             | YES
  rw_view16  | aa            | YES
-(48 rows)
+(46 rows)
 
 -- Read-only views
 DELETE FROM ro_view1;
@@ -268,24 +262,20 @@ INSERT INTO ro_view17 VALUES (3, 'ROW 3');
 ERROR:  cannot insert into view "ro_view1"
 DETAIL:  Views containing DISTINCT are not automatically updatable.
 HINT:  To enable inserting into the view, provide an INSTEAD OF INSERT trigger or an unconditional ON INSERT DO INSTEAD rule.
-INSERT INTO ro_view18 VALUES (3, 'ROW 3');
-ERROR:  cannot insert into view "ro_view18"
-DETAIL:  Security-barrier views are not automatically updatable.
-HINT:  To enable inserting into the view, provide an INSTEAD OF INSERT trigger or an unconditional ON INSERT DO INSTEAD rule.
-DELETE FROM ro_view19;
-ERROR:  cannot delete from view "ro_view19"
+DELETE FROM ro_view18;
+ERROR:  cannot delete from view "ro_view18"
 DETAIL:  Views that do not select from a single table or view are not automatically updatable.
 HINT:  To enable deleting from the view, provide an INSTEAD OF DELETE trigger or an unconditional ON DELETE DO INSTEAD rule.
-UPDATE ro_view20 SET max_value=1000;
-ERROR:  cannot update view "ro_view20"
+UPDATE ro_view19 SET max_value=1000;
+ERROR:  cannot update view "ro_view19"
 DETAIL:  Views that do not select from a single table or view are not automatically updatable.
 HINT:  To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule.
-UPDATE ro_view21 SET b=upper(b);
-ERROR:  cannot update view "ro_view21"
+UPDATE ro_view20 SET b=upper(b);
+ERROR:  cannot update view "ro_view20"
 DETAIL:  Views that return set-returning functions are not automatically updatable.
 HINT:  To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule.
 DROP TABLE base_tbl CASCADE;
-NOTICE:  drop cascades to 17 other objects
+NOTICE:  drop cascades to 16 other objects
 DETAIL:  drop cascades to view ro_view1
 drop cascades to view ro_view17
 drop cascades to view ro_view2
@@ -299,13 +289,12 @@ drop cascades to view ro_view11
 drop cascades to view ro_view13
 drop cascades to view rw_view15
 drop cascades to view rw_view16
-drop cascades to view ro_view18
-drop cascades to view ro_view21
+drop cascades to view ro_view20
 drop cascades to view ro_view4
 drop cascades to view rw_view14
-DROP VIEW ro_view10, ro_view12, ro_view19;
+DROP VIEW ro_view10, ro_view12, ro_view18;
 DROP SEQUENCE seq CASCADE;
-NOTICE:  drop cascades to view ro_view20
+NOTICE:  drop cascades to view ro_view19
 -- simple updatable view
 CREATE TABLE base_tbl (a int PRIMARY KEY, b text DEFAULT 'Unspecified');
 INSERT INTO base_tbl SELECT i, 'Row ' || i FROM generate_series(-2, 2) g(i);
@@ -1740,3 +1729,554 @@ DROP TABLE base_tbl CASCADE;
 NOTICE:  drop cascades to 2 other objects
 DETAIL:  drop cascades to view rw_view1
 drop cascades to view rw_view2
+-- security barrier view
+CREATE TABLE base_tbl (person text, visibility text);
+INSERT INTO base_tbl VALUES ('Tom', 'public'),
+                            ('Dick', 'private'),
+                            ('Harry', 'public');
+CREATE VIEW rw_view1 AS
+  SELECT person FROM base_tbl WHERE visibility = 'public';
+CREATE FUNCTION snoop(anyelement)
+RETURNS boolean AS
+$$
+BEGIN
+  RAISE NOTICE 'snooped value: %', $1;
+  RETURN true;
+END;
+$$
+LANGUAGE plpgsql COST 0.000001;
+CREATE OR REPLACE FUNCTION leakproof(anyelement)
+RETURNS boolean AS
+$$
+BEGIN
+  RETURN true;
+END;
+$$
+LANGUAGE plpgsql STRICT IMMUTABLE LEAKPROOF;
+SELECT * FROM rw_view1 WHERE snoop(person);
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Dick
+NOTICE:  snooped value: Harry
+ person 
+--------
+ Tom
+ Harry
+(2 rows)
+
+UPDATE rw_view1 SET person=person WHERE snoop(person);
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Dick
+NOTICE:  snooped value: Harry
+DELETE FROM rw_view1 WHERE NOT snoop(person);
+NOTICE:  snooped value: Dick
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Harry
+ALTER VIEW rw_view1 SET (security_barrier = true);
+SELECT table_name, is_insertable_into
+  FROM information_schema.tables
+ WHERE table_name = 'rw_view1';
+ table_name | is_insertable_into 
+------------+--------------------
+ rw_view1   | YES
+(1 row)
+
+SELECT table_name, is_updatable, is_insertable_into
+  FROM information_schema.views
+ WHERE table_name = 'rw_view1';
+ table_name | is_updatable | is_insertable_into 
+------------+--------------+--------------------
+ rw_view1   | YES          | YES
+(1 row)
+
+SELECT table_name, column_name, is_updatable
+  FROM information_schema.columns
+ WHERE table_name = 'rw_view1'
+ ORDER BY ordinal_position;
+ table_name | column_name | is_updatable 
+------------+-------------+--------------
+ rw_view1   | person      | YES
+(1 row)
+
+SELECT * FROM rw_view1 WHERE snoop(person);
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Harry
+ person 
+--------
+ Tom
+ Harry
+(2 rows)
+
+UPDATE rw_view1 SET person=person WHERE snoop(person);
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Harry
+DELETE FROM rw_view1 WHERE NOT snoop(person);
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Harry
+EXPLAIN (costs off) SELECT * FROM rw_view1 WHERE snoop(person);
+                  QUERY PLAN                   
+-----------------------------------------------
+ Subquery Scan on rw_view1
+   Filter: snoop(rw_view1.person)
+   ->  Seq Scan on base_tbl
+         Filter: (visibility = 'public'::text)
+(4 rows)
+
+EXPLAIN (costs off) UPDATE rw_view1 SET person=person WHERE snoop(person);
+                     QUERY PLAN                      
+-----------------------------------------------------
+ Update on base_tbl base_tbl_1
+   ->  Subquery Scan on base_tbl
+         Filter: snoop(base_tbl.person)
+         ->  Seq Scan on base_tbl base_tbl_2
+               Filter: (visibility = 'public'::text)
+(5 rows)
+
+EXPLAIN (costs off) DELETE FROM rw_view1 WHERE NOT snoop(person);
+                     QUERY PLAN                      
+-----------------------------------------------------
+ Delete on base_tbl base_tbl_1
+   ->  Subquery Scan on base_tbl
+         Filter: (NOT snoop(base_tbl.person))
+         ->  Seq Scan on base_tbl base_tbl_2
+               Filter: (visibility = 'public'::text)
+(5 rows)
+
+-- security barrier view on top of security barrier view
+CREATE VIEW rw_view2 WITH (security_barrier = true) AS
+  SELECT * FROM rw_view1 WHERE snoop(person);
+SELECT table_name, is_insertable_into
+  FROM information_schema.tables
+ WHERE table_name = 'rw_view2';
+ table_name | is_insertable_into 
+------------+--------------------
+ rw_view2   | YES
+(1 row)
+
+SELECT table_name, is_updatable, is_insertable_into
+  FROM information_schema.views
+ WHERE table_name = 'rw_view2';
+ table_name | is_updatable | is_insertable_into 
+------------+--------------+--------------------
+ rw_view2   | YES          | YES
+(1 row)
+
+SELECT table_name, column_name, is_updatable
+  FROM information_schema.columns
+ WHERE table_name = 'rw_view2'
+ ORDER BY ordinal_position;
+ table_name | column_name | is_updatable 
+------------+-------------+--------------
+ rw_view2   | person      | YES
+(1 row)
+
+SELECT * FROM rw_view2 WHERE snoop(person);
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Harry
+NOTICE:  snooped value: Harry
+ person 
+--------
+ Tom
+ Harry
+(2 rows)
+
+UPDATE rw_view2 SET person=person WHERE snoop(person);
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Harry
+NOTICE:  snooped value: Harry
+DELETE FROM rw_view2 WHERE NOT snoop(person);
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Tom
+NOTICE:  snooped value: Harry
+NOTICE:  snooped value: Harry
+EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(person);
+                     QUERY PLAN                      
+-----------------------------------------------------
+ Subquery Scan on rw_view2
+   Filter: snoop(rw_view2.person)
+   ->  Subquery Scan on rw_view1
+         Filter: snoop(rw_view1.person)
+         ->  Seq Scan on base_tbl
+               Filter: (visibility = 'public'::text)
+(6 rows)
+
+EXPLAIN (costs off) UPDATE rw_view2 SET person=person WHERE snoop(person);
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Update on base_tbl base_tbl_1
+   ->  Subquery Scan on base_tbl
+         Filter: snoop(base_tbl.person)
+         ->  Subquery Scan on base_tbl_2
+               Filter: snoop(base_tbl_2.person)
+               ->  Seq Scan on base_tbl base_tbl_3
+                     Filter: (visibility = 'public'::text)
+(7 rows)
+
+EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(person);
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Delete on base_tbl base_tbl_1
+   ->  Subquery Scan on base_tbl
+         Filter: (NOT snoop(base_tbl.person))
+         ->  Subquery Scan on base_tbl_2
+               Filter: snoop(base_tbl_2.person)
+               ->  Seq Scan on base_tbl base_tbl_3
+                     Filter: (visibility = 'public'::text)
+(7 rows)
+
+DROP TABLE base_tbl CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to view rw_view1
+drop cascades to view rw_view2
+-- security barrier view on top of table with rules
+CREATE TABLE base_tbl(id int PRIMARY KEY, data text, deleted boolean);
+INSERT INTO base_tbl VALUES (1, 'Row 1', false), (2, 'Row 2', true);
+CREATE RULE base_tbl_ins_rule AS ON INSERT TO base_tbl
+  WHERE EXISTS (SELECT 1 FROM base_tbl t WHERE t.id = new.id)
+  DO INSTEAD
+    UPDATE base_tbl SET data = new.data, deleted = false WHERE id = new.id;
+CREATE RULE base_tbl_del_rule AS ON DELETE TO base_tbl
+  DO INSTEAD
+    UPDATE base_tbl SET deleted = true WHERE id = old.id;
+CREATE VIEW rw_view1 WITH (security_barrier=true) AS
+  SELECT id, data FROM base_tbl WHERE NOT deleted;
+SELECT * FROM rw_view1;
+ id | data  
+----+-------
+  1 | Row 1
+(1 row)
+
+EXPLAIN (costs off) DELETE FROM rw_view1 WHERE id = 1 AND snoop(data);
+                               QUERY PLAN                                
+-------------------------------------------------------------------------
+ Update on base_tbl base_tbl_1
+   ->  Nested Loop
+         ->  Index Scan using base_tbl_pkey on base_tbl base_tbl_1
+               Index Cond: (id = 1)
+         ->  Subquery Scan on base_tbl
+               Filter: snoop(base_tbl.data)
+               ->  Index Scan using base_tbl_pkey on base_tbl base_tbl_2
+                     Index Cond: (id = 1)
+                     Filter: (NOT deleted)
+(9 rows)
+
+DELETE FROM rw_view1 WHERE id = 1 AND snoop(data);
+NOTICE:  snooped value: Row 1
+EXPLAIN (costs off) INSERT INTO rw_view1 VALUES (2, 'New row 2');
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Insert on base_tbl
+   InitPlan 1 (returns $0)
+     ->  Index Only Scan using base_tbl_pkey on base_tbl t
+           Index Cond: (id = 2)
+   ->  Result
+         One-Time Filter: ($0 IS NOT TRUE)
+ Update on base_tbl
+   InitPlan 1 (returns $0)
+     ->  Index Only Scan using base_tbl_pkey on base_tbl t
+           Index Cond: (id = 2)
+   ->  Result
+         One-Time Filter: $0
+         ->  Index Scan using base_tbl_pkey on base_tbl
+               Index Cond: (id = 2)
+(15 rows)
+
+INSERT INTO rw_view1 VALUES (2, 'New row 2');
+SELECT * FROM base_tbl;
+ id |   data    | deleted 
+----+-----------+---------
+  1 | Row 1     | t
+  2 | New row 2 | f
+(2 rows)
+
+DROP TABLE base_tbl CASCADE;
+NOTICE:  drop cascades to view rw_view1
+-- security barrier view based on inheiritance set
+CREATE TABLE t1 (a int, b float, c text);
+CREATE INDEX t1_a_idx ON t1(a);
+INSERT INTO t1
+SELECT i,i,'t1' FROM generate_series(1,10) g(i);
+CREATE TABLE t11 (d text) INHERITS (t1);
+CREATE INDEX t11_a_idx ON t11(a);
+INSERT INTO t11
+SELECT i,i,'t11','t11d' FROM generate_series(1,10) g(i);
+CREATE TABLE t12 (e int[]) INHERITS (t1);
+CREATE INDEX t12_a_idx ON t12(a);
+INSERT INTO t12
+SELECT i,i,'t12','{1,2}'::int[] FROM generate_series(1,10) g(i);
+CREATE TABLE t111 () INHERITS (t11, t12);
+NOTICE:  merging multiple inherited definitions of column "a"
+NOTICE:  merging multiple inherited definitions of column "b"
+NOTICE:  merging multiple inherited definitions of column "c"
+CREATE INDEX t111_a_idx ON t111(a);
+INSERT INTO t111
+SELECT i,i,'t111','t111d','{1,1,1}'::int[] FROM generate_series(1,10) g(i);
+CREATE VIEW v1 WITH (security_barrier=true) AS
+SELECT *, (SELECT d FROM t11 WHERE t11.a = t1.a LIMIT 1) AS d
+FROM t1
+WHERE a > 5 AND EXISTS(SELECT 1 FROM t12 WHERE t12.a = t1.a);
+SELECT * FROM v1 WHERE a=3; -- should not see anything
+ a | b | c | d 
+---+---+---+---
+(0 rows)
+
+SELECT * FROM v1 WHERE a=8;
+ a | b |  c   |  d   
+---+---+------+------
+ 8 | 8 | t1   | t11d
+ 8 | 8 | t11  | t11d
+ 8 | 8 | t12  | t11d
+ 8 | 8 | t111 | t11d
+(4 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3;
+                                           QUERY PLAN                                            
+-------------------------------------------------------------------------------------------------
+ Update on public.t1 t1_4
+   ->  Subquery Scan on t1
+         Output: 100, t1.b, t1.c, t1.ctid
+         Filter: snoop(t1.a)
+         ->  Hash Join
+               Output: t1_5.ctid, t1_5.a, t1_5.b, t1_5.c
+               Hash Cond: (t12.a = t1_5.a)
+               ->  HashAggregate
+                     Output: t12.a
+                     Group Key: t12.a
+                     ->  Append
+                           ->  Seq Scan on public.t12
+                                 Output: t12.a
+                           ->  Seq Scan on public.t111
+                                 Output: t111.a
+               ->  Hash
+                     Output: t1_5.ctid, t1_5.a, t1_5.b, t1_5.c
+                     ->  Index Scan using t1_a_idx on public.t1 t1_5
+                           Output: t1_5.ctid, t1_5.a, t1_5.b, t1_5.c
+                           Index Cond: ((t1_5.a > 5) AND (t1_5.a = 3))
+                           Filter: leakproof(t1_5.a)
+   ->  Subquery Scan on t1_1
+         Output: 100, t1_1.b, t1_1.c, t1_1.d, t1_1.ctid
+         Filter: snoop(t1_1.a)
+         ->  Hash Join
+               Output: t11.ctid, t11.a, t11.b, t11.c, t11.d
+               Hash Cond: (t12_1.a = t11.a)
+               ->  HashAggregate
+                     Output: t12_1.a
+                     Group Key: t12_1.a
+                     ->  Append
+                           ->  Seq Scan on public.t12 t12_1
+                                 Output: t12_1.a
+                           ->  Seq Scan on public.t111 t111_1
+                                 Output: t111_1.a
+               ->  Hash
+                     Output: t11.ctid, t11.a, t11.b, t11.c, t11.d
+                     ->  Index Scan using t11_a_idx on public.t11
+                           Output: t11.ctid, t11.a, t11.b, t11.c, t11.d
+                           Index Cond: ((t11.a > 5) AND (t11.a = 3))
+                           Filter: leakproof(t11.a)
+   ->  Subquery Scan on t1_2
+         Output: 100, t1_2.b, t1_2.c, t1_2.e, t1_2.ctid
+         Filter: snoop(t1_2.a)
+         ->  Hash Join
+               Output: t12_2.ctid, t12_2.a, t12_2.b, t12_2.c, t12_2.e
+               Hash Cond: (t12_3.a = t12_2.a)
+               ->  HashAggregate
+                     Output: t12_3.a
+                     Group Key: t12_3.a
+                     ->  Append
+                           ->  Seq Scan on public.t12 t12_3
+                                 Output: t12_3.a
+                           ->  Seq Scan on public.t111 t111_2
+                                 Output: t111_2.a
+               ->  Hash
+                     Output: t12_2.ctid, t12_2.a, t12_2.b, t12_2.c, t12_2.e
+                     ->  Index Scan using t12_a_idx on public.t12 t12_2
+                           Output: t12_2.ctid, t12_2.a, t12_2.b, t12_2.c, t12_2.e
+                           Index Cond: ((t12_2.a > 5) AND (t12_2.a = 3))
+                           Filter: leakproof(t12_2.a)
+   ->  Subquery Scan on t1_3
+         Output: 100, t1_3.b, t1_3.c, t1_3.d, t1_3.e, t1_3.ctid
+         Filter: snoop(t1_3.a)
+         ->  Hash Join
+               Output: t111_3.ctid, t111_3.a, t111_3.b, t111_3.c, t111_3.d, t111_3.e
+               Hash Cond: (t12_4.a = t111_3.a)
+               ->  HashAggregate
+                     Output: t12_4.a
+                     Group Key: t12_4.a
+                     ->  Append
+                           ->  Seq Scan on public.t12 t12_4
+                                 Output: t12_4.a
+                           ->  Seq Scan on public.t111 t111_4
+                                 Output: t111_4.a
+               ->  Hash
+                     Output: t111_3.ctid, t111_3.a, t111_3.b, t111_3.c, t111_3.d, t111_3.e
+                     ->  Index Scan using t111_a_idx on public.t111 t111_3
+                           Output: t111_3.ctid, t111_3.a, t111_3.b, t111_3.c, t111_3.d, t111_3.e
+                           Index Cond: ((t111_3.a > 5) AND (t111_3.a = 3))
+                           Filter: leakproof(t111_3.a)
+(81 rows)
+
+UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3;
+SELECT * FROM v1 WHERE a=100; -- Nothing should have been changed to 100
+ a | b | c | d 
+---+---+---+---
+(0 rows)
+
+SELECT * FROM t1 WHERE a=100; -- Nothing should have been changed to 100
+ a | b | c 
+---+---+---
+(0 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8;
+                                           QUERY PLAN                                            
+-------------------------------------------------------------------------------------------------
+ Update on public.t1 t1_4
+   ->  Subquery Scan on t1
+         Output: (t1.a + 1), t1.b, t1.c, t1.ctid
+         Filter: snoop(t1.a)
+         ->  Hash Join
+               Output: t1_5.a, t1_5.ctid, t1_5.b, t1_5.c
+               Hash Cond: (t12.a = t1_5.a)
+               ->  HashAggregate
+                     Output: t12.a
+                     Group Key: t12.a
+                     ->  Append
+                           ->  Seq Scan on public.t12
+                                 Output: t12.a
+                           ->  Seq Scan on public.t111
+                                 Output: t111.a
+               ->  Hash
+                     Output: t1_5.a, t1_5.ctid, t1_5.b, t1_5.c
+                     ->  Index Scan using t1_a_idx on public.t1 t1_5
+                           Output: t1_5.a, t1_5.ctid, t1_5.b, t1_5.c
+                           Index Cond: ((t1_5.a > 5) AND (t1_5.a = 8))
+                           Filter: leakproof(t1_5.a)
+   ->  Subquery Scan on t1_1
+         Output: (t1_1.a + 1), t1_1.b, t1_1.c, t1_1.d, t1_1.ctid
+         Filter: snoop(t1_1.a)
+         ->  Hash Join
+               Output: t11.a, t11.ctid, t11.b, t11.c, t11.d
+               Hash Cond: (t12_1.a = t11.a)
+               ->  HashAggregate
+                     Output: t12_1.a
+                     Group Key: t12_1.a
+                     ->  Append
+                           ->  Seq Scan on public.t12 t12_1
+                                 Output: t12_1.a
+                           ->  Seq Scan on public.t111 t111_1
+                                 Output: t111_1.a
+               ->  Hash
+                     Output: t11.a, t11.ctid, t11.b, t11.c, t11.d
+                     ->  Index Scan using t11_a_idx on public.t11
+                           Output: t11.a, t11.ctid, t11.b, t11.c, t11.d
+                           Index Cond: ((t11.a > 5) AND (t11.a = 8))
+                           Filter: leakproof(t11.a)
+   ->  Subquery Scan on t1_2
+         Output: (t1_2.a + 1), t1_2.b, t1_2.c, t1_2.e, t1_2.ctid
+         Filter: snoop(t1_2.a)
+         ->  Hash Join
+               Output: t12_2.a, t12_2.ctid, t12_2.b, t12_2.c, t12_2.e
+               Hash Cond: (t12_3.a = t12_2.a)
+               ->  HashAggregate
+                     Output: t12_3.a
+                     Group Key: t12_3.a
+                     ->  Append
+                           ->  Seq Scan on public.t12 t12_3
+                                 Output: t12_3.a
+                           ->  Seq Scan on public.t111 t111_2
+                                 Output: t111_2.a
+               ->  Hash
+                     Output: t12_2.a, t12_2.ctid, t12_2.b, t12_2.c, t12_2.e
+                     ->  Index Scan using t12_a_idx on public.t12 t12_2
+                           Output: t12_2.a, t12_2.ctid, t12_2.b, t12_2.c, t12_2.e
+                           Index Cond: ((t12_2.a > 5) AND (t12_2.a = 8))
+                           Filter: leakproof(t12_2.a)
+   ->  Subquery Scan on t1_3
+         Output: (t1_3.a + 1), t1_3.b, t1_3.c, t1_3.d, t1_3.e, t1_3.ctid
+         Filter: snoop(t1_3.a)
+         ->  Hash Join
+               Output: t111_3.a, t111_3.ctid, t111_3.b, t111_3.c, t111_3.d, t111_3.e
+               Hash Cond: (t12_4.a = t111_3.a)
+               ->  HashAggregate
+                     Output: t12_4.a
+                     Group Key: t12_4.a
+                     ->  Append
+                           ->  Seq Scan on public.t12 t12_4
+                                 Output: t12_4.a
+                           ->  Seq Scan on public.t111 t111_4
+                                 Output: t111_4.a
+               ->  Hash
+                     Output: t111_3.a, t111_3.ctid, t111_3.b, t111_3.c, t111_3.d, t111_3.e
+                     ->  Index Scan using t111_a_idx on public.t111 t111_3
+                           Output: t111_3.a, t111_3.ctid, t111_3.b, t111_3.c, t111_3.d, t111_3.e
+                           Index Cond: ((t111_3.a > 5) AND (t111_3.a = 8))
+                           Filter: leakproof(t111_3.a)
+(81 rows)
+
+UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8;
+NOTICE:  snooped value: 8
+NOTICE:  snooped value: 8
+NOTICE:  snooped value: 8
+NOTICE:  snooped value: 8
+SELECT * FROM v1 WHERE b=8;
+ a | b |  c   |  d   
+---+---+------+------
+ 9 | 8 | t111 | t11d
+ 9 | 8 | t12  | t11d
+ 9 | 8 | t11  | t11d
+ 9 | 8 | t1   | t11d
+(4 rows)
+
+DELETE FROM v1 WHERE snoop(a) AND leakproof(a); -- should not delete everything, just where a>5
+NOTICE:  snooped value: 10
+NOTICE:  snooped value: 9
+NOTICE:  snooped value: 9
+NOTICE:  snooped value: 6
+NOTICE:  snooped value: 7
+NOTICE:  snooped value: 10
+NOTICE:  snooped value: 9
+NOTICE:  snooped value: 9
+NOTICE:  snooped value: 6
+NOTICE:  snooped value: 7
+NOTICE:  snooped value: 10
+NOTICE:  snooped value: 9
+NOTICE:  snooped value: 9
+NOTICE:  snooped value: 6
+NOTICE:  snooped value: 7
+NOTICE:  snooped value: 6
+NOTICE:  snooped value: 7
+NOTICE:  snooped value: 9
+NOTICE:  snooped value: 10
+NOTICE:  snooped value: 9
+TABLE t1; -- verify all a<=5 are intact
+ a | b |  c   
+---+---+------
+ 1 | 1 | t1
+ 2 | 2 | t1
+ 3 | 3 | t1
+ 4 | 4 | t1
+ 5 | 5 | t1
+ 1 | 1 | t11
+ 2 | 2 | t11
+ 3 | 3 | t11
+ 4 | 4 | t11
+ 5 | 5 | t11
+ 1 | 1 | t12
+ 2 | 2 | t12
+ 3 | 3 | t12
+ 4 | 4 | t12
+ 5 | 5 | t12
+ 1 | 1 | t111
+ 2 | 2 | t111
+ 3 | 3 | t111
+ 4 | 4 | t111
+ 5 | 5 | t111
+(20 rows)
+
+DROP TABLE t1, t11, t12, t111 CASCADE;
+NOTICE:  drop cascades to view v1
+DROP FUNCTION snoop(anyelement);
+DROP FUNCTION leakproof(anyelement);
index a77cf197582e5e935631e45f8379155c6b64a153..eb7b17979ed0d688560d18636ced3cbabf282d79 100644 (file)
@@ -25,12 +25,10 @@ CREATE VIEW rw_view14 AS SELECT ctid, a, b FROM base_tbl; -- System columns may
 CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view
 CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view
 CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable
-CREATE VIEW ro_view18 WITH (security_barrier = true)
-  AS SELECT * FROM base_tbl; -- Security barrier views not updatable
-CREATE VIEW ro_view19 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable
+CREATE VIEW ro_view18 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable
 CREATE SEQUENCE seq;
-CREATE VIEW ro_view20 AS SELECT * FROM seq; -- View based on a sequence
-CREATE VIEW ro_view21 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported
+CREATE VIEW ro_view19 AS SELECT * FROM seq; -- View based on a sequence
+CREATE VIEW ro_view20 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported
 
 SELECT table_name, is_insertable_into
   FROM information_schema.tables
@@ -87,13 +85,12 @@ SELECT * FROM base_tbl;
 DELETE FROM rw_view16 WHERE a=-3; -- should be OK
 -- Read-only views
 INSERT INTO ro_view17 VALUES (3, 'ROW 3');
-INSERT INTO ro_view18 VALUES (3, 'ROW 3');
-DELETE FROM ro_view19;
-UPDATE ro_view20 SET max_value=1000;
-UPDATE ro_view21 SET b=upper(b);
+DELETE FROM ro_view18;
+UPDATE ro_view19 SET max_value=1000;
+UPDATE ro_view20 SET b=upper(b);
 
 DROP TABLE base_tbl CASCADE;
-DROP VIEW ro_view10, ro_view12, ro_view19;
+DROP VIEW ro_view10, ro_view12, ro_view18;
 DROP SEQUENCE seq CASCADE;
 
 -- simple updatable view
@@ -828,3 +825,166 @@ CREATE VIEW rw_view2 AS
   SELECT * FROM rw_view1 WHERE a > b WITH LOCAL CHECK OPTION;
 INSERT INTO rw_view2 VALUES (2,3); -- ok, but not in view (doesn't fail rw_view2's check)
 DROP TABLE base_tbl CASCADE;
+
+-- security barrier view
+
+CREATE TABLE base_tbl (person text, visibility text);
+INSERT INTO base_tbl VALUES ('Tom', 'public'),
+                            ('Dick', 'private'),
+                            ('Harry', 'public');
+
+CREATE VIEW rw_view1 AS
+  SELECT person FROM base_tbl WHERE visibility = 'public';
+
+CREATE FUNCTION snoop(anyelement)
+RETURNS boolean AS
+$$
+BEGIN
+  RAISE NOTICE 'snooped value: %', $1;
+  RETURN true;
+END;
+$$
+LANGUAGE plpgsql COST 0.000001;
+
+CREATE OR REPLACE FUNCTION leakproof(anyelement)
+RETURNS boolean AS
+$$
+BEGIN
+  RETURN true;
+END;
+$$
+LANGUAGE plpgsql STRICT IMMUTABLE LEAKPROOF;
+
+SELECT * FROM rw_view1 WHERE snoop(person);
+UPDATE rw_view1 SET person=person WHERE snoop(person);
+DELETE FROM rw_view1 WHERE NOT snoop(person);
+
+ALTER VIEW rw_view1 SET (security_barrier = true);
+
+SELECT table_name, is_insertable_into
+  FROM information_schema.tables
+ WHERE table_name = 'rw_view1';
+
+SELECT table_name, is_updatable, is_insertable_into
+  FROM information_schema.views
+ WHERE table_name = 'rw_view1';
+
+SELECT table_name, column_name, is_updatable
+  FROM information_schema.columns
+ WHERE table_name = 'rw_view1'
+ ORDER BY ordinal_position;
+
+SELECT * FROM rw_view1 WHERE snoop(person);
+UPDATE rw_view1 SET person=person WHERE snoop(person);
+DELETE FROM rw_view1 WHERE NOT snoop(person);
+
+EXPLAIN (costs off) SELECT * FROM rw_view1 WHERE snoop(person);
+EXPLAIN (costs off) UPDATE rw_view1 SET person=person WHERE snoop(person);
+EXPLAIN (costs off) DELETE FROM rw_view1 WHERE NOT snoop(person);
+
+-- security barrier view on top of security barrier view
+
+CREATE VIEW rw_view2 WITH (security_barrier = true) AS
+  SELECT * FROM rw_view1 WHERE snoop(person);
+
+SELECT table_name, is_insertable_into
+  FROM information_schema.tables
+ WHERE table_name = 'rw_view2';
+
+SELECT table_name, is_updatable, is_insertable_into
+  FROM information_schema.views
+ WHERE table_name = 'rw_view2';
+
+SELECT table_name, column_name, is_updatable
+  FROM information_schema.columns
+ WHERE table_name = 'rw_view2'
+ ORDER BY ordinal_position;
+
+SELECT * FROM rw_view2 WHERE snoop(person);
+UPDATE rw_view2 SET person=person WHERE snoop(person);
+DELETE FROM rw_view2 WHERE NOT snoop(person);
+
+EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(person);
+EXPLAIN (costs off) UPDATE rw_view2 SET person=person WHERE snoop(person);
+EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(person);
+
+DROP TABLE base_tbl CASCADE;
+
+-- security barrier view on top of table with rules
+
+CREATE TABLE base_tbl(id int PRIMARY KEY, data text, deleted boolean);
+INSERT INTO base_tbl VALUES (1, 'Row 1', false), (2, 'Row 2', true);
+
+CREATE RULE base_tbl_ins_rule AS ON INSERT TO base_tbl
+  WHERE EXISTS (SELECT 1 FROM base_tbl t WHERE t.id = new.id)
+  DO INSTEAD
+    UPDATE base_tbl SET data = new.data, deleted = false WHERE id = new.id;
+
+CREATE RULE base_tbl_del_rule AS ON DELETE TO base_tbl
+  DO INSTEAD
+    UPDATE base_tbl SET deleted = true WHERE id = old.id;
+
+CREATE VIEW rw_view1 WITH (security_barrier=true) AS
+  SELECT id, data FROM base_tbl WHERE NOT deleted;
+
+SELECT * FROM rw_view1;
+
+EXPLAIN (costs off) DELETE FROM rw_view1 WHERE id = 1 AND snoop(data);
+DELETE FROM rw_view1 WHERE id = 1 AND snoop(data);
+
+EXPLAIN (costs off) INSERT INTO rw_view1 VALUES (2, 'New row 2');
+INSERT INTO rw_view1 VALUES (2, 'New row 2');
+
+SELECT * FROM base_tbl;
+
+DROP TABLE base_tbl CASCADE;
+
+-- security barrier view based on inheiritance set
+CREATE TABLE t1 (a int, b float, c text);
+CREATE INDEX t1_a_idx ON t1(a);
+INSERT INTO t1
+SELECT i,i,'t1' FROM generate_series(1,10) g(i);
+
+CREATE TABLE t11 (d text) INHERITS (t1);
+CREATE INDEX t11_a_idx ON t11(a);
+INSERT INTO t11
+SELECT i,i,'t11','t11d' FROM generate_series(1,10) g(i);
+
+CREATE TABLE t12 (e int[]) INHERITS (t1);
+CREATE INDEX t12_a_idx ON t12(a);
+INSERT INTO t12
+SELECT i,i,'t12','{1,2}'::int[] FROM generate_series(1,10) g(i);
+
+CREATE TABLE t111 () INHERITS (t11, t12);
+CREATE INDEX t111_a_idx ON t111(a);
+INSERT INTO t111
+SELECT i,i,'t111','t111d','{1,1,1}'::int[] FROM generate_series(1,10) g(i);
+
+CREATE VIEW v1 WITH (security_barrier=true) AS
+SELECT *, (SELECT d FROM t11 WHERE t11.a = t1.a LIMIT 1) AS d
+FROM t1
+WHERE a > 5 AND EXISTS(SELECT 1 FROM t12 WHERE t12.a = t1.a);
+
+SELECT * FROM v1 WHERE a=3; -- should not see anything
+SELECT * FROM v1 WHERE a=8;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3;
+UPDATE v1 SET a=100 WHERE snoop(a) AND leakproof(a) AND a = 3;
+
+SELECT * FROM v1 WHERE a=100; -- Nothing should have been changed to 100
+SELECT * FROM t1 WHERE a=100; -- Nothing should have been changed to 100
+
+EXPLAIN (VERBOSE, COSTS OFF)
+UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8;
+UPDATE v1 SET a=a+1 WHERE snoop(a) AND leakproof(a) AND a = 8;
+
+SELECT * FROM v1 WHERE b=8;
+
+DELETE FROM v1 WHERE snoop(a) AND leakproof(a); -- should not delete everything, just where a>5
+
+TABLE t1; -- verify all a<=5 are intact
+
+DROP TABLE t1, t11, t12, t111 CASCADE;
+DROP FUNCTION snoop(anyelement);
+DROP FUNCTION leakproof(anyelement);