]> granicus.if.org Git - postgresql/commitdiff
Add a security_barrier option for views.
authorRobert Haas <rhaas@postgresql.org>
Thu, 22 Dec 2011 21:15:57 +0000 (16:15 -0500)
committerRobert Haas <rhaas@postgresql.org>
Thu, 22 Dec 2011 21:16:31 +0000 (16:16 -0500)
When a view is marked as a security barrier, it will not be pulled up
into the containing query, and no quals will be pushed down into it,
so that no function or operator chosen by the user can be applied to
rows not exposed by the view.  Views not configured with this
option cannot provide robust row-level security, but will perform far
better.

Patch by KaiGai Kohei; original problem report by Heikki Linnakangas
(in October 2009!).  Review (in earlier versions) by Noah Misch and
others.  Design advice by Tom Lane and myself.  Further review and
cleanup by me.

22 files changed:
doc/src/sgml/ref/alter_view.sgml
doc/src/sgml/ref/create_view.sgml
doc/src/sgml/rules.sgml
src/backend/access/common/reloptions.c
src/backend/commands/tablecmds.c
src/backend/commands/view.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/allpaths.c
src/backend/optimizer/prep/prepjointree.c
src/backend/parser/gram.y
src/backend/rewrite/rewriteHandler.c
src/backend/utils/adt/selfuncs.c
src/backend/utils/cache/relcache.c
src/bin/pg_dump/pg_dump.c
src/include/access/reloptions.h
src/include/nodes/parsenodes.h
src/include/utils/rel.h
src/test/regress/expected/create_view.out
src/test/regress/sql/create_view.sql

index c3839057c0705b67447083a9ac03435f59664e32..e78176bce807438ff3db878afdbcf13c500923f8 100644 (file)
@@ -26,6 +26,8 @@ ALTER VIEW <replaceable class="parameter">name</replaceable> ALTER [ COLUMN ] <r
 ALTER VIEW <replaceable class="parameter">name</replaceable> OWNER TO <replaceable class="PARAMETER">new_owner</replaceable>
 ALTER VIEW <replaceable class="parameter">name</replaceable> RENAME TO <replaceable class="parameter">new_name</replaceable>
 ALTER VIEW <replaceable class="parameter">name</replaceable> SET SCHEMA <replaceable class="parameter">new_schema</replaceable>
+ALTER VIEW <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">view_option_name</replaceable> [= <replaceable class="parameter">view_option_value</replaceable>] [, ... ] )
+ALTER VIEW <replaceable class="parameter">name</replaceable> RESET ( <replaceable class="parameter">view_option_name</replaceable> [, ... ] )
 </synopsis>
  </refsynopsisdiv>
 
@@ -102,6 +104,24 @@ ALTER VIEW <replaceable class="parameter">name</replaceable> SET SCHEMA <replace
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><replaceable class="parameter">view_option_name</replaceable></term>
+    <listitem>
+     <para>
+      The name of a view option to be set or reset.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><replaceable class="parameter">view_option_name</replaceable></term>
+    <listitem>
+     <para>
+      The new value for a view option.
+     </para>
+    </listitem>
+   </varlistentry>
   </variablelist>
  </refsect1>
 
index 417f8c38e7eebe0a80f809ff53070b09fcff9086..6e868408662485bddb1cfb880349fa04c367be52 100644 (file)
@@ -22,6 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] VIEW <replaceable class="PARAMETER">name</replaceable> [ ( <replaceable class="PARAMETER">column_name</replaceable> [, ...] ) ]
+    [ WITH ( <replaceable class="PARAMETER">view_option_name</replaceable> [= <replaceable class="PARAMETER">view_option_value</replaceable>] [, ... ] ) ]
     AS <replaceable class="PARAMETER">query</replaceable>
 </synopsis>
  </refsynopsisdiv>
@@ -98,6 +99,18 @@ CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] VIEW <replaceable class="PARAMETER">n
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>WITH ( <replaceable class="PARAMETER">view_option_name</replaceable> [= <replaceable class="PARAMETER">view_option_value</replaceable>] [, ... ] )</literal></term>
+    <listitem>
+     <para>
+      This clause specifies optional parameters for a view; currently, the
+      only suppored parameter name is <literal>security_barrier</literal>,
+      which should be enabled when a view is intended to provide row-level
+      security.  See <xref linkend="rules-privileges"> for full details.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">query</replaceable></term>
     <listitem>
index 1b06519b380154ad615bc2dda3ba2eacb3873a55..896cc64ecd2f1d07761bb9f2b7888d50738f6d0b 100644 (file)
@@ -1822,8 +1822,9 @@ GRANT SELECT ON phone_number TO secretary;
 <para>
     Note that while views can be used to hide the contents of certain
     columns using the technique shown above, they cannot be used to reliably
