]> granicus.if.org Git - postgresql/commitdiff
Attached is a fairly sizeable update to contrib/dblink. I'd love to get
authorBruce Momjian <bruce@momjian.us>
Mon, 2 Sep 2002 06:13:31 +0000 (06:13 +0000)
committerBruce Momjian <bruce@momjian.us>
Mon, 2 Sep 2002 06:13:31 +0000 (06:13 +0000)
review/feedback if anyone is interested and can spend the time. But I'd
also love to get this committed and address changes as incremental
patches ;-), so if there are no objections, please apply.

Below I'll give a synopsis of the changes. More detailed descriptions
are now in a new doc directory under contrib/dblink. There is also a new

dblink.test.sql file which will give a pretty good overview of the
functions and their use.

Joe Conway

contrib/README
contrib/dblink/README.dblink
contrib/dblink/dblink.c
contrib/dblink/dblink.h
contrib/dblink/dblink.sql.in

index 2b0085483e6ea1862c587aa52bf7ccc95a8458a4..0d3b04fd1f90f8b7cfe315b5c54ea52e7551a960 100644 (file)
@@ -48,7 +48,7 @@ dbase -
 
 dblink -
        Allows remote query execution
-       by Joe Conway <joe.conway@mail.com>
+       by Joe Conway <mail@joeconway.com>
 
 dbmirror -
        Replication server
@@ -73,7 +73,7 @@ fulltextindex -
 
 fuzzystrmatch -
        Levenshtein, metaphone, and soundex fuzzy string matching
-       by Joe Conway <joseph.conway@home.com>, Joel Burton <jburton@scw.org>
+       by Joe Conway <mail@joeconway.com>, Joel Burton <jburton@scw.org>
 
 intagg -
        Integer aggregator
index 8e6adf069f13dc0dc80a8d5dd34cafa9ba488520..f304b7729d19cf34db6f1d0181e58a9a00e4391f 100644 (file)
@@ -3,7 +3,9 @@
  *
  * Functions returning results from a remote database
  *
- * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
+ * Joe Conway <mail@joeconway.com>
+ *
+ * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  * 
  * Permission to use, copy, modify, and distribute this software and its
  *
  */
 
-
-Version 0.4 (7 April, 2002):
-  Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
-  various utility functions.
-  Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel
+Version 0.5 (25 August, 2002):
+  Major overhaul to work with new backend "table function" capability. Removed
+  dblink_strtok() and dblink_replace() functions because they are now
+  available as backend functions (split() and replace() respectively).
+  Tested under Linux (Red Hat 7.3) and PostgreSQL 7.3devel. This version
+  is no longer backwards portable to PostgreSQL 7.2.
 
 Release Notes:
+  Version 0.5
+    - dblink now supports use directly as a table function; this is the new
+      preferred usage going forward
+    - Use of dblink_tok is now deprecated; original form of dblink is also
+      deprecated. They _will_ be removed in the next version.
+    - dblink_last_oid is also deprecated; use dblink_exec() which returns
+      the command status as a single row, single column result.
+    - Original dblink, dblink_tok, and dblink_last_oid are commented out in
+      dblink.sql; remove the comments to use the deprecated functions.
+    - dblink_strtok() and dblink_replace() functions were removed. Use
+      split() and replace() respectively (new backend functions in
+      PostgreSQL 7.3) instead.
+    - New functions: dblink_exec() for non-SELECT queries; dblink_connect()
+      opens connection that persists for duration of a backend;
+      dblink_disconnect() closes a persistent connection; dblink_open()
+      opens a cursor; dblink_fetch() fetches results from an open cursor.
+      dblink_close() closes a cursor.
+    - New test suite: dblink_check.sh, dblink.test.sql,
+      dblink.test.expected.out. Execute dblink_check.sh from the same
+      directory as the other two files. Output is dblink.test.out and
+      dblink.test.diff. Note that dblink.test.sql is a good source
+      of example usage.
 
   Version 0.4
     - removed cursor wrap around input sql to allow for remote
@@ -59,16 +84,48 @@ Installation:
 
   installs following functions into database template1:
 
-     dblink(text,text) RETURNS setof int
-       - returns a resource id for results from remote query
-     dblink_tok(int,int) RETURNS text
-       - extracts and returns individual field results
-     dblink_strtok(text,text,int) RETURNS text
-       - extracts and returns individual token from delimited text
+     connection
+     ------------
+     dblink_connect(text) RETURNS text
+       - opens a connection that will persist for duration of current
+         backend or until it is disconnected
+     dblink_disconnect() RETURNS text
+       - disconnects a persistent connection
+
+     cursor
+     ------------
+     dblink_open(text,text) RETURNS text
+       - opens a cursor using connection already opened with dblink_connect()
+         that will persist for duration of current backend or until it is
+         closed
+     dblink_fetch(text, int) RETURNS setof record
+       - fetches data from an already opened cursor
+     dblink_close(text) RETURNS text
+       - closes a cursor
+
+     query
+     ------------
+     dblink(text,text) RETURNS setof record
+       - returns a set of results from remote SELECT query
+         (Note: comment out in dblink.sql to use deprecated version)
+     dblink(text) RETURNS setof record
+       - returns a set of results from remote SELECT query, using connection
+         already opened with dblink_connect()
+
+     execute
+     ------------
+     dblink_exec(text, text) RETURNS text
+       - executes an INSERT/UPDATE/DELETE query remotely
+     dblink_exec(text) RETURNS text
+       - executes an INSERT/UPDATE/DELETE query remotely, using connection
+         already opened with dblink_connect()
+
+     misc
+     ------------
+     dblink_current_query() RETURNS text
+       - returns the current query string
      dblink_get_pkey(text) RETURNS setof text
        - returns the field names of a relation's primary key fields
-     dblink_last_oid(int) RETURNS oid
-       - returns the last inserted oid
      dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
        - builds an insert statement using a local tuple, replacing the
          selection key field values with alternate supplied values
@@ -78,338 +135,30 @@ Installation:
      dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
        - builds an update statement using a local tuple, replacing the
          selection key field values with alternate supplied values
