]> granicus.if.org Git - postgresql/commitdiff
Rearrange dblink's dblink_build_sql_insert() and related routines to open and
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 14 Jun 2010 20:49:33 +0000 (20:49 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 14 Jun 2010 20:49:33 +0000 (20:49 +0000)
lock the target relation just once per SQL function call.  The original coding
obtained and released lock several times per call.  Aside from saving a
not-insignificant number of cycles, this eliminates possible race conditions
if someone tries to modify the relation's schema concurrently.  Also
centralize locking and permission-checking logic.

Problem noted while investigating a trouble report from Robert Voinea --- his
problem is still to be fixed, though.

contrib/dblink/dblink.c

index cc76b857acad58ecb26d6002a02597fe6a346aa3..2e95cfb2657fe9df38c7954f75b9a29cc1c11548 100644 (file)
@@ -8,7 +8,7 @@
  * Darko Prenosil <Darko.Prenosil@finteh.hr>
  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
  *
- * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.94 2010/06/09 03:39:26 itagaki Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.95 2010/06/14 20:49:33 tgl Exp $
  * Copyright (c) 2001-2010, PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
@@ -86,23 +86,23 @@ static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
 static void deleteConnection(const char *name);
-static char **get_pkey_attnames(Oid relid, int16 *numatts);
+static char **get_pkey_attnames(Relation rel, int16 *numatts);
 static char **get_text_array_contents(ArrayType *array, int *numitems);
-static char *get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
-static char *get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
-static char *get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+static char *get_sql_insert(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+static char *get_sql_delete(Relation rel, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
+static char *get_sql_update(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
 static char *quote_literal_cstr(char *rawstr);
 static char *quote_ident_cstr(char *rawstr);
 static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key);
-static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
-static Oid     get_relid_from_relname(text *relname_text);
-static char *generate_relation_name(Oid relid);
+static HeapTuple get_tuple_of_interest(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
+static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
+static char *generate_relation_name(Relation rel);
 static void dblink_connstr_check(const char *connstr);
 static void dblink_security_check(PGconn *conn, remoteConn *rconn);
 static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
 static char *get_connect_string(const char *servername);
 static char *escape_param_str(const char *from);
-static int     get_nondropped_natts(Oid relid);
+static int     get_nondropped_natts(Relation rel);
 
 /* Global */
 static remoteConn *pconn = NULL;
@@ -1112,7 +1112,6 @@ Datum
 dblink_get_pkey(PG_FUNCTION_ARGS)
 {
        int16           numatts;
-       Oid                     relid;
        char      **results;
        FuncCallContext *funcctx;
        int32           call_cntr;
@@ -1123,7 +1122,8 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
        /* stuff done only on the first call of the function */
        if (SRF_IS_FIRSTCALL())
        {
-               TupleDesc       tupdesc = NULL;
+               Relation        rel;
+               TupleDesc       tupdesc;
 
                /* create a function context for cross-call persistence */
                funcctx = SRF_FIRSTCALL_INIT();
@@ -1133,13 +1133,13 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
                 */
                oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-               /* convert relname to rel Oid */
-               relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
-               if (!OidIsValid(relid))
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_UNDEFINED_TABLE),
-                                        errmsg("relation \"%s\" does not exist",
-                                                       text_to_cstring(PG_GETARG_TEXT_PP(0)))));
+               /* open target relation */
+               rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
+
+               /* get the array of attnums */
+               results = get_pkey_attnames(rel, &numatts);
+
+               relation_close(rel, AccessShareLock);
 
                /*
                 * need a tuple descriptor representing one INT and one TEXT column
@@ -1157,9 +1157,6 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
                attinmeta = TupleDescGetAttInMetadata(tupdesc);
                funcctx->attinmeta = attinmeta;
 
-               /* get an array of attnums */
-               results = get_pkey_attnames(relid, &numatts);
-
                if ((results != NULL) && (numatts > 0))
                {
                        funcctx->max_calls = numatts;
@@ -1246,7 +1243,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
        int32           pknumatts_tmp = PG_GETARG_INT32(2);
        ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
        ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
-       Oid                     relid;
+       Relation        rel;
        int16           pknumatts = 0;
        char      **src_pkattvals;
        char      **tgt_pkattvals;
@@ -1256,14 +1253,9 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
        int                     nondropped_natts;
 
        /*
-        * Convert relname to rel OID.
+        * Open target relation.
         */
-       relid = get_relid_from_relname(relname_text);
-       if (!OidIsValid(relid))
-               ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_TABLE),
-                                errmsg("relation \"%s\" does not exist",
-                                               text_to_cstring(relname_text))));
+       rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
        /*
         * There should be at least one key attribute
@@ -1285,7 +1277,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
         * ensure we don't ask for more pk attributes than we have non-dropped
         * columns
         */
-       nondropped_natts = get_nondropped_natts(relid);
+       nondropped_natts = get_nondropped_natts(rel);
        if (pknumatts > nondropped_natts)
                ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
                                                errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1323,7 +1315,12 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
        /*
         * Prep work is finally done. Go get the SQL string.
         */
-       sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+       sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+       /*
+        * Now we can close the relation.
+        */
+       relation_close(rel, AccessShareLock);
 
        /*
         * And send it
@@ -1356,21 +1353,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
        int32           pknumatts_tmp = PG_GETARG_INT32(2);
        ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
        int                     nondropped_natts;
-       Oid                     relid;
+       Relation        rel;
        int16           pknumatts = 0;
        char      **tgt_pkattvals;
        int                     tgt_nitems;
        char       *sql;
 
        /*
-        * Convert relname to rel OID.
+        * Open target relation.
         */
-       relid = get_relid_from_relname(relname_text);
-       if (!OidIsValid(relid))
-               ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_TABLE),
-                                errmsg("relation \"%s\" does not exist",
-                                               text_to_cstring(relname_text))));
+       rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
        /*
         * There should be at least one key attribute
@@ -1392,7 +1384,7 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
         * ensure we don't ask for more pk attributes than we have non-dropped
         * columns
         */
