]> granicus.if.org Git - postgresql/commitdiff
Added async query capability. Original patch by
authorJoe Conway <mail@joeconway.com>
Sat, 2 Sep 2006 21:11:15 +0000 (21:11 +0000)
committerJoe Conway <mail@joeconway.com>
Sat, 2 Sep 2006 21:11:15 +0000 (21:11 +0000)
Kai Londenberg, modified by Joe Conway

contrib/dblink/README.dblink
contrib/dblink/dblink.c
contrib/dblink/dblink.h
contrib/dblink/dblink.sql.in
contrib/dblink/doc/misc
contrib/dblink/doc/query
contrib/dblink/expected/dblink.out
contrib/dblink/sql/dblink.sql

index 9e1bdba6cba6d974223618a8366057e60d53a384..c765cf5b972239669c47ae5a03932766a807012d 100644 (file)
@@ -7,6 +7,7 @@
  * And contributors:
  * Darko Prenosil <Darko.Prenosil@finteh.hr>
  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
+ * Kai Londenberg (K.Londenberg@librics.de)
  *
  * Copyright (c) 2001-2006, PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
@@ -31,6 +32,9 @@
  */
 
 Release Notes:
+  27 August 2006
+    - Added async query capability. Original patch by
+      Kai Londenberg (K.Londenberg@librics.de), modified by Joe Conway
   Version 0.7 (as of 25 Feb, 2004)
     - Added new version of dblink, dblink_exec, dblink_open, dblink_close,
       and, dblink_fetch -- allows ERROR on remote side of connection to
@@ -85,75 +89,7 @@ Installation:
 
     psql template1 < dblink.sql
 
-  installs following functions into database template1:
-
-     connection
-     ------------
-     dblink_connect(text) RETURNS text
-       - opens an unnamed connection that will persist for duration of
-         current backend or until it is disconnected
-     dblink_connect(text,text) RETURNS text
-       - opens a named connection that will persist for duration of current
-         backend or until it is disconnected
-     dblink_disconnect() RETURNS text
-       - disconnects the unnamed persistent connection
-     dblink_disconnect(text) RETURNS text
-       - disconnects a named persistent connection
-
-     cursor
-     ------------
-     dblink_open(text,text [, bool fail_on_error]) RETURNS text
-       - opens a cursor using unnamed connection already opened with
-         dblink_connect() that will persist for duration of current backend
-         or until it is closed
-     dblink_open(text,text,text [, bool fail_on_error]) RETURNS text
-       - opens a cursor using a named connection already opened with
-         dblink_connect() that will persist for duration of current backend
-         or until it is closed
-     dblink_fetch(text, int [, bool fail_on_error]) RETURNS setof record
-       - fetches data from an already opened cursor on the unnamed connection
-     dblink_fetch(text, text, int [, bool fail_on_error]) RETURNS setof record
-       - fetches data from an already opened cursor on a named connection
-     dblink_close(text [, bool fail_on_error]) RETURNS text
-       - closes a cursor on the unnamed connection
-     dblink_close(text,text [, bool fail_on_error]) RETURNS text
-       - closes a cursor on a named connection
-
-     query
-     ------------
-     dblink(text,text [, bool fail_on_error]) RETURNS setof record
-       - returns a set of results from remote SELECT query; the first argument
-         is either a connection string, or the name of an already opened
-         persistant connection
-     dblink(text [, bool fail_on_error]) RETURNS setof record
-       - returns a set of results from remote SELECT query, using the unnamed
-         connection already opened with dblink_connect()
-
-     execute
-     ------------
-     dblink_exec(text, text [, bool fail_on_error]) RETURNS text
-       - executes an INSERT/UPDATE/DELETE query remotely; the first argument
-         is either a connection string, or the name of an already opened
-         persistant connection
-     dblink_exec(text [, bool fail_on_error]) RETURNS text
-       - executes an INSERT/UPDATE/DELETE query remotely, using connection
-         already opened with dblink_connect()
-
-     misc
-     ------------
-     dblink_current_query() RETURNS text
-       - returns the current query string
-     dblink_get_pkey(text) RETURNS setof text
-       - returns the field names of a relation's primary key fields
-     dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
-       - builds an insert statement using a local tuple, replacing the
-         selection key field values with alternate supplied values
-     dblink_build_sql_delete(text,int2vector,int2,_text) RETURNS text
-       - builds a delete statement using supplied values for selection
-         key field values
-     dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
-       - builds an update statement using a local tuple, replacing the
-         selection key field values with alternate supplied values
+  installs dblink functions into database template1
 
 Documentation:
 