-     dblink_current_query() RETURNS text
-       - returns the current query string
-     dblink_replace(text,text,text) RETURNS text
-       - replace all occurences of substring-a in the input-string 
-         with substring-b
-
-Documentation
-==================================================================
-Name
-
-dblink -- Returns a resource id for a data set from a remote database
-
-Synopsis
-
-dblink(text connstr, text sql)
-
-Inputs
-
-  connstr
-
-    standard libpq format connection srting, 
-    e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd"
-
-  sql
-
-    sql statement that you wish to execute on the remote host
-    e.g. "select * from pg_class"
-
-Outputs
-
-  Returns setof int (res_id)
-
-Example usage
-
-  select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
-               ,'select f1, f2 from mytable');
-
-
-==================================================================
-
-Name
-
-dblink_tok -- Returns individual select field results from a dblink remote query
-
-Synopsis
-
-dblink_tok(int res_id, int fnumber)
-
-Inputs
-
-  res_id
-
-    a resource id returned by a call to dblink()
-
-  fnumber
-
-    the ordinal position (zero based) of the field to be returned from the dblink result set
-
-Outputs
-
-  Returns text
-
-Example usage
-
-  select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
-  from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
-                     ,'select f1, f2 from mytable') as dblink_p) as t1;
-
-
-==================================================================
-
-A more convenient way to use dblink may be to create a view:
-
- create view myremotetable as
- select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
- from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=template1 user=postgres password=postgres'
-                    ,'select proname, prosrc from pg_proc') as dblink_p) as t1;
-
-Then you can simply write:
-
-   select f1, f2 from myremotetable where f1 like 'bytea%';
-
-==================================================================
-Name
-
-dblink_strtok -- Extracts and returns individual token from delimited text
-
-Synopsis
-
-dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
-
-Inputs
-
-  inputstring
-
-    any string you want to parse a token out of;
-    e.g. 'f=1&g=3&h=4'
-
-  delimiter
-
-    a single character to use as the delimiter;
-    e.g. '&' or '='
-
-  posn
-
-    the position of the token of interest, 0 based;
-    e.g. 1
-
-Outputs
-
-  Returns text
-
-Example usage
-
-test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
- dblink_strtok
----------------
- 3
-(1 row)
-
-==================================================================
-Name
-
-dblink_get_pkey -- returns the field names of a relation's primary
-                   key fields
-
-Synopsis
-
-dblink_get_pkey(text relname) RETURNS setof text
-
-Inputs
-
-  relname
-
-    any relation name;
-    e.g. 'foobar'
-
-Outputs
-
-  Returns setof text -- one row for each primary key field, in order of
-                        precedence
 
-Example usage
-
-test=# select dblink_get_pkey('foobar');
- dblink_get_pkey
------------------
- f1
- f2
- f3
- f4
- f5
-(5 rows)
-
-
-==================================================================
-Name
-
-dblink_last_oid -- Returns last inserted oid
-
-Synopsis
-
-dblink_last_oid(int res_id) RETURNS oid
-
-Inputs
-
-  res_id
-
-    any resource id returned by dblink function;
-
-Outputs
-
-  Returns oid of last inserted tuple
-
-Example usage
-
-test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
-               ,'insert into mytable (f1, f2) values (1,2)'));
-
- dblink_last_oid
-----------------
- 16553
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_build_sql_insert -- builds an insert statement using a local
-                           tuple, replacing the selection key field
-                           values with alternate supplied values
-dblink_build_sql_delete -- builds a delete statement using supplied
-                           values for selection key field values
-dblink_build_sql_update -- builds an update statement using a local
-                           tuple, replacing the selection key field
-                           values with alternate supplied values
-
-
-Synopsis
-
-dblink_build_sql_insert(text relname
-                         ,int2vector primary_key_attnums
-                         ,int2 num_primary_key_atts
-                         ,_text src_pk_att_vals_array
-                         ,_text tgt_pk_att_vals_array) RETURNS text
-dblink_build_sql_delete(text relname
-                         ,int2vector primary_key_attnums
-                         ,int2 num_primary_key_atts
-                         ,_text tgt_pk_att_vals_array) RETURNS text
-dblink_build_sql_update(text relname
-                         ,int2vector primary_key_attnums
-                         ,int2 num_primary_key_atts
-                         ,_text src_pk_att_vals_array
-                         ,_text tgt_pk_att_vals_array) RETURNS text
-
-Inputs
-
-  relname
-
-    any relation name;
-    e.g. 'foobar'
-
-  primary_key_attnums
-
-    vector of primary key attnums (1 based, see pg_index.indkey);
-    e.g. '1 2'
-
-  num_primary_key_atts
-
-    number of primary key attnums in the vector; e.g. 2
-
-  src_pk_att_vals_array
-
-    array of primary key values, used to look up the local matching
-    tuple, the values of which are then used to construct the SQL
-    statement
-
-  tgt_pk_att_vals_array
-
-    array of primary key values, used to replace the local tuple
-    values in the SQL statement
-
-Outputs
-
-  Returns text -- requested SQL statement
-
-Example usage
-
-test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
-             dblink_build_sql_insert
---------------------------------------------------
- INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
-(1 row)
-
-test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
-           dblink_build_sql_delete
----------------------------------------------
- DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
-(1 row)
-
-test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
-                   dblink_build_sql_update
--------------------------------------------------------------
- UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_current_query -- returns the current query string
-
-Synopsis
-
-dblink_current_query () RETURNS text
-
-Inputs
-
-  None
-
-Outputs
-
-  Returns text -- a copy of the currently executing query
-
-Example usage
-
-test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
-                                                                dblink_current_query
------------------------------------------------------------------------------------------------------------------------------------------------------
- select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_replace -- replace all occurences of substring-a in the
-                  input-string with substring-b
-
-Synopsis
-
-dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
-
-Inputs
-
-  input-string
-
-    the starting string, before replacement of substring-a
-
-  substring-a
-
-    the substring to find and replace
-
-  substring-b
-
-    the substring to be substituted in place of substring-a
-
-Outputs
-
-  Returns text -- a copy of the starting string, but with all occurences of
-                  substring-a replaced with substring-b
+  Not installed by default
+     deprecated
+     ------------
+     dblink(text,text) RETURNS setof int
+       - *DEPRECATED* returns a resource id for results from remote query
+         (Note: must uncomment in dblink.sql to use)
+     dblink_tok(int,int) RETURNS text
+       - *DEPRECATED* extracts and returns individual field results; used
+         only in conjunction with the *DEPRECATED* form of dblink
+         (Note: must uncomment in dblink.sql to use)
+     dblink_last_oid(int) RETURNS oid
+       - *DEPRECATED* returns the last inserted oid
 
-Example usage
+Documentation:
 
-test=# select dblink_replace('12345678901234567890','56','hello');
-       dblink_replace
-----------------------------
- 1234hello78901234hello7890
-(1 row)
+  See the following files:
+     doc/connection
+     doc/cursor
+     doc/query
+     doc/execute
+     doc/misc
+     doc/deprecated
 
 ==================================================================
-
-
 -- Joe Conway
 
index 0401e06f4f123e75ca29b3aa5704e5cd67108ed3..a6ede5ae1c1c81cd71c11f43f7ae06fe6cef84c7 100644 (file)
@@ -3,7 +3,9 @@
  *
  * Functions returning results from a remote database
  *
- * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
+ * Joe Conway <mail@joeconway.com>
+ *
+ * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
  * Permission to use, copy, modify, and distribute this software and its
  *
  */
 
-#include "dblink.h"
+#include <string.h>
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "libpq-int.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "access/tupdesc.h"
+#include "access/heapam.h"
+#include "catalog/catname.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_index.h"
+#include "catalog/pg_type.h"
+#include "executor/executor.h"
+#include "executor/spi.h"
+#include "lib/stringinfo.h"
+#include "nodes/nodes.h"
+#include "nodes/execnodes.h"
+#include "nodes/pg_list.h"
+#include "parser/parse_type.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/array.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
+#include "dblink.h"
 
 /*
  * Internal declarations
  */
 static dblink_results *init_dblink_results(MemoryContext fn_mcxt);
-static dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
 static char **get_pkey_attnames(Oid relid, int16 *numatts);
-static char *get_strtok(char *fldtext, char *fldsep, int fldnum);
 static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
 static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
 static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
@@ -43,14 +68,593 @@ static char *quote_ident_cstr(char *rawstr);
 static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
 static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
 static Oid get_relid_from_relname(text *relname_text);