-       nondropped_natts = get_nondropped_natts(relid);
+       nondropped_natts = get_nondropped_natts(rel);
        if (pknumatts > nondropped_natts)
                ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
                                                errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1415,7 +1407,12 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
        /*
         * Prep work is finally done. Go get the SQL string.
         */
-       sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
+       sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
+
+       /*
+        * Now we can close the relation.
+        */
+       relation_close(rel, AccessShareLock);
 
        /*
         * And send it
@@ -1453,7 +1450,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
        ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
        ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
        int                     nondropped_natts;
-       Oid                     relid;
+       Relation        rel;
        int16           pknumatts = 0;
        char      **src_pkattvals;
        char      **tgt_pkattvals;
@@ -1462,14 +1459,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
        char       *sql;
 
        /*
-        * Convert relname to rel OID.
+        * Open target relation.
         */
-       relid = get_relid_from_relname(relname_text);
-       if (!OidIsValid(relid))
-               ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_TABLE),
-                                errmsg("relation \"%s\" does not exist",
-                                               text_to_cstring(relname_text))));
+       rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
        /*
         * There should be one source array key values for each key attnum
@@ -1491,7 +1483,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
         * ensure we don't ask for more pk attributes than we have non-dropped
         * columns
         */
-       nondropped_natts = get_nondropped_natts(relid);
+       nondropped_natts = get_nondropped_natts(rel);
        if (pknumatts > nondropped_natts)
                ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
                                                errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1529,7 +1521,12 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
        /*
         * Prep work is finally done. Go get the SQL string.
         */
-       sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+       sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+       /*
+        * Now we can close the relation.
+        */
+       relation_close(rel, AccessShareLock);
 
        /*
         * And send it
@@ -1643,7 +1640,7 @@ dblink_get_notify(PG_FUNCTION_ARGS)
  * Return NULL, and set numatts = 0, if no primary key exists.
  */
 static char **