index 3405ddeaa1b6da013e9aac5c6663c29d9d23c2ad..7a46673b6b170858c0781ef3b6497ea4b84e3211 100644 (file)
@@ -8,7 +8,7 @@
  * Darko Prenosil <Darko.Prenosil@finteh.hr>
  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
  *
- * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.57 2006/07/11 16:35:31 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.58 2006/09/02 21:11:15 joe Exp $
  * Copyright (c) 2001-2006, PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
@@ -73,6 +73,7 @@ typedef struct remoteConn
 /*
  * Internal declarations
  */
+static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn * rconn);
@@ -690,6 +691,26 @@ dblink_fetch(PG_FUNCTION_ARGS)
 PG_FUNCTION_INFO_V1(dblink_record);
 Datum
 dblink_record(PG_FUNCTION_ARGS)
+{
+       return dblink_record_internal(fcinfo, false, false);
+}
+
+PG_FUNCTION_INFO_V1(dblink_send_query);
+Datum
+dblink_send_query(PG_FUNCTION_ARGS)
+{
+       return dblink_record_internal(fcinfo, true, false);
+}
+
+PG_FUNCTION_INFO_V1(dblink_get_result);
+Datum
+dblink_get_result(PG_FUNCTION_ARGS)
+{
+       return dblink_record_internal(fcinfo, true, true);
+}
+
+static Datum
+dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
 {
        FuncCallContext *funcctx;
        TupleDesc       tupdesc = NULL;
@@ -723,128 +744,187 @@ dblink_record(PG_FUNCTION_ARGS)
                 */
                oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-               if (PG_NARGS() == 3)
+               if (!is_async)
                {
-                       /* text,text,bool */
-                       DBLINK_GET_CONN;
-                       sql = GET_STR(PG_GETARG_TEXT_P(1));
-                       fail = PG_GETARG_BOOL(2);
-               }
-               else if (PG_NARGS() == 2)
-               {
-                       /* text,text or text,bool */
-                       if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+                       if (PG_NARGS() == 3)
                        {
+                               /* text,text,bool */
+                               DBLINK_GET_CONN;
+                               sql = GET_STR(PG_GETARG_TEXT_P(1));
+                               fail = PG_GETARG_BOOL(2);
+                       }
+                       else if (PG_NARGS() == 2)
+                       {
+                               /* text,text or text,bool */
+                               if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+                               {
+                                       conn = pconn->conn;
+                                       sql = GET_STR(PG_GETARG_TEXT_P(0));
+                                       fail = PG_GETARG_BOOL(1);
+                               }
+                               else
+                               {
+                                       DBLINK_GET_CONN;
+                                       sql = GET_STR(PG_GETARG_TEXT_P(1));
+                               }
+                       }
+                       else if (PG_NARGS() == 1)
+                       {
+                               /* text */
                                conn = pconn->conn;
                                sql = GET_STR(PG_GETARG_TEXT_P(0));
-                               fail = PG_GETARG_BOOL(1);
                        }
                        else
+                               /* shouldn't happen */
+                               elog(ERROR, "wrong number of arguments");
+               }
+               else if (is_async && do_get)
+               {
+                       /* get async result */
+                       if (PG_NARGS() == 2)
                        {
+                               /* text,bool */
                                DBLINK_GET_CONN;
-                               sql = GET_STR(PG_GETARG_TEXT_P(1));
+                               fail = PG_GETARG_BOOL(2);
                        }
+                       else if (PG_NARGS() == 1)
+                       {
+                               /* text */
+                               DBLINK_GET_CONN;
+                       }
+                       else
+                               /* shouldn't happen */
+                               elog(ERROR, "wrong number of arguments");
                }
-               else if (PG_NARGS() == 1)
+               else
                {
-                       /* text */
-                       conn = pconn->conn;
-                       sql = GET_STR(PG_GETARG_TEXT_P(0));
+                       /* send async query */
+                       if (PG_NARGS() == 2)
+                       {
+                               DBLINK_GET_CONN;
+                               sql = GET_STR(PG_GETARG_TEXT_P(1));
+                       }
+                       else
+                               /* shouldn't happen */
+                               elog(ERROR, "wrong number of arguments");
                }
-               else
-                       /* shouldn't happen */
-                       elog(ERROR, "wrong number of arguments");
 
                if (!conn)
                        DBLINK_CONN_NOT_AVAIL;
 
-               res = PQexec(conn, sql);
-               if (!res ||
-                       (PQresultStatus(res) != PGRES_COMMAND_OK &&
-                        PQresultStatus(res) != PGRES_TUPLES_OK))
+               if (!is_async || (is_async && do_get))
                {
-                       if (fail)
-                               DBLINK_RES_ERROR("sql error");
+                       /* synchronous query, or async result retrieval */
+                       if (!is_async)
+                               res = PQexec(conn, sql);
                        else
                        {
-                               DBLINK_RES_ERROR_AS_NOTICE("sql error");
-                               if (freeconn)
-                                       PQfinish(conn);
-                               SRF_RETURN_DONE(funcctx);
+                               res = PQgetResult(conn);
+                               /* NULL means we're all done with the async results */
+                               if (!res)
+                                       SRF_RETURN_DONE(funcctx);
                        }
-               }
 
-               if (PQresultStatus(res) == PGRES_COMMAND_OK)
-               {
-                       is_sql_cmd = true;
-
-                       /* need a tuple descriptor representing one TEXT column */
-                       tupdesc = CreateTemplateTupleDesc(1, false);
-                       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                                                          TEXTOID, -1, 0);
-
-                       /*
-                        * and save a copy of the command status string to return as our
-                        * result tuple
-                        */
-                       sql_cmd_status = PQcmdStatus(res);
-                       funcctx->max_calls = 1;
-               }
-               else
-                       funcctx->max_calls = PQntuples(res);
-
-               /* got results, keep track of them */
-               funcctx->user_fctx = res;
-
-               /* if needed, close the connection to the database and cleanup */
-               if (freeconn)
-                       PQfinish(conn);
-
-               if (!is_sql_cmd)
-               {
-                       /* get a tuple descriptor for our result type */
-                       switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+                       if (!res ||
+                               (PQresultStatus(res) != PGRES_COMMAND_OK &&
+                               PQresultStatus(res) != PGRES_TUPLES_OK))
                        {
-                               case TYPEFUNC_COMPOSITE:
-                                       /* success */
-                                       break;
-                               case TYPEFUNC_RECORD:
-                                       /* failed to determine actual type of RECORD */
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                               errmsg("function returning record called in context "
-                                                          "that cannot accept type record")));
-                                       break;
-                               default:
-                                       /* result type isn't composite */
-                                       elog(ERROR, "return type must be a row type");
-                                       break;
+                               if (fail)
+                                       DBLINK_RES_ERROR("sql error");
+                               else
+                               {
+                                       DBLINK_RES_ERROR_AS_NOTICE("sql error");
+                                       if (freeconn)
+                                               PQfinish(conn);
+                                       SRF_RETURN_DONE(funcctx);
+                               }
                        }