-static dblink_results  *get_res_ptr(int32 res_id_index);
+static dblink_results *get_res_ptr(int32 res_id_index);
 static void append_res_ptr(dblink_results *results);
 static void remove_res_ptr(dblink_results *results);
+static TupleDesc pgresultGetTupleDesc(PGresult *res);
 
 /* Global */
-List   *res_id = NIL;
-int            res_id_index = 0;
+List      *res_id = NIL;
+int                    res_id_index = 0;
+PGconn    *persistent_conn = NULL;
+
+#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
+#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
+#define xpfree(var_) \
+       do { \
+               if (var_ != NULL) \
+               { \
+                       pfree(var_); \
+                       var_ = NULL; \
+               } \
+       } while (0)
+
+
+/*
+ * Create a persistent connection to another database
+ */
+PG_FUNCTION_INFO_V1(dblink_connect);
+Datum
+dblink_connect(PG_FUNCTION_ARGS)
+{
+       char               *connstr = GET_STR(PG_GETARG_TEXT_P(0));
+       char               *msg;
+       text               *result_text;
+       MemoryContext   oldcontext;
+
+       if (persistent_conn != NULL)
+               PQfinish(persistent_conn);
+
+       oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+       persistent_conn = PQconnectdb(connstr);
+       MemoryContextSwitchTo(oldcontext);
+
+       if (PQstatus(persistent_conn) == CONNECTION_BAD)
+       {
+               msg = pstrdup(PQerrorMessage(persistent_conn));
+               PQfinish(persistent_conn);
+               persistent_conn = NULL;
+               elog(ERROR, "dblink_connect: connection error: %s", msg);
+       }
+
+       result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+       PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Clear a persistent connection to another database
+ */
+PG_FUNCTION_INFO_V1(dblink_disconnect);
+Datum
+dblink_disconnect(PG_FUNCTION_ARGS)
+{
+       text               *result_text;
+
+       if (persistent_conn != NULL)
+               PQfinish(persistent_conn);
+
+       persistent_conn = NULL;
+
+       result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+       PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * opens a cursor using a persistent connection
+ */
+PG_FUNCTION_INFO_V1(dblink_open);
+Datum
+dblink_open(PG_FUNCTION_ARGS)
+{
+       char               *msg;
+       PGresult           *res = NULL;
+       PGconn             *conn = NULL;
+       text               *result_text;
+       char               *curname = GET_STR(PG_GETARG_TEXT_P(0));
+       char               *sql = GET_STR(PG_GETARG_TEXT_P(1));
+       StringInfo              str = makeStringInfo();
+
+       if (persistent_conn != NULL)
+               conn = persistent_conn;
+       else
+               elog(ERROR, "dblink_open: no connection available");
+
+       res = PQexec(conn, "BEGIN");
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+               msg = pstrdup(PQerrorMessage(conn));
+               PQclear(res);
+
+               PQfinish(conn);
+               persistent_conn = NULL;
+
+               elog(ERROR, "dblink_open: begin error: %s", msg);
+       }
+       PQclear(res);
+
+       appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql);
+       res = PQexec(conn, str->data);
+       if (!res ||
+               (PQresultStatus(res) != PGRES_COMMAND_OK &&
+                PQresultStatus(res) != PGRES_TUPLES_OK))
+       {
+               msg = pstrdup(PQerrorMessage(conn));
+
+               PQclear(res);
+
+               PQfinish(conn);
+               persistent_conn = NULL;
+
+               elog(ERROR, "dblink: sql error: %s", msg);
+       }
+
+       result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+       PG_RETURN_TEXT_P(result_text);
+}
 
