From 6bbaa3148d65b3bed987c40b78e2b9a55bdfab58 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Mon, 14 Jun 2010 20:49:33 +0000 Subject: [PATCH] Rearrange dblink's dblink_build_sql_insert() and related routines to open and lock the target relation just once per SQL function call. The original coding obtained and released lock several times per call. Aside from saving a not-insignificant number of cycles, this eliminates possible race conditions if someone tries to modify the relation's schema concurrently. Also centralize locking and permission-checking logic. Problem noted while investigating a trouble report from Robert Voinea --- his problem is still to be fixed, though. --- contrib/dblink/dblink.c | 209 ++++++++++++++++------------------------ 1 file changed, 83 insertions(+), 126 deletions(-) diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index cc76b857ac..2e95cfb265 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -8,7 +8,7 @@ * Darko Prenosil * Shridhar Daithankar * - * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.94 2010/06/09 03:39:26 itagaki Exp $ + * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.95 2010/06/14 20:49:33 tgl Exp $ * Copyright (c) 2001-2010, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -86,23 +86,23 @@ static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn *rconn); static void deleteConnection(const char *name); -static char **get_pkey_attnames(Oid relid, int16 *numatts); +static char **get_pkey_attnames(Relation rel, int16 *numatts); static char **get_text_array_contents(ArrayType *array, int *numitems); -static char *get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); -static char *get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals); -static char *get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); +static char *get_sql_insert(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); +static char *get_sql_delete(Relation rel, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals); +static char *get_sql_update(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *quote_literal_cstr(char *rawstr); static char *quote_ident_cstr(char *rawstr); static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key); -static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals); -static Oid get_relid_from_relname(text *relname_text); -static char *generate_relation_name(Oid relid); +static HeapTuple get_tuple_of_interest(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals); +static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode); +static char *generate_relation_name(Relation rel); static void dblink_connstr_check(const char *connstr); static void dblink_security_check(PGconn *conn, remoteConn *rconn); static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail); static char *get_connect_string(const char *servername); static char *escape_param_str(const char *from); -static int get_nondropped_natts(Oid relid); +static int get_nondropped_natts(Relation rel); /* Global */ static remoteConn *pconn = NULL; @@ -1112,7 +1112,6 @@ Datum dblink_get_pkey(PG_FUNCTION_ARGS) { int16 numatts; - Oid relid; char **results; FuncCallContext *funcctx; int32 call_cntr; @@ -1123,7 +1122,8 @@ dblink_get_pkey(PG_FUNCTION_ARGS) /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { - TupleDesc tupdesc = NULL; + Relation rel; + TupleDesc tupdesc; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -1133,13 +1133,13 @@ dblink_get_pkey(PG_FUNCTION_ARGS) */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - /* convert relname to rel Oid */ - relid = get_relid_from_relname(PG_GETARG_TEXT_P(0)); - if (!OidIsValid(relid)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s\" does not exist", - text_to_cstring(PG_GETARG_TEXT_PP(0))))); + /* open target relation */ + rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT); + + /* get the array of attnums */ + results = get_pkey_attnames(rel, &numatts); + + relation_close(rel, AccessShareLock); /* * need a tuple descriptor representing one INT and one TEXT column @@ -1157,9 +1157,6 @@ dblink_get_pkey(PG_FUNCTION_ARGS) attinmeta = TupleDescGetAttInMetadata(tupdesc); funcctx->attinmeta = attinmeta; - /* get an array of attnums */ - results = get_pkey_attnames(relid, &numatts); - if ((results != NULL) && (numatts > 0)) { funcctx->max_calls = numatts; @@ -1246,7 +1243,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) int32 pknumatts_tmp = PG_GETARG_INT32(2); ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); - Oid relid; + Relation rel; int16 pknumatts = 0; char **src_pkattvals; char **tgt_pkattvals; @@ -1256,14 +1253,9 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) int nondropped_natts; /* - * Convert relname to rel OID. + * Open target relation. */ - relid = get_relid_from_relname(relname_text); - if (!OidIsValid(relid)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s\" does not exist", - text_to_cstring(relname_text)))); + rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT); /* * There should be at least one key attribute @@ -1285,7 +1277,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) * ensure we don't ask for more pk attributes than we have non-dropped * columns */ - nondropped_natts = get_nondropped_natts(relid); + nondropped_natts = get_nondropped_natts(rel); if (pknumatts > nondropped_natts) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("number of primary key fields exceeds number of specified relation attributes"))); @@ -1323,7 +1315,12 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) /* * Prep work is finally done. Go get the SQL string. */ - sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); + sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); + + /* + * Now we can close the relation. + */ + relation_close(rel, AccessShareLock); /* * And send it @@ -1356,21 +1353,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) int32 pknumatts_tmp = PG_GETARG_INT32(2); ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); int nondropped_natts; - Oid relid; + Relation rel; int16 pknumatts = 0; char **tgt_pkattvals; int tgt_nitems; char *sql; /* - * Convert relname to rel OID. + * Open target relation. */ - relid = get_relid_from_relname(relname_text); - if (!OidIsValid(relid)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s\" does not exist", - text_to_cstring(relname_text)))); + rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT); /* * There should be at least one key attribute @@ -1392,7 +1384,7 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) * ensure we don't ask for more pk attributes than we have non-dropped * columns */ - nondropped_natts = get_nondropped_natts(relid); + nondropped_natts = get_nondropped_natts(rel); if (pknumatts > nondropped_natts) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("number of primary key fields exceeds number of specified relation attributes"))); @@ -1415,7 +1407,12 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) /* * Prep work is finally done. Go get the SQL string. */ - sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); + sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals); + + /* + * Now we can close the relation. + */ + relation_close(rel, AccessShareLock); /* * And send it @@ -1453,7 +1450,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); int nondropped_natts; - Oid relid; + Relation rel; int16 pknumatts = 0; char **src_pkattvals; char **tgt_pkattvals; @@ -1462,14 +1459,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) char *sql; /* - * Convert relname to rel OID. + * Open target relation. */ - relid = get_relid_from_relname(relname_text); - if (!OidIsValid(relid)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s\" does not exist", - text_to_cstring(relname_text)))); + rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT); /* * There should be one source array key values for each key attnum @@ -1491,7 +1483,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) * ensure we don't ask for more pk attributes than we have non-dropped * columns */ - nondropped_natts = get_nondropped_natts(relid); + nondropped_natts = get_nondropped_natts(rel); if (pknumatts > nondropped_natts) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("number of primary key fields exceeds number of specified relation attributes"))); @@ -1529,7 +1521,12 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) /* * Prep work is finally done. Go get the SQL string. */ - sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); + sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); + + /* + * Now we can close the relation. + */ + relation_close(rel, AccessShareLock); /* * And send it @@ -1643,7 +1640,7 @@ dblink_get_notify(PG_FUNCTION_ARGS) * Return NULL, and set numatts = 0, if no primary key exists. */ static char ** -get_pkey_attnames(Oid relid, int16 *numatts) +get_pkey_attnames(Relation rel, int16 *numatts) { Relation indexRelation; ScanKeyData skey; @@ -1651,22 +1648,11 @@ get_pkey_attnames(Oid relid, int16 *numatts) HeapTuple indexTuple; int i; char **result = NULL; - Relation rel; TupleDesc tupdesc; - AclResult aclresult; /* initialize numatts to 0 in case no primary key exists */ *numatts = 0; - /* open relation using relid, check permissions, get tupdesc */ - rel = relation_open(relid, AccessShareLock); - - aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), - ACL_SELECT); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - tupdesc = rel->rd_att; /* Prepare to scan pg_index for entries having indrelid = this rel. */ @@ -1674,7 +1660,7 @@ get_pkey_attnames(Oid relid, int16 *numatts) ScanKeyInit(&skey, Anum_pg_index_indrelid, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(relid)); + ObjectIdGetDatum(RelationGetRelid(rel))); scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true, SnapshotNow, 1, &skey); @@ -1700,7 +1686,6 @@ get_pkey_attnames(Oid relid, int16 *numatts) systable_endscan(scan); heap_close(indexRelation, AccessShareLock); - relation_close(rel, AccessShareLock); return result; } @@ -1766,9 +1751,8 @@ get_text_array_contents(ArrayType *array, int *numitems) } static char * -get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) +get_sql_insert(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) { - Relation rel; char *relname; HeapTuple tuple; TupleDesc tupdesc; @@ -1782,16 +1766,12 @@ get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka initStringInfo(&buf); /* get relation name including any needed schema prefix and quoting */ - relname = generate_relation_name(relid); + relname = generate_relation_name(rel); - /* - * Open relation using relid - */ - rel = relation_open(relid, AccessShareLock); tupdesc = rel->rd_att; natts = tupdesc->natts; - tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals); if (!tuple) ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), @@ -1848,14 +1828,12 @@ get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka } appendStringInfo(&buf, ")"); - relation_close(rel, AccessShareLock); return (buf.data); } static char * -get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals) +get_sql_delete(Relation rel, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals) { - Relation rel; char *relname; TupleDesc tupdesc; int natts; @@ -1865,12 +1843,8 @@ get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pka initStringInfo(&buf); /* get relation name including any needed schema prefix and quoting */ - relname = generate_relation_name(relid); + relname = generate_relation_name(rel); - /* - * Open relation using relid - */ - rel = relation_open(relid, AccessShareLock); tupdesc = rel->rd_att; natts = tupdesc->natts; @@ -1896,14 +1870,12 @@ get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pka appendStringInfo(&buf, " IS NULL"); } - relation_close(rel, AccessShareLock); return (buf.data); } static char * -get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) +get_sql_update(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) { - Relation rel; char *relname; HeapTuple tuple; TupleDesc tupdesc; @@ -1917,16 +1889,12 @@ get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka initStringInfo(&buf); /* get relation name including any needed schema prefix and quoting */ - relname = generate_relation_name(relid); + relname = generate_relation_name(rel); - /* - * Open relation using relid - */ - rel = relation_open(relid, AccessShareLock); tupdesc = rel->rd_att; natts = tupdesc->natts; - tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals); if (!tuple) ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), @@ -1992,7 +1960,6 @@ get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka appendStringInfo(&buf, " IS NULL"); } - relation_close(rel, AccessShareLock); return (buf.data); } @@ -2050,9 +2017,8 @@ get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key) } static HeapTuple -get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals) +get_tuple_of_interest(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals) { - Relation rel; char *relname; TupleDesc tupdesc; StringInfoData buf; @@ -2063,14 +2029,9 @@ get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char ** initStringInfo(&buf); /* get relation name including any needed schema prefix and quoting */ - relname = generate_relation_name(relid); + relname = generate_relation_name(rel); - /* - * Open relation using relid - */ - rel = relation_open(relid, AccessShareLock); - tupdesc = CreateTupleDescCopy(rel->rd_att); - relation_close(rel, AccessShareLock); + tupdesc = rel->rd_att; /* * Connect to SPI manager @@ -2141,50 +2102,49 @@ get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char ** return NULL; } -static Oid -get_relid_from_relname(text *relname_text) +/* + * Open the relation named by relname_text, acquire specified type of lock, + * verify we have specified permissions. + * Caller must close rel when done with it. + */ +static Relation +get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode) { RangeVar *relvar; Relation rel; - Oid relid; + AclResult aclresult; relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text)); - rel = heap_openrv(relvar, AccessShareLock); - relid = RelationGetRelid(rel); - relation_close(rel, AccessShareLock); + rel = heap_openrv(relvar, lockmode); - return relid; + aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), + aclmode); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_CLASS, + RelationGetRelationName(rel)); + + return rel; } /* * generate_relation_name - copied from ruleutils.c - * Compute the name to display for a relation specified by OID + * Compute the name to display for a relation * * The result includes all necessary quoting and schema-prefixing. */ static char * -generate_relation_name(Oid relid) +generate_relation_name(Relation rel) { - HeapTuple tp; - Form_pg_class reltup; char *nspname; char *result; - tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); - if (!HeapTupleIsValid(tp)) - elog(ERROR, "cache lookup failed for relation %u", relid); - - reltup = (Form_pg_class) GETSTRUCT(tp); - /* Qualify the name if not visible in search path */ - if (RelationIsVisible(relid)) + if (RelationIsVisible(RelationGetRelid(rel))) nspname = NULL; else - nspname = get_namespace_name(reltup->relnamespace); + nspname = get_namespace_name(rel->rd_rel->relnamespace); - result = quote_qualified_identifier(nspname, NameStr(reltup->relname)); - - ReleaseSysCache(tp); + result = quote_qualified_identifier(nspname, RelationGetRelationName(rel)); return result; } @@ -2469,15 +2429,13 @@ escape_param_str(const char *str) } static int -get_nondropped_natts(Oid relid) +get_nondropped_natts(Relation rel) { int nondropped_natts = 0; TupleDesc tupdesc; - Relation rel; int natts; int i; - rel = relation_open(relid, AccessShareLock); tupdesc = rel->rd_att; natts = tupdesc->natts; @@ -2488,6 +2446,5 @@ get_nondropped_natts(Oid relid) nondropped_natts++; } - relation_close(rel, AccessShareLock); return nondropped_natts; } -- 2.40.0