-
-                       /* make sure we have a persistent copy of the tupdesc */
-                       tupdesc = CreateTupleDescCopy(tupdesc);
+       
+                       if (PQresultStatus(res) == PGRES_COMMAND_OK)
+                       {
+                               is_sql_cmd = true;
+       
+                               /* need a tuple descriptor representing one TEXT column */
+                               tupdesc = CreateTemplateTupleDesc(1, false);
+                               TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+                                                               TEXTOID, -1, 0);
+       
+                               /*
+                               * and save a copy of the command status string to return as our
+                               * result tuple
+                               */
+                               sql_cmd_status = PQcmdStatus(res);
+                               funcctx->max_calls = 1;
+                       }
+                       else
+                               funcctx->max_calls = PQntuples(res);
+       
+                       /* got results, keep track of them */
+                       funcctx->user_fctx = res;
+       
+                       /* if needed, close the connection to the database and cleanup */
+                       if (freeconn)
+                               PQfinish(conn);
+       
+                       if (!is_sql_cmd)
+                       {
+                               /* get a tuple descriptor for our result type */
+                               switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+                               {
+                                       case TYPEFUNC_COMPOSITE:
+                                               /* success */
+                                               break;
+                                       case TYPEFUNC_RECORD:
+                                               /* failed to determine actual type of RECORD */
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                       errmsg("function returning record called in context "
+                                                               "that cannot accept type record")));
+                                               break;
+                                       default:
+                                               /* result type isn't composite */
+                                               elog(ERROR, "return type must be a row type");
+                                               break;
+                               }
+       
+                               /* make sure we have a persistent copy of the tupdesc */
+                               tupdesc = CreateTupleDescCopy(tupdesc);
+                       }
+       
+                       /* check result and tuple descriptor have the same number of columns */
+                       if (PQnfields(res) != tupdesc->natts)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                       errmsg("remote query result rowtype does not match "
+                                                       "the specified FROM clause rowtype")));
+       
+                       /* fast track when no results */
+                       if (funcctx->max_calls < 1)
+                       {
+                               if (res)
+                                       PQclear(res);
+                               SRF_RETURN_DONE(funcctx);
+                       }
+       
+                       /* store needed metadata for subsequent calls */
+                       attinmeta = TupleDescGetAttInMetadata(tupdesc);
+                       funcctx->attinmeta = attinmeta;
+       
+                       MemoryContextSwitchTo(oldcontext);
                }