+/*
+ * closes a cursor
+ */
+PG_FUNCTION_INFO_V1(dblink_close);
+Datum
+dblink_close(PG_FUNCTION_ARGS)
+{
+       PGconn             *conn = NULL;
+       PGresult           *res = NULL;
+       char               *curname = GET_STR(PG_GETARG_TEXT_P(0));
+       StringInfo              str = makeStringInfo();
+       text               *result_text;
+       char               *msg;
+
+       if (persistent_conn != NULL)
+               conn = persistent_conn;
+       else
+               elog(ERROR, "dblink_close: no connection available");
+
+       appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));
+
+       /* close the cursor */
+       res = PQexec(conn, str->data);
+       if (!res ||     PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+               msg = pstrdup(PQerrorMessage(conn));
+               PQclear(res);
+
+               PQfinish(persistent_conn);
+               persistent_conn = NULL;
+
+               elog(ERROR, "dblink_close: sql error: %s", msg);
+       }
+
+       PQclear(res);
+
+       /* commit the transaction */
+       res = PQexec(conn, "COMMIT");
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+               msg = pstrdup(PQerrorMessage(conn));
+               PQclear(res);
+
+               PQfinish(persistent_conn);
+               persistent_conn = NULL;
+
+               elog(ERROR, "dblink_close: commit error: %s", msg);
+       }
+       PQclear(res);
+
+       result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+       PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Fetch results from an open cursor
+ */
+PG_FUNCTION_INFO_V1(dblink_fetch);
+Datum
+dblink_fetch(PG_FUNCTION_ARGS)
+{
+       FuncCallContext    *funcctx;
+       TupleDesc                       tupdesc = NULL;
+       int                                     call_cntr;
+       int                                     max_calls;
+       TupleTableSlot     *slot;
+       AttInMetadata      *attinmeta;
+       char                       *msg;
+       PGresult                   *res = NULL;
+       MemoryContext           oldcontext;
+
+       /* stuff done only on the first call of the function */
+       if(SRF_IS_FIRSTCALL())
+       {
+               Oid                     functypeid;
+               char                    functyptype;
+               Oid                     funcid = fcinfo->flinfo->fn_oid;
+               PGconn             *conn = NULL;
+               StringInfo              str = makeStringInfo();
+               char               *curname = GET_STR(PG_GETARG_TEXT_P(0));
+               int                             howmany = PG_GETARG_INT32(1);
+
+               /* create a function context for cross-call persistence */
+               funcctx = SRF_FIRSTCALL_INIT();
+
+               /* switch to memory context appropriate for multiple function calls */
+               oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+               if (persistent_conn != NULL)
+                       conn = persistent_conn;
+               else
+                       elog(ERROR, "dblink_fetch: no connection available");
+
+               appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname));
+
+               res = PQexec(conn, str->data);
+               if (!res ||
+                       (PQresultStatus(res) != PGRES_COMMAND_OK &&
+                        PQresultStatus(res) != PGRES_TUPLES_OK))
+               {
+                       msg = pstrdup(PQerrorMessage(conn));
+                       PQclear(res);
+
+                       PQfinish(persistent_conn);
+                       persistent_conn = NULL;
+
+                       elog(ERROR, "dblink_fetch: sql error: %s", msg);
+               }
+               else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+               {
+                       /* cursor does not exist - closed already or bad name */
+                       PQclear(res);
+                       elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname));
+               }
+
+               funcctx->max_calls = PQntuples(res);
+
+               /* got results, keep track of them */
+               funcctx->user_fctx = res;
+
+               /* fast track when no results */
+               if (funcctx->max_calls < 1)
+                       SRF_RETURN_DONE(funcctx);
+
+               /* check typtype to see if we have a predetermined return type */
+               functypeid = get_func_rettype(funcid);
+               functyptype = get_typtype(functypeid);
+
+               if (functyptype == 'c')
+                       tupdesc = TypeGetTupleDesc(functypeid, NIL);
+               else if (functyptype == 'p' && functypeid == RECORDOID)
+                       tupdesc = pgresultGetTupleDesc(res);
+               else if (functyptype == 'b')
+                       elog(ERROR, "dblink_fetch: invalid kind of return type specified for function");
+               else
+                       elog(ERROR, "dblink_fetch: unknown kind of return type specified for function");
+
+               /* store needed metadata for subsequent calls */
+               slot = TupleDescGetSlot(tupdesc);
+               funcctx->slot = slot;
+               attinmeta = TupleDescGetAttInMetadata(tupdesc);
+               funcctx->attinmeta = attinmeta;
+
+               MemoryContextSwitchTo(oldcontext);
+    }
+
+       /* stuff done on every call of the function */
+       funcctx = SRF_PERCALL_SETUP();
+
+       /*
+        * initialize per-call variables
+        */
+       call_cntr = funcctx->call_cntr;
+       max_calls = funcctx->max_calls;
+
+       slot = funcctx->slot;
+
+       res = (PGresult *) funcctx->user_fctx;
+       attinmeta = funcctx->attinmeta;
+       tupdesc = attinmeta->tupdesc;
+
+       if (call_cntr < max_calls)      /* do when there is more left to send */
+       {
+               char      **values;
+               HeapTuple       tuple;
+               Datum           result;
+               int             i;
+               int             nfields = PQnfields(res);
+
+               values = (char **) palloc(nfields * sizeof(char *));
+               for (i = 0; i < nfields; i++)
+               {
+                       if (PQgetisnull(res, call_cntr, i) == 0)
+                               values[i] = PQgetvalue(res, call_cntr, i);
+                       else
+                               values[i] = NULL;
+               }
+
+               /* build the tuple */
+               tuple = BuildTupleFromCStrings(attinmeta, values);
+
+               /* make the tuple into a datum */
+               result = TupleGetDatum(slot, tuple);
+
+               SRF_RETURN_NEXT(funcctx, result);
+       }
+       else    /* do when there is no more left */
+       {
+               PQclear(res);
+               SRF_RETURN_DONE(funcctx);
+       }
+}
+
+/*
+ * Note: this is the new preferred version of dblink
+ */
+PG_FUNCTION_INFO_V1(dblink_record);
+Datum
+dblink_record(PG_FUNCTION_ARGS)
+{
+       FuncCallContext    *funcctx;
+       TupleDesc                       tupdesc = NULL;
+       int                                     call_cntr;
+       int                                     max_calls;
+       TupleTableSlot     *slot;
+       AttInMetadata      *attinmeta;
+       char                       *msg;
+       PGresult                   *res = NULL;
+       bool                            is_sql_cmd = false;
+       char                       *sql_cmd_status = NULL;
+       MemoryContext           oldcontext;
+
+       /* stuff done only on the first call of the function */
+       if(SRF_IS_FIRSTCALL())
+       {
+               Oid                     functypeid;
+               char                    functyptype;
+               Oid                     funcid = fcinfo->flinfo->fn_oid;
+               PGconn             *conn = NULL;
+               char               *connstr = NULL;
+               char               *sql = NULL;
+
+               /* create a function context for cross-call persistence */
+               funcctx = SRF_FIRSTCALL_INIT();
+
+               /* switch to memory context appropriate for multiple function calls */
+               oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+               if (fcinfo->nargs == 2)
+               {
+                       connstr = GET_STR(PG_GETARG_TEXT_P(0));
+                       sql = GET_STR(PG_GETARG_TEXT_P(1));
+
+                       conn = PQconnectdb(connstr);
+                       if (PQstatus(conn) == CONNECTION_BAD)
+                       {
+                               msg = pstrdup(PQerrorMessage(conn));
+                               PQfinish(conn);
+                               elog(ERROR, "dblink: connection error: %s", msg);
+                       }
+               }
+               else if (fcinfo->nargs == 1)
+               {
+                       sql = GET_STR(PG_GETARG_TEXT_P(0));
+
+                       if (persistent_conn != NULL)
+                               conn = persistent_conn;
+                       else
+                               elog(ERROR, "dblink: no connection available");
+               }
+               else
+                       elog(ERROR, "dblink: wrong number of arguments");
+
+               res = PQexec(conn, sql);
+               if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+               {
+                       msg = pstrdup(PQerrorMessage(conn));
+                       PQclear(res);
+                       PQfinish(conn);
+                       if (fcinfo->nargs == 1)
+                               persistent_conn = NULL;
+
+                       elog(ERROR, "dblink: sql error: %s", msg);
+               }
+               else
+               {
+                       if (PQresultStatus(res) == PGRES_COMMAND_OK)
+                       {
+                               is_sql_cmd = true;
+
+                               /* need a tuple descriptor representing one TEXT column */
+                               tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID);
+                               TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+                                                                  TEXTOID, -1, 0, false);
+
+                               /*
+                                * and save a copy of the command status string to return
+                                * as our result tuple
+                                */
+                               sql_cmd_status = PQcmdStatus(res);
+                               funcctx->max_calls = 1;
+                       }
+                       else
+                               funcctx->max_calls = PQntuples(res);
+
+                       /* got results, keep track of them */
+                       funcctx->user_fctx = res;
+
+                       /* if needed, close the connection to the database and cleanup */
+                       if (fcinfo->nargs == 2)
+                               PQfinish(conn);
+               }
+
+               /* fast track when no results */
+               if (funcctx->max_calls < 1)
+                       SRF_RETURN_DONE(funcctx);
+
+               /* check typtype to see if we have a predetermined return type */
+               functypeid = get_func_rettype(funcid);
+               functyptype = get_typtype(functypeid);
+
+               if (!is_sql_cmd)
+               {
+                       if (functyptype == 'c')
+                               tupdesc = TypeGetTupleDesc(functypeid, NIL);
+                       else if (functyptype == 'p' && functypeid == RECORDOID)
+                               tupdesc = pgresultGetTupleDesc(res);
+                       else if (functyptype == 'b')
+                               elog(ERROR, "Invalid kind of return type specified for function");
+                       else
+                               elog(ERROR, "Unknown kind of return type specified for function");
+               }
+
+               /* store needed metadata for subsequent calls */
+               slot = TupleDescGetSlot(tupdesc);
+               funcctx->slot = slot;
+               attinmeta = TupleDescGetAttInMetadata(tupdesc);
+               funcctx->attinmeta = attinmeta;
+
+               MemoryContextSwitchTo(oldcontext);
+    }
+
+       /* stuff done on every call of the function */
+       funcctx = SRF_PERCALL_SETUP();
+
+       /*
+        * initialize per-call variables
+        */
+       call_cntr = funcctx->call_cntr;
+       max_calls = funcctx->max_calls;
+
+       slot = funcctx->slot;
+
+       res = (PGresult *) funcctx->user_fctx;
+       attinmeta = funcctx->attinmeta;
+       tupdesc = attinmeta->tupdesc;
+
+       if (call_cntr < max_calls)      /* do when there is more left to send */
+       {
+               char      **values;
+               HeapTuple       tuple;
+               Datum           result;
+
+               if (!is_sql_cmd)
+               {
+                       int             i;
+                       int             nfields = PQnfields(res);
+
+                       values = (char **) palloc(nfields * sizeof(char *));
+                       for (i = 0; i < nfields; i++)
+                       {
+                               if (PQgetisnull(res, call_cntr, i) == 0)
+                                       values[i] = PQgetvalue(res, call_cntr, i);
+                               else
+                                       values[i] = NULL;
+                       }
+               }
+               else
+               {
+                       values = (char **) palloc(1 * sizeof(char *));
+                       values[0] = sql_cmd_status;
+               }
+
+               /* build the tuple */
+               tuple = BuildTupleFromCStrings(attinmeta, values);
+
+               /* make the tuple into a datum */
+               result = TupleGetDatum(slot, tuple);
+
+               SRF_RETURN_NEXT(funcctx, result);
+       }
+       else    /* do when there is no more left */
+       {
+               PQclear(res);
+               SRF_RETURN_DONE(funcctx);
+       }
+}
+
+/*
+ * Execute an SQL non-SELECT command
+ */
+PG_FUNCTION_INFO_V1(dblink_exec);
+Datum
+dblink_exec(PG_FUNCTION_ARGS)
+{
+       char               *msg;
+       PGresult           *res = NULL;
+       char               *sql_cmd_status = NULL;
+       TupleDesc               tupdesc = NULL;
+       text               *result_text;
+       PGconn             *conn = NULL;
+       char               *connstr = NULL;
+       char               *sql = NULL;
+
+       if (fcinfo->nargs == 2)
+       {
+               connstr = GET_STR(PG_GETARG_TEXT_P(0));
+               sql = GET_STR(PG_GETARG_TEXT_P(1));
+
+               conn = PQconnectdb(connstr);
+               if (PQstatus(conn) == CONNECTION_BAD)
+               {
+                       msg = pstrdup(PQerrorMessage(conn));
+                       PQfinish(conn);
+                       elog(ERROR, "dblink_exec: connection error: %s", msg);
+               }
+       }
+       else if (fcinfo->nargs == 1)
+       {
+               sql = GET_STR(PG_GETARG_TEXT_P(0));
+
+               if (persistent_conn != NULL)
+                       conn = persistent_conn;
+               else
+                       elog(ERROR, "dblink_exec: no connection available");
+       }
+       else
+               elog(ERROR, "dblink_exec: wrong number of arguments");
+
+
+       res = PQexec(conn, sql);
+       if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+       {
+               msg = pstrdup(PQerrorMessage(conn));
+               PQclear(res);
+               PQfinish(conn);
+               if (fcinfo->nargs == 1)
+                       persistent_conn = NULL;
+
+               elog(ERROR, "dblink_exec: sql error: %s", msg);
+       }
+       else
+       {
+               if (PQresultStatus(res) == PGRES_COMMAND_OK)
+               {
+                       /* need a tuple descriptor representing one TEXT column */
+                       tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID);
+                       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+                                                          TEXTOID, -1, 0, false);
+
+                       /*
+                        * and save a copy of the command status string to return
+                        * as our result tuple
+                        */
+                       sql_cmd_status = PQcmdStatus(res);
+               }
+               else
+                       elog(ERROR, "dblink_exec: queries returning results not allowed");
+       }
+       PQclear(res);
+
+       /* if needed, close the connection to the database and cleanup */
+       if (fcinfo->nargs == 2)
+               PQfinish(conn);
+
+       result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status)));
+       PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Note: this original version of dblink is DEPRECATED;
+ * it *will* be removed in favor of the new version on next release
+ */
 PG_FUNCTION_INFO_V1(dblink);
 Datum
 dblink(PG_FUNCTION_ARGS)