-    conceal the data in unseen rows.  For example, the following view is
-    insecure:
+    conceal the data in unseen rows unless the
+    <literal>security_barrier</literal> flag has been set.  For example,
+    the following view is insecure:
 <programlisting>
 CREATE VIEW phone_number AS
     SELECT person, phone FROM phone_data WHERE phone NOT LIKE '412%';
@@ -1870,6 +1871,40 @@ SELECT * FROM phone_number WHERE tricky(person, phone);
     which references <literal>shoelace_log</> is an unqualified
     <literal>INSERT</>.  This might not be true in more complex scenarios.
 </para>
+
+<para>
+    When it is necessary for a view to provide row-level security, the
+    <literal>security_barrier</literal> attribute should be applied to
+    the view.  This prevents maliciously-chosen functions and operators from
+    being invoked on rows until afterthe view has done its work.  For
+    example, if the view shown above had been created like this, it would
+    be secure:
+<programlisting>
+CREATE VIEW phone_number WITH (security_barrier) AS
+    SELECT person, phone FROM phone_data WHERE phone NOT LIKE '412%';
+</programlisting>
+    Views created with the <literal>security_barrier</literal> may perform
+    far worse than views created without this option.  In general, there is
+    no way to avoid this: the fastest possible plan must be rejected
+    if it may compromise security.  For this reason, this option is not
+    enabled by default.
+</para>
+
+<para>
+    It is important to understand that even a view created with the
+    <literal>security_barrier</literal> option is intended to be secure only
+    in the limited sense that the contents of the invisible tuples will not
+    passed to possibly-insecure functions.  The user may well have other means
+    of making inferences about the unseen data; for example, they can see the
+    query plan using <command>EXPLAIN</command>, or measure the runtime of
+    queries against the view.  A malicious attacker might be able to infer
+    something about the amount of unseen data, or even gain some information
+    about the data distribution or most common values (since these things may
+    affect the runtime of the plan; or even, since they are also reflected in
+    the optimizer statistics, the choice of plan).  If these types of "covert
+    channel" attacks are of concern, it is probably unwise to grant any access
+    to the data at all.
+</para>
 </sect1>
 
 <sect1 id="rules-status">
index 100172fa4ac37c92ec30acb8e26170656ab3a87e..7220e0e0ce96af5c1411dbe79e9e6e018adcaf4e 100644 (file)
@@ -67,6 +67,14 @@ static relopt_bool boolRelOpts[] =
                },
                true
        },
+       {
+               {
+                       "security_barrier",
+                       "View acts as a row security barrier",
+                       RELOPT_KIND_VIEW
+               },
+               false
+       },
        /* list terminator */
        {{NULL}}
 };
@@ -781,6 +789,7 @@ extractRelOptions(HeapTuple tuple, TupleDesc tupdesc, Oid amoptions)
        {
                case RELKIND_RELATION:
                case RELKIND_TOASTVALUE:
+               case RELKIND_VIEW:
                case RELKIND_UNCATALOGED:
                        options = heap_reloptions(classForm->relkind, datum, false);
                        break;
@@ -1139,7 +1148,9 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
                {"autovacuum_vacuum_scale_factor", RELOPT_TYPE_REAL,
                offsetof(StdRdOptions, autovacuum) +offsetof(AutoVacOpts, vacuum_scale_factor)},
                {"autovacuum_analyze_scale_factor", RELOPT_TYPE_REAL,
-               offsetof(StdRdOptions, autovacuum) +offsetof(AutoVacOpts, analyze_scale_factor)}
+               offsetof(StdRdOptions, autovacuum) +offsetof(AutoVacOpts, analyze_scale_factor)},
+               {"security_barrier", RELOPT_TYPE_BOOL,
+               offsetof(StdRdOptions, security_barrier)},
        };
 
        options = parseRelOptions(reloptions, validate, kind, &numoptions);
@@ -1159,7 +1170,7 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
 }
 
 /*
- * Parse options for heaps and toast tables.
+ * Parse options for heaps, views and toast tables.
  */
 bytea *
 heap_reloptions(char relkind, Datum reloptions, bool validate)
@@ -1181,6 +1192,8 @@ heap_reloptions(char relkind, Datum reloptions, bool validate)
                        return (bytea *) rdopts;
                case RELKIND_RELATION:
                        return default_reloptions(reloptions, validate, RELOPT_KIND_HEAP);
