From 8f337e86cd98b0f033aa823c6cbb3844ccb78dff Mon Sep 17 00:00:00 2001 From: Bruce Momjian Date: Wed, 25 Jun 2003 01:10:15 +0000 Subject: [PATCH] Please apply attached patch to contrib/dblink. It adds named persistent connections to dblink. Shridhar Daithanka --- contrib/dblink/README.dblink | 72 +-- contrib/dblink/dblink.c | 873 ++++++++++++----------------- contrib/dblink/dblink.h | 33 +- contrib/dblink/dblink.sql.in | 45 +- contrib/dblink/doc/connection | 28 +- contrib/dblink/doc/cursor | 35 +- contrib/dblink/doc/execute | 29 +- contrib/dblink/doc/query | 45 +- contrib/dblink/expected/dblink.out | 208 ++++++- contrib/dblink/sql/dblink.sql | 101 +++- 10 files changed, 855 insertions(+), 614 deletions(-) diff --git a/contrib/dblink/README.dblink b/contrib/dblink/README.dblink index af627901dc..7724aa9e91 100644 --- a/contrib/dblink/README.dblink +++ b/contrib/dblink/README.dblink @@ -4,8 +4,11 @@ * Functions returning results from a remote database * * Joe Conway + * And contributors: + * Darko Prenosil + * Shridhar Daithankar * - * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group + * Copyright (c) 2001, 2002, 2003 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -27,14 +30,16 @@ * */ -Version 0.5 (25 August, 2002): - Major overhaul to work with new backend "table function" capability. Removed - dblink_strtok() and dblink_replace() functions because they are now - available as backend functions (split() and replace() respectively). - Tested under Linux (Red Hat 7.3) and PostgreSQL 7.3devel. This version - is no longer backwards portable to PostgreSQL 7.2. +Version 0.6 (14 June, 2003): + Completely removed previously deprecated functions. Added ability + to create "named" persistent connections in addition to the single global + "unnamed" persistent connection. + Tested under Linux (Red Hat 9) and PostgreSQL 7.4devel. Release Notes: + Version 0.6 + - functions deprecated in 0.5 have been removed + - added ability to create "named" persistent connections Version 0.5 - dblink now supports use directly as a table function; this is the new preferred usage going forward @@ -87,35 +92,51 @@ Installation: connection ------------ dblink_connect(text) RETURNS text - - opens a connection that will persist for duration of current + - opens an unnamed connection that will persist for duration of + current backend or until it is disconnected + dblink_connect(text,text) RETURNS text + - opens a named connection that will persist for duration of current backend or until it is disconnected dblink_disconnect() RETURNS text - - disconnects a persistent connection + - disconnects the unnamed persistent connection + dblink_disconnect(text) RETURNS text + - disconnects a named persistent connection cursor ------------ dblink_open(text,text) RETURNS text - - opens a cursor using connection already opened with dblink_connect() - that will persist for duration of current backend or until it is - closed + - opens a cursor using unnamed connection already opened with + dblink_connect() that will persist for duration of current backend + or until it is closed + dblink_open(text,text,text) RETURNS text + - opens a cursor using a named connection already opened with + dblink_connect() that will persist for duration of current backend + or until it is closed dblink_fetch(text, int) RETURNS setof record - - fetches data from an already opened cursor + - fetches data from an already opened cursor on the unnamed connection + dblink_fetch(text, text, int) RETURNS setof record + - fetches data from an already opened cursor on a named connection dblink_close(text) RETURNS text - - closes a cursor + - closes a cursor on the unnamed connection + dblink_close(text,text) RETURNS text + - closes a cursor on a named connection query ------------ dblink(text,text) RETURNS setof record - - returns a set of results from remote SELECT query - (Note: comment out in dblink.sql to use deprecated version) + - returns a set of results from remote SELECT query; the first argument + is either a connection string, or the name of an already opened + persistant connection dblink(text) RETURNS setof record - - returns a set of results from remote SELECT query, using connection - already opened with dblink_connect() + - returns a set of results from remote SELECT query, using the unnamed + connection already opened with dblink_connect() execute ------------ dblink_exec(text, text) RETURNS text - - executes an INSERT/UPDATE/DELETE query remotely + - executes an INSERT/UPDATE/DELETE query remotely; the first argument + is either a connection string, or the name of an already opened + persistant connection dblink_exec(text) RETURNS text - executes an INSERT/UPDATE/DELETE query remotely, using connection already opened with dblink_connect() @@ -136,19 +157,6 @@ Installation: - builds an update statement using a local tuple, replacing the selection key field values with alternate supplied values - Not installed by default - deprecated - ------------ - dblink(text,text) RETURNS setof int - - *DEPRECATED* returns a resource id for results from remote query - (Note: must uncomment in dblink.sql to use) - dblink_tok(int,int) RETURNS text - - *DEPRECATED* extracts and returns individual field results; used - only in conjunction with the *DEPRECATED* form of dblink - (Note: must uncomment in dblink.sql to use) - dblink_last_oid(int) RETURNS oid - - *DEPRECATED* returns the last inserted oid - Documentation: Note: Parameters representing relation names must include double diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index a8e9c5ab50..acddd1d469 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -4,8 +4,11 @@ * Functions returning results from a remote database * * Joe Conway + * And contributors: + * Darko Prenosil + * Shridhar Daithankar * - * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group + * Copyright (c) 2001, 2002, 2003 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -27,9 +30,7 @@ * */ #include "postgres.h" - #include "libpq-fe.h" - #include "fmgr.h" #include "funcapi.h" #include "access/tupdesc.h" @@ -51,13 +52,27 @@ #include "utils/array.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "utils/palloc.h" +#include "utils/dynahash.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" #include "dblink.h" +typedef struct remoteConn +{ + PGconn *con; /* Hold the remote connection */ + bool remoteTrFlag; /* Indicates whether or not a transaction + * on remote database is in progress*/ +} remoteConn; + /* * Internal declarations */ -static dblink_results *init_dblink_results(MemoryContext fn_mcxt); +static remoteConn *getConnectionByName(const char *name); +static HTAB *createConnHash(void); +static bool createNewConnection(const char *name,remoteConn *con); +static void deleteConnection(const char *name); static char **get_pkey_attnames(Oid relid, int16 *numatts); static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); @@ -67,17 +82,32 @@ static char *quote_ident_cstr(char *rawstr); static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); static Oid get_relid_from_relname(text *relname_text); -static dblink_results *get_res_ptr(int32 res_id_index); -static void append_res_ptr(dblink_results * results); -static void remove_res_ptr(dblink_results * results); static TupleDesc pgresultGetTupleDesc(PGresult *res); static char *generate_relation_name(Oid relid); /* Global */ -List *res_id = NIL; -int res_id_index = 0; -PGconn *persistent_conn = NULL; +List *res_id = NIL; +int res_id_index = 0; +PGconn *persistent_conn = NULL; +static HTAB *remoteConnHash=NULL; + +/* +Following is list that holds multiple remote connections. +Calling convention of each dblink function changes to accept +connection name as the first parameter. The connection list is +much like ecpg e.g. a mapping between a name and a PGconn object. +*/ + +typedef struct remoteConnHashEnt +{ + char name[NAMEDATALEN]; + remoteConn *rcon; +} remoteConnHashEnt; + +/* initial number of connection hashes */ +#define NUMCONN 16 +/* general utility */ #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) #define xpfree(var_) \ @@ -88,6 +118,41 @@ PGconn *persistent_conn = NULL; var_ = NULL; \ } \ } while (0) +#define DBLINK_RES_ERROR(p1, p2) \ + do { \ + msg = pstrdup(PQerrorMessage(conn)); \ + if (res) \ + PQclear(res); \ + elog(ERROR, "%s: %s: %s", p1, p2, msg); \ + } while (0) +#define DBLINK_CONN_NOT_AVAIL(p1) \ + do { \ + if(conname) \ + elog(ERROR, "%s: connection %s not available", p1, conname); \ + else \ + elog(ERROR, "%s: connection not available", p1); \ + } while (0) +#define DBLINK_GET_CONN(p1) \ + do { \ + char *conname_or_str = GET_STR(PG_GETARG_TEXT_P(0)); \ + rcon = getConnectionByName(conname_or_str); \ + if(rcon) \ + { \ + conn = rcon->con; \ + freeconn = false; \ + } \ + else \ + { \ + connstr = conname_or_str; \ + conn = PQconnectdb(connstr); \ + if (PQstatus(conn) == CONNECTION_BAD) \ + { \ + msg = pstrdup(PQerrorMessage(conn)); \ + PQfinish(conn); \ + elog(ERROR, "%s: connection error: %s", p1, msg); \ + } \ + } \ + } while (0) /* @@ -97,28 +162,52 @@ PG_FUNCTION_INFO_V1(dblink_connect); Datum dblink_connect(PG_FUNCTION_ARGS) { - char *connstr = GET_STR(PG_GETARG_TEXT_P(0)); + char *connstr = NULL; + char *connname = NULL; char *msg; - text *result_text; MemoryContext oldcontext; + PGconn *conn = NULL; + remoteConn *rcon = NULL; - if (persistent_conn != NULL) - PQfinish(persistent_conn); + if(PG_NARGS()==2) + { + connstr = GET_STR(PG_GETARG_TEXT_P(1)); + connname = GET_STR(PG_GETARG_TEXT_P(0)); + } + else if(PG_NARGS()==1) + connstr = GET_STR(PG_GETARG_TEXT_P(0)); oldcontext = MemoryContextSwitchTo(TopMemoryContext); - persistent_conn = PQconnectdb(connstr); + + if(connname) + rcon=(remoteConn *) palloc(sizeof(remoteConn)); + conn = PQconnectdb(connstr); + MemoryContextSwitchTo(oldcontext); - if (PQstatus(persistent_conn) == CONNECTION_BAD) + if (PQstatus(conn) == CONNECTION_BAD) { - msg = pstrdup(PQerrorMessage(persistent_conn)); - PQfinish(persistent_conn); - persistent_conn = NULL; + msg = pstrdup(PQerrorMessage(conn)); + PQfinish(conn); + if(rcon) + pfree(rcon); elog(ERROR, "dblink_connect: connection error: %s", msg); } - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); - PG_RETURN_TEXT_P(result_text); + if(connname) + { + rcon->con = conn; + if(createNewConnection(connname, rcon) == false) + { + PQfinish(conn); + pfree(rcon); + elog(ERROR, "dblink_connect: cannot save named connection"); + } + } + else + persistent_conn = conn; + + PG_RETURN_TEXT_P(GET_TEXT("OK")); } /* @@ -128,15 +217,37 @@ PG_FUNCTION_INFO_V1(dblink_disconnect); Datum dblink_disconnect(PG_FUNCTION_ARGS) { - text *result_text; + char *str = NULL; + remoteConn *rcon = NULL; + PGconn *conn = NULL; + + if (PG_NARGS() ==1 ) + { + str = GET_STR(PG_GETARG_TEXT_P(0)); + rcon = getConnectionByName(str); + if (rcon) + conn = rcon->con; + } + else + conn = persistent_conn; - if (persistent_conn != NULL) - PQfinish(persistent_conn); + if (!conn) + { + if (str) + elog(ERROR,"dblink_disconnect: connection named \"%s\" not found", + str); + else + elog(ERROR,"dblink_disconnect: connection not found"); + } - persistent_conn = NULL; + PQfinish(conn); + if (rcon) + { + deleteConnection(str); + pfree(rcon); + } - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(GET_TEXT("OK")); } /* @@ -149,27 +260,35 @@ dblink_open(PG_FUNCTION_ARGS) char *msg; PGresult *res = NULL; PGconn *conn = NULL; - text *result_text; - char *curname = GET_STR(PG_GETARG_TEXT_P(0)); - char *sql = GET_STR(PG_GETARG_TEXT_P(1)); + char *curname = NULL; + char *sql = NULL; + char *conname = NULL; StringInfo str = makeStringInfo(); + remoteConn *rcon = NULL; - if (persistent_conn != NULL) + if(PG_NARGS() == 2) + { + curname = GET_STR(PG_GETARG_TEXT_P(0)); + sql = GET_STR(PG_GETARG_TEXT_P(1)); conn = persistent_conn; - else - elog(ERROR, "dblink_open: no connection available"); + } + else if(PG_NARGS() == 3) + { + conname = GET_STR(PG_GETARG_TEXT_P(0)); + curname = GET_STR(PG_GETARG_TEXT_P(1)); + sql = GET_STR(PG_GETARG_TEXT_P(2)); + rcon = getConnectionByName(conname); + if (rcon) + conn = rcon->con; + } + + if (!conn) + DBLINK_CONN_NOT_AVAIL("dblink_open"); res = PQexec(conn, "BEGIN"); if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); + DBLINK_RES_ERROR("dblink_open", "begin error"); - PQfinish(conn); - persistent_conn = NULL; - - elog(ERROR, "dblink_open: begin error: %s", msg); - } PQclear(res); appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql); @@ -177,19 +296,11 @@ dblink_open(PG_FUNCTION_ARGS) if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) - { - msg = pstrdup(PQerrorMessage(conn)); - - PQclear(res); - - PQfinish(conn); - persistent_conn = NULL; + DBLINK_RES_ERROR("dblink_open", "sql error"); - elog(ERROR, "dblink: sql error: %s", msg); - } + PQclear(res); - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(GET_TEXT("OK")); } /* @@ -201,49 +312,46 @@ dblink_close(PG_FUNCTION_ARGS) { PGconn *conn = NULL; PGresult *res = NULL; - char *curname = GET_STR(PG_GETARG_TEXT_P(0)); + char *curname = NULL; + char *conname = NULL; StringInfo str = makeStringInfo(); - text *result_text; char *msg; + remoteConn *rcon = NULL; - if (persistent_conn != NULL) + if (PG_NARGS() == 1) + { + curname = GET_STR(PG_GETARG_TEXT_P(0)); conn = persistent_conn; - else - elog(ERROR, "dblink_close: no connection available"); + } + else if (PG_NARGS()==2) + { + conname = GET_STR(PG_GETARG_TEXT_P(0)); + curname = GET_STR(PG_GETARG_TEXT_P(1)); + rcon = getConnectionByName(conname); + if(rcon) + conn = rcon->con; + } + + if (!conn) + DBLINK_CONN_NOT_AVAIL("dblink_close"); appendStringInfo(str, "CLOSE %s", curname); /* close the cursor */ res = PQexec(conn, str->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - - PQfinish(persistent_conn); - persistent_conn = NULL; - - elog(ERROR, "dblink_close: sql error: %s", msg); - } + DBLINK_RES_ERROR("dblink_close", "sql error"); PQclear(res); /* commit the transaction */ res = PQexec(conn, "COMMIT"); if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - - PQfinish(persistent_conn); - persistent_conn = NULL; + DBLINK_RES_ERROR("dblink_close", "commit error"); - elog(ERROR, "dblink_close: commit error: %s", msg); - } PQclear(res); - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(GET_TEXT("OK")); } /* @@ -262,6 +370,8 @@ dblink_fetch(PG_FUNCTION_ARGS) char *msg; PGresult *res = NULL; MemoryContext oldcontext; + char *conname = NULL; + remoteConn *rcon=NULL; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) @@ -271,8 +381,28 @@ dblink_fetch(PG_FUNCTION_ARGS) Oid funcid = fcinfo->flinfo->fn_oid; PGconn *conn = NULL; StringInfo str = makeStringInfo(); - char *curname = GET_STR(PG_GETARG_TEXT_P(0)); - int howmany = PG_GETARG_INT32(1); + char *curname = NULL; + int howmany = 0; + + if (PG_NARGS() == 3) + { + conname = GET_STR(PG_GETARG_TEXT_P(0)); + curname = GET_STR(PG_GETARG_TEXT_P(1)); + howmany = PG_GETARG_INT32(2); + + rcon = getConnectionByName(conname); + if(rcon) + conn = rcon->con; + } + else if (PG_NARGS() == 2) + { + curname = GET_STR(PG_GETARG_TEXT_P(0)); + howmany = PG_GETARG_INT32(1); + conn = persistent_conn; + } + + if(!conn) + DBLINK_CONN_NOT_AVAIL("dblink_fetch"); /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -283,11 +413,6 @@ dblink_fetch(PG_FUNCTION_ARGS) */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - if (persistent_conn != NULL) - conn = persistent_conn; - else - elog(ERROR, "dblink_fetch: no connection available"); - appendStringInfo(str, "FETCH %d FROM %s", howmany, curname); res = PQexec(conn, str->data); @@ -295,19 +420,13 @@ dblink_fetch(PG_FUNCTION_ARGS) (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - - PQfinish(persistent_conn); - persistent_conn = NULL; - - elog(ERROR, "dblink_fetch: sql error: %s", msg); + DBLINK_RES_ERROR("dblink_fetch", "sql error"); } else if (PQresultStatus(res) == PGRES_COMMAND_OK) { /* cursor does not exist - closed already or bad name */ PQclear(res); - elog(ERROR, "dblink_fetch: cursor %s does not exist", curname); + elog(ERROR, "dblink_fetch: cursor not found: %s", curname); } funcctx->max_calls = PQntuples(res); @@ -380,8 +499,8 @@ dblink_fetch(PG_FUNCTION_ARGS) SRF_RETURN_NEXT(funcctx, result); } else -/* do when there is no more left */ { + /* do when there is no more left */ PQclear(res); SRF_RETURN_DONE(funcctx); } @@ -405,6 +524,7 @@ dblink_record(PG_FUNCTION_ARGS) bool is_sql_cmd = false; char *sql_cmd_status = NULL; MemoryContext oldcontext; + bool freeconn = true; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) @@ -415,6 +535,8 @@ dblink_record(PG_FUNCTION_ARGS) PGconn *conn = NULL; char *connstr = NULL; char *sql = NULL; + char *conname = NULL; + remoteConn *rcon=NULL; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -425,70 +547,51 @@ dblink_record(PG_FUNCTION_ARGS) */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - if (fcinfo->nargs == 2) + if (PG_NARGS() == 2) { - connstr = GET_STR(PG_GETARG_TEXT_P(0)); + DBLINK_GET_CONN("dblink"); sql = GET_STR(PG_GETARG_TEXT_P(1)); - - conn = PQconnectdb(connstr); - if (PQstatus(conn) == CONNECTION_BAD) - { - msg = pstrdup(PQerrorMessage(conn)); - PQfinish(conn); - elog(ERROR, "dblink: connection error: %s", msg); - } } - else if (fcinfo->nargs == 1) + else if (PG_NARGS() == 1) { + conn = persistent_conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); - - if (persistent_conn != NULL) - conn = persistent_conn; - else - elog(ERROR, "dblink: no connection available"); } else elog(ERROR, "dblink: wrong number of arguments"); + if(!conn) + DBLINK_CONN_NOT_AVAIL("dblink_record"); + res = PQexec(conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + DBLINK_RES_ERROR("dblink", "sql error"); + + if (PQresultStatus(res) == PGRES_COMMAND_OK) { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - if (fcinfo->nargs == 1) - persistent_conn = NULL; + is_sql_cmd = true; + + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0, false); - elog(ERROR, "dblink: sql error: %s", msg); + /* + * and save a copy of the command status string to return + * as our result tuple + */ + sql_cmd_status = PQcmdStatus(res); + funcctx->max_calls = 1; } else - { - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* need a tuple descriptor representing one TEXT column */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0, false); - - /* - * and save a copy of the command status string to return - * as our result tuple - */ - sql_cmd_status = PQcmdStatus(res); - funcctx->max_calls = 1; - } - else - funcctx->max_calls = PQntuples(res); + funcctx->max_calls = PQntuples(res); - /* got results, keep track of them */ - funcctx->user_fctx = res; + /* got results, keep track of them */ + funcctx->user_fctx = res; - /* if needed, close the connection to the database and cleanup */ - if (fcinfo->nargs == 2) - PQfinish(conn); - } + /* if needed, close the connection to the database and cleanup */ + if (freeconn && PG_NARGS() == 2) + PQfinish(conn); /* fast track when no results */ if (funcctx->max_calls < 1) @@ -567,8 +670,8 @@ dblink_record(PG_FUNCTION_ARGS) SRF_RETURN_NEXT(funcctx, result); } else -/* do when there is no more left */ { + /* do when there is no more left */ PQclear(res); SRF_RETURN_DONE(funcctx); } @@ -583,272 +686,62 @@ dblink_exec(PG_FUNCTION_ARGS) { char *msg; PGresult *res = NULL; - char *sql_cmd_status = NULL; + text *sql_cmd_status = NULL; TupleDesc tupdesc = NULL; - text *result_text; PGconn *conn = NULL; char *connstr = NULL; char *sql = NULL; + char *conname = NULL; + remoteConn *rcon=NULL; + bool freeconn = true; - if (fcinfo->nargs == 2) + if (PG_NARGS() == 2) { - connstr = GET_STR(PG_GETARG_TEXT_P(0)); + DBLINK_GET_CONN("dblink_exec"); sql = GET_STR(PG_GETARG_TEXT_P(1)); - - conn = PQconnectdb(connstr); - if (PQstatus(conn) == CONNECTION_BAD) - { - msg = pstrdup(PQerrorMessage(conn)); - PQfinish(conn); - elog(ERROR, "dblink_exec: connection error: %s", msg); - } } - else if (fcinfo->nargs == 1) + else if (PG_NARGS() == 1) { + conn = persistent_conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); - - if (persistent_conn != NULL) - conn = persistent_conn; - else - elog(ERROR, "dblink_exec: no connection available"); } else elog(ERROR, "dblink_exec: wrong number of arguments"); + if(!conn) + DBLINK_CONN_NOT_AVAIL("dblink_exec"); res = PQexec(conn, sql); - if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + DBLINK_RES_ERROR("dblink_exec", "sql error"); + + if (PQresultStatus(res) == PGRES_COMMAND_OK) { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - if (fcinfo->nargs == 1) - persistent_conn = NULL; + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0, false); - elog(ERROR, "dblink_exec: sql error: %s", msg); + /* + * and save a copy of the command status string to return as + * our result tuple + */ + sql_cmd_status = GET_TEXT(PQcmdStatus(res)); } else - { - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - /* need a tuple descriptor representing one TEXT column */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0, false); + elog(ERROR, "dblink_exec: queries returning results not allowed"); - /* - * and save a copy of the command status string to return as - * our result tuple - */ - sql_cmd_status = PQcmdStatus(res); - } - else - elog(ERROR, "dblink_exec: queries returning results not allowed"); - } PQclear(res); /* if needed, close the connection to the database and cleanup */ - if (fcinfo->nargs == 2) + if (freeconn && fcinfo->nargs == 2) PQfinish(conn); - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(sql_cmd_status); } -/* - * Note: this original version of dblink is DEPRECATED; - * it *will* be removed in favor of the new version on next release - */ -PG_FUNCTION_INFO_V1(dblink); -Datum -dblink(PG_FUNCTION_ARGS) -{ - PGconn *conn = NULL; - PGresult *res = NULL; - dblink_results *results; - char *optstr; - char *sqlstatement; - char *execstatement; - char *msg; - int ntuples = 0; - ReturnSetInfo *rsi; - - if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) - elog(ERROR, "dblink: function called in context that does not accept a set result"); - - optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); - sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); - - if (fcinfo->flinfo->fn_extra == NULL) - { - - conn = PQconnectdb(optstr); - if (PQstatus(conn) == CONNECTION_BAD) - { - msg = pstrdup(PQerrorMessage(conn)); - PQfinish(conn); - elog(ERROR, "dblink: connection error: %s", msg); - } - - execstatement = (char *) palloc(strlen(sqlstatement) + 1); - if (execstatement != NULL) - { - strcpy(execstatement, sqlstatement); - strcat(execstatement, "\0"); - } - else - elog(ERROR, "dblink: insufficient memory"); - - res = PQexec(conn, execstatement); - if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - elog(ERROR, "dblink: sql error: %s", msg); - } - else - { - /* - * got results, start fetching them - */ - ntuples = PQntuples(res); - - /* - * increment resource index - */ - res_id_index++; - - results = init_dblink_results(fcinfo->flinfo->fn_mcxt); - results->tup_num = 0; - results->res_id_index = res_id_index; - results->res = res; - - /* - * Append node to res_id to hold pointer to results. Needed by - * dblink_tok to access the data - */ - append_res_ptr(results); - - /* - * save pointer to results for the next function manager call - */ - fcinfo->flinfo->fn_extra = (void *) results; - - /* close the connection to the database and cleanup */ - PQfinish(conn); - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; - - PG_RETURN_INT32(res_id_index); - } - } - else - { - /* - * check for more results - */ - results = fcinfo->flinfo->fn_extra; - - results->tup_num++; - res_id_index = results->res_id_index; - ntuples = PQntuples(results->res); - - if (results->tup_num < ntuples) - { - /* - * fetch them if available - */ - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; - - PG_RETURN_INT32(res_id_index); - } - else - { - /* - * or if no more, clean things up - */ - results = fcinfo->flinfo->fn_extra; - - remove_res_ptr(results); - PQclear(results->res); - pfree(results); - fcinfo->flinfo->fn_extra = NULL; - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprEndResult; - - PG_RETURN_NULL(); - } - } - PG_RETURN_NULL(); -} - -/* - * Note: dblink_tok is DEPRECATED; - * it *will* be removed in favor of the new version on next release - * - * dblink_tok - * parse dblink output string - * return fldnum item (0 based) - * based on provided field separator - */ -PG_FUNCTION_INFO_V1(dblink_tok); -Datum -dblink_tok(PG_FUNCTION_ARGS) -{ - dblink_results *results; - int fldnum; - text *result_text; - char *result; - int nfields = 0; - int text_len = 0; - - results = get_res_ptr(PG_GETARG_INT32(0)); - if (results == NULL) - { - if (res_id != NIL) - { - freeList(res_id); - res_id = NIL; - res_id_index = 0; - } - - elog(ERROR, "dblink_tok: function called with invalid resource id"); - } - - fldnum = PG_GETARG_INT32(1); - if (fldnum < 0) - elog(ERROR, "dblink_tok: field number < 0 not permitted"); - - nfields = PQnfields(results->res); - if (fldnum > (nfields - 1)) - elog(ERROR, "dblink_tok: field number %d does not exist", fldnum); - - if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) - PG_RETURN_NULL(); - else - { - text_len = PQgetlength(results->res, results->tup_num, fldnum); - - result = (char *) palloc(text_len + 1); - - if (result != NULL) - { - strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum)); - strcat(result, "\0"); - } - else - elog(ERROR, "dblink: insufficient memory"); - - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result))); - - PG_RETURN_TEXT_P(result_text); - } -} /* * dblink_get_pkey @@ -923,7 +816,7 @@ dblink_get_pkey(PG_FUNCTION_ARGS) funcctx->user_fctx = results; } else -/* fast track when no results */ + /* fast track when no results */ SRF_RETURN_DONE(funcctx); MemoryContextSwitchTo(oldcontext); @@ -965,37 +858,10 @@ dblink_get_pkey(PG_FUNCTION_ARGS) SRF_RETURN_NEXT(funcctx, result); } else -/* do when there is no more left */ - SRF_RETURN_DONE(funcctx); -} - -/* - * Note: dblink_last_oid is DEPRECATED; - * it *will* be removed on next release - * - * dblink_last_oid - * return last inserted oid - */ -PG_FUNCTION_INFO_V1(dblink_last_oid); -Datum -dblink_last_oid(PG_FUNCTION_ARGS) -{ - dblink_results *results; - - results = get_res_ptr(PG_GETARG_INT32(0)); - if (results == NULL) { - if (res_id != NIL) - { - freeList(res_id); - res_id = NIL; - res_id_index = 0; - } - - elog(ERROR, "dblink_tok: function called with invalid resource id"); + /* do when there is no more left */ + SRF_RETURN_DONE(funcctx); } - - PG_RETURN_OID(PQoidValue(results->res)); } @@ -1043,7 +909,6 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) int i; char *ptr; char *sql; - text *sql_text; int16 typlen; bool typbyval; char typalign; @@ -1138,15 +1003,10 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) */ sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); - /* - * Make it into TEXT for return to the client - */ - sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); - /* * And send it */ - PG_RETURN_TEXT_P(sql_text); + PG_RETURN_TEXT_P(GET_TEXT(sql)); } @@ -1182,7 +1042,6 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) int i; char *ptr; char *sql; - text *sql_text; int16 typlen; bool typbyval; char typalign; @@ -1246,15 +1105,10 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) */ sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); - /* - * Make it into TEXT for return to the client - */ - sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); - /* * And send it */ - PG_RETURN_TEXT_P(sql_text); + PG_RETURN_TEXT_P(GET_TEXT(sql)); } @@ -1299,7 +1153,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) int i; char *ptr; char *sql; - text *sql_text; int16 typlen; bool typbyval; char typalign; @@ -1394,15 +1247,10 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) */ sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); - /* - * Make it into TEXT for return to the client - */ - sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); - /* * And send it */ - PG_RETURN_TEXT_P(sql_text); + PG_RETURN_TEXT_P(GET_TEXT(sql)); } /* @@ -1415,10 +1263,7 @@ PG_FUNCTION_INFO_V1(dblink_current_query); Datum dblink_current_query(PG_FUNCTION_ARGS) { - text *result_text; - - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(GET_TEXT(debug_query_string)); } @@ -1427,29 +1272,6 @@ dblink_current_query(PG_FUNCTION_ARGS) */ -/* - * init_dblink_results - * - create an empty dblink_results data structure - */ -static dblink_results * -init_dblink_results(MemoryContext fn_mcxt) -{ - MemoryContext oldcontext; - dblink_results *retval; - - oldcontext = MemoryContextSwitchTo(fn_mcxt); - - retval = (dblink_results *) palloc0(sizeof(dblink_results)); - - retval->tup_num = -1; - retval->res_id_index = -1; - retval->res = NULL; - - MemoryContextSwitchTo(oldcontext); - - return retval; -} - /* * get_pkey_attnames * @@ -1488,7 +1310,10 @@ get_pkey_attnames(Oid relid, int16 *numatts) /* we're only interested if it is the primary key */ if (index->indisprimary == TRUE) { - *numatts = index->indnatts; + i = 0; + while (index->indkey[i++] != 0) + (*numatts)++; + if (*numatts > 0) { result = (char **) palloc(*numatts * sizeof(char *)); @@ -1907,52 +1732,6 @@ get_relid_from_relname(text *relname_text) return relid; } -static dblink_results * -get_res_ptr(int32 res_id_index) -{ - List *ptr; - - /* - * short circuit empty list - */ - if (res_id == NIL) - return NULL; - - /* - * OK, should be good to go - */ - foreach(ptr, res_id) - { - dblink_results *this_res_id = (dblink_results *) lfirst(ptr); - - if (this_res_id->res_id_index == res_id_index) - return this_res_id; - } - return NULL; -} - -/* - * Add node to global List res_id - */ -static void -append_res_ptr(dblink_results * results) -{ - res_id = lappend(res_id, results); -} - -/* - * Remove node from global List - * using res_id_index - */ -static void -remove_res_ptr(dblink_results * results) -{ - res_id = lremove(results, res_id); - - if (res_id == NIL) - res_id_index = 0; -} - static TupleDesc pgresultGetTupleDesc(PGresult *res) { @@ -2039,3 +1818,91 @@ generate_relation_name(Oid relid) return result; } + + +static remoteConn * +getConnectionByName(const char *name) +{ + remoteConnHashEnt *hentry; + char key[NAMEDATALEN]; + + if(!remoteConnHash) + remoteConnHash=createConnHash(); + + MemSet(key, 0, NAMEDATALEN); + snprintf(key, NAMEDATALEN - 1, "%s", name); + hentry = (remoteConnHashEnt*) hash_search(remoteConnHash, + key, HASH_FIND, NULL); + + if(hentry) + return(hentry->rcon); + + return(NULL); +} + +static HTAB * +createConnHash(void) +{ + HASHCTL ctl; + HTAB *ptr; + + ctl.keysize = NAMEDATALEN; + ctl.entrysize = sizeof(remoteConnHashEnt); + + ptr=hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM); + + if(!ptr) + elog(ERROR,"Can not create connections hash table. Out of memory"); + + return(ptr); +} + +static bool +createNewConnection(const char *name, remoteConn *con) +{ + remoteConnHashEnt *hentry; + bool found; + char key[NAMEDATALEN]; + + if(!remoteConnHash) + remoteConnHash=createConnHash(); + + MemSet(key, 0, NAMEDATALEN); + snprintf(key, NAMEDATALEN - 1, "%s", name); + hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key, + HASH_ENTER, &found); + + if(!hentry) + elog(ERROR, "failed to create connection"); + + if(found) + { + elog(NOTICE, "cannot use a connection name more than once"); + return false; + } + + hentry->rcon = con; + strncpy(hentry->name, name, NAMEDATALEN - 1); + + return true; +} + +static void +deleteConnection(const char *name) +{ + remoteConnHashEnt *hentry; + bool found; + char key[NAMEDATALEN]; + + if(!remoteConnHash) + remoteConnHash=createConnHash(); + + MemSet(key, 0, NAMEDATALEN); + snprintf(key, NAMEDATALEN - 1, "%s", name); + + hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, + key, HASH_REMOVE, &found); + + if(!hentry) + elog(WARNING,"Trying to delete a connection that does not exist"); +} diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h index 3e9119f81a..4da345cbae 100644 --- a/contrib/dblink/dblink.h +++ b/contrib/dblink/dblink.h @@ -4,8 +4,11 @@ * Functions returning results from a remote database * * Joe Conway + * And contributors: + * Darko Prenosil + * Shridhar Daithankar * - * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group + * Copyright (c) 2001, 2002, 2003 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -30,36 +33,9 @@ #ifndef DBLINK_H #define DBLINK_H -/* - * This struct holds the results of the remote query. - * Use fn_extra to hold a pointer to it across calls - */ -typedef struct -{ - /* - * last tuple number accessed - */ - int tup_num; - - /* - * resource index number for this context - */ - int res_id_index; - - /* - * the actual query results - */ - PGresult *res; -} dblink_results; - /* * External declarations */ -/* deprecated */ -extern Datum dblink(PG_FUNCTION_ARGS); -extern Datum dblink_tok(PG_FUNCTION_ARGS); - -/* supported */ extern Datum dblink_connect(PG_FUNCTION_ARGS); extern Datum dblink_disconnect(PG_FUNCTION_ARGS); extern Datum dblink_open(PG_FUNCTION_ARGS); @@ -68,7 +44,6 @@ extern Datum dblink_fetch(PG_FUNCTION_ARGS); extern Datum dblink_record(PG_FUNCTION_ARGS); extern Datum dblink_exec(PG_FUNCTION_ARGS); extern Datum dblink_get_pkey(PG_FUNCTION_ARGS); -extern Datum dblink_last_oid(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS); diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in index 42e483de34..cd670390d7 100644 --- a/contrib/dblink/dblink.sql.in +++ b/contrib/dblink/dblink.sql.in @@ -1,50 +1,53 @@ --- --- Uncomment the following commented lines to use original DEPRECATED functions --- ---CREATE OR REPLACE FUNCTION dblink (text,text) ---RETURNS setof int ---AS 'MODULE_PATHNAME','dblink' ---LANGUAGE 'C' WITH (isstrict); ---CREATE OR REPLACE FUNCTION dblink_tok (int,int) ---RETURNS text ---AS 'MODULE_PATHNAME','dblink_tok' ---LANGUAGE 'C' WITH (isstrict); ---CREATE OR REPLACE FUNCTION dblink_last_oid (int) ---RETURNS oid ---AS 'MODULE_PATHNAME','dblink_last_oid' ---LANGUAGE 'C' WITH (isstrict); - CREATE OR REPLACE FUNCTION dblink_connect (text) RETURNS text AS 'MODULE_PATHNAME','dblink_connect' LANGUAGE 'C' WITH (isstrict); +CREATE OR REPLACE FUNCTION dblink_connect (text, text) +RETURNS text +AS 'MODULE_PATHNAME','dblink_connect' +LANGUAGE 'C' WITH (isstrict); + CREATE OR REPLACE FUNCTION dblink_disconnect () RETURNS text AS 'MODULE_PATHNAME','dblink_disconnect' LANGUAGE 'C' WITH (isstrict); +CREATE OR REPLACE FUNCTION dblink_disconnect (text) +RETURNS text +AS 'MODULE_PATHNAME','dblink_disconnect' +LANGUAGE 'C' WITH (isstrict); + CREATE OR REPLACE FUNCTION dblink_open (text,text) RETURNS text AS 'MODULE_PATHNAME','dblink_open' LANGUAGE 'C' WITH (isstrict); +CREATE OR REPLACE FUNCTION dblink_open (text,text,text) +RETURNS text +AS 'MODULE_PATHNAME','dblink_open' +LANGUAGE 'C' WITH (isstrict); + CREATE OR REPLACE FUNCTION dblink_fetch (text,int) RETURNS setof record AS 'MODULE_PATHNAME','dblink_fetch' LANGUAGE 'C' WITH (isstrict); +CREATE OR REPLACE FUNCTION dblink_fetch (text,text,int) +RETURNS setof record +AS 'MODULE_PATHNAME','dblink_fetch' +LANGUAGE 'C' WITH (isstrict); + CREATE OR REPLACE FUNCTION dblink_close (text) RETURNS text AS 'MODULE_PATHNAME','dblink_close' LANGUAGE 'C' WITH (isstrict); --- Note: if this is not a first time install of dblink, uncomment the --- following DROP which prepares the database for the new, non-deprecated --- version. ---DROP FUNCTION dblink (text,text); +CREATE OR REPLACE FUNCTION dblink_close (text,text) +RETURNS text +AS 'MODULE_PATHNAME','dblink_close' +LANGUAGE 'C' WITH (isstrict); --- Comment out the following 3 lines if the DEPRECATED functions are used. CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof record AS 'MODULE_PATHNAME','dblink_record' diff --git a/contrib/dblink/doc/connection b/contrib/dblink/doc/connection index 3a749d8903..251bd93961 100644 --- a/contrib/dblink/doc/connection +++ b/contrib/dblink/doc/connection @@ -6,21 +6,35 @@ dblink_connect -- Opens a persistent connection to a remote database Synopsis dblink_connect(text connstr) +dblink_connect(text connname, text connstr) Inputs + connname + if 2 arguments are given, the first is used as a name for a persistent + connection + connstr standard libpq format connection string, e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd" + if only one argument is given, the connection is unnamed; only one unnamed + connection can exist at a time + Outputs Returns status = "OK" Example usage -test=# select dblink_connect('dbname=template1'); +select dblink_connect('dbname=template1'); + dblink_connect +---------------- + OK +(1 row) + +select dblink_connect('myconn','dbname=template1'); dblink_connect ---------------- OK @@ -29,15 +43,18 @@ test=# select dblink_connect('dbname=template1'); ================================================================== Name -dblink_disconnect -- Closes the persistent connection to a remote database +dblink_disconnect -- Closes a persistent connection to a remote database Synopsis dblink_disconnect() +dblink_disconnect(text connname) Inputs - none + connname + if an argument is given, it is used as a name for a persistent + connection to close; otherwiase the unnamed connection is closed Outputs @@ -51,3 +68,8 @@ test=# select dblink_disconnect(); OK (1 row) +select dblink_disconnect('myconn'); + dblink_disconnect +------------------- + OK +(1 row) diff --git a/contrib/dblink/doc/cursor b/contrib/dblink/doc/cursor index 3bc6bdb2fe..7c9cc3cde2 100644 --- a/contrib/dblink/doc/cursor +++ b/contrib/dblink/doc/cursor @@ -6,9 +6,14 @@ dblink_open -- Opens a cursor on a remote database Synopsis dblink_open(text cursorname, text sql) +dblink_open(text connname, text cursorname, text sql) Inputs + connname + if three arguments are present, the first is taken as the specific + connection name to use; otherwise the unnamed connection is assumed + cursorname a reference name for the cursor @@ -52,9 +57,14 @@ dblink_fetch -- Returns a set from an open cursor on a remote database Synopsis dblink_fetch(text cursorname, int32 howmany) +dblink_fetch(text connname, text cursorname, int32 howmany) Inputs + connname + if three arguments are present, the first is taken as the specific + connection name to use; otherwise the unnamed connection is assumed + cursorname The reference name for the cursor @@ -123,9 +133,14 @@ dblink_close -- Closes a cursor on a remote database Synopsis dblink_close(text cursorname) +dblink_close(text connname, text cursorname) Inputs + connname + if two arguments are present, the first is taken as the specific + connection name to use; otherwise the unnamed connection is assumed + cursorname a reference name for the cursor @@ -135,7 +150,8 @@ Outputs Returns status = "OK" Note - dblink_connect(text connstr) must be executed first. + dblink_connect(text connstr) or dblink_connect(text connname, text connstr) + must be executed first. Example usage @@ -157,3 +173,20 @@ test=# select dblink_close('foo'); OK (1 row) +select dblink_connect('myconn','dbname=regression'); + dblink_connect +---------------- + OK +(1 row) + +select dblink_open('myconn','foo','select proname, prosrc from pg_proc'); + dblink_open +------------- + OK +(1 row) + +select dblink_close('myconn','foo'); + dblink_close +-------------- + OK +(1 row) diff --git a/contrib/dblink/doc/execute b/contrib/dblink/doc/execute index 27ed5e35a0..72a21276d9 100644 --- a/contrib/dblink/doc/execute +++ b/contrib/dblink/doc/execute @@ -6,22 +6,23 @@ dblink_exec -- Executes an UPDATE/INSERT/DELETE on a remote database Synopsis dblink_exec(text connstr, text sql) -- or - +dblink_exec(text connname, text sql) dblink_exec(text sql) Inputs + connname connstr + If two arguments are present, the first is first assumed to be a specific + connection name to use. If the name is not found, the argument is then + assumed to be a valid connection string, of standard libpq format, + e.g.: "hostaddr=127.0.0.1 dbname=mydb user=postgres password=mypasswd" - standard libpq format connection string, - e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd" - If the second form is used, then the dblink_connect(text connstr) must be - executed first. + If only one argument is used, then the unnamed connection is used. sql sql statement that you wish to execute on the remote host, e.g.: - insert into foo values(0,'a','{"a0","b0","c0"}'); Outputs @@ -36,14 +37,26 @@ Notes Example usage -test=# select dblink_connect('dbname=dblink_test_slave'); +select dblink_connect('dbname=dblink_test_slave'); dblink_connect ---------------- OK (1 row) -test=# select dblink_exec('insert into foo values(21,''z'',''{"a0","b0","c0"}'');'); +select dblink_exec('insert into foo values(21,''z'',''{"a0","b0","c0"}'');'); dblink_exec ----------------- INSERT 943366 1 (1 row) + +select dblink_connect('myconn','dbname=regression'); + dblink_connect +---------------- + OK +(1 row) + +select dblink_exec('myconn','insert into foo values(21,''z'',''{"a0","b0","c0"}'');'); + dblink_exec +------------------ + INSERT 6432584 1 +(1 row) diff --git a/contrib/dblink/doc/query b/contrib/dblink/doc/query index 525ffab45a..9c81417741 100644 --- a/contrib/dblink/doc/query +++ b/contrib/dblink/doc/query @@ -6,17 +6,19 @@ dblink -- Returns a set from a remote database Synopsis dblink(text connstr, text sql) -- or - +dblink(text connname, text sql) dblink(text sql) Inputs + connname connstr + If two arguments are present, the first is first assumed to be a specific + connection name to use. If the name is not found, the argument is then + assumed to be a valid connection string, of standard libpq format, + e.g.: "hostaddr=127.0.0.1 dbname=mydb user=postgres password=mypasswd" - standard libpq format connection string, - e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd" - If the second form is used, then the dblink_connect(text connstr) must be - executed first. + If only one argument is used, then the unnamed connection is used. sql @@ -29,7 +31,7 @@ Outputs Example usage -test=# select * from dblink('dbname=template1','select proname, prosrc from pg_proc') +select * from dblink('dbname=template1','select proname, prosrc from pg_proc') as t1(proname name, prosrc text) where proname like 'bytea%'; proname | prosrc ------------+------------ @@ -47,13 +49,13 @@ test=# select * from dblink('dbname=template1','select proname, prosrc from pg_p byteaout | byteaout (12 rows) -test=# select dblink_connect('dbname=template1'); +select dblink_connect('dbname=template1'); dblink_connect ---------------- OK (1 row) -test=# select * from dblink('select proname, prosrc from pg_proc') +select * from dblink('select proname, prosrc from pg_proc') as t1(proname name, prosrc text) where proname like 'bytea%'; proname | prosrc ------------+------------ @@ -71,6 +73,33 @@ test=# select * from dblink('select proname, prosrc from pg_proc') byteaout | byteaout (12 rows) +select dblink_connect('myconn','dbname=regression'); + dblink_connect +---------------- + OK +(1 row) + +select * from dblink('myconn','select proname, prosrc from pg_proc') + as t1(proname name, prosrc text) where proname like 'bytea%'; + proname | prosrc +------------+------------ + bytearecv | bytearecv + byteasend | byteasend + byteale | byteale + byteagt | byteagt + byteage | byteage + byteane | byteane + byteacmp | byteacmp + bytealike | bytealike + byteanlike | byteanlike + byteacat | byteacat + byteaeq | byteaeq + bytealt | bytealt + byteain | byteain + byteaout | byteaout +(14 rows) + + ================================================================== A more convenient way to use dblink may be to create a view: diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out index 945f1bd16a..0beeeeeb84 100644 --- a/contrib/dblink/expected/dblink.out +++ b/contrib/dblink/expected/dblink.out @@ -106,11 +106,11 @@ WHERE t.a > 7; 9 | j | {a9,b9,c9} (2 rows) --- should generate "no connection available" error +-- should generate "connection not available" error SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]) WHERE t.a > 7; -ERROR: dblink: no connection available +ERROR: dblink_record: connection not available -- create a persistent connection SELECT dblink_connect('dbname=regression'); dblink_connect @@ -172,10 +172,10 @@ SELECT dblink_close('rmt_foo_cursor'); OK (1 row) --- should generate "cursor rmt_foo_cursor does not exist" error +-- should generate "cursor not found: rmt_foo_cursor" error SELECT * FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]); -ERROR: dblink_fetch: cursor rmt_foo_cursor does not exist +ERROR: dblink_fetch: cursor not found: rmt_foo_cursor -- close the persistent connection SELECT dblink_disconnect(); dblink_disconnect @@ -183,11 +183,12 @@ SELECT dblink_disconnect(); OK (1 row) --- should generate "no connection available" error +-- should generate "no connection to the server" error SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]) WHERE t.a > 7; -ERROR: dblink: no connection available +ERROR: dblink: sql error: no connection to the server + -- put more data into our slave table, first using arbitrary connection syntax -- but truncate the actual return value so we can use diff to check for success SELECT substr(dblink_exec('dbname=regression','INSERT INTO foo VALUES(10,''k'',''{"a10","b10","c10"}'')'),1,6); @@ -268,3 +269,198 @@ SELECT dblink_disconnect(); OK (1 row) +-- +-- tests for the new named persistent connection syntax +-- +-- should generate "missing "=" after "myconn" in connection info string" error +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE t.a > 7; +ERROR: dblink: connection error: missing "=" after "myconn" in connection info string + +-- create a named persistent connection +SELECT dblink_connect('myconn','dbname=regression'); + dblink_connect +---------------- + OK +(1 row) + +-- use the named persistent connection +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE t.a > 7; + a | b | c +----+---+--------------- + 8 | i | {a8,b8,c8} + 9 | j | {a9,b9,c9} + 10 | k | {a10,b10,c10} +(3 rows) + +-- create a second named persistent connection +-- should error with "cannot save named connection" +SELECT dblink_connect('myconn','dbname=regression'); +NOTICE: cannot use a connection name more than once +ERROR: dblink_connect: cannot save named connection +-- create a second named persistent connection with a new name +SELECT dblink_connect('myconn2','dbname=regression'); + dblink_connect +---------------- + OK +(1 row) + +-- use the second named persistent connection +SELECT * +FROM dblink('myconn2','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE t.a > 7; + a | b | c +----+---+--------------- + 8 | i | {a8,b8,c8} + 9 | j | {a9,b9,c9} + 10 | k | {a10,b10,c10} +(3 rows) + +-- close the second named persistent connection +SELECT dblink_disconnect('myconn2'); + dblink_disconnect +------------------- + OK +(1 row) + +-- open a cursor +SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); + dblink_open +------------- + OK +(1 row) + +-- fetch some data +SELECT * +FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]); + a | b | c +---+---+------------ + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} + 3 | d | {a3,b3,c3} +(4 rows) + +SELECT * +FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]); + a | b | c +---+---+------------ + 4 | e | {a4,b4,c4} + 5 | f | {a5,b5,c5} + 6 | g | {a6,b6,c6} + 7 | h | {a7,b7,c7} +(4 rows) + +-- this one only finds three rows left +SELECT * +FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]); + a | b | c +----+---+--------------- + 8 | i | {a8,b8,c8} + 9 | j | {a9,b9,c9} + 10 | k | {a10,b10,c10} +(3 rows) + +-- close the cursor +SELECT dblink_close('myconn','rmt_foo_cursor'); + dblink_close +-------------- + OK +(1 row) + +-- should generate "cursor not found: rmt_foo_cursor" error +SELECT * +FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]); +ERROR: dblink_fetch: cursor not found: rmt_foo_cursor +-- close the named persistent connection +SELECT dblink_disconnect('myconn'); + dblink_disconnect +------------------- + OK +(1 row) + +-- should generate "missing "=" after "myconn" in connection info string" error +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE t.a > 7; +ERROR: dblink: connection error: missing "=" after "myconn" in connection info string + +-- create a named persistent connection +SELECT dblink_connect('myconn','dbname=regression'); + dblink_connect +---------------- + OK +(1 row) + +-- put more data into our slave table, using named persistent connection syntax +-- but truncate the actual return value so we can use diff to check for success +SELECT substr(dblink_exec('myconn','INSERT INTO foo VALUES(11,''l'',''{"a11","b11","c11"}'')'),1,6); + substr +-------- + INSERT +(1 row) + +-- let's see it +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]); + a | b | c +----+---+--------------- + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} + 3 | d | {a3,b3,c3} + 4 | e | {a4,b4,c4} + 5 | f | {a5,b5,c5} + 6 | g | {a6,b6,c6} + 7 | h | {a7,b7,c7} + 8 | i | {a8,b8,c8} + 9 | j | {a9,b9,c9} + 10 | k | {a10,b10,c10} + 11 | l | {a11,b11,c11} +(12 rows) + +-- change some data +SELECT dblink_exec('myconn','UPDATE foo SET f3[2] = ''b99'' WHERE f1 = 11'); + dblink_exec +------------- + UPDATE 1 +(1 row) + +-- let's see it +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE a = 11; + a | b | c +----+---+--------------- + 11 | l | {a11,b99,c11} +(1 row) + +-- delete some data +SELECT dblink_exec('myconn','DELETE FROM foo WHERE f1 = 11'); + dblink_exec +------------- + DELETE 1 +(1 row) + +-- let's see it +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE a = 11; + a | b | c +---+---+--- +(0 rows) + +-- close the named persistent connection +SELECT dblink_disconnect('myconn'); + dblink_disconnect +------------------- + OK +(1 row) + +-- close the named persistent connection again +-- should get "connection named "myconn" not found" error +SELECT dblink_disconnect('myconn'); +ERROR: dblink_disconnect: connection named "myconn" not found diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql index f041e0a770..6385a79e2b 100644 --- a/contrib/dblink/sql/dblink.sql +++ b/contrib/dblink/sql/dblink.sql @@ -68,7 +68,7 @@ SELECT * FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c text[]) WHERE t.a > 7; --- should generate "no connection available" error +-- should generate "connection not available" error SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]) WHERE t.a > 7; @@ -98,14 +98,14 @@ FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]); -- close the cursor SELECT dblink_close('rmt_foo_cursor'); --- should generate "cursor rmt_foo_cursor does not exist" error +-- should generate "cursor not found: rmt_foo_cursor" error SELECT * FROM dblink_fetch('rmt_foo_cursor',4) AS t(a int, b text, c text[]); -- close the persistent connection SELECT dblink_disconnect(); --- should generate "no connection available" error +-- should generate "no connection to the server" error SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]) WHERE t.a > 7; @@ -143,3 +143,98 @@ WHERE a = 11; -- close the persistent connection SELECT dblink_disconnect(); + +-- +-- tests for the new named persistent connection syntax +-- + +-- should generate "missing "=" after "myconn" in connection info string" error +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE t.a > 7; + +-- create a named persistent connection +SELECT dblink_connect('myconn','dbname=regression'); + +-- use the named persistent connection +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE t.a > 7; + +-- create a second named persistent connection +-- should error with "cannot save named connection" +SELECT dblink_connect('myconn','dbname=regression'); + +-- create a second named persistent connection with a new name +SELECT dblink_connect('myconn2','dbname=regression'); + +-- use the second named persistent connection +SELECT * +FROM dblink('myconn2','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE t.a > 7; + +-- close the second named persistent connection +SELECT dblink_disconnect('myconn2'); + +-- open a cursor +SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); + +-- fetch some data +SELECT * +FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]); + +SELECT * +FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]); + +-- this one only finds three rows left +SELECT * +FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]); + +-- close the cursor +SELECT dblink_close('myconn','rmt_foo_cursor'); + +-- should generate "cursor not found: rmt_foo_cursor" error +SELECT * +FROM dblink_fetch('myconn','rmt_foo_cursor',4) AS t(a int, b text, c text[]); + +-- close the named persistent connection +SELECT dblink_disconnect('myconn'); + +-- should generate "missing "=" after "myconn" in connection info string" error +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE t.a > 7; + +-- create a named persistent connection +SELECT dblink_connect('myconn','dbname=regression'); + +-- put more data into our slave table, using named persistent connection syntax +-- but truncate the actual return value so we can use diff to check for success +SELECT substr(dblink_exec('myconn','INSERT INTO foo VALUES(11,''l'',''{"a11","b11","c11"}'')'),1,6); + +-- let's see it +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]); + +-- change some data +SELECT dblink_exec('myconn','UPDATE foo SET f3[2] = ''b99'' WHERE f1 = 11'); + +-- let's see it +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE a = 11; + +-- delete some data +SELECT dblink_exec('myconn','DELETE FROM foo WHERE f1 = 11'); + +-- let's see it +SELECT * +FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]) +WHERE a = 11; + +-- close the named persistent connection +SELECT dblink_disconnect('myconn'); + +-- close the named persistent connection again +-- should get "connection named "myconn" not found" error +SELECT dblink_disconnect('myconn'); -- 2.40.0