@@ -179,14 +783,15 @@ dblink(PG_FUNCTION_ARGS)
        PG_RETURN_NULL();
 }
 
-
 /*
+ * Note: dblink_tok is DEPRECATED;
+ * it *will* be removed in favor of the new version on next release
+ *
  * dblink_tok
  * parse dblink output string
  * return fldnum item (0 based)
  * based on provided field separator
  */
-
 PG_FUNCTION_INFO_V1(dblink_tok);
 Datum
 dblink_tok(PG_FUNCTION_ARGS)
@@ -241,162 +846,121 @@ dblink_tok(PG_FUNCTION_ARGS)
        }
 }
 
-
-/*
- * dblink_strtok
- * parse input string
- * return ord item (0 based)
- * based on provided field separator
- */
-PG_FUNCTION_INFO_V1(dblink_strtok);
-Datum
-dblink_strtok(PG_FUNCTION_ARGS)
-{
-       char            *fldtext;
-       char            *fldsep;
-       int                     fldnum;
-       char            *buffer;
-       text            *result_text;
-
-       fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
-       fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
-       fldnum = PG_GETARG_INT32(2);
-
-       if (fldtext[0] == '\0')
-       {
-               elog(ERROR, "get_strtok: blank list not permitted");
-       }
-       if (fldsep[0] == '\0')
-       {
-               elog(ERROR, "get_strtok: blank field separator not permitted");
-       }
-
-       buffer = get_strtok(fldtext, fldsep, fldnum);
-
-       pfree(fldtext);
-       pfree(fldsep);
-
-       if (buffer == NULL)
-       {
-               PG_RETURN_NULL();
-       }
-       else
-       {
-               result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
-               pfree(buffer);
-
-               PG_RETURN_TEXT_P(result_text);
-       }
-}
-
-
 /*
  * dblink_get_pkey
  * 
- * Return comma delimited list of primary key
- * fields for the supplied relation,
+ * Return list of primary key fields for the supplied relation,
  * or NULL if none exists.
  */
 PG_FUNCTION_INFO_V1(dblink_get_pkey);
 Datum
 dblink_get_pkey(PG_FUNCTION_ARGS)
 {
-       text                                    *relname_text;
-       Oid                                             relid;
-       char                                    **result;
-       text                                    *result_text;
-       int16                                   numatts;
-       ReturnSetInfo                   *rsi;
-       dblink_array_results    *ret_set;
+       int16                           numatts;
+       Oid                                     relid;
+       char                      **results;
+       FuncCallContext    *funcctx;
+       int32                           call_cntr;
+       int32                           max_calls;
+       TupleTableSlot     *slot;
+       AttInMetadata      *attinmeta;
+       MemoryContext           oldcontext;
+
+       /* stuff done only on the first call of the function */
+       if(SRF_IS_FIRSTCALL())
+       {
+               TupleDesc               tupdesc = NULL;
+
+               /* create a function context for cross-call persistence */
+               funcctx = SRF_FIRSTCALL_INIT();
+
+               /* switch to memory context appropriate for multiple function calls */
+               oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+               /* convert relname to rel Oid */
+               relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
+               if (!OidIsValid(relid))
+                       elog(ERROR, "dblink_get_pkey: relation does not exist");
 
-       if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
-               elog(ERROR, "dblink: function called in context that does not accept a set result");
+               /* need a tuple descriptor representing one INT and one TEXT column */
+               tupdesc = CreateTemplateTupleDesc(2, WITHOUTOID);
+               TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
+                                                  INT4OID, -1, 0, false);
+               TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
+                                                  TEXTOID, -1, 0, false);
 