+               case RELKIND_VIEW:
+                       return default_reloptions(reloptions, validate, RELOPT_KIND_VIEW);
                default:
                        /* other relkinds are not supported */
                        return NULL;
index 8473c9e7f4452d274016bbb3c8962bbd1a275e19..4953def029b67311870704cc7b6d8d929e338c28 100644 (file)
@@ -366,7 +366,9 @@ static void ATExecDropCluster(Relation rel, LOCKMODE lockmode);
 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
                                        char *tablespacename, LOCKMODE lockmode);
 static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode);
-static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode);
+static void ATExecSetRelOptions(Relation rel, List *defList,
+                                                               AlterTableType operation,
+                                                               LOCKMODE lockmode);
 static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
                                           char fires_when, bool skip_system, LOCKMODE lockmode);
 static void ATExecEnableDisableRule(Relation rel, char *rulename,
@@ -2866,6 +2868,7 @@ AlterTableGetLockLevel(List *cmds)
                        case AT_DropCluster:
                        case AT_SetRelOptions:
                        case AT_ResetRelOptions:
+                       case AT_ReplaceRelOptions:
                        case AT_SetOptions:
                        case AT_ResetOptions:
                        case AT_SetStorage:
@@ -3094,8 +3097,9 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
                        pass = AT_PASS_MISC;    /* doesn't actually matter */
                        break;
                case AT_SetRelOptions:  /* SET (...) */
-               case AT_ResetRelOptions:                /* RESET (...) */
-                       ATSimplePermissions(rel, ATT_TABLE | ATT_INDEX);
+               case AT_ResetRelOptions:        /* RESET (...) */
+               case AT_ReplaceRelOptions:      /* reset them all, then set just these */
+                       ATSimplePermissions(rel, ATT_TABLE | ATT_INDEX | ATT_VIEW);
                        /* This command never recurses */
                        /* No command-specific prep needed */
                        pass = AT_PASS_MISC;
@@ -3338,12 +3342,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
                         */
                        break;
                case AT_SetRelOptions:  /* SET (...) */
-                       ATExecSetRelOptions(rel, (List *) cmd->def, false, lockmode);
-                       break;
                case AT_ResetRelOptions:                /* RESET (...) */
-                       ATExecSetRelOptions(rel, (List *) cmd->def, true, lockmode);
+               case AT_ReplaceRelOptions:              /* replace entire option list */
+                       ATExecSetRelOptions(rel, (List *) cmd->def, cmd->subtype, lockmode);
                        break;
-
                case AT_EnableTrig:             /* ENABLE TRIGGER name */
                        ATExecEnableDisableTrigger(rel, cmd->name,
                                                                   TRIGGER_FIRES_ON_ORIGIN, false, lockmode);
@@ -8271,10 +8273,11 @@ ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename, L
 }
 
 /*
- * ALTER TABLE/INDEX SET (...) or RESET (...)
+ * Set, reset, or replace reloptions.
  */
 static void
-ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode)
+ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
+                                       LOCKMODE lockmode)
 {
        Oid                     relid;
        Relation        pgclass;
@@ -8288,28 +8291,44 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode
        bool            repl_repl[Natts_pg_class];
        static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
 
-       if (defList == NIL)
+       if (defList == NIL && operation != AT_ReplaceRelOptions)
                return;                                 /* nothing to do */
 
        pgclass = heap_open(RelationRelationId, RowExclusiveLock);
 
-       /* Get the old reloptions */
+       /* Fetch heap tuple */
        relid = RelationGetRelid(rel);
        tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", relid);
 
-       datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions, &isnull);
+       if (operation == AT_ReplaceRelOptions)
+       {
+               /*
+                * If we're supposed to replace the reloptions list, we just pretend
+                * there were none before.
+                */
+               datum = (Datum) 0;
+               isnull = true;
+       }
+       else
+       {
+               /* Get the old reloptions */
+               datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
+                                                               &isnull);
+       }
 
        /* Generate new proposed reloptions (text array) */
        newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
-                                                                  defList, NULL, validnsps, false, isReset);
+                                                                  defList, NULL, validnsps, false,
+                                                                  operation == AT_ResetRelOptions);
 
        /* Validate */
        switch (rel->rd_rel->relkind)
        {
                case RELKIND_RELATION:
                case RELKIND_TOASTVALUE:
+               case RELKIND_VIEW:
                        (void) heap_reloptions(rel->rd_rel->relkind, newOptions, true);
                        break;
                case RELKIND_INDEX:
@@ -8357,15 +8376,30 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode
 
                toastrel = heap_open(toastid, lockmode);
 
-               /* Get the old reloptions */
+               /* Fetch heap tuple */
                tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for relation %u", toastid);
 
-               datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions, &isnull);
+               if (operation == AT_ReplaceRelOptions)
+               {
+                       /*
+                        * If we're supposed to replace the reloptions list, we just
+                        * pretend there were none before.
+                        */
+                       datum = (Datum) 0;
+                       isnull = true;
+               }
+               else
+               {
+                       /* Get the old reloptions */
+                       datum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
+                                                                       &isnull);
+               }
 
                newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
