*
* Functions returning results from a remote database
*
- * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
+ * Joe Conway <mail@joeconway.com>
+ *
+ * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
*
*/
-
-Version 0.4 (7 April, 2002):
- Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
- various utility functions.
- Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel
+Version 0.5 (25 August, 2002):
+ Major overhaul to work with new backend "table function" capability. Removed
+ dblink_strtok() and dblink_replace() functions because they are now
+ available as backend functions (split() and replace() respectively).
+ Tested under Linux (Red Hat 7.3) and PostgreSQL 7.3devel. This version
+ is no longer backwards portable to PostgreSQL 7.2.
Release Notes:
+ Version 0.5
+ - dblink now supports use directly as a table function; this is the new
+ preferred usage going forward
+ - Use of dblink_tok is now deprecated; original form of dblink is also
+ deprecated. They _will_ be removed in the next version.
+ - dblink_last_oid is also deprecated; use dblink_exec() which returns
+ the command status as a single row, single column result.
+ - Original dblink, dblink_tok, and dblink_last_oid are commented out in
+ dblink.sql; remove the comments to use the deprecated functions.
+ - dblink_strtok() and dblink_replace() functions were removed. Use
+ split() and replace() respectively (new backend functions in
+ PostgreSQL 7.3) instead.
+ - New functions: dblink_exec() for non-SELECT queries; dblink_connect()
+ opens connection that persists for duration of a backend;
+ dblink_disconnect() closes a persistent connection; dblink_open()
+ opens a cursor; dblink_fetch() fetches results from an open cursor.
+ dblink_close() closes a cursor.
+ - New test suite: dblink_check.sh, dblink.test.sql,
+ dblink.test.expected.out. Execute dblink_check.sh from the same
+ directory as the other two files. Output is dblink.test.out and
+ dblink.test.diff. Note that dblink.test.sql is a good source
+ of example usage.
Version 0.4
- removed cursor wrap around input sql to allow for remote
installs following functions into database template1:
- dblink(text,text) RETURNS setof int
- - returns a resource id for results from remote query
- dblink_tok(int,int) RETURNS text
- - extracts and returns individual field results
- dblink_strtok(text,text,int) RETURNS text
- - extracts and returns individual token from delimited text
+ connection
+ ------------
+ dblink_connect(text) RETURNS text
+ - opens a connection that will persist for duration of current
+ backend or until it is disconnected
+ dblink_disconnect() RETURNS text
+ - disconnects a persistent connection
+
+ cursor
+ ------------
+ dblink_open(text,text) RETURNS text
+ - opens a cursor using connection already opened with dblink_connect()
+ that will persist for duration of current backend or until it is
+ closed
+ dblink_fetch(text, int) RETURNS setof record
+ - fetches data from an already opened cursor
+ dblink_close(text) RETURNS text
+ - closes a cursor
+
+ query
+ ------------
+ dblink(text,text) RETURNS setof record
+ - returns a set of results from remote SELECT query
+ (Note: comment out in dblink.sql to use deprecated version)
+ dblink(text) RETURNS setof record
+ - returns a set of results from remote SELECT query, using connection
+ already opened with dblink_connect()
+
+ execute
+ ------------
+ dblink_exec(text, text) RETURNS text
+ - executes an INSERT/UPDATE/DELETE query remotely
+ dblink_exec(text) RETURNS text
+ - executes an INSERT/UPDATE/DELETE query remotely, using connection
+ already opened with dblink_connect()
+
+ misc
+ ------------
+ dblink_current_query() RETURNS text
+ - returns the current query string
dblink_get_pkey(text) RETURNS setof text
- returns the field names of a relation's primary key fields
- dblink_last_oid(int) RETURNS oid
- - returns the last inserted oid
dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
- builds an insert statement using a local tuple, replacing the
selection key field values with alternate supplied values
dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
- builds an update statement using a local tuple, replacing the
selection key field values with alternate supplied values
- dblink_current_query() RETURNS text
- - returns the current query string
- dblink_replace(text,text,text) RETURNS text
- - replace all occurences of substring-a in the input-string
- with substring-b
-
-Documentation
-==================================================================
-Name
-
-dblink -- Returns a resource id for a data set from a remote database
-
-Synopsis
-
-dblink(text connstr, text sql)
-
-Inputs
-
- connstr
-
- standard libpq format connection srting,
- e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd"
-
- sql
-
- sql statement that you wish to execute on the remote host
- e.g. "select * from pg_class"
-
-Outputs
-
- Returns setof int (res_id)
-
-Example usage
-
- select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
- ,'select f1, f2 from mytable');
-
-
-==================================================================
-
-Name
-
-dblink_tok -- Returns individual select field results from a dblink remote query
-
-Synopsis
-
-dblink_tok(int res_id, int fnumber)
-
-Inputs
-
- res_id
-
- a resource id returned by a call to dblink()
-
- fnumber
-
- the ordinal position (zero based) of the field to be returned from the dblink result set
-
-Outputs
-
- Returns text
-
-Example usage
-
- select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
- from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
- ,'select f1, f2 from mytable') as dblink_p) as t1;
-
-
-==================================================================
-
-A more convenient way to use dblink may be to create a view:
-
- create view myremotetable as
- select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
- from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=template1 user=postgres password=postgres'
- ,'select proname, prosrc from pg_proc') as dblink_p) as t1;
-
-Then you can simply write:
-
- select f1, f2 from myremotetable where f1 like 'bytea%';
-
-==================================================================
-Name
-
-dblink_strtok -- Extracts and returns individual token from delimited text
-
-Synopsis
-
-dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
-
-Inputs
-
- inputstring
-
- any string you want to parse a token out of;
- e.g. 'f=1&g=3&h=4'
-
- delimiter
-
- a single character to use as the delimiter;
- e.g. '&' or '='
-
- posn
-
- the position of the token of interest, 0 based;
- e.g. 1
-
-Outputs
-
- Returns text
-
-Example usage
-
-test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
- dblink_strtok
----------------
- 3
-(1 row)
-
-==================================================================
-Name
-
-dblink_get_pkey -- returns the field names of a relation's primary
- key fields
-
-Synopsis
-
-dblink_get_pkey(text relname) RETURNS setof text
-
-Inputs
-
- relname
-
- any relation name;
- e.g. 'foobar'
-
-Outputs
-
- Returns setof text -- one row for each primary key field, in order of
- precedence
-Example usage
-
-test=# select dblink_get_pkey('foobar');
- dblink_get_pkey
------------------
- f1
- f2
- f3
- f4
- f5
-(5 rows)
-
-
-==================================================================
-Name
-
-dblink_last_oid -- Returns last inserted oid
-
-Synopsis
-
-dblink_last_oid(int res_id) RETURNS oid
-
-Inputs
-
- res_id
-
- any resource id returned by dblink function;
-
-Outputs
-
- Returns oid of last inserted tuple
-
-Example usage
-
-test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
- ,'insert into mytable (f1, f2) values (1,2)'));
-
- dblink_last_oid
-----------------
- 16553
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_build_sql_insert -- builds an insert statement using a local
- tuple, replacing the selection key field
- values with alternate supplied values
-dblink_build_sql_delete -- builds a delete statement using supplied
- values for selection key field values
-dblink_build_sql_update -- builds an update statement using a local
- tuple, replacing the selection key field
- values with alternate supplied values
-
-
-Synopsis
-
-dblink_build_sql_insert(text relname
- ,int2vector primary_key_attnums
- ,int2 num_primary_key_atts
- ,_text src_pk_att_vals_array
- ,_text tgt_pk_att_vals_array) RETURNS text
-dblink_build_sql_delete(text relname
- ,int2vector primary_key_attnums
- ,int2 num_primary_key_atts
- ,_text tgt_pk_att_vals_array) RETURNS text
-dblink_build_sql_update(text relname
- ,int2vector primary_key_attnums
- ,int2 num_primary_key_atts
- ,_text src_pk_att_vals_array
- ,_text tgt_pk_att_vals_array) RETURNS text
-
-Inputs
-
- relname
-
- any relation name;
- e.g. 'foobar'
-
- primary_key_attnums
-
- vector of primary key attnums (1 based, see pg_index.indkey);
- e.g. '1 2'
-
- num_primary_key_atts
-
- number of primary key attnums in the vector; e.g. 2
-
- src_pk_att_vals_array
-
- array of primary key values, used to look up the local matching
- tuple, the values of which are then used to construct the SQL
- statement
-
- tgt_pk_att_vals_array
-
- array of primary key values, used to replace the local tuple
- values in the SQL statement
-
-Outputs
-
- Returns text -- requested SQL statement
-
-Example usage
-
-test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
- dblink_build_sql_insert
---------------------------------------------------
- INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
-(1 row)
-
-test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
- dblink_build_sql_delete
----------------------------------------------
- DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
-(1 row)
-
-test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
- dblink_build_sql_update
--------------------------------------------------------------
- UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_current_query -- returns the current query string
-
-Synopsis
-
-dblink_current_query () RETURNS text
-
-Inputs
-
- None
-
-Outputs
-
- Returns text -- a copy of the currently executing query
-
-Example usage
-
-test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
- dblink_current_query
------------------------------------------------------------------------------------------------------------------------------------------------------
- select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_replace -- replace all occurences of substring-a in the
- input-string with substring-b
-
-Synopsis
-
-dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
-
-Inputs
-
- input-string
-
- the starting string, before replacement of substring-a
-
- substring-a
-
- the substring to find and replace
-
- substring-b
-
- the substring to be substituted in place of substring-a
-
-Outputs
-
- Returns text -- a copy of the starting string, but with all occurences of
- substring-a replaced with substring-b
+ Not installed by default
+ deprecated
+ ------------
+ dblink(text,text) RETURNS setof int
+ - *DEPRECATED* returns a resource id for results from remote query
+ (Note: must uncomment in dblink.sql to use)
+ dblink_tok(int,int) RETURNS text
+ - *DEPRECATED* extracts and returns individual field results; used
+ only in conjunction with the *DEPRECATED* form of dblink
+ (Note: must uncomment in dblink.sql to use)
+ dblink_last_oid(int) RETURNS oid
+ - *DEPRECATED* returns the last inserted oid
-Example usage
+Documentation:
-test=# select dblink_replace('12345678901234567890','56','hello');
- dblink_replace
-----------------------------
- 1234hello78901234hello7890
-(1 row)
+ See the following files:
+ doc/connection
+ doc/cursor
+ doc/query
+ doc/execute
+ doc/misc
+ doc/deprecated
==================================================================
-
-
-- Joe Conway
*
* Functions returning results from a remote database
*
- * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
+ * Joe Conway <mail@joeconway.com>
+ *
+ * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
*
*/
-#include "dblink.h"
+#include <string.h>
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "libpq-int.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "access/tupdesc.h"
+#include "access/heapam.h"
+#include "catalog/catname.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_index.h"
+#include "catalog/pg_type.h"
+#include "executor/executor.h"
+#include "executor/spi.h"
+#include "lib/stringinfo.h"
+#include "nodes/nodes.h"
+#include "nodes/execnodes.h"
+#include "nodes/pg_list.h"
+#include "parser/parse_type.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/array.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
+#include "dblink.h"
/*
* Internal declarations
*/
static dblink_results *init_dblink_results(MemoryContext fn_mcxt);
-static dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
static char **get_pkey_attnames(Oid relid, int16 *numatts);
-static char *get_strtok(char *fldtext, char *fldsep, int fldnum);
static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text);
-static dblink_results *get_res_ptr(int32 res_id_index);
+static dblink_results *get_res_ptr(int32 res_id_index);
static void append_res_ptr(dblink_results *results);
static void remove_res_ptr(dblink_results *results);
+static TupleDesc pgresultGetTupleDesc(PGresult *res);
/* Global */
-List *res_id = NIL;
-int res_id_index = 0;
+List *res_id = NIL;
+int res_id_index = 0;
+PGconn *persistent_conn = NULL;
+
+#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
+#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
+#define xpfree(var_) \
+ do { \
+ if (var_ != NULL) \
+ { \
+ pfree(var_); \
+ var_ = NULL; \
+ } \
+ } while (0)
+
+
+/*
+ * Create a persistent connection to another database
+ */
+PG_FUNCTION_INFO_V1(dblink_connect);
+Datum
+dblink_connect(PG_FUNCTION_ARGS)
+{
+ char *connstr = GET_STR(PG_GETARG_TEXT_P(0));
+ char *msg;
+ text *result_text;
+ MemoryContext oldcontext;
+
+ if (persistent_conn != NULL)
+ PQfinish(persistent_conn);
+
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ persistent_conn = PQconnectdb(connstr);
+ MemoryContextSwitchTo(oldcontext);
+
+ if (PQstatus(persistent_conn) == CONNECTION_BAD)
+ {
+ msg = pstrdup(PQerrorMessage(persistent_conn));
+ PQfinish(persistent_conn);
+ persistent_conn = NULL;
+ elog(ERROR, "dblink_connect: connection error: %s", msg);
+ }
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+ PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Clear a persistent connection to another database
+ */
+PG_FUNCTION_INFO_V1(dblink_disconnect);
+Datum
+dblink_disconnect(PG_FUNCTION_ARGS)
+{
+ text *result_text;
+
+ if (persistent_conn != NULL)
+ PQfinish(persistent_conn);
+
+ persistent_conn = NULL;
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+ PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * opens a cursor using a persistent connection
+ */
+PG_FUNCTION_INFO_V1(dblink_open);
+Datum
+dblink_open(PG_FUNCTION_ARGS)
+{
+ char *msg;
+ PGresult *res = NULL;
+ PGconn *conn = NULL;
+ text *result_text;
+ char *curname = GET_STR(PG_GETARG_TEXT_P(0));
+ char *sql = GET_STR(PG_GETARG_TEXT_P(1));
+ StringInfo str = makeStringInfo();
+
+ if (persistent_conn != NULL)
+ conn = persistent_conn;
+ else
+ elog(ERROR, "dblink_open: no connection available");
+
+ res = PQexec(conn, "BEGIN");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+ PQclear(res);
+
+ PQfinish(conn);
+ persistent_conn = NULL;
+
+ elog(ERROR, "dblink_open: begin error: %s", msg);
+ }
+ PQclear(res);
+
+ appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql);
+ res = PQexec(conn, str->data);
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+
+ PQclear(res);
+
+ PQfinish(conn);
+ persistent_conn = NULL;
+
+ elog(ERROR, "dblink: sql error: %s", msg);
+ }
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+ PG_RETURN_TEXT_P(result_text);
+}
+/*
+ * closes a cursor
+ */
+PG_FUNCTION_INFO_V1(dblink_close);
+Datum
+dblink_close(PG_FUNCTION_ARGS)
+{
+ PGconn *conn = NULL;
+ PGresult *res = NULL;
+ char *curname = GET_STR(PG_GETARG_TEXT_P(0));
+ StringInfo str = makeStringInfo();
+ text *result_text;
+ char *msg;
+
+ if (persistent_conn != NULL)
+ conn = persistent_conn;
+ else
+ elog(ERROR, "dblink_close: no connection available");
+
+ appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));
+
+ /* close the cursor */
+ res = PQexec(conn, str->data);
+ if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+ PQclear(res);
+
+ PQfinish(persistent_conn);
+ persistent_conn = NULL;
+
+ elog(ERROR, "dblink_close: sql error: %s", msg);
+ }
+
+ PQclear(res);
+
+ /* commit the transaction */
+ res = PQexec(conn, "COMMIT");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+ PQclear(res);
+
+ PQfinish(persistent_conn);
+ persistent_conn = NULL;
+
+ elog(ERROR, "dblink_close: commit error: %s", msg);
+ }
+ PQclear(res);
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+ PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Fetch results from an open cursor
+ */
+PG_FUNCTION_INFO_V1(dblink_fetch);
+Datum
+dblink_fetch(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ TupleDesc tupdesc = NULL;
+ int call_cntr;
+ int max_calls;
+ TupleTableSlot *slot;
+ AttInMetadata *attinmeta;
+ char *msg;
+ PGresult *res = NULL;
+ MemoryContext oldcontext;
+
+ /* stuff done only on the first call of the function */
+ if(SRF_IS_FIRSTCALL())
+ {
+ Oid functypeid;
+ char functyptype;
+ Oid funcid = fcinfo->flinfo->fn_oid;
+ PGconn *conn = NULL;
+ StringInfo str = makeStringInfo();
+ char *curname = GET_STR(PG_GETARG_TEXT_P(0));
+ int howmany = PG_GETARG_INT32(1);
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ if (persistent_conn != NULL)
+ conn = persistent_conn;
+ else
+ elog(ERROR, "dblink_fetch: no connection available");
+
+ appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname));
+
+ res = PQexec(conn, str->data);
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+ PQclear(res);
+
+ PQfinish(persistent_conn);
+ persistent_conn = NULL;
+
+ elog(ERROR, "dblink_fetch: sql error: %s", msg);
+ }
+ else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ /* cursor does not exist - closed already or bad name */
+ PQclear(res);
+ elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname));
+ }
+
+ funcctx->max_calls = PQntuples(res);
+
+ /* got results, keep track of them */
+ funcctx->user_fctx = res;
+
+ /* fast track when no results */
+ if (funcctx->max_calls < 1)
+ SRF_RETURN_DONE(funcctx);
+
+ /* check typtype to see if we have a predetermined return type */
+ functypeid = get_func_rettype(funcid);
+ functyptype = get_typtype(functypeid);
+
+ if (functyptype == 'c')
+ tupdesc = TypeGetTupleDesc(functypeid, NIL);
+ else if (functyptype == 'p' && functypeid == RECORDOID)
+ tupdesc = pgresultGetTupleDesc(res);
+ else if (functyptype == 'b')
+ elog(ERROR, "dblink_fetch: invalid kind of return type specified for function");
+ else
+ elog(ERROR, "dblink_fetch: unknown kind of return type specified for function");
+
+ /* store needed metadata for subsequent calls */
+ slot = TupleDescGetSlot(tupdesc);
+ funcctx->slot = slot;
+ attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ funcctx->attinmeta = attinmeta;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+
+ /*
+ * initialize per-call variables
+ */
+ call_cntr = funcctx->call_cntr;
+ max_calls = funcctx->max_calls;
+
+ slot = funcctx->slot;
+
+ res = (PGresult *) funcctx->user_fctx;
+ attinmeta = funcctx->attinmeta;
+ tupdesc = attinmeta->tupdesc;
+
+ if (call_cntr < max_calls) /* do when there is more left to send */
+ {
+ char **values;
+ HeapTuple tuple;
+ Datum result;
+ int i;
+ int nfields = PQnfields(res);
+
+ values = (char **) palloc(nfields * sizeof(char *));
+ for (i = 0; i < nfields; i++)
+ {
+ if (PQgetisnull(res, call_cntr, i) == 0)
+ values[i] = PQgetvalue(res, call_cntr, i);
+ else
+ values[i] = NULL;
+ }
+
+ /* build the tuple */
+ tuple = BuildTupleFromCStrings(attinmeta, values);
+
+ /* make the tuple into a datum */
+ result = TupleGetDatum(slot, tuple);
+
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+ else /* do when there is no more left */
+ {
+ PQclear(res);
+ SRF_RETURN_DONE(funcctx);
+ }
+}
+
+/*
+ * Note: this is the new preferred version of dblink
+ */
+PG_FUNCTION_INFO_V1(dblink_record);
+Datum
+dblink_record(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ TupleDesc tupdesc = NULL;
+ int call_cntr;
+ int max_calls;
+ TupleTableSlot *slot;
+ AttInMetadata *attinmeta;
+ char *msg;
+ PGresult *res = NULL;
+ bool is_sql_cmd = false;
+ char *sql_cmd_status = NULL;
+ MemoryContext oldcontext;
+
+ /* stuff done only on the first call of the function */
+ if(SRF_IS_FIRSTCALL())
+ {
+ Oid functypeid;
+ char functyptype;
+ Oid funcid = fcinfo->flinfo->fn_oid;
+ PGconn *conn = NULL;
+ char *connstr = NULL;
+ char *sql = NULL;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ if (fcinfo->nargs == 2)
+ {
+ connstr = GET_STR(PG_GETARG_TEXT_P(0));
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
+
+ conn = PQconnectdb(connstr);
+ if (PQstatus(conn) == CONNECTION_BAD)
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+ PQfinish(conn);
+ elog(ERROR, "dblink: connection error: %s", msg);
+ }
+ }
+ else if (fcinfo->nargs == 1)
+ {
+ sql = GET_STR(PG_GETARG_TEXT_P(0));
+
+ if (persistent_conn != NULL)
+ conn = persistent_conn;
+ else
+ elog(ERROR, "dblink: no connection available");
+ }
+ else
+ elog(ERROR, "dblink: wrong number of arguments");
+
+ res = PQexec(conn, sql);
+ if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+ PQclear(res);
+ PQfinish(conn);
+ if (fcinfo->nargs == 1)
+ persistent_conn = NULL;
+
+ elog(ERROR, "dblink: sql error: %s", msg);
+ }
+ else
+ {
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ is_sql_cmd = true;
+
+ /* need a tuple descriptor representing one TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0, false);
+
+ /*
+ * and save a copy of the command status string to return
+ * as our result tuple
+ */
+ sql_cmd_status = PQcmdStatus(res);
+ funcctx->max_calls = 1;
+ }
+ else
+ funcctx->max_calls = PQntuples(res);
+
+ /* got results, keep track of them */
+ funcctx->user_fctx = res;
+
+ /* if needed, close the connection to the database and cleanup */
+ if (fcinfo->nargs == 2)
+ PQfinish(conn);
+ }
+
+ /* fast track when no results */
+ if (funcctx->max_calls < 1)
+ SRF_RETURN_DONE(funcctx);
+
+ /* check typtype to see if we have a predetermined return type */
+ functypeid = get_func_rettype(funcid);
+ functyptype = get_typtype(functypeid);
+
+ if (!is_sql_cmd)
+ {
+ if (functyptype == 'c')
+ tupdesc = TypeGetTupleDesc(functypeid, NIL);
+ else if (functyptype == 'p' && functypeid == RECORDOID)
+ tupdesc = pgresultGetTupleDesc(res);
+ else if (functyptype == 'b')
+ elog(ERROR, "Invalid kind of return type specified for function");
+ else
+ elog(ERROR, "Unknown kind of return type specified for function");
+ }
+
+ /* store needed metadata for subsequent calls */
+ slot = TupleDescGetSlot(tupdesc);
+ funcctx->slot = slot;
+ attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ funcctx->attinmeta = attinmeta;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+
+ /*
+ * initialize per-call variables
+ */
+ call_cntr = funcctx->call_cntr;
+ max_calls = funcctx->max_calls;
+
+ slot = funcctx->slot;
+
+ res = (PGresult *) funcctx->user_fctx;
+ attinmeta = funcctx->attinmeta;
+ tupdesc = attinmeta->tupdesc;
+
+ if (call_cntr < max_calls) /* do when there is more left to send */
+ {
+ char **values;
+ HeapTuple tuple;
+ Datum result;
+
+ if (!is_sql_cmd)
+ {
+ int i;
+ int nfields = PQnfields(res);
+
+ values = (char **) palloc(nfields * sizeof(char *));
+ for (i = 0; i < nfields; i++)
+ {
+ if (PQgetisnull(res, call_cntr, i) == 0)
+ values[i] = PQgetvalue(res, call_cntr, i);
+ else
+ values[i] = NULL;
+ }
+ }
+ else
+ {
+ values = (char **) palloc(1 * sizeof(char *));
+ values[0] = sql_cmd_status;
+ }
+
+ /* build the tuple */
+ tuple = BuildTupleFromCStrings(attinmeta, values);
+
+ /* make the tuple into a datum */
+ result = TupleGetDatum(slot, tuple);
+
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+ else /* do when there is no more left */
+ {
+ PQclear(res);
+ SRF_RETURN_DONE(funcctx);
+ }
+}
+
+/*
+ * Execute an SQL non-SELECT command
+ */
+PG_FUNCTION_INFO_V1(dblink_exec);
+Datum
+dblink_exec(PG_FUNCTION_ARGS)
+{
+ char *msg;
+ PGresult *res = NULL;
+ char *sql_cmd_status = NULL;
+ TupleDesc tupdesc = NULL;
+ text *result_text;
+ PGconn *conn = NULL;
+ char *connstr = NULL;
+ char *sql = NULL;
+
+ if (fcinfo->nargs == 2)
+ {
+ connstr = GET_STR(PG_GETARG_TEXT_P(0));
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
+
+ conn = PQconnectdb(connstr);
+ if (PQstatus(conn) == CONNECTION_BAD)
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+ PQfinish(conn);
+ elog(ERROR, "dblink_exec: connection error: %s", msg);
+ }
+ }
+ else if (fcinfo->nargs == 1)
+ {
+ sql = GET_STR(PG_GETARG_TEXT_P(0));
+
+ if (persistent_conn != NULL)
+ conn = persistent_conn;
+ else
+ elog(ERROR, "dblink_exec: no connection available");
+ }
+ else
+ elog(ERROR, "dblink_exec: wrong number of arguments");
+
+
+ res = PQexec(conn, sql);
+ if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ msg = pstrdup(PQerrorMessage(conn));
+ PQclear(res);
+ PQfinish(conn);
+ if (fcinfo->nargs == 1)
+ persistent_conn = NULL;
+
+ elog(ERROR, "dblink_exec: sql error: %s", msg);
+ }
+ else
+ {
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ /* need a tuple descriptor representing one TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0, false);
+
+ /*
+ * and save a copy of the command status string to return
+ * as our result tuple
+ */
+ sql_cmd_status = PQcmdStatus(res);
+ }
+ else
+ elog(ERROR, "dblink_exec: queries returning results not allowed");
+ }
+ PQclear(res);
+
+ /* if needed, close the connection to the database and cleanup */
+ if (fcinfo->nargs == 2)
+ PQfinish(conn);
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status)));
+ PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Note: this original version of dblink is DEPRECATED;
+ * it *will* be removed in favor of the new version on next release
+ */
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
-
/*
+ * Note: dblink_tok is DEPRECATED;
+ * it *will* be removed in favor of the new version on next release
+ *
* dblink_tok
* parse dblink output string
* return fldnum item (0 based)
* based on provided field separator
*/
-
PG_FUNCTION_INFO_V1(dblink_tok);
Datum
dblink_tok(PG_FUNCTION_ARGS)
}
}
-
-/*
- * dblink_strtok
- * parse input string
- * return ord item (0 based)
- * based on provided field separator
- */
-PG_FUNCTION_INFO_V1(dblink_strtok);
-Datum
-dblink_strtok(PG_FUNCTION_ARGS)
-{
- char *fldtext;
- char *fldsep;
- int fldnum;
- char *buffer;
- text *result_text;
-
- fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
- fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
- fldnum = PG_GETARG_INT32(2);
-
- if (fldtext[0] == '\0')
- {
- elog(ERROR, "get_strtok: blank list not permitted");
- }
- if (fldsep[0] == '\0')
- {
- elog(ERROR, "get_strtok: blank field separator not permitted");
- }
-
- buffer = get_strtok(fldtext, fldsep, fldnum);
-
- pfree(fldtext);
- pfree(fldsep);
-
- if (buffer == NULL)
- {
- PG_RETURN_NULL();
- }
- else
- {
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
- pfree(buffer);
-
- PG_RETURN_TEXT_P(result_text);
- }
-}
-
-
/*
* dblink_get_pkey
*
- * Return comma delimited list of primary key
- * fields for the supplied relation,
+ * Return list of primary key fields for the supplied relation,
* or NULL if none exists.
*/
PG_FUNCTION_INFO_V1(dblink_get_pkey);
Datum
dblink_get_pkey(PG_FUNCTION_ARGS)
{
- text *relname_text;
- Oid relid;
- char **result;
- text *result_text;
- int16 numatts;
- ReturnSetInfo *rsi;
- dblink_array_results *ret_set;
+ int16 numatts;
+ Oid relid;
+ char **results;
+ FuncCallContext *funcctx;
+ int32 call_cntr;
+ int32 max_calls;
+ TupleTableSlot *slot;
+ AttInMetadata *attinmeta;
+ MemoryContext oldcontext;
+
+ /* stuff done only on the first call of the function */
+ if(SRF_IS_FIRSTCALL())
+ {
+ TupleDesc tupdesc = NULL;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* convert relname to rel Oid */
+ relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
+ if (!OidIsValid(relid))
+ elog(ERROR, "dblink_get_pkey: relation does not exist");
- if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
- elog(ERROR, "dblink: function called in context that does not accept a set result");
+ /* need a tuple descriptor representing one INT and one TEXT column */
+ tupdesc = CreateTemplateTupleDesc(2, WITHOUTOID);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
+ INT4OID, -1, 0, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
+ TEXTOID, -1, 0, false);
- if (fcinfo->flinfo->fn_extra == NULL)
- {
- relname_text = PG_GETARG_TEXT_P(0);
+ /* allocate a slot for a tuple with this tupdesc */
+ slot = TupleDescGetSlot(tupdesc);
- /*
- * Convert relname to rel OID.
- */
- relid = get_relid_from_relname(relname_text);
- if (!OidIsValid(relid))
- elog(ERROR, "dblink_get_pkey: relation does not exist");
+ /* assign slot to function context */
+ funcctx->slot = slot;
/*
- * get an array of attnums.
+ * Generate attribute metadata needed later to produce tuples from raw
+ * C strings
*/
- result = get_pkey_attnames(relid, &numatts);
+ attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ funcctx->attinmeta = attinmeta;
+
+ /* get an array of attnums */
+ results = get_pkey_attnames(relid, &numatts);
- if ((result != NULL) && (numatts > 0))
+ if ((results != NULL) && (numatts > 0))
{
- ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
+ funcctx->max_calls = numatts;
- ret_set->elem_num = 0;
- ret_set->num_elems = numatts;
- ret_set->res = result;
+ /* got results, keep track of them */
+ funcctx->user_fctx = results;
+ }
+ else /* fast track when no results */
+ SRF_RETURN_DONE(funcctx);
- fcinfo->flinfo->fn_extra = (void *) ret_set;
+ MemoryContextSwitchTo(oldcontext);
+ }
- rsi = (ReturnSetInfo *) fcinfo->resultinfo;
- rsi->isDone = ExprMultipleResult;
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+ /*
+ * initialize per-call variables
+ */
+ call_cntr = funcctx->call_cntr;
+ max_calls = funcctx->max_calls;
- PG_RETURN_TEXT_P(result_text);
- }
- else
- {
- rsi = (ReturnSetInfo *) fcinfo->resultinfo;
- rsi->isDone = ExprEndResult;
+ slot = funcctx->slot;
- PG_RETURN_NULL();
- }
- }
- else
- {
- /*
- * check for more results
- */
- ret_set = fcinfo->flinfo->fn_extra;
- ret_set->elem_num++;
- result = ret_set->res;
+ results = (char **) funcctx->user_fctx;
+ attinmeta = funcctx->attinmeta;
- if (ret_set->elem_num < ret_set->num_elems)
- {
- /*
- * fetch next one
- */
- rsi = (ReturnSetInfo *) fcinfo->resultinfo;
- rsi->isDone = ExprMultipleResult;
+ if (call_cntr < max_calls) /* do when there is more left to send */
+ {
+ char **values;
+ HeapTuple tuple;
+ Datum result;
- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
- PG_RETURN_TEXT_P(result_text);
- }
- else
- {
- int i;
+ values = (char **) palloc(2 * sizeof(char *));
+ values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */
- /*
- * or if no more, clean things up
- */
- for (i = 0; i < ret_set->num_elems; i++)
- pfree(result[i]);
+ sprintf(values[0], "%d", call_cntr + 1);
- pfree(ret_set->res);
- pfree(ret_set);
+ values[1] = results[call_cntr];
- rsi = (ReturnSetInfo *) fcinfo->resultinfo;
- rsi->isDone = ExprEndResult;
+ /* build the tuple */
+ tuple = BuildTupleFromCStrings(attinmeta, values);
- PG_RETURN_NULL();
- }
+ /* make the tuple into a datum */
+ result = TupleGetDatum(slot, tuple);
+
+ SRF_RETURN_NEXT(funcctx, result);
}
- PG_RETURN_NULL();
+ else /* do when there is no more left */
+ SRF_RETURN_DONE(funcctx);
}
-
/*
+ * Note: dblink_last_oid is DEPRECATED;
+ * it *will* be removed on next release
+ *
* dblink_last_oid
* return last inserted oid
*/
dblink_build_sql_insert(PG_FUNCTION_ARGS)
{
Oid relid;
- text *relname_text;
- int16 *pkattnums;
+ text *relname_text;
+ int16 *pkattnums;
int16 pknumatts;
- char **src_pkattvals;
- char **tgt_pkattvals;
- ArrayType *src_pkattvals_arry;
- ArrayType *tgt_pkattvals_arry;
+ char **src_pkattvals;
+ char **tgt_pkattvals;
+ ArrayType *src_pkattvals_arry;
+ ArrayType *tgt_pkattvals_arry;
int src_ndim;
- int *src_dim;
+ int *src_dim;
int src_nitems;
int tgt_ndim;
int *tgt_dim;
int tgt_nitems;
int i;
- char *ptr;
- char *sql;
- text *sql_text;
+ char *ptr;
+ char *sql;
+ text *sql_text;
+ int16 typlen;
+ bool typbyval;
+ char typalign;
relname_text = PG_GETARG_TEXT_P(0);
* get array of pointers to c-strings from the input source array
*/
Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
+ get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
+ &typlen, &typbyval, &typalign);
+
src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(src_pkattvals_arry);
for (i = 0; i < src_nitems; i++)
{
src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
- ptr += INTALIGN(*(int32 *) ptr);
+ ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+ ptr = (char *) att_align(ptr, typalign);
}
/*
* get array of pointers to c-strings from the input target array
*/
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+ get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+ &typlen, &typbyval, &typalign);
+
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
- ptr += INTALIGN(*(int32 *) ptr);
+ ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+ ptr = (char *) att_align(ptr, typalign);
}
/*
char *ptr;
char *sql;
text *sql_text;
+ int16 typlen;
+ bool typbyval;
+ char typalign;
relname_text = PG_GETARG_TEXT_P(0);
* get array of pointers to c-strings from the input target array
*/
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+ get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+ &typlen, &typbyval, &typalign);
+
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
- ptr += INTALIGN(*(int32 *) ptr);
+ ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+ ptr = (char *) att_align(ptr, typalign);
}
/*
char *ptr;
char *sql;
text *sql_text;
+ int16 typlen;
+ bool typbyval;
+ char typalign;
relname_text = PG_GETARG_TEXT_P(0);
* get array of pointers to c-strings from the input source array
*/
Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
+ get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
+ &typlen, &typbyval, &typalign);
+
src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(src_pkattvals_arry);
for (i = 0; i < src_nitems; i++)
{
src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
- ptr += INTALIGN(*(int32 *) ptr);
+ ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+ ptr = (char *) att_align(ptr, typalign);
}
/*
* get array of pointers to c-strings from the input target array
*/
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+ get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+ &typlen, &typbyval, &typalign);
+
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
- ptr += INTALIGN(*(int32 *) ptr);
+ ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+ ptr = (char *) att_align(ptr, typalign);
}
/*
PG_RETURN_TEXT_P(sql_text);
}
-
/*
* dblink_current_query
* return the current query string
}
-/*
- * dblink_replace_text
- * replace all occurences of 'old_sub_str' in 'orig_str'
- * with 'new_sub_str' to form 'new_str'
- *
- * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
- * otherwise returns 'new_str'
- */
-PG_FUNCTION_INFO_V1(dblink_replace_text);
-Datum
-dblink_replace_text(PG_FUNCTION_ARGS)
-{
- text *left_text;
- text *right_text;
- text *buf_text;
- text *ret_text;
- char *ret_str;
- int curr_posn;
- text *src_text = PG_GETARG_TEXT_P(0);
- int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
- text *from_sub_text = PG_GETARG_TEXT_P(1);
- int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
- text *to_sub_text = PG_GETARG_TEXT_P(2);
- char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
- StringInfo str = makeStringInfo();
-
- if (src_text_len == 0 || from_sub_text_len == 0)
- PG_RETURN_TEXT_P(src_text);
-
- buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
- curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
-
- while (curr_posn > 0)
- {
- left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
- right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1));
-
- appendStringInfo(str, "%s",
- DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
- appendStringInfo(str, "%s", to_sub_str);
-
- pfree(buf_text);
- pfree(left_text);
- buf_text = right_text;
- curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
- }
-
- appendStringInfo(str, "%s",
- DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
- pfree(buf_text);
-
- ret_str = pstrdup(str->data);
- ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
-
- PG_RETURN_TEXT_P(ret_text);
-}
-
-
/*************************************************************
* internal functions
*/
return retval;
}
-
-/*
- * init_dblink_array_results
- * - create an empty dblink_array_results data structure
- */
-static dblink_array_results *
-init_dblink_array_results(MemoryContext fn_mcxt)
-{
- MemoryContext oldcontext;
- dblink_array_results *retval;
-
- oldcontext = MemoryContextSwitchTo(fn_mcxt);
-
- retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
- MemSet(retval, 0, sizeof(dblink_array_results));
-
- retval->elem_num = -1;
- retval->num_elems = 0;
- retval->res = NULL;
-
- MemoryContextSwitchTo(oldcontext);
-
- return retval;
-}
-
/*
* get_pkey_attnames
*
Relation rel;
TupleDesc tupdesc;
- /*
- * Open relation using relid, get tupdesc
- */
+ /* open relation using relid, get tupdesc */
rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att;
- /*
- * Initialize numatts to 0 in case no primary key
- * exists
- */
+ /* initialize numatts to 0 in case no primary key exists */
*numatts = 0;
- /*
- * Use relid to get all related indexes
- */
+ /* use relid to get all related indexes */
indexRelation = heap_openr(IndexRelationName, AccessShareLock);
ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
F_OIDEQ, ObjectIdGetDatum(relid));
{
Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
- /*
- * We're only interested if it is the primary key
- */
+ /* we're only interested if it is the primary key */
if (index->indisprimary == TRUE)
{
i = 0;
if (*numatts > 0)
{
result = (char **) palloc(*numatts * sizeof(char *));
+
for (i = 0; i < *numatts; i++)
result[i] = SPI_fname(tupdesc, index->indkey[i]);
}
return result;
}
-
-/*
- * get_strtok
- *
- * parse input string
- * return ord item (0 based)
- * based on provided field separator
- */
-static char *
-get_strtok(char *fldtext, char *fldsep, int fldnum)
-{
- int j = 0;
- char *result;
-
- if (fldnum < 0)
- {
- elog(ERROR, "get_strtok: field number < 0 not permitted");
- }
-
- if (fldsep[0] == '\0')
- {
- elog(ERROR, "get_strtok: blank field separator not permitted");
- }
-
- result = strtok(fldtext, fldsep);
- for (j = 1; j < fldnum + 1; j++)
- {
- result = strtok(NULL, fldsep);
- if (result == NULL)
- return NULL;
- }
-
- return pstrdup(result);
-}
-
static char *
get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+ if (!tuple)
+ elog(ERROR, "dblink_build_sql_insert: row not found");
appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+ if (!tuple)
+ elog(ERROR, "dblink_build_sql_update: row not found");
appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
- tupdesc = rel->rd_att;
+ tupdesc = CreateTupleDescCopy(rel->rd_att);
+ relation_close(rel, AccessShareLock);
/*
* Connect to SPI manager
static Oid
get_relid_from_relname(text *relname_text)
{
-#ifdef NamespaceRelationName
RangeVar *relvar;
Relation rel;
Oid relid;
rel = heap_openrv(relvar, AccessShareLock);
relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock);
-#else
- char *relname;
- Relation rel;
- Oid relid;
-
- relname = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(relname_text)));
- rel = relation_openr(relname, AccessShareLock);
- relid = RelationGetRelid(rel);
- relation_close(rel, AccessShareLock);
-#endif /* NamespaceRelationName */
return relid;
}
res_id_index = 0;
}
+static TupleDesc
+pgresultGetTupleDesc(PGresult *res)
+{
+ int natts;
+ AttrNumber attnum;
+ TupleDesc desc;
+ char *attname;
+ int32 atttypmod;
+ int attdim;
+ bool attisset;
+ Oid atttypid;
+ int i;
+
+ /*
+ * allocate a new tuple descriptor
+ */
+ natts = PQnfields(res);
+ if (natts < 1)
+ elog(ERROR, "cannot create a description for empty results");
+
+ desc = CreateTemplateTupleDesc(natts, WITHOUTOID);
+
+ attnum = 0;
+
+ for (i = 0; i < natts; i++)
+ {
+ /*
+ * for each field, get the name and type information from the query
+ * result and have TupleDescInitEntry fill in the attribute
+ * information we need.
+ */
+ attnum++;
+
+ attname = PQfname(res, i);
+ atttypid = PQftype(res, i);
+ atttypmod = PQfmod(res, i);
+
+ if (PQfsize(res, i) != get_typlen(atttypid))
+ elog(ERROR, "Size of remote field \"%s\" does not match size "
+ "of local type \"%s\"",
+ attname,
+ format_type_with_typemod(atttypid, atttypmod));
+
+ attdim = 0;
+ attisset = false;
+
+ TupleDescInitEntry(desc, attnum, attname, atttypid,
+ atttypmod, attdim, attisset);
+ }
+
+ return desc;
+}