-
-               /* check result and tuple descriptor have the same number of columns */
-               if (PQnfields(res) != tupdesc->natts)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
-                               errmsg("remote query result rowtype does not match "
-                                               "the specified FROM clause rowtype")));
-
-               /* fast track when no results */
-               if (funcctx->max_calls < 1)
+               else
                {
-                       if (res)
-                               PQclear(res);
-                       SRF_RETURN_DONE(funcctx);
+                       /* async query send */
+                       MemoryContextSwitchTo(oldcontext);
+                       PG_RETURN_INT32(PQsendQuery(conn, sql));
                }
+       }
 
-               /* store needed metadata for subsequent calls */
-               attinmeta = TupleDescGetAttInMetadata(tupdesc);
-               funcctx->attinmeta = attinmeta;
+       if (is_async && !do_get)
+       {
+               /* async query send -- should not happen */
+               elog(ERROR, "async query send called more than once");
 
-               MemoryContextSwitchTo(oldcontext);
        }
 
        /* stuff done on every call of the function */
@@ -902,6 +982,140 @@ dblink_record(PG_FUNCTION_ARGS)
        }
 }
 
+/*
+ * List all open dblink connections by name.
+ * Returns an array of all connection names.
+ * Takes no params
+ */
+PG_FUNCTION_INFO_V1(dblink_get_connections);
+Datum
+dblink_get_connections(PG_FUNCTION_ARGS)
+{
+       HASH_SEQ_STATUS                 status;
+       remoteConnHashEnt          *hentry;
+       ArrayBuildState            *astate = NULL;
+
+       if (remoteConnHash)
+       {
+               hash_seq_init(&status, remoteConnHash);
+               while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
+               {
+                       /* stash away current value */
+                       astate = accumArrayResult(astate,
+                                                                         PointerGetDatum(GET_TEXT(hentry->name)),
+                                                                         false, TEXTOID, CurrentMemoryContext);
+               }
+       }
+
+       if (astate)
+               PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
+                                                                                         CurrentMemoryContext));
+       else
+               PG_RETURN_NULL();
+}
+
+/*
+ * Checks if a given remote connection is busy
+ *
+ * Returns 1 if the connection is busy, 0 otherwise
+ * Params:
+ *     text connection_name - name of the connection to check
+ * 
+ */
+PG_FUNCTION_INFO_V1(dblink_is_busy);
+Datum
+dblink_is_busy(PG_FUNCTION_ARGS)
+{
+       char               *msg;
+       PGconn             *conn = NULL;
+       char               *conname = NULL;
+       char               *connstr = NULL;
+       remoteConn         *rconn = NULL;
+       bool                    freeconn = false;
+
+       DBLINK_INIT;
+       DBLINK_GET_CONN;
+       if (!conn)
+               DBLINK_CONN_NOT_AVAIL;
+
+       PQconsumeInput(conn);
+       PG_RETURN_INT32(PQisBusy(conn));
+}
+
+/*
+ * Cancels a running request on a connection
+ *
+ * Returns text: 
+ *     "OK" if the cancel request has been sent correctly,
+ *      an error message otherwise
+ * 
+ * Params:
+ *     text connection_name - name of the connection to check
+ * 
+ */
+PG_FUNCTION_INFO_V1(dblink_cancel_query);
+Datum
+dblink_cancel_query(PG_FUNCTION_ARGS)
+{
+       char               *msg;
+       int                             res = 0;
+       PGconn             *conn = NULL;
+       char               *conname = NULL;
+       char               *connstr = NULL;
+       remoteConn         *rconn = NULL;
+       bool                    freeconn = false;
+       PGcancel           *cancel;
+       char                    errbuf[256];
+
+       DBLINK_INIT;
+       DBLINK_GET_CONN;
+       if (!conn)
+               DBLINK_CONN_NOT_AVAIL;
+       cancel = PQgetCancel(conn);
+
+       res = PQcancel(cancel, errbuf, 256);
+       PQfreeCancel(cancel);
+
+       if (res == 0)
+               PG_RETURN_TEXT_P(GET_TEXT("OK"));       
+       else
+               PG_RETURN_TEXT_P(GET_TEXT(errbuf));
+}
+
+
+/*
+ * Get error message from a connection
+ *
+ * Returns text: 
+ *     "OK" if no error, an error message otherwise
+ * 
+ * Params:
+ *     text connection_name - name of the connection to check
+ * 
+ */
+PG_FUNCTION_INFO_V1(dblink_error_message);
+Datum
+dblink_error_message(PG_FUNCTION_ARGS)
+{
+       char               *msg;
+       PGconn             *conn = NULL;
+       char               *conname = NULL;
+       char               *connstr = NULL;
+       remoteConn         *rconn = NULL;
+       bool                    freeconn = false;
+
+       DBLINK_INIT;
+       DBLINK_GET_CONN;
+       if (!conn)
+               DBLINK_CONN_NOT_AVAIL;
+
+       msg = PQerrorMessage(conn);
+       if (!msg)
+               PG_RETURN_TEXT_P(GET_TEXT("OK"));
+       else
+               PG_RETURN_TEXT_P(GET_TEXT(msg));
+}
+
 /*
  * Execute an SQL non-SELECT command
  */