-get_pkey_attnames(Oid relid, int16 *numatts)
+get_pkey_attnames(Relation rel, int16 *numatts)
 {
        Relation        indexRelation;
        ScanKeyData skey;
@@ -1651,22 +1648,11 @@ get_pkey_attnames(Oid relid, int16 *numatts)
        HeapTuple       indexTuple;
        int                     i;
        char      **result = NULL;
-       Relation        rel;
        TupleDesc       tupdesc;
-       AclResult       aclresult;
 
        /* initialize numatts to 0 in case no primary key exists */
        *numatts = 0;
 
-       /* open relation using relid, check permissions, get tupdesc */
-       rel = relation_open(relid, AccessShareLock);
-
-       aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
-                                                                 ACL_SELECT);
-       if (aclresult != ACLCHECK_OK)
-               aclcheck_error(aclresult, ACL_KIND_CLASS,
-                                          RelationGetRelationName(rel));
-
        tupdesc = rel->rd_att;
 
        /* Prepare to scan pg_index for entries having indrelid = this rel. */
@@ -1674,7 +1660,7 @@ get_pkey_attnames(Oid relid, int16 *numatts)
        ScanKeyInit(&skey,
                                Anum_pg_index_indrelid,
                                BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(relid));
+                               ObjectIdGetDatum(RelationGetRelid(rel)));
 
        scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
                                                          SnapshotNow, 1, &skey);
@@ -1700,7 +1686,6 @@ get_pkey_attnames(Oid relid, int16 *numatts)
 
        systable_endscan(scan);
        heap_close(indexRelation, AccessShareLock);
-       relation_close(rel, AccessShareLock);
 
        return result;
 }
@@ -1766,9 +1751,8 @@ get_text_array_contents(ArrayType *array, int *numitems)
 }
 
 static char *