-                                                               defList, "toast", validnsps, false, isReset);
+                                                               defList, "toast", validnsps, false,
+                                                               operation == AT_ResetRelOptions);
 
                (void) heap_reloptions(RELKIND_TOASTVALUE, newOptions, true);
 
index b23819965869a58af27ac5633cb20af1664228bc..3edcd33eaf4c1a26fa034b68e94ae2b480b5b5a1 100644 (file)
@@ -32,6 +32,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
+#include "utils/syscache.h"
 
 
 static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
@@ -98,7 +99,7 @@ isViewOnTempTable_walker(Node *node, void *context)
  */
 static Oid
 DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
-                                         Oid namespaceId)
+                                         Oid namespaceId, List *options)
 {
        Oid                     viewOid;
        CreateStmt *createStmt = makeNode(CreateStmt);
@@ -166,6 +167,8 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
        {
                Relation        rel;
                TupleDesc       descriptor;
+               List       *atcmds = NIL;
+               AlterTableCmd *atcmd;
 
                /*
                 * Yes.  Get exclusive lock on the existing view ...
@@ -203,6 +206,15 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
                descriptor = BuildDescForRelation(attrList);
                checkViewTupleDesc(descriptor, rel->rd_att);
 
+               /*
+                * The new options list replaces the existing options list, even
+                * if it's empty.
+                */
+               atcmd = makeNode(AlterTableCmd);
+               atcmd->subtype = AT_ReplaceRelOptions;
+               atcmd->def = (Node *) options;
+               atcmds = lappend(atcmds, atcmd);
+
                /*
                 * If new attributes have been added, we must add pg_attribute entries
                 * for them.  It is convenient (although overkill) to use the ALTER
@@ -210,14 +222,11 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
                 */
                if (list_length(attrList) > rel->rd_att->natts)
                {
-                       List       *atcmds = NIL;
                        ListCell   *c;
                        int                     skip = rel->rd_att->natts;
 
                        foreach(c, attrList)
                        {
-                               AlterTableCmd *atcmd;
-
                                if (skip > 0)
                                {
                                        skip--;
@@ -228,9 +237,11 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
                                atcmd->def = (Node *) lfirst(c);
                                atcmds = lappend(atcmds, atcmd);
                        }
-                       AlterTableInternal(viewOid, atcmds, true);
                }
 
+               /* OK, let's do it. */
+               AlterTableInternal(viewOid, atcmds, true);
+
                /*
                 * Seems okay, so return the OID of the pre-existing view.
                 */
@@ -250,7 +261,8 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
                createStmt->tableElts = attrList;
                createStmt->inhRelations = NIL;
                createStmt->constraints = NIL;
-               createStmt->options = list_make1(defWithOids(false));
+               createStmt->options = options;
+               createStmt->options = lappend(options, defWithOids(false));
                createStmt->oncommit = ONCOMMIT_NOOP;
                createStmt->tablespacename = NULL;
                createStmt->if_not_exists = false;
@@ -513,7 +525,7 @@ DefineView(ViewStmt *stmt, const char *queryString)
         * aborted.
         */
        viewOid = DefineVirtualRelation(view, viewParse->targetList,
-                                                                       stmt->replace, namespaceId);
+                                                                       stmt->replace, namespaceId, stmt->options);
 
        /*
         * The relation we have just created is not visible to any other commands
index 7178b529a2e1820b8e8388a82751b3af2b8b725a..dd2dd25ed1b7b8ed5c2e93df4a85b55245a59120 100644 (file)
@@ -1965,6 +1965,7 @@ _copyRangeTblEntry(const RangeTblEntry *from)
        COPY_SCALAR_FIELD(relid);
        COPY_SCALAR_FIELD(relkind);
        COPY_NODE_FIELD(subquery);
+       COPY_SCALAR_FIELD(security_barrier);
        COPY_SCALAR_FIELD(jointype);
        COPY_NODE_FIELD(joinaliasvars);
        COPY_NODE_FIELD(funcexpr);
index 9f7daf4eda2f206afde7896b732327da783ff6aa..c2fdb2ba365cb143a6bb5413b3961e9095d8764f 100644 (file)
@@ -2228,6 +2228,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b)
        COMPARE_SCALAR_FIELD(relid);
        COMPARE_SCALAR_FIELD(relkind);
        COMPARE_NODE_FIELD(subquery);
+       COMPARE_SCALAR_FIELD(security_barrier);
        COMPARE_SCALAR_FIELD(jointype);
        COMPARE_NODE_FIELD(joinaliasvars);
        COMPARE_NODE_FIELD(funcexpr);
index bef1e78f5a2d1e82a4bdd6cb905d60bcb0fa676e..cdc2cab55c61127a9b14afaa392f9bf372c94697 100644 (file)
@@ -2321,6 +2321,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node)
                        break;
                case RTE_SUBQUERY:
                        WRITE_NODE_FIELD(subquery);
+                       WRITE_BOOL_FIELD(security_barrier);
                        break;
                case RTE_JOIN:
                        WRITE_ENUM_FIELD(jointype, JoinType);
index 3de20ad93a96a5749194791c700517907f8816d6..fa9ad164234dfa044c83e54f9fea65c1a71dba43 100644 (file)
@@ -1192,6 +1192,7 @@ _readRangeTblEntry(void)
                        break;
                case RTE_SUBQUERY:
                        READ_NODE_FIELD(subquery);
+                       READ_BOOL_FIELD(security_barrier);
                        break;
                case RTE_JOIN:
                        READ_ENUM_FIELD(jointype, JoinType);
index 815b996a131d82baa419a2228ac1d7fb51f075d1..b3777eae2e40407f60a6133486259399ac15ac53 100644 (file)
@@ -744,6 +744,11 @@ set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel,
         * pseudoconstant clauses; better to have the gating node above the
         * subquery.
         *
+        * Also, if the sub-query has "security_barrier" flag, it means the
+        * sub-query originated from a view that must enforce row-level security.
+        * We must not push down quals in order to avoid information leaks, either
+        * via side-effects or error output.
+        *
         * Non-pushed-down clauses will get evaluated as qpquals of the
         * SubqueryScan node.
         *
@@ -762,7 +767,16 @@ set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel,
                        RestrictInfo *rinfo = (RestrictInfo *) lfirst(l);
                        Node       *clause = (Node *) rinfo->clause;
 
+                       /*
+                        * XXX.  You might wonder why we're testing rte->security_barrier
+                        * qual-by-qual here rather than hoisting the test up into the
+                        * surrounding if statement; after all, the answer will be the
+                        * same for all quals.  The answer is that we expect to shortly
+                        * change this logic to allow pushing down some quals that use only
+                        * "leakproof" operators even through a security barrier.
+                        */
                        if (!rinfo->pseudoconstant &&
+                               !rte->security_barrier &&
                                qual_is_pushdown_safe(subquery, rti, clause, differentTypes))
                        {
                                /* Push it down */
index 8bb011b711658d32e9d02c9e2f4d75fc40eb4670..a711c4f646b5a1621d12a43915f9d56bc5db6838 100644 (file)
@@ -543,6 +543,7 @@ pull_up_subqueries(PlannerInfo *root, Node *jtnode,
                 */
                if (rte->rtekind == RTE_SUBQUERY &&
                        is_simple_subquery(rte->subquery) &&
+                       !rte->security_barrier &&
                        (containing_appendrel == NULL ||
                         is_safe_append_member(rte->subquery)))
                        return pull_up_simple_subquery(root, jtnode, rte,
@@ -712,6 +713,7 @@ pull_up_simple_subquery(PlannerInfo *root, Node *jtnode, RangeTblEntry *rte,
         * pull_up_subqueries.
         */
        if (is_simple_subquery(subquery) &&
+               !rte->security_barrier &&
                (containing_appendrel == NULL || is_safe_append_member(subquery)))
        {
                /* good to go */
index 7e8f39abdf7085dfe434adb93acce4cb11b2df25..8943c5b7d042b3d18586bf22941d2e76042b455f 100644 (file)
@@ -7322,26 +7322,28 @@ transaction_mode_list_or_empty:
  *
  *****************************************************************************/
 
-ViewStmt: CREATE OptTemp VIEW qualified_name opt_column_list
+ViewStmt: CREATE OptTemp VIEW qualified_name opt_column_list opt_reloptions
                                AS SelectStmt opt_check_option
                                {
                                        ViewStmt *n = makeNode(ViewStmt);
                                        n->view = $4;
                                        n->view->relpersistence = $2;
                                        n->aliases = $5;
-                                       n->query = $7;
+                                       n->query = $8;
                                        n->replace = false;
+                                       n->options = $6;
                                        $$ = (Node *) n;
                                }
-               | CREATE OR REPLACE OptTemp VIEW qualified_name opt_column_list
+               | CREATE OR REPLACE OptTemp VIEW qualified_name opt_column_list opt_reloptions
                                AS SelectStmt opt_check_option
                                {
                                        ViewStmt *n = makeNode(ViewStmt);
                                        n->view = $6;
                                        n->view->relpersistence = $4;
                                        n->aliases = $7;
-                                       n->query = $9;
+                                       n->query = $10;
                                        n->replace = true;
+                                       n->options = $8;
                                        $$ = (Node *) n;
                                }
                ;
index a6f4141dd7b677785e8cd45af0aeee7a70a657f7..8448226bfc84c29fbe136a9d91aed2003ee0c36c 100644 (file)
@@ -1382,6 +1382,7 @@ ApplyRetrieveRule(Query *parsetree,
 
        rte->rtekind = RTE_SUBQUERY;
        rte->relid = InvalidOid;
+       rte->security_barrier = RelationIsSecurityView(relation);
        rte->subquery = rule_action;
        rte->inh = false;                       /* must not be set for a subquery */
 
index abef8abc3826ebbad00d7ed06dbabdfbdc0bdcd3..bb411f9ad1d08fcc9a040d2919ba7cedfc692182 100644 (file)
@@ -4372,6 +4372,19 @@ examine_simple_variable(PlannerInfo *root, Var *var,
                        subquery->distinctClause)
                        return;
 
+               /*
+                * If the sub-query originated from a view with the security_barrier
+                * attribute, we treat it as a black-box from outside of the view.
+                * This is probably a harsher restriction than necessary; it's
+                * certainly OK for the selectivity estimator (which is a C function,
+                * and therefore omnipotent anyway) to look at the statistics.  But
+                * many selectivity estimators will happily *invoke the operator
+                * function* to try to work out a good estimate - and that's not OK.
+                * So for now, we do this.
+                */
+               if (rte->security_barrier)
+                       return;
+
                /*
                 * OK, fetch RelOptInfo for subquery.  Note that we don't change the
                 * rel returned in vardata, since caller expects it to be a rel of the
index f9ad75e7f8901261fa9b1d988ba3c57dc81dd1e5..cfb48d3ec4c32687e147bbe3b158da22f41fa160 100644 (file)
@@ -374,6 +374,7 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
                case RELKIND_RELATION:
                case RELKIND_TOASTVALUE:
                case RELKIND_INDEX:
+               case RELKIND_VIEW:
                        break;
                default:
                        return;
index 5deb9d658d8ece96f069846e06835afaeb38b14a..89a8a23599fe3193151abdf4e6081a86cf34c9c3 100644 (file)
@@ -12397,8 +12397,10 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
                if (binary_upgrade)
                        binary_upgrade_set_pg_class_oids(q, tbinfo->dobj.catId.oid, false);
 
-               appendPQExpBuffer(q, "CREATE VIEW %s AS\n    %s\n",
-                                                 fmtId(tbinfo->dobj.name), viewdef);
+               appendPQExpBuffer(q, "CREATE VIEW %s", fmtId(tbinfo->dobj.name));
+               if (tbinfo->reloptions && strlen(tbinfo->reloptions) > 0)
+                       appendPQExpBuffer(q, " WITH (%s)", tbinfo->reloptions);
+               appendPQExpBuffer(q, " AS\n    %s\n", viewdef);
 
                appendPQExpBuffer(labelq, "VIEW %s",
                                                  fmtId(tbinfo->dobj.name));
index 10b2f9ea4db0917ab75e80b2f456f3113ef9a526..afba016195a1f48b3fda2a502591d8d3edf39dcb 100644 (file)
@@ -43,8 +43,9 @@ typedef enum relopt_kind
        RELOPT_KIND_ATTRIBUTE = (1 << 6),
        RELOPT_KIND_TABLESPACE = (1 << 7),
        RELOPT_KIND_SPGIST = (1 << 8),
+       RELOPT_KIND_VIEW = (1 << 9),
        /* if you add a new kind, make sure you update "last_default" too */
-       RELOPT_KIND_LAST_DEFAULT = RELOPT_KIND_SPGIST,
+       RELOPT_KIND_LAST_DEFAULT = RELOPT_KIND_VIEW,
        /* some compilers treat enums as signed ints, so we can't use 1 << 31 */
        RELOPT_KIND_MAX = (1 << 30)
 } relopt_kind;
index 734227366d18f36ef14a2674f5551b8f82e7043f..6e8b11027d495e03932ac6fc125414d710c7cc23 100644 (file)
@@ -706,6 +706,7 @@ typedef struct RangeTblEntry
         * Fields valid for a subquery RTE (else NULL):
         */
        Query      *subquery;           /* the sub-query */
+       bool            security_barrier;       /* subquery from security_barrier view */
 
        /*
         * Fields valid for a join RTE (else NULL/zero):
@@ -1208,6 +1209,7 @@ typedef enum AlterTableType
        AT_SetTableSpace,                       /* SET TABLESPACE */
        AT_SetRelOptions,                       /* SET (...) -- AM specific parameters */
        AT_ResetRelOptions,                     /* RESET (...) -- AM specific parameters */
+       AT_ReplaceRelOptions,           /* replace reloption list in its entirety */
        AT_EnableTrig,                          /* ENABLE TRIGGER name */
        AT_EnableAlwaysTrig,            /* ENABLE ALWAYS TRIGGER name */
        AT_EnableReplicaTrig,           /* ENABLE REPLICA TRIGGER name */
@@ -2277,6 +2279,7 @@ typedef struct ViewStmt
        List       *aliases;            /* target column names */
        Node       *query;                      /* the SELECT query */
        bool            replace;                /* replace an existing view? */
+       List       *options;            /* options from WITH clause */
 } ViewStmt;
 
 /* ----------------------
index 70d16eb01e42c1b8e4decc4e8cc2028f8e55a31c..ed9c003857123da36f1f715b26c58ce438da3c6e 100644 (file)
@@ -195,6 +195,7 @@ typedef struct StdRdOptions
        int32           vl_len_;                /* varlena header (do not touch directly!) */
        int                     fillfactor;             /* page fill factor in percent (0..100) */
        AutoVacOpts autovacuum;         /* autovacuum-related options */
+       bool            security_barrier;       /* for views */
 } StdRdOptions;
 
 #define HEAP_MIN_FILLFACTOR                    10
@@ -222,6 +223,14 @@ typedef struct StdRdOptions
 #define RelationGetTargetPageFreeSpace(relation, defaultff) \
        (BLCKSZ * (100 - RelationGetFillFactor(relation, defaultff)) / 100)
 
+/*
+ * RelationIsSecurityView
+ *             Returns whether the relation is security view, or not
+ */
+#define RelationIsSecurityView(relation)       \
+       ((relation)->rd_options ?                               \
+        ((StdRdOptions *) (relation)->rd_options)->security_barrier : false)
+
 /*
  * RelationIsValid
  *             True iff relation descriptor is valid.
index f9490a3a5506268a8e462f165a2c0530d725c4c1..cc93854c423084e1e0a1afdc6d9d67f4c22235e8 100644 (file)
@@ -239,6 +239,55 @@ And relnamespace IN (SELECT OID FROM pg_namespace WHERE nspname LIKE 'pg_temp%')
      1
 (1 row)
 
+--
+-- CREATE VIEW and WITH(...) clause
+--
+CREATE VIEW mysecview1
+       AS SELECT * FROM tbl1 WHERE a = 0;
+CREATE VIEW mysecview2 WITH (security_barrier=true)
+       AS SELECT * FROM tbl1 WHERE a > 0;
+CREATE VIEW mysecview3 WITH (security_barrier=false)
+       AS SELECT * FROM tbl1 WHERE a < 0;
+CREATE VIEW mysecview4 WITH (security_barrier)
+       AS SELECT * FROM tbl1 WHERE a <> 0;
+CREATE VIEW mysecview5 WITH (security_barrier=100)     -- Error
+       AS SELECT * FROM tbl1 WHERE a > 100;
+ERROR:  invalid value for boolean option "security_barrier": 100
+CREATE VIEW mysecview6 WITH (invalid_option)           -- Error
+       AS SELECT * FROM tbl1 WHERE a < 100;
+ERROR:  unrecognized parameter "invalid_option"
+SELECT relname, relkind, reloptions FROM pg_class
+       WHERE oid in ('mysecview1'::regclass, 'mysecview2'::regclass,
+                     'mysecview3'::regclass, 'mysecview4'::regclass)
+       ORDER BY relname;
+  relname   | relkind |        reloptions        
+------------+---------+--------------------------
+ mysecview1 | v       | 
+ mysecview2 | v       | {security_barrier=true}
+ mysecview3 | v       | {security_barrier=false}
+ mysecview4 | v       | {security_barrier=true}
+(4 rows)
+
+CREATE OR REPLACE VIEW mysecview1
+       AS SELECT * FROM tbl1 WHERE a = 256;
+CREATE OR REPLACE VIEW mysecview2
+       AS SELECT * FROM tbl1 WHERE a > 256;
+CREATE OR REPLACE VIEW mysecview3 WITH (security_barrier=true)
+       AS SELECT * FROM tbl1 WHERE a < 256;
+CREATE OR REPLACE VIEW mysecview4 WITH (security_barrier=false)
+       AS SELECT * FROM tbl1 WHERE a <> 256;
+SELECT relname, relkind, reloptions FROM pg_class
+       WHERE oid in ('mysecview1'::regclass, 'mysecview2'::regclass,
+                     'mysecview3'::regclass, 'mysecview4'::regclass)
+       ORDER BY relname;
+  relname   | relkind |        reloptions        
+------------+---------+--------------------------
+ mysecview1 | v       | 
+ mysecview2 | v       | 
+ mysecview3 | v       | {security_barrier=true}
+ mysecview4 | v       | {security_barrier=false}
+(4 rows)
+
 DROP SCHEMA temp_view_test CASCADE;
 NOTICE:  drop cascades to 22 other objects
 DETAIL:  drop cascades to table temp_view_test.base_table
@@ -264,7 +313,7 @@ drop cascades to view temp_view_test.v8
 drop cascades to sequence temp_view_test.seq1
 drop cascades to view temp_view_test.v9
 DROP SCHEMA testviewschm2 CASCADE;
-NOTICE:  drop cascades to 16 other objects
+NOTICE:  drop cascades to 20 other objects
 DETAIL:  drop cascades to table t1
 drop cascades to view temporal1
 drop cascades to view temporal2
@@ -281,4 +330,8 @@ drop cascades to table tbl3
 drop cascades to table tbl4
 drop cascades to view mytempview
 drop cascades to view pubview
+drop cascades to view mysecview1
+drop cascades to view mysecview2
+drop cascades to view mysecview3
+drop cascades to view mysecview4
 SET search_path to public;
index 86cfc5162c6959146f788db6b1bd1af636a4c418..48d8d22d1b8e4e997f1bd236b75a2b7773a6ad7f 100644 (file)
@@ -191,6 +191,39 @@ AND NOT EXISTS (SELECT g FROM tbl4 LEFT JOIN tmptbl ON tbl4.h = tmptbl.j);
 SELECT count(*) FROM pg_class where relname LIKE 'mytempview'
 And relnamespace IN (SELECT OID FROM pg_namespace WHERE nspname LIKE 'pg_temp%');
 
+--
+-- CREATE VIEW and WITH(...) clause
+--
+CREATE VIEW mysecview1
+       AS SELECT * FROM tbl1 WHERE a = 0;
+CREATE VIEW mysecview2 WITH (security_barrier=true)
+       AS SELECT * FROM tbl1 WHERE a > 0;
+CREATE VIEW mysecview3 WITH (security_barrier=false)
+       AS SELECT * FROM tbl1 WHERE a < 0;
+CREATE VIEW mysecview4 WITH (security_barrier)
+       AS SELECT * FROM tbl1 WHERE a <> 0;
+CREATE VIEW mysecview5 WITH (security_barrier=100)     -- Error
+       AS SELECT * FROM tbl1 WHERE a > 100;
+CREATE VIEW mysecview6 WITH (invalid_option)           -- Error
+       AS SELECT * FROM tbl1 WHERE a < 100;
+SELECT relname, relkind, reloptions FROM pg_class
+       WHERE oid in ('mysecview1'::regclass, 'mysecview2'::regclass,
+                     'mysecview3'::regclass, 'mysecview4'::regclass)
+       ORDER BY relname;
+
+CREATE OR REPLACE VIEW mysecview1
+       AS SELECT * FROM tbl1 WHERE a = 256;
+CREATE OR REPLACE VIEW mysecview2
+       AS SELECT * FROM tbl1 WHERE a > 256;
+CREATE OR REPLACE VIEW mysecview3 WITH (security_barrier=true)
+       AS SELECT * FROM tbl1 WHERE a < 256;
+CREATE OR REPLACE VIEW mysecview4 WITH (security_barrier=false)
+       AS SELECT * FROM tbl1 WHERE a <> 256;
+SELECT relname, relkind, reloptions FROM pg_class
+       WHERE oid in ('mysecview1'::regclass, 'mysecview2'::regclass,
+                     'mysecview3'::regclass, 'mysecview4'::regclass)
+       ORDER BY relname;
+
 DROP SCHEMA temp_view_test CASCADE;
 DROP SCHEMA testviewschm2 CASCADE;