From 3bf6b8f06aef274bc38ee965bf8527b482ad015e Mon Sep 17 00:00:00 2001 From: Bruce Momjian Date: Wed, 24 Apr 2002 02:28:28 +0000 Subject: [PATCH] Attached is an update to contrib/dblink. Please apply if there are no objections. Major changes: - removed cursor wrap around input sql to allow for remote execution of INSERT/UPDATE/DELETE - dblink now returns a resource id instead of a real pointer - added several utility functions I'm still hoping to add explicit cursor open/fetch/close support before 7.3 is released, but I need a bit more time on that. On a somewhat unrelated topic, I never got any feedback on the unknownin/out patch and the mb_substring patch. Is there anything else I need to do to get those applied? Joe Conway --- contrib/dblink/README.dblink | 301 +++++++- contrib/dblink/dblink.c | 1323 +++++++++++++++++++++++++++++++--- contrib/dblink/dblink.h | 79 +- contrib/dblink/dblink.sql.in | 41 +- 4 files changed, 1623 insertions(+), 121 deletions(-) diff --git a/contrib/dblink/README.dblink b/contrib/dblink/README.dblink index 0a2f87b0ee..80aeb0cf4a 100644 --- a/contrib/dblink/README.dblink +++ b/contrib/dblink/README.dblink @@ -3,7 +3,8 @@ * * Functions returning results from a remote database * - * Copyright (c) Joseph Conway , 2001; + * Copyright (c) Joseph Conway , 2001, 2002, + * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement @@ -25,12 +26,19 @@ */ -Version 0.3 (14 June, 2001): - Function to test returning data set from remote database - Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel +Version 0.4 (7 April, 2002): + Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and + various utility functions. + Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel Release Notes: + Version 0.4 + - removed cursor wrap around input sql to allow for remote + execution of INSERT/UPDATE/DELETE + - dblink now returns a resource id instead of a real pointer + - added several utility functions -- see below + Version 0.3 - fixed dblink invalid pointer causing corrupt elog message - fixed dblink_tok improper handling of null results @@ -51,14 +59,36 @@ Installation: installs following functions into database template1: - dblink() - returns a pointer to results from remote query - dblink_tok() - extracts and returns individual field results + dblink(text,text) RETURNS setof int + - returns a resource id for results from remote query + dblink_tok(int,int) RETURNS text + - extracts and returns individual field results + dblink_strtok(text,text,int) RETURNS text + - extracts and returns individual token from delimited text + dblink_get_pkey(name) RETURNS setof text + - returns the field names of a relation's primary key fields + dblink_last_oid(int) RETURNS oid + - returns the last inserted oid + dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text + - builds an insert statement using a local tuple, replacing the + selection key field values with alternate supplied values + dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text + - builds a delete statement using supplied values for selection + key field values + dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text + - builds an update statement using a local tuple, replacing the + selection key field values with alternate supplied values + dblink_current_query() RETURNS text + - returns the current query string + dblink_replace(text,text,text) RETURNS text + - replace all occurences of substring-a in the input-string + with substring-b Documentation ================================================================== Name -dblink -- Returns a pointer to a data set from a remote database +dblink -- Returns a resource id for a data set from a remote database Synopsis @@ -78,7 +108,7 @@ Inputs Outputs - Returns setof int (pointer) + Returns setof int (res_id) Example usage @@ -94,13 +124,13 @@ dblink_tok -- Returns individual select field results from a dblink remote query Synopsis -dblink_tok(int pointer, int fnumber) +dblink_tok(int res_id, int fnumber) Inputs - pointer + res_id - a pointer returned by a call to dblink() + a resource id returned by a call to dblink() fnumber @@ -131,6 +161,255 @@ Then you can simply write: select f1, f2 from myremotetable where f1 like 'bytea%'; ================================================================== +Name + +dblink_strtok -- Extracts and returns individual token from delimited text + +Synopsis + +dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text + +Inputs + + inputstring + + any string you want to parse a token out of; + e.g. 'f=1&g=3&h=4' + + delimiter + + a single character to use as the delimiter; + e.g. '&' or '=' + + posn + + the position of the token of interest, 0 based; + e.g. 1 + +Outputs + + Returns text + +Example usage + +test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1); + dblink_strtok +--------------- + 3 +(1 row) + +================================================================== +Name + +dblink_get_pkey -- returns the field names of a relation's primary + key fields + +Synopsis + +dblink_get_pkey(name relname) RETURNS setof text + +Inputs + + relname + + any relation name; + e.g. 'foobar' + +Outputs + + Returns setof text -- one row for each primary key field, in order of + precedence + +Example usage + +test=# select dblink_get_pkey('foobar'); + dblink_get_pkey +----------------- + f1 + f2 + f3 + f4 + f5 +(5 rows) + + +================================================================== +Name + +dblink_last_oid -- Returns last inserted oid + +Synopsis + +dblink_last_oid(int res_id) RETURNS oid + +Inputs + + res_id + + any resource id returned by dblink function; + +Outputs + + Returns oid of last inserted tuple + +Example usage + +test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd' + ,'insert into mytable (f1, f2) values (1,2)')); + + dblink_last_oid +---------------- + 16553 +(1 row) + + +================================================================== +Name + +dblink_build_sql_insert -- builds an insert statement using a local + tuple, replacing the selection key field + values with alternate supplied values +dblink_build_sql_delete -- builds a delete statement using supplied + values for selection key field values +dblink_build_sql_update -- builds an update statement using a local + tuple, replacing the selection key field + values with alternate supplied values + + +Synopsis + +dblink_build_sql_insert(name relname + ,int2vector primary_key_attnums + ,int2 num_primary_key_atts + ,_text src_pk_att_vals_array + ,_text tgt_pk_att_vals_array) RETURNS text +dblink_build_sql_delete(name relname + ,int2vector primary_key_attnums + ,int2 num_primary_key_atts + ,_text tgt_pk_att_vals_array) RETURNS text +dblink_build_sql_update(name relname + ,int2vector primary_key_attnums + ,int2 num_primary_key_atts + ,_text src_pk_att_vals_array + ,_text tgt_pk_att_vals_array) RETURNS text + +Inputs + + relname + + any relation name; + e.g. 'foobar' + + primary_key_attnums + + vector of primary key attnums (1 based, see pg_index.indkey); + e.g. '1 2' + + num_primary_key_atts + + number of primary key attnums in the vector; e.g. 2 + + src_pk_att_vals_array + + array of primary key values, used to look up the local matching + tuple, the values of which are then used to construct the SQL + statement + + tgt_pk_att_vals_array + + array of primary key values, used to replace the local tuple + values in the SQL statement + +Outputs + + Returns text -- requested SQL statement + +Example usage + +test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}'); + dblink_build_sql_insert +-------------------------------------------------- + INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1') +(1 row) + +test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}'); + dblink_build_sql_delete +--------------------------------------------- + DELETE FROM "MyFoo" WHERE f1='1' AND f2='b' +(1 row) + +test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}'); + dblink_build_sql_update +------------------------------------------------------------- + UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b' +(1 row) + + +================================================================== +Name + +dblink_current_query -- returns the current query string + +Synopsis + +dblink_current_query () RETURNS text + +Inputs + + None + +Outputs + + Returns text -- a copy of the currently executing query + +Example usage + +test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1; + dblink_current_query +----------------------------------------------------------------------------------------------------------------------------------------------------- + select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1; +(1 row) + + +================================================================== +Name + +dblink_replace -- replace all occurences of substring-a in the + input-string with substring-b + +Synopsis + +dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text + +Inputs + + input-string + + the starting string, before replacement of substring-a + + substring-a + + the substring to find and replace + + substring-b + + the substring to be substituted in place of substring-a + +Outputs + + Returns text -- a copy of the starting string, but with all occurences of + substring-a replaced with substring-b + +Example usage + +test=# select dblink_replace('12345678901234567890','56','hello'); + dblink_replace +---------------------------- + 1234hello78901234hello7890 +(1 row) + +================================================================== + -- Joe Conway diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 4ab1315fa7..68fd2a797e 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -3,7 +3,8 @@ * * Functions returning results from a remote database * - * Copyright (c) Joseph Conway , 2001; + * Copyright (c) Joseph Conway , 2001, 2002, + * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement @@ -26,23 +27,23 @@ #include "dblink.h" +/* Global */ +List *res_id = NIL; +int res_id_index = 0; + PG_FUNCTION_INFO_V1(dblink); Datum dblink(PG_FUNCTION_ARGS) { - PGconn *conn = NULL; - PGresult *res = NULL; - dblink_results *results; - char *optstr; - char *sqlstatement; - char *curstr = "DECLARE mycursor CURSOR FOR "; - char *execstatement; - char *msg; - int ntuples = 0; - ReturnSetInfo *rsi; - - if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) - elog(ERROR, "dblink: NULL arguments are not permitted"); + PGconn *conn = NULL; + PGresult *res = NULL; + dblink_results *results; + char *optstr; + char *sqlstatement; + char *execstatement; + char *msg; + int ntuples = 0; + ReturnSetInfo *rsi; if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) elog(ERROR, "dblink: function called in context that does not accept a set result"); @@ -61,21 +62,10 @@ dblink(PG_FUNCTION_ARGS) elog(ERROR, "dblink: connection error: %s", msg); } - res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - elog(ERROR, "dblink: begin error: %s", msg); - } - PQclear(res); - - execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1); + execstatement = (char *) palloc(strlen(sqlstatement) + 1); if (execstatement != NULL) { - strcpy(execstatement, curstr); - strcat(execstatement, sqlstatement); + strcpy(execstatement, sqlstatement); strcat(execstatement, "\0"); } else @@ -94,70 +84,36 @@ dblink(PG_FUNCTION_ARGS) /* * got results, start fetching them */ - PQclear(res); - - res = PQexec(conn, "FETCH ALL in mycursor"); - if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - elog(ERROR, "dblink: sql error: %s", msg); - } - ntuples = PQntuples(res); - if (ntuples > 0) - { - - results = init_dblink_results(fcinfo->flinfo->fn_mcxt); - results->tup_num = 0; - results->res = res; - res = NULL; - - fcinfo->flinfo->fn_extra = (void *) results; - - results = NULL; - results = fcinfo->flinfo->fn_extra; - - /* close the cursor */ - res = PQexec(conn, "CLOSE mycursor"); - PQclear(res); - - /* commit the transaction */ - res = PQexec(conn, "COMMIT"); - PQclear(res); - - /* close the connection to the database and cleanup */ - PQfinish(conn); - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; - - PG_RETURN_POINTER(results); - - } - else - { + /* + * increment resource index + */ + res_id_index++; - PQclear(res); + results = init_dblink_results(fcinfo->flinfo->fn_mcxt); + results->tup_num = 0; + results->res_id_index = res_id_index; + results->res = res; - /* close the cursor */ - res = PQexec(conn, "CLOSE mycursor"); - PQclear(res); + /* + * Append node to res_id to hold pointer to results. + * Needed by dblink_tok to access the data + */ + append_res_ptr(results); - /* commit the transaction */ - res = PQexec(conn, "COMMIT"); - PQclear(res); + /* + * save pointer to results for the next function manager call + */ + fcinfo->flinfo->fn_extra = (void *) results; - /* close the connection to the database and cleanup */ - PQfinish(conn); + /* close the connection to the database and cleanup */ + PQfinish(conn); - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprEndResult; + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprMultipleResult; - PG_RETURN_NULL(); - } + PG_RETURN_INT32(res_id_index); } } else @@ -165,9 +121,10 @@ dblink(PG_FUNCTION_ARGS) /* * check for more results */ - results = fcinfo->flinfo->fn_extra; + results->tup_num++; + res_id_index = results->res_id_index; ntuples = PQntuples(results->res); if (results->tup_num < ntuples) @@ -179,18 +136,19 @@ dblink(PG_FUNCTION_ARGS) rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprMultipleResult; - PG_RETURN_POINTER(results); - + PG_RETURN_INT32(res_id_index); } else { /* * or if no more, clean things up */ - results = fcinfo->flinfo->fn_extra; + remove_res_ptr(results); PQclear(results->res); + pfree(results); + fcinfo->flinfo->fn_extra = NULL; rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprEndResult; @@ -214,36 +172,37 @@ Datum dblink_tok(PG_FUNCTION_ARGS) { dblink_results *results; - int fldnum; - text *result_text; - char *result; - int nfields = 0; - int text_len = 0; - - if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) - elog(ERROR, "dblink: NULL arguments are not permitted"); + int fldnum; + text *result_text; + char *result; + int nfields = 0; + int text_len = 0; - results = (dblink_results *) PG_GETARG_POINTER(0); + results = get_res_ptr(PG_GETARG_INT32(0)); if (results == NULL) - elog(ERROR, "dblink: function called with invalid result pointer"); + { + if (res_id != NIL) + { + freeList(res_id); + res_id = NIL; + res_id_index = 0; + } + + elog(ERROR, "dblink_tok: function called with invalid resource id"); + } fldnum = PG_GETARG_INT32(1); if (fldnum < 0) - elog(ERROR, "dblink: field number < 0 not permitted"); + elog(ERROR, "dblink_tok: field number < 0 not permitted"); nfields = PQnfields(results->res); if (fldnum > (nfields - 1)) - elog(ERROR, "dblink: field number %d does not exist", fldnum); + elog(ERROR, "dblink_tok: field number %d does not exist", fldnum); if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) - { - PG_RETURN_NULL(); - - } else { - text_len = PQgetlength(results->res, results->tup_num, fldnum); result = (char *) palloc(text_len + 1); @@ -259,12 +218,621 @@ dblink_tok(PG_FUNCTION_ARGS) result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result))); PG_RETURN_TEXT_P(result_text); + } +} + +/* + * dblink_strtok + * parse input string + * return ord item (0 based) + * based on provided field separator + */ +PG_FUNCTION_INFO_V1(dblink_strtok); +Datum +dblink_strtok(PG_FUNCTION_ARGS) +{ + char *fldtext; + char *fldsep; + int fldnum; + char *buffer; + text *result_text; + + fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); + fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); + fldnum = PG_GETARG_INT32(2); + + if (fldtext[0] == '\0') + { + elog(ERROR, "get_strtok: blank list not permitted"); + } + if (fldsep[0] == '\0') + { + elog(ERROR, "get_strtok: blank field separator not permitted"); + } + + buffer = get_strtok(fldtext, fldsep, fldnum); + + pfree(fldtext); + pfree(fldsep); + + if (buffer == NULL) + { + PG_RETURN_NULL(); + } + else + { + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer))); + pfree(buffer); + + PG_RETURN_TEXT_P(result_text); + } +} + + +/* + * dblink_get_pkey + * + * Return comma delimited list of primary key + * fields for the supplied relation, + * or NULL if none exists. + */ +PG_FUNCTION_INFO_V1(dblink_get_pkey); +Datum +dblink_get_pkey(PG_FUNCTION_ARGS) +{ + char *relname; + Oid relid; + char **result; + text *result_text; + int16 numatts; + ReturnSetInfo *rsi; + dblink_array_results *ret_set; + + if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) + elog(ERROR, "dblink: function called in context that does not accept a set result"); + + if (fcinfo->flinfo->fn_extra == NULL) + { + relname = NameStr(*PG_GETARG_NAME(0)); + + /* + * Convert relname to rel OID. + */ + relid = get_relid_from_relname(relname); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", + relname); + + /* + * get an array of attnums. + */ + result = get_pkey_attnames(relid, &numatts); + + if ((result != NULL) && (numatts > 0)) + { + ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt); + + ret_set->elem_num = 0; + ret_set->num_elems = numatts; + ret_set->res = result; + + fcinfo->flinfo->fn_extra = (void *) ret_set; + + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprMultipleResult; + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); + + PG_RETURN_TEXT_P(result_text); + } + else + { + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprEndResult; + + PG_RETURN_NULL(); + } + } + else + { + /* + * check for more results + */ + ret_set = fcinfo->flinfo->fn_extra; + ret_set->elem_num++; + result = ret_set->res; + + if (ret_set->elem_num < ret_set->num_elems) + { + /* + * fetch next one + */ + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprMultipleResult; + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); + PG_RETURN_TEXT_P(result_text); + } + else + { + int i; + + /* + * or if no more, clean things up + */ + for (i = 0; i < ret_set->num_elems; i++) + pfree(result[i]); + + pfree(ret_set->res); + pfree(ret_set); + + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprEndResult; + + PG_RETURN_NULL(); + } + } + PG_RETURN_NULL(); +} + + +/* + * dblink_last_oid + * return last inserted oid + */ +PG_FUNCTION_INFO_V1(dblink_last_oid); +Datum +dblink_last_oid(PG_FUNCTION_ARGS) +{ + dblink_results *results; + + results = get_res_ptr(PG_GETARG_INT32(0)); + if (results == NULL) + { + if (res_id != NIL) + { + freeList(res_id); + res_id = NIL; + res_id_index = 0; + } + + elog(ERROR, "dblink_tok: function called with invalid resource id"); + } + + PG_RETURN_OID(PQoidValue(results->res)); +} + + +/* + * dblink_build_sql_insert + * + * Used to generate an SQL insert statement + * based on an existing tuple in a local relation. + * This is useful for selectively replicating data + * to another server via dblink. + * + * API: + * - name of local table of interest + * - an int2vector of attnums which will be used + * to identify the local tuple of interest + * - number of attnums in pkattnums + * - text array of key values which will be used + * to identify the local tuple of interest + * - text array of key values which will be used + * to build the string for execution remotely. These are substituted + * for their counterparts in src_pkattvals_arry + */ +PG_FUNCTION_INFO_V1(dblink_build_sql_insert); +Datum +dblink_build_sql_insert(PG_FUNCTION_ARGS) +{ + Oid relid; + char *relname; + int16 *pkattnums; + int16 pknumatts; + char **src_pkattvals; + char **tgt_pkattvals; + ArrayType *src_pkattvals_arry; + ArrayType *tgt_pkattvals_arry; + int src_ndim; + int *src_dim; + int src_nitems; + int tgt_ndim; + int *tgt_dim; + int tgt_nitems; + int i; + char *ptr; + char *sql; + text *sql_text; + + relname = NameStr(*PG_GETARG_NAME(0)); + + /* + * Convert relname to rel OID. + */ + relid = get_relid_from_relname(relname); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", + relname); + + pkattnums = (int16 *) PG_GETARG_POINTER(1); + pknumatts = PG_GETARG_INT16(2); + /* + * There should be at least one key attribute + */ + if (pknumatts == 0) + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); + + src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); + + /* + * Source array is made up of key values that will be used to + * locate the tuple of interest from the local system. + */ + src_ndim = ARR_NDIM(src_pkattvals_arry); + src_dim = ARR_DIMS(src_pkattvals_arry); + src_nitems = ArrayGetNItems(src_ndim, src_dim); + + /* + * There should be one source array key value for each key attnum + */ + if (src_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input source array + */ + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(src_pkattvals_arry); + for (i = 0; i < src_nitems; i++) + { + src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Target array is made up of key values that will be used to + * build the SQL string for use on the remote system. + */ + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); + + /* + * There should be one target array key value for each key attnum + */ + if (tgt_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input target array + */ + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); + for (i = 0; i < tgt_nitems; i++) + { + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Prep work is finally done. Go get the SQL string. + */ + sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); + + /* + * Make it into TEXT for return to the client + */ + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); + + /* + * And send it + */ + PG_RETURN_TEXT_P(sql_text); +} + + +/* + * dblink_build_sql_delete + * + * Used to generate an SQL delete statement. + * This is useful for selectively replicating a + * delete to another server via dblink. + * + * API: + * - name of remote table of interest + * - an int2vector of attnums which will be used + * to identify the remote tuple of interest + * - number of attnums in pkattnums + * - text array of key values which will be used + * to build the string for execution remotely. + */ +PG_FUNCTION_INFO_V1(dblink_build_sql_delete); +Datum +dblink_build_sql_delete(PG_FUNCTION_ARGS) +{ + Oid relid; + char *relname; + int16 *pkattnums; + int16 pknumatts; + char **tgt_pkattvals; + ArrayType *tgt_pkattvals_arry; + int tgt_ndim; + int *tgt_dim; + int tgt_nitems; + int i; + char *ptr; + char *sql; + text *sql_text; + + relname = NameStr(*PG_GETARG_NAME(0)); + + /* + * Convert relname to rel OID. + */ + relid = get_relid_from_relname(relname); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", + relname); + + pkattnums = (int16 *) PG_GETARG_POINTER(1); + pknumatts = PG_GETARG_INT16(2); + /* + * There should be at least one key attribute + */ + if (pknumatts == 0) + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); + + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); + + /* + * Target array is made up of key values that will be used to + * build the SQL string for use on the remote system. + */ + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); + + /* + * There should be one target array key value for each key attnum + */ + if (tgt_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input target array + */ + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); + for (i = 0; i < tgt_nitems; i++) + { + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Prep work is finally done. Go get the SQL string. + */ + sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); + + /* + * Make it into TEXT for return to the client + */ + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); + + /* + * And send it + */ + PG_RETURN_TEXT_P(sql_text); +} + + +/* + * dblink_build_sql_update + * + * Used to generate an SQL update statement + * based on an existing tuple in a local relation. + * This is useful for selectively replicating data + * to another server via dblink. + * + * API: + * - name of local table of interest + * - an int2vector of attnums which will be used + * to identify the local tuple of interest + * - number of attnums in pkattnums + * - text array of key values which will be used + * to identify the local tuple of interest + * - text array of key values which will be used + * to build the string for execution remotely. These are substituted + * for their counterparts in src_pkattvals_arry + */ +PG_FUNCTION_INFO_V1(dblink_build_sql_update); +Datum +dblink_build_sql_update(PG_FUNCTION_ARGS) +{ + Oid relid; + char *relname; + int16 *pkattnums; + int16 pknumatts; + char **src_pkattvals; + char **tgt_pkattvals; + ArrayType *src_pkattvals_arry; + ArrayType *tgt_pkattvals_arry; + int src_ndim; + int *src_dim; + int src_nitems; + int tgt_ndim; + int *tgt_dim; + int tgt_nitems; + int i; + char *ptr; + char *sql; + text *sql_text; + + relname = NameStr(*PG_GETARG_NAME(0)); + + /* + * Convert relname to rel OID. + */ + relid = get_relid_from_relname(relname); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", + relname); + + pkattnums = (int16 *) PG_GETARG_POINTER(1); + pknumatts = PG_GETARG_INT16(2); + /* + * There should be one source array key values for each key attnum + */ + if (pknumatts == 0) + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); + + src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); + + /* + * Source array is made up of key values that will be used to + * locate the tuple of interest from the local system. + */ + src_ndim = ARR_NDIM(src_pkattvals_arry); + src_dim = ARR_DIMS(src_pkattvals_arry); + src_nitems = ArrayGetNItems(src_ndim, src_dim); + + /* + * There should be one source array key value for each key attnum + */ + if (src_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input source array + */ + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(src_pkattvals_arry); + for (i = 0; i < src_nitems; i++) + { + src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Target array is made up of key values that will be used to + * build the SQL string for use on the remote system. + */ + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); + + /* + * There should be one target array key value for each key attnum + */ + if (tgt_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input target array + */ + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); + for (i = 0; i < tgt_nitems; i++) + { + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); } + + /* + * Prep work is finally done. Go get the SQL string. + */ + sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); + + /* + * Make it into TEXT for return to the client + */ + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); + + /* + * And send it + */ + PG_RETURN_TEXT_P(sql_text); } /* + * dblink_current_query + * return the current query string + * to allow its use in (among other things) + * rewrite rules + */ +PG_FUNCTION_INFO_V1(dblink_current_query); +Datum +dblink_current_query(PG_FUNCTION_ARGS) +{ + text *result_text; + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string))); + PG_RETURN_TEXT_P(result_text); +} + + +/* + * dblink_replace_text + * replace all occurences of 'old_sub_str' in 'orig_str' + * with 'new_sub_str' to form 'new_str' + * + * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == '' + * otherwise returns 'new_str' + */ +PG_FUNCTION_INFO_V1(dblink_replace_text); +Datum +dblink_replace_text(PG_FUNCTION_ARGS) +{ + text *left_text; + text *right_text; + text *buf_text; + text *ret_text; + char *ret_str; + int curr_posn; + text *src_text = PG_GETARG_TEXT_P(0); + int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text))); + text *from_sub_text = PG_GETARG_TEXT_P(1); + int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text))); + text *to_sub_text = PG_GETARG_TEXT_P(2); + char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text))); + StringInfo str = makeStringInfo(); + + if (src_text_len == 0 || from_sub_text_len == 0) + PG_RETURN_TEXT_P(src_text); + + buf_text = DatumGetTextPCopy(PointerGetDatum(src_text)); + curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); + + while (curr_posn > 0) + { + left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1)); + right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1)); + + appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text)))); + appendStringInfo(str, to_sub_str); + + pfree(buf_text); + pfree(left_text); + buf_text = right_text; + curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); + } + + appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text)))); + pfree(buf_text); + + ret_str = pstrdup(str->data); + ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str))); + + PG_RETURN_TEXT_P(ret_text); +} + + +/************************************************************* * internal functions */ @@ -285,9 +853,556 @@ init_dblink_results(MemoryContext fn_mcxt) MemSet(retval, 0, sizeof(dblink_results)); retval->tup_num = -1; + retval->res_id_index =-1; retval->res = NULL; MemoryContextSwitchTo(oldcontext); return retval; } + + +/* + * init_dblink_array_results + * - create an empty dblink_array_results data structure + */ +dblink_array_results * +init_dblink_array_results(MemoryContext fn_mcxt) +{ + MemoryContext oldcontext; + dblink_array_results *retval; + + oldcontext = MemoryContextSwitchTo(fn_mcxt); + + retval = (dblink_array_results *) palloc(sizeof(dblink_array_results)); + MemSet(retval, 0, sizeof(dblink_array_results)); + + retval->elem_num = -1; + retval->num_elems = 0; + retval->res = NULL; + + MemoryContextSwitchTo(oldcontext); + + return retval; +} + +/* + * get_pkey_attnames + * + * Get the primary key attnames for the given relation. + * Return NULL, and set numatts = 0, if no primary key exists. + */ +char ** +get_pkey_attnames(Oid relid, int16 *numatts) +{ + Relation indexRelation; + ScanKeyData entry; + HeapScanDesc scan; + HeapTuple indexTuple; + int i; + char **result = NULL; + Relation rel; + TupleDesc tupdesc; + + /* + * Open relation using relid, get tupdesc + */ + rel = relation_open(relid, AccessShareLock); + tupdesc = rel->rd_att; + + /* + * Initialize numatts to 0 in case no primary key + * exists + */ + *numatts = 0; + + /* + * Use relid to get all related indexes + */ + indexRelation = heap_openr(IndexRelationName, AccessShareLock); + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, + F_OIDEQ, ObjectIdGetDatum(relid)); + scan = heap_beginscan(indexRelation, false, SnapshotNow, + 1, &entry); + + while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0))) + { + Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple); + + /* + * We're only interested if it is the primary key + */ + if (index->indisprimary == TRUE) + { + i = 0; + while (index->indkey[i++] != 0) + (*numatts)++; + + if (*numatts > 0) + { + result = (char **) palloc(*numatts * sizeof(char *)); + for (i = 0; i < *numatts; i++) + result[i] = SPI_fname(tupdesc, index->indkey[i]); + } + break; + } + } + heap_endscan(scan); + heap_close(indexRelation, AccessShareLock); + relation_close(rel, AccessShareLock); + + return result; +} + + +/* + * get_strtok + * + * parse input string + * return ord item (0 based) + * based on provided field separator + */ +char * +get_strtok(char *fldtext, char *fldsep, int fldnum) +{ + int j = 0; + char *result; + + if (fldnum < 0) + { + elog(ERROR, "get_strtok: field number < 0 not permitted"); + } + + if (fldsep[0] == '\0') + { + elog(ERROR, "get_strtok: blank field separator not permitted"); + } + + result = strtok(fldtext, fldsep); + for (j = 1; j < fldnum + 1; j++) + { + result = strtok(NULL, fldsep); + if (result == NULL) + return NULL; + } + + return pstrdup(result); +} + +char * +get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) +{ + Relation rel; + char *relname; + HeapTuple tuple; + TupleDesc tupdesc; + int natts; + StringInfo str = makeStringInfo(); + char *sql = NULL; + char *val = NULL; + int16 key; + unsigned int i; + + /* + * Open relation using relid + */ + rel = relation_open(relid, AccessShareLock); + relname = RelationGetRelationName(rel); + tupdesc = rel->rd_att; + natts = tupdesc->natts; + + tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + + appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname)); + for (i = 0; i < natts; i++) + { + if (i > 0) + appendStringInfo(str, ","); + + appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname)); + } + + appendStringInfo(str, ") VALUES("); + + /* + * remember attvals are 1 based + */ + for (i = 0; i < natts; i++) + { + if (i > 0) + appendStringInfo(str, ","); + + if (tgt_pkattvals != NULL) + key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1); + else + key = -1; + + if (key > -1) + val = pstrdup(tgt_pkattvals[key]); + else + val = SPI_getvalue(tuple, tupdesc, i + 1); + + if (val != NULL) + { + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, "NULL"); + } + appendStringInfo(str, ")"); + + sql = pstrdup(str->data); + pfree(str->data); + pfree(str); + relation_close(rel, AccessShareLock); + + return (sql); +} + +char * +get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals) +{ + Relation rel; + char *relname; + TupleDesc tupdesc; + int natts; + StringInfo str = makeStringInfo(); + char *sql = NULL; + char *val = NULL; + unsigned int i; + + /* + * Open relation using relid + */ + rel = relation_open(relid, AccessShareLock); + relname = RelationGetRelationName(rel); + tupdesc = rel->rd_att; + natts = tupdesc->natts; + + appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname)); + for (i = 0; i < pknumatts; i++) + { + int16 pkattnum = pkattnums[i]; + + if (i > 0) + appendStringInfo(str, " AND "); + + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); + + if (tgt_pkattvals != NULL) + val = pstrdup(tgt_pkattvals[i]); + else + elog(ERROR, "Target key array must not be NULL"); + + if (val != NULL) + { + appendStringInfo(str, "="); + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, "IS NULL"); + } + + sql = pstrdup(str->data); + pfree(str->data); + pfree(str); + relation_close(rel, AccessShareLock); + + return (sql); +} + +char * +get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) +{ + Relation rel; + char *relname; + HeapTuple tuple; + TupleDesc tupdesc; + int natts; + StringInfo str = makeStringInfo(); + char *sql = NULL; + char *val = NULL; + int16 key; + int i; + + /* + * Open relation using relid + */ + rel = relation_open(relid, AccessShareLock); + relname = RelationGetRelationName(rel); + tupdesc = rel->rd_att; + natts = tupdesc->natts; + + tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + + appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname)); + + for (i = 0; i < natts; i++) + { + if (i > 0) + appendStringInfo(str, ","); + + appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname)); + appendStringInfo(str, "="); + + if (tgt_pkattvals != NULL) + key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1); + else + key = -1; + + if (key > -1) + val = pstrdup(tgt_pkattvals[key]); + else + val = SPI_getvalue(tuple, tupdesc, i + 1); + + if (val != NULL) + { + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, "NULL"); + } + + appendStringInfo(str, " WHERE "); + + for (i = 0; i < pknumatts; i++) + { + int16 pkattnum = pkattnums[i]; + + if (i > 0) + appendStringInfo(str, " AND "); + + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); + + if (tgt_pkattvals != NULL) + val = pstrdup(tgt_pkattvals[i]); + else + val = SPI_getvalue(tuple, tupdesc, pkattnum); + + if (val != NULL) + { + appendStringInfo(str, "="); + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, "IS NULL"); + } + + sql = pstrdup(str->data); + pfree(str->data); + pfree(str); + relation_close(rel, AccessShareLock); + + return (sql); +} + +/* + * Return a properly quoted literal value. + * Uses quote_literal in quote.c + */ +static char * +quote_literal_cstr(char *rawstr) +{ + text *rawstr_text; + text *result_text; + char *result; + + rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr))); + result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text))); + result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text))); + + return result; +} + +/* + * Return a properly quoted identifier. + * Uses quote_ident in quote.c + */ +static char * +quote_ident_cstr(char *rawstr) +{ + text *rawstr_text; + text *result_text; + char *result; + + rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr))); + result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text))); + result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text))); + + return result; +} + +int16 +get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key) +{ + int i; + + /* + * Not likely a long list anyway, so just scan for + * the value + */ + for (i = 0; i < pknumatts; i++) + if (key == pkattnums[i]) + return i; + + return -1; +} + +HeapTuple +get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals) +{ + Relation rel; + char *relname; + TupleDesc tupdesc; + StringInfo str = makeStringInfo(); + char *sql = NULL; + int ret; + HeapTuple tuple; + int i; + char *val = NULL; + + /* + * Open relation using relid + */ + rel = relation_open(relid, AccessShareLock); + relname = RelationGetRelationName(rel); + tupdesc = rel->rd_att; + + /* + * Connect to SPI manager + */ + if ((ret = SPI_connect()) < 0) + elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret); + + /* + * Build sql statement to look up tuple of interest + * Use src_pkattvals as the criteria. + */ + appendStringInfo(str, "SELECT * from %s WHERE ", relname); + + for (i = 0; i < pknumatts; i++) + { + int16 pkattnum = pkattnums[i]; + + if (i > 0) + appendStringInfo(str, " AND "); + + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); + + val = pstrdup(src_pkattvals[i]); + if (val != NULL) + { + appendStringInfo(str, "="); + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, "IS NULL"); + } + + sql = pstrdup(str->data); + pfree(str->data); + pfree(str); + /* + * Retrieve the desired tuple + */ + ret = SPI_exec(sql, 0); + pfree(sql); + + /* + * Only allow one qualifying tuple + */ + if ((ret == SPI_OK_SELECT) && (SPI_processed > 1)) + { + elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record."); + } + else if (ret == SPI_OK_SELECT && SPI_processed == 1) + { + SPITupleTable *tuptable = SPI_tuptable; + tuple = SPI_copytuple(tuptable->vals[0]); + + return tuple; + } + else + { + /* + * no qualifying tuples + */ + return NULL; + } + + /* + * never reached, but keep compiler quiet + */ + return NULL; +} + +Oid +get_relid_from_relname(char *relname) +{ +#ifdef NamespaceRelationName + Oid relid; + + relid = RelnameGetRelid(relname); +#else + Relation rel; + Oid relid; + + rel = relation_openr(relname, AccessShareLock); + relid = RelationGetRelid(rel); + relation_close(rel, AccessShareLock); +#endif /* NamespaceRelationName */ + + return relid; +} + +dblink_results * +get_res_ptr(int32 res_id_index) +{ + List *ptr; + + /* + * short circuit empty list + */ + if(res_id == NIL) + return NULL; + + /* + * OK, should be good to go + */ + foreach(ptr, res_id) + { + dblink_results *this_res_id = (dblink_results *) lfirst(ptr); + if (this_res_id->res_id_index == res_id_index) + return this_res_id; + } + return NULL; +} + +/* + * Add node to global List res_id + */ +void +append_res_ptr(dblink_results *results) +{ + res_id = lappend(res_id, results); +} + +/* + * Remove node from global List + * using res_id_index + */ +void +remove_res_ptr(dblink_results *results) +{ + res_id = lremove(results, res_id); + + if (res_id == NIL) + res_id_index = 0; +} + + diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h index 8c5a5bdc28..9be48ead2e 100644 --- a/contrib/dblink/dblink.h +++ b/contrib/dblink/dblink.h @@ -3,7 +3,8 @@ * * Functions returning results from a remote database * - * Copyright (c) Joseph Conway , 2001; + * Copyright (c) Joseph Conway , 2001, 2002, + * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement @@ -33,10 +34,31 @@ #include "libpq-int.h" #include "fmgr.h" #include "access/tupdesc.h" +#include "access/heapam.h" +#include "catalog/catname.h" +#include "catalog/pg_index.h" +#include "catalog/pg_type.h" #include "executor/executor.h" +#include "executor/spi.h" +#include "lib/stringinfo.h" #include "nodes/nodes.h" #include "nodes/execnodes.h" +#include "nodes/pg_list.h" +#include "parser/parse_type.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/array.h" +#include "utils/syscache.h" + +#ifdef NamespaceRelationName +#include "catalog/namespace.h" +#endif /* NamespaceRelationName */ + +/* + * Max SQL statement size + */ +#define DBLINK_MAX_SQLSTATE_SIZE 16384 /* * This struct holds the results of the remote query. @@ -49,22 +71,75 @@ typedef struct */ int tup_num; + /* + * resource index number for this context + */ + int res_id_index; + /* * the actual query results */ PGresult *res; - } dblink_results; + +/* + * This struct holds results in the form of an array. + * Use fn_extra to hold a pointer to it across calls + */ +typedef struct +{ + /* + * elem being accessed + */ + int elem_num; + + /* + * number of elems + */ + int num_elems; + + /* + * the actual array + */ + void *res; + +} dblink_array_results; + /* * External declarations */ extern Datum dblink(PG_FUNCTION_ARGS); extern Datum dblink_tok(PG_FUNCTION_ARGS); +extern Datum dblink_strtok(PG_FUNCTION_ARGS); +extern Datum dblink_get_pkey(PG_FUNCTION_ARGS); +extern Datum dblink_last_oid(PG_FUNCTION_ARGS); +extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS); +extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); +extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS); +extern Datum dblink_current_query(PG_FUNCTION_ARGS); +extern Datum dblink_replace_text(PG_FUNCTION_ARGS); /* * Internal declarations */ dblink_results *init_dblink_results(MemoryContext fn_mcxt); +dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt); +char **get_pkey_attnames(Oid relid, int16 *numatts); +char *get_strtok(char *fldtext, char *fldsep, int fldnum); +char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber); +char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); +char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); +char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); +static char *quote_literal_cstr(char *rawstr); +static char *quote_ident_cstr(char *rawstr); +int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); +HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); +Oid get_relid_from_relname(char *relname); +dblink_results *get_res_ptr(int32 res_id_index); +void append_res_ptr(dblink_results *results); +void remove_res_ptr(dblink_results *results); + +extern char *debug_query_string; #endif /* DBLINK_H */ diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in index 1615c00413..6b567b8cd5 100644 --- a/contrib/dblink/dblink.sql.in +++ b/contrib/dblink/dblink.sql.in @@ -1,5 +1,38 @@ -CREATE FUNCTION dblink (text,text) RETURNS setof int - AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'; +CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int + AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c' + WITH (isstrict); -CREATE FUNCTION dblink_tok (int,int) RETURNS text - AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'; +CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text + AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text + AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c' + WITH (iscachable, isstrict); + +CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text + AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid + AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text + AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text + AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text + AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text + AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c'; + +CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text + AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c' + WITH (iscachable, isstrict); -- 2.40.0