index 969843765fb1378466e9f6c83b6c608d78b84e24..a479744621f940404ea7814766720126839cc513 100644 (file)
@@ -8,7 +8,7 @@
  * Darko Prenosil <Darko.Prenosil@finteh.hr>
  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
  *
- * $PostgreSQL: pgsql/contrib/dblink/dblink.h,v 1.16 2006/07/10 18:40:16 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.h,v 1.17 2006/09/02 21:11:15 joe Exp $
  * Copyright (c) 2001-2006, PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
@@ -45,6 +45,12 @@ extern Datum dblink_open(PG_FUNCTION_ARGS);
 extern Datum dblink_close(PG_FUNCTION_ARGS);
 extern Datum dblink_fetch(PG_FUNCTION_ARGS);
 extern Datum dblink_record(PG_FUNCTION_ARGS);
+extern Datum dblink_send_query(PG_FUNCTION_ARGS);
+extern Datum dblink_get_result(PG_FUNCTION_ARGS);
+extern Datum dblink_get_connections(PG_FUNCTION_ARGS);
+extern Datum dblink_is_busy(PG_FUNCTION_ARGS);
+extern Datum dblink_cancel_query(PG_FUNCTION_ARGS);
+extern Datum dblink_error_message(PG_FUNCTION_ARGS);
 extern Datum dblink_exec(PG_FUNCTION_ARGS);
 extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
 extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
index 7cd705ba5411c32afc7d5254a6105b169ad475dc..e99ea05ec785822f1824b4238ef627a3452a4019 100644 (file)
@@ -144,3 +144,38 @@ CREATE OR REPLACE FUNCTION dblink_current_query ()
 RETURNS text
 AS 'MODULE_PATHNAME','dblink_current_query'
 LANGUAGE C;
+
+CREATE OR REPLACE FUNCTION dblink_send_query(text, text)
+RETURNS int4
+AS 'MODULE_PATHNAME', 'dblink_send_query'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_is_busy(text)
+RETURNS int4
+AS 'MODULE_PATHNAME', 'dblink_is_busy'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_get_result(text)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'dblink_get_result'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_get_result(text, bool)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'dblink_get_result'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_get_connections()
+RETURNS text[]
+AS 'MODULE_PATHNAME', 'dblink_get_connections'
+LANGUAGE C;
+
+CREATE OR REPLACE FUNCTION dblink_cancel_query(text)
+RETURNS text
+AS 'MODULE_PATHNAME', 'dblink_cancel_query'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_error_message(text)
+RETURNS text
+AS 'MODULE_PATHNAME', 'dblink_error_message'
+LANGUAGE C STRICT;
index ae79cf88ba4b40328d47001bfc3037c87a88167c..3834afd8724bf10f15e95cdd78fe61193d259743 100644 (file)
@@ -1,4 +1,4 @@
-$PostgreSQL: pgsql/contrib/dblink/doc/misc,v 1.3 2006/03/11 04:38:29 momjian Exp $
+$PostgreSQL: pgsql/contrib/dblink/doc/misc,v 1.4 2006/09/02 21:11:15 joe Exp $
 ==================================================================
 Name
 
