]> granicus.if.org Git - postgresql/blobdiff - src/backend/executor/execMain.c
Ensure tableoid reads correctly in EvalPlanQual-manufactured tuples.
[postgresql] / src / backend / executor / execMain.c
index 5b70cc90f65f4cd336b596d648573009ac5e9290..13ceffae5c46390e1bb912dea8c7294ac7e17ccc 100644 (file)
@@ -56,6 +56,7 @@
 #include "utils/acl.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rls.h"
 #include "utils/snapmgr.h"
 #include "utils/tqual.h"
 
@@ -82,12 +83,23 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
                        DestReceiver *dest);
 static bool ExecCheckRTEPerms(RangeTblEntry *rte);
 static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
-static char *ExecBuildSlotValueDescription(TupleTableSlot *slot,
+static char *ExecBuildSlotValueDescription(Oid reloid,
+                                                         TupleTableSlot *slot,
                                                          TupleDesc tupdesc,
+                                                         Bitmapset *modifiedCols,
                                                          int maxfieldlen);
 static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
                                  Plan *planTree);
 
+/*
+ * Note that this macro also exists in commands/trigger.c.  There does not
+ * appear to be any good header to put it into, given the structures that
+ * it uses, so we let them be duplicated.  Be sure to update both if one needs
+ * to be changed, however.
+ */
+#define GetModifiedColumns(relinfo, estate) \
+       (rt_fetch((relinfo)->ri_RangeTableIndex, (estate)->es_range_table)->modifiedCols)
+
 /* end of local decls */
 
 
@@ -1607,15 +1619,24 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
                {
                        if (tupdesc->attrs[attrChk - 1]->attnotnull &&
                                slot_attisnull(slot, attrChk))
+                       {
+                               char       *val_desc;
+                               Bitmapset  *modifiedCols;
+
+                               modifiedCols = GetModifiedColumns(resultRelInfo, estate);
+                               val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
+                                                                                                                slot,
+                                                                                                                tupdesc,
+                                                                                                                modifiedCols,
+                                                                                                                64);
+
                                ereport(ERROR,
                                                (errcode(ERRCODE_NOT_NULL_VIOLATION),
                                                 errmsg("null value in column \"%s\" violates not-null constraint",
                                                          NameStr(tupdesc->attrs[attrChk - 1]->attname)),
-                                                errdetail("Failing row contains %s.",
-                                                                  ExecBuildSlotValueDescription(slot,
-                                                                                                                                tupdesc,
-                                                                                                                                64)),
+                                                val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
                                                 errtablecol(rel, attrChk)));
+                       }
                }
        }
 
@@ -1624,15 +1645,23 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
                const char *failed;
 
                if ((failed = ExecRelCheck(resultRelInfo, slot, estate)) != NULL)
+               {
+                       char       *val_desc;
+                       Bitmapset  *modifiedCols;
+
+                       modifiedCols = GetModifiedColumns(resultRelInfo, estate);
+                       val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
+                                                                                                        slot,
+                                                                                                        tupdesc,
+                                                                                                        modifiedCols,
+                                                                                                        64);
                        ereport(ERROR,
                                        (errcode(ERRCODE_CHECK_VIOLATION),
                                         errmsg("new row for relation \"%s\" violates check constraint \"%s\"",
                                                        RelationGetRelationName(rel), failed),
-                                        errdetail("Failing row contains %s.",
-                                                          ExecBuildSlotValueDescription(slot,
-                                                                                                                        tupdesc,
-                                                                                                                        64)),
+                                        val_desc ? errdetail("Failing row contains %s.", val_desc) : 0,
                                         errtableconstraint(rel, failed)));
+               }
        }
 }
 
@@ -1643,6 +1672,8 @@ void
 ExecWithCheckOptions(ResultRelInfo *resultRelInfo,
                                         TupleTableSlot *slot, EState *estate)
 {
+       Relation        rel = resultRelInfo->ri_RelationDesc;
+       TupleDesc       tupdesc = RelationGetDescr(rel);
        ExprContext *econtext;
        ListCell   *l1,
                           *l2;
@@ -1673,14 +1704,24 @@ ExecWithCheckOptions(ResultRelInfo *resultRelInfo,
                 * case (the opposite of what we do above for CHECK constraints).
                 */
                if (!ExecQual((List *) wcoExpr, econtext, false))
+               {
+                       char       *val_desc;
+                       Bitmapset  *modifiedCols;
+
+                       modifiedCols = GetModifiedColumns(resultRelInfo, estate);
+                       val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
+                                                                                                        slot,
+                                                                                                        tupdesc,
+                                                                                                        modifiedCols,
+                                                                                                        64);
+
                        ereport(ERROR,
                                        (errcode(ERRCODE_WITH_CHECK_OPTION_VIOLATION),
                                 errmsg("new row violates WITH CHECK OPTION for \"%s\"",
                                                wco->viewname),
-                                        errdetail("Failing row contains %s.",
-                                                          ExecBuildSlotValueDescription(slot,
-                                                       RelationGetDescr(resultRelInfo->ri_RelationDesc),
-                                                                                                                        64))));
+                                       val_desc ? errdetail("Failing row contains %s.", val_desc) :
+                                                          0));
+               }
        }
 }
 
