From 52a3ed9fac32e16f6061cbc49046c0bd97f8f77a Mon Sep 17 00:00:00 2001 From: Joe Conway Date: Sat, 2 Sep 2006 21:11:15 +0000 Subject: [PATCH] Added async query capability. Original patch by Kai Londenberg, modified by Joe Conway --- contrib/dblink/README.dblink | 74 +----- contrib/dblink/dblink.c | 404 ++++++++++++++++++++++------- contrib/dblink/dblink.h | 8 +- contrib/dblink/dblink.sql.in | 35 +++ contrib/dblink/doc/misc | 93 ++++++- contrib/dblink/doc/query | 122 +++++++++ contrib/dblink/expected/dblink.out | 87 +++++++ contrib/dblink/sql/dblink.sql | 29 +++ 8 files changed, 686 insertions(+), 166 deletions(-) diff --git a/contrib/dblink/README.dblink b/contrib/dblink/README.dblink index 9e1bdba6cb..c765cf5b97 100644 --- a/contrib/dblink/README.dblink +++ b/contrib/dblink/README.dblink @@ -7,6 +7,7 @@ * And contributors: * Darko Prenosil * Shridhar Daithankar + * Kai Londenberg (K.Londenberg@librics.de) * * Copyright (c) 2001-2006, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; @@ -31,6 +32,9 @@ */ Release Notes: + 27 August 2006 + - Added async query capability. Original patch by + Kai Londenberg (K.Londenberg@librics.de), modified by Joe Conway Version 0.7 (as of 25 Feb, 2004) - Added new version of dblink, dblink_exec, dblink_open, dblink_close, and, dblink_fetch -- allows ERROR on remote side of connection to @@ -85,75 +89,7 @@ Installation: psql template1 < dblink.sql - installs following functions into database template1: - - connection - ------------ - dblink_connect(text) RETURNS text - - opens an unnamed connection that will persist for duration of - current backend or until it is disconnected - dblink_connect(text,text) RETURNS text - - opens a named connection that will persist for duration of current - backend or until it is disconnected - dblink_disconnect() RETURNS text - - disconnects the unnamed persistent connection - dblink_disconnect(text) RETURNS text - - disconnects a named persistent connection - - cursor - ------------ - dblink_open(text,text [, bool fail_on_error]) RETURNS text - - opens a cursor using unnamed connection already opened with - dblink_connect() that will persist for duration of current backend - or until it is closed - dblink_open(text,text,text [, bool fail_on_error]) RETURNS text - - opens a cursor using a named connection already opened with - dblink_connect() that will persist for duration of current backend - or until it is closed - dblink_fetch(text, int [, bool fail_on_error]) RETURNS setof record - - fetches data from an already opened cursor on the unnamed connection - dblink_fetch(text, text, int [, bool fail_on_error]) RETURNS setof record - - fetches data from an already opened cursor on a named connection - dblink_close(text [, bool fail_on_error]) RETURNS text - - closes a cursor on the unnamed connection - dblink_close(text,text [, bool fail_on_error]) RETURNS text - - closes a cursor on a named connection - - query - ------------ - dblink(text,text [, bool fail_on_error]) RETURNS setof record - - returns a set of results from remote SELECT query; the first argument - is either a connection string, or the name of an already opened - persistant connection - dblink(text [, bool fail_on_error]) RETURNS setof record - - returns a set of results from remote SELECT query, using the unnamed - connection already opened with dblink_connect() - - execute - ------------ - dblink_exec(text, text [, bool fail_on_error]) RETURNS text - - executes an INSERT/UPDATE/DELETE query remotely; the first argument - is either a connection string, or the name of an already opened - persistant connection - dblink_exec(text [, bool fail_on_error]) RETURNS text - - executes an INSERT/UPDATE/DELETE query remotely, using connection - already opened with dblink_connect() - - misc - ------------ - dblink_current_query() RETURNS text - - returns the current query string - dblink_get_pkey(text) RETURNS setof text - - returns the field names of a relation's primary key fields - dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text - - builds an insert statement using a local tuple, replacing the - selection key field values with alternate supplied values - dblink_build_sql_delete(text,int2vector,int2,_text) RETURNS text - - builds a delete statement using supplied values for selection - key field values - dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text - - builds an update statement using a local tuple, replacing the - selection key field values with alternate supplied values + installs dblink functions into database template1 Documentation: diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 3405ddeaa1..7a46673b6b 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -8,7 +8,7 @@ * Darko Prenosil * Shridhar Daithankar * - * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.57 2006/07/11 16:35:31 momjian Exp $ + * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.58 2006/09/02 21:11:15 joe Exp $ * Copyright (c) 2001-2006, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -73,6 +73,7 @@ typedef struct remoteConn /* * Internal declarations */ +static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn * rconn); @@ -690,6 +691,26 @@ dblink_fetch(PG_FUNCTION_ARGS) PG_FUNCTION_INFO_V1(dblink_record); Datum dblink_record(PG_FUNCTION_ARGS) +{ + return dblink_record_internal(fcinfo, false, false); +} + +PG_FUNCTION_INFO_V1(dblink_send_query); +Datum +dblink_send_query(PG_FUNCTION_ARGS) +{ + return dblink_record_internal(fcinfo, true, false); +} + +PG_FUNCTION_INFO_V1(dblink_get_result); +Datum +dblink_get_result(PG_FUNCTION_ARGS) +{ + return dblink_record_internal(fcinfo, true, true); +} + +static Datum +dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get) { FuncCallContext *funcctx; TupleDesc tupdesc = NULL; @@ -723,128 +744,187 @@ dblink_record(PG_FUNCTION_ARGS) */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - if (PG_NARGS() == 3) + if (!is_async) { - /* text,text,bool */ - DBLINK_GET_CONN; - sql = GET_STR(PG_GETARG_TEXT_P(1)); - fail = PG_GETARG_BOOL(2); - } - else if (PG_NARGS() == 2) - { - /* text,text or text,bool */ - if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + if (PG_NARGS() == 3) { + /* text,text,bool */ + DBLINK_GET_CONN; + sql = GET_STR(PG_GETARG_TEXT_P(1)); + fail = PG_GETARG_BOOL(2); + } + else if (PG_NARGS() == 2) + { + /* text,text or text,bool */ + if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + { + conn = pconn->conn; + sql = GET_STR(PG_GETARG_TEXT_P(0)); + fail = PG_GETARG_BOOL(1); + } + else + { + DBLINK_GET_CONN; + sql = GET_STR(PG_GETARG_TEXT_P(1)); + } + } + else if (PG_NARGS() == 1) + { + /* text */ conn = pconn->conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); - fail = PG_GETARG_BOOL(1); } else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); + } + else if (is_async && do_get) + { + /* get async result */ + if (PG_NARGS() == 2) { + /* text,bool */ DBLINK_GET_CONN; - sql = GET_STR(PG_GETARG_TEXT_P(1)); + fail = PG_GETARG_BOOL(2); } + else if (PG_NARGS() == 1) + { + /* text */ + DBLINK_GET_CONN; + } + else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); } - else if (PG_NARGS() == 1) + else { - /* text */ - conn = pconn->conn; - sql = GET_STR(PG_GETARG_TEXT_P(0)); + /* send async query */ + if (PG_NARGS() == 2) + { + DBLINK_GET_CONN; + sql = GET_STR(PG_GETARG_TEXT_P(1)); + } + else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); if (!conn) DBLINK_CONN_NOT_AVAIL; - res = PQexec(conn, sql); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + if (!is_async || (is_async && do_get)) { - if (fail) - DBLINK_RES_ERROR("sql error"); + /* synchronous query, or async result retrieval */ + if (!is_async) + res = PQexec(conn, sql); else { - DBLINK_RES_ERROR_AS_NOTICE("sql error"); - if (freeconn) - PQfinish(conn); - SRF_RETURN_DONE(funcctx); + res = PQgetResult(conn); + /* NULL means we're all done with the async results */ + if (!res) + SRF_RETURN_DONE(funcctx); } - } - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* need a tuple descriptor representing one TEXT column */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = PQcmdStatus(res); - funcctx->max_calls = 1; - } - else - funcctx->max_calls = PQntuples(res); - - /* got results, keep track of them */ - funcctx->user_fctx = res; - - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); - - if (!is_sql_cmd) - { - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; + if (fail) + DBLINK_RES_ERROR("sql error"); + else + { + DBLINK_RES_ERROR_AS_NOTICE("sql error"); + if (freeconn) + PQfinish(conn); + SRF_RETURN_DONE(funcctx); + } } - - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); + + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + is_sql_cmd = true; + + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = PQcmdStatus(res); + funcctx->max_calls = 1; + } + else + funcctx->max_calls = PQntuples(res); + + /* got results, keep track of them */ + funcctx->user_fctx = res; + + /* if needed, close the connection to the database and cleanup */ + if (freeconn) + PQfinish(conn); + + if (!is_sql_cmd) + { + /* get a tuple descriptor for our result type */ + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + } + + /* check result and tuple descriptor have the same number of columns */ + if (PQnfields(res) != tupdesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + + /* fast track when no results */ + if (funcctx->max_calls < 1) + { + if (res) + PQclear(res); + SRF_RETURN_DONE(funcctx); + } + + /* store needed metadata for subsequent calls */ + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; + + MemoryContextSwitchTo(oldcontext); } - - /* check result and tuple descriptor have the same number of columns */ - if (PQnfields(res) != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); - - /* fast track when no results */ - if (funcctx->max_calls < 1) + else { - if (res) - PQclear(res); - SRF_RETURN_DONE(funcctx); + /* async query send */ + MemoryContextSwitchTo(oldcontext); + PG_RETURN_INT32(PQsendQuery(conn, sql)); } + } - /* store needed metadata for subsequent calls */ - attinmeta = TupleDescGetAttInMetadata(tupdesc); - funcctx->attinmeta = attinmeta; + if (is_async && !do_get) + { + /* async query send -- should not happen */ + elog(ERROR, "async query send called more than once"); - MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ @@ -902,6 +982,140 @@ dblink_record(PG_FUNCTION_ARGS) } } +/* + * List all open dblink connections by name. + * Returns an array of all connection names. + * Takes no params + */ +PG_FUNCTION_INFO_V1(dblink_get_connections); +Datum +dblink_get_connections(PG_FUNCTION_ARGS) +{ + HASH_SEQ_STATUS status; + remoteConnHashEnt *hentry; + ArrayBuildState *astate = NULL; + + if (remoteConnHash) + { + hash_seq_init(&status, remoteConnHash); + while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL) + { + /* stash away current value */ + astate = accumArrayResult(astate, + PointerGetDatum(GET_TEXT(hentry->name)), + false, TEXTOID, CurrentMemoryContext); + } + } + + if (astate) + PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate, + CurrentMemoryContext)); + else + PG_RETURN_NULL(); +} + +/* + * Checks if a given remote connection is busy + * + * Returns 1 if the connection is busy, 0 otherwise + * Params: + * text connection_name - name of the connection to check + * + */ +PG_FUNCTION_INFO_V1(dblink_is_busy); +Datum +dblink_is_busy(PG_FUNCTION_ARGS) +{ + char *msg; + PGconn *conn = NULL; + char *conname = NULL; + char *connstr = NULL; + remoteConn *rconn = NULL; + bool freeconn = false; + + DBLINK_INIT; + DBLINK_GET_CONN; + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + PQconsumeInput(conn); + PG_RETURN_INT32(PQisBusy(conn)); +} + +/* + * Cancels a running request on a connection + * + * Returns text: + * "OK" if the cancel request has been sent correctly, + * an error message otherwise + * + * Params: + * text connection_name - name of the connection to check + * + */ +PG_FUNCTION_INFO_V1(dblink_cancel_query); +Datum +dblink_cancel_query(PG_FUNCTION_ARGS) +{ + char *msg; + int res = 0; + PGconn *conn = NULL; + char *conname = NULL; + char *connstr = NULL; + remoteConn *rconn = NULL; + bool freeconn = false; + PGcancel *cancel; + char errbuf[256]; + + DBLINK_INIT; + DBLINK_GET_CONN; + if (!conn) + DBLINK_CONN_NOT_AVAIL; + cancel = PQgetCancel(conn); + + res = PQcancel(cancel, errbuf, 256); + PQfreeCancel(cancel); + + if (res == 0) + PG_RETURN_TEXT_P(GET_TEXT("OK")); + else + PG_RETURN_TEXT_P(GET_TEXT(errbuf)); +} + + +/* + * Get error message from a connection + * + * Returns text: + * "OK" if no error, an error message otherwise + * + * Params: + * text connection_name - name of the connection to check + * + */ +PG_FUNCTION_INFO_V1(dblink_error_message); +Datum +dblink_error_message(PG_FUNCTION_ARGS) +{ + char *msg; + PGconn *conn = NULL; + char *conname = NULL; + char *connstr = NULL; + remoteConn *rconn = NULL; + bool freeconn = false; + + DBLINK_INIT; + DBLINK_GET_CONN; + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + msg = PQerrorMessage(conn); + if (!msg) + PG_RETURN_TEXT_P(GET_TEXT("OK")); + else + PG_RETURN_TEXT_P(GET_TEXT(msg)); +} + /* * Execute an SQL non-SELECT command */ diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h index 969843765f..a479744621 100644 --- a/contrib/dblink/dblink.h +++ b/contrib/dblink/dblink.h @@ -8,7 +8,7 @@ * Darko Prenosil * Shridhar Daithankar * - * $PostgreSQL: pgsql/contrib/dblink/dblink.h,v 1.16 2006/07/10 18:40:16 momjian Exp $ + * $PostgreSQL: pgsql/contrib/dblink/dblink.h,v 1.17 2006/09/02 21:11:15 joe Exp $ * Copyright (c) 2001-2006, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -45,6 +45,12 @@ extern Datum dblink_open(PG_FUNCTION_ARGS); extern Datum dblink_close(PG_FUNCTION_ARGS); extern Datum dblink_fetch(PG_FUNCTION_ARGS); extern Datum dblink_record(PG_FUNCTION_ARGS); +extern Datum dblink_send_query(PG_FUNCTION_ARGS); +extern Datum dblink_get_result(PG_FUNCTION_ARGS); +extern Datum dblink_get_connections(PG_FUNCTION_ARGS); +extern Datum dblink_is_busy(PG_FUNCTION_ARGS); +extern Datum dblink_cancel_query(PG_FUNCTION_ARGS); +extern Datum dblink_error_message(PG_FUNCTION_ARGS); extern Datum dblink_exec(PG_FUNCTION_ARGS); extern Datum dblink_get_pkey(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS); diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in index 7cd705ba54..e99ea05ec7 100644 --- a/contrib/dblink/dblink.sql.in +++ b/contrib/dblink/dblink.sql.in @@ -144,3 +144,38 @@ CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE C; + +CREATE OR REPLACE FUNCTION dblink_send_query(text, text) +RETURNS int4 +AS 'MODULE_PATHNAME', 'dblink_send_query' +LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION dblink_is_busy(text) +RETURNS int4 +AS 'MODULE_PATHNAME', 'dblink_is_busy' +LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION dblink_get_result(text) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'dblink_get_result' +LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION dblink_get_result(text, bool) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'dblink_get_result' +LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION dblink_get_connections() +RETURNS text[] +AS 'MODULE_PATHNAME', 'dblink_get_connections' +LANGUAGE C; + +CREATE OR REPLACE FUNCTION dblink_cancel_query(text) +RETURNS text +AS 'MODULE_PATHNAME', 'dblink_cancel_query' +LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION dblink_error_message(text) +RETURNS text +AS 'MODULE_PATHNAME', 'dblink_error_message' +LANGUAGE C STRICT; diff --git a/contrib/dblink/doc/misc b/contrib/dblink/doc/misc index ae79cf88ba..3834afd872 100644 --- a/contrib/dblink/doc/misc +++ b/contrib/dblink/doc/misc @@ -1,4 +1,4 @@ -$PostgreSQL: pgsql/contrib/dblink/doc/misc,v 1.3 2006/03/11 04:38:29 momjian Exp $ +$PostgreSQL: pgsql/contrib/dblink/doc/misc,v 1.4 2006/09/02 21:11:15 joe Exp $ ================================================================== Name @@ -139,3 +139,94 @@ test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}'); UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b' (1 row) + +================================================================== +Name + +dblink_get_connections -- returns a text array of all active named + dblink connections + +Synopsis + +dblink_get_connections() RETURNS text[] + +Inputs + + none + +Outputs + + Returns text array of all active named dblink connections + +Example usage + + SELECT dblink_get_connections(); + +================================================================== +Name + +dblink_is_busy -- checks to see if named connection is busy + with an async query + +Synopsis + +dblink_is_busy(text connname) RETURNS int + +Inputs + + connname + The specific connection name to use. + +Outputs + + Returns 1 if connection is busy, 0 if it is not busy. + If this function returns 0, it is guaranteed that dblink_get_result + will not block. + +Example usage + + SELECT dblink_is_busy('dtest1'); + +================================================================== +Name + +dblink_cancel_query -- cancels any active query on the named connection + +Synopsis + +dblink_cancel_query(text connname) RETURNS text + +Inputs + + connname + The specific connection name to use. + +Outputs + + Returns "OK" on success, or an error message on failure. + +Example usage + + SELECT dblink_cancel_query('dtest1'); + +================================================================== +Name + +dblink_error_message -- gets last error message on the named connection + +Synopsis + +dblink_error_message(text connname) RETURNS text + +Inputs + + connname + The specific connection name to use. + +Outputs + + Returns last error message. + +Example usage + + SELECT dblink_error_message('dtest1'); diff --git a/contrib/dblink/doc/query b/contrib/dblink/doc/query index cd58a36142..42427b5d5c 100644 --- a/contrib/dblink/doc/query +++ b/contrib/dblink/doc/query @@ -118,3 +118,125 @@ Then you can simply write: select * from myremote_pg_proc where proname like 'bytea%'; + +================================================================== +Name + +dblink_send_query -- Sends an async query to a remote database + +Synopsis + +dblink_send_query(text connname, text sql) + +Inputs + + connname + The specific connection name to use. + + sql + + sql statement that you wish to execute on the remote host + e.g. "select * from pg_class" + +Outputs + + Returns int. A return value of 1 if the query was successfully dispatched, + 0 otherwise. If 1, results must be fetched by dblink_get_result(connname). + A running query may be cancelled by dblink_cancel_query(connname). + +Example usage + + SELECT dblink_connect('dtest1', 'dbname=contrib_regression'); + SELECT * from + dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1; + +================================================================== +Name + +dblink_get_result -- Gets an async query result + +Synopsis + +dblink_get_result(text connname [, bool fail_on_error]) + +Inputs + + connname + The specific connection name to use. An asynchronous query must + have already been sent using dblink_send_query() + + fail_on_error + + If true (default when not present) then an ERROR thrown on the remote side + of the connection causes an ERROR to also be thrown locally. If false, the + remote ERROR is locally treated as a NOTICE, and no rows are returned. + +Outputs + + Returns setof record + +Notes + Blocks until a result gets available. + + This function *must* be called if dblink_send_query returned + a 1, even on cancelled queries - otherwise the connection + can't be used anymore. It must be called once for each query + sent, and one additional time to obtain an empty set result, + prior to using the connection again. + +Example usage + +contrib_regression=# SELECT dblink_connect('dtest1', 'dbname=contrib_regression'); + dblink_connect +---------------- + OK +(1 row) + +contrib_regression=# SELECT * from +contrib_regression-# dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1; + t1 +---- + 1 +(1 row) + +contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]); + f1 | f2 | f3 +----+----+------------ + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} +(3 rows) + +contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]); + f1 | f2 | f3 +----+----+---- +(0 rows) + +contrib_regression=# SELECT * from + dblink_send_query('dtest1', 'select * from foo where f1 < 3; select * from foo where f1 > 6') as t1; + t1 +---- + 1 +(1 row) + +contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]); + f1 | f2 | f3 +----+----+------------ + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} +(3 rows) + +contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]); + f1 | f2 | f3 +----+----+--------------- + 7 | h | {a7,b7,c7} + 8 | i | {a8,b8,c8} + 9 | j | {a9,b9,c9} + 10 | k | {a10,b10,c10} +(4 rows) + +contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]); + f1 | f2 | f3 +----+----+---- +(0 rows) diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out index f2e364a942..c98ae5cf6b 100644 --- a/contrib/dblink/expected/dblink.out +++ b/contrib/dblink/expected/dblink.out @@ -669,3 +669,90 @@ SELECT dblink_disconnect('myconn'); -- should get 'connection "myconn" not available' error SELECT dblink_disconnect('myconn'); ERROR: connection "myconn" not available +-- test asynchronous queries +SELECT dblink_connect('dtest1', 'dbname=contrib_regression'); + dblink_connect +---------------- + OK +(1 row) + +SELECT * from + dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1; + t1 +---- + 1 +(1 row) + +SELECT dblink_connect('dtest2', 'dbname=contrib_regression'); + dblink_connect +---------------- + OK +(1 row) + +SELECT * from + dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1; + t1 +---- + 1 +(1 row) + +SELECT dblink_connect('dtest3', 'dbname=contrib_regression'); + dblink_connect +---------------- + OK +(1 row) + +SELECT * from + dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1; + t1 +---- + 1 +(1 row) + +CREATE TEMPORARY TABLE result AS +(SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[])) +UNION +(SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[])) +UNION +(SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[])) +ORDER by f1; +SELECT dblink_get_connections(); + dblink_get_connections +------------------------ + {dtest1,dtest2,dtest3} +(1 row) + +SELECT dblink_disconnect('dtest1'); + dblink_disconnect +------------------- + OK +(1 row) + +SELECT dblink_disconnect('dtest2'); + dblink_disconnect +------------------- + OK +(1 row) + +SELECT dblink_disconnect('dtest3'); + dblink_disconnect +------------------- + OK +(1 row) + +SELECT * from result; + f1 | f2 | f3 +----+----+--------------- + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} + 3 | d | {a3,b3,c3} + 4 | e | {a4,b4,c4} + 5 | f | {a5,b5,c5} + 6 | g | {a6,b6,c6} + 7 | h | {a7,b7,c7} + 8 | i | {a8,b8,c8} + 9 | j | {a9,b9,c9} + 10 | k | {a10,b10,c10} +(11 rows) + diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql index 66e2607cfe..52a3d049b9 100644 --- a/contrib/dblink/sql/dblink.sql +++ b/contrib/dblink/sql/dblink.sql @@ -319,3 +319,32 @@ SELECT dblink_disconnect('myconn'); -- close the named persistent connection again -- should get 'connection "myconn" not available' error SELECT dblink_disconnect('myconn'); + +-- test asynchronous queries +SELECT dblink_connect('dtest1', 'dbname=contrib_regression'); +SELECT * from + dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1; + +SELECT dblink_connect('dtest2', 'dbname=contrib_regression'); +SELECT * from + dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1; + +SELECT dblink_connect('dtest3', 'dbname=contrib_regression'); +SELECT * from + dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1; + +CREATE TEMPORARY TABLE result AS +(SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[])) +UNION +(SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[])) +UNION +(SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[])) +ORDER by f1; + +SELECT dblink_get_connections(); + +SELECT dblink_disconnect('dtest1'); +SELECT dblink_disconnect('dtest2'); +SELECT dblink_disconnect('dtest3'); +SELECT * from result; + -- 2.40.0