-get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+get_sql_insert(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 {
-       Relation        rel;
        char       *relname;
        HeapTuple       tuple;
        TupleDesc       tupdesc;
@@ -1782,16 +1766,12 @@ get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
        initStringInfo(&buf);
 
        /* get relation name including any needed schema prefix and quoting */
-       relname = generate_relation_name(relid);
+       relname = generate_relation_name(rel);
 
-       /*
-        * Open relation using relid
-        */
-       rel = relation_open(relid, AccessShareLock);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
 
-       tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+       tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
        if (!tuple)
                ereport(ERROR,
                                (errcode(ERRCODE_CARDINALITY_VIOLATION),
@@ -1848,14 +1828,12 @@ get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
        }
        appendStringInfo(&buf, ")");
 
-       relation_close(rel, AccessShareLock);
        return (buf.data);
 }
 
 static char *
-get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
+get_sql_delete(Relation rel, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
 {
-       Relation        rel;
        char       *relname;
        TupleDesc       tupdesc;
        int                     natts;
@@ -1865,12 +1843,8 @@ get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pka
        initStringInfo(&buf);
 
        /* get relation name including any needed schema prefix and quoting */
-       relname = generate_relation_name(relid);
+       relname = generate_relation_name(rel);
 
-       /*
-        * Open relation using relid
-        */
-       rel = relation_open(relid, AccessShareLock);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
 
@@ -1896,14 +1870,12 @@ get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pka
                        appendStringInfo(&buf, " IS NULL");
        }
 
-       relation_close(rel, AccessShareLock);
        return (buf.data);
 }
 
 static char *
-get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+get_sql_update(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 {
-       Relation        rel;
        char       *relname;
        HeapTuple       tuple;
        TupleDesc       tupdesc;
@@ -1917,16 +1889,12 @@ get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
        initStringInfo(&buf);
 
        /* get relation name including any needed schema prefix and quoting */
-       relname = generate_relation_name(relid);
+       relname = generate_relation_name(rel);
 
-       /*
-        * Open relation using relid
-        */
-       rel = relation_open(relid, AccessShareLock);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
 
-       tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+       tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
        if (!tuple)
                ereport(ERROR,
                                (errcode(ERRCODE_CARDINALITY_VIOLATION),
@@ -1992,7 +1960,6 @@ get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
                        appendStringInfo(&buf, " IS NULL");
        }
 
-       relation_close(rel, AccessShareLock);
        return (buf.data);
 }
 
@@ -2050,9 +2017,8 @@ get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key)
 }
 
 static HeapTuple
-get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
+get_tuple_of_interest(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
 {
-       Relation        rel;
        char       *relname;
        TupleDesc       tupdesc;
        StringInfoData buf;
@@ -2063,14 +2029,9 @@ get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **
        initStringInfo(&buf);
 
        /* get relation name including any needed schema prefix and quoting */
-       relname = generate_relation_name(relid);
+       relname = generate_relation_name(rel);
 
-       /*
-        * Open relation using relid
-        */
-       rel = relation_open(relid, AccessShareLock);
-       tupdesc = CreateTupleDescCopy(rel->rd_att);
-       relation_close(rel, AccessShareLock);
+       tupdesc = rel->rd_att;
 
        /*
         * Connect to SPI manager
@@ -2141,50 +2102,49 @@ get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **
        return NULL;
 }
 
-static Oid
-get_relid_from_relname(text *relname_text)
+/*
+ * Open the relation named by relname_text, acquire specified type of lock,
+ * verify we have specified permissions.
+ * Caller must close rel when done with it.
+ */
+static Relation
+get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
 {
        RangeVar   *relvar;
        Relation        rel;
-       Oid                     relid;
+       AclResult       aclresult;
 
        relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
-       rel = heap_openrv(relvar, AccessShareLock);
-       relid = RelationGetRelid(rel);
-       relation_close(rel, AccessShareLock);
+       rel = heap_openrv(relvar, lockmode);
 
-       return relid;
+       aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
+                                                                 aclmode);
+       if (aclresult != ACLCHECK_OK)
+               aclcheck_error(aclresult, ACL_KIND_CLASS,
+                                          RelationGetRelationName(rel));
+
+       return rel;
 }
 
 /*
  * generate_relation_name - copied from ruleutils.c
- *             Compute the name to display for a relation specified by OID
+ *             Compute the name to display for a relation
  *
  * The result includes all necessary quoting and schema-prefixing.
  */
 static char *
-generate_relation_name(Oid relid)
+generate_relation_name(Relation rel)
 {
-       HeapTuple       tp;
-       Form_pg_class reltup;
        char       *nspname;
        char       *result;
 
-       tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
-       if (!HeapTupleIsValid(tp))
-               elog(ERROR, "cache lookup failed for relation %u", relid);
-
-       reltup = (Form_pg_class) GETSTRUCT(tp);
-
        /* Qualify the name if not visible in search path */
-       if (RelationIsVisible(relid))
+       if (RelationIsVisible(RelationGetRelid(rel)))
                nspname = NULL;
        else
-               nspname = get_namespace_name(reltup->relnamespace);
+               nspname = get_namespace_name(rel->rd_rel->relnamespace);
 
-       result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
-
-       ReleaseSysCache(tp);
+       result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
 
        return result;
 }
@@ -2469,15 +2429,13 @@ escape_param_str(const char *str)
 }
 
 static int
-get_nondropped_natts(Oid relid)
+get_nondropped_natts(Relation rel)
 {
        int                     nondropped_natts = 0;
        TupleDesc       tupdesc;
-       Relation        rel;
        int                     natts;
        int                     i;
 
-       rel = relation_open(relid, AccessShareLock);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
 
@@ -2488,6 +2446,5 @@ get_nondropped_natts(Oid relid)
                nondropped_natts++;
        }
 
-       relation_close(rel, AccessShareLock);
        return nondropped_natts;
 }