@@ -1696,25 +1737,64 @@ ExecWithCheckOptions(ResultRelInfo *resultRelInfo,
  * dropped columns.  We used to use the slot's tuple descriptor to decode the
  * data, but the slot's descriptor doesn't identify dropped columns, so we
  * now need to be passed the relation's descriptor.
+ *
+ * Note that, like BuildIndexValueDescription, if the user does not have
+ * permission to view any of the columns involved, a NULL is returned.  Unlike
+ * BuildIndexValueDescription, if the user has access to view a subset of the
+ * column involved, that subset will be returned with a key identifying which
+ * columns they are.
  */
 static char *
-ExecBuildSlotValueDescription(TupleTableSlot *slot,
+ExecBuildSlotValueDescription(Oid reloid,
+                                                         TupleTableSlot *slot,
                                                          TupleDesc tupdesc,
+                                                         Bitmapset *modifiedCols,
                                                          int maxfieldlen)
 {
        StringInfoData buf;
+       StringInfoData collist;
        bool            write_comma = false;
+       bool            write_comma_collist = false;
        int                     i;
+       AclResult       aclresult;
+       bool            table_perm = false;
+       bool            any_perm = false;
 
-       /* Make sure the tuple is fully deconstructed */
-       slot_getallattrs(slot);
+       /*
+        * Check if RLS is enabled and should be active for the relation; if so,
+        * then don't return anything.  Otherwise, go through normal permission
+        * checks.
+        */
+       if (check_enable_rls(reloid, GetUserId(), true) == RLS_ENABLED)
+               return NULL;
 
        initStringInfo(&buf);
 
        appendStringInfoChar(&buf, '(');
 
+       /*
+        * Check if the user has permissions to see the row.  Table-level SELECT
+        * allows access to all columns.  If the user does not have table-level
+        * SELECT then we check each column and include those the user has SELECT
+        * rights on.  Additionally, we always include columns the user provided
+        * data for.
+        */
+       aclresult = pg_class_aclcheck(reloid, GetUserId(), ACL_SELECT);
+       if (aclresult != ACLCHECK_OK)
+       {
+               /* Set up the buffer for the column list */
+               initStringInfo(&collist);
+               appendStringInfoChar(&collist, '(');
+       }
+       else
+               table_perm = any_perm = true;
+
+       /* Make sure the tuple is fully deconstructed */
+       slot_getallattrs(slot);
+
        for (i = 0; i < tupdesc->natts; i++)
        {
+               bool            column_perm = false;
                char       *val;
                int                     vallen;
 
@@ -1722,37 +1802,76 @@ ExecBuildSlotValueDescription(TupleTableSlot *slot,
                if (tupdesc->attrs[i]->attisdropped)
                        continue;
 
-               if (slot->tts_isnull[i])
-                       val = "null";
-               else
+               if (!table_perm)
                {
-                       Oid                     foutoid;
-                       bool            typisvarlena;
+                       /*
+                        * No table-level SELECT, so need to make sure they either have
+                        * SELECT rights on the column or that they have provided the
+                        * data for the column.  If not, omit this column from the error
+                        * message.
+                        */
+                       aclresult = pg_attribute_aclcheck(reloid, tupdesc->attrs[i]->attnum,
+                                                                                         GetUserId(), ACL_SELECT);
+                       if (bms_is_member(tupdesc->attrs[i]->attnum - FirstLowInvalidHeapAttributeNumber,
+                                                         modifiedCols) || aclresult == ACLCHECK_OK)
+                       {
+                               column_perm = any_perm = true;
 
-                       getTypeOutputInfo(tupdesc->attrs[i]->atttypid,
-                                                         &foutoid, &typisvarlena);
-                       val = OidOutputFunctionCall(foutoid, slot->tts_values[i]);
-               }
+                               if (write_comma_collist)
+                                       appendStringInfoString(&collist, ", ");
+                               else
+                                       write_comma_collist = true;
 
-               if (write_comma)
-                       appendStringInfoString(&buf, ", ");
-               else
-                       write_comma = true;
+                               appendStringInfoString(&collist, NameStr(tupdesc->attrs[i]->attname));
+                       }
+               }
 
-               /* truncate if needed */
-               vallen = strlen(val);
-               if (vallen <= maxfieldlen)
-                       appendStringInfoString(&buf, val);
-               else
+               if (table_perm || column_perm)
                {
-                       vallen = pg_mbcliplen(val, vallen, maxfieldlen);
-                       appendBinaryStringInfo(&buf, val, vallen);
-                       appendStringInfoString(&buf, "...");
+                       if (slot->tts_isnull[i])
+                               val = "null";
+                       else
+                       {
+                               Oid                     foutoid;
+                               bool            typisvarlena;
+
+                               getTypeOutputInfo(tupdesc->attrs[i]->atttypid,
+                                                                 &foutoid, &typisvarlena);
+                               val = OidOutputFunctionCall(foutoid, slot->tts_values[i]);
+                       }
+
+                       if (write_comma)
+                               appendStringInfoString(&buf, ", ");
+                       else
+                               write_comma = true;
+
+                       /* truncate if needed */
+                       vallen = strlen(val);
+                       if (vallen <= maxfieldlen)
+                               appendStringInfoString(&buf, val);
+                       else
+                       {
+                               vallen = pg_mbcliplen(val, vallen, maxfieldlen);
+                               appendBinaryStringInfo(&buf, val, vallen);
+                               appendStringInfoString(&buf, "...");
+                       }
                }
        }
 
+       /* If we end up with zero columns being returned, then return NULL. */
+       if (!any_perm)
+               return NULL;
+
        appendStringInfoChar(&buf, ')');
 
+       if (!table_perm)
+       {
+               appendStringInfoString(&collist, ") = ");
+               appendStringInfoString(&collist, buf.data);
+
+               return collist.data;
+       }
+
        return buf.data;
 }
 
@@ -1999,7 +2118,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
                                {
                                        case LockWaitBlock:
                                                XactLockTableWait(SnapshotDirty.xmax,
-                                                                                 relation, &tuple.t_data->t_ctid,
+                                                                                 relation, &tuple.t_self,
                                                                                  XLTW_FetchUpdated);
                                                break;
                                        case LockWaitSkip:
@@ -2024,7 +2143,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
                         * heap_lock_tuple() will throw an error, and so would any later
                         * attempt to update or delete the tuple.  (We need not check cmax
                         * because HeapTupleSatisfiesDirty will consider a tuple deleted
-                        * by our transaction dead, regardless of cmax.) Wee just checked
+                        * by our transaction dead, regardless of cmax.) We just checked
                         * that priorXmax == xmin, so we can test that variable instead of
                         * doing HeapTupleHeaderGetXmin again.
                         */
@@ -2182,7 +2301,7 @@ EvalPlanQualInit(EPQState *epqstate, EState *estate,
 /*
  * EvalPlanQualSetPlan -- set or change subplan of an EPQState.
  *
- * We need this so that ModifyTuple can deal with multiple subplans.
+ * We need this so that ModifyTable can deal with multiple subplans.
  */
 void
 EvalPlanQualSetPlan(EPQState *epqstate, Plan *subplan, List *auxrowmarks)
@@ -2319,7 +2438,9 @@ EvalPlanQualFetchRowMarks(EPQState *epqstate)
                        /* build a temporary HeapTuple control structure */
                        tuple.t_len = HeapTupleHeaderGetDatumLength(td);
                        ItemPointerSetInvalid(&(tuple.t_self));
-                       tuple.t_tableOid = InvalidOid;
+                       /* relation might be a foreign table, if so provide tableoid */
+                       tuple.t_tableOid = getrelid(erm->rti,
+                                                                               epqstate->estate->es_range_table);
                        tuple.t_data = td;
 
                        /* copy and store tuple */
@@ -2419,6 +2540,14 @@ EvalPlanQualStart(EPQState *epqstate, EState *parentestate, Plan *planTree)
         * the snapshot, rangetable, result-rel info, and external Param info.
         * They need their own copies of local state, including a tuple table,
         * es_param_exec_vals, etc.
+        *
+        * The ResultRelInfo array management is trickier than it looks.  We
+        * create a fresh array for the child but copy all the content from the
+        * parent.  This is because it's okay for the child to share any
+        * per-relation state the parent has already created --- but if the child
+        * sets up any ResultRelInfo fields, such as its own junkfilter, that
+        * state must *not* propagate back to the parent.  (For one thing, the
+        * pointed-to data is in a memory context that won't last long enough.)
         */
        estate->es_direction = ForwardScanDirection;
        estate->es_snapshot = parentestate->es_snapshot;
@@ -2427,9 +2556,19 @@ EvalPlanQualStart(EPQState *epqstate, EState *parentestate, Plan *planTree)
        estate->es_plannedstmt = parentestate->es_plannedstmt;
        estate->es_junkFilter = parentestate->es_junkFilter;
        estate->es_output_cid = parentestate->es_output_cid;
-       estate->es_result_relations = parentestate->es_result_relations;
-       estate->es_num_result_relations = parentestate->es_num_result_relations;
-       estate->es_result_relation_info = parentestate->es_result_relation_info;
+       if (parentestate->es_num_result_relations > 0)
+       {
+               int                     numResultRelations = parentestate->es_num_result_relations;
+               ResultRelInfo *resultRelInfos;
+
+               resultRelInfos = (ResultRelInfo *)
+                       palloc(numResultRelations * sizeof(ResultRelInfo));
+               memcpy(resultRelInfos, parentestate->es_result_relations,
+                          numResultRelations * sizeof(ResultRelInfo));
+               estate->es_result_relations = resultRelInfos;
+               estate->es_num_result_relations = numResultRelations;
+       }
+       /* es_result_relation_info must NOT be copied */
        /* es_trig_target_relations must NOT be copied */
        estate->es_rowMarks = parentestate->es_rowMarks;
        estate->es_top_eflags = parentestate->es_top_eflags;