-       if (fcinfo->flinfo->fn_extra == NULL)
-       {
-               relname_text = PG_GETARG_TEXT_P(0);
+               /* allocate a slot for a tuple with this tupdesc */
+               slot = TupleDescGetSlot(tupdesc);
 
-               /*
-                * Convert relname to rel OID.
-                */
-               relid = get_relid_from_relname(relname_text);
-               if (!OidIsValid(relid))
-                       elog(ERROR, "dblink_get_pkey: relation does not exist");
+               /* assign slot to function context */
+               funcctx->slot = slot;
 
                /*
-                * get an array of attnums.
+                * Generate attribute metadata needed later to produce tuples from raw
+                * C strings
                 */
-               result = get_pkey_attnames(relid, &numatts);
+               attinmeta = TupleDescGetAttInMetadata(tupdesc);
+               funcctx->attinmeta = attinmeta;
+
+               /* get an array of attnums */
+               results = get_pkey_attnames(relid, &numatts);
 
-               if ((result != NULL) && (numatts > 0))
+               if ((results != NULL) && (numatts > 0))
                {
-                       ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
+                       funcctx->max_calls = numatts;
 
-                       ret_set->elem_num = 0;
-                       ret_set->num_elems = numatts;
-                       ret_set->res = result;
+                       /* got results, keep track of them */
+                       funcctx->user_fctx = results;
+               }
+               else    /* fast track when no results */
+                       SRF_RETURN_DONE(funcctx);
 
-                       fcinfo->flinfo->fn_extra = (void *) ret_set;
+               MemoryContextSwitchTo(oldcontext);
+    }
 
-                       rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-                       rsi->isDone = ExprMultipleResult;
+       /* stuff done on every call of the function */
+       funcctx = SRF_PERCALL_SETUP();
 
-                       result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+       /*
+        * initialize per-call variables
+        */
+       call_cntr = funcctx->call_cntr;
+       max_calls = funcctx->max_calls;
 
-                       PG_RETURN_TEXT_P(result_text);
-               }
-               else
-               {
-                       rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-                       rsi->isDone = ExprEndResult;
+       slot = funcctx->slot;
 
-                       PG_RETURN_NULL();
-               }
-       }
-       else
-       {
-               /*
-                * check for more results
-                */
-               ret_set = fcinfo->flinfo->fn_extra;
-               ret_set->elem_num++;
-               result = ret_set->res;
+       results = (char **) funcctx->user_fctx;
+       attinmeta = funcctx->attinmeta;
 
-               if (ret_set->elem_num < ret_set->num_elems)
-               {
-                       /*
-                        * fetch next one
-                        */
-                       rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-                       rsi->isDone = ExprMultipleResult;
+       if (call_cntr < max_calls)      /* do when there is more left to send */
+       {
+               char      **values;
+               HeapTuple       tuple;
+               Datum           result;
 
-                       result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
-                       PG_RETURN_TEXT_P(result_text);
-               }
-               else
-               {
-                       int             i;
+               values = (char **) palloc(2 * sizeof(char *));
+               values[0] = (char *) palloc(12);        /* sign, 10 digits, '\0' */
 
-                       /*
-                        * or if no more, clean things up
-                        */
-                       for (i = 0; i < ret_set->num_elems; i++)
-                               pfree(result[i]);
+               sprintf(values[0], "%d", call_cntr + 1);
 
-                       pfree(ret_set->res);
-                       pfree(ret_set);
+               values[1] = results[call_cntr];
 
-                       rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-                       rsi->isDone = ExprEndResult;
+               /* build the tuple */
+               tuple = BuildTupleFromCStrings(attinmeta, values);
 
-                       PG_RETURN_NULL();
-               }
+               /* make the tuple into a datum */
+               result = TupleGetDatum(slot, tuple);
+
+               SRF_RETURN_NEXT(funcctx, result);
        }
-       PG_RETURN_NULL();
+       else    /* do when there is no more left */
+               SRF_RETURN_DONE(funcctx);
 }
 
-
 /*
+ * Note: dblink_last_oid is DEPRECATED;
+ * it *will* be removed on next release
+ *
  * dblink_last_oid
  * return last inserted oid
  */
@@ -447,23 +1011,26 @@ Datum
 dblink_build_sql_insert(PG_FUNCTION_ARGS)
 {
        Oid                     relid;
-       text            *relname_text;
-       int16           *pkattnums;
+       text       *relname_text;
+       int16      *pkattnums;
        int16           pknumatts;
-       char            **src_pkattvals;
-       char            **tgt_pkattvals;
-       ArrayType       *src_pkattvals_arry;
-       ArrayType       *tgt_pkattvals_arry;
+       char      **src_pkattvals;
+       char      **tgt_pkattvals;
+       ArrayType  *src_pkattvals_arry;
+       ArrayType  *tgt_pkattvals_arry;
        int                     src_ndim;
-       int                     *src_dim;
+       int                *src_dim;
        int                     src_nitems;
        int                     tgt_ndim;
        int                     *tgt_dim;
        int                     tgt_nitems;
        int                     i;
-       char            *ptr;
-       char            *sql;
-       text            *sql_text;
+       char       *ptr;
+       char       *sql;
+       text       *sql_text;
+       int16           typlen;
+       bool            typbyval;
+       char            typalign;
 
        relname_text = PG_GETARG_TEXT_P(0);
 
@@ -503,12 +1070,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
         * get array of pointers to c-strings from the input source array
         */
        Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
+       get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
+                                                                       &typlen, &typbyval, &typalign);
+
        src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
        ptr = ARR_DATA_PTR(src_pkattvals_arry);
        for (i = 0; i < src_nitems; i++)
        {
                src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-               ptr += INTALIGN(*(int32 *) ptr);
+               ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+               ptr = (char *) att_align(ptr, typalign);
        }
 
        /*
@@ -529,12 +1100,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
         * get array of pointers to c-strings from the input target array
         */
        Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+       get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+                                                                       &typlen, &typbyval, &typalign);
+
        tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
        ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
        for (i = 0; i < tgt_nitems; i++)
        {
                tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-               ptr += INTALIGN(*(int32 *) ptr);
+               ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+               ptr = (char *) att_align(ptr, typalign);
        }
 
        /*
@@ -586,6 +1161,9 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
        char            *ptr;
        char            *sql;
        text            *sql_text;
+       int16           typlen;
+       bool            typbyval;
+       char            typalign;
 
        relname_text = PG_GETARG_TEXT_P(0);
 
@@ -624,12 +1202,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
         * get array of pointers to c-strings from the input target array
         */
        Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+       get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+                                                                       &typlen, &typbyval, &typalign);
+
        tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
        ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
        for (i = 0; i < tgt_nitems; i++)
        {
                tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-               ptr += INTALIGN(*(int32 *) ptr);
+               ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+               ptr = (char *) att_align(ptr, typalign);
        }
 
        /*
@@ -690,6 +1272,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
        char            *ptr;
        char            *sql;
        text            *sql_text;
+       int16           typlen;
+       bool            typbyval;
+       char            typalign;
 
        relname_text = PG_GETARG_TEXT_P(0);
 
@@ -729,12 +1314,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
         * get array of pointers to c-strings from the input source array
         */
        Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