@@ -139,3 +139,94 @@ test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
  UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
 (1 row)
 
+
+==================================================================
+Name
+
+dblink_get_connections -- returns a text array of all active named
+                          dblink connections
+
+Synopsis
+
+dblink_get_connections() RETURNS text[]
+
+Inputs
+
+  none
+
+Outputs
+
+  Returns text array of all active named dblink connections
+
+Example usage
+
+  SELECT dblink_get_connections();
+
+==================================================================
+Name
+
+dblink_is_busy -- checks to see if named connection is busy
+                  with an async query
+
+Synopsis
+
+dblink_is_busy(text connname) RETURNS int
+
+Inputs
+
+  connname
+    The specific connection name to use.
+
+Outputs
+
+  Returns 1 if connection is busy, 0 if it is not busy.
+  If this function returns 0, it is guaranteed that dblink_get_result
+  will not block.
+
+Example usage
+
+  SELECT dblink_is_busy('dtest1');
+
+==================================================================
+Name
+
+dblink_cancel_query -- cancels any active query on the named connection
+
+Synopsis
+
+dblink_cancel_query(text connname) RETURNS text
+
+Inputs
+
+  connname
+    The specific connection name to use.
+
+Outputs
+
+  Returns "OK" on success, or an error message on failure.
+
+Example usage
+
+  SELECT dblink_cancel_query('dtest1');
+
+==================================================================
+Name
+
+dblink_error_message -- gets last error message on the named connection
+
+Synopsis
+
+dblink_error_message(text connname) RETURNS text
+
+Inputs
+
+  connname
+    The specific connection name to use.
+
+Outputs
+
+  Returns last error message.
+
+Example usage
+
+  SELECT dblink_error_message('dtest1');
index cd58a3614224fbfccd3e4da95d9ee4d00108cd4b..42427b5d5c361dc152d2dc0c294ac15e66cf4fd1 100644 (file)
@@ -118,3 +118,125 @@ Then you can simply write:
 
    select * from myremote_pg_proc where proname like 'bytea%';
 
+
+==================================================================
+Name
+
+dblink_send_query -- Sends an async query to a remote database
+
+Synopsis
+
+dblink_send_query(text connname, text sql)
+
+Inputs
+
+  connname
+    The specific connection name to use.
+
+  sql
+
+    sql statement that you wish to execute on the remote host
+    e.g. "select * from pg_class"
+
+Outputs
+
+  Returns int. A return value of 1 if the query was successfully dispatched,
+  0 otherwise. If 1, results must be fetched by dblink_get_result(connname).
+  A running query may be cancelled by dblink_cancel_query(connname).
+
+Example usage
+
+  SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+  SELECT * from 
+   dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+
+==================================================================
+Name
+
+dblink_get_result -- Gets an async query result
+
+Synopsis
+
+dblink_get_result(text connname [, bool fail_on_error])
+
+Inputs
+
+  connname
+    The specific connection name to use. An asynchronous query must
+    have already been sent using dblink_send_query()
+
+  fail_on_error
+
+    If true (default when not present) then an ERROR thrown on the remote side
+    of the connection causes an ERROR to also be thrown locally. If false, the
+    remote ERROR is locally treated as a NOTICE, and no rows are returned.
+
+Outputs
+
+  Returns setof record
+
+Notes
+  Blocks until a result gets available.
+
+  This function *must* be called if dblink_send_query returned
+  a 1, even on cancelled queries - otherwise the connection
+  can't be used anymore. It must be called once for each query
+  sent, and one additional time to obtain an empty set result,
+  prior to using the connection again.
+
+Example usage
+
+contrib_regression=#   SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+ dblink_connect
+----------------
+ OK
+(1 row)
+
+contrib_regression=#   SELECT * from
+contrib_regression-#    dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+ t1
+----
+  1
+(1 row)
+
+contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 |     f3
+----+----+------------
+  0 | a  | {a0,b0,c0}
+  1 | b  | {a1,b1,c1}
+  2 | c  | {a2,b2,c2}
+(3 rows)
+
+contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 | f3
+----+----+----
+(0 rows)
+
+contrib_regression=#   SELECT * from
+   dblink_send_query('dtest1', 'select * from foo where f1 < 3; select * from foo where f1 > 6') as t1;
+ t1
+----
+  1
+(1 row)
+
+contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 |     f3
+----+----+------------
+  0 | a  | {a0,b0,c0}
+  1 | b  | {a1,b1,c1}
+  2 | c  | {a2,b2,c2}
+(3 rows)
+
+contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 |      f3
+----+----+---------------
+  7 | h  | {a7,b7,c7}
+  8 | i  | {a8,b8,c8}
+  9 | j  | {a9,b9,c9}
+ 10 | k  | {a10,b10,c10}
+(4 rows)
+
+contrib_regression=#   SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 | f3
+----+----+----
+(0 rows)
index f2e364a9423e872416bda12b1c45c1ed4525dbca..c98ae5cf6b55e8dc50466e7135bf1f8e003db36d 100644 (file)
@@ -669,3 +669,90 @@ SELECT dblink_disconnect('myconn');
 -- should get 'connection "myconn" not available' error
 SELECT dblink_disconnect('myconn');
 ERROR:  connection "myconn" not available