+       get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
+                                                                       &typlen, &typbyval, &typalign);
+
        src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
        ptr = ARR_DATA_PTR(src_pkattvals_arry);
        for (i = 0; i < src_nitems; i++)
        {
                src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-               ptr += INTALIGN(*(int32 *) ptr);
+               ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+               ptr = (char *) att_align(ptr, typalign);
        }
 
        /*
@@ -755,12 +1344,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
         * get array of pointers to c-strings from the input target array
         */
        Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+       get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+                                                                       &typlen, &typbyval, &typalign);
+
        tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
        ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
        for (i = 0; i < tgt_nitems; i++)
        {
                tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-               ptr += INTALIGN(*(int32 *) ptr);
+               ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+               ptr = (char *) att_align(ptr, typalign);
        }
 
        /*
@@ -779,7 +1372,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
        PG_RETURN_TEXT_P(sql_text);
 }
 
-
 /*
  * dblink_current_query
  * return the current query string
@@ -797,64 +1389,6 @@ dblink_current_query(PG_FUNCTION_ARGS)
 }
 
 
-/*
- * dblink_replace_text
- * replace all occurences of 'old_sub_str' in 'orig_str'
- * with 'new_sub_str' to form 'new_str'
- * 
- * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
- * otherwise returns 'new_str' 
- */
-PG_FUNCTION_INFO_V1(dblink_replace_text);
-Datum
-dblink_replace_text(PG_FUNCTION_ARGS)
-{
-       text            *left_text;
-       text            *right_text;
-       text            *buf_text;
-       text            *ret_text;
-       char            *ret_str;
-       int                     curr_posn;
-       text            *src_text = PG_GETARG_TEXT_P(0);
-       int                     src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
-       text            *from_sub_text = PG_GETARG_TEXT_P(1);
-       int                     from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
-       text            *to_sub_text = PG_GETARG_TEXT_P(2);
-       char            *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
-       StringInfo      str = makeStringInfo();
-
-       if (src_text_len == 0 || from_sub_text_len == 0)
-               PG_RETURN_TEXT_P(src_text);
-
-       buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
-       curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
-
-       while (curr_posn > 0)
-       {
-               left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
-               right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1));
-
-               appendStringInfo(str, "%s",
-                                                DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
-               appendStringInfo(str, "%s", to_sub_str);
-
-               pfree(buf_text);
-               pfree(left_text);
-               buf_text = right_text;
-               curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
-       }
-
-       appendStringInfo(str, "%s",
-                                        DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
-       pfree(buf_text);
-
-       ret_str = pstrdup(str->data);
-       ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
-
-       PG_RETURN_TEXT_P(ret_text);
-}
-
-
 /*************************************************************
  * internal functions
  */
@@ -884,31 +1418,6 @@ init_dblink_results(MemoryContext fn_mcxt)
        return retval;
 }
 
-
-/*
- * init_dblink_array_results
- *      - create an empty dblink_array_results data structure
- */
-static dblink_array_results *
-init_dblink_array_results(MemoryContext fn_mcxt)
-{
-       MemoryContext oldcontext;
-       dblink_array_results *retval;
-
-       oldcontext = MemoryContextSwitchTo(fn_mcxt);
-
-       retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
-       MemSet(retval, 0, sizeof(dblink_array_results));
-
-       retval->elem_num = -1;
-       retval->num_elems = 0;
-       retval->res = NULL;
-
-       MemoryContextSwitchTo(oldcontext);
-
-       return retval;
-}
-
 /*
  * get_pkey_attnames
  * 
@@ -927,21 +1436,14 @@ get_pkey_attnames(Oid relid, int16 *numatts)
        Relation                rel;
        TupleDesc               tupdesc;
 
-       /*
-        * Open relation using relid, get tupdesc
-        */
+       /* open relation using relid, get tupdesc */
        rel = relation_open(relid, AccessShareLock);
        tupdesc = rel->rd_att;
 
-       /*
-        * Initialize numatts to 0 in case no primary key
-        * exists
-        */
+       /* initialize numatts to 0 in case no primary key exists */
        *numatts = 0;
 
-       /*
-        * Use relid to get all related indexes
-        */
+       /* use relid to get all related indexes */
        indexRelation = heap_openr(IndexRelationName, AccessShareLock);
        ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
                                                   F_OIDEQ, ObjectIdGetDatum(relid));
@@ -951,9 +1453,7 @@ get_pkey_attnames(Oid relid, int16 *numatts)
        {
                Form_pg_index   index = (Form_pg_index) GETSTRUCT(indexTuple);
 
-               /*
-                * We're only interested if it is the primary key
-                */
+               /* we're only interested if it is the primary key */
                if (index->indisprimary == TRUE)
                {
                        i = 0;
@@ -963,6 +1463,7 @@ get_pkey_attnames(Oid relid, int16 *numatts)
                        if (*numatts > 0)
                        {
                                result = (char **) palloc(*numatts * sizeof(char *));
+
                                for (i = 0; i < *numatts; i++)
                                        result[i] = SPI_fname(tupdesc, index->indkey[i]);
                        }
@@ -976,41 +1477,6 @@ get_pkey_attnames(Oid relid, int16 *numatts)
        return result;
 }
 
-
-/*
- * get_strtok
- * 
- * parse input string
- * return ord item (0 based)
- * based on provided field separator
- */
-static char *
-get_strtok(char *fldtext, char *fldsep, int fldnum)
-{
-       int                     j = 0;
-       char            *result;
-
-       if (fldnum < 0)
-       {
-               elog(ERROR, "get_strtok: field number < 0 not permitted");
-       }
-
-       if (fldsep[0] == '\0')
-       {
-               elog(ERROR, "get_strtok: blank field separator not permitted");
-       }
-
-       result = strtok(fldtext, fldsep);
-       for (j = 1; j < fldnum + 1; j++)
-       {
-               result = strtok(NULL, fldsep);
-               if (result == NULL)
-                       return NULL;
-       } 
-
-       return pstrdup(result);
-}
-
 static char *
 get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 {
@@ -1035,6 +1501,8 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
        natts = tupdesc->natts;
 
        tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+       if (!tuple)
+               elog(ERROR, "dblink_build_sql_insert: row not found");
 
        appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
 
@@ -1175,6 +1643,8 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
        natts = tupdesc->natts;
 
        tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+       if (!tuple)
+               elog(ERROR, "dblink_build_sql_update: row not found");
 
        appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
 
@@ -1314,7 +1784,8 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
         */
        rel = relation_open(relid, AccessShareLock);
        relname =  RelationGetRelationName(rel);
-       tupdesc = rel->rd_att;
+       tupdesc = CreateTupleDescCopy(rel->rd_att);
+       relation_close(rel, AccessShareLock);
 
        /*
         * Connect to SPI manager
@@ -1388,7 +1859,6 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
 static Oid
 get_relid_from_relname(text *relname_text)
 {
-#ifdef NamespaceRelationName
        RangeVar   *relvar;
        Relation        rel;
        Oid                     relid;
@@ -1397,16 +1867,6 @@ get_relid_from_relname(text *relname_text)
        rel = heap_openrv(relvar, AccessShareLock);
        relid = RelationGetRelid(rel);
        relation_close(rel, AccessShareLock);
-#else
-       char       *relname;
-       Relation        rel;
-       Oid                     relid;
-
-       relname = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(relname_text)));
-       rel = relation_openr(relname, AccessShareLock);
-       relid = RelationGetRelid(rel);
-       relation_close(rel, AccessShareLock);
-#endif   /* NamespaceRelationName */
 
        return relid;
 }
@@ -1456,3 +1916,55 @@ remove_res_ptr(dblink_results *results)
                res_id_index = 0;
 }
 