+-- test asynchronous queries
+SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+ dblink_connect 
+----------------
+ OK
+(1 row)
+
+SELECT * from 
+ dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+ t1 
+----
+  1
+(1 row)
+
+SELECT dblink_connect('dtest2', 'dbname=contrib_regression');
+ dblink_connect 
+----------------
+ OK
+(1 row)
+
+SELECT * from 
+ dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1;
+ t1 
+----
+  1
+(1 row)
+
+SELECT dblink_connect('dtest3', 'dbname=contrib_regression');
+ dblink_connect 
+----------------
+ OK
+(1 row)
+
+SELECT * from 
+ dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1;
+ t1 
+----
+  1
+(1 row)
+
+CREATE TEMPORARY TABLE result AS
+(SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]))
+UNION
+(SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[]))
+UNION
+(SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[]))
+ORDER by f1;
+SELECT dblink_get_connections();
+ dblink_get_connections
+------------------------
+ {dtest1,dtest2,dtest3}
+(1 row)
+
+SELECT dblink_disconnect('dtest1');
+ dblink_disconnect 
+-------------------
+ OK
+(1 row)
+
+SELECT dblink_disconnect('dtest2');
+ dblink_disconnect 
+-------------------
+ OK
+(1 row)
+
+SELECT dblink_disconnect('dtest3');
+ dblink_disconnect 
+-------------------
+ OK
+(1 row)
+
+SELECT * from result;
+ f1 | f2 |      f3       
+----+----+---------------
+  0 | a  | {a0,b0,c0}
+  1 | b  | {a1,b1,c1}
+  2 | c  | {a2,b2,c2}
+  3 | d  | {a3,b3,c3}
+  4 | e  | {a4,b4,c4}
+  5 | f  | {a5,b5,c5}
+  6 | g  | {a6,b6,c6}
+  7 | h  | {a7,b7,c7}
+  8 | i  | {a8,b8,c8}
+  9 | j  | {a9,b9,c9}
+ 10 | k  | {a10,b10,c10}
+(11 rows)
+
index 66e2607cfee667bbe8c226da4c0f4bb66ec39669..52a3d049b9c2db92318547d14480cec938330c32 100644 (file)
@@ -319,3 +319,32 @@ SELECT dblink_disconnect('myconn');
 -- close the named persistent connection again
 -- should get 'connection "myconn" not available' error
 SELECT dblink_disconnect('myconn');
+
+-- test asynchronous queries
+SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+SELECT * from 
+ dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+
+SELECT dblink_connect('dtest2', 'dbname=contrib_regression');
+SELECT * from 
+ dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1;
+
+SELECT dblink_connect('dtest3', 'dbname=contrib_regression');
+SELECT * from 
+ dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1;
+
+CREATE TEMPORARY TABLE result AS
+(SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]))
+UNION
+(SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[]))
+UNION
+(SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[]))
+ORDER by f1;
+
+SELECT dblink_get_connections();
+
+SELECT dblink_disconnect('dtest1');
+SELECT dblink_disconnect('dtest2');
+SELECT dblink_disconnect('dtest3');
+SELECT * from result;
+