+static TupleDesc
+pgresultGetTupleDesc(PGresult *res)
+{
+       int                             natts;
+       AttrNumber              attnum;
+       TupleDesc               desc;
+       char               *attname;
+       int32                   atttypmod;
+       int                             attdim;
+       bool                    attisset;
+       Oid                             atttypid;
+       int                             i;
+
+       /*
+        * allocate a new tuple descriptor
+        */
+       natts = PQnfields(res);
+       if (natts < 1)
+               elog(ERROR, "cannot create a description for empty results");
+
+       desc = CreateTemplateTupleDesc(natts, WITHOUTOID);
+
+       attnum = 0;
+
+       for (i = 0; i < natts; i++)
+       {
+               /*
+                * for each field, get the name and type information from the query
+                * result and have TupleDescInitEntry fill in the attribute
+                * information we need.
+                */
+               attnum++;
+
+               attname = PQfname(res, i);
+               atttypid = PQftype(res, i);
+               atttypmod = PQfmod(res, i);
+
+               if (PQfsize(res, i) != get_typlen(atttypid))
+                       elog(ERROR, "Size of remote field \"%s\" does not match size "
+                               "of local type \"%s\"",
+                               attname,
+                               format_type_with_typemod(atttypid, atttypmod));
+
+               attdim = 0;
+               attisset = false;
+
+               TupleDescInitEntry(desc, attnum, attname, atttypid,
+                                                  atttypmod, attdim, attisset);
+       }
+
+       return desc;
+}
index 4d53005ac604b1cda72bd2d30897deba224f62e5..ddca6241c4a03e89e26b69e730249e8a83143365 100644 (file)
@@ -3,7 +3,9 @@
  *
  * Functions returning results from a remote database
  *
- * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
+ * Joe Conway <mail@joeconway.com>
+ *
+ * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
  * Permission to use, copy, modify, and distribute this software and its
 #ifndef DBLINK_H
 #define DBLINK_H
 
-#include <string.h>
-#include "postgres.h"
-#include "libpq-fe.h"
-#include "libpq-int.h"
-#include "fmgr.h"
-#include "access/tupdesc.h"
-#include "access/heapam.h"
-#include "catalog/catname.h"
-#include "catalog/pg_index.h"
-#include "catalog/pg_type.h"
-#include "executor/executor.h"
-#include "executor/spi.h"
-#include "lib/stringinfo.h"
-#include "nodes/nodes.h"
-#include "nodes/execnodes.h"
-#include "nodes/pg_list.h"
-#include "parser/parse_type.h"
-#include "tcop/tcopprot.h"
-#include "utils/builtins.h"
-#include "utils/fmgroids.h"
-#include "utils/array.h"
-#include "utils/syscache.h"
-
-#ifdef NamespaceRelationName
-#include "catalog/namespace.h"
-#endif   /* NamespaceRelationName */
-
-/*
- * Max SQL statement size
- */
-#define DBLINK_MAX_SQLSTATE_SIZE               16384
-
 /*
  * This struct holds the results of the remote query.
  * Use fn_extra to hold a pointer to it across calls
@@ -82,43 +52,27 @@ typedef struct
        PGresult   *res;
 }      dblink_results;
 
-
-/*
- * This struct holds results in the form of an array.
- * Use fn_extra to hold a pointer to it across calls
- */
-typedef struct
-{
-       /*
-        * elem being accessed
-        */
-       int                     elem_num;
-
-       /*
-        * number of elems
-        */
-       int                     num_elems;
-
-       /*
-        * the actual array
-        */
-       void            *res;
-
-}      dblink_array_results;
-
 /*
  * External declarations
  */
+/* deprecated */
 extern Datum dblink(PG_FUNCTION_ARGS);
 extern Datum dblink_tok(PG_FUNCTION_ARGS);
-extern Datum dblink_strtok(PG_FUNCTION_ARGS);
+
+/* supported */
+extern Datum dblink_connect(PG_FUNCTION_ARGS);
+extern Datum dblink_disconnect(PG_FUNCTION_ARGS);
+extern Datum dblink_open(PG_FUNCTION_ARGS);
+extern Datum dblink_close(PG_FUNCTION_ARGS);
+extern Datum dblink_fetch(PG_FUNCTION_ARGS);
+extern Datum dblink_record(PG_FUNCTION_ARGS);
+extern Datum dblink_exec(PG_FUNCTION_ARGS);
 extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
 extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
 extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
 extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
 extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
 extern Datum dblink_current_query(PG_FUNCTION_ARGS);
-extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
 
 extern char    *debug_query_string;
 
index bea4378907244508a937a86cad9aa37e4ea47e13..b92801a5c51ecac8e3510ca3dde125a75b881ffb 100644 (file)
@@ -1,21 +1,58 @@
-CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
-  AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
+-- Uncomment the following 9 lines to use original DEPRECATED functions
+--CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
+--  AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
+--  WITH (isstrict);
+--CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
+--  AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
+--  WITH (isstrict);
+--CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
+--  AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
+--  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink_connect (text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_connect' LANGUAGE 'c'
   WITH (isstrict);
 
-CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
-  AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
+CREATE OR REPLACE FUNCTION dblink_disconnect () RETURNS text
+  AS 'MODULE_PATHNAME','dblink_disconnect' LANGUAGE 'c'
   WITH (isstrict);
 
-CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
-  AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
-  WITH (iscachable, isstrict);
+CREATE OR REPLACE FUNCTION dblink_open (text,text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_open' LANGUAGE 'c'
+  WITH (isstrict);
 
-CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof text
-  AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
+CREATE OR REPLACE FUNCTION dblink_fetch (text,int) RETURNS setof record
+  AS 'MODULE_PATHNAME','dblink_fetch' LANGUAGE 'c'
+  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink_close (text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_close' LANGUAGE 'c'
+  WITH (isstrict);
+
+-- Note: if this is a first time install of dblink, the following DROP
+-- FUNCTION line is expected to fail.
+-- Comment out the following 4 lines if the DEPRECATED functions are used.
+DROP FUNCTION dblink (text,text);
+CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof record
+  AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c'
+  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink (text) RETURNS setof record
+  AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c'
+  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink_exec (text,text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c'
+  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink_exec (text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c'
   WITH (isstrict);
 
-CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
-  AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
+CREATE TYPE dblink_pkey_results AS (position int4, colname text);
+
+CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof dblink_pkey_results
+  AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
   WITH (isstrict);
 
 CREATE OR REPLACE FUNCTION dblink_build_sql_insert (text, int2vector, int2, _text, _text) RETURNS text
@@ -32,7 +69,3 @@ CREATE OR REPLACE FUNCTION dblink_build_sql_update (text, int2vector, int2, _tex
 
 CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
   AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
-
-CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
-  AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
-  WITH (iscachable, isstrict);